
有時候你在做 Python 數(shù)據(jù)分析的時候,可能會出現(xiàn)這么個情況:用 Pandas 打開一個超大型數(shù)據(jù)集,想得到一些度量(metrics),然后就尷尬地卡住了。
大家都知道,如果你處理大數(shù)據(jù),手里用的是 Pandas,有時要等上一小時才能得到一個 Series 的平均值,甚至都還沒調(diào)用 apply 函數(shù)。這還只是幾百萬行啊,如果是幾十億行,那最好還是用 Spark 之類的高級工具吧。
那么就沒有好辦法了嗎?有的,就有這么一個工具,能夠加速 Python 數(shù)據(jù)分析,既不需要你使用配置更高的硬件設(shè)施,也不必切換編程語言。當(dāng)然,如果你的數(shù)據(jù)集超級超級大,它的最終作用也會有限,但比普通的 Python 擴(kuò)展工具好多了。特別是如果你不用做大量的重建索引,那么這個工具非常適合你。
這個工具叫 Dask,數(shù)據(jù)科學(xué)家 Luciano Strika 專門試用了這個工具,并做了測試,發(fā)現(xiàn) Dask 在做并行數(shù)據(jù)分析時,比常規(guī) Pandas 快出許多倍。
什么是Dask?
Dask 是一個開源項(xiàng)目,能提供 NumPy Arrays,Pandas Dataframes 和常規(guī)列表的抽象,允許你使用多核處理并行運(yùn)行它們。
下面這段直接摘自教程:
Dask 提供模仿了 NumPy,列表和 Pandas 的高級 Array,Bag 以及 DataFrame 集合,但能夠在無法放入主內(nèi)存的數(shù)據(jù)集上并行運(yùn)行。對大型數(shù)據(jù)集來說,Dask 的高級集合是 NumPy 和 Pandas 的替代方案。
聽起來真不錯!于是我(作者Luciano Strika)決定親自試試 Dask Dataframes,并對它們進(jìn)行了幾個基準(zhǔn)測試。
閱讀文檔
我首先閱讀了官方文檔,看看建議我們使用 Dask 做哪些工作。以下是官方文檔(http://docs.dask.org/en/latest/dataframe.html)中的相關(guān)部分:
- 操縱大型數(shù)據(jù)集,即使這些數(shù)據(jù)集無法放入內(nèi)存
- 使用許多核來加速長計(jì)算
- 使用標(biāo)準(zhǔn)Pandas操作(如 groupby,join 和時間序列計(jì)算)對大型數(shù)據(jù)集進(jìn)行分布式計(jì)算
然后在下面,它列出了一些如果使用 Dask Dataframes 會快速完成的事情:
- 算術(shù)運(yùn)算(乘以或添加到Series)
- 常見聚合(平均值,最小值,最大值,求和等)
- 調(diào)用 apply(只要它在索引中,也就是說,不是在 groupby('y')之后'y'不是索引)
- 調(diào)用 value_counts(),drop_duplicates()或corr()
- 使用 loc,isin 和行式選擇進(jìn)行過濾
#returns only the rows where x is >5, by reference (writing on them alters original df)
df2 = df.loc[df['x'] > 5]
#returns only the rows where x is 0,1,2,3 or 4, by reference
df3 = df.x.isin(range(4))
#returns only the rows where x is >5, by read-only reference (can't be written on)
df4 = df[df['x']>5]
如何使用 Dask Dataframes
Dask Dataframes 與 Pandas Dataframes 具有相同的 API,只是聚合和 apply 函數(shù)延遲執(zhí)行,并且需要通過調(diào)用 compute 方法來計(jì)算。要想生成 Dask Dataframe,可以像在 Pandas 中一樣調(diào)用 read_csv 方法,或者,如果給出 Pandas Dataframe df,只需調(diào)用
dd = ddf.from_pandas(df, npartitions=N)
其中 ddf 是你導(dǎo)入 Dask Dataframes 的名稱,而 npartitions 是一個參數(shù),告訴 Dataframe 如何對其進(jìn)行分區(qū)。
根據(jù) StackOverflow 上的說法,建議將 Dataframe 劃分為與計(jì)算機(jī)核數(shù)數(shù)量相同的分區(qū),或者是該數(shù)量的幾倍,因?yàn)槊總€分區(qū)將在不同的線程上運(yùn)行。如果分區(qū)過多,它們之間的通信代價會高很多。
動手吧:做點(diǎn)基準(zhǔn)測試
主要有以下要點(diǎn):
def get_big_mean():
return dfn.salary.mean().compute()
def get_big_mean_old():
return df3.salary.mean()
def get_big_max():
return dfn.salary.max().compute()
def get_big_max_old():
return df3.salary.max()
def get_big_sum():
return dfn.salary.sum().compute()
def get_big_sum_old():
return df3.salary.sum()
def filter_df():
df = dfn[dfn['salary']>5000]
def filter_df_old():
df = df3[df3['salary']>5000]
這里df3是一個常規(guī)的 Pandas Dataframe,擁有 2500 萬行,使用這段腳本生成,其中列是名稱,姓氏和薪水,從列表中隨機(jī)抽樣。我用了一個有 50 行的數(shù)據(jù)集并連接了 500000 次,因?yàn)槲覍Ψ治霰旧聿⒉惶信d趣,但運(yùn)行它時才會。
dfn 就是基于 df3 的 Dask Dataframe。
第一批結(jié)果:不太樂觀
我首先嘗試使用 3 個分區(qū)進(jìn)行測試,因?yàn)槲业碾娔X只有 4 個內(nèi)核,不想過度使用它。這次使用 Dask 的結(jié)果非常差,而且還要等很久才能得到結(jié)果,不過我懷疑這可能是分區(qū)過少的原因:
204.313940048 seconds for get_big_mean
39.7543280125 seconds for get_big_mean_old
131.600986004 seconds for get_big_max
43.7621600628 seconds for get_big_max_old
120.027213097 seconds for get_big_sum
7.49701309204 seconds for get_big_sum_old
0.581165790558 seconds for filter_df
226.700095892 seconds for filter_df_old
可以看到在我使用 Dask 時大多數(shù)操作變慢了很多。這給了我一些啟示,可能必須使用更多分區(qū)才行。產(chǎn)生延遲計(jì)算所花的成本也可以忽略不計(jì)(在某些情況下不到半秒),所以如果重復(fù)使用它,成本不會隨著時間推移而攤銷。
我還用 apply 方法嘗試了這個測試:
def f(x):
return (13*x+5)%7
def apply_random_old():
df3['random']= df3['salary'].apply(f)
def apply_random():
dfn['random']= dfn['salary'].apply(f).compute()
并有非常相似的結(jié)果:
369.541605949 seconds for apply_random
157.643756866 seconds for apply_random_old
因此,一般來說,大多數(shù)操作的速度都是初始操作的兩倍,盡管過濾器的速度要快得多。我覺得或許應(yīng)該在上面調(diào)用 compute,所以對這個結(jié)果持保留態(tài)度。
更多分區(qū):加速驚人
得到前面這些不盡人意的結(jié)果之后,我決定我可能只是沒有使用足夠的分區(qū)。畢竟,整件事情的重點(diǎn)是并行運(yùn)行,所以或許我只需進(jìn)一步并行化?所以我嘗試了 8 個分區(qū)的相同測試,得到了如下結(jié)果(省略了非并行數(shù)據(jù)幀的結(jié)果,因?yàn)樗鼈兓鞠嗤?/p>
3.08352184296 seconds for get_big_mean
1.3314101696 seconds for get_big_max
1.21639800072 seconds for get_big_sum
0.228978157043 seconds for filter_df
112.135010004 seconds for apply_random
50.2007009983 seconds for value_count_test
這就對了!大多數(shù)操作的運(yùn)行速度比常規(guī) Dataframe 快十倍,甚至 apply 的運(yùn)行速度也更快了!我還運(yùn)行了 value_count 測試,它只調(diào)用“薪水”Series 上的 value_count 方法。對于上下文,在我足足等 10 分鐘后在常規(guī) Dataframe 上運(yùn)行此測試時,必須終止該過程。這次只用了 50 秒!
所以之前都沒用對工具,它的速度可比常規(guī) Dataframe 快多了。
最后再說一點(diǎn)
鑒于我是在一臺相當(dāng)舊的 4 核 PC 上運(yùn)行了 2500 萬行數(shù)據(jù),所以是相當(dāng)了不起的。所以我的建議是,下回你必須在本地或從單個 AWS 實(shí)例上理數(shù)據(jù)集時,一定要試試 Dask 這個框架。運(yùn)行速度簡直不要太快。