RocketMq消息重试机制及死信队列详解_第1页
RocketMq消息重试机制及死信队列详解_第2页
RocketMq消息重试机制及死信队列详解_第3页
RocketMq消息重试机制及死信队列详解_第4页
RocketMq消息重试机制及死信队列详解_第5页
已阅读5页,还剩6页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

第RocketMq消息重试机制及死信队列详解目录生产者消息重试消费者消息重试并发消费顺序消费并发消费和顺序消费区别死信队列实践出真知公共部分创建测试并发消费并发消费状态测试顺序消费顺序消费状态测试死信队列死信队列特性

生产者消息重试

消息队列中的消息消费时并不能保证总是成功的,那失败的消息该怎么进行消息补偿呢?这就用到今天的主角消息重试和死信队列了。

有时因为网路等原因生产者也可能发送消息失败,也会进行消息重试,生产者消息重试比较简单,在springboot中只要在配置文件中配置一下就可以了。

#异步消息发送失败重试次数,默认为2

ducer.retry-times-when-send-async-failed=2

#消息发送失败重试次数,默认为2

ducer.retry-times-when-send-failed=2

也可以通过下面这种方式配置

DefaultMQProducerdefaultMQProducer=newDefaultMQProducer();

defaultMQProducer.setRetryTimesWhenSendFailed(2);

defaultMQProducer.setRetryTimesWhenSendAsyncFailed(2);

消费者消息重试

ApacheRocketMQ有两种消费模式:集群消费模式和广播消费模式。消息重试只针对集群消费模式生效;广播消费模式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。

同时RocketMqPush消费提供了两种消费方式:并发消费和顺序消费。

并发消费

在并发消费中,可能会有多个线程同时消费一个队列的消息,因此即使发送端通过发送顺序消息保证消息在同一个队列中按照FIFO的顺序,也无法保证消息实际被顺序消费,所有并发消费也可以称之为无序消费。

顺序消费

顺序消费是消息生产者发送过来的消息会遵循FIFO队列的思想,先进先出有顺序的消费消息。对于顺序消息,当消费者消费消息失败后,消息队列RocketMQ会自动不断进行消息重试(每次间隔时间为1秒),这时,应用会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。

并发消费和顺序消费区别

顺序消费和并发消费的重试机制并不相同,顺序消费消费失败后会先在客户端本地重试直到最大重试次数,这样可以避免消费失败的消息被跳过,消费下一条消息而打乱顺序消费的顺序,而并发消费消费失败后会将消费失败的消息重新投递回服务端,再等待服务端重新投递回来,在这期间会正常消费队列后面的消息。

并发消费失败后并不是投递回原Topic,而是投递到一个特殊Topic,其命名为%RETRY%ConsumerGroupName,集群模式下并发消费每一个ConsumerGroup会对应一个特殊Topic,并会订阅该Topic。

两者参数差别如下

消费类型重试间隔最大重试次数顺序消费间隔时间可通过自定义设置,SuspendCurrentQueueTimeMillis最大重试次数可通过自定义参数MaxReconsumeTimes取值进行配置。该参数取值无最大限制。若未设置参数值,默认最大重试次数为Integer.MAX并发消费间隔时间根据重试次数阶梯变化,取值范围:1秒~2小时。不支持自定义配置最大重试次数可通过自定义参数MaxReconsumeTimes取值进行配置。默认值为16次,该参数取值无最大限制,建议使用默认值

并发消费重试间隔如下:

第几次重试与上次重试的间隔时间第几次重试与上次重试的间隔时间110s97min230s108min31min119min42min1210min53min1320min64min1430min75min151h86min162h

死信队列

当一条消息初次消费失败,RocketMQ会自动进行消息重试,达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息。此时,该消息不会立刻被丢弃,而是将其发送到该消费者对应的特殊队列中,这类消息称为死信消息(Dead-LetterMessage),存储死信消息的特殊队列称为死信队列(Dead-LetterQueue),死信队列是死信Topic下分区数唯一的单独队列。如果产生了死信消息,那对应的ConsumerGroup的死信Topic名称为%DLQ%ConsumerGroupName,死信队列的消息将不会再被消费。可以利用RocketMQAdmin工具或者RocketMQDashboard上查询到对应死信消息的信息。

实践出真知

Talkischeap,showyouthecode.

公共部分创建

-server=localhost:9876

#消费者组

ducer.group=producer_group

rocketmq.consumer.topic=consumer_topic

rocketmq.consumer.group=consumer_group

创建消费者RetryConsumerDemo

@Component

