当消费者出现异常后,消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力。
为此,可利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。
配置参数如下:
listener: # 开启消费者确认其机制
simple:
prefetch: 1 #消费者每次只能获取一条消息,处理完才能获取下一条(可实现能者多劳)
acknowledge-mode: AUTO # none:关闭ack;manual:手动ack;auto:自动ack
retry:
enabled: true #开启消费者失败重试
initial-interval: 1000ms #初始的失败等待时长为1秒
multiplier: 1 #下次失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 #最大重试次数
stateless: true #true无状态;false有状态。如果业务中包含事务,这里改为false
测试结果:
图片
但是重试三次后,队列里面的消息被踢出了:
图片
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:
1、RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式。
2、ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队(不建议采用:会出现死循环)。
3、RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机。(推荐使用)
图片
3.1、定义接收失败消息的交换机、队列及其绑定关系:
/**
* 功能描述:定义接收错误消费的日志
* @MethodName: receiveErrorMessage
* @MethodParam: [message]
* @Return: void
* @Author: yyalin
* @CreateDate: 2023/11/15 9:55
*/
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "errorQueue"),
exchange = @Exchange(name = "errorExchange", type = ExchangeTypes.DIRECT, ignoreDeclarationExceptions = "true"),
key = "errorRouting"
))
public void receiveErrorMessage(String message) {
log.info("消费者收到发送错误的消息: " + message);
}
/**
* @Description: TODO:定义错误消息接收
* @Author: yyalin
* @CreateDate: 2023/11/15 9:58
* @Version: V1.0
*/
@Configuration
@Slf4j
public class ErrorConfig {
@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){
log.debug("加载RepublishMessageRecoverer");
return new RepublishMessageRecoverer(rabbitTemplate,"errorExchange","errorRouting");
}
}
图片
消费者如何保证消息一定被消费?
当前文章:RabbitMQ高级之失败重试机制(含源码)
当前URL:http://www.mswzjz.cn/qtweb/news42/449092.html
攀枝花网站建设、攀枝花网站运维推广公司-贝锐智能,是专注品牌与效果的网络营销公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 贝锐智能