詭異的 list comprehension 展開

問題描述

存在一些文件名類似 2016-07-16_161.res 2016-07-18_161.res 的文件,文件內容如下:

less 2016-07-16_161.res
1.34.0.68
1.34.1.37
1.34.1.121
1.34.5.87
1.34.5.182
1.34.6.72
1.34.6.245
1.34.9.149
1.34.11.74
1.34.13.161
...

希望通過 Spark 統(tǒng)計出存活超過 n 天的 IP,并可以同時看到該 IP 分別在哪天存活。

因此需要先將這些文件轉換成 Pair RDD,其中的每一項以 IP 為 key,以日期為 value。

編寫 Spark 腳本如下:

rdds = [sc.textFile(f).map(lambda x: (x, f.split('_')[0])) for f in glob('*.res')]

預期得到結果:

rdds[0].first()
(u'210.240.117.126', '2016-07-16')

rdds[1].first()
(u'210.240.117.126', '2016-07-18')

但實際得到的結果為:

rdds[0].first()
(u'210.240.117.126', '2016-07-18')

rdds[1].first()
(u'210.240.117.126', '2016-07-18')

即所有的 RDD 中每一項的 value 都為同一值。

問題定位

看到這個現(xiàn)象基本把可能出問題的點鎖定在了 map(lambda x: (x, f.split('_')[0])) 附近。

查找了 pyspark 中 map 函數(shù)的實現(xiàn),也沒發(fā)現(xiàn)有什么不妥的地方。rdd.py 中 RDD::map() 的實現(xiàn)如下:

    def map(self, f, preservesPartitioning=False):
        """
        Return a new RDD by applying a function to each element of this RDD.
        >>> rdd = sc.parallelize(["b", "a", "c"])
        >>> sorted(rdd.map(lambda x: (x, 1)).collect())
        [('a', 1), ('b', 1), ('c', 1)]
        """
        def func(_, iterator):
            return map(f, iterator)
        return self.mapPartitionsWithIndex(func, preservesPartitioning)

另一個值得懷疑的點就是 lambda 的行為是不是真的符合預期,但是一直只是懷疑,沒有找到有效的方法來驗證自己的猜測。直到看了 @張通 轉發(fā)的 《Python 中的 lambda 和「真正的」lambda 有什么區(qū)別?》[1],才確認并搞清楚問題發(fā)生的根本原因。

問題發(fā)生的根本原因在于 Python 中的 lambda 在實現(xiàn)上存在缺陷[2],導致 Spark 腳本中傳入 map 函數(shù)的 lambda 表達式共享了同一個變量 f,從而導致了上述問題的發(fā)生。

舉一反三

目前大概可以確定,Python 實現(xiàn)的 lambda 表達式中的變量可能并不像正常的函數(shù)那樣具有獨立的作用域。

以下述代碼為例:

test = [lambda x: x+i for i in range(10)]
print test[0](1)
print test[9](1)

這段代碼并不會如預期的輸出 110,而是會輸出 1010。使用 dis 模塊分析列表中的任意一個 lambda 表達式得到如下結果。

In [3]: dis.dis(test[0])
  1           0 LOAD_FAST                0 (x)
              3 LOAD_GLOBAL              0 (i)
              6 BINARY_ADD
              7 RETURN_VALUE

從上述 Python bytecode 中可以看出 lambda 表達式中的變量 i 的確沒有一個獨立的作用域,而是使用了相對全局的作用域,而此時該作用域中的變量 i 已經變成了 9,因此得到了上述結果。

更近一步,list comprehension 中的變量的作用域又是怎樣的呢?是僅僅作用于 list comprehension 內部,還是也會影響到外部呢?

實測代碼如下:

In [13]: p = 100

In [14]: a = [p for p in range(10)]

In [15]: print a
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [16]: print p
9

可見 list comprehension 中的變量也會對其外的變量產生影響,即 list comprehension 中的變量也不具有獨立的作用域。所以,雖然 list comprehension 具有執(zhí)行效率高和可讀性強等優(yōu)點,在實際的編碼中也需要多注意這些副作用,防止被坑。

問題解決

下面兩個方法均可解決該問題:

def gen_rdd(f):
    return sc.textFile(f).map(lambda x: (x, f.split('_')[0]))
    
rdds = [gen_rdd(f) for f in glob('*.res')]
rdds = map(
    lambda f: sc.textFile(f).map(lambda x: (x, f.split('_')[0])), glob('*.res')
)

推薦使用第二種方式。

參考鏈接


  1. http://www.zhihu.com/question/22819202/answer/113794339?from=timeline&isappinstalled=0 ?

  2. http://www.yinwang.org/blog-cn/2013/03/26/lisp-dead-alive ?

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容