RabbitMQ入門學(xué)習(xí)2 - 應(yīng)用示例

1,RabbitMQ的由來

RabbitMQ是一個(gè)由erlang開發(fā)的AMQP(Advanced Message Queue)的開源實(shí)現(xiàn)。AMQP 的出現(xiàn)其實(shí)也是應(yīng)了廣大人民群眾的需求,雖然在同步消息通訊的世界里有很多公開標(biāo)準(zhǔn)(如 COBAR的 IIOP ,或者是 SOAP 等),但是在異步消息處理中卻不是這樣,只有大企業(yè)有一些商業(yè)實(shí)現(xiàn)(如微軟的 MSMQ ,IBM 的 Websphere MQ 等),因此,在 2006 年的 6 月,Cisco 、Redhat、iMatix 等聯(lián)合制定了 AMQP 的公開標(biāo)準(zhǔn)。

RabbitMQ是由RabbitMQ Technologies Ltd開發(fā)并且提供商業(yè)支持的。該公司在2010年4月被SpringSource(VMWare的一個(gè)部門)收購。在2013年5月被并入Pivotal。其實(shí)VMWare,Pivotal和EMC本質(zhì)上是一家的。不同的是VMWare是獨(dú)立上市子公司,而Pivotal是整合了EMC的某些資源,現(xiàn)在并沒有上市。
RabbitMQ的官網(wǎng)是http://www.rabbitmq.com

2,RabbitMQ的應(yīng)用場景

RabbitMQ,或者說AMQP解決了什么問題,或者說它的應(yīng)用場景是什么?

對于一個(gè)大型的軟件系統(tǒng)來說,它會有很多的組件或者說模塊或者說子系統(tǒng)或者(subsystem or Component or submodule)。那么這些模塊的如何通信?這和傳統(tǒng)的IPC有很大的區(qū)別。傳統(tǒng)的IPC很多都是在單一系統(tǒng)上的,模塊耦合性很大,不適合擴(kuò)展(Scalability);如果使用socket那么不同的模塊的確可以部署到不同的機(jī)器上,但是還是有很多問題需要解決。比如:
1)信息的發(fā)送者和接收者如何維持這個(gè)連接,如果一方的連接中斷,這期間的數(shù)據(jù)如何方式丟失?
2)如何降低發(fā)送者和接收者的耦合度?
3)如何讓Priority高的接收者先接到數(shù)據(jù)?
4)如何做到load balance?有效均衡接收者的負(fù)載?
5)如何有效的將數(shù)據(jù)發(fā)送到相關(guān)的接收者?也就是說將接收者subscribe 不同的數(shù)據(jù),如何做有效的filter。
6)如何做到可擴(kuò)展,甚至將這個(gè)通信模塊發(fā)到cluster上?
7)如何保證接收者接收到了完整,正確的數(shù)據(jù)?

AMDQ協(xié)議解決了以上的問題,而RabbitMQ實(shí)現(xiàn)了AMQP。

3,系統(tǒng)架構(gòu)

RabbitMQ數(shù)據(jù)流.jpeg
  • Broker:簡單來說就是消息隊(duì)列服務(wù)器實(shí)體。
  • Exchange:消息交換機(jī),它指定消息按什么規(guī)則,路由到哪個(gè)隊(duì)列。
  • Queue:消息隊(duì)列載體,每個(gè)消息都會被投入到一個(gè)或多個(gè)隊(duì)列。
  • Binding:綁定,它的作用就是把exchange和queue按照路由規(guī)則綁定起來。
  • Routing Key:路由關(guān)鍵字,exchange根據(jù)這個(gè)關(guān)鍵字進(jìn)行消息投遞。
  • vhost:虛擬主機(jī),一個(gè)broker里可以開設(shè)多個(gè)vhost,用作不同用戶的權(quán)限分離。
  • producer:消息生產(chǎn)者,就是投遞消息的程序。
  • consumer:消息消費(fèi)者,就是接受消息的程序。
  • channel:消息通道,在客戶端的每個(gè)連接里,可建立多個(gè)channel,每個(gè)channel代表一個(gè)會話任務(wù)。

RabbitMQ支持多種開發(fā)語言,Python的RabbitMQ包:amqplib、txAMQP、pika

4,基本示例(queue)

單發(fā)送多接收.png
  • 發(fā)送端producer
# coding:utf8
import pika

# 建立一個(gè)實(shí)例
connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost',5672)  # 默認(rèn)端口5672,可不寫
    )
# 聲明一個(gè)管道,在管道里發(fā)消息
channel = connection.channel()
# 在管道里聲明queue
channel.queue_declare(queue='hello')
# RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='',
                      routing_key='hello',  # queue名字
                      body='Hello World!')  # 消息內(nèi)容
