李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
GO
正文
go并发之channel底层实现原理
Leefs
2024-01-19 PM
838℃
0条
[TOC] ### 前言 在开始本章节之前还是要反复唠叨一句话: Go channel设计模式是:`不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存`。 如果不理解的可以看之前的文章,本章节不再过多赘述。 ### 一、核心数据结构 #### 1.1 hchan Go语言channel是first-class的,意味着它可以被存储到变量中,可以作为参数传递给函数,也可以作为函数的返回值返回。 作为Go语言的核心特征之一,虽然channel看上去很高端,但是其实channel仅仅就是一个数据结构而已,结构体定义如下: ```go type hchan struct { qcount uint // 循环数组中的元素数量,长度 dataqsiz uint // 循环数组的大小,容量 // channel分为无缓冲和有缓冲channel两种 // 有缓冲的channel使用ring buffer(环形缓冲区)来缓存写入的数据,本质是循环数组 // 为什么是循环数组?普通数组容量固定、更适合指定的空间,且弹出元素时,元素需要全部前移 buf unsafe.Pointer // 指向底层循环数组的指针(环形缓冲区) elemsize uint16 // 循环队列中元素的大小 closed uint32 // 是否关闭的标志,0:未关闭,1:已关闭 elemtype *_type // 循环队列中元素的类型 // 当下标超过数组容量后会回到第一个位置,所以需要有两个字段记录当前读和当前写的下标位置 sendx uint // 已发送元素在循环队列中的位置 recvx uint // 已接收元素在循环队列中的位置 // 尝试读/写channel时被阻塞的goroutine recvq waitq // 等待接收的goroutine的等待队列 sendq waitq // 等待发送的goroutine的等待队列 // 互斥锁,保证读写channel时的并发安全问题 lock mutex // 控制chan并发访问的互斥锁 } ``` **说明** 过阅读 channel 的数据结构,可以发现 channel 是使用环形队列作为 channel 的缓冲区: + datasize 环形队列的长度是在创建 channel 时指定的; + sendx 和 recvx 两个字段分别表示环形队列的队尾和队首,sendx 表示数据写入的位置,recvx 表示数据读取的位置; + 字段 recvq 和 sendq 分别表示等待接收的协程队列和等待发送的协程队列 当 channel 缓冲区为空或无缓冲区时,当前协程会被阻塞,分别加入到 recvq 和 sendq 协程队列中,等待其它协程操作 channel 时被唤醒; 其中,读阻塞的协程被写协程唤醒,写阻塞的协程被读协程唤醒。 + 字段 elemtype 和 elemsize 表示 channel 中元素的类型和大小 一个 channel 只能传递一种类型的值,如果需要传递任意类型的数据,可以使用 interface{} 类型。 + 字段 lock 是保证同一时间只有一个协程读写 channel。 **Go Channel底层数据结构示意图**: ![02.go并发之channel底层实现原理01.png](https://lilinchao.com/usr/uploads/2024/01/3296709798.png) #### 1.2 阻塞协程队列waitq与sudog结构体 在 hchan 中我们可以看到 recvq 与 sendq 都是 waitq 类型,这代表协程等待队列。这个队列维护阻塞在一个channel上的所有协程。 first和last是指向sudog结构体类型的指针,表示队列的头和尾。 waitq里面连接的是一个sudog**双向链表**,保存的是等待的goroutine。 队列中的sudog也是一个结构体,代表一个协程 `sync.Mutex` 等待队列中的节点,包含了协程和数据的信息。 waitq 与 sudog 结构体包含以下字段,每个字段的含义已注释: ```go type waitq struct { //阻塞的协程队列 first *sudog //队列头部 last *sudog //队列尾部 } type sudog struct { //sudog:包装协程的节点 g *g //goroutine,协程; next *sudog //队列中的下一个节点; prev *sudog //队列中的前一个节点; elem unsafe.Pointer //读取/写入 channel 的数据的容器; isSelect bool //标识当前协程是否处在 select 多路复用的流程中; c *hchan //标识与当前 sudog 交互的 chan. } ``` ![02.go并发之channel底层实现原理02.png](https://lilinchao.com/usr/uploads/2024/01/2680888956.png) ### 二、channel的创建 创建 channel 实际上就是在内存中实例化出一个 `hchan` 的结构体,并返回一个指向该结构体的指针,所以channel是引用类型。我们在使用 channel 时,在函数之间的传递的即为此指针。 #### 2.1 创建流程 创建 channel 时主要分为两大块:**边界检查**和**分配内存**。 **分配内存的流程如下**: - 如果是无缓冲channel,直接给 hchan 结构体分配内存并返回指针。 - 如果是有缓冲channel,但元素不包含指针类型,则一次性为 hchan 结构体和底层循环数组分配连续内存并返回指针。(需要连续内存空间) - 如果是有缓冲channel,且元素包含指针类型,则分别分配hchan结构体内存和底层循环数组的内存并返回指针。(可以利用内存碎片) ![02.go并发之channel底层实现原理03.jpg](https://lilinchao.com/usr/uploads/2024/01/2491772730.jpg) - 元素大小不能超过 65536 字节,也即 64K; - 元素的对齐大小不能超过 `maxAlign` 也即 8 字节; - 计算出来的所需内存不能超过限制; #### 2.2 源码解析 创建channel的主要实现是在 `makechan()` 函数中: ```go func makechan(t *chantype, size int) *hchan { // 获取元素类型 elem := t.elem // compiler checks this but be safe. // 元素的大小必须小于64K // 编译器已经检查了这一点,但是为了安全起见再次进行检查 if elem.size >= 1<<16 { throw("makechan: invalid channel element type") } // 检查对齐是否正确,涉及操作系统的知识 // TODO if hchanSize%maxAlign != 0 || elem.align > maxAlign { throw("makechan: bad alignment") } // 计算所需要的内存大小 mem, overflow := math.MulUintptr(elem.size, uintptr(size)) // 检查是否溢出,所需内存是否超过maxAlloc - hchanSize 或者 size是否小于0 if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError("makechan: size out of range")) } // 创建hchan指针 var c *hchan switch { case mem == 0: // Queue or element size is zero. // 队列或元素大小为0:只分配hchan的内存 c = (*hchan)(mallocgc(hchanSize, nil, true)) // Race detector uses this location for synchronization. c.buf = c.raceaddr() case elem.ptrdata == 0: // 元素不包含指针 // 一次性为hchan和buf分配连续的内存 c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) default: // Elements contain pointers. // 元素包含指针:分别分配hchan结构体和底层循环数组的内存 c = new(hchan) c.buf = mallocgc(mem, elem, true) } // 设置hchan中的elemsize、elemtype、dataqsiz、lock等属性 c.elemsize = uint16(elem.size) c.elemtype = elem c.dataqsiz = uint(size) lockInit(&c.lock, lockRankHchan) // 如果启用了debugChan,进行调试 if debugChan { print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n") } //返回hchan指针 return c } ``` + 判断申请内存空间大小是否越界,mem 大小为 element 类型大小与 element 个数相乘后得到,仅当无缓冲型 channel 时,因个数为 0 导致大小为 0; - 根据类型,初始 channel,分为 无缓冲型、有缓冲元素为 struct 型、有缓冲元素为 pointer 型 channel; - 倘若为无缓冲型,则仅申请一个大小为默认值 96 的空间; - 如若有缓冲的 struct 型,则一次性分配好 96 + mem 大小的空间,并且调整 chan 的 buf 指向 mem 的起始位置; - 倘若为有缓冲的 pointer 型,则分别申请 chan 和 buf 的空间,两者无需连续; - 对 channel 的其余字段进行初始化,包括元素类型大小、元素类型、容量以及锁的初始化. ### 三、发送数据 #### 3.1 数据发送流程 向channel中发送数据主要分为两大块:**边界检查**和**数据发送** 数据发送流程主要如下所示: + 如果channel的读等待队列中存在接收者goroutine,则为**同步发送**: + 无缓冲channel,不用经过channel直接将数据发送给第一个等待接收的goroutine,并将其唤醒等待调度。 + 有缓冲channel,但是元素个数为0,不用经过channel(假装经过channel)直接将数据发送给第一个等待接收的goroutine,并将其唤醒等待调度。 + 如果channel的读等待队列中不存在接收者goroutine: + 如果底层循环数组未满,那么把发送者携带的数据入队队尾,此为**异步发送** + 如果底层循环数组已满或者是无缓冲channel,那么将当前goroutine加入写等待队列,并将其挂起,等待被唤醒,此为**阻塞发送** ![02.go并发之channel底层实现原理04.jpg](https://lilinchao.com/usr/uploads/2024/01/897953987.jpg) #### 3.2 同步发送(写时存在阻塞读协程) ##### 3.2.1 流程图 ![02.go并发之channel底层实现原理05.jpg](https://lilinchao.com/usr/uploads/2024/01/921383918.jpg) ##### 3.2.2 源码 ```go // 加锁 lock(&c.lock) ...... // 从接收者队列recvq中取出一个接收者,接收者不为空的情况下,直接将数据传递给该接收者 if sg := c.recvq.dequeue(); sg != nil { // c: channel // sg: 从recvq中取出来的接收者 send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } ``` **说明** - 加锁; - 从阻塞度协程队列中取出一个 goroutine 的封装对象 sudog; - 在 send 方法中,会基于 memmove 方法,直接将元素拷贝交给 sudog 对应的 goroutine; - 在 send 方法中会完成解锁动作. **同步发送源码** ```go func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { // 无缓存通道和有缓存通道的处理逻辑 if raceenabled { // 无缓冲通道的处理逻辑 if c.dataqsiz == 0 { racesync(c, sg) } else { racenotify(c, c.recvx, nil) racenotify(c, c.recvx, sg) // 假装经过channel // 相当于循环列表的rear指针向前进1 c.recvx++ // 队列数组中最后一个元素已经读取,则再次从头开始读取数据 if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz } } // 将ep直接复制到接收者sg中 if sg.elem != nil { // 复制数据到sg中 sendDirect(c.elemtype, sg, ep) sg.elem = nil } // 接收者对应的goroutine gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) sg.success = true if sg.releasetime != 0 { sg.releasetime = cputicks() } // 使接收者goroutine变成runnable状态,唤醒goroutine goready(gp, skip+1) } // 将src值复制到dst中 func typedmemmove(typ *_type, dst, src unsafe.Pointer) { if dst == src { return } ... memmove(dst, src, typ.size) ... } ``` 1. 调用sendDirect()函数将数据拷贝到接受变量的内存地址上 2. 调用goready()将等待接收的阻塞goroutine的状态改变成Grunnable。下一轮调度时会唤醒这个接收的goroutine #### 3.3 异步发送(写时无阻塞读协程但环形缓冲区仍有空间) ##### 3.3.1 流程图 ![02.go并发之channel底层实现原理07.jpg](https://lilinchao.com/usr/uploads/2024/01/1818115244.jpg) ##### 3.3.2 源码 ```go // 缓冲队列中的元素个数小于队列的大小 // 说明缓冲队列中还有空间 if c.qcount < c.dataqsiz { // qp指向循环数组中未使用的位置. qp := chanbuf(c, c.sendx) if raceenabled { racenotify(c, c.sendx, nil) } // 将发送的数据写入到qp指向的循环数组中的位置 typedmemmove(c.elemtype, qp, ep) // 更新sendx c.sendx++ //当循环队列中最后一个元素已经使用,此时循环队列将再次从0开始 if c.sendx == c.dataqsiz { c.sendx = 0 } // 队列中元素计数+1 c.qcount++ // 释放锁 unlock(&c.lock) return true } ``` **说明** + 如果qcount还没有满,则调用 `chanbuf()` 获取sendx索引的元素指针值。 + 调用 `typedmemmove()` 方法将发送的值拷贝到缓冲区buf中。 + 拷贝完成,需要维护sendx索引下标值和qcount个数。 + 这里将buf缓冲区设计成环形的,索引值如果到了队尾,下一个位置重新回到队头。 #### 3.4 阻塞发送(写时无阻塞读协程且环形缓冲区无空间) ##### 3.4.1 流程图 ![02.go并发之channel底层实现原理09.jpg](https://lilinchao.com/usr/uploads/2024/01/991704397.jpg) ##### 3.4.2 源码 ```go // 获取当前的goroutine,用于绑定给一个sudog gp := getg() // 获取一个sudog mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // 设置sudog发送的数据 mysg.elem = ep mysg.waitlink = nil // 设置sudog绑定的goroutine mysg.g = gp mysg.isSelect = false // 设置sudog绑定的channel mysg.c = c gp.waiting = mysg gp.param = nil // 将发送者入队sendq c.sendq.enqueue(mysg) gp.parkingOnChan.Store(true) // 调用gopark方法挂起当前goroutine,状态为waitReasonChanSend,阻塞等待channel。 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) // 最后,KeepAlive()确保发送的值保持活跃状态,直到接收者将其复制出来 KeepAlive(ep) // someone woke us up. if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil gp.activeStackChans = false closed := !mysg.success gp.param = nil if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } mysg.c = nil releaseSudog(mysg) if closed { if c.closed == 0 { throw("chansend: spurious wakeup") } panic(plainError("send on closed channel")) } return true ``` **说明** - 加锁; - 构造封装当前 goroutine 的 sudog 对象; - 完成指针指向,建立 sudog、goroutine、channel 之间的指向关系; - 把 sudog 添加到当前 channel 的阻塞写协程队列中; - park 当前协程; - 倘若协程从 park 中被唤醒,则回收 sudog(sudog能被唤醒,其对应的元素必然已经被读协程取走); - 解锁,返回 #### 3.5 chansend()源码 发送操作在编译时转换为 `chansend` 函数: > `chansend` 接收 4 个参数: > > + `c` 是一个指向 `hchan` 类型的指针,表示要接收数据的通道; > + `ep` 是一个 `unsafe.Pointer` 类型的指针,用于接收接收到的数据; > + `block` 表示接收操作的模式。如果 `block` 为 `true`,为阻塞模式,即发送操作将会阻塞,直到有接收者接收元素;如果 `block` 为 `false`,为非阻塞模式,即发送操作不会阻塞,如果通道已满,发送操作会立即返回; > + `callerpc`:发送操作的调用者的程序计数器值。 ```go func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { if c == nil { if !block { return false } gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) throw("unreachable") } if debugChan { print("chansend: chan=", c, "\n") } if raceenabled { racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend)) } // Fast path: check for failed non-blocking operation without acquiring the lock. // // After observing that the channel is not closed, we observe that the channel is // not ready for sending. Each of these observations is a single word-sized read // (first c.closed and second full()). // Because a closed channel cannot transition from 'ready for sending' to // 'not ready for sending', even if the channel is closed between the two observations, // they imply a moment between the two when the channel was both not yet closed // and not ready for sending. We behave as if we observed the channel at that moment, // and report that the send cannot proceed. // // It is okay if the reads are reordered here: if we observe that the channel is not // ready for sending and then observe that it is not closed, that implies that the // channel wasn't closed during the first observation. However, nothing here // guarantees forward progress. We rely on the side effects of lock release in // chanrecv() and closechan() to update this thread's view of c.closed and full(). if !block && c.closed == 0 && full(c) { return false } var t0 int64 if blockprofilerate > 0 { t0 = cputicks() } lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) } if sg := c.recvq.dequeue(); sg != nil { // Found a waiting receiver. We pass the value we want to send // directly to the receiver, bypassing the channel buffer (if any). send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } if c.qcount < c.dataqsiz { // Space is available in the channel buffer. Enqueue the element to send. qp := chanbuf(c, c.sendx) if raceenabled { racenotify(c, c.sendx, nil) } typedmemmove(c.elemtype, qp, ep) c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true } if !block { unlock(&c.lock) return false } // Block on the channel. Some receiver will complete our operation for us. gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil c.sendq.enqueue(mysg) // Signal to anyone trying to shrink our stack that we're about // to park on a channel. The window between when this G's status // changes and when we set gp.activeStackChans is not safe for // stack shrinking. gp.parkingOnChan.Store(true) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) // Ensure the value being sent is kept alive until the // receiver copies it out. The sudog has a pointer to the // stack object, but sudogs aren't considered as roots of the // stack tracer. KeepAlive(ep) // someone woke us up. if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil gp.activeStackChans = false closed := !mysg.success gp.param = nil if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } mysg.c = nil releaseSudog(mysg) if closed { if c.closed == 0 { throw("chansend: spurious wakeup") } panic(plainError("send on closed channel")) } return true } ``` ### 四、接收数据 #### 4.1 接收数据流程 接收数据的流程主要由两大块组成:**边界检查**和**接收数据**,其中接收数据的处理逻辑如下。 + 如果 channel 的写等待队列存在发送者 goroutine,此为**同步接收** + 如果是无缓冲 channel,**直接**从第一个发送者 goroutine 那里把数据拷贝给接收变量,**唤醒发送的 goroutine**; + 如果是有缓冲 channel(已满),将循环数组 buf 的队首元素拷贝给接收变量,将第一个发送者 goroutine 的数据拷贝到 buf 循环数组队尾,**唤醒发送的 goroutine**; + 如果 channel 的写等待队列不存在发送者 goroutine: + 如果循环数组 buf 非空,将循环数组 buf 的队首元素拷贝给接收变量,此为**异步接收** + 如果循环数组 buf 为空,将当前 goroutine 加入读等待队列,并**挂起等待唤醒**,此为**阻塞接收** **接收数据总流程图** ![02.go并发之channel底层实现原理10.jpg](https://lilinchao.com/usr/uploads/2024/01/769135211.jpg) #### 4.2 同步接收(读时有阻塞的写协程) ##### 4.2.1 流程图 ![02.go并发之channel底层实现原理11.jpg](https://lilinchao.com/usr/uploads/2024/01/1072085879.jpg) ##### 4.2.2 源码 ```go // 如果发送队列中存在发送者,返回(true,true) lock(&c.lock) if sg := c.sendq.dequeue(); sg != nil { // 找到一个等待的发送者,如果缓冲区大小为0,则直接接收发送者的值 // 否则,从队列头部接收,并将发送者的值添加到队列的尾部 recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true } ``` **同步接收源码1** ```go func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { // 创建的channel是无缓存buf if c.dataqsiz == 0 { if raceenabled { racesync(c, sg) } // 直接将数据从发送者复制过去,即直接接收发送者的数据 if ep != nil { // copy data from sender recvDirect(c.elemtype, sg, ep) } } else { // 返回buf中待被接收的数据的指针 qp := chanbuf(c, c.recvx) if raceenabled { racenotify(c, c.recvx, nil) racenotify(c, c.recvx, sg) } // 复制队列中的数据给接收者 if ep != nil { typedmemmove(c.elemtype, ep, qp) } // 将发送者的数据加入到buf中 typedmemmove(c.elemtype, qp, sg.elem) // 更新recvx c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } // 更新sendx c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz } sg.elem = nil gp := sg.g // 释放锁 unlockf() gp.param = unsafe.Pointer(sg) sg.success = true if sg.releasetime != 0 { sg.releasetime = cputicks() } // 唤醒发送者,下一轮参与调度 goready(gp, skip+1) } ``` **同步接收源码2** ```go //返回buffer中下标为i的指针 func chanbuf(c *hchan, i uint) unsafe.Pointer { return add(c.buf, uintptr(i)*uintptr(c.elemsize)) } ``` **说明** - 加锁; - 从阻塞写协程队列中获取到一个写协程; - 倘若 channel 无缓冲区,则直接读取写协程元素,并唤醒写协程; - 倘若 channel 有缓冲区,则读取缓冲区头部元素,并将写协程元素写入缓冲区尾部后唤醒写写成; - 解锁,返回。 #### 4.3 异步接收(读时无阻塞写协程且缓冲区有元素) ##### 4.3.1 流程图 ![02.go并发之channel底层实现原理12.jpg](https://lilinchao.com/usr/uploads/2024/01/3530528617.jpg) ##### 4.3.2 源码 ```go if c.qcount > 0 { // 待接收数据的指针 qp := chanbuf(c, c.recvx) if raceenabled { racenotify(c, c.recvx, nil) } // 直接从队列中接收数据 if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) // 更新recvx c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } // 更新qcount c.qcount-- // 释放锁 unlock(&c.lock) // 返回(true,true) return true, true } ``` **说明** - 加锁; - 获取到 recvx 对应位置的元素; - recvx++ - qcount-- - 解锁,返回 #### 4.4 阻塞接收(读时无阻塞写协程且缓冲区无元素) ##### 4.4.1 流程图 ![02.go并发之channel底层实现原理13.jpg](https://lilinchao.com/usr/uploads/2024/01/2154452785.jpg) ##### 4.4.2 源码 ```go // 获取当前的goroutine,用于绑定给一个sudog gp := getg() // 返回一个sudog mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // sudog绑定接收的数据 mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg // sudog绑定当前的goroutine mysg.g = gp mysg.isSelect = false // sudog绑定当前的channel mysg.c = c gp.param = nil // 将接收者入队recvq中 c.recvq.enqueue(mysg) gp.parkingOnChan.Store(true) // 挂起当前的goroutine gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) // someone woke us up if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil gp.activeStackChans = false if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } success := mysg.success gp.param = nil mysg.c = nil releaseSudog(mysg) // 进行接收 return true, success ``` **说明** - 加锁; - 构造封装当前 goroutine 的 sudog 对象; - 完成指针指向,建立 sudog、goroutine、channel 之间的指向关系; - 把 sudog 添加到当前 channel 的阻塞读协程队列中; - park 当前协程; - 倘若协程从 park 中被唤醒,则回收 sudog(sudog能被唤醒,其对应的元素必然已经被写入); - 解锁,返回 #### 4.5 chanrecv()源码 接收操作在编译时转换为 `chanrecv` 函数。 `chanrecv` 的参数跟 `chansend` 几乎一致,返回值有 2 个,分别是 `selected`,`received`。`selected` 表示是否执行了接收操作,`received` 表示是否成功收到了数据。 - 如果 `selected = false`:表示没有进行接收操作; - 如果 `selected = true`:表示进行了接收操作: + 如果 `received = false`:表示虽然接收操作成功,但没有接收到实际的数据; + 如果 `received = true`:表示接收操作成功,并且接收到实际的数据。 ```go func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { // raceenabled: don't need to check ep, as it is always on the stack // or is new memory allocated by reflect. if debugChan { print("chanrecv: chan=", c, "\n") } if c == nil { if !block { return } gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw("unreachable") } // Fast path: check for failed non-blocking operation without acquiring the lock. if !block && empty(c) { // After observing that the channel is not ready for receiving, we observe whether the // channel is closed. // // Reordering of these checks could lead to incorrect behavior when racing with a close. // For example, if the channel was open and not empty, was closed, and then drained, // reordered reads could incorrectly indicate "open and empty". To prevent reordering, // we use atomic loads for both checks, and rely on emptying and closing to happen in // separate critical sections under the same lock. This assumption fails when closing // an unbuffered channel with a blocked send, but that is an error condition anyway. if atomic.Load(&c.closed) == 0 { // Because a channel cannot be reopened, the later observation of the channel // being not closed implies that it was also not closed at the moment of the // first observation. We behave as if we observed the channel at that moment // and report that the receive cannot proceed. return } // The channel is irreversibly closed. Re-check whether the channel has any pending data // to receive, which could have arrived between the empty and closed checks above. // Sequential consistency is also required here, when racing with such a send. if empty(c) { // The channel is irreversibly closed and empty. if raceenabled { raceacquire(c.raceaddr()) } if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } } var t0 int64 if blockprofilerate > 0 { t0 = cputicks() } lock(&c.lock) if c.closed != 0 { if c.qcount == 0 { if raceenabled { raceacquire(c.raceaddr()) } unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } // The channel has been closed, but the channel's buffer have data. } else { // Just found waiting sender with not closed. if sg := c.sendq.dequeue(); sg != nil { // Found a waiting sender. If buffer is size 0, receive value // directly from sender. Otherwise, receive from head of queue // and add sender's value to the tail of the queue (both map to // the same buffer slot because the queue is full). recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true } } if c.qcount > 0 { // Receive directly from queue qp := chanbuf(c, c.recvx) if raceenabled { racenotify(c, c.recvx, nil) } if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true, true } if !block { unlock(&c.lock) return false, false } // no sender available: block on this channel. gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil c.recvq.enqueue(mysg) // Signal to anyone trying to shrink our stack that we're about // to park on a channel. The window between when this G's status // changes and when we set gp.activeStackChans is not safe for // stack shrinking. gp.parkingOnChan.Store(true) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) // someone woke us up if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil gp.activeStackChans = false if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } success := mysg.success gp.param = nil mysg.c = nil releaseSudog(mysg) return true, success } ``` ### 五、阻塞与非阻塞模式 在上述源码分析流程中,均是以阻塞模式为主线进行讲述,忽略非阻塞模式的有关处理逻辑。 此处阐明两个问题: - 非阻塞模式下,流程逻辑有何区别? - 何时会进入非阻塞模式? #### 5.1 非阻塞模式逻辑区别 非阻塞模式下,读/写 channel 方法通过一个 bool 型的响应参数,用以标识是否读取/写入成功。 - 所有需要使得当前 goroutine 被挂起的操作,在非阻塞模式下都会返回 false; - 所有是的当前 goroutine 会进入死锁的操作,在非阻塞模式下都会返回 false; - 所有能立即完成读取/写入操作的条件下,非阻塞模式下会返回 true。 #### 5.2 何时进入非阻塞模式 默认情况下,读/写 channel 都是阻塞模式,只有在 select 语句组成的多路复用分支中,与 channel 的交互会变成非阻塞模式: ```go ch := make(chan int) select{ case <- ch: default: } ``` #### 5.3 代码 ```go func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) { return chansend(c, elem, false, getcallerpc()) } func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) { return chanrecv(c, elem, false) } ``` 在 select 语句包裹的多路复用分支中,读和写 channel 操作会被汇编为 selectnbrecv 和 selectnbsend 方法,底层同样复用 chanrecv 和 chansend 方法,但此时由于第三个入参 block 被设置为 false,导致后续会走进非阻塞的处理分支。 ### 六、两种读 channel 的协议 读取 channel 时,可以根据第二个 bool 型的返回值用以判断当前 channel 是否已处于关闭状态: ```go ch := make(chan int, 2) got1 := <- ch got2,ok := <- ch ``` 实现上述功能的原因是,两种格式下,读 channel 操作会被汇编成不同的方法: ```go func chanrecv1(c *hchan, elem unsafe.Pointer) { chanrecv(c, elem, true) } //go:nosplit func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) { _, received = chanrecv(c, elem, true) return } ``` ### 七、关闭 #### 7.1 流程图 关闭channel的总体流程如下 - 边界检查 - 从recvq释放所有的readers - 从sendq释放所有的writers(会产生panic) - 唤醒所有的readers和writers ![02.go并发之channel底层实现原理14.jpg](https://lilinchao.com/usr/uploads/2024/01/2860061092.jpg) #### 7.2 源码 ```go func closechan(c *hchan) { if c == nil { panic(plainError("close of nil channel")) } lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic(plainError("close of closed channel")) } c.closed = 1 var glist gList // release all readers for { sg := c.recvq.dequeue() if sg == nil { break } if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false glist.push(gp) } // release all writers (they will panic) for { sg := c.sendq.dequeue() if sg == nil { break } sg.elem = nil gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false glist.push(gp) } unlock(&c.lock) // Ready all Gs now that we've dropped the channel lock. for !glist.empty() { gp := glist.pop() gp.schedlink = 0 goready(gp, 3) ``` **说明** - 关闭未初始化过的 channel 会 panic; - 加锁; - 重复关闭 channel 会 panic; - 将阻塞读协程队列中的协程节点统一添加到 glist; - 将阻塞写协程队列中的协程节点统一添加到 glist; - 唤醒 glist 当中的所有协程. *附参考文章链接* https://mp.weixin.qq.com/s/QgNndPgN1kqxWh-ijSofkw https://blog.csdn.net/weixin_50850749/article/details/132133730
标签:
Golang
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/2873.html
上一篇
go并发之channel
下一篇
golang之context介绍
取消回复
评论啦~
提交评论
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
标签云
Quartz
设计模式
DataWarehouse
机器学习
Redis
查找
Jenkins
Tomcat
Typora
GET和POST
BurpSuite
gorm
容器深入研究
Java工具类
Beego
人工智能
哈希表
Spark Streaming
DataX
Livy
Linux
散列
数据结构
SpringCloud
Scala
SpringCloudAlibaba
SpringBoot
MyBatis
Flume
JavaWeb
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