【移动应用开发技术】怎么在Kotlin中使用RocketMQ实现一个延时消息_第1页
【移动应用开发技术】怎么在Kotlin中使用RocketMQ实现一个延时消息_第2页
【移动应用开发技术】怎么在Kotlin中使用RocketMQ实现一个延时消息_第3页
【移动应用开发技术】怎么在Kotlin中使用RocketMQ实现一个延时消息_第4页
【移动应用开发技术】怎么在Kotlin中使用RocketMQ实现一个延时消息_第5页
已阅读5页,还剩4页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

【移动应用开发技术】怎么在Kotlin中使用RocketMQ实现一个延时消息

这期内容当中在下将会给大家带来有关怎么在Kotlin中使用RocketMQ实现一个延时消息,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。一.延时消息延时消息是指消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。使用延时消息的典型场景,例如:在电商系统中,用户下完订单30分钟内没支付,则订单可能会被取消。在电商系统中,用户七天内没有评价商品,则默认好评。这些场景对应的解决方案,包括:轮询遍历数据库记录JDK的DelayQueueScheduledExecutorService基于Quartz的定时任务基于Redis的zset实现延时队列。除此之外,还可以使用消息队列来实现延时消息,例如RocketMQ。二.RocketMQRocketMQ是一个分布式消息和流数据平台,具有低延迟、高性能、高可靠性、万亿级容量和灵活的可扩展性。RocketMQ是2012年阿里巴巴开源的第三代分布式消息中间件。三.RocketMQ实现延时消息3.1业务背景我们的系统完成某项操作之后,会推送事件消息到业务方的接口。当我们调用业务方的通知接口返回值为成功时,表示本次推送消息成功;当返回值为失败时,则会多次推送消息,直到返回成功为止(保证至少成功一次)。当我们推送失败后,虽然会进行多次推送消息,但并不是立即进行。会有一定的延迟,并按照一定的规则进行推送消息。例如:1小时后尝试推送、3小时后尝试推送、1天后尝试推送、3天后尝试推送等等。因此,考虑使用延时消息实现该功能。3.2生产者(Producer)生产者负责产生消息,生产者向消息服务器发送由业务应用程序系统生成的消息。首先,定义一个支持延时发送的AbstractProducer。abstract

class

AbstractProducer

:ProducerBean()

