rabbitmq

rabbitmq

安裝

省略(配置新用戶,配置vhost)

第一種模型:簡單消息

該模型只是簡單是數(shù)據(jù)產(chǎn)生和消費(fèi)的過程

simple.png

java調(diào)用

生產(chǎn)者

public class Provide {
?
 public static final String QUEUE_NAME = "test_work_queue";
?
 public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
?
?
 Connection connetion = GetConnection.getConnetion();
?
 Channel channel = connetion.createChannel();
?
?
 channel.queueDeclare("",false,false,false,null);
?
?
 for (int i = 0; i < 50; i++) {
 String msg = "hello world";
 msg = msg +"   "+i;
 channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
?
 Thread.sleep(20);
 System.out.println("send success"+msg);
 }
?
?
 channel.close();
 connetion.close();
 }
}

消費(fèi)者

/**
 * 用來消費(fèi)消息
 */
public class Consumer {
?
 public static final String name = "spmile_test_mq";
?
 public static void consumeMsg() throws IOException, TimeoutException, InterruptedException {
 Connection connetion = GetConnection.getConnetion();
?
 Channel channel = connetion.createChannel();
?
 QueueingConsumer consumer = new QueueingConsumer(channel);
?
 String s = channel.basicConsume(name, true, consumer);
?
 System.out.println(s);
?
 while (true){
 QueueingConsumer.Delivery delivery = consumer.nextDelivery();
 System.out.println(new String(delivery.getBody()));
 }
?
?
 }
?
 public static void newConsumeMsg() throws IOException, TimeoutException, InterruptedException {
 Connection connetion = GetConnection.getConnetion();
?
 Channel channel = connetion.createChannel();
?
 //可以省略
 channel.queueDeclare(name,false,false,false,null);
?
 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
 @Override
 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
?
 System.out.println(new String(body));
 }
 };
?
 channel.basicConsume(name,true,defaultConsumer);
?
?
 }
?
 public static void main(String[] args) throws InterruptedException, TimeoutException, IOException {
 consumeMsg();
 newConsumeMsg();
 }
}
image-20200621211555875.png

輪詢模式(round-robin)

第一種簡單模式,消息隊(duì)列和消費(fèi)者一一對(duì)應(yīng),這樣耦合性很高,那么就會(huì)出現(xiàn)問題,只能有一個(gè)消費(fèi)者來消費(fèi)queue中的消息,那么一旦消費(fèi)者宕機(jī),既無法工作了。這樣是有問題,

由此才會(huì)有第二種模式

第二種模式生產(chǎn)者基本上都是一樣的,消費(fèi)者有所不同

這個(gè)時(shí)候我們需要先介紹一個(gè)參數(shù)

image-20200621212410163.png

這個(gè)autoAck,參數(shù)就是表示是否開啟自動(dòng)應(yīng)答,我們開啟之后我們就會(huì)發(fā)現(xiàn)一個(gè)現(xiàn)象,不論消費(fèi)者消費(fèi)的速度快慢與否,我們消費(fèi)者在回掉還未完全結(jié)束的時(shí)候就自動(dòng)應(yīng)答。

所以我們的queue就會(huì)認(rèn)為消費(fèi)的速度都是一樣的,所以會(huì)自動(dòng)輪詢分發(fā)給給個(gè)消費(fèi)者。

所以我們需要把自動(dòng)應(yīng)答這個(gè)參數(shù)改為false,然后自己手動(dòng)應(yīng)答。

這種模式也有一個(gè)很明顯的缺點(diǎn),會(huì)造成消息丟失

  • 當(dāng)我有消費(fèi)者掛掉了,那么這個(gè)消息沒有處理完,但是rabbitmq也認(rèn)為我們的消息處理完了,所以不會(huì)做任何處理,所以,消息就丟失掉了

一下這種模式叫做公平分發(fā)

1.首先保證,消息隊(duì)列對(duì)于每個(gè)消費(fèi)者而言,每次只發(fā)送一條消息

channel.queueDeclare("",false,false,false,null);
?
/**
 * 我們需要任這個(gè)隊(duì)列每次保證只發(fā)送一條消息
 *
 */
