Rabbit MQ 學(xué)習(xí)筆記(一)

一.介紹

RabbitMQ是一個(gè)消息代理:它接受和轉(zhuǎn)發(fā)消息.類(lèi)似于郵局:將消息發(fā)給郵局,郵局再把消息分發(fā)給目標(biāo).在這個(gè)比喻中 RabbitMQ是郵箱, 郵局和郵遞員小哥.

RabbitMQ和郵局這兩者之間的主要區(qū)別是它不會(huì)處理紙質(zhì)郵件,取而代之的是接收、存儲(chǔ)和發(fā)送二進(jìn)制數(shù)據(jù)塊,也就是我們通常所說(shuō)的消息。

RabbitMQ和消息中,通常會(huì)使用一些專(zhuān)業(yè)術(shù)語(yǔ)。

生產(chǎn):生產(chǎn)意味著就是發(fā)送。 發(fā)送消息的程序是一個(gè)生產(chǎn)者。下圖代表一個(gè)消息的生產(chǎn)者:

生產(chǎn)者

隊(duì)列:這里的隊(duì)列是指一個(gè)名稱(chēng),但是名稱(chēng)所代表的隊(duì)列實(shí)體寄存在RabbitMQ服務(wù)器端中。 雖然消息流過(guò)RabbitMQ和您的應(yīng)用程序,但它們只能存儲(chǔ)在隊(duì)列中。 隊(duì)列只受主機(jī)的內(nèi)存和磁盤(pán)的限制,它本質(zhì)上是一個(gè)大的消息緩沖區(qū)。 許多生產(chǎn)者可以發(fā)送消息到一個(gè)隊(duì)列,許多消費(fèi)者可以嘗試從一個(gè)隊(duì)列接收數(shù)據(jù)。 下圖代表一個(gè)隊(duì)列:

queue

消費(fèi):消費(fèi)具有與接收相似的含義。 消費(fèi)者是一個(gè)主要等待接收消息的程序。下圖代表消息的消費(fèi)者:

消費(fèi)者

請(qǐng)注意,生產(chǎn)者,消費(fèi)者和代理不必駐留在同一主機(jī)上; 實(shí)際上在大多數(shù)應(yīng)用中他們都沒(méi)有在同一臺(tái)主機(jī)上。

(使用python腳本鏈接)
我們將用Python編寫(xiě)兩個(gè)程序; 發(fā)送單個(gè)消息的生產(chǎn)者,以及接收消息并將其打印出來(lái)的消費(fèi)者。我們將掩蓋Python API中的一些細(xì)節(jié),專(zhuān)注于這個(gè)非常簡(jiǎn)單的事情,只是為了開(kāi)始。它是消息傳遞的“Hello World”。

在下圖中,“P”是我們的生產(chǎn)者,“C”是我們的消費(fèi)者。中間的框是一個(gè)隊(duì)列 - RabbitMQ代表消費(fèi)者保留的消息緩沖區(qū)。


(生產(chǎn)者) - > [隊(duì)列] - >(消費(fèi)者)

Python客戶(hù)端庫(kù)

RabbitMQ說(shuō)多種協(xié)議。本教程使用AMQP 0-9-1,它是一種開(kāi)放的,通用的消息傳遞協(xié)議。RabbitMQ有許多不同語(yǔ)言的客戶(hù)端。我們將使用RabbitMQ提供的Python客戶(hù)端。

現(xiàn)在我們有了Python 客戶(hù)端的及其依賴(lài)項(xiàng),我們可以編寫(xiě)一些代碼.

發(fā)出消息

(生產(chǎn)者) - > [隊(duì)列]

我們的第一個(gè)程序send.py將向隊(duì)列發(fā)送一條消息。我們需要做的第一件事是建立與RabbitMQ服務(wù)器的連接。

#!/usr/bin/env python 
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

我們現(xiàn)在已連接到本地計(jì)算機(jī)上的代理 - 所以是localhost。如果我們想要連接到不同機(jī)器上的代理,我們只需在此處指定其名稱(chēng)或IP地址。

接下來(lái),在發(fā)送之前,我們需要確保收件人隊(duì)列存在。如果我們向不存在的位置發(fā)送消息,RabbitMQ將丟棄該消息。讓我們創(chuàng)建一個(gè)將要傳遞消息的hello隊(duì)列:

channel.queue_declare(queue = 'hello')

此時(shí)我們已準(zhǔn)備好發(fā)送消息。我們的第一條消息只包含一個(gè)字符串Hello World!我們想將它發(fā)送到我們的 hello隊(duì)列。

在RabbitMQ中,消息永遠(yuǎn)不能直接發(fā)送到隊(duì)列,它總是需要通過(guò)exchange。但是,讓我們不要被細(xì)節(jié)拖累 - 您可以在本教程的第三部分中閱讀有關(guān)exchanges更多信息。我們現(xiàn)在需要知道的是如何使用由空字符串標(biāo)識(shí)的默認(rèn)交換。這種交換是特殊的 - 它允許我們準(zhǔn)確地指定消息應(yīng)該到達(dá)哪個(gè)隊(duì)列。需要在routing_key參數(shù)中指定隊(duì)列名稱(chēng):

