RabbitMQ的Python客戶端pika使用調(diào)研

消息隊列的作用

消息隊列最早產(chǎn)生在金融領(lǐng)域,是為解決金融業(yè)務(wù)的IT系統(tǒng)中產(chǎn)生的一些問題而應(yīng)運而生的。隨著互聯(lián)網(wǎng)和電子商務(wù)的發(fā)展,消息隊列在不同行業(yè)、不同場景下得到了廣泛運用。消息隊列主要有能解決三個問題:

異步解耦

在分布式系統(tǒng)中,不同應(yīng)用之間的相互調(diào)用,如果采用同步的方式,請求發(fā)起方發(fā)起調(diào)用之后,接收調(diào)用方需要在處理完成之后,再同步地返回執(zhí)行結(jié)果給到調(diào)用方,在此過程請求發(fā)起方需要一直等待被調(diào)用方的反饋結(jié)果。這在處理時間較短的場景下,并沒有什么問題。但是一旦處理時間過長的話,就會導(dǎo)致請求方程序一直阻塞,不能處理后續(xù)流程。故而需要引入一個中間人,就像一個郵筒,來收發(fā)信件,而不必發(fā)件人和收件人直接交換信息。消息隊列就這樣應(yīng)運而生了,消息隊列利用發(fā)布-訂閱模式工作,消息發(fā)送者發(fā)布消息,一個或者多個消息接受者訂閱消息。消息發(fā)送者是消息源,在對消息進行處理后將消息發(fā)送至分布式消息隊列,消息接受者從非分布式消息隊列獲取該消息后繼續(xù)進行處理。可以看到,消息發(fā)送者和消息接受者直接沒有直接的耦合關(guān)系。

流量削峰填谷

無論在互聯(lián)網(wǎng)還是傳統(tǒng)行業(yè)中,IT系統(tǒng)的訪問量、業(yè)務(wù)請求與時間的關(guān)系都不是均勻分布的。比如每年的“6.18”和“雙十一”,或者12306的春運車票發(fā)售日,系統(tǒng)的請求都會迎來高峰。或者因為某些促銷活動、熱點事件,也會導(dǎo)致系統(tǒng)達到峰值。但是系統(tǒng)的沒個應(yīng)用程序的單個節(jié)點的處理能力是有限的,不能在短期內(nèi)處理所有請求。這種問題其實在現(xiàn)實生活中也很常見,比如超市促銷時,客流激增,商家會組織顧客排隊購買、排隊支付。其實消息隊列,也可以看做這種排隊的思想的引入。通過將大量請求放入消息隊列,進行排隊,讓消費者挨個處理請求,而不超出消費者程序的承載上限,較好地削平了流量高峰,在請求量低的時候又把之前積壓的請求繼續(xù)處理。它就像一根可以穩(wěn)定壓力的,讓水流均勻流動的水管,從而保證整個系統(tǒng)中的運轉(zhuǎn)效率保持均勻穩(wěn)定。

RPC(Remote Procedure Call)

RPC即所謂遠程過程調(diào)用,前面將消息隊列來實現(xiàn)異步解耦的時候已經(jīng)說明,它可以做不同應(yīng)用程序相互調(diào)用的“中間人”。但作RPC時就還必須考慮到,消費方接收到消息,并且處理完該消息包含的請求后,還必須有個應(yīng)答的過程。所以,消息生產(chǎn)者(請求方)在消息頭里必須指定該條消息的編號(correlation_id)和消費者(接收請求方)應(yīng)答消息應(yīng)該寫入哪個隊列(如圖,reply_to信息),消息消費者在處理完之后,它又會轉(zhuǎn)換成一個消息生產(chǎn)者的角色,向消息隊列指定的隊列中寫入一調(diào)帶correlation_id的應(yīng)答消息。當RPC請求方接收到這條消息時,就知道編號correlation_id消息包含的調(diào)用已完成。


image

AMQP的幾個概念

image.png

Product(生產(chǎn)者):

將消息的生產(chǎn)(發(fā)生)出來的程序

Consumer(消費者):

等待接收消息的程序

Broker(消息中間件):

將消息由生產(chǎn)者傳遞給消費者的中介軟件,就是我們常說的消息中間件(MQ)。包括知名的RabbitMQ、RacketMQ、ZeroMQ、Kafka、Redis(NOSQL,也用作消息中間件)。

Virtual host(虛擬機):

考慮到多租戶情境下的安全因素,對broker進行虛擬的資源劃分,類似于VLAN。同一個消息中間件,可以劃分出多個vhost,供不同用戶在其中創(chuàng)建使用exchange和Queue,以保持數(shù)據(jù)的隔離性和安全性。

Channel(管道):

應(yīng)用程序?qū)roker的訪問,必須先建立起連接,但是應(yīng)用程序需要發(fā)送/接收大量消息,如果每次發(fā)生/接收消息的操作都去建立一次連接,那么對部署broker的機器資源的開銷是巨大的。所以必須做到多次訪問對一次連接的復(fù)用,這就需要應(yīng)用程序用多線程的方式訪問broker。同時還得對每一次消息收發(fā),能保證數(shù)據(jù)的隔離性,那就必須由broker對消息傳遞的渠道做一定的分隔。channel就是為了實現(xiàn)這個目的而產(chǎn)生的,相當于在broker內(nèi)部對一個connection做了切分,實現(xiàn)了多個輕量級的connection,以極大地節(jié)約系統(tǒng)開銷。

