基于Golang实现延迟队列(DelayQueue)_第1页
基于Golang实现延迟队列(DelayQueue)_第2页
基于Golang实现延迟队列(DelayQueue)_第3页
基于Golang实现延迟队列(DelayQueue)_第4页
基于Golang实现延迟队列(DelayQueue)_第5页
已阅读5页,还剩3页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

第基于Golang实现延迟队列(DelayQueue)目录背景原理堆随机删除重置元素到期时间Golang实现数据结构实现原理添加元素阻塞获取元素Channel方式阻塞读取性能测试总结

背景

延迟队列是一种特殊的队列,元素入队时需要指定到期时间(或延迟时间),从队头出队的元素必须是已经到期的,而且最先到期的元素最先出队,也就是队列里面的元素是按照到期时间排序的,添加元素和从队头出队的时间复杂度是O(log(n))。

由于以上性质,延迟队列一般可以用于以下场景(定时任务、延迟任务):

缓存:用户淘汰过期元素通知:在指定时间通知用户,比如会议开始前30分钟订单:30分钟未支付取消订单超时:服务器自动断开太长时间没有心跳的连接

其实在Golang中是自带定时器的,也就是time.After()、time.AfterFunc()等函数,它们的性能也是非常好的,随着Golang版本升级还会优化。但是对于某些场景来说确实不够方便,比如缓存场景我们需要能够支持随机删除定时器,随机重置过期时间,更加灵活的删除一小批过期元素。而且像Kafka的时间轮算法(TimeWheel)里面也用到了延迟队列,因此还是有必要了解下如何实现延迟队列。

原理

延迟队列每次出队的是最小到期时间的元素,而堆就是用来获取最值的数据结构。使用堆我们可以实现O(log(n))时间复杂度添加元素和移除最小到期时间元素。

随机删除

有时候延迟队列还需要具有随机删除元素的能力,可以通过以下方式实现:

元素添加删除标记字段:堆中每个元素都添加一个删除标记字段,并把这个元素的地址返回给用户,用户就可以标记元素的这个字段为true,这样元素到达堆顶时如果判断到这个字段为true就会被清除,而延迟队列里的元素逻辑上是一定会到达堆顶的(因为时间会流逝)。这是一种懒删除的方式。元素添加堆中下标字段(或用map记录下标):堆中每个元素都添加一个堆中下标字段,并把这个元素的地址返回给用户,这样我们就可以通过这个元素里面记录的下标快速定位元素在堆中的位置,从而删除元素。详细可以看文章如何实现一个支持O(log(n))随机删除元素的堆。

重置元素到期时间

如果需要重置延迟队列里面元素的到期时间,则必须知道元素在堆中的下标,因为重置到期时间之后必须对堆进行调整,因此只能是元素添加堆中下标字段。

Golang实现

这里我们实现一个最简单的延迟队列,也就是不支持随机删除元素和重置元素的到期时间,因为有些场景只需要添加元素和获取到期元素这两个功能,比如Kafka中的时间轮,而且这种简单实现性能会高一点。

代码地址

数据结构

主要的结构可以看到就是一个heap,Entry是每个元素在堆中的表示,Value是具体的元素值,Expired是为了堆中元素根据到期时间排序。

mutex是一个互斥锁,主要是保证操作并发安全。

wakeup是一个缓冲区长度为1的通道,通过它实现添加元素的时候唤醒等待队列不为空或者有更小到期时间元素加入的协程。(重点)

