RabbitMQ pika簡單使用

MQ 全稱為 Message Queue, 是一種應(yīng)用程序?qū)?yīng)用程序的通信方法。MQ 是消費(fèi)-生產(chǎn)者模型的一個典型的代表,一端往消息隊(duì)列中不斷寫入消息,而另一端則可以讀取或者訂閱隊(duì)列中的消息。MQ 遵循了 AMQP 協(xié)議的具體實(shí)現(xiàn)和產(chǎn)品。

整篇代碼擼下來預(yù)計耗時1.5h。

參考博主:anzhsoft
官網(wǎng)


閑聊
  • 什么是 RabbitMQ ?
  • 為什么要用 RabbitMQ ?
  • RabbitMQ 怎么用?

一、What RabbitMQ?

MQ 是消費(fèi)-生產(chǎn)者模型的一個典型的代表,一端往消息隊(duì)列中不斷寫入消息,而另一端則可以讀取或者訂閱隊(duì)列中的消息。MQ 和 JMS 類似,但不同的是 JMS 是 SUN JAVA 消息中間件服務(wù)的一個標(biāo)準(zhǔn)和API定義,而 MQ 則是遵循了 AMQP協(xié)議的具體實(shí)現(xiàn)和產(chǎn)品。

這個系統(tǒng)架構(gòu)圖版權(quán)屬于sunjun041640
  • 1、RabbitMQ Server
    • RabbitMQ isn’t a food truck, it’s a delivery service. 他的角色就是維護(hù)一條從 Producer 到 Consumer 的路線。
  • 2、Exchange
    • where producers publish their messages(發(fā)送方發(fā)布消息的地方)
  • 3、Queue
    • where the messages end up and are received by consumers(接收方獲取消息的地方)
  • 4、Producer(Client A, Client B)
    • 數(shù)據(jù)的發(fā)送方
  • 5、Consumer(Client 1, Client 2, Client 3)
    • 數(shù)據(jù)的接收方
  • 6、Connection
    • 就是一個 TCP 的連接。Producer 和 Consumer 都是通過 TCP 連接到 RabbitMQ Server的。以后我們可以看到,程序的起始處就是建立這個 TCP 連接。
  • 7、Channel
    • 虛擬連接。它建立在上述的 TCP 連接中。數(shù)據(jù)流動都是在 Channel 中進(jìn)行的。也就是說,一般情況是程序起始建立TCP 連接,第二步就是建立這個 Channel
  • 8、Bindings
    • Bindings are how the messages get routed from the exchange to particular queues.
二、Why RabbitMQ

對于一個大型的軟件系統(tǒng)來說,它會有很多的組件或者說模塊或者說子系統(tǒng)或者(subsystem or Component or submodule)。那么這些模塊如何通信呢?可以使用 socket,或者 url 請求,但是還有很多問題需要解決,如:

  • 1)信息的發(fā)送者和接收者如何維持這個連接,如果一方的連接中斷,這期間的數(shù)據(jù)如何方式丟失?
  • 2)如何降低發(fā)送者和接收者的耦合度?
  • 3)如何讓 Priority 高的接收者先接到數(shù)據(jù)?
  • 4)如何做到 load balance?有效均衡接收者的負(fù)載?
  • 5)如何有效的將數(shù)據(jù)發(fā)送到相關(guān)的接收者?也就是說將接收者 subscribe 不同的數(shù)據(jù),如何做有效的 filter。
  • 6)如何做到可擴(kuò)展,甚至將這個通信模塊發(fā)到 cluster 上?
  • 7)如何保證接收者接收到了完整,正確的數(shù)據(jù)
    AMDQ 協(xié)議解決了以上的問題,而 RabbitMQ 實(shí)現(xiàn)了 AMQP
三、How RabbitMQ

一套 MQ 完整流程如下:

首先將 RabbitMQ 服務(wù)啟動

Producer

  • 1、創(chuàng)建一個 connection
  • 2、在 connection 基礎(chǔ)上創(chuàng)建一個頻道 channel
  • 3、在頻道 channel 上聲明一個 exchange,參數(shù)為 exchange 的類型和名稱
  • 4、在頻道 channel 上發(fā)布消息,參數(shù)為 exchange 的名稱以及路由 (routing_key) 以及消息體
  • 5、關(guān)閉 connection

Consumer

  • 1、創(chuàng)建一個 connection
  • 2、在 connection 基礎(chǔ)上創(chuàng)建一個 channel
  • 3、在頻道 channel 上聲明一個 exchange,參數(shù)為 exchange 的類型和名稱
  • 4、在頻道 channel 上聲明一個 queue
  • 5、將 queue 綁定到聲明的 exchange 上
  • 6、channel 開始監(jiān)聽對應(yīng) queue 上的消息,同時設(shè)置獲取消息的回調(diào)