Exchange (路由器)

負責(zé)分發(fā)消息的組件,相當于網(wǎng)絡(luò)中的路由器。message進入broker首先由Exchange處理,根據(jù)分發(fā)規(guī)則,查詢表中的routing key,來講消息分發(fā)到對應(yīng)的Queue中去。常用的分發(fā)規(guī)則有direct(point to point),topic(publish-subscribe)and fanout(mutilcast)。

Queue(隊列)

消息隊列,消息真正被寫入和讀取的地方,一個message可以拷貝到多個Queue上去。

Binding(路由規(guī)則)

exchange和queue之間的虛擬連接,binding中可以包含routing key。Binding信息被保存到exchange中的查詢表中,用于message的分發(fā)依據(jù)。

Message(消息)

分為消息頭和消息體,消息頭主要記錄供broker處理分發(fā)的信息,消息體才是真正被消費者需要的信息。

RabbitMQ的幾大特性

RabbitMQ作為AMQP的一個實現(xiàn),基本遵循了上述AMQP的一些機制。主要具備以下特性,篇幅考慮,這里只做簡單列舉,日后再詳細敘述。
消息確認機制
隊列持久化
消息持久化
消息的拒收

RabbitMQ的安裝和配置

以CentOS 7 為例

安裝
erlangy與rabbitMQ版本對照關(guān)系表

$sudo yum install erlang -y
$sudo yum install rabbitmq-server-3.7.4-1.el7.noarch.rpm
#開啟管理后臺插件(可選,默認端口15672)
$sudo rabbitmq-plugins enble rabbitmq_management
#啟動
$sudo rabbitmq-server

配置

#創(chuàng)建管理員用戶
$sudo rabbitmqctl add_user  user_admin  passwd_admin
#賦予其administrator角色
$sudo rabbitmqctl set_user_tags user_admin administrator
#創(chuàng)建RabbitMQ監(jiān)控用戶,負責(zé)整個MQ的監(jiān)控
$sudo rabbitmqctl add_user  user_monitoring  passwd_monitor
#賦予其monitoring角色
$sudo rabbitmqctl set_user_tags user_monitoring monitoring
#創(chuàng)建某個項目的專用用戶,只能訪問項目自己的virtual hosts
$sudo rabbitmqctl  add_user  user_proj  passwd_proj
#查看所有用戶
$sudo rabbitmqctl list_users

pika的使用

pika是RabbitMQ官方文檔推薦的Python客戶端,如果我們采用Python編寫使用RabbitMQ的應(yīng)用程序,我們最好應(yīng)該采用pika來實現(xiàn)。

pika使用簡單示例

生產(chǎn)者程序示例:sendmq.py

#!/usr/bin/env python
import pika

credentials = pika.PlainCredentials('czq', 'czq')
Parameter = pika.ConnectionParameters('127.0.0.1',5672,'/',credentials,heartbeat_interval=0)
connection = pika.BlockingConnection(Parameter)
channel = connection.channel()


channel.queue_declare(queue='hello')

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

消費者程序示例:receviemq.py

import pika

credentials = pika.PlainCredentials('czq', 'czq')
Parameter = pika.ConnectionParameters('127.0.0.1',5672,'/',credentials,heartbeat_interval=0)
connection = pika.BlockingConnection(Parameter)
channel = connection.channel()


channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

參考資料

[1]RabbitMQ官方文檔:
http://www.rabbitmq.com/tutorials/tutorial-one-python.html
[2]pika官方文檔:
http://pika.readthedocs.io/en/0.10.0/modules/channel.html
[3]RabbitMQ與AMQP協(xié)議詳解
https://www.cnblogs.com/frankyou/p/5283539.html
[4]李智慧,《大型網(wǎng)站技術(shù)架構(gòu):核心原理與案例分析》,電子工業(yè)出版社,2013年9月1日
[5] RabbitMQ在線模擬學(xué)習(xí)
http://tryrabbitmq.com/

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,680評論 19 139
  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981閱讀 16,211評論 2 11
  • 來源 RabbitMQ是用Erlang實現(xiàn)的一個高并發(fā)高可靠AMQP消息隊列服務(wù)器。支持消息的持久化、事務(wù)、擁塞控...
    jiangmo閱讀 10,513評論 2 34
  • 什么叫消息隊列 消息(Message)是指在應(yīng)用間傳送的數(shù)據(jù)。消息可以非常簡單,比如只包含文本字符串,也可以更復(fù)雜...
    lijun_m閱讀 1,417評論 0 1
  • 本文大綱 RabbitMQ 歷史 RabbitMQ 應(yīng)用場景 RabbitMQ 系統(tǒng)架構(gòu) RabbitMQ 基本概...
    Java_Explorer閱讀 16,804評論 1 40

友情鏈接更多精彩內(nèi)容