go|channel源码分析
文章目录
- channel
- hchan
- makechan
- chansend
- chanrecv
- complie
- closechan
channel
先看一下源码中的说明
At least one of c.sendq and c.recvq is empty, except for the case of an unbuffered channel with a single goroutine blocked on it for both sending and receiving using a select statement, in which case the length of c.sendq and c.recvq is limited only by the size of the select statement.
For buffered channels, also:
c.qcount > 0 implies that c.recvq is empty.
c.qcount < c.dataqsiz implies that c.sendq is empty.
c.sendq和c.recvq至少有一个是empty;有一种情况例外:对于一个无缓冲的chan,在select语句中使用chan收发数据会被single gouroutine阻塞,[演示示例如下],此时c.sendq和c.recv.q的长度由select的size的决定;对于有缓冲的chan:
1.c.qcount>0,表明c.recvq为empty
2.c.qcount<c.dataqsiz表明sendq为empty
- 对于"unbuffered chan",放在select语句中会被阻塞
- 放在一个goroutine中会产生"dead lock"
- 放在不同的协程中就不会产生dead lock
- 对于nil chan放在同一个协程中和不同的协程中都会被阻塞产生deadlock;放在select语句可以走default分支
- nil chan只是声明而没有定义,没有为其分配内存;empty chan为其分配内存但是无缓冲
hchan
接下来看一下chan结构体"hchan"的定义
type hchan struct {qcount uint // buf中的元素个数dataqsiz uint // buf的容量buf unsafe.Pointer // 指向存储数据的底层数组elemsize uint16 //元素大小closed uint32 //标识chan是否关闭elemtype *_type // element typesendx uint // send indexrecvx uint // receive indexrecvq waitq // list of recv waiterssendq waitq // list of send waiters//Lock保护hchan中的所有字段,以及在此通道上阻塞的sudogs中的几个字段。lock mutex
}
type waitq struct {first *sudog //sudog represents a g in a wait list, //such as for sending/receiving on a channel.last *sudog
}
//只列出了部分字段
type sudog struct {g *gnext *sudogprev *sudog............c *hchan // channel
}
makechan
利用"make(chan int,1)“创建并初始化一个chan,会调用"makechan”
func makechan(t *chantype, size int) *hchan {//获取chan中存储元素的相关信息elem := t.elem// compiler checks this but be safe.if elem.size >= 1<<16 {throw("makechan: invalid channel element type")}if hchanSize%maxAlign != 0 || elem.align > maxAlign {throw("makechan: bad alignment")}//elem.size表示元素的大小,size表示元素的个数mem, overflow := math.MulUintptr(elem.size, uintptr(size))if overflow || mem > maxAlloc-hchanSize || size < 0 {panic(plainError("makechan: size out of range"))}var c *hchanswitch {case mem == 0://元素大小为0或者元素个数为0// Queue or element size is zero.c = (*hchan)(mallocgc(hchanSize, nil, true))// Race detector uses this location for synchronization.c.buf = c.raceaddr()case elem.ptrdata == 0://元素不包含指针类型的数据,一次性分配hchan和buf// Elements do not contain pointers.// Allocate hchan and buf in one call.c = (*hchan)(mallocgc(hchanSize+mem, nil, true))c.buf = add(unsafe.Pointer(c), hchanSize)default://元素含有指针,需要先为hchan分配内存,然后再为buf分配内存,这样做是为了方便gc回收// Elements contain pointers.c = new(hchan)c.buf = mallocgc(mem, elem, true)}c.elemsize = uint16(elem.size)c.elemtype = elemc.dataqsiz = uint(size)lockInit(&c.lock, lockRankHchan)if debugChan {print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")}return c
}
总结一下分配的方式:
- 对于无缓冲的chan [size==0]或者chan中存储的元素大小为0[elemsize==0]只分配hchan结构体大小的内存
- 不含有指针类型的数据,一次性分配hchan+bufsize大小的内存
- 含有指针类型的数据,先为hchan结构体分配内存,然后再为buf分配内存
chansend
接下来看一下,向chan中发送数据的流程
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {if c == nil {if !block {return false}//当前的协程因向nilchan发送send而被挂起阻塞gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)throw("unreachable")}if debugChan {print("chansend: chan=", c, "\n")}if raceenabled {racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))}/*fast path:在不获取锁的情况下,检查失败的非阻塞操作full(c)为true的情况:1.无缓冲但是没有等待接收的reciver2.有缓冲但是缓冲通道满了*/if !block && c.closed == 0 && full(c) {return false}var t0 int64if blockprofilerate > 0 {t0 = cputicks()}//上锁lock(&c.lock)if c.closed != 0 {//chan已经被关闭解锁,并报panicunlock(&c.lock)panic(plainError("send on closed channel"))}//c.recvq.dequeue获取recvq的第一个sudogif sg := c.recvq.dequeue(); sg != nil {//如果有正在等待的reciver,可以直接将数据拷贝给该//reciver;绕过chan的缓冲区;拷贝完之后释放锁;//send详解见下面send(c, sg, ep, func() { unlock(&c.lock) }, 3)return true}//有发送数据的位置if c.qcount < c.dataqsiz {// Space is available in the channel buffer. Enqueue the element to send.//chanbuf返回buf中第sendx个 slotqp := chanbuf(c, c.sendx)if raceenabled {racenotify(c, c.sendx, nil)}//将ep拷贝到qptypedmemmove(c.elemtype, qp, ep)//sendx指向下一个发送数据的位置,将其加1c.sendx++//循环数组if c.sendx == c.dataqsiz {c.sendx = 0}//chan中元素个数加1c.qcount++//解锁unlock(&c.lock)return true}if !block {//没有正在等待接收数据的reciver也没有可用的空位置存储数据,在非阻塞模式下直接解锁+返回unlock(&c.lock)return false}// Block on the channel. Some receiver will complete our operation for us./*getg returns the pointer to the current g. The compiler rewrites calls to this function into instructions that fetch the g directly (from TLS or from the dedicated register)返回当前正在运行的goroutine*/gp := getg()//新创建一个sudogmysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}//初始化mysgmysg.elem = epmysg.waitlink = nilmysg.g = gpmysg.isSelect = falsemysg.c = cgp.waiting = mysggp.param = nil//将mysg加入到sendqc.sendq.enqueue(mysg)gp.parkingOnChan.Store(true)//将当前协程挂起,挂起原因是send chan// Puts the current goroutine into a waiting state and calls unlockf on the// system stack.gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)//确保qp对象在reciver接受它之前是有效的KeepAlive(ep)// someone woke us up.if mysg != gp.waiting {throw("G waiting list is corrupted")}gp.waiting = nilgp.activeStackChans = false//chan 被关闭closed := !mysg.successgp.param = nilif mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)}mysg.c = nilreleaseSudog(mysg)if closed {if c.closed == 0 {throw("chansend: spurious wakeup")}panic(plainError("send on closed channel"))}return true
}
/*
full reports 向chan发送数据是否会被阻塞[通道满的情况下会被阻塞]
*/
func full(c *hchan) bool {//c.dataqsiz是一个不可变的字段,创建之后不会被改变。//因此对它的读是并发安全的if c.dataqsiz == 0 {//无缓冲的情况,此时要看是否有等待读取的协程return c.recvq.first == nil}// 通道已满return c.qcount == c.dataqsiz
}
//获取waitq中的一个sudog
func (q *waitq) dequeue() *sudog {for {//获取第一个sudogsgp := q.firstif sgp == nil {return nil}y := sgp.next//waitq中只有一个sudogif y == nil {q.first = nilq.last = nil} else {//ulink and modify first sudog y.prev = nilq.first = ysgp.next = nil // mark as removed (see dequeueSudoG)}/*if a goroutine was put on this queue because of a select, there is a small window between the goroutine being woken up by a different case and it grabbing the channel locks. Once it has the lock it removes itself from the queue, so we won't see it after that. We use a flag in the G struct to tell us when someone else has won the race to signal this goroutine but the goroutine hasn't removed itself from the queue yet.如果一个goroutine因为select操作被阻塞而被放入各个case中的waitq[recvq\sendq]中,在这个goroutine被不同的case唤醒到获取chan lock之间有一小段间隙;一旦这个goroutine获取到lock,就会从waitq中移除在g结构体中有一个字段[selectDone]用来通知goroutine1有其它goroutine2已经拿到了这个锁但是goroutine1还没有从waitq中移除。*//*isSelect indicates g is participating in a select, so g.selectDone must be CAS'd to win the wake-up race*///如果没有成功就会直接退出,进入下一个循环if sgp.isSelect && !sgp.g.selectDone.CompareAndSwap(0, 1) {continue}/*这里说一下个人的理解:select 的第一个case对于一个chan1 的recvq,第一个被阻塞的g1是因为等待接收数据而被阻塞的,g1并非当前执行select的协程;select 的第二个case向chan2发送数据,刚开始这两个case都被阻塞,于是这个goroutine被加入到chan1的recvq和chan2的sendq,第二轮循环中,向chan2写数据成功,goroutine被唤醒,然后当前goroutine的selectDone字段被标识为1,与此同时从chan1接收数据成功,但此时会通过selectDone标识判断出当前的g已经被其它case唤醒,于是会继续寻找下一个sudog[当前的goroutine已经从waitq中移除] */return sgp}
}
//向waitq中加入一个sudog
func (q *waitq) enqueue(sgp *sudog) {sgp.next = nilx := q.lastif x == nil {sgp.prev = nilq.first = sgpq.last = sgpreturn}sgp.prev = xx.next = sgpq.last = sgp
}
/*
send在一个empty channel上执行发送操作;发送方发送的ep值被拷贝到接收方的sg中。然后reciver被唤醒;
channel必须是empty或者被锁定。ep必须是非空的
*/
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {if raceenabled {if c.dataqsiz == 0 {racesync(c, sg)} else {// Pretend we go through the buffer, even though// we copy directly. Note that we need to increment// the head/tail locations only when raceenabled.racenotify(c, c.recvx, nil)racenotify(c, c.recvx, sg)c.recvx++if c.recvx == c.dataqsiz {c.recvx = 0}c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz}}if sg.elem != nil {sendDirect(c.elemtype, sg, ep)sg.elem = nil}gp := sg.g//解锁unlockf()gp.param = unsafe.Pointer(sg)sg.success = trueif sg.releasetime != 0 {sg.releasetime = cputicks()}//Mark gp ready to run//标记状态由_Gwaiting到_Grunnable/*先尝试放入本地p的runnext字段,然后是本地runq如果这两个地方都满了,会将其放入全局runq*/goready(gp, skip+1)
}
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {dst := sg.elemtypeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)// No need for cgo write barrier checks because dst is always// Go memory.//将src的数据拷贝到dstmemmove(dst, src, t.size)
}
// chanbuf(c, i) is pointer to the i'th slot in the buffer.
func chanbuf(c *hchan, i uint) unsafe.Pointer {return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}
总结一下发送数据的流程
-
向nil chan发送数据阻塞模式下会被挂起,非阻塞模式下返回false
-
首先从recvq中获取是否有正在等待接收数据的goroutine,有的话直接将发送者的数据拷贝到与goroutine绑定的sudog中
-
没有正在等待接收数据的reciver,但是缓冲区未满将发送者的数据发送到存储数据的循环数组中,并将元素个数加1
-
既没有正在等待接收数据的reciver也没有可用的缓冲位置,生成新的sudog结构与当前goroutine绑定加入到sendq的末尾,通过gopark将其挂起等待reciver从chan中读取数据
-
对nil通道close会产生panic
在真正进行发送数据的流程之前会进行加锁操作,在第一种情况发生之后会释放锁,否则会持续到第二种或者第三种情况;糟糕的情况是,锁一直持续到第三种情况才释放,加大了临界区的范围延长了锁持有的时间,降低了并发性能。
-
通道已经被关闭,向其发送数据会产生panic
-
通道close之后再次被close会被panic
chanrecv
/*
chanrecv从chan中接收数据并将数据拷贝到ep中
如果ep是nil,接收到的数据会被忽略
如果block是false并且chan中没有数据会返回false
如果chan被关闭,不影响数据的读取,读取到的数据是类型对应的零值*/
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {// raceenabled: don't need to check ep, as it is always on the stack// or is new memory allocated by reflect.if debugChan {print("chanrecv: chan=", c, "\n")}//从nil chan中获取数据;非阻塞模式下返回false,false//阻塞模式下被挂起if c == nil {if !block {return}gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)throw("unreachable")}//非阻塞模式下//empty(c)返回true的条件//无缓冲通道并且没有正在等待发送数据的sender//有缓冲通道但是c.qcount==0没有数据可以读取if !block && empty(c) {//通道没有被关闭if atomic.Load(&c.closed) == 0 {return}//通道被关闭//empty(c)返回true的条件//无缓冲通道并且没有正在等待发送数据的sender//有缓冲通道但是c.qcount==0没有数据可以读取if empty(c) {// The channel is irreversibly closed and empty.if raceenabled {raceacquire(c.raceaddr())}//清空ep指向的内存if ep != nil {typedmemclr(c.elemtype, ep)}//返回true是因为chan be closed//通道被关闭会通知所有监听该chan的goroutinereturn true, false}}var t0 int64if blockprofilerate > 0 {t0 = cputicks()}//上锁lock(&c.lock)//chan关闭之后不能写入数据但是可以读取数据if c.closed != 0 {//缓冲区中没有数据if c.qcount == 0 {if raceenabled {raceacquire(c.raceaddr())}//没有数据可以读解锁unlock(&c.lock)//清空ep指向的memoryif ep != nil {typedmemclr(c.elemtype, ep)}return true, false}// The channel has been closed, but the channel's buffer have data.} else {//走到这个分支表明缓冲区中有数据并且//发现有阻塞等待发送数据的sender,发生的条件有//无缓冲通道;有缓冲通道但是缓冲区满if sg := c.sendq.dequeue(); sg != nil {//从阻塞等待发送数据的sender中获取数据recv(c, sg, ep, func() { unlock(&c.lock) }, 3)return true, true}}//走到这个分支表明通道被关闭并且缓冲区有数据或者//通道没有被关闭并且有数据可以读取if c.qcount > 0 {// Receive directly from queueqp := chanbuf(c, c.recvx)if raceenabled {racenotify(c, c.recvx, nil)}//将qp中的数据拷贝到epif ep != nil {typedmemmove(c.elemtype, ep, qp)}//清楚qptypedmemclr(c.elemtype, qp)c.recvx++if c.recvx == c.dataqsiz {c.recvx = 0}//元素个数减少c.qcount--//解锁unlock(&c.lock)return true, true}//没有数据可以读非阻塞直接返回if !block {//解锁unlock(&c.lock)return false, false}// no sender available: block on this channel.gp := getg()//创建新的sudogmysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}// No stack splits between assigning elem and enqueuing mysg// on gp.waiting where copystack can find it.mysg.elem = epmysg.waitlink = nilgp.waiting = mysg//与当前goroutine绑定mysg.g = gpmysg.isSelect = falsemysg.c = cgp.param = nil//加入到recvqc.recvq.enqueue(mysg)gp.parkingOnChan.Store(true)//将当前的goroutine挂起gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)// someone woke us upif mysg != gp.waiting {throw("G waiting list is corrupted")}gp.waiting = nilgp.activeStackChans = falseif mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)}success := mysg.successgp.param = nilmysg.c = nilreleaseSudog(mysg)return true, success
}
func empty(c *hchan) bool {// c.dataqsiz is immutable.if c.dataqsiz == 0 {return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil}return atomic.Loaduint(&c.qcount) == 0
}
/*
recv在一个full chan[buf is full]上执行接收数据的操作
对于无缓冲通道直接从sender中拷贝数据
对于有缓冲通道
1.将the head of queue的数据拷贝到reciver中
2.将sender的数据发送到缓冲区
*/
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
//无缓冲通道if c.dataqsiz == 0 {if raceenabled {racesync(c, sg)}//直接从sender中拷贝数据if ep != nil {// copy data from senderrecvDirect(c.elemtype, sg, ep)}} else {// Queue is full. Take the item at the// head of the queue. Make the sender enqueue// its item at the tail of the queue. Since the// queue is full, those are both the same slot.//获取接受数数据的slotqp := chanbuf(c, c.recvx)if raceenabled {racenotify(c, c.recvx, nil)racenotify(c, c.recvx, sg)}// copy data from queue to receiverif ep != nil {typedmemmove(c.elemtype, ep, qp)}//空出缓冲区,将sender中的数据发送到缓冲区// copy data from sender to queuetypedmemmove(c.elemtype, qp, sg.elem)c.recvx++if c.recvx == c.dataqsiz {c.recvx = 0}//抽取一个元素加入一个元素,buf依旧是满的c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz}sg.elem = nilgp := sg.g//释放锁unlockf()gp.param = unsafe.Pointer(sg)//因chan close被唤醒标记为false;反之标记为truesg.success = trueif sg.releasetime != 0 {sg.releasetime = cputicks()}//将gp由_Gwaiting变为_Grunnable//尝试加入到本地p的runnext,//本地p的runq以及全局的runqgoready(gp, skip+1)
}
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {src := sg.elemtypeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)memmove(dst, src, t.size)
}
总结一下读取数据的流程
-
对于nil chan,读取数据会被gopark挂起
-
chan be closed并且没有数据可读,返回
-
chan be closed并且有正在等待发送数据的sender
1.对于无缓冲,直接从sender那里拷贝数据;
2.对于有缓冲,先从queue的头部获取数据
然后将sender的数据发送到缓冲区
将sender的状态由_Gwaiting变为_Grunnable计入到本地p的runnext等待被调度 -
通道被关闭有数据可读或者没有被关闭但是有数据可读,直接从queue中读取数据,元素个数减少
-
通道没有被关闭没有数据可以读。创建新的sudog与接收数据的g绑定加入到recvq的末尾,通过gopark挂起等待其它goroutine向chan中发送数据
complie
在select中使用"ch<-i"或者"<-ch"会被编译成if-else语句
// compiler implementsselect {case c <- v:... foodefault:... bar}// asif selectnbsend(c, v) {... foo} else {... bar}
// compiler implementsselect {case v, ok = <-c:... foodefault:... bar}// asif selected, ok = selectnbrecv(&v, c); selected {... foo} else {... bar}
closechan
func closechan(c *hchan) {//关闭nil chan会报错if c == nil {panic(plainError("close of nil channel"))}//上锁lock(&c.lock)//chan 已经被关闭过if c.closed != 0 {//释放锁,报panicunlock(&c.lock)panic(plainError("close of closed channel"))}if raceenabled {callerpc := getcallerpc()racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))racerelease(c.raceaddr())}//将close字段标识为1c.closed = 1//var glist gList// release all readers//通知所有的reader/reciverfor {//从recvq中获取等待读取的sudogsg := c.recvq.dequeue()if sg == nil {//遍历完成break}if sg.elem != nil {//清空sg.elem指向的内存typedmemclr(c.elemtype, sg.elem)//将指针悬空,防止成为野指针sg.elem = nil}if sg.releasetime != 0 {sg.releasetime = cputicks()}//获取与sudog绑定的ggp := sg.ggp.param = unsafe.Pointer(sg)//因为关闭被唤醒设置为falsesg.success = falseif raceenabled {raceacquireg(gp, c.raceaddr())}//将gp加入到glist中glist.push(gp)}// release all writers (they will panic)//释放所有的sender,并报panic//因为向closed chan发送数据会产生panicfor {//获取sendq中的sendersg := c.sendq.dequeue()if sg == nil {break}sg.elem = nilif sg.releasetime != 0 {sg.releasetime = cputicks()}//获取sudog中的ggp := sg.ggp.param = unsafe.Pointer(sg)//因为close而被唤醒,设置为falsesg.success = falseif raceenabled {raceacquireg(gp, c.raceaddr())}//将其加入到glist中glist.push(gp)}//唤醒所有的sender和reciver才可以解锁unlock(&c.lock)// Ready all Gs now that we've dropped the channel lock.for !glist.empty() {gp := glist.pop()gp.schedlink = 0//将gp由_Gwaiting变为_Grunnable//尝试加入到本地p的runnext,//本地p的runq以及全局的runqgoready(gp, 3)}
}
- 通道为nil,报panic: close of nil channel
- 通道已经关闭(通过closed标识判断),报panic: close of closed channel
- 唤醒recvq中的所有recver,并将唤醒的g加入到glist
- 唤醒sendq中的所有sender并报panic: send on closed channel。并将唤醒的g加入到glist