.Net Core&RabbitMQ限制循環消費( 二 )


.Net Core&RabbitMQ限制循環消費

文章插圖
此處假定接收100條消息,在接收到第50條消息時設置拒收,并且設置了requeue為false 。
var dlxExchangeName = "dlx_exchange";channel.ExchangeDeclare(exchange: dlxExchangeName, type: "fanout", durable: false, autoDelete: false, arguments: null);var dlxQueueName = "dlx_queue";channel.QueueDeclare(queue: dlxQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);channel.QueueBind(queue: dlxQueueName, exchange: dlxExchangeName, routingKey: "");var queueName = "nackorreject_queue";var arguments = new Dictionary<string, object>{    { "x-dead-letter-exchange", dlxExchangeName }};channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: arguments);channel.BasicQos(0, 5, false);var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{    var message = ea.Body;    Console.WriteLine("接收到信息為:" + Encoding.UTF8.GetString(message.ToArray()));    if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))    {        Console.WriteLine("拒收");        ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);//關鍵在于requeue=false        return;    }    ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);};channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);【.Net Core&RabbitMQ限制循環消費】如此一來,拒收消息不會重入隊列,并且現有隊列綁定了死信交換機,因此,消息進入到死信隊列中,如不綁定 , 則消息丟失 。
.Net Core&amp;RabbitMQ限制循環消費

文章插圖
限定重試次數設置重試次數,限定循環消費的次數,允許短暫的循環,但最終打破循環 。
消息頭設定次數在消息頭中設置次數記錄作為標記,但是 , 消費端無法對接收到的消息修改消息頭然后將原消息送回MQ,因此,需要將原消息內容重新發送消息到MQ,具體步驟如下
  1. 原消息設置不重入隊列 。
  2. 再發送新的消息其內容與原消息一致,可設置新消息的消息頭來攜帶重試次數 。
  3. 消費端再次消費時,便可從消息頭中查看消息被消費的次數 。
    .Net Core&amp;RabbitMQ限制循環消費

    文章插圖
此處假定接收10條消息,在接收到第5條消息時設置拒收 ,  當消息頭中重試次數未超過設定的3次時,消息可以重入隊列,再次被消費 。
var queueName = "messageheaderretrycount_queue";channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);channel.BasicQos(0, 5, false);var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{    var message = ea.Body;    Console.WriteLine("接收到信息為:" + Encoding.UTF8.GetString(message.ToArray()));    if (Encoding.UTF8.GetString(message.ToArray()).Contains("5"))    {        var maxRetryCount = 3;        Console.WriteLine($"拒收 {DateTime.Now}");        //初次消費        if (ea.BasicProperties.Headers == null)        {            //原消息設置為不重入隊列            ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);            //發送新消息到隊列中            RetryPublishMessage(channel, queueName, message.ToArray(), 1);            return;        }        //獲取重試次數        var retryCount = ParseRetryCount(ea);        if (retryCount < maxRetryCount)        {            //原消息設置為不重入隊列            ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);            //發送新消息到隊列中            RetryPublishMessage(channel, queueName, message.ToArray(), retryCount + 1);            return;        }        //到達最大次數,不再重試消息        ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);        return;    }    ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);};channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);static void RetryPublishMessage(IModel channel, string queueName, byte[] body, int retryCount){    var basicProperties = channel.CreateBasicProperties();    basicProperties.Headers = new Dictionary<string, object>();    basicProperties.Headers.Add("retryCount", retryCount);    channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: basicProperties, body: body);}static int ParseRetryCount(BasicDeliverEventArgs ea){    var existRetryRecord = ea.BasicProperties.Headers.TryGetValue("retryCount", out object retryCount);    if (!existRetryRecord)    {        throw new Exception("沒有設置重試次數");    }    return (int)retryCount;}

推薦閱讀