.Net Core&RabbitMQ限制循環消費

前言當消費者端接收消息處理業務時,如果出現異?;蚴蔷苁障⑾⒂肿兏鼮榈却哆f再次推送給消費者,這樣一來,則形成循環的條件 。

.Net Core&RabbitMQ限制循環消費

文章插圖
循環場景生產者發送100條消息到RabbitMQ中,消費者設定讀取到第50條消息時,設置拒收,同時設定是否還留存在當前隊列中(當requeue為false時,設置了死信隊列則進入死信隊列,否則移除消息) 。
consumer.Received += (model, ea) =>{    var message = ea.Body;    Console.WriteLine("接收到信息為:" + Encoding.UTF8.GetString(message.ToArray()));    if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))    {        Console.WriteLine("拒收");        ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);        return;    }    ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);};當第50條消息拒收,則仍在隊列中且處在隊列頭部,重新推送給消費者,再次拒收,再次推送,反反復復 。
.Net Core&RabbitMQ限制循環消費

文章插圖
最終其他消息全部消費完畢,僅剩第50條消息往復間不斷消費,拒收,消費,這將可能導致RabbitMQ出現內存泄漏問題 。
.Net Core&RabbitMQ限制循環消費

文章插圖
解決方案RabbitMQ及AMQP協議本身沒有提供這類重試功能,但可以利用一些已有的功能來間接實現重試限定(以下只考慮基于手動確認模式情況) 。此處只想到或是只查到了如下幾種方案解決消息循環消費問題 。
  • 一次消費
    • 無論成功與否 , 消費者都對外返回ack,將拒收原因或是異常信息catch存入本地或是新隊列中另作重試 。
    • 消費者拒絕消息或是出現異常,返回Nack或Reject,消息進入死信隊列或丟棄(requeue設定為false) 。
  • 限定重試次數
    • 在消息的頭中添加重試次數,并將消息重新發送出去,再每次重新消費時從頭中判斷重試次數,遞增或遞減該值,直到達到限制,requeue改為false,最終進入死信隊列或丟棄 。
    • 可以在Redis、Memcache或其他存儲中存儲消息唯一鍵(例如Guid、雪花Id等 , 但必須在發布消息時手動設置它),甚至在mysql中連同重試次數一起存儲,然后在每次重新消費時遞增/遞減該值,直到達到限制 , requeue改為false,最終進入死信隊列或丟棄 。
    • 隊列使用Quorum類型,限制投遞次數 , 超過次數消息被刪除 。
  • 隊列消息過期
    • 設置過期時間,給隊列或是消息設置TTL,重試一定次數消息達到過期時間后進入死信隊列或丟棄(requeue設定為true) 。
  • 也許還有更多好的方案...
一次消費對外總是Ack消息到達了消費端 , 可因某些原因消費失敗了,對外可以發送Ack,而在內部走額外的方式去執行補償操作 , 比如將消息轉發到內部的RabbitMQ或是其他處理方式,終歸是只消費一次 。
var queueName = "alwaysack_queue";channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);channel.BasicQos(0, 5, false);var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{    try    {        var message = ea.Body;        Console.WriteLine("接收到信息為:" + Encoding.UTF8.GetString(message.ToArray()));        if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))        {            throw new Exception("模擬異常");        }    }    catch (Exception ex)    {        Console.WriteLine(ex.Message);    }    finally    {        ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);    }};channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);當消費端收到消息,處理時出現異常 , 可以另想辦法去處理,而對外保持著ack的返回,以避免消息的循環消費 。
.Net Core&RabbitMQ限制循環消費

文章插圖
消息不重入隊列在消費者端 , 因異常或是拒收消息時,對requeue設置為false時,如果設置了死信隊列,則符合“消息被拒絕且不重入隊列”這一進入死信隊列的情況,從而避免消息反復重試 。如未設置死信隊列,則消息被丟失 。

推薦閱讀