一.介紹
RabbitMQ是一個消息代理:它接受和轉(zhuǎn)發(fā)消息.類似于郵局:將消息發(fā)給郵局,郵局再把消息分發(fā)給目標(biāo).在這個比喻中 RabbitMQ是郵箱, 郵局和郵遞員小哥.
RabbitMQ和郵局這兩者之間的主要區(qū)別是它不會處理紙質(zhì)郵件,取而代之的是接收、存儲和發(fā)送二進(jìn)制數(shù)據(jù)塊,也就是我們通常所說的消息。
RabbitMQ和消息中,通常會使用一些專業(yè)術(shù)語。
生產(chǎn):生產(chǎn)意味著就是發(fā)送。 發(fā)送消息的程序是一個生產(chǎn)者。下圖代表一個消息的生產(chǎn)者:

隊列:這里的隊列是指一個名稱,但是名稱所代表的隊列實體寄存在RabbitMQ服務(wù)器端中。 雖然消息流過RabbitMQ和您的應(yīng)用程序,但它們只能存儲在隊列中。 隊列只受主機的內(nèi)存和磁盤的限制,它本質(zhì)上是一個大的消息緩沖區(qū)。 許多生產(chǎn)者可以發(fā)送消息到一個隊列,許多消費者可以嘗試從一個隊列接收數(shù)據(jù)。 下圖代表一個隊列:

消費:消費具有與接收相似的含義。 消費者是一個主要等待接收消息的程序。下圖代表消息的消費者:

請注意,生產(chǎn)者,消費者和代理不必駐留在同一主機上; 實際上在大多數(shù)應(yīng)用中他們都沒有在同一臺主機上。
(使用python腳本鏈接)
我們將用Python編寫兩個程序; 發(fā)送單個消息的生產(chǎn)者,以及接收消息并將其打印出來的消費者。我們將掩蓋Python API中的一些細(xì)節(jié),專注于這個非常簡單的事情,只是為了開始。它是消息傳遞的“Hello World”。
在下圖中,“P”是我們的生產(chǎn)者,“C”是我們的消費者。中間的框是一個隊列 - RabbitMQ代表消費者保留的消息緩沖區(qū)。

Python客戶端庫
RabbitMQ說多種協(xié)議。本教程使用AMQP 0-9-1,它是一種開放的,通用的消息傳遞協(xié)議。RabbitMQ有許多不同語言的客戶端。我們將使用RabbitMQ提供的Python客戶端。
現(xiàn)在我們有了Python 客戶端的及其依賴項,我們可以編寫一些代碼.
發(fā)出消息

我們的第一個程序send.py將向隊列發(fā)送一條消息。我們需要做的第一件事是建立與RabbitMQ服務(wù)器的連接。
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
我們現(xiàn)在已連接到本地計算機上的代理 - 所以是localhost。如果我們想要連接到不同機器上的代理,我們只需在此處指定其名稱或IP地址。
接下來,在發(fā)送之前,我們需要確保收件人隊列存在。如果我們向不存在的位置發(fā)送消息,RabbitMQ將丟棄該消息。讓我們創(chuàng)建一個將要傳遞消息的hello隊列:
channel.queue_declare(queue = 'hello')
此時我們已準(zhǔn)備好發(fā)送消息。我們的第一條消息只包含一個字符串Hello World!我們想將它發(fā)送到我們的 hello隊列。
在RabbitMQ中,消息永遠(yuǎn)不能直接發(fā)送到隊列,它總是需要通過exchange。但是,讓我們不要被細(xì)節(jié)拖累 - 您可以在本教程的第三部分中閱讀有關(guān)exchanges更多信息。我們現(xiàn)在需要知道的是如何使用由空字符串標(biāo)識的默認(rèn)交換。這種交換是特殊的 - 它允許我們準(zhǔn)確地指定消息應(yīng)該到達(dá)哪個隊列。需要在routing_key參數(shù)中指定隊列名稱:
channel.basic_publish(exchange = '',
routing_key = 'hello',
body = 'Hello World!')
print(“[x]發(fā)送'Hello World!'”)
在退出程序之前,我們需要確保刷新網(wǎng)絡(luò)緩沖區(qū)并將消息實際傳遞給RabbitMQ。我們可以通過輕輕關(guān)閉連接來實現(xiàn)。
connection.close()
發(fā)送不起作用!
如果這是您第一次使用RabbitMQ并且沒有看到“已發(fā)送”消息,那么您可能會感到頭疼,想知道可能出現(xiàn)的問題。也許代理是在沒有足夠的可用磁盤空間的情況下啟動的(默認(rèn)情況下它至少需要200 MB空閑),因此拒絕接受消息。檢查代理日志文件以確認(rèn)并在必要時減少限制。該配置文件文檔會告訴你如何設(shè)置disk_free_limit。
接收

