




版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
第深入理解Golangchannel的应用目录前言整体结构创建发送接收关闭
前言
channel是用于goroutine之间的同步、通信的数据结构
channel的底层是通过mutex来控制并发的,但它为程序员提供了更高一层次的抽象,封装了更多的功能,这样并发编程变得更加容易和安全,得以让程序员把注意力留到业务上去,提升开发效率
channel的用途包括但不限于以下几点:
协程间通信,同步定时任务:和timer结合解耦生产方和消费方,实现阻塞队列控制并发数
本文将介绍channel的底层原理,包括数据结构,channel的创建,发送,接收,关闭的实现逻辑
整体结构
Gochannel的数据结构如下所示:
typehchanstruct{
qcountuint//totaldatainthequeue
dataqsizuint//sizeofthecircularqueue
bufunsafe.Pointer//pointstoanarrayofdataqsizelements
elemsizeuint16
closeduint32
elemtype*_type//elementtype
sendxuint//sendindex
recvxuint//receiveindex
recvqwaitq//listofrecvwaiters
sendqwaitq//listofsendwaiters
lockmutex
qcount:已经存储了多少个元素
dataqsie:最多存储多少个元素,即缓冲区容量
buf:指向缓冲区的位置,实际上是一个数组
elemsize:每个元素占多大空间
closed:channel能够关闭,这里记录其关闭状态
elemtype:保存数据的类型信息,用于go运行时使用
sendx,recvx:
记录下一个要发送到的位置,下一次从哪里还是接收这里用数组模拟队列,这两个变量即表示队列的队头,队尾因此channel的缓冲也被称为环形缓冲区
recvq,sendq:
当发送个接收不能立即完成时,需要让协程在channel上等待,所以有两个等待队列,分别针对接收和发送
lock:channel支持协程间并发访问,因此需要一把锁来保护
创建
创建channel会被编译器编译为调用makechan函数
//无缓冲通道
ch1:=make(chanint)
//有缓冲通道
ch2:=make(chanint,10)
会根据创建的是带缓存,还是无缓冲,决定第二个参数size的值
可以看出,创建出来的是hchan指针,这样就能在函数间直接传递channel,而不用传递channel的指针
funcmakechan(t*chantype,sizeint)*hchan{
elem:=t.elem
//mem:缓冲区大小
mem,overflow:=math.MulUintptr(elem.size,uintptr(size))
ifoverflow||memmaxAlloc-hchanSize||size0{
panic(plainError("makechan:sizeoutofrange"))
varc*hchan
switch{
//缓冲区大小为空,只申请hchanSize大小的内存
casemem==0:
c=(*hchan)(mallocgc(hchanSize,nil,true))
c.buf=c.raceaddr()
//元素类型不包含指针,一次性分配hchanSize+mem大小的内存
caseelem.ptrdata==0:
c=(*hchan)(mallocgc(hchanSize+mem,nil,true))
c.buf=add(unsafe.Pointer(c),hchanSize)
//否则就是带缓存,且有指针,分配两次内存
default:
//Elementscontainpointers.
c=new(hchan)
c.buf=mallocgc(mem,elem,true)
//保存元素类型,元素大小,容量
c.elemsize=uint16(elem.size)
c.elemtype=elem
c.dataqsiz=uint(size)
lockInit(c.lock,lockRankHchan)
returnc
发送
执行以下代码时:
ch-3
编译器会转化为对chansend的调用
funcchansend(c*hchan,epunsafe.Pointer,blockbool,callerpcuintptr)bool{
//如果channel是空
ifc==nil{
//非阻塞,直接返回
if!block{
returnfalse
//否则阻塞当前协程
gopark(nil,nil,waitReasonChanSendNilChan,traceEvGoStop,2)
throw("unreachable")
//非阻塞,没有关闭,且容量满了,无法发送,直接返回
if!blockc.closed==0full(c){
returnfalse
//加锁
lock(c.lock)
//如果已经关闭,无法发送,直接panic
ifc.closed!=0{
unlock(c.lock)
panic(plainError("sendonclosedchannel"))
//从接收队列弹出一个协程的包装结构sudog
ifsg:=c.recvq.dequeue();sg!=nil{
//如果能弹出,即有等到接收的协程,说明:
//该channel要么是无缓冲,要么缓冲区为空,不然不可能有协程在等待
//将要发送的数据拷贝到该协程的接收指针上
send(c,sg,ep,func(){unlock(c.lock)},3)
returntrue
//缓冲区还有空间
ifc.qcountc.dataqsiz{
//qp:计算要发送到的位置的地址
qp:=chanbuf(c,c.sendx)
//将数据从ep拷贝到qp
typedmemmove(c.elemtype,qp,ep)
//待发送位置移动
c.sendx++
//由于是数组模拟队列,sendx到顶了需要归零
ifc.sendx==c.dataqsiz{
c.sendx=0
//缓冲区数量++
c.qcount++
unlock(c.lock)
returntrue
//往下就是缓冲区无数据,也没有等到接收协程的情况了
//如果是非阻塞模式,直接返回
if!block{
unlock(c.lock)
returnfalse
//将当前协程包装成sudog,阻塞到channel上
gp:=getg()
mysg:=acquireSudog()
mysg.releasetime=0
ift0!=0{
mysg.releasetime=-1
mysg.elem=ep
mysg.waitlink=nil
mysg.g=gp
mysg.isSelect=false
mysg.c=c
gp.waiting=mysg
gp.param=nil
//当前协程进入发送等待队列
c.sendq.enqueue(mysg)
atomic.Store8(gp.parkingOnChan,1)
gopark(chanparkcommit,unsafe.Pointer(c.lock),waitReasonChanSend,traceEvGoBlockSend,2)
//被唤醒后从这里开始执行
KeepAlive(ep)
ifmysg!=gp.waiting{
throw("Gwaitinglistiscorrupted")
gp.waiting=nil
gp.activeStackChans=false
closed:=!mysg.success
gp.param=nil
ifmysg.releasetime0{
blockevent(mysg.releasetime-t0,2)
mysg.c=nil
releaseSudog(mysg)
//被唤醒后发现channel关闭了,panic
ifclosed{
ifc.closed==0{
throw("chansend:spuriouswakeup")
panic(plainError("sendonclosedchannel"))
returntrue
整体流程为:
如果当前操作为非阻塞,channel没有关闭,且容量满了,无法发送,直接返回
从接收队列弹出一个协程的包装结构sudog,如果能弹出,即有等到接收的协程,说明:
该channel要么是无缓冲,要么缓冲区为空,不然不可能有协程在等待将要发送的数据拷贝到该协程的接收指针上,返回这里直接从发送者拷贝到接收者的内存,而不是先把数据拷贝到缓冲区,再从缓冲区拷贝到接收者,节约了一次内存拷贝
否则看看缓冲区还有空间,如果有,将数据拷贝到缓冲区上,也返回
接下来就是既没有接收者等待,缓冲区也为空的情况,就需要将当前协程包装成sudog,阻塞到channel上
将协程阻塞到channel的等待队列时,将其包装成了sudog结构:
typesudogstruct{
//协程
g*g
//前一个,后一个指针
next*sudog
prev*sudog
//等到发送的数据在哪,等待从哪个位置接收数据
elemunsafe.Pointer
acquiretimeint64
releasetimeint64
ticketuint32
isSelectbool
successbool
parent*sudog//semaRootbinarytree
waitlink*sudog//g.waitinglistorsemaRoot
waittail*sudog//semaRoot
//在哪个channel上等待
c*hchan//channel
其目的是:
g本身没有存储前一个,后一个指针,需要用sudog结构包装才能加入队列elem字段存储等到发送的数据在哪,等待从哪个位置接收数据,用于从数据能从协程到协程的直接拷贝
来看看一些子函数:
1.判断channel是否是满的
funcfull(c*hchan)bool{
//无缓冲
ifc.dataqsiz==0{
//并且没有其他协程在等待
returnc.recvq.first==nil
//有缓冲,但容量装满了
returnc.qcount==c.dataqsiz
2.send方法:
/**
c:要操作的channel
sg:弹出的接收者协程
ep:要发送的数据在的位置
funcsend(c*hchan,sg*sudog,epunsafe.Pointer,unlockffunc(),skipint){
//如果接收者指针不为空,直接把数据从ep拷贝到sg.elem
ifsg.elem!=nil{
sendDirect(c.elemtype,sg,ep)
sg.elem=nil
gp:=sg.g
unlockf()
gp.param=unsafe.Pointer(sg)
sg.success=true
ifsg.releasetime!=0{
sg.releasetime=cputicks()
//唤醒该接收者协程
goready(gp,skip+1)
接收
从channel中接收数据有几种写法:
带不带ok接不接收返回值
根据带不带ok,决定用下面哪个方法
funcchanrecv1(c*hchan,elemunsafe.Pointer){
chanrecv(c,elem,true)
funcchanrecv2(c*hchan,elemunsafe.Pointer)(receivedbool){
_,received=chanrecv(c,elem,true)
return
根据接不接收返回值,决定elem是不是nil
最终都会调用chanrecv方法:
funcchanrecv(c*hchan,epunsafe.Pointer,blockbool)(selected,receivedbool){
//如果channel为nil,根据参数中是否阻塞来决定是否阻塞
ifc==nil{
if!block{
return
gopark(nil,nil,waitReasonChanReceiveNilChan,traceEvGoStop,2)
throw("unreachable")
//非阻塞,并且channel为空
if!blockempty(c){
//如果还没关闭,直接返回
ifatomic.Load(c.closed)==0{
return
//否则已经关闭,
//如果为空,返回该类型的零值
ifempty(c){
ifep!=nil{
typedmemclr(c.elemtype,ep)
returntrue,false
lock(c.lock)
//同样,如果channel已经关闭,且缓冲区没有元素,返回该类型零值
ifc.closed!=0c.qcount==0{
unlock(c.lock)
ifep!=nil{
typedmemclr(c.elemtype,ep)
returntrue,false
//如果有发送者正在阻塞,说明:
//1.无缓冲
//2.有缓冲,但缓冲区满了。因为只有缓冲区满了,才可能有发送者在等待
ifsg:=c.sendq.dequeue();sg!=nil{
//将数据从缓冲区拷贝到ep,再将sg的数据拷贝到缓冲区,该函数详细流程可看下文
recv(c,sg,ep,func(){unlock(c.lock)},3)
returntrue,true
//如果缓存区有数据,
ifc.qcount0{
//qp为缓冲区中下一次接收的位置
qp:=chanbuf(c,c.recvx)
//将数据从qp拷贝到ep
ifep!=nil{
typedmemmove(c.elemtype,ep,qp)
typedmemclr(c.elemtype,qp)
c.recvx++
ifc.recvx==c.dataqsiz{
c.recvx=0
c.qcount--
unlock(c.lock)
returntrue,true
//接下来就是既没有发送者在等待,也缓冲区也没数据
if!block{
unlock(c.lock)
returnfalse,false
//将当前协程包装成sudog,阻塞到channel中
gp:=getg()
mysg:=acquireSudog()
mysg.releasetime=0
ift0!=0{
mysg.releasetime=-1
//记录接收地址
mysg.elem=ep
mysg.waitlink=nil
gp.waiting=mysg
mysg.g=gp
mysg.isSelect=false
mysg.c=c
gp.param=nil
c.recvq.enqueue(mysg)
atomic.Store8(gp.parkingOnChan,1)
gopark(chanparkcommit,unsafe.Pointer(c.lock),waitReasonChanReceive,traceEvGoBlockRecv,2)
//从这里唤醒
ifmysg!=gp.waiting{
throw("Gwaitinglistiscorrupted")
gp.waiting=nil
gp.activeStackChans=false
ifmysg.releasetime0{
blockevent(mysg.releasetime-t0,2)
success:=mysg.success
gp.param=nil
mysg.c=nil
releaseSudog(mysg)
returntrue,success
接收流程如为:
如果channel为nil,根据参数中是否阻塞来决定是否阻塞
如果channel已经关闭,且缓冲区没有元素,返回该类型零值
如果有发送者正在阻塞,说明:
要么是无缓冲有缓冲,但缓冲区满了。因为只有缓冲区满了,才可能有发送者在等待将数据从缓冲区拷贝到ep,再将发送者的数据拷贝到缓冲区,并唤该发送者
如果缓存区有数据,则从缓冲区将数据复制到ep,返回
接下来就是既没有发送者在等待,也缓冲区也没数据的情况:
将当前协程包装成sudog,阻塞到channel中
来看其中的子函数recv():
/**
c:操作的channel
sg:阻塞的发送协程
ep:接收者接收数据的地址
funcrecv(c*hchan,sg*sudog,epunsafe.Pointer,unlockffunc(),skipint){
//如果是无缓冲channel,直接将数据从发送者sg拷贝到ep
ifc.dataqsiz==0{
ifep!=nil{
recvDirect(c.elemtype,sg,ep)
//接下来是有缓冲,且缓冲区满的情况
}else{
//qp为channel缓冲区中,接收者下一次接收的地址
qp:=chanbuf(c,c.recvx)
//将数据从qp拷贝到ep
ifep!=nil{
typedmemmove(c.elemtype,ep,qp)
//将发送者的数据从sg.elem拷贝到qp
typedmemmove(c.elemtype,qp,sg.elem)
c.recvx++
ifc.recvx==c.dataqsiz{
c.recvx=0
//由于一接收已发送,缓冲区还是满的,因此c.sendx=c.recvx
c.sendx=c.recvx
sg.elem=nil
gp:=sg.g
unlockf()
gp.param=unsafe.Pointer(sg)
sg.success=true
ifsg.releasetime!=0{
sg.releasetime=cputicks()
//唤醒发送者
goready(gp,skip+1)
关闭
funcclosechan(c*hchan){
//不能关闭空channel
ifc==nil{
panic(plainError("closeofnilchannel"))
lock(c.lock)
//不能重复关闭
ifc.closed!=0{
unlock(c.lock)
panic(plainError("closeofclosedchannel"))
//修改关闭状态
c.close
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 房地产买卖中介合同
- 性格色彩分析理论及应用
- 中级经济师考试的创新意识培养与试题及答案
- 2025年市政工程考试知识点剖析试题及答案
- 建筑泥工劳务分包合同
- 农村生物技术应用研究开发合同
- 员工关系在公共关系中的角色试题及答案
- 掌握中级经济师考试复习的主动权与试题及答案
- 行政管理专科公共关系学全面试题及答案
- 维护技术基础考试试题及答案
- 浙江开放大学2024年《法律文化》形考作业1-4答案
- 政治审查表(模板)
- 中国赛车游戏行业市场发展现状及竞争格局与投资前景研究报告(2024-2030)
- T∕CACM 1107-2018 中医治未病实践指南 亚健康中医干预
- 数字贸易学 课件 第20、21章 数字丝绸之路与数字基础设施、数字自由贸易与数字贸易壁垒
- 地理毕业生实习报告5000字范本2篇
- 高级思辨英语视听说智慧树知到期末考试答案2024年
- 养生酒行业分析
- 仓储物流部门人才梯队建设推进方案
- (完整版)铝合金门窗施工合同范本
- 2024年福建南平市武夷旅游集团有限公司招聘笔试参考题库含答案解析
评论
0/150
提交评论