RabbitMQ在Python中的使用:pika模塊

再一次把RabbitMQ的架構(gòu)圖拿到這里來(lái):


image.png

其主體分為三個(gè)大分部:

  1. RabbitMQ Server:它是一種傳輸服務(wù),用于維護(hù)數(shù)據(jù)從生產(chǎn)者到消費(fèi)者間的路線(xiàn),保證數(shù)據(jù)能按指定方式進(jìn)行傳輸。其內(nèi)包括了交換機(jī)和隊(duì)列,交換機(jī)分發(fā)生產(chǎn)者的數(shù)據(jù)到指定的隊(duì)列,隊(duì)列存放生產(chǎn)者發(fā)送的數(shù)據(jù)。
  2. Client A,B:生產(chǎn)者,產(chǎn)生數(shù)據(jù)
  3. Client 1,2,3:消費(fèi)者,數(shù)據(jù)的接收方。

一個(gè)簡(jiǎn)單的RabbitMQ編程實(shí)例

生產(chǎn)者代碼實(shí)現(xiàn)步驟:

  1. 獲得與RabbitMQ Server的連接對(duì)象
  2. 通過(guò)連接對(duì)象獲得Channel對(duì)象,Channel提供了與RabbitMQ交互的操作
  3. 連接用戶(hù)儲(chǔ)存消息數(shù)據(jù)的隊(duì)列,不存在則創(chuàng)建隊(duì)列
  4. 發(fā)送消息到指定的RabbitMQ隊(duì)列中
  5. 關(guān)閉RabbitMQ代理的連接
    只需按照上面步驟進(jìn)行編寫(xiě)代碼即可
import pika


class RabbitMQ(object):
    def __init__(self, host, port, username, password, vhost):
        self._host = host  # broker IP
        self._port = port  # broker port
        self._vhost = vhost  # vhost
        self._credentials = pika.PlainCredentials(username, password)
        self._connection = None

    def connect(self):
        # 連接RabbitMQ的參數(shù)對(duì)象
        parameter = pika.ConnectionParameters(self._host, self._port, self._vhost,
                                              self._credentials, heartbeat_interval=10)
        self._connection = pika.BlockingConnection(parameter)  # 建立連接

    def put(self, message_str, queue_name, route_key, exchange=''):
        if self._connection is None:
            return

        channel = self._connection.channel()      # 獲取channel
        channel.queue_declare(queue=queue_name)   # 申明使用的queue
        
        #  調(diào)用basic_publish方法向RabbitMQ發(fā)送數(shù)據(jù), 這個(gè)方法應(yīng)該只支持str類(lèi)型的數(shù)據(jù)
        channel.basic_publish(
            exchange=exchange,  # 指定exchange
            routing_key=route_key,  # 指定路由
            body=message_str      # 具體發(fā)送的數(shù)據(jù)
        )

    def getting_start(self, queue_name):
        if self._connection is None:
            return
        channel = self._connection.channel() 
        channel.queue_declare(queue=queue_name)
        
        # 調(diào)用basic_consume方法,可以傳入一個(gè)回調(diào)函數(shù)
        channel.basic_consume(self.callback,
                              queue=queue_name,
                              no_ack=True)
        channel.start_consuming()   # 相當(dāng)于run_forever(), 當(dāng)Queue中沒(méi)有數(shù)據(jù),則一直阻塞等待

    @staticmethod
    def callback(ch, method, properties, message_str):
        """定義一個(gè)回調(diào)函數(shù)"""
        print "[x] Received {0}".format(message_str)

    def close(self):
        """關(guān)閉RabbitMQ的連接"""
        if self._connection is not None:
            self._connection.close()
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容