publicclassRetryConsumerDemo{

@Value("${-server}")

privateStringnamesrvAddr;

@Value("${rocketmq.consumer.topic}")

privateStringtopic;

@Value("${rocketmq.consumer.group}")

privateStringconsumerGroup;

privatefinalDefaultMQPushConsumerconsumer=newDefaultMQPushConsumer("consumer_group");

@PostConstruct

publicvoidstart(){

try{

consumer.setNamesrvAddr(namesrvAddr);

//设置集群消费模式

consumer.setMessageModel(MessageModel.CLUSTERING);

//设置消费超时时间(分钟)

consumer.setConsumeTimeout(1);

//订阅主题

consumer.subscribe(topic,"*");

//注册消息监听器

consumer.registerMessageListener(newMessageListenerConcurrentlyImpl());

//最大重试次数

consumer.setMaxReconsumeTimes(2);

//启动消费端

consumer.start();

System.out.println("RetryConsumerStart...");

}catch(MQClientExceptione){

e.printStackTrace();

测试并发消费

创建并发消费监听类并发消费监听类要实现MessageListenerConcurrently类

publicclassMessageListenerConcurrentlyImplimplementsMessageListenerConcurrently{

@Override

publicConsumeConcurrentlyStatusconsumeMessage(ListMessageExtmsgs,ConsumeConcurrentlyContextcontext){

if(CollectionUtils.isEmpty(msgs)){

returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;

MessageExtmessage=msgs.get(0);

try{

finalLocalDateTimenow=LocalDateTime.now();

//逐条消费

StringmessageBody=newString(message.getBody(),StandardCharsets.UTF_8);

System.out.println("当前时间:"+now+",messageId:"+message.getMsgId()+",topic:"+

message.getTopic()+",messageBody:"+messageBody);

//模拟消费失败

if("Concurrently_test".equals(messageBody)){

inta=1/0;

returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}catch(Exceptione){

returnConsumeConcurrentlyStatus.RECONSUME_LATER;

注册监听类在消费者类RetryConsumerDemo中注册监听类

//注册消息监听器

consumer.registerMessageListener(newMessageListenerConcurrentlyImpl());

@RunWith(SpringRunner.class)

@SpringBootTest(classes=RocketmqApplication.class)

classRocketmqApplicationTests{

@Value("${rocketmq.consumer.topic}")

privateStringtopic;

@Autowired

privateRocketMQTemplaterocketMQTemplate;

@Test

publicvoidtestProducer(){

Stringmsg="Concurrently_test";

rocketMQTemplate.convertAndSend(topic,msg);

测试结果:

后面重试时间太长就不做测试了,可以看到并发消费的消息时间都是按照上面那张时间间隔表来。

然后通过RocketMqDashboardTopic一栏可以看到有一个重试消费者组%RETRY%consumer_group,这个消费者组内存放的就是consumer_group消费者组消费失败重试的消息。

并发消费的重试次数是可以修改的,重试次数对应参数DefaultMQPushConsumer类的maxReconsumeTimes属性,maxReconsumeTimes默认是-1,也就是默认会重试16次;

0代表不重试,只要失败就会放入死信队列;

1-16重试次数对应着上面时间间隔表中对应次数。

配置的最大重试次数超过16就按16处理。

并发消费状态

并发消费有两个状态CONSUME_SUCCESS和RECONSUME_LATER。返回CONSUME_SUCCESS代表着消费成功,返回RECONSUME_LATER代表进行消息重试。

publicenumConsumeConcurrentlyStatus{

*Successconsumption

CONSUME_SUCCESS,

*Failureconsumption,latertrytoconsume

RECONSUME_LATER;

当MessageListenerConcurrently接口的consumeMessage方法返回ConsumeConcurrentlyStatus#RECONSUME_LATER、null或者方法抛异常了,都会进行消息重试。当然还是推荐返回ConsumeConcurrentlyStatus#RECONSUME_LATER。

测试顺序消费

顺序消费和并行消费其实都差不多的,只不过顺序消费实现的是MessageListenerOrderly接口

创建顺序消费监听类

publicclassMessageListenerOrderlyImplimplementsMessageListenerOrderly{

@Override

publicConsumeOrderlyStatusconsumeMessage(ListMessageExtmsgs,ConsumeOrderlyContextcontext){

if(CollectionUtils.isEmpty(msgs)){

returnConsumeOrderlyStatus.SUCCESS;

MessageExtmessage=msgs.get(0);

try{

finalLocalDateTimenow=LocalDateTime.now();

//逐条消费

StringmessageBody=newString(message.getBody(),StandardCharsets.UTF_8);

System.out.println("当前时间:"+now+",messageId:"+message.getMsgId()+",topic:"+

message.getTopic()+",messageBody:"+messageBody);

//模拟消费失败

if("Orderly_test".equals(messageBody)){

inta=1/0;

returnConsumeOrderlyStatus.SUCCESS;

}catch(Exceptione){

returnConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;

注册监听类

//最大重试次数

consumer.setMaxReconsumeTimes(2);

//顺序消费重试时间间隔

consumer.setSuspendCurrentQueueTimeMillis(2000);

SuspendCurrentQueueTimeMillis表示重试的时间间隔,默认是1s,这里修改成2s

@RunWith(SpringRunner.class)

@SpringBootTest(classes=RocketmqApplication.class)

classRocketmqApplicationTests{

@Value("${rocketmq.consumer.topic}")

privateStringtopic;

@Autowired

privateRocketMQTemplaterocketMQTemplate;

@Test

publicvoidtestProducer(){

Stringmsg="Orderly_test";

rocketMQTemplate.convertAndSend(topic,msg);

测试结果:

可以看到三条结果,第一条是第一次消费的,其余两条是隔了2s重试的。重试2次之后这条数据就进入了死信队列。

顺序消费状态

顺序消费目前也是两个状态:SUCCESS和SUSPEND_CURRENT_QUEUE_A_MOMENT。SUSPEND_CURRENT_QUEUE_A_MOMENT意思是先暂停消费一下,过S

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论