golang高并发系统限流策略漏桶和令牌桶算法源码剖析_第1页
golang高并发系统限流策略漏桶和令牌桶算法源码剖析_第2页
golang高并发系统限流策略漏桶和令牌桶算法源码剖析_第3页
golang高并发系统限流策略漏桶和令牌桶算法源码剖析_第4页
golang高并发系统限流策略漏桶和令牌桶算法源码剖析_第5页
已阅读5页,还剩12页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

第golang高并发系统限流策略漏桶和令牌桶算法源码剖析目录前言漏桶算法样例源码实现令牌桶算法样例源码剖析Limit类型Limiter结构体Reservation结构体Limiter消费tokenlimiter归还Token总结

前言

今天与大家聊一聊高并发系统中的限流技术,限流又称为流量控制,是指限制到达系统的并发请求数,当达到限制条件则可以拒绝请求,可以起到保护下游服务,防止服务过载等作用。常用的限流策略有漏桶算法、令牌桶算法、滑动窗口;下文主要与大家一起分析一下漏桶算法和令牌桶算法,滑动窗口就不在这里这介绍了。好啦,废话不多话,开整。

文中测试代码已上传:/asong2025/G

漏桶算法

漏桶算法比较好理解,假设我们现在有一个水桶,我们向这个水桶里添水,虽然我们我们无法预计一次会添多少水,也无法预计水流入的速度,但是可以固定出水的速度,不论添水的速率有多大,都按照固定的速率流出,如果桶满了,溢出的上方水直接抛弃。我们把水当作HTTP请求,每次都把请求放到一个桶中,然后以固定的速率处理请求,说了这么多,不如看一个图加深理解(图片来自于网络,手残党不会画,多多包涵):

原理其实很简单,就看我们怎么实现它了,uber团队有一个开源的uber-go/ratelimit库,这个库就是漏桶的一种实现,下面我们一起来看一看他的实现思路。

样例

学习一个新东西的时候,往往是从会用开始的,慢慢才能明白其实现原理,所以我们先来看看这个库是怎样使用的,这里我们直接提供一个实际使用例子,配合Gin框架,我们添加一个限流中间件,来达到请求限流的作用,测试代码如下:

//定义全局限流器对象

varrateLimitratelimit.Limiter

//在gin.HandlerFunc加入限流逻辑