channel.basic_publish(exchange = '',
                      routing_key = 'hello',
                      body = 'Hello World!')
print(“[x]發(fā)送'Hello World!'”)

在退出程序之前,我們需要確保刷新網(wǎng)絡(luò)緩沖區(qū)并將消息實(shí)際傳遞給RabbitMQ。我們可以通過(guò)輕輕關(guān)閉連接來(lái)實(shí)現(xiàn)。

connection.close()
發(fā)送不起作用!

如果這是您第一次使用RabbitMQ并且沒(méi)有看到“已發(fā)送”消息,那么您可能會(huì)感到頭疼,想知道可能出現(xiàn)的問(wèn)題。也許代理是在沒(méi)有足夠的可用磁盤(pán)空間的情況下啟動(dòng)的(默認(rèn)情況下它至少需要200 MB空閑),因此拒絕接受消息。檢查代理日志文件以確認(rèn)并在必要時(shí)減少限制。該配置文件文檔會(huì)告訴你如何設(shè)置disk_free_limit。

接收

我們的第二個(gè)程序receive.py將從隊(duì)列接收消息并在屏幕上打印它們。

同樣,首先我們需要連接到RabbitMQ服務(wù)器。負(fù)責(zé)連接Rabbit的代碼與之前相同。

與以前一樣,下一步是確保隊(duì)列存在。使用queue_declare創(chuàng)建隊(duì)列是冪等的 - 我們可以根據(jù)需要多次運(yùn)行命令,并且只創(chuàng)建一個(gè)命令。

channel.queue_declare(queue = 'hello')

您可能會(huì)問(wèn)為什么我們?cè)俅温暶麝?duì)列 - 我們已經(jīng)在之前的代碼中聲明了它。如果我們確定隊(duì)列已經(jīng)存在,我們可以避免這種情況。例如,如果之前運(yùn)行了send.py程序。但我們還不確定首先運(yùn)行哪個(gè)程序。在這種情況下,重復(fù)在兩個(gè)程序中重復(fù)聲明隊(duì)列是一個(gè)好習(xí)慣。

列出隊(duì)列
您可能希望看到RabbitMQ有哪些隊(duì)列以及它們中有多少消息。您可以使用rabbitmqctl工具(作為特權(quán)用戶(hù))執(zhí)行此操作:

sudo rabbitmqctl list_queues

在Windows上,省略sudo:

rabbitmqctl.bat list_queues

從隊(duì)列接收消息更復(fù)雜。它通過(guò)將回調(diào)(callback)函數(shù)訂閱到隊(duì)列來(lái)工作。每當(dāng)我們收到消息時(shí),Pika庫(kù)都會(huì)調(diào)用此回調(diào)(callback)函數(shù)。在我們的例子中,此功能將在屏幕上顯示消息的內(nèi)容。

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

接下來(lái),我們需要告訴RabbitMQ這個(gè)特定的回調(diào)函數(shù)應(yīng)該從我們的hello隊(duì)列接收消息:

channel.basic_consume(callback,
                      queue = 'hello',
                      no_ack = True)

要使該命令成功,我們必須確保存在我們想要訂閱的隊(duì)列。幸運(yùn)的是,我們對(duì)此有信心 - 我們?cè)谏厦鎰?chuàng)建了一個(gè)隊(duì)列 - 使用queue_declare。

該NO_ACK參數(shù)將被描述以后。

最后,我們進(jìn)入一個(gè)永不停止的循環(huán),等待數(shù)據(jù)并在必要時(shí)運(yùn)行回調(diào)。

print('[*]等待消息。退出按CTRL + C')
channel.start_consuming()

把它們放在一起

send.py的完整代碼:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()


channel.queue_declare(queue='hello')

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

receive.py的完整代碼:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()


channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

現(xiàn)在我們可以在終端中試用我們的程序。首先,讓我們開(kāi)始一個(gè)消費(fèi)者,它將持續(xù)運(yùn)行等待交付:

python receive.py
 #=> [*]正在等待消息。要退出按CTRL + C 
#=> [x]收到'Hello World!'

現(xiàn)在開(kāi)始運(yùn)行生產(chǎn)者, 生產(chǎn)者程序在每次運(yùn)行之后都會(huì)停止:

python send.py
 #=> [x]發(fā)送'Hello World!'

牛逼!我們能夠通過(guò)RabbitMQ發(fā)送第一條消息。您可能已經(jīng)注意到,receive.py程序不會(huì)退出。它將保持準(zhǔn)備好接收更多消息,并可能被Ctrl-C中斷。
嘗試在新終端中再次運(yùn)行send.py.

我們已經(jīng)學(xué)會(huì)了如何從命名隊(duì)列發(fā)送和接收消息。是時(shí)候轉(zhuǎn)到第2部分 并構(gòu)建一個(gè)簡(jiǎn)單的工作隊(duì)列了。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容