RabbitMQ高级之失败重试机制(含源码)
创始人
2025-07-07 10:11:44
0

一、失败重试机制

    当消费者出现异常后,消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力。

    为此,可利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

配置参数如下:

listener: # 开启消费者确认其机制
      simple:
        prefetch: 1  #消费者每次只能获取一条消息,处理完才能获取下一条(可实现能者多劳)
        acknowledge-mode: AUTO  # none:关闭ack;manual:手动ack;auto:自动ack
        retry:
          enabled: true  #开启消费者失败重试
          initial-interval: 1000ms  #初始的失败等待时长为1秒
          multiplier: 1 #下次失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 #最大重试次数
          stateless: true #true无状态;false有状态。如果业务中包含事务,这里改为false

测试结果:

图片图片

但是重试三次后,队列里面的消息被踢出了:

图片图片

二、失败消息处理策略

    在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:

     1、RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式。

    2、ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队(不建议采用:会出现死循环)。

     3、RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机。(推荐使用)

图片图片

三、实现方式

3.1、定义接收失败消息的交换机、队列及其绑定关系:

/**
     * 功能描述:定义接收错误消费的日志
     * @MethodName: receiveErrorMessage
     * @MethodParam: [message]
     * @Return: void
     * @Author: yyalin
     * @CreateDate: 2023/11/15 9:55
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "errorQueue"),
            exchange = @Exchange(name = "errorExchange", type = ExchangeTypes.DIRECT, ignoreDeclarationExceptions = "true"),
            key = "errorRouting"
    ))
    public void receiveErrorMessage(String message) {
        log.info("消费者收到发送错误的消息: " + message);
    }

3.2、定义RepublishMessageRecoverer:

/**
 * @Description: TODO:定义错误消息接收
 * @Author: yyalin
 * @CreateDate: 2023/11/15 9:58
 * @Version: V1.0
 */
@Configuration
@Slf4j
public class ErrorConfig {
    @Bean
    public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){
        log.debug("加载RepublishMessageRecoverer");
        return new RepublishMessageRecoverer(rabbitTemplate,"errorExchange","errorRouting");
    }
}

3.3、测试结果:

图片图片

3.4、总结

消费者如何保证消息一定被消费?

  •     开启消费者确认机制为auto,由spring确认消息处理成功后返回ack,异常时返回nack。如果一直处理异常会一直重试。
  •     开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理。

相关内容

热门资讯

如何允许远程连接到MySQL数... [[277004]]【51CTO.com快译】默认情况下,MySQL服务器仅侦听来自localhos...
如何利用交换机和端口设置来管理... 在网络管理中,总是有些人让管理员头疼。下面我们就将介绍一下一个网管员利用交换机以及端口设置等来进行D...
施耐德电气数据中心整体解决方案... 近日,全球能效管理专家施耐德电气正式启动大型体验活动“能效中国行——2012卡车巡展”,作为该活动的...
20个非常棒的扁平设计免费资源 Apple设备的平面图标PSD免费平板UI 平板UI套件24平图标Freen平板UI套件PSD径向平...
德国电信门户网站可实时显示全球... 德国电信周三推出一个门户网站,直观地实时提供其安装在全球各地的传感器网络检测到的网络攻击状况。该网站...
为啥国人偏爱 Mybatis,... 关于 SQL 和 ORM 的争论,永远都不会终止,我也一直在思考这个问题。昨天又跟群里的小伙伴进行...
《非诚勿扰》红人闫凤娇被曝厕所... 【51CTO.com 综合消息360安全专家提醒说,“闫凤娇”、“非诚勿扰”已经被黑客盯上成为了“木...
2012年第四季度互联网状况报... [[71653]]  北京时间4月25日消息,据国外媒体报道,全球知名的云平台公司Akamai Te...