funcleakyBucket()gin.HandlerFunc{

prev:=time.Now()

returnfunc(c*gin.Context){

now:=rateLimit.Take()

fmt.Println(now.Sub(prev))//为了打印时间间隔

prev=now//记录上一次的时间,没有这个打印的会有问题

funcmain(){

rateLimit=ratelimit.New(10)

r:=gin.Default()

r.GET("/ping",leakyBucket(),func(c*gin.Context){

c.JSON(200,true)

r.Run()//listenandserveon:8080(forwindows"localhost:8080")

我们简单使用压测工具ab测试一下:ab-n10-c2:8080/ping,执行结果部分如下:

观察结果可知,每次处理请求的时间间隔是10ms,并且后面的请求耗时越来越久,为什么会这样呢?这里先卖个小关子,看完uber的实现你就知道了~

源码实现

我们首先来看一下其核心结构:

typelimiterstruct{

sync.Mutex

lasttime.Time

sleepFortime.Duration

perRequesttime.Duration

maxSlacktime.Duration

clockClock

typeLimiterinterface{

//TakeshouldblocktomakesurethattheRPSismet.

Take()time.Time

限制器接口只提供了一个方法take(),take()方法会阻塞确保两次请求之间的时间走完,具体实现我们在下面进行分析。实现限制器接口的结构体中各个字段的意义如下:

sync.Mutext:互斥锁,控制并发的作用

last:记录上一次的时刻

sleepFor:距离处理下一次请求需要等待的时间

perRequest:每次请求的时间间隔

maxSlack:最大松弛量,用来解决突发流量

clock:一个时钟或模拟时钟,提供了now和sleep方法,是实例化速率限制器

要是用该限制器,首先需要通过New方法进行初始化,一个必传的参数是rate,代表的是每秒请求量(RPS),还有一个可选参数,参数类型option,也就是我们可以自定义limit,不过一般使用场景不多,这里就不过多介绍了。我主要看一下他是怎么保证固定速率的,截取New方法部分代码如下:

l:=limiter{

perRequest:time.Second/time.Duration(rate),

maxSlack:-10*time.Second/time.Duration(rate),

根据我们传入的请求数量,能计算出1s内要通过n个请求,每个请求之间的间隔时间是多少,这样在take方法中就可以根据这个字段来处理请求的固定速率问题,这里还初始化了最大松弛化字段,他的值是负数,默认最大松弛量是10个请求的时间间隔。

接下来我们主要看一下take方法:

func(t*limiter)Take()time.Time{

t.Lock()

defert.Unlock()

now:=t.clock.Now()

ift.last.IsZero(){

t.last=now

returnt.last

t.sleepFor+=t.perRequest-now.Sub(t.last)

ift.sleepFort.maxSlack{

t.sleepFor=t.maxSlack

ift.sleepFor0{

t.clock.Sleep(t.sleepFor)

t.last=now.Add(t.sleepFor)

t.sleepFor=0

}else{

t.last=now

returnt.last

take()方法的执行步骤如下:

为了控制并发,所以进入该方法就需要进行上锁,该锁的粒度比较大,整个方法都加上了锁通过IsZero方法来判断当前是否是第一次请求,如果是第一次请求,直接取now时间即可返回。如果不是第一次请求,就需要计算距离处理下一次请求需要等待的时间,这里有一个要注意点的是累加需要等待的时间,目的是可以给后面的抵消使用如果当前累加需要等待的时间大于最大松弛量了,将等待的时间设置为最大松弛量的时间。如果当前请求多余的时间无法完全抵消此次的所需量,调用sleep方法进行阻塞,同时清空等待的时间。如果sleepFor小于0,说明此次请求时间间隔大于预期间隔,也就说无需等待可以直接处理请求。

步骤其实不是很多,主要需要注意一个知识点最大松弛量。

漏桶算法有个天然缺陷就是无法应对突发流量(匀速,两次请求req1和req2之间的延迟至少应该=perRequest),举个例子说明:假设我们现在有三个请求req1、req2、req3按顺序处理,每个请求处理间隔为100ms,req1请求处理完成之后150ms,req2请求到来,依据限速策略可以对req2立即处理,当req2完成后,50ms后,req3到来,这个时候距离上次请求还不足100ms,因此还需要等待50ms才能继续执行,但是,对于这种情况,实际上这三个请求一共消耗了250ms才完成,并不是预期的200ms。

对于上面这种情况,我们可以把之前间隔比较长的请求的时间匀给后面的请求判断限流时使用,减少请求等待的时间了,但是当两个请求之间到达的间隔比较大时,就会产生很大的可抵消时间,以至于后面大量请求瞬间到达时,也无法抵消这个时间,那样就已经失去了限流的意义,所以引入了最大松弛量(maxSlack)的概念,该值为负值,表示允许抵消的最长时间,防止以上情况的出现。

以上就是漏桶实现的基本思路了,整体还是很简单的,你学会了吗?

令牌桶算法

令牌桶其实和漏桶的原理类似,令牌桶就是想象有一个固定大小的桶,系统会以恒定速率向桶中放Token,桶满则暂时不放。从网上找了图,表述非常恰当:

关于令牌桶限流算法的实现,Github有一个高效的基于令牌桶限流算法实现的限流库:/juju/ratelimit,Golang的timer/rate也是令牌桶的一种实现,本文就不介绍juju/ratelimit库了,有兴趣的自己学习一下的他的实现思想吧,我们主要来看一看time/rate是如何实现的。

还是老样子,我们还是结合gin写一个限流中间件看看他是怎么使用的,例子如下:

import(

"net/http"

"time"

"/gin-gonic/gin"

"/x/time/rate"

varrateLimit*rate.Limiter

functokenBucket()gin.HandlerFunc{

returnfunc(c*gin.Context){

ifrateLimit.Allow(){

c.String(http.StatusOK,"ratelimit,Drop")

c.Abort()

return

c.Next()

funcmain(){

limit:=rate.Every(100*time.Millisecond)

rateLimit=rate.NewLimiter(limit,10)

r:=gin.Default()

r.GET("/ping",tokenBucket(),func(c*gin.Context){

c.JSON(200,true)

r.Run()//listenandserveon:8080(forwindows"localhost:8080")

上面的例子我们首先调用NewLimiter方法构造一个限流器,第一个参数是rlimit,代表每秒可以向Token桶中产生多少token,第二个参数是bint,代表Token桶的容量大小,对于上面的例子,表示每100ms往桶中放一个token,也就是1s钟产生10个,桶的容量就是10。消费token的方法这里我们使用Allow方法,Allow实际上就是AllowN(time.Now(),1),AllowN方法表示,截止到某一时刻,目前桶中数目是否至少为n个,满足则返回true,同时从桶中消费n个token。反之返回不消费Token。对应上面的例子,当桶中的数目不足于1个时,就会丢掉该请求。

源码剖析

Limit类型

time/rate自定义了一个limit类型,其实他本质就是float64的别名,Limit定了事件的最大频率,表示每秒事件的数据量,0就表示无限制。Inf是无限的速率限制;它允许所有事件(即使突发为0)。还提供Every方法来指定向Token桶中放置Token的间隔,计算出每秒时间的数据量。

typeLimitfloat64

//Infistheinfiniteratelimit;itallowsallevents(evenifburstiszero).

constInf=Limit(math.MaxFloat64)

//EveryconvertsaminimumtimeintervalbetweeneventstoaLimit.

funcEvery(intervaltime.Duration)Limit{

ifinterval=0{

returnInf

return1/Limit(interval.Seconds())

Limiter结构体

typeLimiterstruct{

musync.Mutex

limitLimit

burstint

tokensfloat64

//lastisthelasttimethelimiter'stokensfieldwasupdated

lasttime.Time

//lastEventisthelatesttimeofarate-limitedevent(pastorfuture)

lastEventtime.Time

各个字段含义如下:

mu:互斥锁、为了控制并发limit:每秒允许处理的事件数量,即每秒处理事件的频率burst:令牌桶的最大数量,如果burst为0,并且limit==Inf,则允许处理任何事件,否则不允许tokens:令牌桶中可用的令牌数量last:记录上次limiter的tokens被更新的时间lastEvent:lastEvent记录速率受限制(桶中没有令牌)的时间点,该时间点可能是过去的,也可能是将来的(Reservation预定的结束时间点)

Reservation结构体

typeReservationstruct{

okbool

lim*Limiter

tokensint

timeToActtime.Time

//ThisistheLimitatreservationtime,itcanchangelater.

limitLimit

各个字段含义如下:

ok:到截至时间是否可以获取足够的令牌

lim:limiter对象

tokens:需要获取的令牌数量

timeToAct:需要等待的时间点

limit:代表预定的时间,是可以更改的。

reservation就是一个预定令牌的操作,timeToAct是本次预约需要等待到的指定时间点才有足够预约的令牌。

Limiter消费token

Limiter有三个token的消费方法,分别是Allow、Reserve和Wait,最终三种消费方式都调用了reserveN、advance这两个方法来生成和消费Token。所以我们主要看看reserveN、advance函数的具体实现。

advance方法的实现:

func(lim*Limiter)advance(nowtime.Time)(newNowtime.Time,newLasttime.Time,newTokensfloat64){

//last不能在当前时间now之后,否则计算出来的elapsed为负数,会导致令牌桶数量减少

last:=lim.last

ifnow.Before(last){

last=now

//根据令牌桶的缺数计算出令牌桶未进行更新的最大时间

maxElapsed:=lim.limit.durationFromTokens(float64(lim.burst)-lim.tokens)

elapsed:=now.Sub(last)//令牌桶未进行更新的时间段

ifelapsedmaxElapsed{

elapsed=maxElapsed

//根据未更新的时间(未向桶中加入令牌的时间段)计算出产生的令牌数

delta:=lim.limit.tokensFromDuration(elapsed)

tokens:=lim.tokens+delta//计算出可用的令牌数

ifburst:=float64(lim.burst);tokensburst{

tokens=burst

returnnow,last,tokens

advance方法的作用是更新令牌桶的状态,计算出令牌桶未更新的时间(elapsed),根据elapsed算出需要向桶中加入的令牌数delta,然后算出桶中可用的令牌数newTokens.

reserveN方法的实现:reserveN是AllowN,ReserveN及WaitN的辅助方法,用于判断在maxFutureReserve时间内是否有足够的令牌。

//@paramn要消费的token数量

//@parammaxFutureReserve愿意等待的最长时间

func(lim*Limiter)reserveN(nowtime.Time,nint,maxFutureReservetime.Duration)Reservation{

lim.mu.Lock()

//如果没有限制

iflim.limit==Inf{

lim.mu.Unlock()

returnReservation{

ok:true,//桶中有足够的令牌

lim:lim,

tokens:n,

timeToAct:now,

//更新令牌桶的状态,tokens为目前可用的令牌数量

now,last,tokens:=lim.advance(now)

//计算取完之后桶还能剩能下多少token

tokens-=float64(n)

varwaitDurationtime.Duration

//如果token0,说明目前的token不够,需要等待一段时间

iftokens0{

waitDuration=lim.limit.durationFromTokens(-tokens)

ok:=n=lim.burstwaitDuration=maxFutureReserve

r:=Reservation{

ok:ok,

lim:lim,

limit:lim.limit,

//timeToAct表示当桶中满足token数目等于n的时间

ifok{

r.tokens=n

r.timeToAct=now.Add(waitDuration)

//更新桶里面的token数目

//更新last时间

//lastEvent

ifok{

lim.last=now

lim.tokens=tokens

lim.lastEvent=r.timeToAct

}else{

lim.last=last

lim.mu.Unlock()

returnr

上面的代码我已经进行了注释,这里在总结一下流程:

首选判断是否拥有速率限制,没有速率限制也就是桶中一致拥有足够的令牌。计算从上次取Token的时间到当前时刻,期间一共新产生了多少Token:我们只在取Token之前生成新的Token,也就意味着每次取Token的间隔,实际上也是生成Token的间隔。我们可以利用tokensFromDuration,轻易的算出这段时间一共产生Token的数目。所以当前Token数目=新产生的Token数目+之前剩余的Token数目-要消费的Token数目。如果消费后剩余Token数目大于零,说明此时Token桶内仍不为空,此时Token充足,无需调用侧等待。如果Token数目小于零,则需等待一段时间。那么这个时候,我们可以利用durationFromTokens将当前负值的Token数转化为需要等待的时间。将需要等待的时间等相关结果返回给调用方

其实整个过程就是利用了Token数可以和时间相互转化的原理。而如果Token数为负,则需要等待相应时间即可。

上面提到了durationFromTokens、tokensFromDuration这两个方法,是关键,他们的实现如下:

func(limitLimit)durationFromTokens(tokensfloat64)time.Duration{

seconds:=tokens/float64(limit)

returntime.Nanosecond*time.Duration(1e9*seconds)

func(limitLimit)tokensFromDuration(dtime.Duration)float64{

//Splittheintegerandfractionalpartsourselftominimizeroundingerrors.

//See/issues/34861.

sec:=float64(d/time.Second)*float64(limit)

nsec:=float64(d%time.Second)*float64(limit)

returnsec+nsec/1e9

durationFromTokens:功能是计算出生成N个新的Token一共需要多久。tokensFromDuration:给定一段时长,这段时间一共可以生成多少个Token。

细心的网友会发现tokensFromDuration方法既然是计算一段时间一共可以生成多少个Token,为什么不直接进行相乘呢?其实Golang最初的版本就是采用d.Seconds()*float64(limit)直接相乘实现的,虽然看上去一点问题没有,但是这里是两个小数相乘,会带来精度损失,所以采用现在这种方法实现,分别求出秒的整数部分和小数部分,进行相乘后再相加,这样可以得到最精确的精度。

limiter归还Token

既然我们可以消费Token,那么对应也可以取消此次消费,将token归还,当调用Cancel()函数时,消费的Token数将会尽可能归还给Token桶。归还也并不是那么简单,接下我们我们看看归还token是如何实现的。

func(r*Reservation)CancelAt(nowtime.Time){

if!r.ok{

return

r.lim.mu.Lock()

deferr.lim.mu.Unlock()

1.如果无需限流

2.tokens为0(需要获取的令牌数量为0)

3.已经过了截至时间

以上三种情况无需处理取消操作

ifr.lim.limit==Inf||r.tokens==0||r.timeToAct.Before(now){

return

//计算出需要还原的令牌数量

//这里的r.lim.lastEvent可能是本次Reservation的结束时间,也可能是后来的Reservation的结束时间,所以要把本次结束时间点(r.timeToAct)之后产生的令牌数减去

restoreTokens:=float64(r.tokens)-r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))

//当小于0,表示已经都预支完了,不能归还了

ifrestoreTokens=0{

return

//从新计算令牌桶的状态

now,_,tokens:=r.lim.advance(now)

//还原当前令牌桶的令牌数量,当前的令牌数tokens加上需要还原的令牌数restoreTokens

tokens+=restoreTokens

//如果tokens大于桶的最大容量,则将tokens置为桶的最大容量

ifburst:=float64(r.lim.burst)

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论