消息隊列的作用
消息隊列最早產(chǎn)生在金融領(lǐng)域,是為解決金融業(yè)務(wù)的IT系統(tǒng)中產(chǎn)生的一些問題而應(yīng)運而生的。隨著互聯(lián)網(wǎng)和電子商務(wù)的發(fā)展,消息隊列在不同行業(yè)、不同場景下得到了廣泛運用。消息隊列主要有能解決三個問題:
異步解耦
在分布式系統(tǒng)中,不同應(yīng)用之間的相互調(diào)用,如果采用同步的方式,請求發(fā)起方發(fā)起調(diào)用之后,接收調(diào)用方需要在處理完成之后,再同步地返回執(zhí)行結(jié)果給到調(diào)用方,在此過程請求發(fā)起方需要一直等待被調(diào)用方的反饋結(jié)果。這在處理時間較短的場景下,并沒有什么問題。但是一旦處理時間過長的話,就會導(dǎo)致請求方程序一直阻塞,不能處理后續(xù)流程。故而需要引入一個中間人,就像一個郵筒,來收發(fā)信件,而不必發(fā)件人和收件人直接交換信息。消息隊列就這樣應(yīng)運而生了,消息隊列利用發(fā)布-訂閱模式工作,消息發(fā)送者發(fā)布消息,一個或者多個消息接受者訂閱消息。消息發(fā)送者是消息源,在對消息進行處理后將消息發(fā)送至分布式消息隊列,消息接受者從非分布式消息隊列獲取該消息后繼續(xù)進行處理。可以看到,消息發(fā)送者和消息接受者直接沒有直接的耦合關(guān)系。
流量削峰填谷
無論在互聯(lián)網(wǎng)還是傳統(tǒng)行業(yè)中,IT系統(tǒng)的訪問量、業(yè)務(wù)請求與時間的關(guān)系都不是均勻分布的。比如每年的“6.18”和“雙十一”,或者12306的春運車票發(fā)售日,系統(tǒng)的請求都會迎來高峰。或者因為某些促銷活動、熱點事件,也會導(dǎo)致系統(tǒng)達到峰值。但是系統(tǒng)的沒個應(yīng)用程序的單個節(jié)點的處理能力是有限的,不能在短期內(nèi)處理所有請求。這種問題其實在現(xiàn)實生活中也很常見,比如超市促銷時,客流激增,商家會組織顧客排隊購買、排隊支付。其實消息隊列,也可以看做這種排隊的思想的引入。通過將大量請求放入消息隊列,進行排隊,讓消費者挨個處理請求,而不超出消費者程序的承載上限,較好地削平了流量高峰,在請求量低的時候又把之前積壓的請求繼續(xù)處理。它就像一根可以穩(wěn)定壓力的,讓水流均勻流動的水管,從而保證整個系統(tǒng)中的運轉(zhuǎn)效率保持均勻穩(wěn)定。
RPC(Remote Procedure Call)
RPC即所謂遠程過程調(diào)用,前面將消息隊列來實現(xiàn)異步解耦的時候已經(jīng)說明,它可以做不同應(yīng)用程序相互調(diào)用的“中間人”。但作RPC時就還必須考慮到,消費方接收到消息,并且處理完該消息包含的請求后,還必須有個應(yīng)答的過程。所以,消息生產(chǎn)者(請求方)在消息頭里必須指定該條消息的編號(correlation_id)和消費者(接收請求方)應(yīng)答消息應(yīng)該寫入哪個隊列(如圖,reply_to信息),消息消費者在處理完之后,它又會轉(zhuǎn)換成一個消息生產(chǎn)者的角色,向消息隊列指定的隊列中寫入一調(diào)帶correlation_id的應(yīng)答消息。當RPC請求方接收到這條消息時,就知道編號correlation_id消息包含的調(diào)用已完成。

AMQP的幾個概念