四、example
  • 1、直接使用 queue ,難度等級 ★☆☆☆☆

producer.py

import pika  

if __name__ == '__main__':
    # 創(chuàng)建一個connection
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))  
    channel = connection.channel()
    
    # 聲明一個queue
    channel.queue_declare(queue='hello')  
    
    # exchange為空的時候,routing_key就是指定的queue值
    channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')  
    print(" [x] Sent 'Hello World!'")
    # 關(guān)閉連接
    connection.close()

consumer.py

import pika  

def callback(ch, method, properties, body):  
    print(" [x] Received %r" % (body,))

if __name__ == '__main__':
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))  
    # 創(chuàng)建頻道
    channel = connection.channel()  
    # 聲明queue
    channel.queue_declare(queue='hello')  
    
    print(' [*] Waiting for messages. To exit press CTRL+C')

    # 收到指定消息的回調(diào)設(shè)置 
    channel.basic_consume(callback, queue='hello', no_ack=True)  
    # 開始循環(huán)監(jiān)聽 
    channel.start_consuming()
  • 2、消息確認(rèn),難度等級 ★☆☆☆☆

producer.py

import pika
import sys

if __name__ == '__main__':

    message = ' '.join(sys.argv[1:]) or "Hello World!"
    # 創(chuàng)建一個 connection
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    # 創(chuàng)建一個 channel
    channel = connection.channel()
    # 聲明一個queue
    channel.queue_declare(queue='hello')

    # 發(fā)布消息,exchange 為空的情況下,routing_key 值就是指定的 queue 名字,即將消息直接發(fā)送到指定的 queue
    channel.basic_publish(exchange='', routing_key='hello', body=str(message))
    print(" [x] Sent {}".format(message))
    connection.close()

consumer.py

import pika  
import time

def callback(ch, method, properties, body):  
    print(" [x] Received %r" % body)
    time.sleep(str(body).count('.'))
    ch.basic_ack(delivery_tag = method.delivery_tag)
        
if __name__ == '__main__':
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))  
    channel = connection.channel()  
      
    channel.queue_declare(queue='hello')  
      
    print(' [*] Waiting for messages. To exit press CTRL+C')
      
    channel.basic_consume(callback, queue='hello',no_ack=False)  
    
    channel.start_consuming()
  • 3、使用 fanout 類型 exchange,難度等級 ★☆☆☆☆

producer.py

import pika
import sys

if __name__ == '__main__':

    message = ' '.join(sys.argv[1:]) or "Hello World!"

    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  
    channel = connection.channel()
    channel.exchange_declare(exchange='logs', exchange_type='fanout')

    channel.basic_publish(exchange='logs', routing_key='', body=str(message))
    print(" [x] Sent {}".format(message))
    connection.close()

consumer.py

import pika  
import time

def callback(ch, method, properties, body):  
    print(" [x] Received %r" % body)
    # 確認(rèn)消息
    ch.basic_ack(delivery_tag=method.delivery_tag)
    time.sleep(str(body).count('.'))
      
if __name__ == '__main__':
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))  
    channel = connection.channel()  
      
    # 聲明一個 exchange
    channel.exchange_declare(exchange='logs', exchange_type='fanout')
    # 聲明一個隨機(jī)名字的 queue
    result = channel.queue_declare()  
      
    print(' [*] Waiting for messages. To exit press CTRL+C')
    # 獲取 queue 的 name 
    queue_name = result.method.queue
    # 將 queue 綁定到 exchange
    channel.queue_bind(exchange='logs', queue=queue_name)
    # 設(shè)置監(jiān)聽的 queue
    channel.basic_consume(callback, queue=queue_name)  
    
    channel.start_consuming()
  • 4、使用 direct 類型exchange,難度等級 ★☆☆☆☆

producer.py

import pika
import sys

if __name__ == '__main__':

    message = ' '.join(sys.argv[1:]) or "Hello World!"

    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  
    channel = connection.channel()
    # 聲明一個 direct 類型的 exchange
    channel.exchange_declare(exchange='logs_direct', exchange_type='direct')
    # 獲取 routing_key 值
    severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
    channel.basic_publish(exchange='logs_direct', routing_key=severity, body=str(message))
    print(" [x] Sent {}".format(message))
    connection.close()

consumer.py

import pika  
import sys
import time

def callback(ch, method, properties, body):  
    print(" [x] Received %r" % body)
    # 確認(rèn)消息
    ch.basic_ack(delivery_tag = method.delivery_tag)
    time.sleep(str(body).count('.'))
      
