【.NET 6】RabbitMQ延遲消費指南( 二 )

orders.notification_dlx,用于接收轉發過來延遲消息,同時將該隊列的死信交換機設置為orders.notification;消費消息時,為了消息是不是已經延遲過,可以在消息頭里添加一個自定義參數biz-delayed , 在將需要延遲處理的消息發送到orders.notification_dlx交換機之前,除了設置過期時間,也同時將biz-delayed設置為1,后續再消費該消息時,讀取該值,不至于陷入死循環 。完整代碼如下
using Core;using EasyNetQ;using EasyNetQ.Topology;var bus = RabbitHutch.CreateBus("host=localhost;port=5672;virtualHost=/;username=guest;password=guest;requestedHeartbeat=10");var sourceExchange = await bus.Advanced.ExchangeDeclareAsync(name: "orders.notification", ExchangeType.Direct);var sourceQueue = await bus.Advanced.QueueDeclareAsync(name: "orders.notification");var dlxExchange = await bus.Advanced.ExchangeDeclareAsync(name: "orders.notification_dlx", ExchangeType.Direct);var dlxQueue = await bus.Advanced.QueueDeclareAsync(name: "orders.notification_dlx", configure => configure.WithDeadLetterExchange(sourceExchange));await bus.Advanced.BindAsync(dlxExchange, dlxQueue, "");bus.Advanced.Consume<OrderNotification>(sourceQueue, OrderNotificationHandler);Console.ReadLine();async Task OrderNotificationHandler(IMessage<OrderNotification> message, MessageReceivedInfo msgInfo){Console.WriteLine($"{DateTime.Now}: 開始消費 OrderId:{message.Body.OrderId} Type:{message.Body.Type}");if (message.Body.Type == 1 && !message.Properties.Headers.ContainsKey("biz-delayed")){message.Properties.Headers.Add("biz-delayed", 1);message.Properties.Expiration = TimeSpan.FromHours(1);await bus.Advanced.PublishAsync(dlxExchange, "", true, message);Console.WriteLine($"{DateTime.Now}:OrderId:{message.Body.OrderId} Type:{message.Body.Type} 已延遲消費");}else{//假裝在消費Thread.Sleep(1000);Console.WriteLine($"{DateTime.Now}:OrderId:{message.Body.OrderId} Type:{message.Body.Type} 已成功消費");}}上述代碼中,EasyNetQ設置隊列死信交換機的API為WithDeadLetterExchange,設置消息過期時間的API為Properties.Expiration
運行DLXConsumer項目,可以看到Type為1的消息被延遲 , 其它則被正常消費

【.NET 6】RabbitMQ延遲消費指南

文章插圖
打開RabbitMQ后臺確認,原本orders.notification里的消息已經被消費掉了,同時多了一個orders.notification_dlx隊列,并且orders.notification_delay隊列相比orders.notification多了一個DLX標簽,Type為1的消息就是被轉移該隊列 。
【.NET 6】RabbitMQ延遲消費指南

文章插圖
進入orders.notification_delay隊列,交換機與隊列正常綁定,x-dead-letter-exchange也已被設置
【.NET 6】RabbitMQ延遲消費指南

文章插圖
檢查隊列中的消息 , 可以看到Properties里的expiration: 3600000headers:biz-delayed: 1
【.NET 6】RabbitMQ延遲消費指南

文章插圖
再過3600000毫秒,orders.notification_dlx隊列就會被投遞到orders.notification交換機,隊列orders.notification也就會收到這些信息 , 這時因為消息頭里有biz-delayed,消費者會正常將其消費 。
使用延遲交換機實現使用延遲交換機,需要RabbitMQ服務器安裝rabbitmq_delayed_message_exchange插件 , 原理是投遞到延遲交換機的消息,會延遲指定時間(x-delay參數設置)后,自動投遞到該交換機綁定的另一交換機上 。直接看代碼 。
docker環境安裝rabbitmq_delayed_message_exchange插件這里介紹下docker環境如何安裝rabbitmq_delayed_message_exchange插件 , 首先在github https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases 下載與你RabbitMQ服務器大版本匹配的Release,將文件復制到RabbitMQ的/plugins目錄下,命令如下
docker cp {rabbitmq_delayed_message_exchange文件路徑} {rabbitmq容器id}:/pluginsdocker exec -it {rabbitmq容器id} rabbitmq-plugins enable rabbitmq_delayed_message_exchange以我本機為例 , 插件啟用成功 。
【.NET 6】RabbitMQ延遲消費指南

文章插圖
下面給解決方法添加一個DMConsumer項目 。
cd srcdotnet new console -n DMConsumercd DMConsumerdotnet add package EasyNetQ-s https://api.nuget.org/v3/index.jsondotnet add package Newtonsoft.Json-s https://api.nuget.org/v3/index.json cd ../..dotnet sln add ./src/DMConsumer/DMConsumer.csproj

推薦閱讀