rabbitmq
本文算是實(shí)現(xiàn)對(duì)入門(mén)教程的 java版本翻譯吧。本文中演示代碼地址
1. 安裝
先安裝 erlang (安裝網(wǎng)上提供的教程安裝erlang)
在安裝 rabbitmq-server
下載rabbitmq的安裝包的時(shí)候選擇 tar.xz 直接解壓就可以了啟動(dòng)/停止
#啟動(dòng)rabbitmq
#進(jìn)入安裝目錄的sbin 目錄,執(zhí)行
./rabbitmq-server -detached
#關(guān)閉rabbitmq
./rabbitmqctl stop
2.用戶權(quán)限
guest/guest 現(xiàn)在只能在localhost使用,不能遠(yuǎn)程使用
需要添加用戶和權(quán)限
例如:
####添加用戶:
$sudo rabbitmqctl add_user user_admin passwd_admin
#####修改角色為 administrator:
$sudo rabbitmqctl set_user_tags user_admin administrator
上面的操作有了,可能還是為在日志中提示 user_admin在 vhost '/' 權(quán)限不夠
則執(zhí)行下面的操作:
######修改權(quán)限
$sudo rabbitmqctl set_permissions -p / user_admin '.*' '.*' '.*'
3.rabbitmq 的使用
3.1 工作隊(duì)列(workqueues) 模式
注意的點(diǎn):
- 工作隊(duì)列中的消息被所有的消費(fèi)者共享。
- rabbitmq在路由消息到消費(fèi)者的時(shí)候使用輪詢(round-robin)的方式,找到第n個(gè)消費(fèi)者來(lái)消費(fèi)消息
- ack(ackownledgment)機(jī)制確保消息被消費(fèi),不出現(xiàn)消息沒(méi)有消費(fèi)就從內(nèi)存中刪除的情況
3.1.1 ack機(jī)制(Message acknowledgment)
RabbitMQ 支持消息確認(rèn),當(dāng)消費(fèi)者(Consumer)接受到消息并且處理完成之后,回給RabbitMQ 發(fā)送一個(gè)確認(rèn)消息,Rabbitmq這個(gè)時(shí)候可以隨意的刪除這個(gè)消息
如果消費(fèi)者die ,但是沒(méi)有發(fā)送確認(rèn)消息,這個(gè)時(shí)候RabbitMQ會(huì)認(rèn)為消息沒(méi)有完全處理,這個(gè)時(shí)候會(huì)這個(gè)消息重新放到隊(duì)列之中,使用輪詢的方式分配給其他的消費(fèi)者消費(fèi)
RabbitMQ 默認(rèn)開(kāi)啟了確認(rèn)機(jī)制,使用的時(shí)候設(shè)置 autoAck = true 來(lái)關(guān)閉確認(rèn)機(jī)制,設(shè)置autoAck = false 則開(kāi)啟
3.1.2 消息持久化(Message durability)
啟動(dòng)消息持久化,需要在申明channel的時(shí)候設(shè)置持久化屬性為true
boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);
一個(gè)確定了是否持久化的隊(duì)列,不能再修改durable的值
設(shè)置隊(duì)列為持久化隊(duì)列之后,需要設(shè)置下消息的屬性,例如設(shè)置屬性為MessageProperties (實(shí)現(xiàn)了 BasicProperties)
channel.basicPublish("", "task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
使用持久化消息并不能完全保證消息的持久化,應(yīng)為消息可能先保存在緩存,后面保存到硬盤(pán)上,不過(guò)對(duì)于一般的task是完全足夠的,如果想要確保完全的持久化,可以結(jié)合 publisher confirms 來(lái)實(shí)現(xiàn)
3.1.3 公平分配(Fair dispatch)
通過(guò)設(shè)置prefetchCount 的值,控制一個(gè)消費(fèi)者接受的任務(wù)數(shù)量
int prefetchCount = 1;
channel.basicQos(prefetchCount);
如上面的設(shè)置,當(dāng)一個(gè)消費(fèi)者還有沒(méi)有處理完的任務(wù)的時(shí)候,rabbitmq不會(huì)分配任務(wù)給他
開(kāi)啟管理功能
rabbitmq-plugins enable rabbitmq_management
web管理界面: http://server-name:15672/
3.2 發(fā)布訂閱(Publish/Subscribe)模式
使用發(fā)布訂閱模式,一條消息會(huì)被發(fā)送給多個(gè)消費(fèi)者消費(fèi)
使用示例說(shuō)明:
創(chuàng)建一個(gè)日志的生產(chǎn)者 EmitLog 來(lái)發(fā)布日志,使用多個(gè)日志接受者ReceiveLogs
創(chuàng)建兩個(gè)接受者,一個(gè)把日志往硬盤(pán)上面寫(xiě),一個(gè)把日志打印到顯示器。整個(gè)過(guò)程發(fā)布日志
的只有一個(gè)。日志被廣播到多個(gè)接受者
1.1 交換(Exchanges)
在rabbitmq的消息模型中
生產(chǎn)者-->發(fā)送消息的一端
隊(duì)列--> 消息緩沖區(qū)
消費(fèi)者-->消費(fèi)消息的一端
rabbitmq的核心思想中,生產(chǎn)者通常不直接發(fā)送消息給一個(gè)隊(duì)列,事實(shí)上,
生產(chǎn)者也不知道把消息發(fā)送給哪個(gè)隊(duì)列
實(shí)際上,生成這是把消息發(fā)送給一個(gè)Exchange,這個(gè)Exchange一方面從生產(chǎn)者那邊接受消息,
一方面推送消息給隊(duì)列(queue),exchange清楚的知道自己接受的消息要怎么處理——是追加到
一個(gè)指定的隊(duì)列?還是追加到多個(gè)隊(duì)列?還是直接丟棄?具體做法要根據(jù)exchange的類型(type)
來(lái)定
可以通過(guò)下面的指令列出exchange的類型
./rabbitmqctl list_exchanges
exchange的類型:
- direct
- topic
- headers
- fanout
之前的例子中沒(méi)有使用exchange是因?yàn)槲覀兪褂昧四J(rèn)的exchange 即使用空字符串"" 來(lái)定義的
(默認(rèn)exchange的消息會(huì)被發(fā)送到指明的routingKey的隊(duì)列中)
channel.basicPublish("", "hello", null, message.getBytes());
現(xiàn)在創(chuàng)建一個(gè)exchange并發(fā)布
channel.exchangeDeclare("logs", "fanout");
channel.basicPublish( "logs", "", null, message.getBytes());
1.2 臨時(shí)隊(duì)列(Temporary queues)
使用無(wú)參的 queueDeclare() 方法可以創(chuàng)建一個(gè)非持久化的、獨(dú)立的、自動(dòng)刪除的、名稱隨機(jī)的一個(gè)
隊(duì)列。例如:產(chǎn)生一個(gè)名為 amq.gen-JzTY20BRgKO-HjmUJj0wLg 的隊(duì)列
String queueName = channel.queueDeclare().getQueue();
1.3 綁定(Bindings)
之前說(shuō)過(guò),消息分配到哪個(gè)隊(duì)列是有exchange來(lái)處理的,那么要確定消息去哪兒,就需要明確隊(duì)列和exchange
的關(guān)系。這個(gè)過(guò)程稱為binding
channel.queueBind(queueName, "logs", "");
3.3 路由(Routing)
之前的發(fā)布訂閱示例中,我們使用的是廣播消息,所有的接受者都能接受。使用routing之后
能夠讓接收者只接收消息的一個(gè)子集。例如之前的示例中,只有錯(cuò)誤級(jí)別的日志寫(xiě)到硬盤(pán),所有的
的錯(cuò)誤全部打印
1.bindings
在綁定的過(guò)程中加上 routingKey,為了同 basic_publish 區(qū)分,我們稱之為 binding key
channel.queueBind(queueName, EXCHANGE_NAME, "black");
2. Direct exchange
fanout exchange :只適合無(wú)腦的廣播模式
direct exchange :消息會(huì)被分配到與消費(fèi)的routing key 對(duì)應(yīng)的 binding key的隊(duì)列上,匹配不上的
消息直接丟棄