if __name__ == '__main__':
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))  
    channel = connection.channel()  
      
    # 聲明一個 exchange
    channel.exchange_declare(exchange='logs_direct', exchange_type='direct')
    # 聲明一個隨機(jī)名字的 queue
    result = channel.queue_declare()  

    # 設(shè)置監(jiān)聽的 routing_key
    severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
      
    print(' [*] Waiting for messages. To exit press CTRL+C')
    # 獲取 queue 的 name 
    queue_name = result.method.queue
    # 將 queue 綁定到 exchange
    channel.queue_bind(exchange='logs_direct', queue=queue_name, routing_key=severity)
    # 設(shè)置監(jiān)聽的 queue
    channel.basic_consume(callback, queue=queue_name)  
    
    channel.start_consuming()
  • 5、使用 topic 類型exchange難度等級 ★☆☆☆☆

producer.py

import pika
import sys

if __name__ == '__main__':

    message = ' '.join(sys.argv[1:]) or "Hello World!"

    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  
    channel = connection.channel()
    # 聲明一個 topic 類型的 exchange
    channel.exchange_declare(exchange='logs_topic', exchange_type='topic')
    # 獲取 routing_key 值
    severity = sys.argv[1] if len(sys.argv) > 1 else '111.111.111'
    channel.basic_publish(exchange='logs_topic', routing_key=severity, body=str(message))
    print(" [x] Sent {}".format(message))
    connection.close()

consumer.py

import pika  
import sys
import time

def callback(ch, method, properties, body):  
    print(" [x] Received %r" % body)
    # 確認(rèn)消息
    ch.basic_ack(delivery_tag=method.delivery_tag)
    time.sleep(str(body).count('.'))
  
if __name__ == '__main__':
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))  
    channel = connection.channel()

    # 聲明一個 exchange
    channel.exchange_declare(exchange='logs_topic', exchange_type='topic')
    # 聲明一個隨機(jī)名字的 queue
    result = channel.queue_declare()  

    # 設(shè)置監(jiān)聽的 routing_key
    severity = sys.argv[1] if len(sys.argv) > 1 else '111.111.111'
      
    print(' [*] Waiting for messages. To exit press CTRL+C')
    # 獲取 queue 的 name 
    queue_name = result.method.queue
    # 將 queue 綁定到 exchange
    channel.queue_bind(exchange='logs_topic', queue=queue_name, routing_key=severity)
    # 設(shè)置監(jiān)聽的 queue
    channel.basic_consume(callback, queue=queue_name)  
    
    channel.start_consuming()
五、實(shí)現(xiàn)細(xì)節(jié)部分
  • 1、Exchange
    • 空:直接將消息綁定到指定 queue 處理
    • fanout:廣播,所有綁定到該 exchange 的 queue 都會收到消息
    • direct:定向,所有綁定到該 exchange 并且其 routing_key 也相同的 queue 能收到消息
    • topic:主題,所有綁定到該 exchange 并且 routing_key 符合其匹配的 queue 能收到消息。匹配規(guī)則如下:* (星號) 代表任意 一個單詞,# (hash) 0個或者多個單詞。即 111.111.111 是和 *.111.111、#.111 匹配的。
六、RabbitMQ 常用指令
  • 1、服務(wù)器的啟動與關(guān)閉·
啟動: rabbitmq-server –detached

關(guān)閉: rabbitmqctl stop

若單機(jī)有多個實(shí)例,則在 rabbitmqctlh 后加 –n 指定名稱
  • 2、獲取服務(wù)器狀態(tài)
服務(wù)器狀態(tài):rabbitmqctl status
  • 3、常用的命令
查看所有的消息隊(duì)列
rabbitmqctl list_queues

清除所有隊(duì)列
abbitmqctl reset

啟動應(yīng)用
rabbitmqctl start_app

關(guān)閉應(yīng)用
rabbitmqctl stop_app

查看所有的 Exchanges
rabbitmqctl list_exchanges 

查看所有的 bindings
rabbitmqctl list_bindings
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 來源 RabbitMQ是用Erlang實(shí)現(xiàn)的一個高并發(fā)高可靠AMQP消息隊(duì)列服務(wù)器。支持消息的持久化、事務(wù)、擁塞控...
    jiangmo閱讀 10,513評論 2 34
  • 1. 歷史 RabbitMQ是一個由erlang開發(fā)的AMQP(Advanced Message Queue )的...
    高廣超閱讀 6,236評論 3 51
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,568評論 19 139
  • rabbitMQ是一款基于AMQP協(xié)議的消息中間件,它能夠在應(yīng)用之間提供可靠的消息傳輸。在易用性,擴(kuò)展性,高可用性...
    點(diǎn)融黑幫閱讀 3,131評論 3 41
  • 什么叫消息隊(duì)列 消息(Message)是指在應(yīng)用間傳送的數(shù)據(jù)。消息可以非常簡單,比如只包含文本字符串,也可以更復(fù)雜...
    lijun_m閱讀 1,415評論 0 1

友情鏈接更多精彩內(nèi)容