Go+Kafka实现延迟消息的实现示例_第1页
Go+Kafka实现延迟消息的实现示例_第2页
Go+Kafka实现延迟消息的实现示例_第3页
Go+Kafka实现延迟消息的实现示例_第4页
Go+Kafka实现延迟消息的实现示例_第5页
已阅读5页,还剩2页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

第Go+Kafka实现延迟消息的实现示例目录前言原理简单的实现生产者延迟服务消费者改进点通用的延迟服务生产者负责延迟服务总结

前言

延迟队列是一个非常有用的工具,我们经常遇到需要使用延迟队列的场景,比如延迟通知,订单关闭等等。

这篇文章主要是使用Go+Kafka实现延迟消息。

使用了sarama客户端。

原理

Kafka实现延迟消息分为下面三步:

生产者把消息发送到延迟队列延迟服务把延迟队列里超过延迟时间的消息写入真实队列消费者消费真实队列里的消息

简单的实现

生产者

生产者只是把消息发送到延迟队列

msg:=sarama.ProducerMessage{

Topic:kafka_delay_queue_test.DelayTopic,

Value:sarama.ByteEncoder("test"+strconv.Itoa(i)),

if_,_,err:=producer.SendMessage(msg);err!=nil{

log.Println(err)

}

延迟服务

延迟服务会订阅延迟队列的消息,并把超时消息发送到真实队列

iferr=consumerGroup.Consume(context.Background(),

[]string{kafka_delay_queue_test.DelayTopic},consumer);err!=nil{

break

}

typeConsumerstruct{

producersarama.SyncProducer

delaytime.Duration

funcNewConsumer(producersarama.SyncProducer,delaytime.Duration)*Consumer{

returnConsumer{

producer:producer,

delay:delay,

func(c*Consumer)ConsumeClaim(sessionsarama.ConsumerGroupSession,claimsarama.ConsumerGroupClaim)error{

formessage:=rangeclaim.Messages(){

//如果消息已经超时,把消息发送到真实队列

now:=time.Now()

ifnow.Sub(message.Timestamp)=c.delay{

_,_,err:=ducer.SendMessage(sarama.ProducerMessage{

Topic:kafka_delay_queue_test.RealTopic,

Key:sarama.ByteEncoder(message.Key),

Value:sarama.ByteEncoder(message.Value),

iferr==nil{

session.MarkMessage(message,"")

continue

//否则休眠一秒

time.Sleep(time.Second)

returnnil

returnnil

}

消费者

消费者只是订阅真实队列并消费消息

iferr=consumerGroup.Consume(context.Background(),

[]string{kafka_delay_queue_test.RealTopic},consumer);err!=nil{

break

}

typeConsumerstruct{}

funcNewConsumer()*Consumer{

returnConsumer{}

func(c*Consumer)ConsumeClaim(sessionsarama.ConsumerGroupSession,claimsarama.ConsumerGroupClaim)error{

formessage:=rangeclaim.Messages(){

fmt.Println("收到消息:",message.Value,message.Timestamp)

session.MarkMessage(message,"")

returnnil

}

改进点

通用的延迟服务

可以把延迟服务封装成一个通用的服务,这样生产者可以直接把消息发送给延迟服务,让延迟服务去处理剩下的逻辑。

延迟服务可以提供多个延时等级,比如5s、10s、30s、1m、5m、10m、1h、2h等,类似于RocketMQ。

生产者负责延迟服务

也可以让生产者负责延迟服务,让生产者自己把延迟队列里面的消息发送到真实队列。

下面是一个简单的实现:

//KafkaDelayQueueProducer延迟队列生产者,包含了生产者和延迟服务

typeKafkaDelayQueueProducerstruct{

producersarama.SyncProducer//生产者

delayTopicstring//延迟服务主题

//NewKafkaDelayQueueProducer创建延迟队列生产者

//producer生产者

//delayServiceConsumerGroup延迟服务消费者

//delayTime延迟时间

//delayTopic延迟服务主题

//realTopic真实队列主题

funcNewKafkaDelayQueueProducer(producersarama.SyncProducer,delayServiceConsumerGroupsarama.ConsumerGroup,

delayTimetime.Duration,delayTopic,realTopicstring)*KafkaDelayQueueProducer{

//启动延迟服务

consumer:=NewDelayServiceConsumer(producer,delayTime,realTopic)

gofunc(){

for{

iferr:=delayServiceConsumerGroup.Consume(context.Background(),

[]string{delayTopic},consumer);err!=nil{

break

returnKafkaDelayQueueProducer{

producer:producer,

delayTopic:delayTopic,

//SendMessage发送消息

func(q*KafkaDelayQueueProducer)SendMessage(msg*sarama.ProducerMessage)(partitionint32,offsetint64,errerror){

msg.Topic=q.delayTopic

returnducer.SendMessage(msg)

//DelayServiceConsumer延迟服务消费者

typeDelayServiceConsumerstruct{

producersarama.SyncProducer

delaytime.Duration

realTopicstring

funcNewDelayServiceConsumer(producersarama.SyncProducer,delaytime.Duration,

realTopicstring)*DelayServiceConsumer{

returnDelayServiceConsumer{

producer:producer,

delay:delay,

realTopic:realTopic,

func(c*DelayServiceConsumer)ConsumeClaim(sessionsarama.ConsumerGroupSession,

claimsarama.ConsumerGroupClaim)error{

formessage:=rangeclaim.Messages(){

//如果消息已经超时,把消息发送到真实队列

now:=time.Now()

ifnow.Sub(message.Timestamp)=c.delay{

_,_,err:=ducer.SendMessage(sarama.ProducerMessage{

Topic:c.realTopic,

Key:sarama.ByteEncoder(message.Key),

Value:sarama.ByteEncoder(message.Value),

iferr==nil{

session.MarkMessage(message,"")

continue

//否则休眠一秒

time.Sleep(time.Second)

returnnil

returnnil

func(c*DelayServiceConsumer)Setup(sarama.ConsumerGroupSession)error{

returnnil

func(c*DelayServiceConsumer)Cleanup(sarama.ConsumerGroupSession)error{

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

最新文档

评论

0/150

提交评论