一.介紹
RabbitMQ是一個(gè)消息代理:它接受和轉(zhuǎn)發(fā)消息.類(lèi)似于郵局:將消息發(fā)給郵局,郵局再把消息分發(fā)給目標(biāo).在這個(gè)比喻中 RabbitMQ是郵箱, 郵局和郵遞員小哥.
RabbitMQ和郵局這兩者之間的主要區(qū)別是它不會(huì)處理紙質(zhì)郵件,取而代之的是接收、存儲(chǔ)和發(fā)送二進(jìn)制數(shù)據(jù)塊,也就是我們通常所說(shuō)的消息。
RabbitMQ和消息中,通常會(huì)使用一些專(zhuān)業(yè)術(shù)語(yǔ)。
生產(chǎn):生產(chǎn)意味著就是發(fā)送。 發(fā)送消息的程序是一個(gè)生產(chǎn)者。下圖代表一個(gè)消息的生產(chǎn)者:

隊(duì)列:這里的隊(duì)列是指一個(gè)名稱(chēng),但是名稱(chēng)所代表的隊(duì)列實(shí)體寄存在RabbitMQ服務(wù)器端中。 雖然消息流過(guò)RabbitMQ和您的應(yīng)用程序,但它們只能存儲(chǔ)在隊(duì)列中。 隊(duì)列只受主機(jī)的內(nèi)存和磁盤(pán)的限制,它本質(zhì)上是一個(gè)大的消息緩沖區(qū)。 許多生產(chǎn)者可以發(fā)送消息到一個(gè)隊(duì)列,許多消費(fèi)者可以嘗試從一個(gè)隊(duì)列接收數(shù)據(jù)。 下圖代表一個(gè)隊(duì)列:

消費(fèi):消費(fèi)具有與接收相似的含義。 消費(fèi)者是一個(gè)主要等待接收消息的程序。下圖代表消息的消費(fèi)者:

請(qǐng)注意,生產(chǎn)者,消費(fèi)者和代理不必駐留在同一主機(jī)上; 實(shí)際上在大多數(shù)應(yīng)用中他們都沒(méi)有在同一臺(tái)主機(jī)上。
(使用python腳本鏈接)
我們將用Python編寫(xiě)兩個(gè)程序; 發(fā)送單個(gè)消息的生產(chǎn)者,以及接收消息并將其打印出來(lái)的消費(fèi)者。我們將掩蓋Python API中的一些細(xì)節(jié),專(zhuān)注于這個(gè)非常簡(jiǎn)單的事情,只是為了開(kāi)始。它是消息傳遞的“Hello World”。
在下圖中,“P”是我們的生產(chǎn)者,“C”是我們的消費(fèi)者。中間的框是一個(gè)隊(duì)列 - RabbitMQ代表消費(fèi)者保留的消息緩沖區(qū)。

Python客戶(hù)端庫(kù)
RabbitMQ說(shuō)多種協(xié)議。本教程使用AMQP 0-9-1,它是一種開(kāi)放的,通用的消息傳遞協(xié)議。RabbitMQ有許多不同語(yǔ)言的客戶(hù)端。我們將使用RabbitMQ提供的Python客戶(hù)端。
現(xiàn)在我們有了Python 客戶(hù)端的及其依賴(lài)項(xiàng),我們可以編寫(xiě)一些代碼.
發(fā)出消息

我們的第一個(gè)程序send.py將向隊(duì)列發(fā)送一條消息。我們需要做的第一件事是建立與RabbitMQ服務(wù)器的連接。
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
我們現(xiàn)在已連接到本地計(jì)算機(jī)上的代理 - 所以是localhost。如果我們想要連接到不同機(jī)器上的代理,我們只需在此處指定其名稱(chēng)或IP地址。
接下來(lái),在發(fā)送之前,我們需要確保收件人隊(duì)列存在。如果我們向不存在的位置發(fā)送消息,RabbitMQ將丟棄該消息。讓我們創(chuàng)建一個(gè)將要傳遞消息的hello隊(duì)列:
channel.queue_declare(queue = 'hello')
此時(shí)我們已準(zhǔn)備好發(fā)送消息。我們的第一條消息只包含一個(gè)字符串Hello World!我們想將它發(fā)送到我們的 hello隊(duì)列。
在RabbitMQ中,消息永遠(yuǎn)不能直接發(fā)送到隊(duì)列,它總是需要通過(guò)exchange。但是,讓我們不要被細(xì)節(jié)拖累 - 您可以在本教程的第三部分中閱讀有關(guān)exchanges更多信息。我們現(xiàn)在需要知道的是如何使用由空字符串標(biāo)識(shí)的默認(rèn)交換。這種交換是特殊的 - 它允許我們準(zhǔn)確地指定消息應(yīng)該到達(dá)哪個(gè)隊(duì)列。需要在routing_key參數(shù)中指定隊(duì)列名稱(chēng):
channel.basic_publish(exchange = '',
routing_key = 'hello',
body = 'Hello World!')
print(“[x]發(fā)送'Hello World!'”)
在退出程序之前,我們需要確保刷新網(wǎng)絡(luò)緩沖區(qū)并將消息實(shí)際傳遞給RabbitMQ。我們可以通過(guò)輕輕關(guān)閉連接來(lái)實(shí)現(xiàn)。
connection.close()
發(fā)送不起作用!
如果這是您第一次使用RabbitMQ并且沒(méi)有看到“已發(fā)送”消息,那么您可能會(huì)感到頭疼,想知道可能出現(xiàn)的問(wèn)題。也許代理是在沒(méi)有足夠的可用磁盤(pán)空間的情況下啟動(dòng)的(默認(rèn)情況下它至少需要200 MB空閑),因此拒絕接受消息。檢查代理日志文件以確認(rèn)并在必要時(shí)減少限制。該配置文件文檔會(huì)告訴你如何設(shè)置disk_free_limit。
接收

