rabbitmq
安裝
省略(配置新用戶,配置vhost)
第一種模型:簡單消息
該模型只是簡單是數(shù)據(jù)產(chǎn)生和消費(fèi)的過程

java調(diào)用
生產(chǎn)者
public class Provide {
?
public static final String QUEUE_NAME = "test_work_queue";
?
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
?
?
Connection connetion = GetConnection.getConnetion();
?
Channel channel = connetion.createChannel();
?
?
channel.queueDeclare("",false,false,false,null);
?
?
for (int i = 0; i < 50; i++) {
String msg = "hello world";
msg = msg +" "+i;
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
?
Thread.sleep(20);
System.out.println("send success"+msg);
}
?
?
channel.close();
connetion.close();
}
}
消費(fèi)者
/**
* 用來消費(fèi)消息
*/
public class Consumer {
?
public static final String name = "spmile_test_mq";
?
public static void consumeMsg() throws IOException, TimeoutException, InterruptedException {
Connection connetion = GetConnection.getConnetion();
?
Channel channel = connetion.createChannel();
?
QueueingConsumer consumer = new QueueingConsumer(channel);
?
String s = channel.basicConsume(name, true, consumer);
?
System.out.println(s);
?
while (true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
System.out.println(new String(delivery.getBody()));
}
?
?
}
?
public static void newConsumeMsg() throws IOException, TimeoutException, InterruptedException {
Connection connetion = GetConnection.getConnetion();
?
Channel channel = connetion.createChannel();
?
//可以省略
channel.queueDeclare(name,false,false,false,null);
?
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
?
System.out.println(new String(body));
}
};
?
channel.basicConsume(name,true,defaultConsumer);
?
?
}
?
public static void main(String[] args) throws InterruptedException, TimeoutException, IOException {
consumeMsg();
newConsumeMsg();
}
}

輪詢模式(round-robin)
第一種簡單模式,消息隊(duì)列和消費(fèi)者一一對(duì)應(yīng),這樣耦合性很高,那么就會(huì)出現(xiàn)問題,只能有一個(gè)消費(fèi)者來消費(fèi)queue中的消息,那么一旦消費(fèi)者宕機(jī),既無法工作了。這樣是有問題,
由此才會(huì)有第二種模式
第二種模式生產(chǎn)者基本上都是一樣的,消費(fèi)者有所不同
這個(gè)時(shí)候我們需要先介紹一個(gè)參數(shù)

這個(gè)autoAck,參數(shù)就是表示是否開啟自動(dòng)應(yīng)答,我們開啟之后我們就會(huì)發(fā)現(xiàn)一個(gè)現(xiàn)象,不論消費(fèi)者消費(fèi)的速度快慢與否,我們消費(fèi)者在回掉還未完全結(jié)束的時(shí)候就自動(dòng)應(yīng)答。
所以我們的queue就會(huì)認(rèn)為消費(fèi)的速度都是一樣的,所以會(huì)自動(dòng)輪詢分發(fā)給給個(gè)消費(fèi)者。
所以我們需要把自動(dòng)應(yīng)答這個(gè)參數(shù)改為false,然后自己手動(dòng)應(yīng)答。
這種模式也有一個(gè)很明顯的缺點(diǎn),會(huì)造成消息丟失
- 當(dāng)我有消費(fèi)者掛掉了,那么這個(gè)消息沒有處理完,但是rabbitmq也認(rèn)為我們的消息處理完了,所以不會(huì)做任何處理,所以,消息就丟失掉了
一下這種模式叫做公平分發(fā)
1.首先保證,消息隊(duì)列對(duì)于每個(gè)消費(fèi)者而言,每次只發(fā)送一條消息
channel.queueDeclare("",false,false,false,null);
?
/**
* 我們需要任這個(gè)隊(duì)列每次保證只發(fā)送一條消息
*
*/
channel.basicQos(1);
2.將自動(dòng)應(yīng)答關(guān)閉
3.自動(dòng)應(yīng)答,相應(yīng)服務(wù)器
Consumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String (body);
?
System.out.println("rev success "+msg);
?
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
?
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
?
// boolean autoAck = true;
boolean autoAck = false;
?
channel.basicConsume(QUEUE_NAME,autoAck,defaultConsumer);
那么下面這種模式呢,因?yàn)槭鞘謩?dòng)應(yīng)答的,所以說,我們其中一個(gè)消費(fèi)者掛掉了,那么沒有產(chǎn)生消息回執(zhí),所以,我們的消息隊(duì)列會(huì)把這個(gè)消息轉(zhuǎn)發(fā)給其他的消費(fèi)者
持久化
在mq掛掉之后,那么存在mq中的消息怎么辦呢,當(dāng)然是需要持久換,落在磁盤上才可以
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
這個(gè)方法中第二個(gè)參數(shù)durable就是控制是否開啟持久化的過程
交換機(jī)
上面的模式都是,消息只能使用一次,也只能發(fā)送給一個(gè)消費(fèi)和,那么并不適用于諸多場景,例如:微信公眾號(hào)的消息推送,因?yàn)槲覀冃枰總€(gè)人都受到消息,但是并不知道有多少消費(fèi)者,那么也就無法知道需要發(fā)送多少臺(tái)條消息才夠。
所以需要加上下面的概念:*** 交換機(jī)***
模型大概如下

這個(gè)有交換機(jī)將消息轉(zhuǎn)發(fā)個(gè)每一個(gè)消息隊(duì)列,再有隊(duì)列轉(zhuǎn)發(fā)個(gè)消費(fèi)者,這個(gè)模型下,消費(fèi)者和消息隊(duì)列是綁定的。
從上面的圖形來看,我們的消費(fèi)者是直接連載交換機(jī)上的,所以在寫代碼之前我們可以找尋出如下不同,以便記憶
首先,之前的是直接連載隊(duì)列上的,所有我們需要首先聲明隊(duì)列,
而上面的模型我們是連載交換機(jī)上的,所有我們需要聲明交換機(jī),但是我們不能連在隊(duì)列上,因?yàn)椴恢澜壴谀囊粋€(gè)隊(duì)列上,而消費(fèi)者又沒有交換機(jī)的功能
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//居然不是枚舉值,讓人費(fèi)解
//交換機(jī)沒有存儲(chǔ)消息的功能,所有在代碼中,如果沒有綁定隊(duì)列就發(fā)送了消息,那么消息就丟失了
- 所以是隊(duì)列在綁定交換機(jī)
channel.exchangeBind(QUEME_NAME,EXCHANGE_NAME,"");
路由器
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//居然不是枚舉值,讓人費(fèi)解
fanout:是指不處理路由建
direct:處理路由建(只有所綁定的隊(duì)列和交換機(jī)此時(shí)的路由key想匹配的話,就會(huì)進(jìn)入隊(duì)列)
例如:日志記錄
error:記錄在專門的隊(duì)列里
info,debug。。在另外的隊(duì)列里
這個(gè)問題就是三個(gè)地方
1.生產(chǎn)者
channel.exchangeDeclare(EXCHANGE_Name,"direct");
//basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
String s = "info msg";
channel.basicPublish(EXCHANGE_Name,"info", null,s.getBytes());
2.消費(fèi)者
//queueBind(String queue, String exchange, String routingKey)
channel.queueBind(name,EXCHANGE_Name,"error");
channel.queueBind(name,EXCHANGE_Name,"info");
如果出現(xiàn)路由key,沒生效的情況,吧交換機(jī)刪了,重啟就好