.Net Core&RabbitMQ限制循環消費( 三 )

消息被拒收后,再重新發送消息到原有交換機或是隊列下中,以使得消息像是消費失敗回到了隊列中 , 如此來控制消費次數 , 但是這種場景下 , 新消息排在了隊列的尾部,而不是原消息排在隊列頭部 。

.Net Core&RabbitMQ限制循環消費

文章插圖
存儲重試次數在存儲服務中存儲消息的唯一標識與對應重試次數,消費消息前對消息進行判斷是否存在 。
.Net Core&RabbitMQ限制循環消費

文章插圖
與消息頭判斷一致,只是消息重試次數的存儲從消息本身挪入存儲服務中了 。需要注意的是,消息發送端需要設置消息的唯一標識(MessageId屬性)
//模擬外部存儲服務var MessageRetryCounts = new Dictionary<ulong, int>();var queueName = "storageretrycount_queue";channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);channel.BasicQos(0, 5, false);var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{    var message = ea.Body;    Console.WriteLine("接收到信息為:" + Encoding.UTF8.GetString(message.ToArray()));if (Encoding.UTF8.GetString(message.ToArray()).Contains("50")){    var maxRetryCount = 3;    Console.WriteLine("拒收");    //重試次數判斷    var existRetryRecord = MessageRetryCounts.ContainsKey(ea.BasicProperties.MessageId);    if (!existRetryRecord)    {        //重入隊列,繼續重試        MessageRetryCounts.Add(ea.BasicProperties.MessageId, 1);        ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);        return;    }    if (MessageRetryCounts[ea.BasicProperties.MessageId] < maxRetryCount)    {        //重入隊列,繼續重試        MessageRetryCounts[ea.BasicProperties.MessageId] = MessageRetryCounts[ea.BasicProperties.MessageId] + 1;        ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);        return;    }    //到達最大次數 , 不再重試消息    ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);    return;}    ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);};channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);除第一次拒收外 , 允許三次重試機會,三次重試完畢后 , 設置requeue為false,消息丟失或進入死信隊列(如有設置的話) 。
.Net Core&amp;RabbitMQ限制循環消費

文章插圖
隊列使用Quorum類型第一種和第二種分別是消息自身、外部存儲服務來管理消息重試次數,使用Quorum , 由MQ來限定消息的投遞次數,也就控制了重試次數 。
.Net Core&amp;RabbitMQ限制循環消費

文章插圖
設置隊列類型為quorum , 設置投遞最大次數,當超過投遞次數后,消息被丟棄 。
var queueName = "quorumtype_queue";var arguments = new Dictionary<string, object>(){    { "x-queue-type", "quorum"},    { "x-delivery-limit", 3 }};channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: arguments);channel.BasicQos(0, 5, false);var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{    var message = ea.Body;    Console.WriteLine("接收到信息為:" + Encoding.UTF8.GetString(message.ToArray()));    if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))    {        Console.WriteLine($"拒收 {DateTime.Now}");        ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);        return;    }    ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);};channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);第一次消費被拒收重入隊列后,經最大三次投遞后,消費端不再收到消息 , 如此一來也限制了消息的循環消費 。
.Net Core&amp;RabbitMQ限制循環消費

文章插圖
隊列消息過期當為消息設置了過期時間時 , 當消息沒有受到Ack,且還在隊列中,受到過期時間的限制,反復消費但未能成功時,消息將走向過期,進入死信隊列或是被丟棄 。
聚焦于過期時間的限制,因此在消費者端,因異?;蚴蔷苁障r,需要對requeue設置為true,將消息再次重入到原隊列中 。

推薦閱讀