《Python分布式計算》 第4章 Celery分布式應用 (Distributed Computing with Python)


序言
第1章 并行和分布式計算介紹
第2章 異步編程
第3章 Python的并行計算
第4章 Celery分布式應用
第5章 云平臺部署Python
第6章 超級計算機群使用Python
第7章 測試和調(diào)試分布式應用
第8章 繼續(xù)學習


本章是前面某些知識點的延續(xù)。特別的,本章以實例詳細的探討了異步編程和分布式計算。本章關注Celery,一個復雜的用于構(gòu)建分布應用的Python框架。最后,對比了Celery的對手:PyroPython-RQ

此時,你應該已經(jīng)明白了并行、分布和異步編程的基本含義。如果沒有的話,最好再學習下前面幾章。

搭建多機環(huán)境

學習Celery和其它Python包之前,先來搭建測試環(huán)境。我們開發(fā)的是分布應用,因此需要多機環(huán)境。

可以使用至少兩臺聯(lián)網(wǎng)機器的讀者可以跳過這部分。其余讀者,請繼續(xù)閱讀。對于后者,仍然有免費或便宜的解決方案。

其一是在主機上使用虛擬機VM(例如VirtualBox,https://www.virtualbox.org)。創(chuàng)建幾個VM,安裝Linux,讓它們在后臺運行。因為它們不需要圖像化桌面,所以可以很輕量,使用少量RAM和CPU即可。

另一方法是買幾個便宜的小型計算機主板,比如樹莓派(https://www.raspberrypi.org),在它上面安裝Linux,連上局域網(wǎng)。

第三種方案是用云服務器,比如Amazon EC2,使用它的虛擬機。如果使用這種方法,要確認這些包的端口在防火墻是打開的。

無論是用哪種方法,緊跟著的問題就是沒有在集群上安裝完整的DNS。最便捷的方法是在所有機器上編輯/etc/hosts文件。查看IP地址,為每臺機器起一個名字,并將它們添加到/etc/hosts

我在Mac主機上使用了兩個虛擬機,這是我的hosts文件:

$ cat /etc/hosts
##
# Host Database
#
# localhost is used to configure the loopback interface
# when the system is booting.  Do not change this entry.
##
127.0.0.1 localhost
255.255.255.255 broadcasthost
::1             localhost 
fe80::1%lo0 localhost

# Development VMs
192.168.123.150 ubuntu1 ubuntu1.local
192.168.123.151 ubuntu2 ubuntu2.local

相似的,這是我的兩個虛擬機(運行Ubuntu 15.04)上的host文件:

$ cat /etc/hosts
127.0.0.1 localhost
192.168.123.151 ubuntu2
192.168.123.150 ubuntu1

# The following lines are desirable for IPv6 capable hosts
::1     ip6-localhost ip6-loopback
fe00::0 ip6-localnet
ff00::0 ip6-mcastprefix
ff02::1 ip6-allnodes
ff02::2 ip6-allrouters

你要確保hosts文件上的IP地址和名字是要使用的機器。本書,會命名這些機器命名為HOST1、HOST2、HOST3等等。

搭建好多機環(huán)境之后,就可以開始寫分布應用了。

安裝Celery

目前為止,我們用的都是Python的標準庫,Celery(http://www.celeryproject.org)是用到的第一個第三方庫。Celery是一個分布任務隊列,就是一個以隊列為基礎的系統(tǒng),和之前的某些例子很像。它還是分布式的,意味著工作進程和保存結(jié)果的和請求的隊列,在不同機器上。

首先安裝Celery和它的依賴。在每臺機器上建立一個虛擬環(huán)境(起名為book),代碼如下(環(huán)境是Unix):

$ pip install virtualenvwrapper

如果這個命令被拒絕,可以加上sudo,用超級用戶權限來安裝virtualenvwrapper,代碼如下:

$ sudo pip install virtualenvwrapper

sudo命令會向你詢問Unix用戶密碼?;蛘?,可以用下面代碼安裝virtualenvwrapper

$ pip install --user virtualenvwrapper

不管使用哪種方法,完成安裝virtualenvwrapper之后,都需要配置它,定義三個環(huán)境變量(用于bash類的shell,假定virtualenvwrapper安裝在/usr/local/bin):

$ export WORKON_HOME=$HOME/venvs
$ export PROJECT_HOME=$HOME/workspace
$ source /usr/local/bin/virtualenvwrapper.sh

你需要修改前置路徑,來決定虛擬環(huán)境所在的位置($WORKON_HOME)和代碼的根目錄($PROJECT_HOME)。virtualenvwrapper.sh的路徑也可能需要變動。這三行代碼最好添加到相關的shell啟動文件(例如,~/.bashrc~/.profile)。

做好了前面的設置,我們就可以創(chuàng)建要使用的虛擬環(huán)境了,如下所示:

$ mkvirtualenv book --python=`which python3.5`

這個命令會在$WORKON_HOME之下建立新的虛擬環(huán)境,名字是book,使用的是Python 3.5。以后,可以用下面命令啟動這個虛擬環(huán)境:

$ workon book

使用虛擬環(huán)境的好處是,可以在里面安裝所有需要的包,而不污染系統(tǒng)的Python。以后不再需要這個虛擬環(huán)境時,可以方便的刪除(參考rmvirtualenv命令)。

現(xiàn)在就可以安裝Celery了。和以前一樣,(在每臺機器上)使用pip

$ pip install celery

該命令可以在激活的虛擬環(huán)境中下載、解壓、安裝所有的依賴。

快完成了,現(xiàn)在只需安裝配置一個中間代理,Celery用它主持任務隊列,并向工作進程(只有一臺機器,HOST1)發(fā)送消息。從文檔中可以看到,Celery支持多種中間代理,包括SQLAlchemyhttp://www.sqlalchemy.org),用以本地開發(fā)和測試。這里推薦使用的中間代理是RabbitMQhttps://www.rabbitmq.com)。

https://www.rabbitmq.com上有安裝指導、文檔和下載。在Mac主機上,安裝的最簡方法是使用homebrewhttp://brew.sh),如下所示:

$ brew install rabbitmq

對于Windows用戶,最好使用官方的安裝包。對于Linux,官方也提供了安裝包。

安裝好RabbitMQ之后,就可以立即使用了。這里還有一個簡單的配置步驟,因為在例子中,訪問隊列不會創(chuàng)建用戶和密碼。只要編輯RabbitMQ的配置文件(通常位于/usr/local/etc/rabbitmq/rabbitmq.config),添加下面的條目,允許網(wǎng)絡中的默認guest賬戶:

[
  {rabbit, [{loopback_users, []}]}
].

手動啟動RabbitMQ,如下所示(服務器腳本可能不在$PATH環(huán)境,通常存儲在/usr/local/sbin):

$ sudo rabbitmq-server

sudo會向你詢問用戶密碼。對于我們的例子,我們不會進一步配置中間代理,使用默認訪客賬戶就行。

注意:感興趣的讀者可以在http://www.rabbitmq.com/admin-guide.html閱讀RabbitMQ的管理指導。

到這里,我們就安裝好了所有需要的東西,可以開始使用Celery了。有另外一個依賴,也值得考慮安裝,盡管不是嚴格需要的,尤其是我們只想使用Celery。它是結(jié)果后臺,即Celery的工作進程用其存儲計算的結(jié)果。它就是Redis(http://redis.io)。安裝Redis是非必須的,但極力推薦安裝,和RabbitMQ類似,Redis運行在另一臺機器上,稱作HOST2。

Redis的安裝十分簡單,安裝代碼適用于Linux,Mac OS X和Windows。我們在Mac上用homebrew安裝,如下:

$ brew install redis

在其它操作系統(tǒng)上,例如Linux,可以方便的用二進制碼安裝(例如對于Ubuntu,sudo apt-get install redis-server)。

啟動Redis的命令如下:

$ sudo redis-server

本章剩下的部分會假定結(jié)果后臺存在,如果沒有安裝,會到時指出配置和代碼的不同。同時,任何在生產(chǎn)環(huán)境中使用Celery的人,都應該考慮使用結(jié)果后臺。

測試安裝

快速嘗試一個例子,以驗證Celery是正確安裝的。我們需要四個終端窗口,三個不同的機器(命名為HOST1、HOST2、HOST3和HOST4)。在HOST1的窗口啟動RabbitMQ(確保rabbitmq-server路徑正確):

HOST1 $ sudo /usr/local/sbin/rabbitmq-server

在HOST2的窗口,啟動Redis(沒安裝的話,跳到下一段):

HOST2 $ sudo /usr/local/bin/redis-server

最后,在HOST3的窗口,創(chuàng)建如下Python文件(記得使用workon book激活虛擬環(huán)境),命名為test.py

import celery

app = celery.Celery('test',
                        broker='amqp://HOST1',
                        backend='redis://HOST2')

@app.task
def echo(message):
    return message

這段代碼很簡單。先引入了Celery包,然后定義了一個Celery應用(app),名字是test。這個應用使用HOST1的中間代理RabbitMQ和HOST2的Redis數(shù)據(jù)庫的默認賬戶和消息隊列。

要是想用RabbitMQ作為結(jié)果后臺而不用Redis,需要修改前面的代碼,將backend進行如下修改:

import celery

app = celery.Celery('test',
                        broker='amqp://HOST1',
                        backend=amqp://HOST1')

@app.task
def echo(message):
    return message

有了應用實例,就可以用它裝飾遠程的worker(使用裝飾器@app.task)。在這個例子中,我們裝飾一個簡單的函數(shù),它可以返回傳遞給它的消息(echo)。

之后,在終端HOST3,建立worker池,如下所示:

HOST3 $ celery -A test worker --loglevel=info

記得要在test.py的目錄(或?qū)?code>PYTHONPATH環(huán)境變量指向test.py的目錄),好讓Celery可以引入代碼。

celery命令會默認啟動CPU數(shù)目相同的worker進程。worker會使用test模塊中的應用app(我們可以使用實例的名字celery -A test.app worker),并使用INFO等級在控制臺顯示日志。在我的電腦上(有HyperThreading的四核電腦),Celery默認啟用了八個worker進程。

在HOST4終端,復制test.py代碼,啟動book虛擬環(huán)境,在test.py目錄打開Python shell,如下所示:

HOST4 $ python3.5
Python 3.5.0 (v3.5.0:374f501f4567, Sep 12 2015, 11:00:19)
[GCC 4.2.1 (Apple Inc. build 5666) (dot 3)] on darwin
Type "help", "copyright", "credits" or "license" for more information.

從復制的test模塊引入echo函數(shù),如下:

>>> from test import echo

我們現(xiàn)在可以像普通Python函數(shù)一樣調(diào)用echo,echo可以直接在本地(即HOST4)運行,如下所示:

>>> res = echo('Python rocks!')
>>> print(res)
Python rocks!

為了讓HOST3的worker進程運行echo()函數(shù),我們不能像之前那樣直接調(diào)用。我們需要調(diào)用它的delay方法(裝飾器@app.task注入的),見下面的命令:

>>> res = echo.delay('Python rocks!'); print(type(res)); print(res)
<class 'celery.result.AsyncResult'>
1423ec2b-b6c7-4c16-8769-e62e09c1fced
>>> res.ready()
True
>>> res.result
'Python rocks!'

我們看到,調(diào)用echo.delay('Python rocks!')不會返回字符串。相反,它在任務隊列(運行在HOST1的RabbitMQ服務器)中安排了一個請求以執(zhí)行echo函數(shù),并返回Future,準確的說是AsyncResult(Celery的Future)。正如concurrent.futures模塊,這個對象是一個異步調(diào)用結(jié)果的占位符。在我們的例子中,異步調(diào)用的是我們安插在任務隊列的echo函數(shù),調(diào)用它的是其它位置的Celery的worker進程(我們的例子中是HOST3)。

我們可以查詢AsyncResult對象來確定它們是否預備好。如果是的話,我們可以訪問它們的結(jié)果,在我們的例子中是字符串'Python rocks!'。

切換到啟用worker進程的窗口,我們可以看到worker池接收到了echo任務請求,如下所示:

[2015-11-10 08:30:12,869: INFO/MainProcess] Received task: test.echo[1423ec2b-b6c7-4c16-8769-e62e09c1fced]
[2015-11-10 08:30:12,886: INFO/MainProcess] Task test.echo[1423ec2b-b6c7-4c16-8769-e62e09c1fced] succeeded in 0.01469148206524551s: 'Python rocks!'

我們現(xiàn)在可以退出Python shell和worker進程(在發(fā)起celery worker命令的終端窗口按CTRL+C):Celery安裝成功。

Celery介紹

什么是分布式任務隊列,Celery是怎么運行分布式任務隊列的呢?分布式任務隊列這種架構(gòu)已經(jīng)存在一定時間了。這是一種master-worker架構(gòu),有一個中間件層,中間件層使用多個任務請求隊列(即任務隊列),和一個用于存儲結(jié)果的隊列(即結(jié)果后臺)。

主進程(也叫作clientproducer)將任務請求安插到某個任務隊列,從結(jié)果后臺獲取數(shù)據(jù)。worker進程訂閱任務隊列以明確任務是什么,并把結(jié)果放到結(jié)果后臺。

這是一個簡單靈活的架構(gòu)。主進程不需要知道有多少個可用的worker,也不需要知道worker運行在哪臺機器。它只需要知道隊列在哪,以及如何發(fā)送任務請求。

worker進程也是如此。它們不需要知道任務請求來自何處,也不需要知道結(jié)果用來做什么。它們只需知道從哪里取得任務,存儲在哪里。

這樣的優(yōu)點是worker的數(shù)量、種類、形態(tài)可以隨意變化,而不對總系統(tǒng)的功能產(chǎn)生影響(但會影響性能和延遲)。分布式任務隊列可以方便地進行擴展(添加新worker),規(guī)劃優(yōu)先級(給隊列定義不同的優(yōu)先級,給不同的隊列安排不同數(shù)量的worker)。

另一個優(yōu)點是,這個去耦合化的系統(tǒng)在原則上,worker和producer可以用不同語言來寫。例如,Python代碼生成的工作由C語言寫的worker進程來做,這樣性能是最高的。

Celery使用了第三方、健壯的、實地驗證的系統(tǒng)來做它的隊列和結(jié)果后臺。推薦的中間代理是RabbitMQ,我們之前用過。RabbitMQ是一個非常復雜的消息代理,有許多特性,本書不會對它做深入探索。結(jié)果后臺也是如此,它可以是一個簡單的RabbitMQ隊列,或者更優(yōu)的,使用專門的服務比如Redis。

下圖展示了典型的使用RabbitMQ和Redis的Celery應用架構(gòu):

每個方框中的進程(即RabbitMQ、Redis、worker和master.py)都可以運行在不同的機器上。小型的安裝方案是將RabbitMQ和Redis放在同一個主機上,worker幾點可能只有一個或兩個。大型方案會使用更多的機器,或者專門的服務器。

更復雜的Celery應用

我們用Celery做兩個簡單有趣的應用。第一個仿照第3章中匯率例子,第二個是一個分布式排序算法。

我們還是使用四臺機器(HOST1、HOST2、HOST3、HOST4)。和以前一樣,HOST1運行RabbitMQ,HOST2運行Redis,HOST3運行Celery的worker,HOST運行主代碼。

先從簡單的例子開始。創(chuàng)建一個Python文件(celery/currency.py),代碼如下(如果你沒有使用Redis,記得修改backend'amqp://HOST1'):

import celery
import urllib.request


app = celery.Celery('currency',
                    broker='amqp://HOST1',
                    backend='redis://HOST2')


URL = 'http://finance.yahoo.com/d/quotes.csv?s={}=X&f=p'

@app.task
def get_rate(pair, url_tmplt=URL):
    with urllib.request.urlopen(url_tmplt.format(pair)) as res:
        body = res.read()
    return (pair, float(body.strip()))

if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('pairs', type=str, nargs='+')
    args = parser.parse_args()

    results = [get_rate.delay(pair) for pair in args.pairs]
    for result in results:
        pair, rate = result.get()
        print(pair, rate)

這段代碼和第3章的多線程版本差不多。主要的區(qū)別是,因為使用的是Celery,我們不需要創(chuàng)建隊列,Celery負責建立隊列。另外,除了為每個匯率對建一個線程,我們只需讓worker負責從隊列獲取任務請求,執(zhí)行相應的函數(shù)請求,完畢之后返回結(jié)果。

探討調(diào)用的行為是有益的,比如成功的調(diào)用、由于缺少worker而不工作的調(diào)用、失敗且拋出異常的調(diào)用。我們從成功的調(diào)用開始。

echo的例子一樣,在各自的終端啟動RabbitMQ和Redis(通過redis-serverrabbitmq-server命令)。

然后,在worker主機(HOST3)上,復制currency.py文件,切換到它的目錄,創(chuàng)建worker池(記住,Celery啟動的worker數(shù)目盡可能和CPU核數(shù)一樣多):

HOST3 $ celery -A currency worker --loglevel=info

最后,復制相同的文件到HOST4,并運行如下:

HOST4 $ python3.5 currency.py EURUSD CHFUSD GBPUSD GBPEUR CADUSD CADEUR
EURUSD 1.0644
CHFUSD 0.986
GBPUSD 1.5216
GBPEUR 1.4296
CADUSD 0.751
CADEUR 0.7056

一切工作正常,我么得到了五個匯率。如果查看啟動worker池的主機(HOST3),我們會看到類似下圖的日志:

這是日志等級loglevel=info時,Celery worker的日志。每個任務都被分配了一個獨立ID(例如GBP兌USD的任務ID是f8658917-868c-4eb5-b744-6aff997c6dd2),基本的時間信息也被打印了出來。

如果沒有可用的worker呢?最簡單的方法是停止worker(在終端窗口按CTRL+C),返回HOST4的currency.py,如下所示:

OST4 $ python3.5 currency.py EURUSD CHFUSD GBPUSD GBPEUR CADUSD CADEUR

什么都沒發(fā)生,currency.py一直處于等待worker的狀態(tài)。這樣的狀態(tài)可能也可能不是我們想要的:其一,讓文件等待而不發(fā)生崩潰,是很方便的;其二,我們可能想在一定時間后,停止等待??梢栽?code>result.get()用timeout參數(shù)。

例如,修改代碼,使用result.get(timeout=1),會有如下結(jié)果(還是在沒有worker的情況下):

HOST4 $ python3.5 currency.py EURUSD CHFUSD GBPUSD GBPEUR CADUSD CADEUR
 Traceback (most recent call last):
  File "currency.py", line 29, in <module>
    pair, rate = result.get(timeout=1)
  File "/venvs/book/lib/python3.5/site-packages/celery/result.py", line 169, in get
    no_ack=no_ack,
  File " /venvs/book/lib/python3.5/site-packages/celery/backends/base.py", line 226, in wait_for
    raise TimeoutError('The operation timed out.')
celery.exceptions.TimeoutError: The operation timed out.

當然,我們應該總是使用超時,以捕獲對應的異常,作為錯誤處理的策略。

要記住,默認下,任務隊列是持續(xù)的,它的日志不會停止(Celery允許用戶定制)。這意味著,如果我們現(xiàn)在啟動了一些worker,它們就會開始從隊列獲取懸掛的任務,并返回結(jié)果。我們可以用如下命令清空隊列:

HOST4 $ celery purge
WARNING: This will remove all tasks from queue: celery.
         There is no undo for this operation!

(to skip this prompt use the -f option)

Are you sure you want to delete all tasks (yes/NO)? yes
Purged 12 messages from 1 known task queue.

接下來看任務產(chǎn)生異常的情況。修改HOST3的currency.py文件,讓get_rate拋出一個異常,如下所示:

@app.task
def get_rate(pair, url_tmplt=URL):
    raise Exception('Booo!')

現(xiàn)在,重啟HOST3的worker池(即HOST3 $ celery -A currency worker --loglevel=info),然后在HOST4啟動主程序:

HOST4 $ python3.5 currency.py EURUSD CHFUSD GBPUSD GBPEUR CADUSD CADEUR
Traceback (most recent call last):
  File "currency.py", line 31, in <module>
    pair, rate = result.get(timeout=1)
  File "/Users/fpierfed/Documents/venvs/book/lib/python3.5/site-packages/celery/result.py", line 175, in get
    raise meta['result']
Exception: Booo!

所有的worker都拋出了異常,異常傳遞到了調(diào)用的代碼,在首次調(diào)用result.get()返回。

任務拋出任何異常,我們都要小心。遠程運行的代碼失敗的原因可能有很多,不一定和代碼本身有關,因此需要謹慎應對。

Celery可以用如下的方法提供幫助:我們可以用timeout獲取結(jié)果;重新提交失敗的任務(參考task裝飾器的retry參數(shù))。還可以取消任務請求(參考任務的apply_async方法的expires參數(shù),它比之前我們用過的delay功能強大)。

有時,任務圖會很復雜。一項任務的結(jié)果還要傳遞給另一個任務。Celery支持復雜的調(diào)用方式,但是會有性能損耗。

用第二個例子來探討:一個分布式的歸并排序算法。這是包含兩個文件的長代碼:一個是算法本身(mergesory.py),一個是主代碼(main.py)。

歸并排序是一個簡單的基于遞歸二分輸入列表的算法,將兩個部分排序,再將結(jié)果合并。建立一個新的Python文件(celery/mergesort.py),代碼如下:

import celery


app = celery.Celery('mergesort',
                        broker='amqp://HOST1',
                        backend='redis://HOST2')

@app.task
def sort(xs):
    lenxs = len(xs)
    if(lenxs <= 1):
        return(xs)

    half_lenxs = lenxs // 2
    left = xs[:half_lenxs]
    right = xs[half_lenxs:]
    return(merge(sort(left), sort(right)))

def merge(left, right):
    nleft = len(left)
    nright = len(right)

    merged = []
    i = 0
    j = 0
    while i < nleft and j < nright:
        if(left[i] < right[j]):
            merged.append(left[i])
            i += 1
        else:
            merged.append(right[j])
            j += 1
    return merged + left[i:] + right[j:]

這段代碼很直白。Celery應用命名為app,它使用RabbitMQ作為任務隊列,使用Redis作為結(jié)果后臺。然后,定義了sort算法,它使用了附屬的merge函數(shù)以合并兩個排好序的子列表,成為一個排好序的單列表。

對于主代碼,另建一個文件(celery/main.py),它的代碼如下:

#!/usr/bin/env python3.5
import random
import time
from celery import group
from mergesort import sort, merge


# Create a list of 1,000,000 elements in random order.
sequence = list(range(1000000))
random.shuffle(sequence)

t0 = time.time()

# Split the sequence in a number of chunks and process those 
# independently.
n = 4
l = len(sequence) // n
subseqs = [sequence[i * l:(i + 1) * l] for i in range(n - 1)]
subseqs.append(sequence[(n - 1) * l:])

# Ask the Celery workers to sort each sub-sequence.
# Use a group to run the individual independent tasks as a unit of work.
partials = group(sort.s(seq) for seq in subseqs)().get()

# Merge all the individual sorted sub-lists into our final result.
result = partials[0]
for partial in partials[1:]:
    result = merge(result, partial)

dt = time.time() - t0
print('Distributed mergesort took %.02fs' % (dt))

# Do the same thing locally and compare the times.
t0 = time.time()
truth = sort(sequence)
dt = time.time() - t0
print('Local mergesort took %.02fs' % (dt))

# Final sanity checks.
assert result == truth
assert result == sorted(sequence)

我們先生成一個足夠長的無序(random.shuffle)整數(shù)序列(sequence = list(range(1000000)))。然后,分成長度相近的子列表(n=4)。

有了子列表,就可以對它們進行并行處理(假設至少有四個可用的worker)。問題是,我們要知道什么時候這些列表排序好了,好進行合并。

Celery提供了多種方法讓任務協(xié)同執(zhí)行,group是其中之一。它可以在一個虛擬的任務里,將并發(fā)的任務捆綁執(zhí)行。group的返回值是GroupResult(與類AsyncResult的層級相同)。如果沒有結(jié)果后臺,GroupResult get()方法是必須要有的。當組中所有的任務完成并返回值,group方法會獲得一個任務簽名(用參數(shù)調(diào)用任務s()方法,比如代碼中的sort.s(seq))的列表。任務簽名是Celery把任務當做參數(shù),傳遞給其它任務(但不執(zhí)行)的機制。

剩下的代碼是在本地合并排好序的列表,每次合并兩個。進行完分布式排序,我們再用相同的算法重新排序原始列表。最后,對比歸并排序結(jié)果與內(nèi)建的sorted調(diào)用。

要運行這個例子,需要啟動RabbitMQ和Redis。然后,在HOST3啟動一些worker,如下所示:

HOST3 $ celery -A mergesort worker --loglevel=info

記得拷貝mergesort.py文件,并切換到其目錄運行(或者,定義PYTHONPATH指向它所在的位置)。

之后,在HOST4上運行:

HOST4 $ python3.5 main.py
Distributed mergesort took 10.84s
Local mergesort took 26.18s

查看Celery日志,我們看到worker池接收并執(zhí)行了n個任務,結(jié)果發(fā)回給了caller。

性能和預想的不一樣。使用多進程(使用multiprocessingconcurrent.futures)來運行,與前面相比,可以有n倍的性能提升(7秒,使用四個worker)。

這是因為Celery同步耗時長,最好在只有不得不用的時候再使用。Celery持續(xù)詢問組中的部分結(jié)果是否準備好,好進行后續(xù)的工作。這會非常消耗資源。

生產(chǎn)環(huán)境中使用Celery

下面是在生產(chǎn)環(huán)境中使用Celery的tips。

第一個建議是在Celery應用中使用配置模塊,而不要在worker代碼中進行配置。假設,配置文件是config.py,可以如下將其傳遞給Celery應用:

import celery
app = celery.Celery('mergesort')
app.config_from_object('config')

然后,與其他可能相關的配置指令一起,在config.py中添加:

BROKER_URL = 'amqp://HOST1'
CELERY_RESULT_BACKEND = 'redis://HOST2'

關于性能的建議是,使用至少兩個隊列,好讓任務按照執(zhí)行時間劃分優(yōu)先級。使用多個隊列,將任務劃分給合適的隊列,是分配worker的簡便方法。Celery提供了詳盡的方法將任務劃分給隊列。分成兩步:首先,配置Celery應用,啟動worker,如下所示:

# In config.py
CELERY_ROUTES = {project.task1': {'queue': 'queue1'},
                    'project.task2': {'queue': 'queue2'}}

為了在隊列中啟動worker,在不同的機器中使用下面的代碼:

HOST3 $ celery –A project worker –Q queue1
HOST5 $ celery –A project worker –Q queue2

使用Celery命令行工具的-c標志,可以控制worker池的大小,例如,啟動一個有八個worker的池:

HOST3 $ celery –A project worker –c 8

說道worker,要注意,Celery默認使用多進程模塊啟動worker池。這意味著,每個worker都是一個完整的Python進程。如果某些worker只處理I/O密集型任務,可以將它們轉(zhuǎn)換成協(xié)程或多線程,像前面的例子。這樣做的話,可以使用-P標志,如下所示:

$ celery –A project worker –P threads

使用線程和協(xié)程可以節(jié)省資源,但不利于CPU制約型任務,如前面的菲波那切數(shù)列的例子。

談到性能,應該盡量避免同步原語(如前面的group()),除非非用不可。當同步無法回避時,好的方法是使用結(jié)果后臺(如Redis)。另外,如果可能的話,要避免傳遞復雜的對象給遠程任務,因為這些對象需要序列化和去序列化,通常很耗時。

額外的,如果不需要某個任務的結(jié)果,應該確保Celery不去獲取這些結(jié)果。這是通過裝飾器@task(ignore_result=True)來做的。如果所有的任務結(jié)果都忽略了,就不必定義結(jié)果后臺。這可以讓性能大幅提高。

除此之外,還要指出,如何啟動worker、在哪里運行worker、如何確保它們持續(xù)運行是很重要的。默認的方法是使用工具,例如supervisord (http://supervisord.org) ,來管理worker進程。

Celery帶有一個supervisord的配置案例(在安裝文件的extra/supervisord目錄)。一個監(jiān)督的優(yōu)秀方案是flower(https://github.com/mher/flower),一個worker的網(wǎng)絡控制和監(jiān)督工具。

最后,RabbitMQ和Redis結(jié)合起來,是一個很好的中間代理和結(jié)果后臺解決方案,適用于大多數(shù)項目。

Celery的替代方案:Python-RQ

Celery的輕量簡易替代方案之一是 Python-RQ (http://python-rq.org)。它單單基于Redis作為任務隊列和結(jié)果后臺。沒有復雜任務或任務路由,使用它很好。

因為Celery和Python-RQ在概念上很像,讓我們立即重寫一個之前的例子。新建一個文件(rq/currency.py),代碼如下:

import urllib.request

URL = 'http://finance.yahoo.com/d/quotes.csv?s={}=X&f=p'

def get_rate(pair, url_tmplt=URL):
    # raise Exception('Booo!')

    with urllib.request.urlopen(url_tmplt.format(pair)) as res:
        body = res.read()
    return (pair, float(body.strip()))

這就是之前的匯率例子的代碼。區(qū)別是,與Celery不同,這段代碼不需要依賴Python-RQ或Redis。將這段代碼拷貝到worker節(jié)點(HOST3)。

主程序也同樣簡單。新建一個Python文件(rq/main.py),代碼如下:

#!/usr/bin/env python3
import argparse
import redis
import rq
from currency import get_rate

parser = argparse.ArgumentParser()
parser.add_argument('pairs', type=str, nargs='+')
args = parser.parse_args()

conn = redis.Redis(host='HOST2')
queue = rq.Queue(connection=conn)

jobs = [queue.enqueue(get_rate, pair) for pair in args.pairs]

for job in jobs:
    while job.result is None:
        pass
    print(*job.result)

我們在這里看到Python-RQ是怎么工作的。我們需要連接Redis服務器(HOST2),然后將新建的連接對象傳遞給Queue類構(gòu)造器。結(jié)果Queue對象用來向其提交任務請求。這是通過傳遞函數(shù)對象和其它參數(shù)給queue.enqueue。

函數(shù)排隊調(diào)用的結(jié)果是job實例,它是個異步調(diào)用占位符,之前見過多次。

因為Python-RQ沒有Celery的阻塞AsyncResult.get()方法,我們要手動建一個事件循環(huán),持續(xù)向job實例查詢,以確認是否它們的result不是None這種方法不推薦在生產(chǎn)環(huán)境中使用,因為持續(xù)的查詢會浪費資源,查詢不足會浪費時間,但對于這個簡易例子沒有問題。

為了運行代碼,首先要安裝Python-RQ,用pip進行安裝:

$ pip install rq

在所有機器上都要安裝。然后,在HOST2運行Redis:

$ sudo redis-server

在HOST3上,啟動一些worker。Python-RQ不自動啟動worker池。啟動多個worker的簡易的方法是使用一個文件(start_workers.py):

#!/usr/bin/env python3
import argparse
import subprocess

def terminate(proc, timeout=.5):
    """
    Perform a two-step termination of process `proc`: send a SIGTERM
    and, after `timeout` seconds, send a SIGKILL. This should give 
    `proc` enough time to do any necessary cleanup.
    """
    if proc.poll() is None:
        proc.terminate()
        try:
            proc.wait(timeout)
        except subprocess.TimeoutExpired:
            proc.kill()
    return

parser = argparse.ArgumentParser()
parser.add_argument('N', type=int)
args = parser.parse_args()

workers = []
for _ in range(args.N):
    workers.append(subprocess.Popen(['rqworker',
                                            '-u', 'redis://yippy']))
try:
    running = [w for w in workers if w.poll() is None]
    while running:
        proc = running.pop(0)
        try:
            proc.wait(timeout=1.)
        except subprocess.TimeoutExpired:
            running.append(proc)
except KeyboardInterrupt:
    for w in workers:
        terminate(w)

這個文件會啟動用戶指定書目的Python-RQ worker進程(通過使用rqworker腳本,Python-RQ源碼的一部分),通過Ctrl+C殺死進程。更健壯的方法是使用類似之前提過的supervisord工具。

在HOST3上運行:

HOST3 $ ./start_workers.py 6

現(xiàn)在可以運行代碼。在HOST4,運行main.py

HOST4 $ python3.5 main.py EURUSD CHFUSD GBPUSD GBPEUR CADUSD CADEUR
EURUSD 1.0635
CHFUSD 0.9819
GBPUSD 1.5123
GBPEUR 1.422
CADUSD 0.7484
CADEUR 0.7037

效果與Celery相同。

Celery的替代方案:Pyro

Pyro (http://pythonhosted.org/Pyro4/)的意思是Python Remote Objects,是1998年創(chuàng)建的一個包。因此,它十分穩(wěn)定,且功能完備。

Pyro使用的任務分布方法與Celery和Python-RQ十分不同,它是在網(wǎng)絡中將Python對象作為服務器。然后創(chuàng)建它們的代理對象,讓調(diào)用代碼可以將其看做本地對象。這個架構(gòu)在90年代末的系統(tǒng)很流行,比如COBRA和Java RMI。

Pyro掩蓋了代碼中的對象是本地還是遠程的,是讓人詬病的一點。原因是,遠程代碼運行錯誤的原因很多,當遠程代碼隱藏在代理對象后面執(zhí)行,就不容易發(fā)現(xiàn)錯誤。

另一個詬病的地方是,Pyro在點對點網(wǎng)絡(不是所有主機名都可以解析)中,或者UDP廣播無效的網(wǎng)絡中,很難正確運行。

盡管如此,大多數(shù)開發(fā)者認為Pyro非常簡易,在生產(chǎn)環(huán)境中足夠健壯。

Pyro安裝很簡單,它是純Python寫的,依賴只有幾個,使用pip:

$ pip install pyro4

這個命令會安裝Pyro 4.x和Serpent,后者是Pyro用來編碼和解碼Python對象的序列器。

用Pyro重寫之前的匯率例子,要比用Python-RQ復雜,它需要另一個軟件:Pyro nameserver。但是,不需要中間代理和結(jié)果后臺,因為Pyro對象之間可以直接進行通訊。

Pyro運行原理如下。每個遠程訪問的對象都封裝在處于連接監(jiān)聽的socket服務器框架中。每當調(diào)用遠程對象中的方法,被調(diào)用的方法,連同它的參數(shù),就被序列化并發(fā)送到適當?shù)膶ο?服務器上。此時,遠程對象執(zhí)行被請求的任務,經(jīng)由相同的連接,將結(jié)果發(fā)回到(同樣是序列化的)調(diào)用它的代碼。

因為每個遠程對象自身就可以調(diào)用遠程對象,這個架構(gòu)可以是相當去中心化的。另外,一旦建立通訊,對象之間就是p2p的,這與分布式任務隊列的輕度耦合架構(gòu)十分不同。另一點,每個遠程對象既可以做master,也可以做worker。

接下來重寫匯率的例子,來看看具體是怎么運行的。建立一個Python文件(pyro/worker.py),代碼如下:

import urllib.request
import Pyro4

URL = 'http://finance.yahoo.com/d/quotes.csv?s={}=X&f=p'

@Pyro4.expose(instance_mode="percall")
class Worker(object):
    def get_rate(self, pair, url_tmplt=URL):
        with urllib.request.urlopen(url_tmplt.format(pair)) as res:
            body = res.read()
        return (pair, float(body.strip()))

# Create a Pyro daemon which will run our code.
daemon = Pyro4.Daemon()
uri = daemon.register(Worker)
Pyro4.locateNS().register('MyWorker', uri)

# Sit in an infinite loop accepting connections
print('Accepting connections')
try:
    daemon.requestLoop()
except KeyboardInterrupt:
    daemon.shutdown()
print('All done')

worker的代碼和之前的很像,不同點是將get_rate函數(shù)變成了Worker類的一個方法。變動的原因是,Pyro允許導出類的實例,但不能導出函數(shù)。

剩下的代碼是Pyro特有的。我們需要一個Daemon實例(它本質(zhì)上是后臺的網(wǎng)絡服務器),它會獲得類,并在網(wǎng)絡上發(fā)布,好讓其它的代碼可以調(diào)用方法。分成兩步來做:首先,創(chuàng)建一個類Pyro4.Daemon的實例,然后添加類,通過將其傳遞給register方法。

每個Pyro的Daemon實例可以隱藏任意數(shù)目的類。內(nèi)部,需要的話,Daemon對象會創(chuàng)建隱藏類的實例(也就是說,如果沒有代碼需要這個類,相應的Daemon對象就不會將其實例化)。

每一次網(wǎng)絡連接,Daemon對象默認會實例化一次注冊的類,如果要進行并發(fā)任務,這樣就不可以??梢酝ㄟ^裝飾注冊的類修改,@Pyro4.expose(instance_mode=...)

instance_mode支持的值有三個:single、sessionpercall。使用single意味Daemon只為類創(chuàng)建一個實例,使用它應付所有的客戶請求。也可以通過注冊一個類的實例(而不是類本身)。

使用session可以采用默認模式:每個client連接都會得到一個新的實例,client始終都會使用它。使用instance_mode="percall",會為每個遠程方法調(diào)用建立一個新實例。

無論創(chuàng)建實例的模式是什么,用Daemon對象注冊一個類(或?qū)嵗┒紩祷匾粋€唯一的識別符(即URI),其它代碼可以用識別符連接對象。我們可以手動傳遞URI,但更方便的方法是在Pyro nameserver中存儲它,這樣通過兩步來做。先找到nameserver,然后給URI注冊一個名字。在前面的代碼中,是通過下面來做的:

Pyro4.locateNS().register('MyWorker', uri)

nameserver的運行類似Python的字典,注冊兩個名字相同的URI,第二個URI就會覆蓋第一個。另外,我們看到,client代碼使用存儲在nameserver中的名字控制了許多遠程對象。這意味著,命名需要特別的留意,尤其是當許多worker進程提供的功能相同時。

最后,在前面的代碼中,我們用daemon.requestLoop()進入了一個Daemon事件循環(huán)。Daemon對象會在無限循環(huán)中服務client的請求。

對于client,創(chuàng)建一個Python文件(pyro/main.py),它的代碼如下:

#!/usr/bin/env python3
import argparse
import time
import Pyro4

parser = argparse.ArgumentParser()
parser.add_argument('pairs', type=str, nargs='+')
args = parser.parse_args()

# Retrieve the rates sequentially.
t0 = time.time()
worker = Pyro4.Proxy("PYRONAME:MyWorker")

for pair in args.pairs:
    print(worker.get_rate(pair))
print('Sync calls: %.02f seconds' % (time.time() - t0))

# Retrieve the rates concurrently.
t0 = time.time()
worker = Pyro4.Proxy("PYRONAME:MyWorker")
async_worker = Pyro4.async(worker)

results = [async_worker.get_rate(pair) for pair in args.pairs]
for result in results:
    print(result.value)
print('Async calls: %.02f seconds' % (time.time() - t0))

可以看到,client把相同的工作做了兩次。這么做的原因是展示Pyro兩種調(diào)用方式:同步和異步。

來看代碼,我們使用argparse包從命令行獲得匯率對。然后,對于同步的方式,通過名字worker = Pyro4.Proxy("PYRONAME:MyWorker")獲得了一些遠程worker對象。前綴PYRONAME:告訴Pyro在nameserver中該尋找哪個名字。這樣可以避免手動定位nameserver。

一旦有了worker對象,可以把它當做本地的worker類的實例,向其調(diào)用方法。這就是我們在第一個循環(huán)中做的:

for pair in args.pairs:
    print(worker.get_rate(pair))

對每個worker.get_rate(pair)聲明,Proxy對象會用它的遠程Daemon對象連接,發(fā)送請求,以運行get_rate(pair)。我們例子中的Daemon對象,每次會創(chuàng)建一個Worker類的的實例,并調(diào)用它的get_rate(pair)方法。結(jié)果序列化之后發(fā)送給client,然后打印出來。每個調(diào)用都是同步的,任務完成后會封鎖。

在第二個循環(huán)中,做了同樣的事,但是使用的是異步調(diào)用。我們需要向遠程的類創(chuàng)建一個Proxy對象,然后,將它封裝在一個異步handler中。這就是下面代碼的功能:

worker = Pyro4.Proxy("PYRONAME:MyWorker")
async_worker = Pyro4.async(worker)

我們現(xiàn)在可以在后臺用async_worker獲取匯率。每次調(diào)用async_worker.get_rate(pair)是非阻塞的,會返回一個Pyro4.futures.FutureResult的實例,它和concurrent.futures模塊中Future對象很像。訪問它的value需要等待,直到相應的異步調(diào)用完成。

為了運行這個例子,需要三臺機器的三個窗口:一個是nameserver(HOST1),一個是Worker類和它的Daemon(HOST2),第三個(HOST3)是client(即main.py)。

在第一個終端,啟動nameserver,如下:

HOST1 $ pyro4-ns --host 0.0.0.0
Broadcast server running on 0.0.0.0:9091
NS running on 0.0.0.0:9090 (0.0.0.0)
Warning: HMAC key not set. Anyone can connect to this server!
URI = PYRO:Pyro.NameServer@0.0.0.0:9090

簡單來說,nameserver綁定為0.0.0.0,任何人都可以連接它。我們沒有設置認證,因此在倒數(shù)第二行彈出了一個警告。

nameserver運行起來了,在第二個終端啟動worker:

HOST2 $ python3.5 worker.py
Accepting connections

Daemon對象接收連接,現(xiàn)在去第三個終端窗口運行client代碼,如下:

HOST3 $ python3.5 main.py EURUSD CHFUSD GBPUSD GBPEUR CADUSD CADEUR
('EURUSD', 1.093)
('CHFUSD', 1.0058)
('GBPUSD', 1.5141)
('GBPEUR', 1.3852)
('CADUSD', 0.7493)
('CADEUR', 0.6856)
Sync calls: 1.55 seconds
('EURUSD', 1.093)
('CHFUSD', 1.0058)
('GBPUSD', 1.5141)
('GBPEUR', 1.3852)
('CADUSD', 0.7493)
('CADEUR', 0.6856)
Async calls: 0.29 seconds

結(jié)果和預想一致,IO限制型代碼可以方便的進行擴展,異步代碼的速度六倍于同步代碼。

這里,還有幾個提醒。第一是,Pyro的Daemon實例要能解析主機的名字。如果不能解析,那么它只能接受127.0.0.1的連接,這意味著,不能被遠程連接(只能本地連接)。解決方案是將其與運行的主機進行IP綁定,確保它不是環(huán)回地址。可以用下面的Python代碼選擇一個可用的IP:

from socket import gethostname, gethostbyname_ex

ips = [ip for ip in gethostbyname_ex(gethostname())[-1] 
        if ip != '127.0.0.1']
ip = ips.pop()

另一個要考慮的是:作為Pyro使用“直接連接被命名對象”方法的結(jié)果,很難像Celery和Python-RQ那樣直接啟動一批worker。在Pyro中,必須用不同的名字命名worker,然后用名字進行連接(通過代理)。這就是為什么,Pyro的client用一個mini的規(guī)劃器來向可用的worker分配工作。

另一個要注意的是,nameserver不會跟蹤worker的斷開,因此,用名字尋找一個URI對象不代表對應的遠程Daemon對象是真實運行的。最好總是這樣對待Pyro調(diào)用:遠程服務器的調(diào)用可能成功,也可能不成功。

記住這些點,就可以用Pyro搭建復雜的網(wǎng)絡和分布式應用。

總結(jié)

這一章很長。我們學習了Celery,他是一個強大的包,用以構(gòu)建Python分布式應用。然后學習了Python-RQ,一個輕量且簡易的替代方案。兩個包都是使用分布任務隊列架構(gòu),它是用多個機器來運行相同系統(tǒng)的分布式任務。

然后介紹了另一個替代方案,Pyro。Pyro的機理不同,它使用的是代理方式和遠程過程調(diào)用(RPC)。

兩種方案都有各自的優(yōu)點,你可以選擇自己喜歡的。

下一章會學習將分布式應用部署到云平臺,會很有趣。


序言
第1章 并行和分布式計算介紹
第2章 異步編程
第3章 Python的并行計算
第4章 Celery分布式應用
第5章 云平臺部署Python
第6章 超級計算機群使用Python
第7章 測試和調(diào)試分布式應用
第8章 繼續(xù)學習


最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容