前言碎語
上次發(fā)文到現(xiàn)在已經(jīng)三個月了, 本來計劃每個月2篇的美夢就這樣成了泡影, 個中緣由不想詳述, 或者說不想找借口. 如此看來, 時光匆匆, 需珍惜, 切記切記.

RabbitMQ是啥?
最早接觸RabbitMQ還是在3年前, 當(dāng)時接手一個系統(tǒng)遷移到中國的落地任務(wù). 系統(tǒng)很大, 架構(gòu)很復(fù)雜, 功能也很強(qiáng)大. 過程中填了無數(shù)的坑, 但是也學(xué)到很多.
現(xiàn)在回想起來, 令我印象最深的就是RabbitMQ. RabbitMQ是什么? 簡單的一句話總結(jié) "RabbitMQ是基于AMQP協(xié)議的隊列服務(wù)". 由于它是Erlang這個天生分布式的語言所開發(fā), 所以分布式, 集群, 高可用這些武功它統(tǒng)統(tǒng)都會.

有童鞋忍不住要問了, 這么高大上的東西, 我用它作甚? 截取一段CSDN中大神anzhsoft的描述:
對于一個大型的軟件系統(tǒng)來說,它會有很多的組件或者說模塊或者說子系統(tǒng)或者(subsystem or Component or submodule)。那么這些模塊的如何通信?這和傳統(tǒng)的IPC有很大的區(qū)別。傳統(tǒng)的IPC很多都是在單一系統(tǒng)上的,模塊耦合性很大,不適合擴(kuò)展(Scalability);如果使用socket那么不同的模塊的確可以部署到不同的機(jī)器上,但是還是有很多問題需要解決。比如:
1)信息的發(fā)送者和接收者如何維持這個連接,如果一方的連接中斷,這期間的數(shù)據(jù)如何方式丟失?
2)如何降低發(fā)送者和接收者的耦合度?
3)如何讓Priority高的接收者先接到數(shù)據(jù)?
4)如何做到load balance?有效均衡接收者的負(fù)載?
5)如何有效的將數(shù)據(jù)發(fā)送到相關(guān)的接收者?也就是說將接收者subscribe 不同的數(shù)據(jù),如何做有效的filter。
6)如何做到可擴(kuò)展,甚至將這個通信模塊發(fā)到cluster上?
7)如何保證接收者接收到了完整,正確的數(shù)據(jù)?
是的, 對于這些問題RabbitMQ都能給你一個答案.

動手實(shí)踐
閑話不多說, 讓我們操練起來.
首先安裝RabbitMQ的服務(wù), 下載地址在這里
提示一下, 安裝后把服務(wù)啟動起來通過執(zhí)行下面的命令把管理插件打開你會感覺格外的舒爽.
rabbitmq-plugins enable rabbitmq_management
打開管理插件后在瀏覽器輸入http://localhost:15672 即可打開管理頁面, 視圖如下

