dotnet core 開發(fā)中使用RabbitMQ做消息中間件

前言碎語

上次發(fā)文到現(xiàn)在已經(jīng)三個月了, 本來計劃每個月2篇的美夢就這樣成了泡影, 個中緣由不想詳述, 或者說不想找借口. 如此看來, 時光匆匆, 需珍惜, 切記切記.


時間你等等我

RabbitMQ是啥?

最早接觸RabbitMQ還是在3年前, 當(dāng)時接手一個系統(tǒng)遷移到中國的落地任務(wù). 系統(tǒng)很大, 架構(gòu)很復(fù)雜, 功能也很強(qiáng)大. 過程中填了無數(shù)的坑, 但是也學(xué)到很多.
現(xiàn)在回想起來, 令我印象最深的就是RabbitMQ. RabbitMQ是什么? 簡單的一句話總結(jié) "RabbitMQ是基于AMQP協(xié)議的隊列服務(wù)". 由于它是Erlang這個天生分布式的語言所開發(fā), 所以分布式, 集群, 高可用這些武功它統(tǒng)統(tǒng)都會.

我讀書少, 不騙人

有童鞋忍不住要問了, 這么高大上的東西, 我用它作甚? 截取一段CSDN中大神anzhsoft的描述:

對于一個大型的軟件系統(tǒng)來說,它會有很多的組件或者說模塊或者說子系統(tǒng)或者(subsystem or Component or submodule)。那么這些模塊的如何通信?這和傳統(tǒng)的IPC有很大的區(qū)別。傳統(tǒng)的IPC很多都是在單一系統(tǒng)上的,模塊耦合性很大,不適合擴(kuò)展(Scalability);如果使用socket那么不同的模塊的確可以部署到不同的機(jī)器上,但是還是有很多問題需要解決。比如:
1)信息的發(fā)送者和接收者如何維持這個連接,如果一方的連接中斷,這期間的數(shù)據(jù)如何方式丟失?
2)如何降低發(fā)送者和接收者的耦合度?
3)如何讓Priority高的接收者先接到數(shù)據(jù)?
4)如何做到load balance?有效均衡接收者的負(fù)載?
5)如何有效的將數(shù)據(jù)發(fā)送到相關(guān)的接收者?也就是說將接收者subscribe 不同的數(shù)據(jù),如何做有效的filter。
6)如何做到可擴(kuò)展,甚至將這個通信模塊發(fā)到cluster上?
7)如何保證接收者接收到了完整,正確的數(shù)據(jù)?

是的, 對于這些問題RabbitMQ都能給你一個答案.

動手實(shí)踐

閑話不多說, 讓我們操練起來.
首先安裝RabbitMQ的服務(wù), 下載地址在這里
提示一下, 安裝后把服務(wù)啟動起來通過執(zhí)行下面的命令把管理插件打開你會感覺格外的舒爽.

rabbitmq-plugins enable rabbitmq_management

打開管理插件后在瀏覽器輸入http://localhost:15672 即可打開管理頁面, 視圖如下

然后需要用Nuget導(dǎo)入RabbitMQ客戶端. 我使用的開發(fā)工具是vs code, 安裝包相對來說會比較復(fù)雜, 如果你用Visual Studio會簡單很多.
好了, 我說下vs code 中具體的操作步驟. 首先在擴(kuò)展中搜索nuget, 可以看到一個叫.net core project manager的擴(kuò)展, 安裝后快捷鍵Ctrl+Shift+P 打開命令工具輸入nuget即可看到一個nuget: add new package的提示, 選中后搜索rabbitmq, 會看到列出的幾個版本號, 我選的4.1.1

簡單碼一段代碼看看效果.

  1. 配置, 創(chuàng)建Client
    ConnectionFactory factory = new ConnectionFactory();
    factory.UserName = "test";
    factory.Password = "test";
    factory.VirtualHost = "test";
    factory.HostName = "127.0.0.1";

     IConnection conn = factory.CreateConnection();
     IModel channel = conn.CreateModel();
     channel.ExchangeDeclare("test", "topic");
     channel.QueueDeclare("test",true,true,false,null);
     channel.QueueBind("test", "test", "test", null);
    
  2. 編寫生產(chǎn)者的代碼
    Timer t =new Timer((a)=>{
    var i=0;
    while (i++<100)
    {
    try
    {
    channel.BasicPublish("test","test",true,null, messageBodyBytes);
    }
    catch (System.Exception ex)
    {
    System.Console.WriteLine(ex.Message);
    }
    }
    },null,0,1000);
    用了一個Timer, 可以定時發(fā)送數(shù)據(jù).

  3. 編寫消費(fèi)者代碼
    EventingBasicConsumer c = new EventingBasicConsumer(channel);
    c.Received += (ch, ea) =>
    {
    System.Console.WriteLine(System.Text.Encoding.UTF8.GetString(ea.Body));
    Thread.Sleep(1000);
    channel.BasicAck(ea.DeliveryTag, false);
    };
    string consumerTag = channel.BasicConsume("test", false, c);
    RabbitMQ的消費(fèi)者有幾種, 我這里用的是EventingBasicConsumer 它是被事件觸發(fā)的, 不需要主動輪詢?nèi)ハM(fèi), 而且也是目前RabbitMQ官方推薦的.
    需要注意這一句
    string consumerTag = channel.BasicConsume("test", false, c);
    第二個參數(shù)我設(shè)置為false, 意思是我要主動向server確認(rèn)信息的接收. 這樣做的好處有兩點(diǎn). 第一, 消費(fèi)者的入口容易控制, 不會把消費(fèi)者壓死. 第二, 可以通過RabbitMQ的控制臺直觀的了解到消費(fèi)者的處理能力. 看了后面的內(nèi)容我相信你會理解我的Point.