如圖:direct 的 exchange X 關(guān)聯(lián)了兩個(gè)隊(duì)列 Q1、Q2,Q1的 binding key=orange
,Q2的綁定了兩個(gè)key black 和 green
在這種情況下:
routing key=orange的消息會(huì)被 exchange X 傳遞給 Q1
routing key=black 或者 routing key=green 的消息會(huì)被 exchange X 傳遞給 Q2
3. 多綁定 (Multiple bindings)

如圖,對(duì)direct exchange 同時(shí)使用 binding key =black 綁定Q1、Q2,這個(gè)時(shí)候
direct的exchange就同之前的fanout的一樣了,直接廣播消息了
4. 測(cè)試
測(cè)試時(shí)候一個(gè)開(kāi)兩個(gè)接收者,一個(gè)設(shè)置級(jí)別為 error ,一個(gè)設(shè)置為 info,error
然后開(kāi)兩個(gè)生產(chǎn)者 分別發(fā)送 error 級(jí)別的消息和info級(jí)別的消息
觀察接收情況
3.4 主題(Topic)模式
在前面的日志例子中,我們可以通過(guò)級(jí)別來(lái)區(qū)分日志,但是我們還想通過(guò)日志來(lái)源來(lái)區(qū)分日志
就像unix tool 中的syslog,他能通過(guò) 級(jí)別(info/warn/error)和 來(lái)源(auth/cron/kern)來(lái)
路由日志
例如:我們只處理來(lái)自“cron” 的 “error”級(jí)別的日志,同時(shí)打印來(lái)自“kern”的所有級(jí)別的
日志
要實(shí)現(xiàn)這種要,我們就需要使用 topic exchange
1. Topic exchange
topic exchange 的 routing key必須是 一個(gè)或者英文單詞,中間使用點(diǎn)隔開(kāi)的方式,
最大長(zhǎng)度是255字節(jié)
binding key 必須是相同格式,topic exchange 的邏輯和direct 很相似,消息只會(huì)分配到匹配的binding key的隊(duì)列
- *(星) 代表一個(gè)單詞
- #(哈希)代表零個(gè)或者多個(gè)單詞
例如:

在這個(gè)例子中,接收動(dòng)物消息,消息會(huì)被發(fā)送到一個(gè)擁有三個(gè)單詞(兩個(gè)點(diǎn)隔開(kāi))的 routing key
格式為 <speed>.<color>.<species>
Q1綁定了 binding key“ *.orange.* ”, Q2 綁定了 “ *.*.rabbit” 和 “l(fā)azy.#”
Q1只對(duì)顏色為 orange的消息感興趣
Q2關(guān)心所有的rabbit 和 lazy的消息
routing key 為 quick.orange.rabbit既會(huì)被發(fā)送到Q1 也會(huì)被發(fā)送到 Q2
lazy.orange.elephant --> Q1,Q2
quick.orange.fox --> Q1
lazy.brown.fox --> Q2
lazy.pink.rabbit --> Q2(有兩個(gè)binding key 匹配的情況下依然只會(huì)被發(fā)送一次)
quick.brown.fox --> 匹配不上,直接丟棄
lazy.orange.male.rabbit -->Q2 (匹配上了"lazy.#")
當(dāng)不使用 * 或者 # 站位符的時(shí)候,topic exchange表現(xiàn)的就和direct 是一樣的
3.5 RPC
這一節(jié)使用 RabbitMQ 建立一個(gè)RPC系統(tǒng):一個(gè)客戶端和一個(gè)可擴(kuò)展的服務(wù)器
本示例中在客戶端調(diào)用一個(gè)call 方法,服務(wù)端返回一個(gè)Fibonacci 數(shù)列的值
關(guān)于使用rpc的幾點(diǎn)建議:
本地方法和遠(yuǎn)程方法要定義明確,一目了然
系統(tǒng)加注釋,是組件之間的依賴清晰可見(jiàn)
異常處理,當(dāng)rpc 服務(wù)器出現(xiàn)異常的時(shí)候,客戶端改如何處理