0
  • 聊天消息
  • 系統(tǒng)消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會員中心
創(chuàng)作中心

完善資料讓更多小伙伴認識你,還能領取20積分哦,立即完善>

3天內不再提示

Pandas與PySpark強強聯手,功能與速度齊飛

數據分析與開發(fā) ? 來源:數據分析與開發(fā) ? 2023-04-25 10:51 ? 次閱讀

使用Python做數據處理的數據科學家或數據從業(yè)者,對數據科學包pandas并不陌生,也不乏像主頁君一樣的pandas重度使用者,項目開始寫的第一行代碼,大多是 import pandas as pdpandas做數據處理可以說是yyds!而他的缺點也是非常明顯,pandas 只能單機處理,它不能隨數據量線性伸縮。例如,如果 pandas 試圖讀取的數據集大于一臺機器的可用內存,則會因內存不足而失敗。

另外 pandas 在處理大型數據方面非常慢,雖然有像Dask 或 Vaex 等其他庫來優(yōu)化提升數據處理速度,但在大數據處理神之框架Spark面前,也是小菜一碟。

幸運的是,在新的 Spark 3.2 版本中,出現了一個新的Pandas API,將pandas大部分功能都集成到PySpark中,使用pandas的接口,就能使用Spark,因為 Spark 上的 Pandas API 在后臺使用 Spark,這樣就能達到強強聯手的效果,可以說是非常強大,非常方便。

這一切都始于 2019 年 Spark + AI 峰會。Koalas 是一個開源項目,可以在 Spark 之上使用 Pandas。一開始,它只覆蓋了 Pandas 的一小部分功能,但后來逐漸壯大起來?,F在,在新的 Spark 3.2 版本中,Koalas 已合并到 PySpark。

Spark 現在集成了 Pandas API,因此可以在 Spark 上運行 Pandas。只需要更改一行代碼:

importpyspark.pandasasps

由此我們可以獲得諸多的優(yōu)勢:

  • 如果我們熟悉使用Python 和 Pandas,但不熟悉 Spark,可以省略了需復雜的學習過程而立即使用PySpark。
  • 可以為所有內容使用一個代碼庫:無論是小數據和大數據,還是單機和分布式機器。
  • 可以在Spark分布式框架上,更快地運行 Pandas 代碼。

最后一點尤其值得注意。

一方面,可以將分布式計算應用于在 Pandas 中的代碼。且借助 Spark 引擎,代碼即使在單臺機器上也會更快!下圖展示了在一臺機器(具有 96 個 vCPU 和 384 GiBs 內存)上運行 Spark 和單獨調用 pandas 分析 130GB 的 CSV 數據集的性能對比。

7bf05686-ddad-11ed-bfe3-dac502259ad0.png

多線程和 Spark SQL Catalyst Optimizer 都有助于優(yōu)化性能。例如,Join count 操作在整個階段代碼生成時快 4 倍:沒有代碼生成時為 5.9 秒,代碼生成時為 1.6 秒。

Spark 在鏈式操作(chaining operations)中具有特別顯著的優(yōu)勢。Catalyst 查詢優(yōu)化器可以識別過濾器以明智地過濾數據并可以應用基于磁盤的連接(disk-based joins),而 Pandas 傾向于每一步將所有數據加載到內存中。

現在是不是迫不及待的想嘗試如何在 Spark 上使用 Pandas API 編寫一些代碼?我們現在就開始吧!

在 Pandas / Pandas-on-Spark / Spark 之間切換

需要知道的第一件事是我們到底在使用什么。在使用 Pandas 時,使用類pandas.core.frame.DataFrame。在 Spark 中使用 pandas API 時,使用pyspark.pandas.frame.DataFrame。雖然兩者相似,但不相同。主要區(qū)別在于前者在單機中,而后者是分布式的。

可以使用 Pandas-on-Spark 創(chuàng)建一個 Dataframe 并將其轉換為 Pandas,反之亦然:

#importPandas-on-Spark
importpyspark.pandasasps

