C# & RabbitMQ 之 Work Queues

Work Queues介紹

Work Queues簡而言之就是Producer將Message發(fā)送到Queues中,公平調度的發(fā)送到個個worker處理。

Work Queues模型

值得注意的地方,在消息接受過程中,worker會遇到異常而崩潰,導致接收到的消息處理失敗,但是Queues發(fā)送Message并不知道這個是否已經(jīng)正確處理而自動刪除這條message。這樣會導致Message的丟失,所以需要實現(xiàn)手動Message acknowledgment。當處理成功是告知RabbitMQ 這條message處理OK并刪除。
除此之外還有一個Message的丟失風險,就是當RabbitMQ 退出或者異常崩潰時,會導致queue和message的丟失,所以也要配置Message durability(持久化)。
公平調度(Fair dispatch),RabbitMQ默認是平均分配message到各個worker。為防止出現(xiàn)某些worker因為處理比較復雜,大量的數(shù)據(jù)而一直處理繁忙狀態(tài),其他的worker卻處于閑置狀態(tài),還不停的進行調度繁忙的worker,需要使用basicQos 方法設置 prefetchCount = 1 ,就是告知RabbitMQ 不要同時的給一個worker大于1條Message。
Fair dispatch

演示代碼

producer:

static void Main(string[] args)
        {
            //參考:http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html

            Console.WriteLine("********************producer***************");
            Console.WriteLine("please Input send message:");
            //連接到RabbitMQ

            var factory = new RabbitMQ.Client.ConnectionFactory();
            //第一種方式
            factory.UserName = "admin";
            factory.Password = "admin";
            factory.VirtualHost = "/";
            factory.HostName = "10.19.52.80";

            //第二種方式
            //factory.Uri = new Uri("amqp://admin:admin@10.19.52.80:5672/");
            //產(chǎn)生一個連接對象
            using (var conncetion = factory.CreateConnection())
            {
                //通過conncetion產(chǎn)生一個連接通道
                using (var channel = conncetion.CreateModel())
                {
                    //用代碼實現(xiàn) exchanges和Queues 
                    //定義exchanges
                    string exchangeName = "Ewrokqueues";
                    channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);
                    //定義Queues
                    string queueName = "Qwrokqueues";
                    bool durable = true;//設RabbitMQ置持久化
                    channel.QueueDeclare(queueName, durable, false, false, null);
                    //綁定exchanges 和Queues
                    string routingKey = "task_queue";
                    channel.QueueBind(queueName, exchangeName, "", null);

                    //簡單設置隊列方式
                    //channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false,
                    //    arguments: null);

                      for (int i =0;i<20;i++)
                      {
                          string message = i.ToString();

                            var body = Encoding.UTF8.GetBytes(message);
                            var properties = channel.CreateBasicProperties();
                            properties.Persistent = true;
                            channel.BasicPublish(exchange: "Ewrokqueues", routingKey: "", mandatory: false, basicProperties: properties, body: body);
                            Console.WriteLine("[producer] send : {0}", message);
                          Thread.Sleep(1000);
                        }
                }
            }
            Console.ReadLine();
        }

worker 代碼:

static void Main(string[] args)
       {
           //參考:http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html

           Console.WriteLine("********************worker1(sleep 5s)***************");
           //連接MQ
           var factory = new ConnectionFactory();
           factory.UserName = "admin";
           factory.Password = "admin";
           factory.VirtualHost = "/";
           factory.HostName = "10.19.52.80";

           //產(chǎn)生連接對象
           using (var connection = factory.CreateConnection())
           {
               //通道
               using (var channel = connection.CreateModel())
               {
                   //公平調用
                   channel.BasicQos(prefetchSize:0,prefetchCount:1,global:false);

                   //訂閱方式獲取message
                   var consumer = new EventingBasicConsumer(channel);
                   //實現(xiàn)獲取message處理事件
                   consumer.Received += (model, ea) =>
                   {
                       var body = ea.Body;
                       var message = Encoding.UTF8.GetString(body);
                       //睡眠5s 另一個是1s
                       Thread.Sleep(5000);

                       Console.WriteLine("[worker1] received : {0}", message);

                       //手動設置回復
                       channel.BasicAck(deliveryTag:ea.DeliveryTag,multiple:false);
                   };
                   //設置手動回復認證
                   channel.BasicConsume(queue: "Qwrokqueues", autoAck: false, consumer: consumer);
                   Console.ReadLine();
               }
           }
       }

P會循環(huán)發(fā)送20次,每秒發(fā)送一個數(shù)字到queue中,兩個worker接受message。最后從運行結果可以看到整個分配情況,worker1第一個接受到“0”,在5秒處理完成后才接受“5”,而worker2會一直在處理,而不是出于等待閑置。


運行結果
最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,534評論 19 139
  • 上篇講過簡單的hello消息,這篇我們將實現(xiàn)一個可以在多個Consumer上發(fā)送持久化消息的work queue。...
    初級賽亞人閱讀 1,972評論 0 3
  • 來源 RabbitMQ是用Erlang實現(xiàn)的一個高并發(fā)高可靠AMQP消息隊列服務器。支持消息的持久化、事務、擁塞控...
    jiangmo閱讀 10,507評論 2 34
  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981閱讀 16,201評論 2 11
  • 上個學期學堂的招新工作是從四月份進行到五月份,但實際上在2016年的十二月左右學監(jiān)的一次講座也可以算作是招新的前言...
    白河丶夜船閱讀 876評論 0 0

友情鏈接更多精彩內(nèi)容