1,RabbitMQ的由來
RabbitMQ是一個(gè)由erlang開發(fā)的AMQP(Advanced Message Queue)的開源實(shí)現(xiàn)。AMQP 的出現(xiàn)其實(shí)也是應(yīng)了廣大人民群眾的需求,雖然在同步消息通訊的世界里有很多公開標(biāo)準(zhǔn)(如 COBAR的 IIOP ,或者是 SOAP 等),但是在異步消息處理中卻不是這樣,只有大企業(yè)有一些商業(yè)實(shí)現(xiàn)(如微軟的 MSMQ ,IBM 的 Websphere MQ 等),因此,在 2006 年的 6 月,Cisco 、Redhat、iMatix 等聯(lián)合制定了 AMQP 的公開標(biāo)準(zhǔn)。
RabbitMQ是由RabbitMQ Technologies Ltd開發(fā)并且提供商業(yè)支持的。該公司在2010年4月被SpringSource(VMWare的一個(gè)部門)收購。在2013年5月被并入Pivotal。其實(shí)VMWare,Pivotal和EMC本質(zhì)上是一家的。不同的是VMWare是獨(dú)立上市子公司,而Pivotal是整合了EMC的某些資源,現(xiàn)在并沒有上市。
RabbitMQ的官網(wǎng)是http://www.rabbitmq.com
2,RabbitMQ的應(yīng)用場景
RabbitMQ,或者說AMQP解決了什么問題,或者說它的應(yīng)用場景是什么?
對于一個(gè)大型的軟件系統(tǒng)來說,它會有很多的組件或者說模塊或者說子系統(tǒng)或者(subsystem or Component or submodule)。那么這些模塊的如何通信?這和傳統(tǒng)的IPC有很大的區(qū)別。傳統(tǒng)的IPC很多都是在單一系統(tǒng)上的,模塊耦合性很大,不適合擴(kuò)展(Scalability);如果使用socket那么不同的模塊的確可以部署到不同的機(jī)器上,但是還是有很多問題需要解決。比如:
1)信息的發(fā)送者和接收者如何維持這個(gè)連接,如果一方的連接中斷,這期間的數(shù)據(jù)如何方式丟失?
2)如何降低發(fā)送者和接收者的耦合度?
3)如何讓Priority高的接收者先接到數(shù)據(jù)?
4)如何做到load balance?有效均衡接收者的負(fù)載?
5)如何有效的將數(shù)據(jù)發(fā)送到相關(guān)的接收者?也就是說將接收者subscribe 不同的數(shù)據(jù),如何做有效的filter。
6)如何做到可擴(kuò)展,甚至將這個(gè)通信模塊發(fā)到cluster上?
7)如何保證接收者接收到了完整,正確的數(shù)據(jù)?
AMDQ協(xié)議解決了以上的問題,而RabbitMQ實(shí)現(xiàn)了AMQP。
3,系統(tǒng)架構(gòu)

- Broker:簡單來說就是消息隊(duì)列服務(wù)器實(shí)體。
- Exchange:消息交換機(jī),它指定消息按什么規(guī)則,路由到哪個(gè)隊(duì)列。
- Queue:消息隊(duì)列載體,每個(gè)消息都會被投入到一個(gè)或多個(gè)隊(duì)列。
- Binding:綁定,它的作用就是把exchange和queue按照路由規(guī)則綁定起來。
- Routing Key:路由關(guān)鍵字,exchange根據(jù)這個(gè)關(guān)鍵字進(jìn)行消息投遞。
- vhost:虛擬主機(jī),一個(gè)broker里可以開設(shè)多個(gè)vhost,用作不同用戶的權(quán)限分離。
- producer:消息生產(chǎn)者,就是投遞消息的程序。
- consumer:消息消費(fèi)者,就是接受消息的程序。
- channel:消息通道,在客戶端的每個(gè)連接里,可建立多個(gè)channel,每個(gè)channel代表一個(gè)會話任務(wù)。
RabbitMQ支持多種開發(fā)語言,Python的RabbitMQ包:amqplib、txAMQP、pika
4,基本示例(queue)

