Python Web編程,Django框架 -- (9) 緩存 Redis RabbitMQ 使用

  1. Rabbitmq 內(nèi)部結(jié)構(gòu):
image.png
  • Message

消息,消息是不具名的,由消息頭和消息體組成,消息體是不透明的,而消息頭是由一些列的可選屬性組成,包括 routing-key(路由鍵)、priority(相對(duì)于其他消息的優(yōu)先權(quán))、delivery-mode(指出消息可能需要持久性存儲(chǔ))等

  • Publisher

消息的生產(chǎn)者,也是一個(gè)向交換器發(fā)布消息的客戶端應(yīng)用程序。

  • Exchange

交換器,用來接收生產(chǎn)者發(fā)送的消息并將這些消息路由給服務(wù)器中的隊(duì)列。

  • Binding

綁定,用于消息隊(duì)列和交換器之間的關(guān)聯(lián),一個(gè)綁定就是基于路由鍵將交換器和消息隊(duì)列連接起來的路由規(guī)則,所以可以將交換器理解成一個(gè)由綁定構(gòu)成的路由表。

  • Queue

消息隊(duì)列,用來保存消息直到發(fā)送給消費(fèi)者。這是消息的容器,也是消息的終點(diǎn),一個(gè)消息可以投入一個(gè)或多個(gè)隊(duì)列,消息一直在隊(duì)列里面,等待消費(fèi)者連接到這個(gè)隊(duì)列將其取走。

  • Connection

網(wǎng)絡(luò)連接, TCP 連接

  • Channel

信道,多路復(fù)用來南街中的一條獨(dú)立的雙向數(shù)據(jù)流通道。信道是建立在真實(shí)的TCP連接內(nèi)地虛擬連接,AMQP 命令都是通過信道發(fā)出去的,不管是發(fā)布消息、訂閱隊(duì)列還是接收消息,這些動(dòng)作都是通過信道完成。因?yàn)閷?duì)于操作系統(tǒng)來說建立和銷毀TCP都是非常昂貴的開銷,所以引入信道的概念,在一個(gè)TCP連接內(nèi)建立多個(gè)信道,以復(fù)用一條TCP連接。

  • Consumer

消息的消費(fèi)者,表示一個(gè)從消息隊(duì)列中取得消息的客戶端應(yīng)用程序。

  • Virtual Host

虛擬主機(jī),表示一批交換器、消息隊(duì)列和相關(guān)對(duì)象。虛擬主機(jī)是共享相同身份認(rèn)證和加密環(huán)境的獨(dú)立服務(wù)器域。每個(gè)vhost本質(zhì)上就是一個(gè)mini版的rabbitmq服務(wù)器,擁有自己的隊(duì)列、交換器、綁定和權(quán)限機(jī)制。 vhost 是AMQP概念的基礎(chǔ),必須在連接時(shí)指定,RabbitMQ默認(rèn)的vhost是 /。

  • Broker

代理人,飆戲消息隊(duì)列服務(wù)器實(shí)體。

  1. RabbitMQ的消息路由,常用架構(gòu):
image.png
  • 1 和 2 為 隊(duì)列模式,3, 4 為 發(fā)布-訂閱模式。
  1. Python操作 Rabbitmq
  • pip install pika

  • 隊(duì)列方式:

  • 生產(chǎn)者代碼:

# 生產(chǎn)者代碼
import pika
# 用戶名 和 密碼登錄, 建議不同的業(yè)務(wù) 創(chuàng)建不同的用戶名和密碼
credentials = pika.PlainCredentials('guest', 'guest')
# 虛擬隊(duì)列需要指定參數(shù)  virtual_port,如果是默認(rèn)的,可以不填。
parameters = pika.ConnectionParameters(host='192.168.246.129',
                                       port=5672,
                                       virtual_host='/',
                                       credentials=credentials)
#創(chuàng)建連接,阻塞方法
connection = pika.BlockingConnection(parameters)
# 建立信道
channel = connection.channel()
# 聲明消息隊(duì)列, 建議在生產(chǎn)者與消費(fèi)者兩邊同時(shí)聲明,如果不存在會(huì)自動(dòng)創(chuàng)建,如果已存在,則直接使用
# durable=True  表示隊(duì)列持久化
channel.queue_declare(queue='direct_demo', durable=False)
# exchange 指定交換機(jī)
# routing_key 指定隊(duì)列名
# body 要填入到隊(duì)列中的內(nèi)容
channel.basic_publish(exchange='', routing_key='direct_demo',
                      body='again')
