這里博主推薦大家閱讀由朱忠華先生編寫的<<RabbitMq實戰(zhàn)指南>>,這里有詳細(xì)的客戶端開發(fā)接口的說明,例如com.rabbotmq.client包的使用
運行流程
生產(chǎn)者
-
生產(chǎn)者連接到RabbitMq Broker,建立一個連接即程序里的Connection,開啟一個信道
ConnectionFactory factory = new ConnectionFactory(); // "guest"/"guest" by default, limited to localhost connections factory.setUsername(userName); factory.setPassword(password); factory.setVirtualHost(virtualHost); factory.setHost(hostName); factory.setPort(portNumber); Connection conn = factory.newConnection(); Channel channel = conn.createChannel(); -
生產(chǎn)者聲明一個交換器,并設(shè)置相關(guān)屬性,這里聲明交換器的類型,是否持久化等,很重要哦,可以自行查看api
(交換器也可以轉(zhuǎn)發(fā)消息給別的交換器)channel.exchangeDeclare(exchangeName, "direct", true);
-
生產(chǎn)者聲明一個隊列并設(shè)置屬性,比如排他性,是否持久化,是否過期刪除等
String queueName = channel.queueDeclare().getQueue(); or channel.queueBind(queueName, exchangeName, routingKey); -
生產(chǎn)者通過路由鍵將交換器和隊列進(jìn)行綁定起來
channel.queueBind(queueName, exchangeName, routingKey) -
生產(chǎn)者發(fā)送信息到RabbitMq Broker,其中包含路由鍵、交換器信息
byte[] messageBodyBytes = "Hello, world!".getBytes(); channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes); 對應(yīng)交換器根據(jù)收到的路由鍵匹配發(fā)送的隊列`(重點type: topic、direct)
找到后,把消息存入隊列
找不到,會退給生產(chǎn)者
-
關(guān)閉信道,關(guān)閉連接
channel.close(); conn.close();
消費者
消費者連接到RabbitMq Broker,建立一個連接即程序里的Connection,開啟一個信道
消費者向?qū)?yīng)隊列請求消費消息,
等待RabbitMq Broker回應(yīng)并投遞相應(yīng)的隊列消息給消費者,消費者然后進(jìn)行回調(diào)處理
(回調(diào)在輪詢或者是阻塞里)-
消費者完成回調(diào),確認(rèn)消息已經(jīng)被成功消費
(這里確認(rèn)消息可以是自動應(yīng)答,也可手動,建議是手動)boolean autoAck = false; channel.basicConsume(queueName, autoAck, "myConsumerTag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); long deliveryTag = envelope.getDeliveryTag(); //。。。。。 // 手動應(yīng)答 acknowledge receipt of the message //Delivery Tag 用來標(biāo)識信道中投遞的消息 //第二個參數(shù)是requeue 如果是true表示通知RabbitMq把這個消息重新存入隊列并發(fā)送給下一個消費者,反之刪除 channel.basicAck(deliveryTag, false); } }); RabbitMq 刪除對應(yīng)已經(jīng)被消費者確認(rèn)消費的消息
關(guān)閉信道
關(guān)閉連接
重點
- 以上過程不是絕對的,已經(jīng)表明
- Connection連接就是一條TCP連接,在實際項目中不要多次創(chuàng)建Connection,他是可以復(fù)用的,類似NIO。因為建立TCP連接開銷很大。
- Channel信道是建立在Connection上虛擬連接,RabbitMq處理的每條AMQP指令都是通過信道完成。
- 每個線程把持一個信道,所以信道復(fù)用了Connection的TCP連接。同時RabbitMq確保了每個線程的私密性,就像獨立的連接一樣。
- Channel不建議在線程間貢獻(xiàn),不然很酸爽
- Connection可以復(fù)用不代表項目中只能創(chuàng)建一個,可以根據(jù)實際情況創(chuàng)建多個Connection,并讓信道均攤到這些Connection上。