基于RabbitMQ構建延遲隊列

延遲任務在業(yè)務中是一個很常見的需求,比如: 訂單下單15分鐘之后,用戶沒有支付,則自動取消訂單 用戶做了某些操作,5分鐘之后發(fā)短信提醒用戶 諸如此類的場景比比皆是,一種最常見的實現(xiàn)方式,就是開啟一個定時任務,然后一直輪詢數(shù)據(jù)庫,這種實現(xiàn)方式在數(shù)據(jù)量小的時候還好,但是數(shù)據(jù)量一旦過大,這輪詢數(shù)據(jù)庫就會給數(shù)據(jù)庫造成很大的壓力,此時全面掃表的實現(xiàn)方式就顯得不可靠了。 另外一種實現(xiàn)方式,就是用延遲隊列的方式來實現(xiàn),但是RabbitMQ本身是沒有實現(xiàn)延遲隊列的,不過可以使用TTL+死信隊列的方式來實現(xiàn)延遲隊列。 # 消息的TTL TTL全稱Time To Live,即生存時間。消息的TTL也就是消息的生存時間。在RabbitMQ中設置TTL有兩種 第一種是聲明隊列的時候,在隊列的屬性中設置TTL,這樣該隊列中的消息都會有相同的有效期 第二種是發(fā)送消息時給消息設置屬性,可以為每條消息都設置不同的TTL 如果兩者都設置,生存時間取兩者最小的那一個。這里我們采用第二種,即為每條消息設置TTL # 死信交換機/死信隊列 一個消息在滿足如下的條件的時候,就會變成“死信”,并且能被投遞到死信交換機(Dead-Letter-Exchange),最后進入到死信交換機綁定的隊列,也稱死信隊列(Dead-Letter-Queue) - 消息被拒絕而且requeue=false - 消息的TTL到了,即消息過期 - 隊列排滿了,排在前面的消息會被丟棄或者扔到死信路由上 死信交換機和普通的交換機是沒有區(qū)別的,只是某一個設置死信交換機的隊列中有消息過期了,會自動觸發(fā)消息的轉發(fā),發(fā)送到死信交換機中去,再由死信交換機轉發(fā)到死信隊列中。死信隊列也是一個普通的隊列,并沒有什么其它特殊的。 # 延遲隊列的實現(xiàn) 接著來看看TTL+死信交換機是如何實現(xiàn)延遲隊列的 ![](https://upload-images.jianshu.io/upload_images/28338950-621bca1a757fe408.png) 上面的流程就是實現(xiàn)延遲隊列的思路,比方說15分鐘取消訂單,那么用戶下單之后,消息的TTL設置為15分鐘,當消息在Queue1待的時間到了15分鐘,那么就會被轉發(fā)到Dead-Letter-Exchange,從而轉發(fā)到Dead-Letter-Queue,最后被消費者消費,實現(xiàn)延遲任務。 先在RabbitMQ控制臺創(chuàng)建一個名為dlx的交換機,作為死信交換機,并綁定上一個dlxQueue隊列,作為Dead-Letter-Queue ![](https://upload-images.jianshu.io/upload_images/28338950-927ef83996ac9433.png) ``` // 生產者.go package main import ( "github.com/streadway/amqp" "mq/fail" ) func main() { conn, err := amqp.Dial("amqp://123:123@localhost:5672") fail.OnError(err) defer conn.Close() ch, err := conn.Channel() fail.OnError(err) defer ch.Close() args := amqp.Table{"x-dead-letter-exchange": "dlx"} q, err := ch.QueueDeclare("test", true, false, false, false, args) // 聲明一個test隊列,并設置隊列的死信交換機為"dlx" body := "hello world1" for i := 0; i < 10; i++ { err = ch.Publish("", q.Name, false, false, amqp.Publishing{ Body: []byte(body), Expiration: "5000", // 設置TTL為5秒 }) fail.OnError(err) } } ``` 啟動生產者,可以看到消息被投遞到test隊列中 ![](https://upload-images.jianshu.io/upload_images/28338950-2dc39b3d0648ab52.png) 5秒之后,消息被轉發(fā)到dlxQueue隊列中 ![](https://upload-images.jianshu.io/upload_images/28338950-dfa1dc28ee0969e1.png) 之后有一個消費者,專門處理這個dlxQueu隊列中的消息 ``` // 消費者.go package main import ( "fmt" "github.com/streadway/amqp" "mq/fail" ) func main() { conn, err := amqp.Dial("amqp://123:123@localhost:5672") fail.OnError(err) c, err := conn.Channel() fail.OnError(err) msgs, err := c.Consume("dlxQueue", "", true, false, false, false, nil) //監(jiān)聽dlxQueue隊列 fail.OnError(err) for d := range msgs { fmt.Printf("收到信息: %s\n", d.Body) // 收到消息,業(yè)務處理 } } ``` ``` // 5秒之后,打印 // 收到信息: hello world1 // 收到信息: hello world1 // 收到信息: hello world1 // 收到信息: hello world1 // 收到信息: hello world1 // 收到信息: hello world1 // 收到信息: hello world1 // 收到信息: hello world1 // 收到信息: hello world1 // 收到信息: hello world1 ``` 總結 使用TTL+死信交換機實現(xiàn)延遲任務還是非常方便的,除此之外還可以使用相關的插件abbitmq-delayed-message-exchange,來實現(xiàn)延遲隊列,也是非常的方便。 ![](https://upload-images.jianshu.io/upload_images/28338950-abe5d878166573ef.png) 轉自: juejin.cn/post/6844904142155022344 本文由[mdnice](https://mdnice.com/?platform=6)多平臺發(fā)布
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容