typeEntry[Tany]struct{

ValueT

Expiredtime.Time//到期时间

//延迟队列

typeDelayQueue[Tany]struct{

h*heap.Heap[*Entry[T]]

mutexsync.Mutex//保证并发安全

wakeupchanstruct{}//唤醒通道

//创建延迟队列

funcNew[Tany]()*DelayQueue[T]{

returnDelayQueue[T]{

h:heap.New(nil,func(e1,e2*Entry[T])bool{

returne1.Expired.Before(e2.Expired)

wakeup:make(chanstruct{},1),

}

实现原理

阻塞获取元素的时候如果队列已经没有元素,或者没有元素到期,那么协程就需要挂起等待。而被唤醒的条件是元素到期、队列不为空或者有更小到期时间元素加入。

其中元素到期协程在阻塞获取元素时发现堆顶元素还没到期,因此这个条件可以自己构造并等待。但是条件队列不为空和有更小到期时间元素加入则需要另外一个协程在添加元素时才能满足,因此必须通过一个中间结构来进行协程间通信,一般Golang里面会使用Channel来实现。

添加元素

一开始加了一个互斥锁,避免并发冲突,然后把元素加到堆里。

因为我们Take()操作,既阻塞获取元素操作,在不满足条件时会去等待wakeup通道,但是等待通道前必须释放锁,否则Push()无法写入新元素去满足条件队列不为空和有更小到期时间元素加入。而从释放锁后到开始读取wakeup通道这段时间是没有锁保护的,如果Push()在这期间插入新元素,为了保证通道不阻塞同时又能通知到Take()协程,我们的通道的长度需要是1,同时使用select+default保证在通道里面已经有元素的时候不阻塞Push()协程。

//添加延迟元素到队列

func(q*DelayQueue[T])Push(valueT,delaytime.Duration){

q.mutex.Lock()

deferq.mutex.Unlock()

entry:=Entry[T]{

Value:value,

Expired:time.Now().Add(delay),

q.h.Push(entry)

//唤醒等待的协程

//这里表示新添加的元素到期时间是最早的,或者原来队列为空

//因此必须唤醒等待的协程,因为可以拿到更早到期的元素

ifq.h.Peek()==entry{

select{

caseq.wakeup-struct{}{}:

default:

}

阻塞获取元素

这里先判断堆是否有元素,如果有获取堆顶元素,然后判断是否已经到期,如果到期则直接出堆并返回。否则等待直到超时或者元素到期或者有新的元素到达。

这里在解锁之前会清空wakeup通道,这样可以保证下面读取的wakeup通道里的元素肯定是新加入的。

//等待直到有元素到期

//或者ctx被关闭

func(q*DelayQueue[T])Take(ctxcontext.Context)(T,bool){

for{

varexpired*time.Timer

q.mutex.Lock()

//有元素

if!q.h.Empty(){

//获取元素

entry:=q.h.Peek()

iftime.Now().After(entry.Expired){

q.h.Pop()

q.mutex.Unlock()

returnentry.Value,true

//到期时间,使用time.NewTimer()才能够调用Stop(),从而释放定时器

expired=time.NewTimer(time.Until(entry.Expired))

//避免被之前的元素假唤醒

select{

case-q.wakeup:

default:

q.mutex.Unlock()

//不为空,需要同时等待元素到期

//并且除非expired到期,否则都需要关闭expired避免泄露

ifexpired!=nil{

select{

case-q.wakeup://新的更快到期元素

expired.Stop()

case-expired.C://首元素到期

case-ctx.Done()://被关闭

expired.Stop()

vartT

returnt,false

}else{

select{

case-q.wakeup://新的更快到期元素

case-ctx.Done()://被关闭

vartT

returnt,false

}

Channel方式阻塞读取

Golang里面可以使用Channel进行流式消费,因此简单包装一个Channel形式的阻塞读取接口,给通道一点缓冲区大小可以带来更好的性能。

//返回一个通道,输出到期元素

//size是通道缓存大小

func(q*DelayQueue[T])Channel(ctxcontext.Context,sizeint)-chanT{

out:=make(chanT,size)

gofunc(){

for{

entry,ok:=q.Take(ctx)

if!ok{

return

out-entry

returnout

}

使用方式

forentry:=rangeq.Channel(context.Background(),10){

//dosomething

}

性能测试

这里进行一个简单的性能测试,也就是先添加元素,然后等待到期后全部拿出来。

funcBenchmarkPushAndTake(b*testing.B){

q:=New[int]()

b.ResetTimer()

//添加元素

fori:=0;ib.N;i++{

q.Push(i,time.Duration(i))

//等待全部元素到期

b.StopTimer()

time.Sleep(time.Duration(b.N))

b.StartTimer()

//获取元素

fori:=0;ib.N;i++{

_,ok:=q.Take(context.Background())

if!ok{

b.Errorf("want%v,but%v",true,ok)

}

测试结果:

Benchmark-82331534476.8ns/op

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论