再一次把RabbitMQ的架構(gòu)圖拿到這里來(lái):

image.png
其主體分為三個(gè)大分部:
- RabbitMQ Server:它是一種傳輸服務(wù),用于維護(hù)數(shù)據(jù)從生產(chǎn)者到消費(fèi)者間的路線(xiàn),保證數(shù)據(jù)能按指定方式進(jìn)行傳輸。其內(nèi)包括了交換機(jī)和隊(duì)列,交換機(jī)分發(fā)生產(chǎn)者的數(shù)據(jù)到指定的隊(duì)列,隊(duì)列存放生產(chǎn)者發(fā)送的數(shù)據(jù)。
- Client A,B:生產(chǎn)者,產(chǎn)生數(shù)據(jù)
- Client 1,2,3:消費(fèi)者,數(shù)據(jù)的接收方。
一個(gè)簡(jiǎn)單的RabbitMQ編程實(shí)例
生產(chǎn)者代碼實(shí)現(xiàn)步驟:
- 獲得與RabbitMQ Server的連接對(duì)象
- 通過(guò)連接對(duì)象獲得Channel對(duì)象,Channel提供了與RabbitMQ交互的操作
- 連接用戶(hù)儲(chǔ)存消息數(shù)據(jù)的隊(duì)列,不存在則創(chuàng)建隊(duì)列
- 發(fā)送消息到指定的RabbitMQ隊(duì)列中
- 關(guān)閉RabbitMQ代理的連接
只需按照上面步驟進(jìn)行編寫(xiě)代碼即可
import pika
class RabbitMQ(object):
def __init__(self, host, port, username, password, vhost):
self._host = host # broker IP
self._port = port # broker port
self._vhost = vhost # vhost
self._credentials = pika.PlainCredentials(username, password)
self._connection = None
def connect(self):
# 連接RabbitMQ的參數(shù)對(duì)象
parameter = pika.ConnectionParameters(self._host, self._port, self._vhost,
self._credentials, heartbeat_interval=10)
self._connection = pika.BlockingConnection(parameter) # 建立連接
def put(self, message_str, queue_name, route_key, exchange=''):
if self._connection is None:
return
channel = self._connection.channel() # 獲取channel
channel.queue_declare(queue=queue_name) # 申明使用的queue
# 調(diào)用basic_publish方法向RabbitMQ發(fā)送數(shù)據(jù), 這個(gè)方法應(yīng)該只支持str類(lèi)型的數(shù)據(jù)
channel.basic_publish(
exchange=exchange, # 指定exchange
routing_key=route_key, # 指定路由
body=message_str # 具體發(fā)送的數(shù)據(jù)
)
def getting_start(self, queue_name):
if self._connection is None:
return
channel = self._connection.channel()
channel.queue_declare(queue=queue_name)
# 調(diào)用basic_consume方法,可以傳入一個(gè)回調(diào)函數(shù)
channel.basic_consume(self.callback,
queue=queue_name,
no_ack=True)
channel.start_consuming() # 相當(dāng)于run_forever(), 當(dāng)Queue中沒(méi)有數(shù)據(jù),則一直阻塞等待
@staticmethod
def callback(ch, method, properties, message_str):
"""定義一個(gè)回調(diào)函數(shù)"""
print "[x] Received {0}".format(message_str)
def close(self):
"""關(guān)閉RabbitMQ的連接"""
if self._connection is not None:
self._connection.close()