Python 消息中間件RabbitMQ使用

介紹

rabbitmq是基于Erlang語(yǔ)言編寫的一種消息隊(duì)列中間件,具體的內(nèi)容網(wǎng)上有很多這里就不贅述了,本文主要介紹一下在python當(dāng)中基于第三方庫(kù)pika對(duì)rabbitmq的簡(jiǎn)單使用

安裝

服務(wù)端
ubuntu安裝參考

https://www.cnblogs.com/vipstone/p/9184314.html

centos參考

https://blog.csdn.net/zhuzhezhuzhe1/article/details/80464291

客戶端
pip install pika

場(chǎng)景

  • 任務(wù)隊(duì)列
  • 發(fā)布訂閱內(nèi)容
  • ...

生產(chǎn)者-消費(fèi)者模式

在消息隊(duì)列當(dāng)中,最簡(jiǎn)單的就是生產(chǎn)者消費(fèi)者模式,即生產(chǎn)者發(fā)布一條消息,一個(gè)或者多個(gè)消費(fèi)者在監(jiān)聽(tīng)等待,最終發(fā)布的消息被其中一個(gè)消費(fèi)者給取走執(zhí)行的模式,下面是簡(jiǎn)單的示例:

生產(chǎn)者
import pika
 
credentials = pika.PlainCredentials('test', 'test')
# rabbitmq隊(duì)列賬號(hào)密碼
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
# 連接配置:主要包括host、port(默認(rèn)15672)、credentials(登錄用戶名密碼)
channel = connection.channel()
# 創(chuàng)建一個(gè)連接通道
channel.queue_declare(queue='aaa')
# 聲明一個(gè)名為aaa的隊(duì)列
channel.basic_publish(exchange='',
                      routing_key='aaa',
                      body='this is a msg!')
# 往aaa隊(duì)列發(fā)送一條消息
connection.close()
#關(guān)閉連接
消費(fèi)者
import pika
 
credentials = pika.PlainCredentials('test', 'test')
# rabbitmq隊(duì)列賬號(hào)密碼
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
# 連接配置:主要包括host、port(默認(rèn)5672)、credentials(登錄用戶名密碼)
channel = connection.channel()
# 創(chuàng)建一個(gè)連接通道
channel.queue_declare(queue='aaa')
# 聲明一個(gè)名為aaa的隊(duì)列
def callback(ch, method, properties, body):
    print("Received msg: {}".format(body))
 
channel.basic_consume('aaa', callback, auto_ack=True)
# 從aaa隊(duì)列中取出一條消息,并執(zhí)行對(duì)應(yīng)回調(diào),第三個(gè)參數(shù)代表取到消息后自動(dòng)回復(fù)執(zhí)行完成,生產(chǎn)者會(huì)將該任務(wù)消息刪除
# 在舊版里傳參順序和參數(shù)名可能有所不同(舊版里是:callback, queue='aaa', no_ack=True)
print('start...')
channel.start_consuming()
# 開(kāi)啟循環(huán)監(jiān)聽(tīng)消息隊(duì)列

此時(shí)如果打開(kāi)多個(gè)消費(fèi)者,那么可以發(fā)現(xiàn)生產(chǎn)者的發(fā)送的消息隊(duì)列將會(huì)被消費(fèi)者按順序取走

生產(chǎn)者消費(fèi)者模式-常用配置

任務(wù)完成回復(fù)

auto_ack參數(shù)可以配置任務(wù)是否需要回復(fù),默認(rèn)是False,即任務(wù)被取走之后,只有消費(fèi)者在回調(diào)當(dāng)中執(zhí)行了ch.basic_ack(delivery_tag=method.delivery_tag)方法以后,代表任務(wù)執(zhí)行完成,此時(shí)生產(chǎn)者才會(huì)把任務(wù)從消息隊(duì)列當(dāng)中刪除,若消費(fèi)者沒(méi)能在關(guān)閉前執(zhí)行上面那句方法,那么別的消費(fèi)者將會(huì)在之后取走該任務(wù)去執(zhí)行,直到生產(chǎn)者接收到執(zhí)行完成的指令為止。如果auto_ack值為True,那么當(dāng)任務(wù)被取走之后,生產(chǎn)者將直接把隊(duì)列中的任務(wù)刪除。舉例:

# 消費(fèi)者
import pika
 
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue='aaa')

def callback(ch, method, properties, body):
    print("Received msg: {}".format(body))
    ch.basic_ack(delivery_tag=method.delivery_tag)
    # 回復(fù)任務(wù)完成
 