- 發(fā)送端producer
# coding:utf8
import pika
# 建立一個(gè)實(shí)例
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost',5672) # 默認(rèn)端口5672,可不寫
)
# 聲明一個(gè)管道,在管道里發(fā)消息
channel = connection.channel()
# 在管道里聲明queue
channel.queue_declare(queue='hello')
# RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='',
routing_key='hello', # queue名字
body='Hello World!') # 消息內(nèi)容
print(" [x] Sent 'Hello World!'")
connection.close() # 隊(duì)列關(guān)閉
- 接收端consumer
# coding:utf8
import pika
import time
# 建立實(shí)例
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
# 聲明管道
channel = connection.channel()
# 為什么又聲明了一個(gè)‘hello’隊(duì)列?
# 如果確定已經(jīng)聲明了,可以不聲明。但是你不知道那個(gè)機(jī)器先運(yùn)行,所以要聲明兩次。
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body): # 四個(gè)參數(shù)為標(biāo)準(zhǔn)格式
print(ch, method, properties) # 打印看一下是什么
# 管道內(nèi)存對象 內(nèi)容相關(guān)信息 后面講
print(" [x] Received %r" % body)
time.sleep(15)
ch.basic_ack(delivery_tag = method.delivery_tag) # 告訴生成者,消息處理完成
channel.basic_consume( # 消費(fèi)消息
callback, # 如果收到消息,就調(diào)用callback函數(shù)來處理消息
queue='hello', # 你要從那個(gè)隊(duì)列里收消息
# no_ack=True # 寫的話,如果接收消息,機(jī)器宕機(jī)消息就丟了
# 一般不寫。宕機(jī)則生產(chǎn)者檢測到發(fā)給其他消費(fèi)者
)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() # 開始消費(fèi)消息
- 消費(fèi)者處理消息的上限控制
channel.basic_qos(prefetch_count=1) # 按能力分發(fā),如果有一個(gè)消息,就不再給消費(fèi)者分發(fā)(前提是no_ack=False)
- 消息持久化
# 在管道里聲明queue,每次聲明隊(duì)列的時(shí)候,都加上durable
channel.queue_declare(queue='hello2', durable=True)
# 發(fā)送端發(fā)送消息時(shí),加上properties
properties=pika.BasicProperties(
delivery_mode=2, # 消息持久化
)
5,廣播模式(exchange)
前面的效果都是一對一發(fā),如果做一個(gè)廣播效果可不可以,這時(shí)候就要用到exchange了 。
exchange必須精確的知道收到的消息要發(fā)給誰。
exchange的類型決定了怎么處理, 類型有以下幾種:
- fanout: 所有綁定到此exchange的queue都可以接收消息
- direct: 通過routingKey和exchange決定的那個(gè)唯一的queue可以接收消息
- topic: 所有符合routingKey(此時(shí)可以是一個(gè)表達(dá)式)的routingKey所bind的queue可以接收消息
5.1,fanout 純廣播、all