print(" [x] Sent 'Hello World!'")
connection.close()  # 隊(duì)列關(guān)閉
  • 接收端consumer
# coding:utf8
import pika
import time

# 建立實(shí)例
connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
# 聲明管道
channel = connection.channel()

# 為什么又聲明了一個(gè)‘hello’隊(duì)列?
# 如果確定已經(jīng)聲明了,可以不聲明。但是你不知道那個(gè)機(jī)器先運(yùn)行,所以要聲明兩次。
channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):  # 四個(gè)參數(shù)為標(biāo)準(zhǔn)格式
    print(ch, method, properties)  # 打印看一下是什么
    # 管道內(nèi)存對象  內(nèi)容相關(guān)信息  后面講
    print(" [x] Received %r" % body)
    time.sleep(15)
    ch.basic_ack(delivery_tag = method.delivery_tag)  # 告訴生成者,消息處理完成

channel.basic_consume(  # 消費(fèi)消息
        callback,  # 如果收到消息,就調(diào)用callback函數(shù)來處理消息
        queue='hello',  # 你要從那個(gè)隊(duì)列里收消息
        # no_ack=True  # 寫的話,如果接收消息,機(jī)器宕機(jī)消息就丟了
        # 一般不寫。宕機(jī)則生產(chǎn)者檢測到發(fā)給其他消費(fèi)者
        )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()  # 開始消費(fèi)消息
  • 消費(fèi)者處理消息的上限控制
channel.basic_qos(prefetch_count=1)  # 按能力分發(fā),如果有一個(gè)消息,就不再給消費(fèi)者分發(fā)(前提是no_ack=False)
  • 消息持久化
# 在管道里聲明queue,每次聲明隊(duì)列的時(shí)候,都加上durable
channel.queue_declare(queue='hello2', durable=True)

# 發(fā)送端發(fā)送消息時(shí),加上properties
properties=pika.BasicProperties(
    delivery_mode=2,  # 消息持久化
    )

5,廣播模式(exchange)

前面的效果都是一對一發(fā),如果做一個(gè)廣播效果可不可以,這時(shí)候就要用到exchange了 。
exchange必須精確的知道收到的消息要發(fā)給誰。

exchange的類型決定了怎么處理, 類型有以下幾種:

  • fanout: 所有綁定到此exchange的queue都可以接收消息
  • direct: 通過routingKey和exchange決定的那個(gè)唯一的queue可以接收消息
  • topic: 所有符合routingKey(此時(shí)可以是一個(gè)表達(dá)式)的routingKey所bind的queue可以接收消息

5.1,fanout 純廣播、all

Exchange fanout.png
  • 需要queue和exchange綁定,因?yàn)橄M(fèi)者不是和exchange直連的,消費(fèi)者是連在queue上,queue綁定在exchange上,消費(fèi)者只會在queue里讀消息

  • 發(fā)送端 publisher 發(fā)布、廣播

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
# 注意:這里是廣播,不需要聲明queue
channel.exchange_declare(exchange='logs',  # 聲明廣播管道
                         type='fanout')

# message = ' '.join(sys.argv[1:]) or "info: Hello World!"
message = "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',  # 注意此處空,必須有
                      body=message)
print(" [x] Sent %r" % message)
connection.close()
  • 接收端 subscriber 訂閱
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')
# 不指定queue名字,rabbit會隨機(jī)分配一個(gè)名字,exclusive=True會在使用此queue的消費(fèi)者斷開后,自動將queue刪除
result = channel.queue_declare(exclusive=True)
# 獲取隨機(jī)的queue名字
queue_name = result.method.queue
print("random queuename:", queue_name)

channel.queue_bind(exchange='logs',  # queue綁定到轉(zhuǎn)發(fā)器上
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
  • 注意:廣播,是實(shí)時(shí)的,收不到就沒了,消息不會存下來,類似收音機(jī)

5.2,direct 有選擇的接收消息

Exchange direct.png
  • 接收者可以過濾消息,只收我想要的消息

  • 發(fā)送端publisher

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')
# 重要程度級別,這里默認(rèn)定義為 info
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
  • 接收端subscriber
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
# 獲取運(yùn)行腳本所有的參數(shù)
severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)
# 循環(huán)列表去綁定
for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
  • 測試
python direct_sonsumer.py info warning 
python direct_sonsumer.py warning error

5.3,topic 更細(xì)致的過濾

Exchange topic.png
  • 比如把error中,apache和mysql的分別或取出來
  • 發(fā)送端publisher
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
  • 接收端 subscriber
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
  • 測試