channel.basic_consume('aaa', callback, auto_ack=False)
# 設(shè)置auto_ack=False,此時(shí)消費(fèi)者必須提供任務(wù)完成的回復(fù)
print('start...')
channel.start_consuming()
消息隊(duì)列持久化

在隊(duì)列聲明時(shí),可以通過(guò)參數(shù)durable配置是否需要持久化,默認(rèn)為False,即不需要持久化,此時(shí)如果服務(wù)端掛了,那么消息隊(duì)列的內(nèi)容將會(huì)丟失。如果配置持久化,那么首先需要在聲明當(dāng)中設(shè)置durable=True,然后在發(fā)布時(shí)也配置分發(fā)模式為持久化分發(fā),舉例:

# 生產(chǎn)者
import pika
 
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue='bbb', durable=True)
# 聲明一個(gè)消息持久化的隊(duì)列,因?yàn)橹耙呀?jīng)創(chuàng)建了aaa隊(duì)列,并且不是持久化的隊(duì)列,所以這里新建一個(gè)不存在的隊(duì)列
channel.basic_publish(exchange='',
                      routing_key='bbb',
                      body="this is a durable msg!", 
                      properties=pika.BasicProperties(
                          delivery_mode=2,
                          # 配置消息持久化
                      ))
connection.close()
閑置消費(fèi)

默認(rèn)是按照客戶端的順序一個(gè)個(gè)循環(huán)派發(fā)任務(wù)的,但要是第一個(gè)客戶端沒(méi)執(zhí)行完,而下一個(gè)客戶端已經(jīng)執(zhí)行完了,此時(shí)如果還把任務(wù)派發(fā)給第一個(gè)就有些不好了,所以需要將任務(wù)派發(fā)給閑置的客戶端,類似于Nginx的負(fù)載均衡,實(shí)現(xiàn)只需要在消費(fèi)者當(dāng)中加入一句代碼:channel.basic_qos(prefetch_count=1),舉例:

# 消費(fèi)者
import pika
 
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue='aaa')

def callback(ch, method, properties, body):
    print("Received msg: {}".format(body))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
# 設(shè)置閑置消費(fèi)
channel.basic_consume('aaa', callback, auto_ack=False)
print('start...')
channel.start_consuming()

發(fā)布訂閱模式-fanout

前面介紹的都是生產(chǎn)者發(fā)布一條任務(wù)消息,然后一個(gè)或者多個(gè)消費(fèi)者中的其中一個(gè)取走這個(gè)任務(wù)去執(zhí)行的情況。而有一種場(chǎng)景,如廣播、微信公眾號(hào)的消息推送這種,往往需要將一條消息發(fā)布給所有的消費(fèi)者執(zhí)行,而在rabbitmq當(dāng)中就可以通過(guò)創(chuàng)建一個(gè)exchange交換器來(lái)創(chuàng)建和管理多個(gè)隊(duì)列,即一個(gè)exchange下有多個(gè)隊(duì)列,并且每個(gè)隊(duì)列對(duì)應(yīng)一個(gè)消費(fèi)者,當(dāng)有消息的時(shí)候,exchange會(huì)將消息發(fā)送給自己所管理的所有隊(duì)列,此時(shí)需要設(shè)置類型為fanout,舉例:

發(fā)布者
import pika
 
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='ex1', exchange_type='fanout')
# 聲明一個(gè)名為ex1的交換器,發(fā)布類型為集體分發(fā)
channel.basic_publish(exchange='ex1',
                      routing_key='',
                      body="everyone on ex1 will get this msg!"
                      )
# 往ex1中所有隊(duì)列分發(fā)消息
connection.close()
訂閱者
import pika
 
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='ex1', exchange_type='fanout')
# 聲明一個(gè)名為ex1的交換器,發(fā)布類型為集體分發(fā)
result = channel.queue_declare(queue='', exclusive=True)
# 這里不定義名字,通過(guò)exclusive=True生成一個(gè)名字不重復(fù)的隊(duì)列
queue_name = result.method.queue
# 獲取隊(duì)列名
channel.queue_bind(queue_name, exchange='ex1')
# 綁定隊(duì)列名和ex1交換器
def callback(ch, method, properties, body):
    print("Received msg: {}".format(body))

channel.basic_consume(queue_name, callback, True)
# 使用隨機(jī)生成的隊(duì)列名在ex1交換器下接收消息
print('queue:{} start...'.format(queue_name))
channel.start_consuming()

