一、RibbitMQ的基礎介紹
1. 為什么要使用MQ
2. 與其他MQ的區(qū)別
- ActiveMQ:使用Java開發(fā),遵循JMS規(guī)范,使用方便,支持多種協(xié)議。但是有丟失消息的風險并且速度較慢
- RabbitMQ:使用Erlang開發(fā)(用于解決高并發(fā)的問題),可以解決并發(fā)問題。但是只支持AMQP協(xié)議且不能動態(tài)擴展
二、RabbitMQ的安裝
1. 安裝Erlang環(huán)境(這一步參照博客 <a>http://www.itdecent.cn/p/27197d58e94c</a>)
-
安裝阿里的yum源(我在安裝的時候下載速度很慢,所以這邊使用阿里的yum源來安裝erlang)
wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-6.repo yum-y install make gcc gcc-c++kernel-devel m4 ncurses-devel openssl-devel java-devel unixODBC-devel -
安裝erlang的yum源
wget http://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm rpm -Uvh erlang-solutions-1.0-1.noarch.rpm rpm --import http://packages.erlang-solutions.com/rpm/erlang_solutions.asc
-
然后就可以直接安裝erlang了
yum -y install erlang
-
通過下面的命令就可以查看是否安裝完成了
erl
2. 安裝RibbitMQ
-
下載Rabbit的yum源(下載下來的時候名字很亂,改個名字)
wget https://bintray.com/rabbitmq/rpm/download_file?file_path=rabbitmq-server%2Fv3.7.x%2Fel%2F7%2Fnoarch%2Frabbitmq-server-3.7.14-1.el7.noarch.rpm mv download_file\?file_path\=rabbitmq-server%2Fv3.7.x%2Fel%2F7%2Fnoarch%2Frabbitmq-server-3.7.14-1.el7.noarch.rpm rabbitmq-server-3.7.14-el7.noarch.rpm -
安裝
yum -y install rabbitmq-server-3.7.14-el7.noarch.rpm- 如果報有依賴需要解決,就直接使用yum下載這個依賴就好了
-
啟動服務
rabbitmq-server start -
后臺啟動
rabbitmq-server -datached
- 啟動后使用amqp協(xié)議,默認在5672端口
三、RabbitMQ初步使用
1. 搭建管理平臺
-
初步搭建沒有任何插件,我們使用下面的命令下載并啟用RabbitMQ的管理地址
rabbitmq-plugins enable rabbitmq_management -
現(xiàn)在就可以訪問該節(jié)點的15672端口使用
guest/guest來登陸管理界面-
如果不是在localhost下訪問,我們還需要修改
/usr/lib/rabbitmq/lib/rabbitmq_server-3.7.14/ebin/rabbit.app文件,將{loopback_users, [<<”guest”>>]}
改為{loopback_users, []}{default_user, <<"guest">>}, {default_pass, <<"guest">>}, {default_user_tags, [administrator]}, {default_vhost, <<"/">>}, {default_permissions, [<<".*">>, <<".*">>, <<".*">>]}, {loopback_users, []}, {password_hashing_module, rabbit_password_hashing_sha256}, {server_properties, []},再重啟就OK了
Virtural Host 用于區(qū)分不同業(yè)務,每個VH都是獨立的,互不影響的。不同的團隊用不同的VH,相互隔離
-
2. 點對點簡單隊列
點對點簡單隊列:一個生產(chǎn)者投遞消息給隊列,只允許一個消費者進行消費,(如果存在消費者集群,則會均攤消費,使用取模算法)每個消息只會消費一次
生產(chǎn)者生產(chǎn)的消息直接投遞給隊列服務器,然后隊列服務器直接推送或消費者自行拉取消息
-
ACK應答模式
- 自動應答:當消費者收到消息后,不論是否處理,消費者都會自動應答消費。
- 手動應答:消費者在代碼里顯式的回復ACK
-
導入依賴
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.0.2</version> </dependency> -
封裝一個鏈接工具
public class MQConnectionUtils { /** * 創(chuàng)建新的鏈接 * @return */ public static Connection connect() throws IOException, TimeoutException { //創(chuàng)建鏈接工廠 ConnectionFactory connectionFactory = new ConnectionFactory(); //設置鏈接參數(shù) connectionFactory.setHost("192.168.3.203"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); return connectionFactory.newConnection(); } } -
生產(chǎn)者
public class Producer { /**隊列名稱*/ private static final String QUEUE_NAME = "libi_QUEUE"; public static void main(String[] args) throws IOException, TimeoutException { //建立鏈接 Connection connection = MQConnectionUtils.connect(); //創(chuàng)建通道 Channel channel = connection.createChannel(); //創(chuàng)建一個隊列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //創(chuàng)建消息 String message = "Libi_Message"; //發(fā)送消息 channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); //關閉通道和鏈接 channel.close(); connection.close(); System.out.println("消息投遞成功!"); } } -
消費者
public class Consumer { /**隊列名稱*/ private static final String QUEUE_NAME = "libi_QUEUE"; public static void main(String[] args) throws IOException, TimeoutException { //建立鏈接 Connection connection = MQConnectionUtils.connect(); //創(chuàng)建通道 Channel channel = connection.createChannel(); //消費者關聯(lián)一個隊列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ //使用匿名內(nèi)部類重寫獲取消息的方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String msg = new String(body, "UTF-8"); System.out.println("活動生產(chǎn)者消息:"+msg); } }; //設置應答模式,true表示自動應答,false表示手動應答 channel.basicConsume(QUEUE_NAME,true,defaultConsumer); //關閉通道和鏈接 channel.close(); connection.close(); System.out.println("消息消費成功!"); } }
3. 公平隊列(需要自己在代碼里實現(xiàn))
均攤消費的缺點:當消費者處理消息的能力不一致時,如果還是均攤處理信息,則會造成資源浪費(對消費慢點節(jié)點不公平),需要實現(xiàn)"能者多勞"
-
公平隊列的實現(xiàn)思路(BaseQos方法):當有n個消費者在上一條消息還沒有處理完成時(還沒有發(fā)送ACK),消息隊列就不會發(fā)送下一條消息給它,給另外一個消費者。在生產(chǎn)者通過如下代碼開啟Qos
channel.basicQos(n); 如果這時消費者在代碼里忘記應答了,那么就會陷入阻塞
4.發(fā)布訂閱模式
生產(chǎn)者投遞消息給交換機,交換機根據(jù)路由策略轉發(fā)到不同的隊列服務器中,隊列服務器再給消費者進行消費
-
交換機策略
-
Direct:直接交換機,一種帶路由功能的交換機,一個隊列會和一個交換機綁定,除此之外再綁定一個
routing_key,當消息被發(fā)送的時候,需要指定一個binding_key,這個消息被送達交換機的時候,就會被這個交換機送到指定的隊列里面去。同樣的一個binding_key也是支持應用到多個隊列中的。就是說直接交換機可以更具生產(chǎn)者的
routing_key和消費者的binding_key進行匹配,只有一樣才會轉發(fā)這個消息 Fanout:扇形交換機,它所能做的事情非常簡單———廣播消息。扇形交換機會把能接收到的消息全部發(fā)送給綁定在自己身上的隊列。因為廣播不需要“思考”,所以扇形交換機處理消息的速度也是所有的交換機類型里面最快的。
-
Topic:主題交換機,發(fā)送到主題交換機上的消息需要攜帶指定規(guī)則的
routing_key,主題交換機會根據(jù)這個規(guī)則將數(shù)據(jù)發(fā)送到對應的(多個)隊列上。主題交換機的
routing_key需要有一定的規(guī)則,交換機和隊列的binding_key需要采用*.#.*.....的格式,每個部分用.分開,其中:-
*表示一個單詞 -
#表示任意數(shù)量(零個或多個)單詞。
假設有一條消息的
routing_key為fast.rabbit.white,那么帶有這樣binding_key的幾個隊列都會接收這條消息: -
-
Handler:首都交換機,首部交換機是忽略
routing_key的一種路由方式。路由器和交換機路由的規(guī)則是通過Headers信息來交換的,這個有點像HTTP的Headers。將一個交換機聲明成首部交換機,綁定一個隊列的時候,定義一個Hash的數(shù)據(jù)結構,消息發(fā)送的時候,會攜帶一組hash數(shù)據(jù)結構的信息,當Hash的內(nèi)容匹配上的時候,消息就會被寫入隊列。-
生產(chǎn)者(在發(fā)送消息的時候傳入exchange的參數(shù))
/** * 使用Fanout類型的交換機,交換器轉給發(fā)全部的隊列 */ public class Producer { //交換機名稱 static final String EXCHANGE_NAME = "fanout_destination"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = MQConnectionUtils.connect(); Channel channel = connection.createChannel(); //綁定交換機 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String msg = "my_fanout_meg"; //發(fā)送消息(路由策略為空串) channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes()); channel.close(); connection.close(); } } -
消費者
public class EmailConsumer { private static String QUEUE_NAME = "Email_Queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = MQConnectionUtils.connect(); Channel channel = connection.createChannel(); //消費者聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //消費者綁定交換機 channel.queueBind(QUEUE_NAME, Producer.EXCHANGE_NAME,""); //監(jiān)聽消息 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("郵件消費者:" + msg); } }; channel.basicConsume(QUEUE_NAME, defaultConsumer); } }消費者的chanel和connection沒有關閉,可以多啟動幾個,就會發(fā)現(xiàn)所有的消費者都可以收到生產(chǎn)者傳入的信息