#使用Pandas-on-Spark創(chuàng)建一個DataFrame
ps_df=ps.DataFrame(range(10))

#將Pandas-on-SparkDataframe轉換為PandasDataframe
pd_df=ps_df.to_pandas()

#將PandasDataframe轉換為Pandas-on-SparkDataframe
ps_df=ps.from_pandas(pd_df)

注意,如果使用多臺機器,則在將 Pandas-on-Spark Dataframe 轉換為 Pandas Dataframe 時,數據會從多臺機器傳輸到一臺機器,反之亦然(可參閱PySpark 指南[1])。

還可以將 Pandas-on-Spark Dataframe 轉換為 Spark DataFrame,反之亦然:

#使用Pandas-on-Spark創(chuàng)建一個DataFrame
ps_df=ps.DataFrame(range(10))

#將Pandas-on-SparkDataframe轉換為SparkDataframe
spark_df=ps_df.to_spark()

#將SparkDataframe轉換為Pandas-on-SparkDataframe
ps_df_new=spark_df.to_pandas_on_spark()

數據類型如何改變?

在使用 Pandas-on-Spark 和 Pandas 時,數據類型基本相同。將 Pandas-on-Spark DataFrame 轉換為 Spark DataFrame 時,數據類型會自動轉換為適當的類型(請參閱PySpark 指南[2]

下面的示例顯示了在轉換時是如何將數據類型從 PySpark DataFrame 轉換為 pandas-on-Spark DataFrame。

>>>sdf=spark.createDataFrame([
...(1,Decimal(1.0),1.,1.,1,1,1,datetime(2020,10,27),"1",True,datetime(2020,10,27)),
...],'tinyinttinyint,decimaldecimal,floatfloat,doubledouble,integerinteger,longlong,shortshort,timestamptimestamp,stringstring,booleanboolean,datedate')
>>>sdf
DataFrame[tinyint: tinyint, decimal: decimal(10,0),
float: float, double: double, integer: int,
long: bigint, short: smallint, timestamp: timestamp, 
string: string, boolean: boolean, date: date]
psdf=sdf.pandas_api()
psdf.dtypes
tinyint                int8
decimal              object
float               float32
double              float64
integer               int32
long                  int64
short                 int16
timestamp    datetime64[ns]
string               object
boolean                bool
date                 object
dtype: object

Pandas-on-Spark vs Spark 函數

在 Spark 中的 DataFrame 及其在 Pandas-on-Spark 中的最常用函數。注意,Pandas-on-Spark 和 Pandas 在語法上的唯一區(qū)別就是 import pyspark.pandas as ps 一行。

當你看完如下內容后,你會發(fā)現,即使您不熟悉 Spark,也可以通過 Pandas API 輕松使用。

導入庫

#運行Spark
frompyspark.sqlimportSparkSession
spark=SparkSession.builder
.appName("Spark")
.getOrCreate()
#在Spark上運行Pandas
importpyspark.pandasasps

讀取數據

以 old dog iris 數據集為例。

#SPARK
sdf=spark.read.options(inferSchema='True',
header='True').csv('iris.csv')
#PANDAS-ON-SPARK
pdf=ps.read_csv('iris.csv')

選擇

#SPARK
sdf.select("sepal_length","sepal_width").show()
#PANDAS-ON-SPARK
pdf[["sepal_length","sepal_width"]].head()

刪除列

#SPARK
sdf.drop('sepal_length').show()#PANDAS-ON-SPARK
pdf.drop('sepal_length').head()

刪除重復項

#SPARK
sdf.dropDuplicates(["sepal_length","sepal_width"]).show()
#PANDAS-ON-SPARK
pdf[["sepal_length","sepal_width"]].drop_duplicates()

篩選

#SPARK
sdf.filter((sdf.flower_type=="Iris-setosa")&(sdf.petal_length>1.5)).show()
#PANDAS-ON-SPARK
pdf.loc[(pdf.flower_type=="Iris-setosa")&(pdf.petal_length>1.5)].head()

計數

#SPARK
sdf.filter(sdf.flower_type=="Iris-virginica").count()
#PANDAS-ON-SPARK
pdf.loc[pdf.flower_type=="Iris-virginica"].count()

唯一值

#SPARK
sdf.select("flower_type").distinct().show()
#PANDAS-ON-SPARK
pdf["flower_type"].unique()

排序

#SPARK
sdf.sort("sepal_length","sepal_width").show()
#PANDAS-ON-SPARK
pdf.sort_values(["sepal_length","sepal_width"]).head()

分組

#SPARK
sdf.groupBy("flower_type").count().show()
#PANDAS-ON-SPARK
pdf.groupby("flower_type").count()

替換

#SPARK
sdf.replace("Iris-setosa","setosa").show()
#PANDAS-ON-SPARK
pdf.replace("Iris-setosa","setosa").head()

連接

#SPARK
sdf.union(sdf)
#PANDAS-ON-SPARK
pdf.append(pdf)

transform 和 apply 函數應用

有許多 API 允許用戶針對 pandas-on-Spark DataFrame 應用函數,例如:

DataFrame.transform()
DataFrame.apply()
DataFrame.pandas_on_spark.transform_batch()
DataFrame.pandas_on_spark.apply_batch()
Series.pandas_on_spark.transform_batch()

每個 API 都有不同的用途,并且在內部工作方式不同。

transform 和 apply

DataFrame.transform()DataFrame.apply()之間的主要區(qū)別在于,前者需要返回相同長度的輸入,而后者不需要。

#transform
psdf=ps.DataFrame({'a':[1,2,3],'b':[4,5,6]})
defpandas_plus(pser):
returnpser+1#應該總是返回與輸入相同的長度。

psdf.transform(pandas_plus)

#apply
psdf=ps.DataFrame({'a':[1,2,3],'b':[5,6,7]})
defpandas_plus(pser):
returnpser[pser%2==1]#允許任意長度

psdf.apply(pandas_plus)

在這種情況下,每個函數采用一個 pandas Series,Spark 上的 pandas API 以分布式方式計算函數,如下所示。

7c02cf82-ddad-11ed-bfe3-dac502259ad0.png

在“列”軸的情況下,該函數將每一行作為一個熊貓系列。

psdf=ps.DataFrame({'a':[1,2,3],'b':[4,5,6]})
defpandas_plus(pser):
returnsum(pser)#允許任意長度
psdf.apply(pandas_plus,axis='columns')

上面的示例將每一行的總和計算為pands Series

7c31b34c-ddad-11ed-bfe3-dac502259ad0.png

pandas_on_spark.transform_batchpandas_on_spark.apply_batch

batch 后綴表示 pandas-on-Spark DataFrame 或 Series 中的每個塊。API 對 pandas-on-Spark DataFrame 或 Series 進行切片,然后以 pandas DataFrame 或 Series 作為輸入和輸出應用給定函數。請參閱以下示例:

psdf=ps.DataFrame({'a':[1,2,3],'b':[4,5,6]})
defpandas_plus(pdf):
returnpdf+1#應該總是返回與輸入相同的長度。

psdf.pandas_on_spark.transform_batch(pandas_plus)

psdf=ps.DataFrame({'a':[1,2,3],'b':[4,5,6]})
defpandas_plus(pdf):
returnpdf[pdf.a>1]#允許任意長度

psdf.pandas_on_spark.apply_batch(pandas_plus)

兩個示例中的函數都將 pandas DataFrame 作為 pandas-on-Spark DataFrame 的一個塊,并輸出一個 pandas DataFrame。Spark 上的 Pandas API 將 pandas 數據幀組合為 pandas-on-Spark 數據幀。

7c6fae22-ddad-11ed-bfe3-dac502259ad0.png在 Spark 上使用 pandas API的注意事項

避免shuffle

某些操作,例如sort_values在并行或分布式環(huán)境中比在單臺機器上的內存中更難完成,因為它需要將數據發(fā)送到其他節(jié)點,并通過網絡在多個節(jié)點之間交換數據。

避免在單個分區(qū)上計算

另一種常見情況是在單個分區(qū)上進行計算。目前, DataFrame.rank 等一些 API 使用 PySpark 的 Window 而不指定分區(qū)規(guī)范。這會將所有數據移動到單個機器中的單個分區(qū)中,并可能導致嚴重的性能下降。對于非常大的數據集,應避免使用此類 API。

不要使用重復的列名

不允許使用重復的列名,因為 Spark SQL 通常不允許這樣做。Spark 上的 Pandas API 繼承了這種行為。例如,見下文:

importpyspark.pandasasps
psdf=ps.DataFrame({'a':[1,2],'b':[3,4]})
psdf.columns=["a","a"]
Reference 'a' is ambiguous, could be: a, a.;

此外,強烈建議不要使用區(qū)分大小寫的列名。Spark 上的 Pandas API 默認不允許它。

importpyspark.pandasasps
psdf=ps.DataFrame({'a':[1,2],'A':[3,4]})
Reference 'a' is ambiguous, could be: a, a.;

但可以在 Spark 配置spark.sql.caseSensitive中打開以啟用它,但需要自己承擔風險。

frompyspark.sqlimportSparkSession
builder=SparkSession.builder.appName("pandas-on-spark")
builder=builder.config("spark.sql.caseSensitive","true")
builder.getOrCreate()

importpyspark.pandasasps
psdf=ps.DataFrame({'a':[1,2],'A':[3,4]})
psdf
   a  A
0  1  3
1  2  4

使用默認索引

pandas-on-Spark 用戶面臨的一個常見問題是默認索引導致性能下降。當索引未知時,Spark 上的 Pandas API 會附加一個默認索引,例如 Spark DataFrame 直接轉換為 pandas-on-Spark DataFrame。

如果計劃在生產中處理大數據,請通過將默認索引配置為distributeddistributed-sequence來使其確保為分布式。

有關配置默認索引的更多詳細信息,請參閱默認索引類型[3]

在 Spark 上使用 pandas API

盡管 Spark 上的 pandas API 具有大部分與 pandas 等效的 API,但仍有一些 API 尚未實現或明確不受支持。因此盡可能直接在 Spark 上使用 pandas API。

例如,Spark 上的 pandas API 沒有實現__iter__(),阻止用戶將所有數據從整個集群收集到客戶端(驅動程序)端。不幸的是,許多外部 API,例如 min、max、sum 等 Python 的內置函數,都要求給定參數是可迭代的。對于 pandas,它開箱即用,如下所示:

>>>importpandasaspd
>>>max(pd.Series([1,2,3]))
3
>>>min(pd.Series([1,2,3]))
1
>>>sum(pd.Series([1,2,3]))
6

Pandas 數據集存在于單臺機器中,自然可以在同一臺機器內進行本地迭代。但是,pandas-on-Spark 數據集存在于多臺機器上,并且它們是以分布式方式計算的。很難在本地迭代,很可能用戶在不知情的情況下將整個數據收集到客戶端。因此,最好堅持使用 pandas-on-Spark API。上面的例子可以轉換如下:

>>>importpyspark.pandasasps
>>>ps.Series([1,2,3]).max()
3
>>>ps.Series([1,2,3]).min()
1
>>>ps.Series([1,2,3]).sum()
6

pandas 用戶的另一個常見模式可能是依賴列表推導式或生成器表達式。但是,它還假設數據集在引擎蓋下是本地可迭代的。因此,它可以在 pandas 中無縫運行,如下所示:

importpandasaspd
data=[]
countries=['London','NewYork','Helsinki']
pser=pd.Series([20.,21.,12.],index=countries)
fortemperatureinpser:
asserttemperature>0
iftemperature>1000:
temperature=None
data.append(temperature**2)

pd.Series(data,index=countries)
London      400.0
New York    441.0
Helsinki    144.0
dtype: float64

但是,對于 Spark 上的 pandas API,它的工作原理與上述相同。上面的示例也可以更改為直接使用 pandas-on-Spark API,如下所示:

importpyspark.pandasasps
importnumpyasnp
countries=['London','NewYork','Helsinki']
psser=ps.Series([20.,21.,12.],index=countries)
defsquare(temperature)->np.float64:
asserttemperature>0
iftemperature>1000:
temperature=None
returntemperature**2

psser.apply(square)
London      400.0
New York    441.0
Helsinki    144.0

減少對不同 DataFrame 的操作

Spark 上的 Pandas API 默認不允許對不同 DataFrame(或 Series)進行操作,以防止昂貴的操作。只要有可能,就應該避免這種操作。

寫在最后

到目前為止,我們將能夠在 Spark 上使用 Pandas。這將會導致Pandas 速度的大大提高,遷移到 Spark 時學習曲線的減少,以及單機計算和分布式計算在同一代碼庫中的合并。

審核編輯 :李倩


聲明:本文內容及配圖由入駐作者撰寫或者入駐合作網站授權轉載。文章觀點僅代表作者本人,不代表電子發(fā)燒友網立場。文章及其配圖僅供工程師學習之用,如有內容侵權或者其他違規(guī)問題,請聯系本站處理。 舉報投訴
  • API
    API
    +關注

    關注

    2

    文章

    1472

    瀏覽量

    61749
  • 代碼
    +關注

    關注

    30

    文章

    4722

    瀏覽量

    68234
  • 過濾器
    +關注

    關注

    1

    文章

    427

    瀏覽量

    19521

原文標題:Pandas 與 PySpark 強強聯手,功能與速度齊飛

文章出處:【微信號:DBDevs,微信公眾號:數據分析與開發(fā)】歡迎添加關注!文章轉載請注明出處。

收藏 人收藏

    評論

    相關推薦

    東芝白電牽手創(chuàng)維 聯手布局中國市場

    近日,東芝生活電器株式會社(TLSC)正式宣布于十月開始與創(chuàng)維旗下白電產業(yè)公司創(chuàng)維電器就白電產品(冰箱,洗衣機,吸塵器)的銷售與生產開展戰(zhàn)略合作。此次合作旨在進一步推進東芝白電業(yè)務在中國市場的開展,通過強聯手促進雙方合作共贏。
    發(fā)表于 09-24 10:28 ?764次閱讀
    東芝白電牽手創(chuàng)維 <b class='flag-5'>強</b><b class='flag-5'>強</b><b class='flag-5'>聯手</b>布局中國市場

    百度Apollo和恩智浦聯手 推出高安全性的ECU安全集成方案

    百度Apollo攜手全球最大汽車電子和人工智能物聯網芯片公司恩智浦半導體(NXP Semiconductors)共同發(fā)布中國首款芯片級ECU信息安全解決方案,推出高安全性的集成式軟硬件平臺,保護汽車電子控制單元(ECU)安全,聯手
    的頭像 發(fā)表于 07-05 15:40 ?6970次閱讀

    聯手,瑞芯微與商湯科技聯合發(fā)布AI人臉識別一站式解決方案

    、RK3399、RK3288三大主力平臺。瑞芯微與商湯科技硬件與軟件的聯手,將加速“中國智造”在人臉識別領域的場景化、商用化落地。
    的頭像 發(fā)表于 08-16 09:47 ?7268次閱讀

    AI時代,存

    以存算,以存訓,以存
    的頭像 發(fā)表于 03-22 09:17 ?2090次閱讀
    AI時代,存<b class='flag-5'>強</b>則<b class='flag-5'>強</b>

    單絲伸儀簡介

    家技術監(jiān)督局鑒定。2、該機采用單片機控制系統(tǒng),自動處理數據,可顯示并打印輸出,采用等速伸長(CRE)檢測原理。3、整機接插件少,可靠性,達到準確、穩(wěn)定、效率高、該機操作簡單方便,具有自檢及斷電保護功能。4、顯示
    發(fā)表于 08-21 15:20

    FPC軟板補設計

    最近在某EDA畫了一塊FPC,有專門的FPC補工具,輸出的GERBER層名也有補信息,在他們平臺下單也可以自動識別補信息,而且還可以少50塊,不知道華秋DFM是否可以識別,如果可以檢查就比較完美了
    發(fā)表于 10-08 15:00

    攜手:世與傳感器領導者TE正式合作

    傳感器市場的領導者TE Connectivity(下文簡稱TE)與本土十大分銷商世正式聯手,二者達成代理協議,此后想購買TE的傳感器,在世和世旗下的世
    發(fā)表于 07-07 09:41 ?976次閱讀

    中國移動和華為聯手將我國的5G建設走在世界的前端

    在韓國已經正式商用,美國開始試商用的情況下,在爭奪全球領先的無形比賽中,我國的5G建設已經在加快腳步。基于中興被制裁和華為被拉近采購“黑名單”的事實,支持龍頭企業(yè)扛起國內5G建設大旗已經是最現實的選擇之一,這其中中國移動和華為的聯手
    發(fā)表于 12-24 09:04 ?2993次閱讀

    百度華為聯手為AI時代打造最強算力

    百度和華為共同宣布:百度槳(PaddlePaddle)深度學習平臺與華為麒麟芯片聯手,雙方將打通深度學習框架與芯片,為AI時代打造最強算力和最流暢的應用體驗。
    發(fā)表于 07-04 10:13 ?753次閱讀

    華為與美的的聯手或將引領進智能家電時代

    聯手!3554億科技巨頭牽手華為,智能家電時代或來臨
    的頭像 發(fā)表于 08-21 09:42 ?3141次閱讀

    鴻蒙系統(tǒng)不再“孤軍奮戰(zhàn)” 華為、美的聯手

    華為和美的聯手,率先完成了全景智能家居一體化戰(zhàn)略,實現了互惠互利,后續(xù)美的還會繼續(xù)推出搭載鴻蒙OS系統(tǒng)的智能家電產品,讓用戶享受到更加智能、便捷的服務。
    的頭像 發(fā)表于 02-25 10:21 ?1243次閱讀
    鴻蒙系統(tǒng)不再“孤軍奮戰(zhàn)” 華為、美的<b class='flag-5'>強</b><b class='flag-5'>強</b><b class='flag-5'>聯手</b>

    芯華章宣布傅勇出任首席技術官,聯手加速打造系統(tǒng)級數字驗證解決方案

    出任首席技術官,帶領聯手的研發(fā)團隊研發(fā)出更多具有競爭優(yōu)勢的數字驗證EDA產品,并實現快速量產和落地,為客戶提供更加靈活、高效的驗證解決方案。 ? 為了將產品盡快推向市場,大規(guī)模集成電路設計廠商在有限的設計周期
    發(fā)表于 09-26 10:03 ?330次閱讀

    Codasip和IAR聯手,共同演示用于RISC-V的雙核鎖步技術

    年 3 月 14 日—— Codasip和IAR共同宣布將聯手為低功耗嵌入式汽車應用提供全新的創(chuàng)新支持,雙方將聯手為客戶提供屢獲殊榮的Codasip L31內核和獲得安全性認證的
    發(fā)表于 03-17 17:26 ?788次閱讀

    新思科技與Arm聯手,加快下一代移動SoC開發(fā)

    新思科技業(yè)界領先的EDA和IP全方位解決方案與Arm全面計算解決方案結合,助力生態(tài)系統(tǒng)應對多裸晶芯片系統(tǒng)設計挑戰(zhàn)。
    發(fā)表于 06-05 11:55 ?492次閱讀

    LoRa和Sigfox的聯手

    領域的技術更迭逐漸走向競合。此番LoRaWAN和Sigfox的兩方巨頭合作,又會為IoT世界帶來怎樣的震撼和影響呢?筆者將基于此展開分析。 01技術
    的頭像 發(fā)表于 07-28 10:09 ?1053次閱讀
    LoRa和Sigfox的<b class='flag-5'>強</b><b class='flag-5'>強</b><b class='flag-5'>聯手</b>