注:
發(fā)布訂閱模式和生產(chǎn)消費(fèi)者模式還有一點(diǎn)不同就是:生產(chǎn)消費(fèi)者模式當(dāng)中,生產(chǎn)者產(chǎn)生的消費(fèi)只要沒(méi)被取走,那么消息就會(huì)一直留著等待被消費(fèi)者取走;而在發(fā)布訂閱模式當(dāng)中,發(fā)布者只會(huì)發(fā)布一次,發(fā)布完該消息就會(huì)被刪除,因此如果訂閱者沒(méi)有在發(fā)布者發(fā)布時(shí)接收到消息,將永遠(yuǎn)錯(cuò)過(guò)接收的機(jī)會(huì)(就像關(guān)注公眾號(hào)以后,公眾號(hào)并不會(huì)把以前的所有歷史推送信息也給你重新再推送一遍一樣)

指定發(fā)布訂閱-direct

對(duì)于發(fā)布時(shí),可以不給所有隊(duì)列發(fā)送消息,而是指定給哪些隊(duì)列發(fā)送,舉例:

發(fā)布者
import pika
 
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='ex2', exchange_type='direct')
# 聲明一個(gè)名為ex2的交換器,發(fā)布類型為指定路由分發(fā)
channel.basic_publish(exchange='ex2',
                      routing_key='test1',
                      body="only test1 will get this msg!"
                      )
# 往ex2中test1的路由分發(fā)消息
channel.basic_publish(exchange='ex2',
                      routing_key='test2',
                      body="only test2 will get this msg!"
                      )
# 往ex2中test2的路由分發(fā)消息
connection.close()
訂閱者
import pika
 
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='ex2', exchange_type='direct')
# 聲明一個(gè)名為ex2的交換器,接收類型為指定路由
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(queue_name, exchange='ex2', routing_key='test1')
# 綁定ex2與隊(duì)列,以及綁定對(duì)應(yīng)的路由
channel.queue_bind(queue_name, exchange='ex2', routing_key='test2')
channel.queue_bind(queue_name, exchange='ex2', routing_key='test3')
# 可以一個(gè)隊(duì)列綁定多個(gè)路由
def callback(ch, method, properties, body):
    print("Received msg: {}".format(body))

channel.basic_consume(queue_name, callback, True)
print('queue:{} start...'.format(queue_name))
channel.start_consuming()

模糊匹配發(fā)布訂閱-topic

在發(fā)布訂閱時(shí)也可以通過(guò)模糊匹配路由,當(dāng)符合匹配規(guī)則的路由將會(huì)接收到消息,其中常用的通配符有*#,*后面能夠匹配一個(gè)單詞,#后面能夠匹配一個(gè)或多個(gè)單詞(例如有a.#a.*的匹配規(guī)則,那么a.x能被兩個(gè)規(guī)則都匹配到,但a.x.y只能被a.#匹配到),舉例:

發(fā)布者
import pika
 
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='ex3', exchange_type='topic')
# 聲明一個(gè)名為ex3的交換器,發(fā)布類型為指定模糊匹配路由分發(fā)
channel.basic_publish(exchange='ex3',
                      routing_key='test.a',
                      body="test.# or test.* will get this msg!"
                      )
channel.basic_publish(exchange='ex3',
                      routing_key='test.b.c',
                      body="only test.# will get this msg!"
                      )
connection.close()
訂閱者
import pika
 
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='ex3', exchange_type='topic')
# 聲明一個(gè)名為ex3的交換器,發(fā)布類型為模糊匹配路由接收
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(queue_name, exchange='ex3', routing_key='test.#')
# 這里因?yàn)閞outing_key='test.#',所以test.a和test.b.c的消息都能接收到

def callback(ch, method, properties, body):
    print("Received msg: {}".format(body))

channel.basic_consume(queue_name, callback, True)
print('queue:{} start...'.format(queue_name))
channel.start_consuming()

RPC

遠(yuǎn)程過(guò)程調(diào)用,一個(gè)簡(jiǎn)單的理解就是假如本地要調(diào)用一個(gè)函數(shù),而服務(wù)端已經(jīng)實(shí)現(xiàn)了該函數(shù),那么可以向服務(wù)端請(qǐng)求調(diào)用該函數(shù),并把返回值返回給本地,具體參考:https://www.cnblogs.com/goldsunshine/p/8665456.html

其他操作

pika模塊當(dāng)中也提供了如刪除/解綁隊(duì)列、刪除/解綁交換器等操作,舉例:

import pika
 
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.exchange_delete("ex1")
# 刪除交換器
channel.queue_delete("aaa")
# 刪除隊(duì)列
connection.close()

更多關(guān)于pika操作參考:https://www.cnblogs.com/cwp-bg/p/8426188.html

更多參考

RabbitMQ的六種工作模式
Python RabbitMQ原理和使用場(chǎng)景以及模式

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容