然后需要用Nuget導(dǎo)入RabbitMQ客戶端. 我使用的開發(fā)工具是vs code, 安裝包相對來說會比較復(fù)雜, 如果你用Visual Studio會簡單很多.
好了, 我說下vs code 中具體的操作步驟. 首先在擴(kuò)展中搜索nuget, 可以看到一個叫.net core project manager的擴(kuò)展, 安裝后快捷鍵Ctrl+Shift+P 打開命令工具輸入nuget即可看到一個nuget: add new package的提示, 選中后搜索rabbitmq, 會看到列出的幾個版本號, 我選的4.1.1
簡單碼一段代碼看看效果.
-
配置, 創(chuàng)建Client
ConnectionFactory factory = new ConnectionFactory();
factory.UserName = "test";
factory.Password = "test";
factory.VirtualHost = "test";
factory.HostName = "127.0.0.1";IConnection conn = factory.CreateConnection(); IModel channel = conn.CreateModel(); channel.ExchangeDeclare("test", "topic"); channel.QueueDeclare("test",true,true,false,null); channel.QueueBind("test", "test", "test", null); 編寫生產(chǎn)者的代碼
Timer t =new Timer((a)=>{
var i=0;
while (i++<100)
{
try
{
channel.BasicPublish("test","test",true,null, messageBodyBytes);
}
catch (System.Exception ex)
{
System.Console.WriteLine(ex.Message);
}
}
},null,0,1000);
用了一個Timer, 可以定時發(fā)送數(shù)據(jù).編寫消費(fèi)者代碼
EventingBasicConsumer c = new EventingBasicConsumer(channel);
c.Received += (ch, ea) =>
{
System.Console.WriteLine(System.Text.Encoding.UTF8.GetString(ea.Body));
Thread.Sleep(1000);
channel.BasicAck(ea.DeliveryTag, false);
};
string consumerTag = channel.BasicConsume("test", false, c);
RabbitMQ的消費(fèi)者有幾種, 我這里用的是EventingBasicConsumer 它是被事件觸發(fā)的, 不需要主動輪詢?nèi)ハM(fèi), 而且也是目前RabbitMQ官方推薦的.
需要注意這一句
string consumerTag = channel.BasicConsume("test", false, c);
第二個參數(shù)我設(shè)置為false, 意思是我要主動向server確認(rèn)信息的接收. 這樣做的好處有兩點(diǎn). 第一, 消費(fèi)者的入口容易控制, 不會把消費(fèi)者壓死. 第二, 可以通過RabbitMQ的控制臺直觀的了解到消費(fèi)者的處理能力. 看了后面的內(nèi)容我相信你會理解我的Point.

代碼碼好了, 來run一下瞧瞧.

由于我是單線程消費(fèi), 并且每次消費(fèi)過程中Sleep一秒, 這樣我每秒的處理速度高達(dá) 1

進(jìn)階一點(diǎn)點(diǎn)
看到這有人會說了, 你寫個這么爛的代碼也好意思出來曬? 好吧, 我是要循序漸進(jìn)的!

廢話不多說, 上代碼!
Task.Factory.StartNew(()=>{
System.Console.WriteLine(System.Text.Encoding.UTF8.GetString(ea.Body));
Thread.Sleep(1000);
channel.BasicAck(ea.DeliveryTag, false);
});


貼了兩張圖, 看官們可以看到, 處理速度有一個爬坡的過程, 而且運(yùn)行到一段時間以后基本平穩(wěn)在了100個/秒上下. 川酷不滿意的是, 此時的cpu才僅僅占用了10%不到. 對于我這種抓到蛤蟆攥出尿來的人, 不壓榨壓榨實(shí)在不甘心. 對消費(fèi)者進(jìn)行了一下改造, 且看.
public static ConcurrentQueue<BasicDeliverEventArgs> Queue1 = new ConcurrentQueue<BasicDeliverEventArgs>();
EventingBasicConsumer c = new EventingBasicConsumer(channel);
c.Received += (ch, ea) =>
{
Queue1.Enqueue(ea);
};
string consumerTag = channel.BasicConsume("test", false, c);
var j=0;
while (j++<150)
{
Task.Factory.StartNew(()=>{
BasicDeliverEventArgs bdea=null;
while (true)
{
if(Queue1.TryDequeue(out bdea)){
System.Console.WriteLine(System.Text.Encoding.UTF8.GetString(bdea.Body));
Thread.Sleep(1000);
channel.BasicAck(bdea.DeliveryTag, false);
}
}
});
}
哈哈, 沒錯, 起了150個線程去同步處理, 速度就達(dá)到了150, 看證據(jù)

其實(shí)這時候的cpu占用率也不高. 只想做一個拋磚引玉, 當(dāng)我們直接調(diào)用.net為我們提供的傻瓜式接口時, 有時候會遇到結(jié)果并不那么如意的情況, 逼著我們開動腦筋. 看官當(dāng)中肯定會有人問, 如果起10000個線程, 豈不是速度能達(dá)到10000個/秒? 自己動手試試唄.

如果您覺得這篇文章對您有那么一丁點(diǎn)益處, 或者從某個角度觸動到了您, 請給川酷一些鼓勵, 打賞, 點(diǎn)贊, 關(guān)注, 哪怕評論區(qū)罵我兩句, 鄙人都感激涕零.
