翻譯自“http://zguide.zeromq.org/py:all”
拯救世界
略
開始的假設(shè)
我們假設(shè)你使用ZeroMQ 3.2以上的版本。我們假設(shè)你使用Linux或者類似的操作系統(tǒng)。我們假設(shè)你或多或少能看懂C語(yǔ)言,因?yàn)檫@是示例的默認(rèn)語(yǔ)言。我們假設(shè)當(dāng)看到類似PUSH或SUBSCRIBE這樣的常亮?xí)r,你能知道它們的真名是ZMQ_PUSH或ZMQ_SUBSCRIBE。
獲得示例
略
提問(wèn)和回答
讓我們開始寫代碼。我們從Hello World示例開始。我們會(huì)開發(fā)一個(gè)客戶端和一個(gè)服務(wù)端。客戶端發(fā)送“Hello”到服務(wù)端,服務(wù)端回復(fù)“World”。下面是Python的服務(wù)端,在5555端口啟動(dòng)一個(gè)ZeroMQ socket,從中讀取請(qǐng)求,并回復(fù)“World”給每一個(gè)請(qǐng)求:
# Hello World server in Python
# Binds REP socket to tcp://*:5555
# Expects b"Hello" from client, replies with b"World"
import time
import zmq
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")
while True:
# Wait for next request from client
message = socket.recv()
print("Received request: %s" % message)
# Do some 'work'
time.sleep(1)
# Send reply back to client
socket.send(b"World")
圖2:請(qǐng)求——回復(fù)
<div align=center>
</div>
REQ-REP socket對(duì)是同步的。客戶端在循環(huán)中(或者只發(fā)起一次)發(fā)起zmq_send(),然后發(fā)起zmq_recv()。其它任何順序(比如,一次發(fā)送兩個(gè)消息)都會(huì)導(dǎo)致發(fā)送或接收返回-1。同樣的,服務(wù)端按順序發(fā)起zmq_recv(),然后發(fā)起zmq_send()。
下面是客戶端的代碼:
# Hello World client in Python
# Connects REQ socket to tcp://localhost:5555
# Sends "Hello" to server, expects "World" back
import zmq
context = zmq.Context()
# Socket to talk to server
print("Connecting to hello world server…")
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")
# Do 10 requests, waiting each time for a response
for request in range(10):
print("Sending request %s …" % request)
socket.send(b"Hello")
# Get the reply.
message = socket.recv()
print("Received reply %s [ %s ]" % (request, message))
在實(shí)際應(yīng)用看起來(lái)太簡(jiǎn)單了,但是向我們已經(jīng)學(xué)過(guò)的那樣,ZeroMQ有超能力。你可以向這個(gè)服務(wù)端一次發(fā)起幾千個(gè)客戶端,它也會(huì)繼續(xù)快速的工作。為了好玩,先啟動(dòng)客戶端,然后啟動(dòng)服務(wù)端,它仍然可以正常工作,然后想想這意味著什么。
讓我們簡(jiǎn)單的解釋一下這兩個(gè)程序做了什么。它們創(chuàng)建了一個(gè)ZeroMQ的context和一個(gè)socket。不用擔(dān)心這些名詞的意思,稍后會(huì)解釋。服務(wù)端綁定它的REP(回復(fù))socket到5555端口。服務(wù)端在循環(huán)中等待請(qǐng)求,然后每次響應(yīng)一個(gè)回復(fù)??蛻舳税l(fā)送一個(gè)請(qǐng)求,并從服務(wù)端讀取回復(fù)。
如果你殺死服務(wù)端(Ctrl-C),并重新啟動(dòng),客戶端不會(huì)正確的恢復(fù)。從崩潰進(jìn)程中恢復(fù)很不容易。開發(fā)一個(gè)可靠的請(qǐng)求——回復(fù)流很復(fù)雜,我們會(huì)在第四章討論。
這個(gè)場(chǎng)景背后發(fā)生了什么事情,但對(duì)于程序員來(lái)說(shuō)代碼很簡(jiǎn)潔,甚至在大量負(fù)載下也不會(huì)經(jīng)常崩潰。這就是請(qǐng)求——回復(fù)(request-reply)模式,可能是使用ZeroMQ最簡(jiǎn)單的方式。它對(duì)應(yīng)RPC和經(jīng)典的客戶端/服務(wù)端模型。
字符串小提示
除了發(fā)送的字節(jié)數(shù),ZeroMQ不知道你發(fā)送的任何數(shù)據(jù)。這意味著你需要負(fù)責(zé)格式化數(shù)據(jù),讓應(yīng)用程序可以取回?cái)?shù)據(jù)。格式化對(duì)象和負(fù)責(zé)數(shù)據(jù)類型是專業(yè)庫(kù)的工作,比如Protocol Buffers。但是對(duì)于字符串,你需要小心。
在C和其它一些語(yǔ)言中,字符串以null結(jié)尾。我們發(fā)送一個(gè)字符串,比如“HELLO”,會(huì)額外附加一個(gè)null字節(jié):
zmq_send (requester, "Hello", 6, 0);
如果從其它語(yǔ)言發(fā)送字符串,可能不會(huì)包括null字節(jié)。例如,當(dāng)用Python發(fā)送同樣的字符串時(shí):
socket.send ("Hello")
那么發(fā)送到網(wǎng)絡(luò)上的是一個(gè)長(zhǎng)度(較短的字符串只需要一個(gè)字節(jié)),以及字符串的內(nèi)容作為單獨(dú)的字符。
圖3:一個(gè)ZeroMQ字符串
<div align=center>
</div>
如果你用C語(yǔ)言讀取這個(gè)字符串,你會(huì)得到一個(gè)看起來(lái)像字符串的東西,可能偶爾行為像字符串(如果幸運(yùn)的話,這5個(gè)字節(jié)發(fā)現(xiàn)它們后面跟著一個(gè)不知不覺潛伏的null),但它不是一個(gè)嚴(yán)格意義上的字符串。當(dāng)你的客戶端和服務(wù)端的字符串格式不同時(shí),你會(huì)得到怪異的結(jié)果。
當(dāng)你用C語(yǔ)言從ZeroMQ中接收字符串?dāng)?shù)據(jù)時(shí),你不能簡(jiǎn)單的相信它安全的結(jié)束了。每次讀取字符串,你應(yīng)該分配一個(gè)新的buffer,包括一個(gè)額外的字節(jié),拷貝字符串,并用null正確的結(jié)束。
因此,我們確定了規(guī)則,ZeroMQ的字符串是指定長(zhǎng)度的,在網(wǎng)絡(luò)上發(fā)送時(shí)不帶結(jié)尾的null。最簡(jiǎn)單的情況下(我們會(huì)在示例中這么做),一個(gè)ZeroMQ字符串完整的對(duì)應(yīng)一個(gè)ZeroMQ消息幀(message frame),就像圖3所示——一個(gè)長(zhǎng)度加一些字節(jié)。
在C語(yǔ)言中接收一個(gè)ZeroMQ字符串,并傳遞給應(yīng)用程序一個(gè)有效的C語(yǔ)言字符串,我們需要這么做:
// Receive ZeroMQ string from socket and convert into C string
// Chops string at 255 chars, if it's longer
static char *
s_recv (void *socket) {
char buffer [256];
int size = zmq_recv (socket, buffer, 255, 0);
if (size == -1)
return NULL;
if (size > 255)
size = 255;
buffer [size] = 0;
return strdup (buffer);
}
這是一個(gè)便捷函數(shù),本著復(fù)用的原則,我們寫了一個(gè)類似的s_send函數(shù),以正確的ZeroMQ格式發(fā)送字符串,并打包到頭文件中。
在zhelpers.h中,可以用C語(yǔ)言寫一些簡(jiǎn)短實(shí)用的ZeroMQ程序。它的源碼很長(zhǎng),只提供給C語(yǔ)言開發(fā)者。
獲得版本號(hào)
ZeroMQ does come in several versions and quite often, if you hit a problem, it'll be something that's been fixed in a later version. So it's a useful trick to know exactly what version of ZeroMQ you're actually linking with.
ZeroMQ有很多版本,并且經(jīng)常更新。如果你遇到了問(wèn)題,可能在新版本中已經(jīng)修復(fù)了。因此,了解你正在使用的ZeroMQ版本是一個(gè)很有用的技巧。
以下是獲得ZeroMQ版本的代碼:
# Report 0MQ version
#
# Author: Lev Givon <lev(at)columbia(dot)edu>
import zmq
print("Current libzmq version is %s" % zmq.zmq_version())
print("Current pyzmq version is %s" % zmq.__version__)
對(duì)外發(fā)送消息
第二個(gè)經(jīng)典模式是單向數(shù)據(jù)分發(fā),其中服務(wù)端推送更新到一組客戶端。讓我們看一個(gè)推送天氣更新的示例,其中包括郵編,溫度和相對(duì)濕度。我們會(huì)隨機(jī)生成這些值。
以下是服務(wù)端,我們使用5556端口:
# Weather update server
# Binds PUB socket to tcp://*:5556
# Publishes random weather updates
import zmq
from random import randrange
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5556")
while True:
zipcode = randrange(1, 100000)
temperature = randrange(-80, 135)
relhumidity = randrange(10, 60)
socket.send_string("%i %i %i" % (zipcode, temperature, relhumidity))
There's no start and no end to this stream of updates, it's like a never ending broadcast.
這個(gè)更新流沒有開始和結(jié)束,像一個(gè)永遠(yuǎn)不會(huì)終止的廣播。
以下是客戶端程序,監(jiān)聽更新流,并抓取指定郵編的數(shù)據(jù),默認(rèn)值是紐約:
# Weather update client
# Connects SUB socket to tcp://localhost:5556
# Collects weather updates and finds avg temp in zipcode
import sys
import zmq
# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)
print("Collecting updates from weather server…")
socket.connect("tcp://localhost:5556")
# Subscribe to zipcode, default is NYC, 10001
zip_filter = sys.argv[1] if len(sys.argv) > 1 else "10001"
# Python 2 - ascii bytes to unicode str
if isinstance(zip_filter, bytes):
zip_filter = zip_filter.decode('ascii')
socket.setsockopt_string(zmq.SUBSCRIBE, zip_filter)
# Process 5 updates
total_temp = 0
for update_nbr in range(5):
string = socket.recv_string()
zipcode, temperature, relhumidity = string.split()
total_temp += int(temperature)
print("Average temperature for zipcode '%s' was %dF" % (
zip_filter, total_temp / (update_nbr+1))
)
圖4:發(fā)布——訂閱
<div align=center>
</div>
注意,當(dāng)你使用SUB socket時(shí),你需要設(shè)置使用zmq_setsockopt()和SUBSCRIBE設(shè)置一個(gè)訂閱。如果沒有設(shè)置任何訂閱,你不會(huì)收到任何消息。對(duì)于初學(xué)者,這是一個(gè)常見的錯(cuò)誤。訂閱者可以設(shè)置多個(gè)訂閱。如果一個(gè)更新匹配任何一個(gè)訂閱,訂閱者就會(huì)收到更新。訂閱者也可以取消指定的訂閱。一個(gè)訂閱通常,但不是必須的,是一個(gè)可打印的字符串。
PUB-SUB socket對(duì)是異步的??蛻舳嗽谘h(huán)中調(diào)用zmq_recv()(或者只調(diào)用一次)。發(fā)送消息給SUB socket會(huì)導(dǎo)致錯(cuò)誤。類似的,服務(wù)端調(diào)用zmq_send(),但是不能在PUB socket上調(diào)用zmq_recv()。
在ZeroMQ的理論中,它不關(guān)心哪個(gè)終端連接,哪個(gè)終端綁定。但是實(shí)際中存在不成文的區(qū)別,以后會(huì)討論。現(xiàn)在,綁定PUB,連接SUB,除非你的網(wǎng)絡(luò)設(shè)計(jì)不支持。
關(guān)于PUB-SUB sockets,還有一件更重要的事情需要知道:你不知道訂閱者什么時(shí)候開始獲取消息。甚至你啟動(dòng)一個(gè)訂閱者,等待一會(huì)兒,然后啟動(dòng)發(fā)布者,訂閱者總會(huì)錯(cuò)過(guò)發(fā)布者發(fā)送的第一條消息。這是因?yàn)橛嗛喺哌B接到發(fā)布者(需要花費(fèi)一點(diǎn)時(shí)間),發(fā)布者可能已經(jīng)發(fā)送了消息出去。
很多開發(fā)者都遇到了這個(gè)“緩慢加入者”(slow joiner)的癥狀,之后我們會(huì)詳細(xì)解釋。記住,ZeroMQ是異步 I/O,比如在后臺(tái)。你有兩個(gè)節(jié)點(diǎn)做這件事,以如下順序:
- 訂閱者連接到一個(gè)終端,接收并計(jì)數(shù)消息。
- 發(fā)布者綁定到一個(gè)終端,并立即發(fā)送1000條消息。
然后訂閱者很可能收不到任何消息。你會(huì)眨眼,檢查你設(shè)置了正確的過(guò)濾器,然后再次嘗試,但是訂閱者仍然收不到任何消息。
一個(gè)TCP連接涉及到握手,根據(jù)網(wǎng)絡(luò)和節(jié)點(diǎn)之間的跳轉(zhuǎn)數(shù)量,需要花費(fèi)幾毫秒時(shí)間。這個(gè)時(shí)間中,ZeroMQ可以發(fā)送很多消息。為了討論,假設(shè)它需要5毫秒建立連接,同一個(gè)鏈接每秒可以處理1M消息。在這5毫秒中,訂閱者正在連接到發(fā)布者,它讓發(fā)布者只有1毫秒時(shí)間發(fā)送1K消息。
在第2章,我們會(huì)解釋如何同步發(fā)布者和訂閱者,這樣你不會(huì)開始發(fā)布數(shù)據(jù),知道訂閱者真正完成連接。有一個(gè)簡(jiǎn)單愚蠢的方式延遲發(fā)布者,也就是sleep。在實(shí)際應(yīng)用程序中不要這么做。因?yàn)樗鼧O其脆弱,不雅和緩慢。使用sleep驗(yàn)證發(fā)生了什么,然后等到第2章看如何正確的解決。
同步的另一種選擇是簡(jiǎn)單的假設(shè)發(fā)布的數(shù)據(jù)流是無(wú)限的,沒有開始,沒有結(jié)束。還假設(shè)訂閱者不關(guān)心它啟動(dòng)之前丟失的數(shù)據(jù)。我們的天氣客戶端示例基于這種假設(shè)。
客戶端訂閱它選擇的郵編,收集該郵編的100次更新。如果郵編是隨機(jī)分布的,這意味著服務(wù)端大約發(fā)布一千萬(wàn)次更新。你可以啟動(dòng)客戶端,然后啟動(dòng)服務(wù)端,客戶端會(huì)繼續(xù)工作。你可以停止和重啟服務(wù)端,客戶端會(huì)繼續(xù)工作。當(dāng)客戶端收到100次更新,它會(huì)計(jì)算并打印平均值,然后退出。
發(fā)布——訂閱(pub-sub)模式的一些要點(diǎn):
- 每次使用一個(gè)連接調(diào)用,一個(gè)訂閱者可以連接到多個(gè)發(fā)布者。然后數(shù)據(jù)會(huì)交叉到達(dá)(“fair-queued”),因此每個(gè)發(fā)布者都不會(huì)被吞沒。
- 如果發(fā)布者沒有連接的訂閱者,它會(huì)簡(jiǎn)單的丟棄所有消息。
- 如果你使用TCP,有一個(gè)訂閱者很慢,消息會(huì)在發(fā)布者排隊(duì)等候。之后我們會(huì)看如何使用高水位線(high-water mark),讓發(fā)布者避免這種情況。
- 從ZeroMQ v3.x開始,當(dāng)使用已連接的協(xié)議(tcp://或ipc://),過(guò)濾在發(fā)布者端完成。使用epgm://協(xié)議,過(guò)濾在訂閱者端完成。在ZeroMQ v2.x中,所有過(guò)濾都在發(fā)布者端完成。
這是在我筆記本電腦上(2011年 Inter i5處理器)接收和過(guò)濾10M消息花費(fèi)的時(shí)間,不錯(cuò),但沒什么特別的:
$ time wuclient
Collecting updates from weather server...
Average temperature for zipcode '10001 ' was 28F
real 0m4.470s
user 0m0.000s
sys 0m0.008s
分而治之
圖5:并行管道
<div align=center>
</div>
作為最后一個(gè)示例,讓我們做一點(diǎn)超級(jí)運(yùn)算,然后喝一杯咖啡。我們的超級(jí)計(jì)算程序是一個(gè)相當(dāng)?shù)湫偷牟⑿刑幚砟P?。我們有?/p>
- 一個(gè)ventilator產(chǎn)生可以并行處理的任務(wù)
- 一組worker處理任務(wù)
- 一個(gè)sink從worker進(jìn)程中收集結(jié)果
實(shí)際中,workers運(yùn)行在超快的盒子中,可能使用GPUs完成大量的運(yùn)算。以下是ventilator。它產(chǎn)生100個(gè)任務(wù),每個(gè)消息告訴worker休眠幾毫米:
# Task ventilator
# Binds PUSH socket to tcp://localhost:5557
# Sends batch of tasks to workers via that socket
#
# Author: Lev Givon <lev(at)columbia(dot)edu>
import zmq
import random
import time
try:
raw_input
except NameError:
# Python 3
raw_input = input
context = zmq.Context()
# Socket to send messages on
sender = context.socket(zmq.PUSH)
sender.bind("tcp://*:5557")
# Socket with direct access to the sink: used to syncronize start of batch
sink = context.socket(zmq.PUSH)
sink.connect("tcp://localhost:5558")
print("Press Enter when the workers are ready: ")
_ = raw_input()
print("Sending tasks to workers…")
# The first message is "0" and signals start of batch
sink.send(b'0')
# Initialize random number generator
random.seed()
# Send 100 tasks
total_msec = 0
for task_nbr in range(100):
# Random workload from 1 to 100 msecs
workload = random.randint(1, 100)
total_msec += workload
sender.send_string(u'%i' % workload)
print("Total expected cost: %s msec" % total_msec)
# Give 0MQ time to deliver
time.sleep(1)
以下是worker程序。它接收一個(gè)消息,休眠接收到的秒數(shù),然后發(fā)出信號(hào),它完成了:
# Task worker
# Connects PULL socket to tcp://localhost:5557
# Collects workloads from ventilator via that socket
# Connects PUSH socket to tcp://localhost:5558
# Sends results to sink via that socket
#
# Author: Lev Givon <lev(at)columbia(dot)edu>
import sys
import time
import zmq
context = zmq.Context()
# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")
# Socket to send messages to
sender = context.socket(zmq.PUSH)
sender.connect("tcp://localhost:5558")
# Process tasks forever
while True:
s = receiver.recv()
# Simple progress indicator for the viewer
sys.stdout.write('.')
sys.stdout.flush()
# Do the work
time.sleep(int(s)*0.001)
# Send results to sink
sender.send(b'')
以下是sink程序。它收集100個(gè)任務(wù),然后計(jì)算整個(gè)處理時(shí)間,因此我們可以確定workers是并行運(yùn)行:
# Task sink
# Binds PULL socket to tcp://localhost:5558
# Collects results from workers via that socket
#
# Author: Lev Givon <lev(at)columbia(dot)edu>
import sys
import time
import zmq
context = zmq.Context()
# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.bind("tcp://*:5558")
# Wait for start of batch
s = receiver.recv()
# Start our clock now
tstart = time.time()
# Process 100 confirmations
for task_nbr in range(100):
s = receiver.recv()
if task_nbr % 10 == 0:
sys.stdout.write(':')
else:
sys.stdout.write('.')
sys.stdout.flush()
# Calculate and report duration of batch
tend = time.time()
print("Total elapsed time: %d msec" % ((tend-tstart)*1000))
一次的平均耗時(shí)是5秒。當(dāng)啟動(dòng)1,2或4個(gè)works時(shí),從sink獲得的結(jié)果如下:
- 1 worker: total elapsed time: 5034 msecs.
- 2 workers: total elapsed time: 2421 msecs.
- 4 workers: total elapsed time: 1018 msecs.
Let's look at some aspects of this code in more detail:
讓我們看下這些代碼的細(xì)節(jié):
- worker向上連接到ventilator,向下連接到sink。這意味著你可以隨意增加workers。如果workers綁定到它們的終端,你需要(a)更多終端,(b)每次增加一個(gè)worker,修改ventilator和/或sink。我們說(shuō)ventilator和sink是架構(gòu)的穩(wěn)定部分,workers是架構(gòu)的動(dòng)態(tài)部分。
- 我們需要同步啟動(dòng)所有workers。這是ZeroMQ中相對(duì)常見的問(wèn)題,沒有簡(jiǎn)單的解決方案。zmq_connect方法需要花費(fèi)一定時(shí)間。因此,當(dāng)一組workers連接到ventilator,第一個(gè)worker成功的連接會(huì)在短時(shí)間內(nèi)獲得所有消息,而其它的也在連接。如果你不同步啟動(dòng)workers,系統(tǒng)根本不會(huì)并行運(yùn)行。移除ventilator中的wait試試,看會(huì)發(fā)生什么。
- ventilator的PUSH socket平均的發(fā)布任務(wù)到workers(假設(shè)批處理開始之前它們都已經(jīng)連接上了)。這叫做負(fù)載均衡,之后我們會(huì)詳細(xì)討論。
- sink的PULL socket平均的從workers收集結(jié)果。這叫做fair-queuing。
圖6:Fair Queuing
<div align=center>
</div>
管道模式也有“緩慢加入者”(slow joiner)癥狀,導(dǎo)致PUSH sockets不能正確的負(fù)載均衡。如果你正在使用PUSH和PULL,如果其中一個(gè)worker比其它的接收到更多的消息,這是因?yàn)樵揚(yáng)ULL socket比其它的更快加入,在其它workers連上之前獲得更多消息。如果你想完全的負(fù)載均衡,你可能想要閱讀第3章的負(fù)載均衡模式。
使用ZeroMQ編程
看完一些示例后,你肯定急于使用ZeroMQ。在你開始之前,深呼吸,放輕松,并仔細(xì)考慮一些基礎(chǔ)的建議,這會(huì)讓減輕你的壓力和困惑。
- 按部就班的學(xué)習(xí)ZeroMQ。它只是一個(gè)簡(jiǎn)單的API,但其中隱藏各種可能性。慢慢掌握每一個(gè)可能性。
- 書寫整潔的代碼。丑陋的代碼隱藏了問(wèn)題,讓其他人很難幫助你。你可能習(xí)慣了沒有意義的變量名,但閱讀你代碼的人沒有。使用有意義的名稱,而不是“我太粗心了,沒有告訴你這個(gè)變量的實(shí)際意思”。使用一致的縮進(jìn)和整潔的布局。書寫整潔的代碼,你的世界會(huì)更美好。
- 測(cè)試結(jié)果就是你需要的。當(dāng)你的程序不工作時(shí),你應(yīng)該知道哪五行代碼出錯(cuò)了。使用ZeroMQ魔法時(shí)尤其如此,頭幾次嘗試時(shí)總是不工作。
- 當(dāng)你發(fā)現(xiàn)程序不按預(yù)期工作時(shí),把你的程序分為小片段,測(cè)試每一個(gè),看看是否工作。ZeroMQ讓你書寫模塊化代碼;讓它們?yōu)槟闼谩?/li>
- 需要時(shí)創(chuàng)建抽象(類,方法)。如果你復(fù)制/粘貼大量代碼,同時(shí)也復(fù)制/粘貼了錯(cuò)誤。
正確獲得Context
ZeroMQ程序總是從創(chuàng)建一個(gè)context開始,然后使用它創(chuàng)建sockets。在C語(yǔ)言中,調(diào)用zmq_ctx_new()。你應(yīng)該在線程中正確創(chuàng)建和使用一個(gè)context。從技術(shù)上講,context是單個(gè)線程中所有sockets的容器,并且是inproc sockets的傳送帶。如果運(yùn)行時(shí),一個(gè)進(jìn)程中有兩個(gè)context,它們就像單獨(dú)的ZeroMQ實(shí)例。如果這確實(shí)是你想要的,可以,否則請(qǐng)記住:
在一個(gè)進(jìn)程啟動(dòng)時(shí)調(diào)用zmq_ctx_new(),并在結(jié)束時(shí)調(diào)用一次zmq_ctx_destroy()。
如果你正在使用fork()系統(tǒng)調(diào)用,在fork之后,其它子進(jìn)程代碼之前調(diào)用zmq_ctx_new()。通常,你想要在子進(jìn)程中做有趣的事情,在父進(jìn)程中做無(wú)聊的進(jìn)程管理。
干凈的退出
優(yōu)秀的程序員和優(yōu)秀的殺手有同樣的座右銘:完成工作后總是清理干凈。當(dāng)你使用類似Python語(yǔ)言時(shí),資源會(huì)自動(dòng)釋放。使用C語(yǔ)言時(shí),當(dāng)你使用完對(duì)象后,你需要小心的釋放它們,否則你會(huì)獲得內(nèi)存泄漏,不穩(wěn)定的程序,通常會(huì)有不好的報(bào)應(yīng)。
內(nèi)存泄漏是一件事,但是ZeroMQ對(duì)于如何退出程序十分講究。原因是技術(shù)上的,并且很痛苦,結(jié)果是,如果你讓任何sockets打開,zmq_ctx_destroy()函數(shù)會(huì)永遠(yuǎn)掛起。默認(rèn)情況下,即使你關(guān)閉所有sockets,如果有掛起的連接或者發(fā)送,zmq_ctx_destroy()會(huì)永遠(yuǎn)等待,除非在關(guān)閉sockets之前設(shè)置LINGER為零。
我們需要關(guān)心ZeroMQ的messages,sockets和contexts對(duì)象。幸運(yùn)的是,起碼在簡(jiǎn)單的程序中,它很簡(jiǎn)單:
- 盡可能使用zmq_send()和zmq_recv(),因?yàn)樗鼈儽苊饬耸褂脄mq_msg_t對(duì)象。
- 如果你確定使用zmq_msg_recv(),當(dāng)你不再使用時(shí),立即調(diào)用zmq_msg_close()釋放接收到的消息。
- 如果你正在打開和關(guān)閉很多sockets,很可能你需要重新設(shè)計(jì)你的程序。某些情況下,socket句柄直到銷毀context才會(huì)釋放。
- 當(dāng)你退出程序,關(guān)閉你的sockets,然后調(diào)用zmq_ctx_destroy()銷毀context。
最起碼用C語(yǔ)言開發(fā)時(shí)需要小心。對(duì)于自動(dòng)釋放對(duì)象的語(yǔ)言,sockets和contexts會(huì)在離開對(duì)象域時(shí)銷毀。如果你沒有使用這類語(yǔ)言,你需要在類似“final”塊中完成清理工作。
如果你正在使用多線程,情況會(huì)更復(fù)雜。我們會(huì)在下一章介紹多線程,但是有些人會(huì)在完全理解之前試圖使用多線程,以下是退出ZeroMQ多線程程序的快速和不討好的指南。
首先,不要再多個(gè)線程中使用同一個(gè)socket。請(qǐng)不要解釋你認(rèn)為這會(huì)很有趣,請(qǐng)不要這么做。接著,你需要關(guān)閉每個(gè)繼續(xù)請(qǐng)求的socket。正確的方式是設(shè)置一個(gè)低的LINGER值(1秒),并且關(guān)閉socket。當(dāng)你銷毀一個(gè)context時(shí),如果你使用的語(yǔ)言綁定沒有自動(dòng)這么做,請(qǐng)發(fā)送一個(gè)patch。
最后,銷毀context。這會(huì)導(dǎo)致附屬線程中(比如,共享同一個(gè)context)所有阻塞的接收,輪詢或者發(fā)送返回一個(gè)錯(cuò)誤。捕獲該錯(cuò)誤,然后設(shè)置一個(gè)持續(xù)時(shí)間,并在該線程中關(guān)閉socket,最后退出。不要銷毀同一個(gè)context兩次。主線程中的zmq_ctx_destroy會(huì)阻塞,直到所有sockets安全關(guān)閉。
瞧!這足夠復(fù)雜和痛苦。所有語(yǔ)言綁定作者都會(huì)自動(dòng)完成這項(xiàng)工作,不用再做額外的工作。
為什么需要ZeroMQ
你已經(jīng)使用過(guò)ZeroMQ,讓我們回頭看看為什么使用它。
現(xiàn)在,很多程序由橫跨多種網(wǎng)絡(luò)(局域網(wǎng)或者互聯(lián)網(wǎng))的組件構(gòu)成。很多開發(fā)者倒在消息通訊上。有些開發(fā)者使用消息隊(duì)列產(chǎn)品,但是大部分情況他們自己使用TCP或UDP。這些協(xié)議不難使用,但是從A發(fā)送一些字節(jié)到B,以及通過(guò)各種可靠方式傳遞消息之間,有很大的差異。
讓我們看看使用原始TCP時(shí),會(huì)遇到的典型問(wèn)題。所有可復(fù)用的消息層都需要解決所有或大部分這些問(wèn)題:
- 我們?nèi)绾翁幚鞩/O?我們的程序會(huì)阻塞,還是我們?cè)诤笈_(tái)處理I/O?這是一個(gè)關(guān)鍵的設(shè)計(jì)決策。阻塞I/O創(chuàng)建的架構(gòu)不能很好的擴(kuò)展。但是后臺(tái)I/O很難正確使用。
- 我們?nèi)绾翁幚韯?dòng)態(tài)組件,比如臨時(shí)離開的組件?我們把組件在形式上分為“客戶端”和“服務(wù)端”,并且要求服務(wù)端不能消失?如果我想要從服務(wù)端連接到服務(wù)端呢?我們每隔幾秒嘗試重新連接嗎?
- 我們?nèi)绾卧诰W(wǎng)絡(luò)上表示一個(gè)消息?我們?nèi)绾卧O(shè)計(jì)數(shù)據(jù),讓它可以很容易的讀寫,安全的從緩存區(qū)溢出,對(duì)短消息高效,還能勝任很大的視頻?
- 我們?nèi)绾翁幚聿荒芰⒓赐哆f的消息?更具體地說(shuō),如果我們正在等待一個(gè)組件重新在線,我們會(huì)丟棄消息,把它們放到數(shù)據(jù)庫(kù),還是放到內(nèi)存隊(duì)列?
- 我們?cè)谀睦锎鎯?chǔ)消息隊(duì)列?如果組件讀取消息的隊(duì)列很慢會(huì)發(fā)生什么?如何增強(qiáng)我們的隊(duì)列?然后我們的策略是什么?
- 我們?nèi)绾翁幚韥G失的消息?我們等待新的數(shù)據(jù),請(qǐng)求一個(gè)重新發(fā)送,還是創(chuàng)建一些可靠的層,確保消息不會(huì)丟失?如果該層自身崩潰了呢?
- 如果我們需要使用一個(gè)不同的網(wǎng)絡(luò)傳輸協(xié)議呢?比如多路廣播代替TCP單一廣播,或者IPv6?我們需要重寫應(yīng)用程序,還是某個(gè)層中的抽象傳輸協(xié)議?
- 我們?nèi)绾温酚上ⅲ磕馨l(fā)送同樣的消息到多個(gè)端嗎?能發(fā)送回復(fù)到原始的請(qǐng)求者嗎?
- 如何為另一種語(yǔ)言編寫API?是重新實(shí)現(xiàn)網(wǎng)絡(luò)層協(xié)議,還是重新打包一個(gè)庫(kù)?如果是前者,如何保證效率和穩(wěn)定?如果是后者,如何保證互操作性?
- 我們?nèi)绾伪憩F(xiàn)數(shù)據(jù),可以在不同架構(gòu)中讀?。课覀?yōu)閿?shù)據(jù)類型強(qiáng)制指定特定的編碼?這是消息系統(tǒng)還是更高層的工作呢?
- 我們?nèi)绾翁幚砭W(wǎng)絡(luò)錯(cuò)誤?等待然后重試,忽略它們,還是終止?
打開一個(gè)典型的開源項(xiàng)目,比如Hadoop Zookeeper,閱讀src/c/src/zookeeper.c中的C API。當(dāng)我2013年1月閱讀代碼時(shí),里面有4200行沒有注釋的,客戶端/服務(wù)端網(wǎng)絡(luò)通信協(xié)議代碼。它很高效,因?yàn)槭褂胮oll代替了select。但是,Zookeeper應(yīng)該使用通用消息層和注釋清楚的網(wǎng)絡(luò)層協(xié)議。重復(fù)造輪子很浪費(fèi)團(tuán)隊(duì)的時(shí)間。
但是如何開發(fā)一個(gè)可復(fù)用的消息層?這么多項(xiàng)目需要這項(xiàng)技術(shù),為什么人們?nèi)匀皇褂肨CP sockets這種困難的方式,并且一次次解決列表中這些問(wèn)題?
事實(shí)證明,開發(fā)一個(gè)可復(fù)用的消息系統(tǒng)真的很難,這也是為什么幾乎沒有FOSS項(xiàng)目嘗試,以及為什么商業(yè)消息產(chǎn)品很復(fù)雜,很貴,很頑固和很脆弱。在2006年,iMatix設(shè)計(jì)了AMQP,讓FOSS開發(fā)者第一次用上了可復(fù)用的消息系統(tǒng)。AMQP比其它設(shè)計(jì)更好,但仍然很復(fù)雜,昂貴和脆弱。它需要幾周時(shí)間學(xué)習(xí)如何使用,幾個(gè)月時(shí)間創(chuàng)建穩(wěn)定的架構(gòu)。
圖7:最初的消息傳輸
<div align=center>
</div>
絕大部分消息項(xiàng)目,比如AMQP,試圖通過(guò)創(chuàng)造一個(gè)可復(fù)用的新概念——“broker”,完成尋址,路由和隊(duì)列功能來(lái)解決列表中的問(wèn)題。結(jié)果是,在一些未經(jīng)文檔化的協(xié)議頂部的客戶端/服務(wù)端協(xié)議,或者一組APIs中,允許程序與broker交互。Brokers在降低網(wǎng)絡(luò)通訊的復(fù)雜性上是卓越的。但是在產(chǎn)品(比如Zookeeper)中增加基于broker的消息通信會(huì)更糟糕。這意味著增加了一個(gè)額外的大盒子,和一個(gè)新的故障點(diǎn)。Broker迅速成為瓶頸和風(fēng)險(xiǎn)。如果軟件支持它,我們可以增加第二個(gè),第三個(gè)和第四個(gè)broker,以及故障轉(zhuǎn)移計(jì)劃。需要開發(fā)者完成這項(xiàng)工作。它創(chuàng)建了更多可移動(dòng)組件,更復(fù)雜,更容易發(fā)生故障。
一個(gè)broker-centric設(shè)置需要自己的運(yùn)行團(tuán)隊(duì)。你需要不加夸張的日夜監(jiān)控brokers,當(dāng)它們不正常工作時(shí),你需要解決問(wèn)題。你需要盒子,需要備份盒子,還需要人員管理這些盒子。有很多可移動(dòng)組件的,幾個(gè)團(tuán)隊(duì),幾年時(shí)間開發(fā)的大型程序才值得這么做。
圖8:之后的消息傳輸
<div align=center>
</div>
所以中小型應(yīng)用程序開發(fā)者被困住了。不管是避免網(wǎng)絡(luò)編程,或者開發(fā)一個(gè)不能擴(kuò)展得應(yīng)用程序。還是一頭扎進(jìn)網(wǎng)絡(luò)編程,開發(fā)脆弱的,復(fù)雜的,難以維護(hù)的應(yīng)用程序。或者他們投注在一個(gè)消息產(chǎn)品上,依賴昂貴的,容易出問(wèn)題的技術(shù)上。沒有真正好的選擇,這也是為什么消息傳輸還停留在上個(gè)世紀(jì),并激起了強(qiáng)烈的情緒:用戶的負(fù)面情緒,銷售許可證和技術(shù)支持人員卻幸災(zāi)樂禍。
我們需要的是完成消息傳輸工作,但在任何程序中都可以簡(jiǎn)單,方便完成工作的方式。它應(yīng)該是一個(gè)庫(kù),不需要其他依賴。沒有額外的組件,也就沒有額外的風(fēng)險(xiǎn)。它應(yīng)該可以在所有操作系統(tǒng)和編程語(yǔ)言工作。
這就是ZeroMQ:一個(gè)高效的,可嵌入的庫(kù),不需要多少成本,就漂亮的解決了網(wǎng)絡(luò)的大部分問(wèn)題。
尤其是:
- 它在后臺(tái)線程異步處理I/O。這些與應(yīng)用程序交互的線程使用無(wú)鎖的數(shù)據(jù)結(jié)構(gòu),因此并發(fā)的ZeroMQ應(yīng)用程序不需要鎖,信號(hào),或者其他等待狀態(tài)。
- 組件可以動(dòng)態(tài)來(lái)去,ZeroMQ會(huì)自動(dòng)重連。這意味著你可以以任意順序啟動(dòng)組件。你可以創(chuàng)建面向服務(wù)的架構(gòu)(SOAs),服務(wù)端可以隨意加入和離開。
- 需要時(shí)它會(huì)自動(dòng)排隊(duì)消息。它會(huì)聰明的完成這項(xiàng)工作,盡可能在排列消息之前推送到接受者。
- 它有多種方式處理多度的隊(duì)列(稱為“高水位”)。當(dāng)隊(duì)列滿了,ZeroMQ根據(jù)你的消息傳輸類型(稱為模式)自動(dòng)阻塞發(fā)送者,或者丟棄消息。
- 它讓你的應(yīng)用程序可以通過(guò)任意傳輸協(xié)議交互:TCP,多路廣播,進(jìn)程內(nèi)部,進(jìn)程之間。你不需要修改代碼就能使用不同的傳輸協(xié)議。
- 它安全的處理緩慢/阻塞的讀者,根據(jù)消息傳輸模式使用不同的策略。
- 它讓你可以使用各種模式路由消息,比如請(qǐng)求-響應(yīng)和發(fā)布-訂閱。這些模式?jīng)Q定了如何創(chuàng)建網(wǎng)絡(luò)拓?fù)浣Y(jié)構(gòu),也就是網(wǎng)絡(luò)的結(jié)構(gòu)。
- 它允許你創(chuàng)建代理隊(duì)列,通過(guò)一個(gè)調(diào)用捕獲或轉(zhuǎn)發(fā)消息。代理可以降低網(wǎng)絡(luò)交互的復(fù)雜性。
- 它在網(wǎng)絡(luò)上使用一個(gè)簡(jiǎn)單的框架,投遞整個(gè)消息。如果你發(fā)送一個(gè)10k的消息,你會(huì)收到一個(gè)10k的消息。
- 它沒有強(qiáng)加任何格式在消息上。它們是從零到千兆字節(jié)的二進(jìn)制數(shù)據(jù)塊。當(dāng)你想要表示數(shù)據(jù),你可以在頂部選擇其它產(chǎn)品,比如msgpack,Google的協(xié)議緩存,或者其它的。
- 在某些有意義的情況下,通過(guò)自動(dòng)重試,它可以聰明的處理網(wǎng)絡(luò)錯(cuò)誤。
- 它減少了你的碳排放量。用更少的CPU完成更多工作,意味著你的電腦消耗更少的電量,可以讓你的舊電腦用更長(zhǎng)時(shí)間。
實(shí)際上,ZeroMQ比這做得更多。它在開發(fā)網(wǎng)絡(luò)應(yīng)用程序上有顛覆性的影響。表面上看,它是一個(gè)受socket啟發(fā)的API,你在socket上調(diào)用zmq_recv()和zmq_send()。但是消息處理迅速成為中心點(diǎn),你的應(yīng)用程序很快分解為一組消息處理任務(wù)。它是優(yōu)雅和自然的。它可擴(kuò)展:這些每個(gè)任務(wù)對(duì)應(yīng)一個(gè)節(jié)點(diǎn),節(jié)點(diǎn)之間通過(guò)任意傳輸協(xié)議通訊。兩個(gè)節(jié)點(diǎn)在一個(gè)進(jìn)程(節(jié)點(diǎn)是線程),兩個(gè)節(jié)點(diǎn)在同一個(gè)機(jī)器(節(jié)點(diǎn)是進(jìn)程),或者兩個(gè)在同一個(gè)網(wǎng)絡(luò)(節(jié)點(diǎn)是機(jī)器),都是一樣的,程序代碼不用改變。
Socket的可擴(kuò)展性
讓我們看看實(shí)際中ZeroMQ的擴(kuò)展性。這是一個(gè)啟動(dòng)了一個(gè)天氣服務(wù)端和幾個(gè)并行客戶端的腳本:
wuserver &
wuclient 12345 &
wuclient 23456 &
wuclient 34567 &
wuclient 45678 &
wuclient 56789 &
當(dāng)客戶端運(yùn)行,我們使用top命令查看激活的進(jìn)程,看起來(lái)像這樣(在四核機(jī)器上):
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
7136 ph 20 0 1040m 959m 1156 R 157 12.0 16:25.47 wuserver
7966 ph 20 0 98608 1804 1372 S 33 0.0 0:03.94 wuclient
7963 ph 20 0 33116 1748 1372 S 14 0.0 0:00.76 wuclient
7965 ph 20 0 33116 1784 1372 S 6 0.0 0:00.47 wuclient
7964 ph 20 0 33116 1788 1372 S 5 0.0 0:00.25 wuclient
7967 ph 20 0 33072 1740 1372 S 5 0.0 0:00.35 wuclient
讓我們想想這里發(fā)生了什么。天氣服務(wù)端只有一個(gè)socket,這里我們讓它并行給5個(gè)客戶端發(fā)送數(shù)據(jù)。我們可以有幾千個(gè)并行客戶端。服務(wù)端應(yīng)用程序看不見它們,不會(huì)直接與它們通訊。所以ZeroMQ socket像一個(gè)小型服務(wù)端,悄悄地接收客戶端請(qǐng)求,只要網(wǎng)絡(luò)可以處理,就會(huì)盡快推送數(shù)據(jù)到客戶端。它是一個(gè)多線程服務(wù)端,充分利用你的CPU。
從ZeroMQ v2.2升級(jí)到ZeroMQ v3.2
略
警告:不穩(wěn)定的示范
典型的網(wǎng)絡(luò)編程建立在一個(gè)普遍的假設(shè)基礎(chǔ)上:一個(gè)socket與一個(gè)連接,一個(gè)終端通訊。有多路廣播協(xié)議,但它們是獨(dú)特的。當(dāng)我們假設(shè)一個(gè)socket等于一個(gè)連接時(shí),我們以特定方式擴(kuò)展架構(gòu)。我們創(chuàng)建邏輯線程,每個(gè)線程處理一個(gè)socket,一個(gè)終端。我們?cè)谶@些線程中處理邏輯和狀態(tài)。
在ZeroMQ世界中,sockets是進(jìn)入小而快的后臺(tái)通訊引擎的入口,該引擎自動(dòng)管理所有連接。你不能查看,操作,打開,關(guān)閉,或者附加狀態(tài)到這些連接。不管你使用阻塞發(fā)送或接收,或者poll,你只能與socket交互,而不是socket為你管理的連接。連接是私有的,不可見的,這是ZeroMQ可擴(kuò)展性的關(guān)鍵。
因?yàn)槟愕拇a與socket交互,所以可以處理任何網(wǎng)絡(luò)協(xié)議的任意連接數(shù)量。ZeroMQ中的消息傳輸模式比你的應(yīng)用程序代碼中的消息傳輸模式更容易擴(kuò)展。
所以,普遍的建設(shè)不再成立。當(dāng)你閱讀代碼示例時(shí),你的代碼試圖映射你已經(jīng)知道的知識(shí)。當(dāng)你看到“socket”,會(huì)想“哦,這表示到另一個(gè)節(jié)點(diǎn)的連接”。這是錯(cuò)誤的。當(dāng)你看到“線程”,會(huì)再次想”哦,線程表示到另一個(gè)節(jié)點(diǎn)的連接“,你的大腦再次錯(cuò)了。
如果你第一次閱讀這篇指南,當(dāng)你真正編寫一段時(shí)間ZeroMQ代碼后,才會(huì)認(rèn)識(shí)到這一點(diǎn)。你可能會(huì)感到困惑,尤其是ZeroMQ讓事情變得這么簡(jiǎn)單,你可能試圖強(qiáng)加這些假設(shè)在ZeroMQ上,但這不對(duì)。