Rabbit MQ 學(xué)習(xí)筆記(一)

一.介紹

RabbitMQ是一個消息代理:它接受和轉(zhuǎn)發(fā)消息.類似于郵局:將消息發(fā)給郵局,郵局再把消息分發(fā)給目標(biāo).在這個比喻中 RabbitMQ是郵箱, 郵局和郵遞員小哥.

RabbitMQ和郵局這兩者之間的主要區(qū)別是它不會處理紙質(zhì)郵件,取而代之的是接收、存儲和發(fā)送二進(jìn)制數(shù)據(jù)塊,也就是我們通常所說的消息。

RabbitMQ和消息中,通常會使用一些專業(yè)術(shù)語。

生產(chǎn):生產(chǎn)意味著就是發(fā)送。 發(fā)送消息的程序是一個生產(chǎn)者。下圖代表一個消息的生產(chǎn)者:

生產(chǎn)者

隊列:這里的隊列是指一個名稱,但是名稱所代表的隊列實體寄存在RabbitMQ服務(wù)器端中。 雖然消息流過RabbitMQ和您的應(yīng)用程序,但它們只能存儲在隊列中。 隊列只受主機的內(nèi)存和磁盤的限制,它本質(zhì)上是一個大的消息緩沖區(qū)。 許多生產(chǎn)者可以發(fā)送消息到一個隊列,許多消費者可以嘗試從一個隊列接收數(shù)據(jù)。 下圖代表一個隊列:

queue

消費:消費具有與接收相似的含義。 消費者是一個主要等待接收消息的程序。下圖代表消息的消費者:

消費者

請注意,生產(chǎn)者,消費者和代理不必駐留在同一主機上; 實際上在大多數(shù)應(yīng)用中他們都沒有在同一臺主機上。

(使用python腳本鏈接)
我們將用Python編寫兩個程序; 發(fā)送單個消息的生產(chǎn)者,以及接收消息并將其打印出來的消費者。我們將掩蓋Python API中的一些細(xì)節(jié),專注于這個非常簡單的事情,只是為了開始。它是消息傳遞的“Hello World”。

在下圖中,“P”是我們的生產(chǎn)者,“C”是我們的消費者。中間的框是一個隊列 - RabbitMQ代表消費者保留的消息緩沖區(qū)。


(生產(chǎn)者) - > [隊列] - >(消費者)

Python客戶端庫

RabbitMQ說多種協(xié)議。本教程使用AMQP 0-9-1,它是一種開放的,通用的消息傳遞協(xié)議。RabbitMQ有許多不同語言的客戶端。我們將使用RabbitMQ提供的Python客戶端。

現(xiàn)在我們有了Python 客戶端的及其依賴項,我們可以編寫一些代碼.

發(fā)出消息

(生產(chǎn)者) - > [隊列]

我們的第一個程序send.py將向隊列發(fā)送一條消息。我們需要做的第一件事是建立與RabbitMQ服務(wù)器的連接。

#!/usr/bin/env python 
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

我們現(xiàn)在已連接到本地計算機上的代理 - 所以是localhost。如果我們想要連接到不同機器上的代理,我們只需在此處指定其名稱或IP地址。

接下來,在發(fā)送之前,我們需要確保收件人隊列存在。如果我們向不存在的位置發(fā)送消息,RabbitMQ將丟棄該消息。讓我們創(chuàng)建一個將要傳遞消息的hello隊列:

channel.queue_declare(queue = 'hello')

此時我們已準(zhǔn)備好發(fā)送消息。我們的第一條消息只包含一個字符串Hello World!我們想將它發(fā)送到我們的 hello隊列。

在RabbitMQ中,消息永遠(yuǎn)不能直接發(fā)送到隊列,它總是需要通過exchange。但是,讓我們不要被細(xì)節(jié)拖累 - 您可以在本教程的第三部分中閱讀有關(guān)exchanges更多信息。我們現(xiàn)在需要知道的是如何使用由空字符串標(biāo)識的默認(rèn)交換。這種交換是特殊的 - 它允許我們準(zhǔn)確地指定消息應(yīng)該到達(dá)哪個隊列。需要在routing_key參數(shù)中指定隊列名稱:

channel.basic_publish(exchange = '',
                      routing_key = 'hello',
                      body = 'Hello World!')
print(“[x]發(fā)送'Hello World!'”)

在退出程序之前,我們需要確保刷新網(wǎng)絡(luò)緩沖區(qū)并將消息實際傳遞給RabbitMQ。我們可以通過輕輕關(guān)閉連接來實現(xiàn)。

connection.close()
發(fā)送不起作用!

如果這是您第一次使用RabbitMQ并且沒有看到“已發(fā)送”消息,那么您可能會感到頭疼,想知道可能出現(xiàn)的問題。也許代理是在沒有足夠的可用磁盤空間的情況下啟動的(默認(rèn)情況下它至少需要200 MB空閑),因此拒絕接受消息。檢查代理日志文件以確認(rèn)并在必要時減少限制。該配置文件文檔會告訴你如何設(shè)置disk_free_limit。

接收

我們的第二個程序receive.py將從隊列接收消息并在屏幕上打印它們。

同樣,首先我們需要連接到RabbitMQ服務(wù)器。負(fù)責(zé)連接Rabbit的代碼與之前相同。

與以前一樣,下一步是確保隊列存在。使用queue_declare創(chuàng)建隊列是冪等的 - 我們可以根據(jù)需要多次運行命令,并且只創(chuàng)建一個命令。

channel.queue_declare(queue = 'hello')

您可能會問為什么我們再次聲明隊列 - 我們已經(jīng)在之前的代碼中聲明了它。如果我們確定隊列已經(jīng)存在,我們可以避免這種情況。例如,如果之前運行了send.py程序。但我們還不確定首先運行哪個程序。在這種情況下,重復(fù)在兩個程序中重復(fù)聲明隊列是一個好習(xí)慣。

列出隊列
您可能希望看到RabbitMQ有哪些隊列以及它們中有多少消息。您可以使用rabbitmqctl工具(作為特權(quán)用戶)執(zhí)行此操作:

sudo rabbitmqctl list_queues

在Windows上,省略sudo:

rabbitmqctl.bat list_queues

從隊列接收消息更復(fù)雜。它通過將回調(diào)(callback)函數(shù)訂閱到隊列來工作。每當(dāng)我們收到消息時,Pika庫都會調(diào)用此回調(diào)(callback)函數(shù)。在我們的例子中,此功能將在屏幕上顯示消息的內(nèi)容。

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

接下來,我們需要告訴RabbitMQ這個特定的回調(diào)函數(shù)應(yīng)該從我們的hello隊列接收消息:

channel.basic_consume(callback,
                      queue = 'hello',
                      no_ack = True)

要使該命令成功,我們必須確保存在我們想要訂閱的隊列。幸運的是,我們對此有信心 - 我們在上面創(chuàng)建了一個隊列 - 使用queue_declare。

該NO_ACK參數(shù)將被描述以后。

最后,我們進(jìn)入一個永不停止的循環(huán),等待數(shù)據(jù)并在必要時運行回調(diào)。

print('[*]等待消息。退出按CTRL + C')
channel.start_consuming()

把它們放在一起

send.py的完整代碼:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()


channel.queue_declare(queue='hello')

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

receive.py的完整代碼:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()


channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

現(xiàn)在我們可以在終端中試用我們的程序。首先,讓我們開始一個消費者,它將持續(xù)運行等待交付:

python receive.py
 #=> [*]正在等待消息。要退出按CTRL + C 
#=> [x]收到'Hello World!'

現(xiàn)在開始運行生產(chǎn)者, 生產(chǎn)者程序在每次運行之后都會停止:

python send.py
 #=> [x]發(fā)送'Hello World!'

牛逼!我們能夠通過RabbitMQ發(fā)送第一條消息。您可能已經(jīng)注意到,receive.py程序不會退出。它將保持準(zhǔn)備好接收更多消息,并可能被Ctrl-C中斷。
嘗試在新終端中再次運行send.py.

我們已經(jīng)學(xué)會了如何從命名隊列發(fā)送和接收消息。是時候轉(zhuǎn)到第2部分 并構(gòu)建一個簡單的工作隊列了

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容