我們的第二個(gè)程序receive.py將從隊(duì)列接收消息并在屏幕上打印它們。
同樣,首先我們需要連接到RabbitMQ服務(wù)器。負(fù)責(zé)連接Rabbit的代碼與之前相同。
與以前一樣,下一步是確保隊(duì)列存在。使用queue_declare創(chuàng)建隊(duì)列是冪等的 - 我們可以根據(jù)需要多次運(yùn)行命令,并且只創(chuàng)建一個(gè)命令。
channel.queue_declare(queue = 'hello')
您可能會(huì)問(wèn)為什么我們?cè)俅温暶麝?duì)列 - 我們已經(jīng)在之前的代碼中聲明了它。如果我們確定隊(duì)列已經(jīng)存在,我們可以避免這種情況。例如,如果之前運(yùn)行了send.py程序。但我們還不確定首先運(yùn)行哪個(gè)程序。在這種情況下,重復(fù)在兩個(gè)程序中重復(fù)聲明隊(duì)列是一個(gè)好習(xí)慣。
列出隊(duì)列
您可能希望看到RabbitMQ有哪些隊(duì)列以及它們中有多少消息。您可以使用rabbitmqctl工具(作為特權(quán)用戶(hù))執(zhí)行此操作:sudo rabbitmqctl list_queues在Windows上,省略sudo:
rabbitmqctl.bat list_queues
從隊(duì)列接收消息更復(fù)雜。它通過(guò)將回調(diào)(callback)函數(shù)訂閱到隊(duì)列來(lái)工作。每當(dāng)我們收到消息時(shí),Pika庫(kù)都會(huì)調(diào)用此回調(diào)(callback)函數(shù)。在我們的例子中,此功能將在屏幕上顯示消息的內(nèi)容。
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
接下來(lái),我們需要告訴RabbitMQ這個(gè)特定的回調(diào)函數(shù)應(yīng)該從我們的hello隊(duì)列接收消息:
channel.basic_consume(callback,
queue = 'hello',
no_ack = True)
要使該命令成功,我們必須確保存在我們想要訂閱的隊(duì)列。幸運(yùn)的是,我們對(duì)此有信心 - 我們?cè)谏厦鎰?chuàng)建了一個(gè)隊(duì)列 - 使用queue_declare。
該NO_ACK參數(shù)將被描述以后。
最后,我們進(jìn)入一個(gè)永不停止的循環(huán),等待數(shù)據(jù)并在必要時(shí)運(yùn)行回調(diào)。
print('[*]等待消息。退出按CTRL + C')
channel.start_consuming()
把它們放在一起
send.py的完整代碼:
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
receive.py的完整代碼:
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
channel.basic_consume(callback,
queue='hello',
no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
現(xiàn)在我們可以在終端中試用我們的程序。首先,讓我們開(kāi)始一個(gè)消費(fèi)者,它將持續(xù)運(yùn)行等待交付:
python receive.py
#=> [*]正在等待消息。要退出按CTRL + C
#=> [x]收到'Hello World!'
現(xiàn)在開(kāi)始運(yùn)行生產(chǎn)者, 生產(chǎn)者程序在每次運(yùn)行之后都會(huì)停止:
python send.py
#=> [x]發(fā)送'Hello World!'
牛逼!我們能夠通過(guò)RabbitMQ發(fā)送第一條消息。您可能已經(jīng)注意到,receive.py程序不會(huì)退出。它將保持準(zhǔn)備好接收更多消息,并可能被Ctrl-C中斷。
嘗試在新終端中再次運(yùn)行send.py.
我們已經(jīng)學(xué)會(huì)了如何從命名隊(duì)列發(fā)送和接收消息。是時(shí)候轉(zhuǎn)到第2部分 并構(gòu)建一個(gè)簡(jiǎn)單的工作隊(duì)列了。