{

var

producerId:

String?

=

null

var

topic:

String?

=

null

var

tag:

String?=null

var

timeoutMillis:

Int?

=

null

var

delaySendTimeMills:

Long?

=

null

val

log

=

LogFactory.getLog(this.javaClass)

open

fun

sendMessage(messageBody:

Any,

tag:

String)

{

val

msgBody

=

JSON.toJSONString(messageBody)

val

message

=

Message(topic,

tag,

msgBody.toByteArray())

if

(delaySendTimeMills

!=

null)

{

val

startDeliverTime

=

System.currentTimeMillis()

+

delaySendTimeMills!!

message.startDeliverTime

=

startDeliverTime

(

"send

delay

message

producer

startDeliverTime:${startDeliverTime}currentTime

:${System.currentTimeMillis()}")

}

val

logMessageId

=

buildLogMessageId(message)

try

{

val

sendResult

=

send(message)

(logMessageId

+

"producer

messageId:

"

+

sendResult.getMessageId()

+

"\n"

+

"messageBody:

"

+

msgBody)

}

catch

(e:

Exception)

{

log.error(logMessageId

+

"messageBody:

"

+

msgBody

+

"\n"

+

"

error:

"

+

e.message,

e)

}

}

fun

buildLogMessageId(message:

Message):

String

{

return

"topic:

"

+

message.topic

+

"\n"

+

"producer:

"

+

producerId

+

"\n"

+

"tag:

"

+

message.tag

+

"\n"

+

"key:

"

+

message.key

+

"\n"

}

}根据业务需要,增加一个支持重试机制的Producer@Component

@ConfigurationProperties("ducers.xxx-producer")

@Configuration

@Data

class

CleanReportPushEventProducer

:AbstractProducer()

{

lateinit

var

delaySecondList:List<Long>

fun

sendMessage(messageBody:

CleanReportPushEventMessage){

//重试超过次数之后不再发事件

if

(delaySecondList!=null)

{

if(messageBody.times>=delaySecondList.size){

return

}

val

msgBody

=

JSON.toJSONString(messageBody)

val

message

=

Message(topic,

tag,

msgBody.toByteArray())

val

delayTimeMills

=

delaySecondList[messageBody.times]*1000L

message.startDeliverTime

=

System.currentTimeMillis()

+

delayTimeMills

(

"messageBody:

"

+

msgBody+

"startDeliverTime:

"+message.startDeliverTime

)

val

logMessageId

=

buildLogMessageId(message)

try

{

val

sendResult

=

send(message)

(logMessageId

+

"producer

messageId:

"

+

sendResult.getMessageId()

+

"\n"

+

"messageBody:

"

+

msgBody)

}

catch

(e:

Exception)

{

log.error(logMessageId

+

"messageBody:

"

+

msgBody

+

"\n"

+

"

error:

"

+

e.message,

e)

}

}

}

}在CleanReportPushEventProducer中,超过了重试的次数就不会再发送消息了。每一次延时消息的时间也会不同,因此需要根据重试的次数来获取这个delayTimeMills。通过System.currentTimeMillis()+delayTimeMills可以设置message的startDeliverTime。然后调用send(message)即可发送延时消息。我们使用商用版的RocketMQ,因此支持精度为秒级别的延迟消息。在开源版本中,RocketMQ只支持18个特定级别的延迟消息。:(3.3消费者(Consumer)消费者负责消费消息,消费者从消息服务器拉取信息并将其输入用户应用程序。定义Push类型的AbstractConsumer:@Data

abstract

class

AbstractConsumer

():MessageListener{

var

consumerId:

String?

=

null

lateinit

var

subscribeOptions:

List<SubscribeOptions>

var

threadNums:

Int?

=

null

val

log

=

LogFactory.getLog(this.javaClass)

override

fun

consume(message:

Message,

context:

ConsumeContext):

Action

{

val

logMessageId

=

buildLogMessageId(message)

val

body

=

String(message.body)

try

{

(logMessageId

+

"

body:

"

+

body)

val

result

=

consumeInternal(message,

context,

JSON.parseObject(body,

getMessageBodyType(message.tag)))

(logMessageId

+

"

result:

"

+

)

return

result

}

catch

(e:

Exception)

{

if

(message.reconsumeTimes

>=

3)

{

log.error(logMessageId

+

"

error:

"

+

e.message,

e)

}

return

Action.ReconsumeLater

}

}

abstract

fun

getMessageBodyType(tag:

String):

Type?

abstract

fun

consumeInternal(message:

Message,

context:

ConsumeContext,

obj:

Any):

Action

protected

fun

buildLogMessageId(message:

Message):

String

{

return

"topic:

"

+

message.topic

+

"\n"

+

"consumer:

"

+

consumerId

+

"\n"

+

"tag:

"

+

message.tag

+

"\n"

+

"key:

"

+

message.key

+

"\n"

+

"MsgId:"

+

message.msgID

+

"\n"

+

"BornTimestamp"

+

message.bornTimestamp

+

"\n"

+

"StartDeliverTime:"

+

message.startDeliverTime

+

"\n"

+

"ReconsumeTimes:"

+

message.reconsumeTimes

+

"\n"

}

}再定义具体的消费者,并且在消费失败之后能够再发送一次消息。@Configuration

@ConfigurationProperties("mqs.ons.consumers.clean-report-push-event-consumer")

@Data

class

CleanReportPushEventConsumer(val

cleanReportService:

CleanReportService,val

eventProducer:CleanReportPushEventProducer):AbstractConsumer()

{

val

logger:

Logger

=

LoggerFactory.getLogger(this.javaClass)

override

fun

consumeInternal(message:

Message,

context:

ConsumeContext,

obj:

Any):

Action

{

if(obj

is

CleanReportPushEventMessage){

//清除事件

("consumer

clean-report

event

report_id:${obj.id}

")

//消费失败之后再发送一次消息

if(!cleanReportService.sendCleanReportEvent(obj.id)){

val

times

=

obj.times+1

eventProducer.sendMessage(CleanReportPushEventMessage(obj.id,times))

}

}

return

Action.CommitMessage

}

override

fun

getMessageBodyType(tag:

String):

Type?

{

return

CleanReportPushEventMessage::class.java

}

}其中,cleanReportService的sendCleanReportEvent()会通过http的方式调用业务方提供的接口,进行事件消息的推送。如果推送失败了,则会进行下一次的推送。(这里使用了eventProducer的sendMessage()方法再次投递消息,是因为要根据调用的http接口返回的内容来判断消息是否发送成功。)最后,定义ConsumerFactory@Component

class

ConsumerFactory(val

consumers:

List<AbstractConsumer>,val

aliyunOnsOptions:

AliyunOnsOptions)

{

val

logger:

Logger

=

LoggerFactory.getLogger(this.javaClass)

@PostConstruct

fun

start()

{

CompletableFuture.runAsync{

consumers.stream().forEach

{

val

properties

=

buildProperties(it.consumerId!!,

it.threadNums)

val

consumer

=

ONSFactory.createConsumer(properties)

if

(it.subscribeOptions

!=

null

&&

!it.subscribeOptions!!.isEmpty())

{

for

(options

in

it.subscribeOptions!!)

{

consumer.subscribe(options.topic,

options.tag,

it)

}

consumer.start()

val

message

=

"\n".plus(

it.subscribeOptions!!.stream().map{

a

->

String.format("topic:

%s,

tag:

%s

has

been

started",

a.topic,

a.tag)}

.collect(Collectors.toList<

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论