我們的第二個程序receive.py將從隊列接收消息并在屏幕上打印它們。
同樣,首先我們需要連接到RabbitMQ服務(wù)器。負(fù)責(zé)連接Rabbit的代碼與之前相同。
與以前一樣,下一步是確保隊列存在。使用queue_declare創(chuàng)建隊列是冪等的 - 我們可以根據(jù)需要多次運行命令,并且只創(chuàng)建一個命令。
channel.queue_declare(queue = 'hello')
您可能會問為什么我們再次聲明隊列 - 我們已經(jīng)在之前的代碼中聲明了它。如果我們確定隊列已經(jīng)存在,我們可以避免這種情況。例如,如果之前運行了send.py程序。但我們還不確定首先運行哪個程序。在這種情況下,重復(fù)在兩個程序中重復(fù)聲明隊列是一個好習(xí)慣。
列出隊列
您可能希望看到RabbitMQ有哪些隊列以及它們中有多少消息。您可以使用rabbitmqctl工具(作為特權(quán)用戶)執(zhí)行此操作:sudo rabbitmqctl list_queues在Windows上,省略sudo:
rabbitmqctl.bat list_queues
從隊列接收消息更復(fù)雜。它通過將回調(diào)(callback)函數(shù)訂閱到隊列來工作。每當(dāng)我們收到消息時,Pika庫都會調(diào)用此回調(diào)(callback)函數(shù)。在我們的例子中,此功能將在屏幕上顯示消息的內(nèi)容。
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
接下來,我們需要告訴RabbitMQ這個特定的回調(diào)函數(shù)應(yīng)該從我們的hello隊列接收消息:
channel.basic_consume(callback,
queue = 'hello',
no_ack = True)
要使該命令成功,我們必須確保存在我們想要訂閱的隊列。幸運的是,我們對此有信心 - 我們在上面創(chuàng)建了一個隊列 - 使用queue_declare。
該NO_ACK參數(shù)將被描述以后。
最后,我們進(jìn)入一個永不停止的循環(huán),等待數(shù)據(jù)并在必要時運行回調(diào)。
print('[*]等待消息。退出按CTRL + C')
channel.start_consuming()
把它們放在一起
send.py的完整代碼:
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
receive.py的完整代碼:
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
channel.basic_consume(callback,
queue='hello',
no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
現(xiàn)在我們可以在終端中試用我們的程序。首先,讓我們開始一個消費者,它將持續(xù)運行等待交付:
python receive.py
#=> [*]正在等待消息。要退出按CTRL + C
#=> [x]收到'Hello World!'
現(xiàn)在開始運行生產(chǎn)者, 生產(chǎn)者程序在每次運行之后都會停止:
python send.py
#=> [x]發(fā)送'Hello World!'
牛逼!我們能夠通過RabbitMQ發(fā)送第一條消息。您可能已經(jīng)注意到,receive.py程序不會退出。它將保持準(zhǔn)備好接收更多消息,并可能被Ctrl-C中斷。
嘗試在新終端中再次運行send.py.
我們已經(jīng)學(xué)會了如何從命名隊列發(fā)送和接收消息。是時候轉(zhuǎn)到第2部分 并構(gòu)建一個簡單的工作隊列了。