Product(生產(chǎn)者):
將消息的生產(chǎn)(發(fā)生)出來的程序
Consumer(消費者):
等待接收消息的程序
Broker(消息中間件):
將消息由生產(chǎn)者傳遞給消費者的中介軟件,就是我們常說的消息中間件(MQ)。包括知名的RabbitMQ、RacketMQ、ZeroMQ、Kafka、Redis(NOSQL,也用作消息中間件)。
Virtual host(虛擬機):
考慮到多租戶情境下的安全因素,對broker進行虛擬的資源劃分,類似于VLAN。同一個消息中間件,可以劃分出多個vhost,供不同用戶在其中創(chuàng)建使用exchange和Queue,以保持數(shù)據(jù)的隔離性和安全性。
Channel(管道):
應(yīng)用程序?qū)roker的訪問,必須先建立起連接,但是應(yīng)用程序需要發(fā)送/接收大量消息,如果每次發(fā)生/接收消息的操作都去建立一次連接,那么對部署broker的機器資源的開銷是巨大的。所以必須做到多次訪問對一次連接的復(fù)用,這就需要應(yīng)用程序用多線程的方式訪問broker。同時還得對每一次消息收發(fā),能保證數(shù)據(jù)的隔離性,那就必須由broker對消息傳遞的渠道做一定的分隔。channel就是為了實現(xiàn)這個目的而產(chǎn)生的,相當于在broker內(nèi)部對一個connection做了切分,實現(xiàn)了多個輕量級的connection,以極大地節(jié)約系統(tǒng)開銷。
Exchange (路由器)
負責(zé)分發(fā)消息的組件,相當于網(wǎng)絡(luò)中的路由器。message進入broker首先由Exchange處理,根據(jù)分發(fā)規(guī)則,查詢表中的routing key,來講消息分發(fā)到對應(yīng)的Queue中去。常用的分發(fā)規(guī)則有direct(point to point),topic(publish-subscribe)and fanout(mutilcast)。
Queue(隊列)
消息隊列,消息真正被寫入和讀取的地方,一個message可以拷貝到多個Queue上去。
Binding(路由規(guī)則)
exchange和queue之間的虛擬連接,binding中可以包含routing key。Binding信息被保存到exchange中的查詢表中,用于message的分發(fā)依據(jù)。
Message(消息)
分為消息頭和消息體,消息頭主要記錄供broker處理分發(fā)的信息,消息體才是真正被消費者需要的信息。
RabbitMQ的幾大特性
RabbitMQ作為AMQP的一個實現(xiàn),基本遵循了上述AMQP的一些機制。主要具備以下特性,篇幅考慮,這里只做簡單列舉,日后再詳細敘述。
消息確認機制
隊列持久化
消息持久化
消息的拒收
RabbitMQ的安裝和配置
以CentOS 7 為例
安裝
erlangy與rabbitMQ版本對照關(guān)系表
$sudo yum install erlang -y
$sudo yum install rabbitmq-server-3.7.4-1.el7.noarch.rpm
#開啟管理后臺插件(可選,默認端口15672)
$sudo rabbitmq-plugins enble rabbitmq_management
#啟動
$sudo rabbitmq-server
配置
#創(chuàng)建管理員用戶
$sudo rabbitmqctl add_user user_admin passwd_admin
#賦予其administrator角色
$sudo rabbitmqctl set_user_tags user_admin administrator
#創(chuàng)建RabbitMQ監(jiān)控用戶,負責(zé)整個MQ的監(jiān)控
$sudo rabbitmqctl add_user user_monitoring passwd_monitor
#賦予其monitoring角色
$sudo rabbitmqctl set_user_tags user_monitoring monitoring
#創(chuàng)建某個項目的專用用戶,只能訪問項目自己的virtual hosts
$sudo rabbitmqctl add_user user_proj passwd_proj
#查看所有用戶
$sudo rabbitmqctl list_users
pika的使用
pika是RabbitMQ官方文檔推薦的Python客戶端,如果我們采用Python編寫使用RabbitMQ的應(yīng)用程序,我們最好應(yīng)該采用pika來實現(xiàn)。
pika使用簡單示例
生產(chǎn)者程序示例:sendmq.py
#!/usr/bin/env python
import pika
credentials = pika.PlainCredentials('czq', 'czq')
Parameter = pika.ConnectionParameters('127.0.0.1',5672,'/',credentials,heartbeat_interval=0)
connection = pika.BlockingConnection(Parameter)
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
消費者程序示例:receviemq.py
import pika
credentials = pika.PlainCredentials('czq', 'czq')
Parameter = pika.ConnectionParameters('127.0.0.1',5672,'/',credentials,heartbeat_interval=0)
connection = pika.BlockingConnection(Parameter)
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
channel.basic_consume(callback,
queue='hello',
no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
參考資料
[1]RabbitMQ官方文檔:
http://www.rabbitmq.com/tutorials/tutorial-one-python.html
[2]pika官方文檔:
http://pika.readthedocs.io/en/0.10.0/modules/channel.html
[3]RabbitMQ與AMQP協(xié)議詳解
https://www.cnblogs.com/frankyou/p/5283539.html
[4]李智慧,《大型網(wǎng)站技術(shù)架構(gòu):核心原理與案例分析》,電子工業(yè)出版社,2013年9月1日
[5] RabbitMQ在線模擬學(xué)習(xí)
http://tryrabbitmq.com/