# 關(guān)閉與 rabbitmq server 的連接
connection.close()
  • 消費(fèi)者代碼:
# 消費(fèi)者代碼
import pika
# 以下同生產(chǎn)者代碼
crendential = pika.PlainCredentials('guest', 'guest')
parameters = pika.ConnectionParameters(host='192.168.246.129',
                                       port=5672,
                                       virtual_host='/',
                                       credentials=crendential)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue='direct_demo', durable=False)
# 回調(diào)函數(shù),當(dāng)消費(fèi)者拿到隊(duì)列中的數(shù)據(jù)后進(jìn)行的操作, 四個(gè)參數(shù)為固定參數(shù)
def callback(ch, method, properties, body):
    # 手動(dòng)發(fā)送確認(rèn)消息,當(dāng)生產(chǎn)者需要消費(fèi)者進(jìn)行消息確認(rèn)時(shí)
    # ch.basic_ack(delivery=method.delivery_tag)
    # 實(shí)現(xiàn)如何處理消息
    print(body.decode())
# 確認(rèn)消費(fèi)者使用哪個(gè)隊(duì)列以及回調(diào)函數(shù)
channel.basic_consume('direct_demo', on_message_callback=callback)
# 開始接收消息,并進(jìn)入阻塞狀態(tài),無限循環(huán)。
channel.start_consuming()
  • 分發(fā)者模式:一個(gè)生產(chǎn)者,多個(gè)消費(fèi)者

生產(chǎn)者 代碼不變,只是改變了 隊(duì)列 和 消息 持久化參數(shù)。

消費(fèi)者會(huì)以輪詢的方式進(jìn)行消費(fèi),代碼也可以不變,不過為了能看到輪詢的效果,增加了 sleep 時(shí)間。另外 增加 prefetch_count 參數(shù),當(dāng)消費(fèi)者出現(xiàn)問題或者消費(fèi)比較慢時(shí),則不再向該消費(fèi)者發(fā)送消息。

# 生產(chǎn)者代碼
import pika
# 用戶名 和 密碼登錄, 建議不同的業(yè)務(wù) 創(chuàng)建不同的用戶名和密碼
credentials = pika.PlainCredentials('guest', 'guest')
# 虛擬隊(duì)列需要指定參數(shù)  virtual_port,如果是默認(rèn)的,可以不填。
parameters = pika.ConnectionParameters(host='192.168.246.129',
                                       port=5672,
                                       virtual_host='/',
                                       credentials=credentials)
#創(chuàng)建連接,阻塞方法
connection = pika.BlockingConnection(parameters)
# 建立信道
channel = connection.channel()
# 聲明消息隊(duì)列, 建議在生產(chǎn)者與消費(fèi)者兩邊同時(shí)聲明,如果不存在會(huì)自動(dòng)創(chuàng)建,如果已存在,則直接使用
# durable=True  表示隊(duì)列持久化
channel.queue_declare(queue='task_queue', durable=True)
# exchange 指定交換機(jī)
# routing_key 指定隊(duì)列名
# body 要填入到隊(duì)列中的內(nèi)容
for i in range(6):
    message = f'send message to taskqueue: {i}'
    channel.basic_publish(exchange='',
                          routing_key='task_queue',
                          body=message,
                          properties=pika.BasicProperties(
                              delivery_mode=2,   # 消息持久化
                          ))
# 關(guān)閉與 rabbitmq server 的連接
connection.close()
# 消費(fèi)者代碼
import pika
import time
import random
# 以下同生產(chǎn)者代碼
crendential = pika.PlainCredentials('guest', 'guest')
parameters = pika.ConnectionParameters(host='192.168.246.129',
                                       port=5672,
                                       virtual_host='/',
                                       credentials=crendential)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
# 回調(diào)函數(shù),當(dāng)消費(fèi)者拿到隊(duì)列中的數(shù)據(jù)后進(jìn)行的操作, 四個(gè)參數(shù)為固定參數(shù)
def callback(ch, method, properties, body):
    rand_int = random.randint(0, 6)
    print(rand_int)
    time.sleep(rand_int)
    # 實(shí)現(xiàn)如何處理消息
    print(body.decode())
    ch.basic_ack(delivery_tag=method.delivery_tag)
