RabbitMQ可以做什么?
AMQP,即Advanced Message Queuing Protocol,高級(jí)消息隊(duì)列協(xié)議。是應(yīng)用層協(xié)議的一個(gè)開放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計(jì)。
消息中間件主要用于組件之間的解耦,消息的發(fā)送者無需知道消息使用者的存在,反之亦然。
應(yīng)用場景
我們知道現(xiàn)在很多APP都可以推送。在管理員選定內(nèi)容進(jìn)行推送這個(gè)行為中。
系統(tǒng)往往需要執(zhí)行倆個(gè)操作:推送消息;記錄是誰在什么時(shí)候推送的。
但是對(duì)于管理員來說,他只關(guān)心推送完成了沒有,而不關(guān)心是否產(chǎn)生了日志。
傳統(tǒng)的做法有2種:
串行:推送消息,然后在記錄日志,在倆個(gè)操作都完成之后告訴管理員推送完成。
并行:推送消息的同時(shí)記錄日志,在倆個(gè)操作都完成之后告訴管理員推送完成。
OK。我們現(xiàn)在引入消息隊(duì)列。
引入消息隊(duì)列之后,我們只需要在推送,然后日志放入消息隊(duì)列中。然后就可以告訴管理員消息推送完成。
而日志信息會(huì)存放在消息隊(duì)列之中。消息隊(duì)列的消費(fèi)者會(huì)在系統(tǒng)不繁忙的時(shí)候進(jìn)行處理。
RabbitMQ術(shù)語
生產(chǎn)者:
消息發(fā)送者
消費(fèi)者:
等待消息的程序
Queue:
隊(duì)列,存放消息的
Simple
RabbitMQ四種exchange
如果發(fā)生緊急情況,我們的服務(wù)器宕掉的話,消息隊(duì)列里的信息沒了怎么辦?
消息持久化
RabbitMQ為我們提供了消息持久化的手段
首先是隊(duì)列持久化,然后在是消息持久化
如果消費(fèi)者在消費(fèi)當(dāng)前消息的時(shí)候,突然崩掉,那么這條消息還在消息隊(duì)列中嗎?還是已經(jīng)被消費(fèi)掉了?
消息響應(yīng)機(jī)制
RabbitMQ為我們提供了消息響應(yīng)機(jī)制
啰里啰唆半天,下邊是代碼。
.net版本。
//生產(chǎn)者
var factory = new ConnectionFactory(){ HostName = "localhost", UserName = "guest", Password = "" };
using (IConnection conn = factory.CreateConnection())
{
using (IModel im = conn.CreateModel())
{
im.ExchangeDeclare("rabbitmq_route", ExchangeType.Direct);
im.QueueDeclare("rabbitmq_query", true, false, false, null);//第二個(gè)參數(shù)隊(duì)列持久化
im.QueueBind("rabbitmq_query", "rabbitmq_route", ExchangeType.Direct, null);
for (int i = 0; i < 5; i++)
{
var props = im.CreateBasicProperties();
props.SetPersistent(true);//消息持久化
byte[] message = Encoding.UTF8.GetBytes("Hello " + i);
im.BasicPublish("rabbitmq_route", ExchangeType.Direct, props, message);
Console.WriteLine("send:" + i);
}
}
}
//消費(fèi)者
var factory = new ConnectionFactory(){ HostName = "localhost", UserName = "guest", Password = "" };
using (IConnection conn = factory.CreateConnection())
{
using (IModel im = conn.CreateModel())
{
while (true)
{
BasicGetResult res = channel.BasicGet("rabbitmq_query", false);
if (res != null)
{
Console.WriteLine("receiver:" + UTF8Encoding.UTF8.GetString(res.Body));
}
Thread.Sleep(5000);
channel.BasicAck(res.DeliveryTag, true);//消息響應(yīng)
Console.WriteLine("basiack end");
}
}
}
//using (IConnection conn = factory.CreateConnection())
//{
// using (IModel im = conn.CreateModel())
// {
// im.ExchangeDeclare("rabbitmq_route_Fanout", ExchangeType.Fanout);// 路由
// int i = 0;
// while (true)
// {
// Thread.Sleep(1000);
// ++i;
// byte[] message = Encoding.UTF8.GetBytes(i.ToString());
// im.BasicPublish("rabbitmq_route_Fanout", "", null, message);
// Console.WriteLine("send:" + i.ToString());
// }
// }
//}
//using (IConnection conn = factory.CreateConnection())
//{
// using (IModel im = conn.CreateModel())
// {
// im.ExchangeDeclare("rabbitmq_route_Fanout", ExchangeType.Fanout);
// var queueOk = im.QueueDeclare();//1
// im.QueueBind(queueOk.QueueName, "rabbitmq_route_Fanout", "");//2
// var consumer = new QueueingBasicConsumer(im);//3
// im.BasicConsume(queueOk.QueueName, true, consumer);//4
// while (true)
// {
// var _result = (BasicDeliverEventArgs)consumer.Queue.Dequeue();//5
// var body = _result.Body;
// var message = Encoding.UTF8.GetString(body);
// Console.WriteLine("received:{0}", message);
// }
// }
//}
RabbitMQ.Client.dil 版本5.1.0-rc1