版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
第RocketMq消息重试机制及死信队列详解目录生产者消息重试消费者消息重试并发消费顺序消费并发消费和顺序消费区别死信队列实践出真知公共部分创建测试并发消费并发消费状态测试顺序消费顺序消费状态测试死信队列死信队列特性
生产者消息重试
消息队列中的消息消费时并不能保证总是成功的,那失败的消息该怎么进行消息补偿呢?这就用到今天的主角消息重试和死信队列了。
有时因为网路等原因生产者也可能发送消息失败,也会进行消息重试,生产者消息重试比较简单,在springboot中只要在配置文件中配置一下就可以了。
#异步消息发送失败重试次数,默认为2
ducer.retry-times-when-send-async-failed=2
#消息发送失败重试次数,默认为2
ducer.retry-times-when-send-failed=2
也可以通过下面这种方式配置
DefaultMQProducerdefaultMQProducer=newDefaultMQProducer();
defaultMQProducer.setRetryTimesWhenSendFailed(2);
defaultMQProducer.setRetryTimesWhenSendAsyncFailed(2);
消费者消息重试
ApacheRocketMQ有两种消费模式:集群消费模式和广播消费模式。消息重试只针对集群消费模式生效;广播消费模式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。
同时RocketMqPush消费提供了两种消费方式:并发消费和顺序消费。
并发消费
在并发消费中,可能会有多个线程同时消费一个队列的消息,因此即使发送端通过发送顺序消息保证消息在同一个队列中按照FIFO的顺序,也无法保证消息实际被顺序消费,所有并发消费也可以称之为无序消费。
顺序消费
顺序消费是消息生产者发送过来的消息会遵循FIFO队列的思想,先进先出有顺序的消费消息。对于顺序消息,当消费者消费消息失败后,消息队列RocketMQ会自动不断进行消息重试(每次间隔时间为1秒),这时,应用会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。
并发消费和顺序消费区别
顺序消费和并发消费的重试机制并不相同,顺序消费消费失败后会先在客户端本地重试直到最大重试次数,这样可以避免消费失败的消息被跳过,消费下一条消息而打乱顺序消费的顺序,而并发消费消费失败后会将消费失败的消息重新投递回服务端,再等待服务端重新投递回来,在这期间会正常消费队列后面的消息。
并发消费失败后并不是投递回原Topic,而是投递到一个特殊Topic,其命名为%RETRY%ConsumerGroupName,集群模式下并发消费每一个ConsumerGroup会对应一个特殊Topic,并会订阅该Topic。
两者参数差别如下
消费类型重试间隔最大重试次数顺序消费间隔时间可通过自定义设置,SuspendCurrentQueueTimeMillis最大重试次数可通过自定义参数MaxReconsumeTimes取值进行配置。该参数取值无最大限制。若未设置参数值,默认最大重试次数为Integer.MAX并发消费间隔时间根据重试次数阶梯变化,取值范围:1秒~2小时。不支持自定义配置最大重试次数可通过自定义参数MaxReconsumeTimes取值进行配置。默认值为16次,该参数取值无最大限制,建议使用默认值
并发消费重试间隔如下:
第几次重试与上次重试的间隔时间第几次重试与上次重试的间隔时间110s97min230s108min31min119min42min1210min53min1320min64min1430min75min151h86min162h
死信队列
当一条消息初次消费失败,RocketMQ会自动进行消息重试,达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息。此时,该消息不会立刻被丢弃,而是将其发送到该消费者对应的特殊队列中,这类消息称为死信消息(Dead-LetterMessage),存储死信消息的特殊队列称为死信队列(Dead-LetterQueue),死信队列是死信Topic下分区数唯一的单独队列。如果产生了死信消息,那对应的ConsumerGroup的死信Topic名称为%DLQ%ConsumerGroupName,死信队列的消息将不会再被消费。可以利用RocketMQAdmin工具或者RocketMQDashboard上查询到对应死信消息的信息。
实践出真知
Talkischeap,showyouthecode.
公共部分创建
-server=localhost:9876
#消费者组
ducer.group=producer_group
rocketmq.consumer.topic=consumer_topic
rocketmq.consumer.group=consumer_group
创建消费者RetryConsumerDemo
@Component
publicclassRetryConsumerDemo{
@Value("${-server}")
privateStringnamesrvAddr;
@Value("${rocketmq.consumer.topic}")
privateStringtopic;
@Value("${rocketmq.consumer.group}")
privateStringconsumerGroup;
privatefinalDefaultMQPushConsumerconsumer=newDefaultMQPushConsumer("consumer_group");
@PostConstruct
publicvoidstart(){
try{
consumer.setNamesrvAddr(namesrvAddr);
//设置集群消费模式
consumer.setMessageModel(MessageModel.CLUSTERING);
//设置消费超时时间(分钟)
consumer.setConsumeTimeout(1);
//订阅主题
consumer.subscribe(topic,"*");
//注册消息监听器
consumer.registerMessageListener(newMessageListenerConcurrentlyImpl());
//最大重试次数
consumer.setMaxReconsumeTimes(2);
//启动消费端
consumer.start();
System.out.println("RetryConsumerStart...");
}catch(MQClientExceptione){
e.printStackTrace();
测试并发消费
创建并发消费监听类并发消费监听类要实现MessageListenerConcurrently类
publicclassMessageListenerConcurrentlyImplimplementsMessageListenerConcurrently{
@Override
publicConsumeConcurrentlyStatusconsumeMessage(ListMessageExtmsgs,ConsumeConcurrentlyContextcontext){
if(CollectionUtils.isEmpty(msgs)){
returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;
MessageExtmessage=msgs.get(0);
try{
finalLocalDateTimenow=LocalDateTime.now();
//逐条消费
StringmessageBody=newString(message.getBody(),StandardCharsets.UTF_8);
System.out.println("当前时间:"+now+",messageId:"+message.getMsgId()+",topic:"+
message.getTopic()+",messageBody:"+messageBody);
//模拟消费失败
if("Concurrently_test".equals(messageBody)){
inta=1/0;
returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}catch(Exceptione){
returnConsumeConcurrentlyStatus.RECONSUME_LATER;
注册监听类在消费者类RetryConsumerDemo中注册监听类
//注册消息监听器
consumer.registerMessageListener(newMessageListenerConcurrentlyImpl());
@RunWith(SpringRunner.class)
@SpringBootTest(classes=RocketmqApplication.class)
classRocketmqApplicationTests{
@Value("${rocketmq.consumer.topic}")
privateStringtopic;
@Autowired
privateRocketMQTemplaterocketMQTemplate;
@Test
publicvoidtestProducer(){
Stringmsg="Concurrently_test";
rocketMQTemplate.convertAndSend(topic,msg);
测试结果:
后面重试时间太长就不做测试了,可以看到并发消费的消息时间都是按照上面那张时间间隔表来。
然后通过RocketMqDashboardTopic一栏可以看到有一个重试消费者组%RETRY%consumer_group,这个消费者组内存放的就是consumer_group消费者组消费失败重试的消息。
并发消费的重试次数是可以修改的,重试次数对应参数DefaultMQPushConsumer类的maxReconsumeTimes属性,maxReconsumeTimes默认是-1,也就是默认会重试16次;
0代表不重试,只要失败就会放入死信队列;
1-16重试次数对应着上面时间间隔表中对应次数。
配置的最大重试次数超过16就按16处理。
并发消费状态
并发消费有两个状态CONSUME_SUCCESS和RECONSUME_LATER。返回CONSUME_SUCCESS代表着消费成功,返回RECONSUME_LATER代表进行消息重试。
publicenumConsumeConcurrentlyStatus{
*Successconsumption
CONSUME_SUCCESS,
*Failureconsumption,latertrytoconsume
RECONSUME_LATER;
当MessageListenerConcurrently接口的consumeMessage方法返回ConsumeConcurrentlyStatus#RECONSUME_LATER、null或者方法抛异常了,都会进行消息重试。当然还是推荐返回ConsumeConcurrentlyStatus#RECONSUME_LATER。
测试顺序消费
顺序消费和并行消费其实都差不多的,只不过顺序消费实现的是MessageListenerOrderly接口
创建顺序消费监听类
publicclassMessageListenerOrderlyImplimplementsMessageListenerOrderly{
@Override
publicConsumeOrderlyStatusconsumeMessage(ListMessageExtmsgs,ConsumeOrderlyContextcontext){
if(CollectionUtils.isEmpty(msgs)){
returnConsumeOrderlyStatus.SUCCESS;
MessageExtmessage=msgs.get(0);
try{
finalLocalDateTimenow=LocalDateTime.now();
//逐条消费
StringmessageBody=newString(message.getBody(),StandardCharsets.UTF_8);
System.out.println("当前时间:"+now+",messageId:"+message.getMsgId()+",topic:"+
message.getTopic()+",messageBody:"+messageBody);
//模拟消费失败
if("Orderly_test".equals(messageBody)){
inta=1/0;
returnConsumeOrderlyStatus.SUCCESS;
}catch(Exceptione){
returnConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
注册监听类
//最大重试次数
consumer.setMaxReconsumeTimes(2);
//顺序消费重试时间间隔
consumer.setSuspendCurrentQueueTimeMillis(2000);
SuspendCurrentQueueTimeMillis表示重试的时间间隔,默认是1s,这里修改成2s
@RunWith(SpringRunner.class)
@SpringBootTest(classes=RocketmqApplication.class)
classRocketmqApplicationTests{
@Value("${rocketmq.consumer.topic}")
privateStringtopic;
@Autowired
privateRocketMQTemplaterocketMQTemplate;
@Test
publicvoidtestProducer(){
Stringmsg="Orderly_test";
rocketMQTemplate.convertAndSend(topic,msg);
测试结果:
可以看到三条结果,第一条是第一次消费的,其余两条是隔了2s重试的。重试2次之后这条数据就进入了死信队列。
顺序消费状态
顺序消费目前也是两个状态:SUCCESS和SUSPEND_CURRENT_QUEUE_A_MOMENT。SUSPEND_CURRENT_QUEUE_A_MOMENT意思是先暂停消费一下,过S
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 江苏沭阳高级中学2025-2026学年高一下学期3月阶段检测化学试题(含解析)
- 江西赣州市会昌县2026年中考第二次模拟考试道德与法治(含解析)
- 2025年报关员《海关法规》真题解析选择题满分技巧配套
- 2021冀北电网研究生面试综合能力题题库及高分参考答案
- 2020滕州初中语文面试试讲易错点配套题库及答案
- 2023甘肃法宣在线刷题小程序配套试题及正确答案
- 2026年开发主管面试题及答案 3天突击专用 零基础也能面过管理岗
- 2024年恶意代码分析方向面试题及答案 技术大牛岗专属备考资料
- 2021徐州首创水务劳务派遣岗面试题库及参考答案
- 第2课时平面与平面垂直课件2025-2026学年高二下学期数学湘教版选择性必修第二册
- (2025版)血液净化模式选择专家共识解读
- 2026年北京市丰台区高三一模英语试卷(含答案)
- 2025上市公司股权激励100问-
- 急性心肌梗死并发心脏破裂的临床诊疗与管理
- 2026年国家队反兴奋剂准入教育考试试题及答案
- 第九章第一节压强课件2025-2026学年人教版物理八年级下学期
- 100以内看图写数专项练习题(每日一练共6份)
- 移动模架施工安全监理实施细则
- 2025-2026学年卖油翁教学设计初一语文
- 中兴新云2026年测评-B套题
- 2026年商丘职业技术学院单招职业技能测试题库带答案详解
评论
0/150
提交评论