深入理解Golangchannel的应用_第1页
深入理解Golangchannel的应用_第2页
深入理解Golangchannel的应用_第3页
深入理解Golangchannel的应用_第4页
深入理解Golangchannel的应用_第5页
已阅读5页,还剩13页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

第深入理解Golangchannel的应用目录前言整体结构创建发送接收关闭

前言

channel是用于goroutine之间的同步、通信的数据结构

channel的底层是通过mutex来控制并发的,但它为程序员提供了更高一层次的抽象,封装了更多的功能,这样并发编程变得更加容易和安全,得以让程序员把注意力留到业务上去,提升开发效率

channel的用途包括但不限于以下几点:

协程间通信,同步定时任务:和timer结合解耦生产方和消费方,实现阻塞队列控制并发数

本文将介绍channel的底层原理,包括数据结构,channel的创建,发送,接收,关闭的实现逻辑

整体结构

Gochannel的数据结构如下所示:

typehchanstruct{

qcountuint//totaldatainthequeue

dataqsizuint//sizeofthecircularqueue

bufunsafe.Pointer//pointstoanarrayofdataqsizelements

elemsizeuint16

closeduint32

elemtype*_type//elementtype

sendxuint//sendindex

recvxuint//receiveindex

recvqwaitq//listofrecvwaiters

sendqwaitq//listofsendwaiters

lockmutex

qcount:已经存储了多少个元素

dataqsie:最多存储多少个元素,即缓冲区容量

buf:指向缓冲区的位置,实际上是一个数组

elemsize:每个元素占多大空间

closed:channel能够关闭,这里记录其关闭状态

elemtype:保存数据的类型信息,用于go运行时使用

sendx,recvx:

记录下一个要发送到的位置,下一次从哪里还是接收这里用数组模拟队列,这两个变量即表示队列的队头,队尾因此channel的缓冲也被称为环形缓冲区

recvq,sendq:

当发送个接收不能立即完成时,需要让协程在channel上等待,所以有两个等待队列,分别针对接收和发送

lock:channel支持协程间并发访问,因此需要一把锁来保护

创建

创建channel会被编译器编译为调用makechan函数

//无缓冲通道

ch1:=make(chanint)

//有缓冲通道

ch2:=make(chanint,10)

会根据创建的是带缓存,还是无缓冲,决定第二个参数size的值

可以看出,创建出来的是hchan指针,这样就能在函数间直接传递channel,而不用传递channel的指针

funcmakechan(t*chantype,sizeint)*hchan{

elem:=t.elem

//mem:缓冲区大小

mem,overflow:=math.MulUintptr(elem.size,uintptr(size))

ifoverflow||memmaxAlloc-hchanSize||size0{

panic(plainError("makechan:sizeoutofrange"))

varc*hchan

switch{

//缓冲区大小为空,只申请hchanSize大小的内存

casemem==0:

c=(*hchan)(mallocgc(hchanSize,nil,true))

c.buf=c.raceaddr()

//元素类型不包含指针,一次性分配hchanSize+mem大小的内存

caseelem.ptrdata==0:

c=(*hchan)(mallocgc(hchanSize+mem,nil,true))

c.buf=add(unsafe.Pointer(c),hchanSize)

//否则就是带缓存,且有指针,分配两次内存

default:

//Elementscontainpointers.

c=new(hchan)

c.buf=mallocgc(mem,elem,true)

//保存元素类型,元素大小,容量

c.elemsize=uint16(elem.size)

c.elemtype=elem

c.dataqsiz=uint(size)

lockInit(c.lock,lockRankHchan)

returnc

发送

执行以下代码时:

ch-3

编译器会转化为对chansend的调用

funcchansend(c*hchan,epunsafe.Pointer,blockbool,callerpcuintptr)bool{

//如果channel是空

ifc==nil{

//非阻塞,直接返回

if!block{

returnfalse

//否则阻塞当前协程

gopark(nil,nil,waitReasonChanSendNilChan,traceEvGoStop,2)

throw("unreachable")

//非阻塞,没有关闭,且容量满了,无法发送,直接返回

if!blockc.closed==0full(c){

returnfalse

//加锁

lock(c.lock)

//如果已经关闭,无法发送,直接panic

ifc.closed!=0{

unlock(c.lock)

panic(plainError("sendonclosedchannel"))

//从接收队列弹出一个协程的包装结构sudog

ifsg:=c.recvq.dequeue();sg!=nil{

//如果能弹出,即有等到接收的协程,说明:

//该channel要么是无缓冲,要么缓冲区为空,不然不可能有协程在等待

//将要发送的数据拷贝到该协程的接收指针上

send(c,sg,ep,func(){unlock(c.lock)},3)

returntrue

//缓冲区还有空间

ifc.qcountc.dataqsiz{

//qp:计算要发送到的位置的地址

qp:=chanbuf(c,c.sendx)

//将数据从ep拷贝到qp

typedmemmove(c.elemtype,qp,ep)

//待发送位置移动

c.sendx++

//由于是数组模拟队列,sendx到顶了需要归零

ifc.sendx==c.dataqsiz{

c.sendx=0

//缓冲区数量++

c.qcount++

unlock(c.lock)

returntrue

//往下就是缓冲区无数据,也没有等到接收协程的情况了

//如果是非阻塞模式,直接返回

if!block{

unlock(c.lock)

returnfalse

//将当前协程包装成sudog,阻塞到channel上

gp:=getg()

mysg:=acquireSudog()

mysg.releasetime=0

ift0!=0{

mysg.releasetime=-1

mysg.elem=ep

mysg.waitlink=nil

mysg.g=gp

mysg.isSelect=false

mysg.c=c

gp.waiting=mysg

gp.param=nil

//当前协程进入发送等待队列

c.sendq.enqueue(mysg)

atomic.Store8(gp.parkingOnChan,1)

gopark(chanparkcommit,unsafe.Pointer(c.lock),waitReasonChanSend,traceEvGoBlockSend,2)

//被唤醒后从这里开始执行

KeepAlive(ep)

ifmysg!=gp.waiting{

throw("Gwaitinglistiscorrupted")

gp.waiting=nil

gp.activeStackChans=false

closed:=!mysg.success

gp.param=nil

ifmysg.releasetime0{

blockevent(mysg.releasetime-t0,2)

mysg.c=nil

releaseSudog(mysg)

//被唤醒后发现channel关闭了,panic

ifclosed{

ifc.closed==0{

throw("chansend:spuriouswakeup")

panic(plainError("sendonclosedchannel"))

returntrue

整体流程为:

如果当前操作为非阻塞,channel没有关闭,且容量满了,无法发送,直接返回

从接收队列弹出一个协程的包装结构sudog,如果能弹出,即有等到接收的协程,说明:

该channel要么是无缓冲,要么缓冲区为空,不然不可能有协程在等待将要发送的数据拷贝到该协程的接收指针上,返回这里直接从发送者拷贝到接收者的内存,而不是先把数据拷贝到缓冲区,再从缓冲区拷贝到接收者,节约了一次内存拷贝

否则看看缓冲区还有空间,如果有,将数据拷贝到缓冲区上,也返回

接下来就是既没有接收者等待,缓冲区也为空的情况,就需要将当前协程包装成sudog,阻塞到channel上

将协程阻塞到channel的等待队列时,将其包装成了sudog结构:

typesudogstruct{

//协程

g*g

//前一个,后一个指针

next*sudog

prev*sudog

//等到发送的数据在哪,等待从哪个位置接收数据

elemunsafe.Pointer

acquiretimeint64

releasetimeint64

ticketuint32

isSelectbool

successbool

parent*sudog//semaRootbinarytree

waitlink*sudog//g.waitinglistorsemaRoot

waittail*sudog//semaRoot

//在哪个channel上等待

c*hchan//channel

其目的是:

g本身没有存储前一个,后一个指针,需要用sudog结构包装才能加入队列elem字段存储等到发送的数据在哪,等待从哪个位置接收数据,用于从数据能从协程到协程的直接拷贝

来看看一些子函数:

1.判断channel是否是满的

funcfull(c*hchan)bool{

//无缓冲

ifc.dataqsiz==0{

//并且没有其他协程在等待

returnc.recvq.first==nil

//有缓冲,但容量装满了

returnc.qcount==c.dataqsiz

2.send方法:

/**

c:要操作的channel

sg:弹出的接收者协程

ep:要发送的数据在的位置

funcsend(c*hchan,sg*sudog,epunsafe.Pointer,unlockffunc(),skipint){

//如果接收者指针不为空,直接把数据从ep拷贝到sg.elem

ifsg.elem!=nil{

sendDirect(c.elemtype,sg,ep)

sg.elem=nil

gp:=sg.g

unlockf()

gp.param=unsafe.Pointer(sg)

sg.success=true

ifsg.releasetime!=0{

sg.releasetime=cputicks()

//唤醒该接收者协程

goready(gp,skip+1)

接收

从channel中接收数据有几种写法:

带不带ok接不接收返回值

根据带不带ok,决定用下面哪个方法

funcchanrecv1(c*hchan,elemunsafe.Pointer){

chanrecv(c,elem,true)

funcchanrecv2(c*hchan,elemunsafe.Pointer)(receivedbool){

_,received=chanrecv(c,elem,true)

return

根据接不接收返回值,决定elem是不是nil

最终都会调用chanrecv方法:

funcchanrecv(c*hchan,epunsafe.Pointer,blockbool)(selected,receivedbool){

//如果channel为nil,根据参数中是否阻塞来决定是否阻塞

ifc==nil{

if!block{

return

gopark(nil,nil,waitReasonChanReceiveNilChan,traceEvGoStop,2)

throw("unreachable")

//非阻塞,并且channel为空

if!blockempty(c){

//如果还没关闭,直接返回

ifatomic.Load(c.closed)==0{

return

//否则已经关闭,

//如果为空,返回该类型的零值

ifempty(c){

ifep!=nil{

typedmemclr(c.elemtype,ep)

returntrue,false

lock(c.lock)

//同样,如果channel已经关闭,且缓冲区没有元素,返回该类型零值

ifc.closed!=0c.qcount==0{

unlock(c.lock)

ifep!=nil{

typedmemclr(c.elemtype,ep)

returntrue,false

//如果有发送者正在阻塞,说明:

//1.无缓冲

//2.有缓冲,但缓冲区满了。因为只有缓冲区满了,才可能有发送者在等待

ifsg:=c.sendq.dequeue();sg!=nil{

//将数据从缓冲区拷贝到ep,再将sg的数据拷贝到缓冲区,该函数详细流程可看下文

recv(c,sg,ep,func(){unlock(c.lock)},3)

returntrue,true

//如果缓存区有数据,

ifc.qcount0{

//qp为缓冲区中下一次接收的位置

qp:=chanbuf(c,c.recvx)

//将数据从qp拷贝到ep

ifep!=nil{

typedmemmove(c.elemtype,ep,qp)

typedmemclr(c.elemtype,qp)

c.recvx++

ifc.recvx==c.dataqsiz{

c.recvx=0

c.qcount--

unlock(c.lock)

returntrue,true

//接下来就是既没有发送者在等待,也缓冲区也没数据

if!block{

unlock(c.lock)

returnfalse,false

//将当前协程包装成sudog,阻塞到channel中

gp:=getg()

mysg:=acquireSudog()

mysg.releasetime=0

ift0!=0{

mysg.releasetime=-1

//记录接收地址

mysg.elem=ep

mysg.waitlink=nil

gp.waiting=mysg

mysg.g=gp

mysg.isSelect=false

mysg.c=c

gp.param=nil

c.recvq.enqueue(mysg)

atomic.Store8(gp.parkingOnChan,1)

gopark(chanparkcommit,unsafe.Pointer(c.lock),waitReasonChanReceive,traceEvGoBlockRecv,2)

//从这里唤醒

ifmysg!=gp.waiting{

throw("Gwaitinglistiscorrupted")

gp.waiting=nil

gp.activeStackChans=false

ifmysg.releasetime0{

blockevent(mysg.releasetime-t0,2)

success:=mysg.success

gp.param=nil

mysg.c=nil

releaseSudog(mysg)

returntrue,success

接收流程如为:

如果channel为nil,根据参数中是否阻塞来决定是否阻塞

如果channel已经关闭,且缓冲区没有元素,返回该类型零值

如果有发送者正在阻塞,说明:

要么是无缓冲有缓冲,但缓冲区满了。因为只有缓冲区满了,才可能有发送者在等待将数据从缓冲区拷贝到ep,再将发送者的数据拷贝到缓冲区,并唤该发送者

如果缓存区有数据,则从缓冲区将数据复制到ep,返回

接下来就是既没有发送者在等待,也缓冲区也没数据的情况:

将当前协程包装成sudog,阻塞到channel中

来看其中的子函数recv():

/**

c:操作的channel

sg:阻塞的发送协程

ep:接收者接收数据的地址

funcrecv(c*hchan,sg*sudog,epunsafe.Pointer,unlockffunc(),skipint){

//如果是无缓冲channel,直接将数据从发送者sg拷贝到ep

ifc.dataqsiz==0{

ifep!=nil{

recvDirect(c.elemtype,sg,ep)

//接下来是有缓冲,且缓冲区满的情况

}else{

//qp为channel缓冲区中,接收者下一次接收的地址

qp:=chanbuf(c,c.recvx)

//将数据从qp拷贝到ep

ifep!=nil{

typedmemmove(c.elemtype,ep,qp)

//将发送者的数据从sg.elem拷贝到qp

typedmemmove(c.elemtype,qp,sg.elem)

c.recvx++

ifc.recvx==c.dataqsiz{

c.recvx=0

//由于一接收已发送,缓冲区还是满的,因此c.sendx=c.recvx

c.sendx=c.recvx

sg.elem=nil

gp:=sg.g

unlockf()

gp.param=unsafe.Pointer(sg)

sg.success=true

ifsg.releasetime!=0{

sg.releasetime=cputicks()

//唤醒发送者

goready(gp,skip+1)

关闭

funcclosechan(c*hchan){

//不能关闭空channel

ifc==nil{

panic(plainError("closeofnilchannel"))

lock(c.lock)

//不能重复关闭

ifc.closed!=0{

unlock(c.lock)

panic(plainError("closeofclosedchannel"))

//修改关闭状态

c.close

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论