版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
【移动应用开发技术】怎么在Kotlin中使用RocketMQ实现一个延时消息
这期内容当中在下将会给大家带来有关怎么在Kotlin中使用RocketMQ实现一个延时消息,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。一.延时消息延时消息是指消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。使用延时消息的典型场景,例如:在电商系统中,用户下完订单30分钟内没支付,则订单可能会被取消。在电商系统中,用户七天内没有评价商品,则默认好评。这些场景对应的解决方案,包括:轮询遍历数据库记录JDK的DelayQueueScheduledExecutorService基于Quartz的定时任务基于Redis的zset实现延时队列。除此之外,还可以使用消息队列来实现延时消息,例如RocketMQ。二.RocketMQRocketMQ是一个分布式消息和流数据平台,具有低延迟、高性能、高可靠性、万亿级容量和灵活的可扩展性。RocketMQ是2012年阿里巴巴开源的第三代分布式消息中间件。三.RocketMQ实现延时消息3.1业务背景我们的系统完成某项操作之后,会推送事件消息到业务方的接口。当我们调用业务方的通知接口返回值为成功时,表示本次推送消息成功;当返回值为失败时,则会多次推送消息,直到返回成功为止(保证至少成功一次)。当我们推送失败后,虽然会进行多次推送消息,但并不是立即进行。会有一定的延迟,并按照一定的规则进行推送消息。例如:1小时后尝试推送、3小时后尝试推送、1天后尝试推送、3天后尝试推送等等。因此,考虑使用延时消息实现该功能。3.2生产者(Producer)生产者负责产生消息,生产者向消息服务器发送由业务应用程序系统生成的消息。首先,定义一个支持延时发送的AbstractProducer。abstract
class
AbstractProducer
:ProducerBean()
{
var
producerId:
String?
=
null
var
topic:
String?
=
null
var
tag:
String?=null
var
timeoutMillis:
Int?
=
null
var
delaySendTimeMills:
Long?
=
null
val
log
=
LogFactory.getLog(this.javaClass)
open
fun
sendMessage(messageBody:
Any,
tag:
String)
{
val
msgBody
=
JSON.toJSONString(messageBody)
val
message
=
Message(topic,
tag,
msgBody.toByteArray())
if
(delaySendTimeMills
!=
null)
{
val
startDeliverTime
=
System.currentTimeMillis()
+
delaySendTimeMills!!
message.startDeliverTime
=
startDeliverTime
(
"send
delay
message
producer
startDeliverTime:${startDeliverTime}currentTime
:${System.currentTimeMillis()}")
}
val
logMessageId
=
buildLogMessageId(message)
try
{
val
sendResult
=
send(message)
(logMessageId
+
"producer
messageId:
"
+
sendResult.getMessageId()
+
"\n"
+
"messageBody:
"
+
msgBody)
}
catch
(e:
Exception)
{
log.error(logMessageId
+
"messageBody:
"
+
msgBody
+
"\n"
+
"
error:
"
+
e.message,
e)
}
}
fun
buildLogMessageId(message:
Message):
String
{
return
"topic:
"
+
message.topic
+
"\n"
+
"producer:
"
+
producerId
+
"\n"
+
"tag:
"
+
message.tag
+
"\n"
+
"key:
"
+
message.key
+
"\n"
}
}根据业务需要,增加一个支持重试机制的Producer@Component
@ConfigurationProperties("ducers.xxx-producer")
@Configuration
@Data
class
CleanReportPushEventProducer
:AbstractProducer()
{
lateinit
var
delaySecondList:List<Long>
fun
sendMessage(messageBody:
CleanReportPushEventMessage){
//重试超过次数之后不再发事件
if
(delaySecondList!=null)
{
if(messageBody.times>=delaySecondList.size){
return
}
val
msgBody
=
JSON.toJSONString(messageBody)
val
message
=
Message(topic,
tag,
msgBody.toByteArray())
val
delayTimeMills
=
delaySecondList[messageBody.times]*1000L
message.startDeliverTime
=
System.currentTimeMillis()
+
delayTimeMills
(
"messageBody:
"
+
msgBody+
"startDeliverTime:
"+message.startDeliverTime
)
val
logMessageId
=
buildLogMessageId(message)
try
{
val
sendResult
=
send(message)
(logMessageId
+
"producer
messageId:
"
+
sendResult.getMessageId()
+
"\n"
+
"messageBody:
"
+
msgBody)
}
catch
(e:
Exception)
{
log.error(logMessageId
+
"messageBody:
"
+
msgBody
+
"\n"
+
"
error:
"
+
e.message,
e)
}
}
}
}在CleanReportPushEventProducer中,超过了重试的次数就不会再发送消息了。每一次延时消息的时间也会不同,因此需要根据重试的次数来获取这个delayTimeMills。通过System.currentTimeMillis()+delayTimeMills可以设置message的startDeliverTime。然后调用send(message)即可发送延时消息。我们使用商用版的RocketMQ,因此支持精度为秒级别的延迟消息。在开源版本中,RocketMQ只支持18个特定级别的延迟消息。:(3.3消费者(Consumer)消费者负责消费消息,消费者从消息服务器拉取信息并将其输入用户应用程序。定义Push类型的AbstractConsumer:@Data
abstract
class
AbstractConsumer
():MessageListener{
var
consumerId:
String?
=
null
lateinit
var
subscribeOptions:
List<SubscribeOptions>
var
threadNums:
Int?
=
null
val
log
=
LogFactory.getLog(this.javaClass)
override
fun
consume(message:
Message,
context:
ConsumeContext):
Action
{
val
logMessageId
=
buildLogMessageId(message)
val
body
=
String(message.body)
try
{
(logMessageId
+
"
body:
"
+
body)
val
result
=
consumeInternal(message,
context,
JSON.parseObject(body,
getMessageBodyType(message.tag)))
(logMessageId
+
"
result:
"
+
)
return
result
}
catch
(e:
Exception)
{
if
(message.reconsumeTimes
>=
3)
{
log.error(logMessageId
+
"
error:
"
+
e.message,
e)
}
return
Action.ReconsumeLater
}
}
abstract
fun
getMessageBodyType(tag:
String):
Type?
abstract
fun
consumeInternal(message:
Message,
context:
ConsumeContext,
obj:
Any):
Action
protected
fun
buildLogMessageId(message:
Message):
String
{
return
"topic:
"
+
message.topic
+
"\n"
+
"consumer:
"
+
consumerId
+
"\n"
+
"tag:
"
+
message.tag
+
"\n"
+
"key:
"
+
message.key
+
"\n"
+
"MsgId:"
+
message.msgID
+
"\n"
+
"BornTimestamp"
+
message.bornTimestamp
+
"\n"
+
"StartDeliverTime:"
+
message.startDeliverTime
+
"\n"
+
"ReconsumeTimes:"
+
message.reconsumeTimes
+
"\n"
}
}再定义具体的消费者,并且在消费失败之后能够再发送一次消息。@Configuration
@ConfigurationProperties("mqs.ons.consumers.clean-report-push-event-consumer")
@Data
class
CleanReportPushEventConsumer(val
cleanReportService:
CleanReportService,val
eventProducer:CleanReportPushEventProducer):AbstractConsumer()
{
val
logger:
Logger
=
LoggerFactory.getLogger(this.javaClass)
override
fun
consumeInternal(message:
Message,
context:
ConsumeContext,
obj:
Any):
Action
{
if(obj
is
CleanReportPushEventMessage){
//清除事件
("consumer
clean-report
event
report_id:${obj.id}
")
//消费失败之后再发送一次消息
if(!cleanReportService.sendCleanReportEvent(obj.id)){
val
times
=
obj.times+1
eventProducer.sendMessage(CleanReportPushEventMessage(obj.id,times))
}
}
return
Action.CommitMessage
}
override
fun
getMessageBodyType(tag:
String):
Type?
{
return
CleanReportPushEventMessage::class.java
}
}其中,cleanReportService的sendCleanReportEvent()会通过http的方式调用业务方提供的接口,进行事件消息的推送。如果推送失败了,则会进行下一次的推送。(这里使用了eventProducer的sendMessage()方法再次投递消息,是因为要根据调用的http接口返回的内容来判断消息是否发送成功。)最后,定义ConsumerFactory@Component
class
ConsumerFactory(val
consumers:
List<AbstractConsumer>,val
aliyunOnsOptions:
AliyunOnsOptions)
{
val
logger:
Logger
=
LoggerFactory.getLogger(this.javaClass)
@PostConstruct
fun
start()
{
CompletableFuture.runAsync{
consumers.stream().forEach
{
val
properties
=
buildProperties(it.consumerId!!,
it.threadNums)
val
consumer
=
ONSFactory.createConsumer(properties)
if
(it.subscribeOptions
!=
null
&&
!it.subscribeOptions!!.isEmpty())
{
for
(options
in
it.subscribeOptions!!)
{
consumer.subscribe(options.topic,
options.tag,
it)
}
consumer.start()
val
message
=
"\n".plus(
it.subscribeOptions!!.stream().map{
a
->
String.format("topic:
%s,
tag:
%s
has
been
started",
a.topic,
a.tag)}
.collect(Collectors.toList<
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 2023-2024学年浙江省高一下学期3月四校联考数学试题(解析版)
- 社会保险改革策略
- 中小学实验教学管理制度
- 电话用户本人实名业务办理承诺书
- 美学之美-探索方括号、花括号、括号等符号在设计中的应用
- 【选修现当代】《药》鲁迅(同步课件)-高二语文名校共研名师共创单元同步(统编版选择性必修下册)
- 房屋改造申请书范本
- 体育劳动合同范本
- 合同补充协议4篇
- 2024年泉州市晋江市六年级下学期模拟语文试题含答案
- 2024年03月上海市价格认证中心工作人员招考聘用笔试历年(2016-2023年)真题荟萃带答案解析
- 蒙自源饮食有限公司餐厅经理人事管理工作指引
- 苏教版七年级下册数学《期中检测题》附答案解析
- 配电网运行规程.doc
- 受限空间施工安全专项方案
- 工程洽商记录(最新整理)
- 人事前台文员——职位说明书(最新整理)
- 幼儿园家长委员会章程最新版本
- “三会一课”会议流程
- 二字词语接龙
- 常用磁芯规格参数
评论
0/150
提交评论