需要queue和exchange綁定,因?yàn)橄M(fèi)者不是和exchange直連的,消費(fèi)者是連在queue上,queue綁定在exchange上,消費(fèi)者只會在queue里讀消息
發(fā)送端 publisher 發(fā)布、廣播
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
# 注意:這里是廣播,不需要聲明queue
channel.exchange_declare(exchange='logs', # 聲明廣播管道
type='fanout')
# message = ' '.join(sys.argv[1:]) or "info: Hello World!"
message = "info: Hello World!"
channel.basic_publish(exchange='logs',
routing_key='', # 注意此處空,必須有
body=message)
print(" [x] Sent %r" % message)
connection.close()
- 接收端 subscriber 訂閱
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
type='fanout')
# 不指定queue名字,rabbit會隨機(jī)分配一個(gè)名字,exclusive=True會在使用此queue的消費(fèi)者斷開后,自動將queue刪除
result = channel.queue_declare(exclusive=True)
# 獲取隨機(jī)的queue名字
queue_name = result.method.queue
print("random queuename:", queue_name)
channel.queue_bind(exchange='logs', # queue綁定到轉(zhuǎn)發(fā)器上
queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
- 注意:廣播,是實(shí)時(shí)的,收不到就沒了,消息不會存下來,類似收音機(jī)
5.2,direct 有選擇的接收消息

接收者可以過濾消息,只收我想要的消息
發(fā)送端publisher
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
# 重要程度級別,這里默認(rèn)定義為 info
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
- 接收端subscriber
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
# 獲取運(yùn)行腳本所有的參數(shù)
severities = sys.argv[1:]
if not severities:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
# 循環(huán)列表去綁定
for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
- 測試
python direct_sonsumer.py info warning
python direct_sonsumer.py warning error
5.3,topic 更細(xì)致的過濾

- 比如把error中,apache和mysql的分別或取出來
- 發(fā)送端publisher
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
type='topic')
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
- 接收端 subscriber
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
type='topic')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key=binding_key)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
- 測試
需要補(bǔ)充
6,RPC實(shí)現(xiàn)
不知道你有沒有發(fā)現(xiàn),上面的流都是單向的,如果遠(yuǎn)程的機(jī)器執(zhí)行完返回結(jié)果,就實(shí)現(xiàn)不了了。
如果返回,這種模式叫什么呢,RPC(遠(yuǎn)程過程調(diào)用),snmp就是典型的RPC
RabbitMQ能不能返回呢,怎么返回呢?既是發(fā)送端又是接收端。
但是接收端返回消息怎么返回?可以發(fā)送到發(fā)過來的queue里么?不可以。
返回時(shí),再建立一個(gè)queue,把結(jié)果發(fā)送新的queue里
為了服務(wù)端返回的queue不寫死,在客戶端給服務(wù)端發(fā)指令的的時(shí)候,同時(shí)帶一條消息說,你結(jié)果返回給哪個(gè)queue
- RPC Client
import pika
import uuid
import time
class FibonacciRpcClient(object):
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
self.channel = self.connection.channel()
result = self.channel.queue_declare(exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(self.on_response, # 只要一收到消息就調(diào)用on_response
no_ack=True,
queue=self.callback_queue) # 收這個(gè)queue的消息
def on_response(self, ch, method, props, body): # 必須四個(gè)參數(shù)
# 如果收到的ID和本機(jī)生成的相同,則返回的結(jié)果就是我想要的指令返回的結(jié)果
if self.corr_id == props.correlation_id:
self.response = body
def call(self, n):
self.response = None # 初始self.response為None
self.corr_id = str(uuid.uuid4()) # 隨機(jī)唯一字符串
self.channel.basic_publish(
exchange='',
routing_key='rpc_queue', # 發(fā)消息到rpc_queue
properties=pika.BasicProperties( # 消息持久化
reply_to = self.callback_queue, # 讓服務(wù)端命令結(jié)果返回到callback_queue
correlation_id = self.corr_id, # 把隨機(jī)uuid同時(shí)發(fā)給服務(wù)器
),
body=str(n)
)
while self.response is None: # 當(dāng)沒有數(shù)據(jù),就一直循環(huán)
# 啟動后,on_response函數(shù)接到消息,self.response 值就不為空了
self.connection.process_data_events() # 非阻塞版的start_consuming()
# print("no msg……")
# time.sleep(0.5)
# 收到消息就調(diào)用on_response
return int(self.response)
if __name__ == '__main__':
fibonacci_rpc = FibonacciRpcClient()
print(" [x] Requesting fib(7)")
response = fibonacci_rpc.call(7)
print(" [.] Got %r" % response)
- RPC Server
import pika
import time
def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n-1) + fib(n-2)
def on_request(ch, method, props, body):
n = int(body)
print(" [.] fib(%s)" % n)
response = fib(n)
ch.basic_publish(
exchange='', # 把執(zhí)行結(jié)果發(fā)回給客戶端
routing_key=props.reply_to, # 客戶端要求返回想用的queue
# 返回客戶端發(fā)過來的correction_id 為了讓客戶端驗(yàn)證消息一致性
properties=pika.BasicProperties(correlation_id = props.correlation_id),
body=str(response)
)
ch.basic_ack(delivery_tag = method.delivery_tag) # 任務(wù)完成,告訴客戶端
if __name__ == '__main__':
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue') # 聲明一個(gè)rpc_queue ,
channel.basic_qos(prefetch_count=1)
# 在rpc_queue里收消息,收到消息就調(diào)用on_request
channel.basic_consume(on_request, queue='rpc_queue')
print(" [x] Awaiting RPC requests")
channel.start_consuming()
7,參考頁面
rabitt mq
http://blog.csdn.net/fgf00/article/details/52872730
RabbitMQ從入門到精通
http://blog.csdn.net/column/details/rabbitmq.html