代碼碼好了, 來run一下瞧瞧.

由于我是單線程消費(fèi), 并且每次消費(fèi)過程中Sleep一秒, 這樣我每秒的處理速度高達(dá) 1

進(jìn)階一點(diǎn)點(diǎn)

看到這有人會說了, 你寫個這么爛的代碼也好意思出來曬? 好吧, 我是要循序漸進(jìn)的!


廢話不多說, 上代碼!

    Task.Factory.StartNew(()=>{   
        System.Console.WriteLine(System.Text.Encoding.UTF8.GetString(ea.Body));
        Thread.Sleep(1000);
        channel.BasicAck(ea.DeliveryTag, false);
    });

貼了兩張圖, 看官們可以看到, 處理速度有一個爬坡的過程, 而且運(yùn)行到一段時間以后基本平穩(wěn)在了100個/秒上下. 川酷不滿意的是, 此時的cpu才僅僅占用了10%不到. 對于我這種抓到蛤蟆攥出尿來的人, 不壓榨壓榨實(shí)在不甘心. 對消費(fèi)者進(jìn)行了一下改造, 且看.

        public static ConcurrentQueue<BasicDeliverEventArgs> Queue1 = new ConcurrentQueue<BasicDeliverEventArgs>();
        EventingBasicConsumer c = new EventingBasicConsumer(channel);
        c.Received += (ch, ea) =>
        {
              Queue1.Enqueue(ea);
        };
        string consumerTag = channel.BasicConsume("test", false, c);

        var j=0;
        while (j++<150)
        {
            Task.Factory.StartNew(()=>{
                BasicDeliverEventArgs bdea=null;
                while (true)
                {
                    if(Queue1.TryDequeue(out bdea)){
                        System.Console.WriteLine(System.Text.Encoding.UTF8.GetString(bdea.Body));
                        Thread.Sleep(1000);
                        channel.BasicAck(bdea.DeliveryTag, false);
                    }
                }
            });
        }

哈哈, 沒錯, 起了150個線程去同步處理, 速度就達(dá)到了150, 看證據(jù)


其實(shí)這時候的cpu占用率也不高. 只想做一個拋磚引玉, 當(dāng)我們直接調(diào)用.net為我們提供的傻瓜式接口時, 有時候會遇到結(jié)果并不那么如意的情況, 逼著我們開動腦筋. 看官當(dāng)中肯定會有人問, 如果起10000個線程, 豈不是速度能達(dá)到10000個/秒? 自己動手試試唄.

如果您覺得這篇文章對您有那么一丁點(diǎn)益處, 或者從某個角度觸動到了您, 請給川酷一些鼓勵, 打賞, 點(diǎn)贊, 關(guān)注, 哪怕評論區(qū)罵我兩句, 鄙人都感激涕零.

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,534評論 19 139
  • 關(guān)于消息隊列,從前年開始斷斷續(xù)續(xù)看了些資料,想寫很久了,但一直沒騰出空,近來分別碰到幾個朋友聊這塊的技術(shù)選型,是時...
    預(yù)流閱讀 586,593評論 51 787
  • 本文章翻譯自http://www.rabbitmq.com/api-guide.html,并沒有及時更新。 術(shù)語對...
    joyenlee閱讀 7,799評論 0 3
  • 摘要:RabbitMQ發(fā)送消息時,都是先把消息發(fā)送給ExChange(交換機(jī)),然后再分發(fā)給有相應(yīng)RoutingK...
    請叫wo小爺閱讀 1,403評論 0 2
  • 來源 RabbitMQ是用Erlang實(shí)現(xiàn)的一個高并發(fā)高可靠AMQP消息隊列服務(wù)器。支持消息的持久化、事務(wù)、擁塞控...
    jiangmo閱讀 10,507評論 2 34

友情鏈接更多精彩內(nèi)容