問題描述
存在一些文件名類似 2016-07-16_161.res 2016-07-18_161.res 的文件,文件內容如下:
less 2016-07-16_161.res
1.34.0.68
1.34.1.37
1.34.1.121
1.34.5.87
1.34.5.182
1.34.6.72
1.34.6.245
1.34.9.149
1.34.11.74
1.34.13.161
...
希望通過 Spark 統(tǒng)計出存活超過 n 天的 IP,并可以同時看到該 IP 分別在哪天存活。
因此需要先將這些文件轉換成 Pair RDD,其中的每一項以 IP 為 key,以日期為 value。
編寫 Spark 腳本如下:
rdds = [sc.textFile(f).map(lambda x: (x, f.split('_')[0])) for f in glob('*.res')]
預期得到結果:
rdds[0].first()
(u'210.240.117.126', '2016-07-16')
rdds[1].first()
(u'210.240.117.126', '2016-07-18')
但實際得到的結果為:
rdds[0].first()
(u'210.240.117.126', '2016-07-18')
rdds[1].first()
(u'210.240.117.126', '2016-07-18')
即所有的 RDD 中每一項的 value 都為同一值。
問題定位
看到這個現(xiàn)象基本把可能出問題的點鎖定在了 map(lambda x: (x, f.split('_')[0])) 附近。
查找了 pyspark 中 map 函數(shù)的實現(xiàn),也沒發(fā)現(xiàn)有什么不妥的地方。rdd.py 中 RDD::map() 的實現(xiàn)如下:
def map(self, f, preservesPartitioning=False):
"""
Return a new RDD by applying a function to each element of this RDD.
>>> rdd = sc.parallelize(["b", "a", "c"])
>>> sorted(rdd.map(lambda x: (x, 1)).collect())
[('a', 1), ('b', 1), ('c', 1)]
"""
def func(_, iterator):
return map(f, iterator)
return self.mapPartitionsWithIndex(func, preservesPartitioning)
另一個值得懷疑的點就是 lambda 的行為是不是真的符合預期,但是一直只是懷疑,沒有找到有效的方法來驗證自己的猜測。直到看了 @張通 轉發(fā)的 《Python 中的 lambda 和「真正的」lambda 有什么區(qū)別?》[1],才確認并搞清楚問題發(fā)生的根本原因。
問題發(fā)生的根本原因在于 Python 中的 lambda 在實現(xiàn)上存在缺陷[2],導致 Spark 腳本中傳入 map 函數(shù)的 lambda 表達式共享了同一個變量 f,從而導致了上述問題的發(fā)生。
舉一反三
目前大概可以確定,Python 實現(xiàn)的 lambda 表達式中的變量可能并不像正常的函數(shù)那樣具有獨立的作用域。
以下述代碼為例:
test = [lambda x: x+i for i in range(10)]
print test[0](1)
print test[9](1)
這段代碼并不會如預期的輸出 1 和 10,而是會輸出 10 和 10。使用 dis 模塊分析列表中的任意一個 lambda 表達式得到如下結果。
In [3]: dis.dis(test[0])
1 0 LOAD_FAST 0 (x)
3 LOAD_GLOBAL 0 (i)
6 BINARY_ADD
7 RETURN_VALUE
從上述 Python bytecode 中可以看出 lambda 表達式中的變量 i 的確沒有一個獨立的作用域,而是使用了相對全局的作用域,而此時該作用域中的變量 i 已經變成了 9,因此得到了上述結果。
更近一步,list comprehension 中的變量的作用域又是怎樣的呢?是僅僅作用于 list comprehension 內部,還是也會影響到外部呢?
實測代碼如下:
In [13]: p = 100
In [14]: a = [p for p in range(10)]
In [15]: print a
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
In [16]: print p
9
可見 list comprehension 中的變量也會對其外的變量產生影響,即 list comprehension 中的變量也不具有獨立的作用域。所以,雖然 list comprehension 具有執(zhí)行效率高和可讀性強等優(yōu)點,在實際的編碼中也需要多注意這些副作用,防止被坑。
問題解決
下面兩個方法均可解決該問題:
def gen_rdd(f):
return sc.textFile(f).map(lambda x: (x, f.split('_')[0]))
rdds = [gen_rdd(f) for f in glob('*.res')]
rdds = map(
lambda f: sc.textFile(f).map(lambda x: (x, f.split('_')[0])), glob('*.res')
)
推薦使用第二種方式。