【.NET 6】RabbitMQ延遲消費指南

背景最近遇到一個比較特殊需求,需要修改一個的RabbitMQ消費者,以實現在消費某種特定的類型消息時,延遲1小時再處理 , 幾個需要注意的點:

  • 延遲是以小時為單位
  • 不是所有消息都延遲消費,只延遲特定類型的消息
  • 只在第一次消費時延遲1小時,容錯機制產生的重新消費(也即消息消費失敗,多次進入延遲隊列重試),則不再延遲1小時
  • 消費者消費過程中可能會重啟
考慮到這幾點,我們需要一個標識以及持久化,不能簡單使用Thread.Sleep或者Task.Delay;下面開始演示在不引入其它框架資源的前提下,利用現有的RabbitMQ來實現這個需求 。
準備如果沒有可用的RabbitMQ測試環境,推薦使用docker本地搭建
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.10-management項目搭建創建解決方案RabbitMQDemo,并添加一個.Net6控制臺程序Producer作為生產者,
mkdir RabbitMQDemocd RabbitMQDemodotnet new sln -n RabbitMQDemomkdir srccd srcdotnet new console -n Producercd Producerdotnet add package EasyNetQ-s https://api.nuget.org/v3/index.jsondotnet add package Newtonsoft.Json-s https://api.nuget.org/v3/index.json cd ../..dotnet sln add ./src/Producer/Producer.csproj我們給Producer項目添加了兩個包 ——EasyNetQ是用來簡便RabbitMQ操作,添加Newtonsoft.Json則是因為EasyNetQ從v7版本開始移除了對前者的依賴,需要使用者自行添加 。
接下來定義消息的數據結構,添加一個類庫Core到解決方案,
cd srcdotnet new classlib --name Corecd ..dotnet sln add ./src/Core/Core.csproj添加如下OrderNotification類 , 后面我們根據消息的 Type的值來確定是正常消費還是延遲消費 。
namespace Core{public class OrderNotification{public string OrderId { get; set; }public int Type { get; set; }public DateTime DateCreation { get; set; }}}生產者在Producer項目里 , 聲明隊列orders.notification,綁定到同名交換機 , 然后向該交換機發送OrderNotification類型的數據,
實際項目中,我們很少直接發消息到隊列,都是發送到交換機,這個項目雖然只是demo,但也遵循這個原則
完整代碼如下:
using Core;using EasyNetQ;using EasyNetQ.Topology;var bus = RabbitHutch.CreateBus("host=localhost;port=5672;virtualHost=/;username=guest;password=guest;requestedHeartbeat=10");//聲明交換機var sourceExchange = await bus.Advanced.ExchangeDeclareAsync(name: "orders.notification", ExchangeType.Direct);//聲明隊列var sourceQueue = await bus.Advanced.QueueDeclareAsync(name: "orders.notification");//綁定await bus.Advanced.BindAsync(sourceExchange, sourceQueue, "");Console.WriteLine("按Ctrl + C 暫停發送,任意鍵恢復發送");Console.TreatControlCAsInput = true;while (true){Random random = new();var orderId = Guid.NewGuid().ToString();var type = random.Next(1, 3);await bus.Advanced.PublishAsync(sourceExchange, "", true, new Message<OrderNotification>(new OrderNotification { OrderId = orderId, Type = type, DateCreation = DateTime.Now }));Console.WriteLine($"{DateTime.Now}:消息(OrderId:{orderId},Type:{type}) 已發送");Thread.Sleep(1000);}運行Producer項目,可以看到消息正在不停的發送
【.NET 6】RabbitMQ延遲消費指南

文章插圖
打開RabbitMQ后臺,名orders.notification的隊列和交換機已經創建好且相互綁定,隊列里已經有我們剛剛發送的消息
【.NET 6】RabbitMQ延遲消費指南

文章插圖
下面我們要做的就是將隊列orders.notificationType為1的消息延遲消費,其它則正常消費 。
延遲消費使用死信交換機實現原理就是在聲明一個隊列時 , 給它配置死信交換機(Dead Letter Exchanges,簡稱DLX)策略,對應參數為x-dead-letter-exchange,這種隊列處理帶設置了過期時間屬性(Properties.expiration)的消息時,在消息到期時,會自動將消息投遞到事先配置好的死信交換機上 。
我們解決方案增加一個控制臺類型的消費者項目DLXConsumer
cd srcdotnet new console -n DLXConsumercd DLXConsumerdotnet add package EasyNetQ-s https://api.nuget.org/v3/index.jsondotnet add package Newtonsoft.Json-s https://api.nuget.org/v3/index.json cd ../..dotnet sln add ./src/DLXConsumer/DLXConsumer.csproj和生產者類似,實現消費者我們也創建一對同名的交換機和隊列

推薦閱讀