channel.basicQos(1);

2.將自動(dòng)應(yīng)答關(guān)閉

3.自動(dòng)應(yīng)答,相應(yīng)服務(wù)器

Consumer defaultConsumer = new DefaultConsumer(channel) {
 @Override
 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
 String msg = new String (body);
?
 System.out.println("rev success "+msg);
?
 try {
 Thread.sleep(2000);
 } catch (InterruptedException e) {
 e.printStackTrace();
?
 channel.basicAck(envelope.getDeliveryTag(),false);
 }
 }
 };
?
 //   boolean autoAck = true;
 boolean autoAck = false;
?
 channel.basicConsume(QUEUE_NAME,autoAck,defaultConsumer);

那么下面這種模式呢,因?yàn)槭鞘謩?dòng)應(yīng)答的,所以說,我們其中一個(gè)消費(fèi)者掛掉了,那么沒有產(chǎn)生消息回執(zhí),所以,我們的消息隊(duì)列會(huì)把這個(gè)消息轉(zhuǎn)發(fā)給其他的消費(fèi)者

持久化

在mq掛掉之后,那么存在mq中的消息怎么辦呢,當(dāng)然是需要持久換,落在磁盤上才可以

queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)

這個(gè)方法中第二個(gè)參數(shù)durable就是控制是否開啟持久化的過程

交換機(jī)

上面的模式都是,消息只能使用一次,也只能發(fā)送給一個(gè)消費(fèi)和,那么并不適用于諸多場景,例如:微信公眾號(hào)的消息推送,因?yàn)槲覀冃枰總€(gè)人都受到消息,但是并不知道有多少消費(fèi)者,那么也就無法知道需要發(fā)送多少臺(tái)條消息才夠。

所以需要加上下面的概念:*** 交換機(jī)***

模型大概如下

image-20200621221750011.png

這個(gè)有交換機(jī)將消息轉(zhuǎn)發(fā)個(gè)每一個(gè)消息隊(duì)列,再有隊(duì)列轉(zhuǎn)發(fā)個(gè)消費(fèi)者,這個(gè)模型下,消費(fèi)者和消息隊(duì)列是綁定的。

從上面的圖形來看,我們的消費(fèi)者是直接連載交換機(jī)上的,所以在寫代碼之前我們可以找尋出如下不同,以便記憶

  • 首先,之前的是直接連載隊(duì)列上的,所有我們需要首先聲明隊(duì)列,

  • 而上面的模型我們是連載交換機(jī)上的,所有我們需要聲明交換機(jī),但是我們不能連在隊(duì)列上,因?yàn)椴恢澜壴谀囊粋€(gè)隊(duì)列上,而消費(fèi)者又沒有交換機(jī)的功能

    channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//居然不是枚舉值,讓人費(fèi)解
    //交換機(jī)沒有存儲(chǔ)消息的功能,所有在代碼中,如果沒有綁定隊(duì)列就發(fā)送了消息,那么消息就丟失了
  • 所以是隊(duì)列在綁定交換機(jī)
channel.exchangeBind(QUEME_NAME,EXCHANGE_NAME,"");

路由器

channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//居然不是枚舉值,讓人費(fèi)解

fanout:是指不處理路由建

direct:處理路由建(只有所綁定的隊(duì)列和交換機(jī)此時(shí)的路由key想匹配的話,就會(huì)進(jìn)入隊(duì)列)

例如:日志記錄

  • error:記錄在專門的隊(duì)列里

  • info,debug。。在另外的隊(duì)列里

這個(gè)問題就是三個(gè)地方

1.生產(chǎn)者

channel.exchangeDeclare(EXCHANGE_Name,"direct");
//basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
String s = "info msg";
channel.basicPublish(EXCHANGE_Name,"info", null,s.getBytes());

2.消費(fèi)者

//queueBind(String queue, String exchange, String routingKey)
channel.queueBind(name,EXCHANGE_Name,"error");
channel.queueBind(name,EXCHANGE_Name,"info");

如果出現(xiàn)路由key,沒生效的情況,吧交換機(jī)刪了,重啟就好

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容