# 如果消費(fèi)者的channel上未確認(rèn)的消息數(shù)達(dá)到了prefectch_count 數(shù),則不想該消費(fèi)者發(fā)送消息
channel.basic_qos(prefetch_count=1)
# 確認(rèn)消費(fèi)者使用哪個(gè)隊(duì)列以及回調(diào)函數(shù)
channel.basic_consume('task_queue', on_message_callback=callback)
# 開始接收消息,并進(jìn)入阻塞狀態(tài),無限循環(huán)。
channel.start_consuming()
  • 并行消費(fèi)模式。 發(fā)布-訂閱模式,需要增加 exchange

生產(chǎn)者代碼:聲明交換機(jī),并且交換機(jī)類型為 fanout,不需要聲明 queue

消費(fèi)者代碼:聲明交換機(jī),并且交換機(jī)類型為 fanout,聲明queue,名稱為空,為產(chǎn)生默認(rèn)名稱,并且設(shè)置參數(shù) exclusive,表示當(dāng)斷開連接時(shí)則刪除queue。最后與exchange一起綁定到channel

注意: 消費(fèi)者如果想消費(fèi),必須在 生產(chǎn)者消費(fèi)產(chǎn)生內(nèi)容之前(exchange不保存消息)就綁定到交換機(jī),否則 消息不會(huì)發(fā)送到這個(gè)隊(duì)列。

# 生產(chǎn)者代碼
import pika
# 用戶名 和 密碼登錄, 建議不同的業(yè)務(wù) 創(chuàng)建不同的用戶名和密碼
credentials = pika.PlainCredentials('guest', 'guest')
# 虛擬隊(duì)列需要指定參數(shù)  virtual_port,如果是默認(rèn)的,可以不填。
parameters = pika.ConnectionParameters(host='192.168.246.129',
                                       port=5672,
                                       virtual_host='/',
                                       credentials=credentials)
#創(chuàng)建連接,阻塞方法
connection = pika.BlockingConnection(parameters)
# 建立信道
channel = connection.channel()
channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')
# exchange 指定交換機(jī)
# routing_key 指定隊(duì)列名
# body 要填入到隊(duì)列中的內(nèi)容
for i in range(6):
    message = f'send message to taskqueue: {i}'
    channel.basic_publish(exchange='logs',
                          routing_key='',
                          body=message,
                          )
# 關(guān)閉與 rabbitmq server 的連接
connection.close()
# 消費(fèi)者代碼
import pika
import time
import random
# 以下同生產(chǎn)者代碼
crendential = pika.PlainCredentials('guest', 'guest')
parameters = pika.ConnectionParameters(host='192.168.246.129',
                                       port=5672,
                                       virtual_host='/',
                                       credentials=crendential)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.exchange_declare(exchange='logs',
                         exchange_type='fanout'
                         )
# 聲明消息隊(duì)列,這里沒有指定 queue 的名稱,會(huì)自動(dòng)生成一個(gè)名稱。
# exclusive,當(dāng)與消費(fèi)者斷開連接的時(shí)候,隊(duì)列被立即刪除
result = channel.queue_declare(queue='',
                               exclusive=True)
queue_name = result.method.queue
# 通過 bind 實(shí)現(xiàn)exchange,將message 消息發(fā)送到指定的queue
channel.queue_bind(exchange='logs',
                   queue=queue_name)
# 回調(diào)函數(shù),當(dāng)消費(fèi)者拿到隊(duì)列中的數(shù)據(jù)后進(jìn)行的操作, 四個(gè)參數(shù)為固定參數(shù)
def callback(ch, method, properties, body):
    rand_int = random.randint(0, 6)
    print(rand_int)
    time.sleep(rand_int)
    # 實(shí)現(xiàn)如何處理消息
    print(body.decode())
# 如果消費(fèi)者的channel上未確認(rèn)的消息數(shù)達(dá)到了prefectch_count 數(shù),則不想該消費(fèi)者發(fā)送消息
channel.basic_qos(prefetch_count=1)
# 確認(rèn)消費(fèi)者使用哪個(gè)隊(duì)列以及回調(diào)函數(shù)
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)  # 消費(fèi)者自動(dòng)進(jìn)行確認(rèn),不需要手動(dòng)確認(rèn)。
# 開始接收消息,并進(jìn)入阻塞狀態(tài),無限循環(huán)。
channel.start_consuming()
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容