需要補(bǔ)充

6,RPC實(shí)現(xiàn)

不知道你有沒有發(fā)現(xiàn),上面的流都是單向的,如果遠(yuǎn)程的機(jī)器執(zhí)行完返回結(jié)果,就實(shí)現(xiàn)不了了。
如果返回,這種模式叫什么呢,RPC(遠(yuǎn)程過程調(diào)用),snmp就是典型的RPC
RabbitMQ能不能返回呢,怎么返回呢?既是發(fā)送端又是接收端。
但是接收端返回消息怎么返回?可以發(fā)送到發(fā)過來的queue里么?不可以。
返回時(shí),再建立一個(gè)queue,把結(jié)果發(fā)送新的queue里
為了服務(wù)端返回的queue不寫死,在客戶端給服務(wù)端發(fā)指令的的時(shí)候,同時(shí)帶一條消息說,你結(jié)果返回給哪個(gè)queue

  • RPC Client
import pika
import uuid
import time

class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                host='localhost'))
        self.channel = self.connection.channel()
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_response,  # 只要一收到消息就調(diào)用on_response
                                   no_ack=True,
                                   queue=self.callback_queue)  # 收這個(gè)queue的消息

    def on_response(self, ch, method, props, body):  # 必須四個(gè)參數(shù)
        # 如果收到的ID和本機(jī)生成的相同,則返回的結(jié)果就是我想要的指令返回的結(jié)果
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None  # 初始self.response為None
        self.corr_id = str(uuid.uuid4())  # 隨機(jī)唯一字符串
        self.channel.basic_publish(
                exchange='',
                routing_key='rpc_queue',  # 發(fā)消息到rpc_queue
                properties=pika.BasicProperties(  # 消息持久化
                    reply_to = self.callback_queue,  # 讓服務(wù)端命令結(jié)果返回到callback_queue
                    correlation_id = self.corr_id,  # 把隨機(jī)uuid同時(shí)發(fā)給服務(wù)器
                ),
                body=str(n)
        )
        while self.response is None:  # 當(dāng)沒有數(shù)據(jù),就一直循環(huán)
            # 啟動后,on_response函數(shù)接到消息,self.response 值就不為空了
            self.connection.process_data_events()  # 非阻塞版的start_consuming()
            # print("no msg……")
            # time.sleep(0.5)
        # 收到消息就調(diào)用on_response
        return int(self.response)

if __name__ == '__main__':
    fibonacci_rpc = FibonacciRpcClient()
    print(" [x] Requesting fib(7)")
    response = fibonacci_rpc.call(7)
    print(" [.] Got %r" % response)
  • RPC Server
import pika
import time

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)

def on_request(ch, method, props, body):
    n = int(body)
    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(
            exchange='',  # 把執(zhí)行結(jié)果發(fā)回給客戶端
            routing_key=props.reply_to,  # 客戶端要求返回想用的queue
            # 返回客戶端發(fā)過來的correction_id 為了讓客戶端驗(yàn)證消息一致性
            properties=pika.BasicProperties(correlation_id = props.correlation_id),
            body=str(response)
    )
    ch.basic_ack(delivery_tag = method.delivery_tag)  # 任務(wù)完成,告訴客戶端

if __name__ == '__main__':
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='rpc_queue')  # 聲明一個(gè)rpc_queue ,

    channel.basic_qos(prefetch_count=1)
    # 在rpc_queue里收消息,收到消息就調(diào)用on_request
    channel.basic_consume(on_request, queue='rpc_queue')
    print(" [x] Awaiting RPC requests")
    channel.start_consuming()

7,參考頁面

rabitt mq
http://blog.csdn.net/fgf00/article/details/52872730

RabbitMQ從入門到精通
http://blog.csdn.net/column/details/rabbitmq.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,590評論 19 139
  • 來源 RabbitMQ是用Erlang實(shí)現(xiàn)的一個(gè)高并發(fā)高可靠AMQP消息隊(duì)列服務(wù)器。支持消息的持久化、事務(wù)、擁塞控...
    jiangmo閱讀 10,513評論 2 34
  • 1. 歷史 RabbitMQ是一個(gè)由erlang開發(fā)的AMQP(Advanced Message Queue )的...
    高廣超閱讀 6,236評論 3 51
  • 1 RabbitMQ安裝部署 這里是ErLang環(huán)境的下載地址http://www.erlang.org/down...
    Bobby0322閱讀 2,379評論 0 11
  • RabbitMQ詳解 本文地址:http://www.host900.com/index.php/articles...
    嘉加家佳七閱讀 2,626評論 0 9

友情鏈接更多精彩內(nèi)容