.NET Core 用 EasyNetQ 建立 RabbitMQ 的延時(shí)隊(duì)列

已經(jīng)在群暉的 Docker 組件中,安裝了 RabbitMQ(含 Delay 插件),安裝方法見這里。
在 .NET Core 中,使用 RabbitMQ 的文章已經(jīng)很多了,這里不再熬述。
本文主要是記錄使用 EasyNetQ 建立 RabbitMQ 的延時(shí)隊(duì)列的經(jīng)歷。

.NET Core 訪問 RabbitMQ 的庫很多(官方推薦的列表
一般都使用 RabbitMQ .NET Client
在官方的推薦組件中,還有一個(gè):EasyNetQ,看了下 Nuget 下載量和 Github 的情況,貌似很不錯(cuò),就嘗試用了。(EasyNetQ 官網(wǎng)
注:還有一個(gè)組件,NServiceBus,也是很不錯(cuò)的,不過,該組件已經(jīng)商業(yè)了,要收費(fèi),免費(fèi)版有訪問量限制,就放棄了。(NServiceBus 官網(wǎng)

特別說明:EasyNetQ 的設(shè)計(jì)初衷,是共享業(yè)務(wù)實(shí)體類,如果不想共享業(yè)務(wù)實(shí)體類,可考慮在發(fā)送消息時(shí),將業(yè)務(wù)實(shí)體序列化成字符串,再發(fā)送(不過,該方法也不是全部場(chǎng)景都可行,如果跨平臺(tái)了,也會(huì)報(bào)錯(cuò))。

額外說明:

  1. 要完美解決上述問題,應(yīng)該是要繼承 ITypeNameSerializer 接口,實(shí)現(xiàn)相關(guān)方法,然后在創(chuàng)建 Bus 時(shí),注冊(cè)進(jìn)去,應(yīng)該可以解決,不過由于目前還沒有細(xì)看,所以,暫時(shí)不能確定是否能解決。可以看看這些資料:資料一(中文)、資料二(中文)資料三(英文)、資料四(英文)
  2. 如果不想用自帶的 Newtonsoft.Json,甚至,不想用 Json 作為消息傳輸?shù)母袷交绞剑勺孕欣^承 EasyNetQ.ISerializer 接口,并實(shí)現(xiàn)里面的方法,然后在創(chuàng)建 Bus 時(shí),注冊(cè)進(jìn)去即可,參考這里(英文)。

關(guān)于 RabbitMQ 的延時(shí)隊(duì)列方案,總結(jié)起來,主要有三種,各方案的優(yōu)劣,可自行百度。
總之,推薦:delayed_message_exchange 插件,該方案在大多數(shù)云服務(wù)商的消息隊(duì)列產(chǎn)品中,均支持,兼容性好。
如果購買了阿里云的 RabbitMQ 版消息隊(duì)列,除了上述三種方案,還提供了阿里云定制的方案,如下圖所示:

開源版和阿里云版的方案對(duì)比

阿里云的延時(shí)隊(duì)列方案介紹,參考這里。

正式開始(平臺(tái):.NET Core 5.0)

EasyNetQ 的連接字符串,如下所示:

host=<IP 或域名>:<端口,默認(rèn)“5672”>;virtualhost=<vHost名稱,默認(rèn)“/”>;username=<連接用戶名>;password=<連接密碼>

連接字符串各參數(shù)說明,參考這里。

  1. 建立兩個(gè) Console 項(xiàng)目:
  • MsgProducer:消息生產(chǎn)者,用于發(fā)送消息到延時(shí)隊(duì)列中
  • MsgConsumer:消息消費(fèi)者,用于消費(fèi)延時(shí)隊(duì)列中的消息
  1. MsgProducer 項(xiàng)目

主要邏輯:

  • 創(chuàng)建延時(shí) exchange
  • 創(chuàng)建普通 exchange,并和延時(shí) exchange 綁定
  • 數(shù)據(jù)發(fā)送到延時(shí) exchange 中

MsgProducer 主要代碼如下(所有方法,可使用異步 Async):

using EasyNetQ;
using EasyNetQ.Topology;

static void Main(string[] args)
{
    // 通常建議單例該連接對(duì)象(比如在 StartUp.cs 中單例注入)
    using var bus = RabbitHutch.CreateBus("<EasyNetQ 連接字符串>");

    // 建立延時(shí) exchange
    var exDelay = bus.Advanced.ExchangeDeclare("<延時(shí) exchange 名稱>", cfg => cfg.AsDelayedExchange(ExchangeType.Direct));
    // 建立普通 exchange
    var exNormal = bus.Advanced.ExchangeDeclare("<普通 exchange 名稱>", ExchangeType.Direct);
    // 綁定兩個(gè) exchange,設(shè)置好 RoutingKey
    bus.Advanced.Bind(exDelay, exNormal, "delay");

    // 創(chuàng)建普通隊(duì)列,并和普通 exchange 綁定,設(shè)置好 RoutingKey
    var qNormal = bus.Advanced.QueueDeclare("<普通隊(duì)列名稱>");
    bus.Advanced.Bind(exNormal, qNormal, "delay");

    // 以下是測(cè)試發(fā)送消息的代碼
    string msg;

    while ((msg = Console.ReadLine()) != "exit")
    {
        // 針對(duì)單個(gè)消息,設(shè)置延遲時(shí)間(毫秒)
        var msgHeaders = new MessageProperties();
        msgHeaders.Headers.Add("x-delay", new Random().Next(1000, 5000));

        // 發(fā)送消息
        bus.Advanced.Publish(exDelay, "delay", false, new Message<string>(msg, msgHeaders));

        Console.WriteLine($"{DateTimeOffset.Now.ToUnixTimeMilliseconds()} 發(fā)送成功 {msg}");
    }
}

在 EasyNetQ 的官方文檔中,有關(guān)于 Delayed Message 插件的推薦使用說明,本人實(shí)在受不了 EasyNetQ 自動(dòng)建立的 exchange 和 queue,就放棄了官方的方法,自己使用了 Advanced 模式。
關(guān)于 EasyNetQ 官方文檔推薦的使用方式,參考這里(英文)。

  1. MsgConsumer 項(xiàng)目

主要邏輯

  • 從普通隊(duì)列中讀取消息并處理

MsgConsumer 主要代碼如下(所有方法,可使用異步 Async):

using EasyNetQ;

static void Main(string[] args)
{
    // 通常建議單例該連接對(duì)象(比如在 StartUp.cs 中單例注入)
    using var bus = RabbitHutch.CreateBus(<EasyNetQ 連接字符串>);

    // 創(chuàng)建普通隊(duì)列,設(shè)置好 RoutingKey
    // 和第二步 MsgProducer 項(xiàng)目中的普通隊(duì)列名稱一致?。?!
    var qNormal = bus.Advanced.QueueDeclare("<MsgProducer 的普通隊(duì)列名稱>");
    bus.Advanced.Consume<string>(qNormal, MsgHandle);   // MsgHandle 也可以用匿名方法

    Console.WriteLine("等待中……(回車退出)");
    Console.ReadLine();
}

static void MsgHandle(IMessage<string> msg, MessageReceivedInfo msgInfo)
{
    Console.ForegroundColor = ConsoleColor.Red;
    Console.WriteLine($"{DateTimeOffset.Now.ToUnixTimeMilliseconds()} 收到消息:{msg.Body}");
    Console.ResetColor();
}

至此,即可正常收發(fā)延時(shí)消息了。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容