李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
GO
正文
go并发之channel底层实现原理
Leefs
2024-01-19 PM
2716℃
0条
[TOC] ### 前言 在开始本章节之前还是要反复唠叨一句话: Go channel设计模式是:`不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存`。 如果不理解的可以看之前的文章,本章节不再过多赘述。 ### 一、核心数据结构 #### 1.1 hchan Go语言channel是first-class的,意味着它可以被存储到变量中,可以作为参数传递给函数,也可以作为函数的返回值返回。 作为Go语言的核心特征之一,虽然channel看上去很高端,但是其实channel仅仅就是一个数据结构而已,结构体定义如下: ```go type hchan struct { qcount uint // 循环数组中的元素数量,长度 dataqsiz uint // 循环数组的大小,容量 // channel分为无缓冲和有缓冲channel两种 // 有缓冲的channel使用ring buffer(环形缓冲区)来缓存写入的数据,本质是循环数组 // 为什么是循环数组?普通数组容量固定、更适合指定的空间,且弹出元素时,元素需要全部前移 buf unsafe.Pointer // 指向底层循环数组的指针(环形缓冲区) elemsize uint16 // 循环队列中元素的大小 closed uint32 // 是否关闭的标志,0:未关闭,1:已关闭 elemtype *_type // 循环队列中元素的类型 // 当下标超过数组容量后会回到第一个位置,所以需要有两个字段记录当前读和当前写的下标位置 sendx uint // 已发送元素在循环队列中的位置 recvx uint // 已接收元素在循环队列中的位置 // 尝试读/写channel时被阻塞的goroutine recvq waitq // 等待接收的goroutine的等待队列 sendq waitq // 等待发送的goroutine的等待队列 // 互斥锁,保证读写channel时的并发安全问题 lock mutex // 控制chan并发访问的互斥锁 } ``` **说明** 过阅读 channel 的数据结构,可以发现 channel 是使用环形队列作为 channel 的缓冲区: + datasize 环形队列的长度是在创建 channel 时指定的; + sendx 和 recvx 两个字段分别表示环形队列的队尾和队首,sendx 表示数据写入的位置,recvx 表示数据读取的位置; + 字段 recvq 和 sendq 分别表示等待接收的协程队列和等待发送的协程队列 当 channel 缓冲区为空或无缓冲区时,当前协程会被阻塞,分别加入到 recvq 和 sendq 协程队列中,等待其它协程操作 channel 时被唤醒; 其中,读阻塞的协程被写协程唤醒,写阻塞的协程被读协程唤醒。 + 字段 elemtype 和 elemsize 表示 channel 中元素的类型和大小 一个 channel 只能传递一种类型的值,如果需要传递任意类型的数据,可以使用 interface{} 类型。 + 字段 lock 是保证同一时间只有一个协程读写 channel。 **Go Channel底层数据结构示意图**: ![02.go并发之channel底层实现原理01.png](https://lilinchao.com/usr/uploads/2024/01/3296709798.png) #### 1.2 阻塞协程队列waitq与sudog结构体 在 hchan 中我们可以看到 recvq 与 sendq 都是 waitq 类型,这代表协程等待队列。这个队列维护阻塞在一个channel上的所有协程。 first和last是指向sudog结构体类型的指针,表示队列的头和尾。 waitq里面连接的是一个sudog**双向链表**,保存的是等待的goroutine。 队列中的sudog也是一个结构体,代表一个协程 `sync.Mutex` 等待队列中的节点,包含了协程和数据的信息。 waitq 与 sudog 结构体包含以下字段,每个字段的含义已注释: ```go type waitq struct { //阻塞的协程队列 first *sudog //队列头部 last *sudog //队列尾部 } type sudog struct { //sudog:包装协程的节点 g *g //goroutine,协程; next *sudog //队列中的下一个节点; prev *sudog //队列中的前一个节点; elem unsafe.Pointer //读取/写入 channel 的数据的容器; isSelect bool //标识当前协程是否处在 select 多路复用的流程中; c *hchan //标识与当前 sudog 交互的 chan. } ``` ![02.go并发之channel底层实现原理02.png](https://lilinchao.com/usr/uploads/2024/01/2680888956.png) ### 二、channel的创建 创建 channel 实际上就是在内存中实例化出一个 `hchan` 的结构体,并返回一个指向该结构体的指针,所以channel是引用类型。我们在使用 channel 时,在函数之间的传递的即为此指针。 #### 2.1 创建流程 创建 channel 时主要分为两大块:**边界检查**和**分配内存**。 **分配内存的流程如下**: - 如果是无缓冲channel,直接给 hchan 结构体分配内存并返回指针。 - 如果是有缓冲channel,但元素不包含指针类型,则一次性为 hchan 结构体和底层循环数组分配连续内存并返回指针。(需要连续内存空间) - 如果是有缓冲channel,且元素包含指针类型,则分别分配hchan结构体内存和底层循环数组的内存并返回指针。(可以利用内存碎片) ![02.go并发之channel底层实现原理03.jpg](https://lilinchao.com/usr/uploads/2024/01/2491772730.jpg) - 元素大小不能超过 65536 字节,也即 64K; - 元素的对齐大小不能超过 `maxAlign` 也即 8 字节; - 计算出来的所需内存不能超过限制; #### 2.2 源码解析 创建channel的主要实现是在 `makechan()` 函数中: ```go func makechan(t *chantype, size int) *hchan { // 获取元素类型 elem := t.elem // compiler checks this but be safe. // 元素的大小必须小于64K // 编译器已经检查了这一点,但是为了安全起见再次进行检查 if elem.size >= 1<<16 { throw("makechan: invalid channel element type") } // 检查对齐是否正确,涉及操作系统的知识 // TODO if hchanSize%maxAlign != 0 || elem.align > maxAlign { throw("makechan: bad alignment") } // 计算所需要的内存大小 mem, overflow := math.MulUintptr(elem.size, uintptr(size)) // 检查是否溢出,所需内存是否超过maxAlloc - hchanSize 或者 size是否小于0 if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError("makechan: size out of range")) } // 创建hchan指针 var c *hchan switch { case mem == 0: // Queue or element size is zero. // 队列或元素大小为0:只分配hchan的内存 c = (*hchan)(mallocgc(hchanSize, nil, true)) // Race detector uses this location for synchronization. c.buf = c.raceaddr() case elem.ptrdata == 0: // 元素不包含指针 // 一次性为hchan和buf分配连续的内存 c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) default: // Elements contain pointers. // 元素包含指针:分别分配hchan结构体和底层循环数组的内存 c = new(hchan) c.buf = mallocgc(mem, elem, true) } // 设置hchan中的elemsize、elemtype、dataqsiz、lock等属性 c.elemsize = uint16(elem.size) c.elemtype = elem c.dataqsiz = uint(size) lockInit(&c.lock, lockRankHchan) // 如果启用了debugChan,进行调试 if debugChan { print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n") } //返回hchan指针 return c } ``` + 判断申请内存空间大小是否越界,mem 大小为 element 类型大小与 element 个数相乘后得到,仅当无缓冲型 channel 时,因个数为 0 导致大小为 0; - 根据类型,初始 channel,分为 无缓冲型、有缓冲元素为 struct 型、有缓冲元素为 pointer 型 channel; - 倘若为无缓冲型,则仅申请一个大小为默认值 96 的空间; - 如若有缓冲的 struct 型,则一次性分配好 96 + mem 大小的空间,并且调整 chan 的 buf 指向 mem 的起始位置; - 倘若为有缓冲的 pointer 型,则分别申请 chan 和 buf 的空间,两者无需连续; - 对 channel 的其余字段进行初始化,包括元素类型大小、元素类型、容量以及锁的初始化. ### 三、发送数据 #### 3.1 数据发送流程 向channel中发送数据主要分为两大块:**边界检查**和**数据发送** 数据发送流程主要如下所示: + 如果channel的读等待队列中存在接收者goroutine,则为**同步发送**: + 无缓冲channel,不用经过channel直接将数据发送给第一个等待接收的goroutine,并将其唤醒等待调度。 + 有缓冲channel,但是元素个数为0,不用经过channel(假装经过channel)直接将数据发送给第一个等待接收的goroutine,并将其唤醒等待调度。 + 如果channel的读等待队列中不存在接收者goroutine: + 如果底层循环数组未满,那么把发送者携带的数据入队队尾,此为**异步发送** + 如果底层循环数组已满或者是无缓冲channel,那么将当前goroutine加入写等待队列,并将其挂起,等待被唤醒,此为**阻塞发送** ![02.go并发之channel底层实现原理04.jpg](https://lilinchao.com/usr/uploads/2024/01/897953987.jpg) #### 3.2 同步发送(写时存在阻塞读协程) ##### 3.2.1 流程图 ![02.go并发之channel底层实现原理05.jpg](https://lilinchao.com/usr/uploads/2024/01/921383918.jpg) ##### 3.2.2 源码 ```go // 加锁 lock(&c.lock) ...... // 从接收者队列recvq中取出一个接收者,接收者不为空的情况下,直接将数据传递给该接收者 if sg := c.recvq.dequeue(); sg != nil { // c: channel // sg: 从recvq中取出来的接收者 send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } ``` **说明** - 加锁; - 从阻塞度协程队列中取出一个 goroutine 的封装对象 sudog; - 在 send 方法中,会基于 memmove 方法,直接将元素拷贝交给 sudog 对应的 goroutine; - 在 send 方法中会完成解锁动作. **同步发送源码** ```go func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { // 无缓存通道和有缓存通道的处理逻辑 if raceenabled { // 无缓冲通道的处理逻辑 if c.dataqsiz == 0 { racesync(c, sg) } else { racenotify(c, c.recvx, nil) racenotify(c, c.recvx, sg) // 假装经过channel // 相当于循环列表的rear指针向前进1 c.recvx++ // 队列数组中最后一个元素已经读取,则再次从头开始读取数据 if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz } } // 将ep直接复制到接收者sg中 if sg.elem != nil { // 复制数据到sg中 sendDirect(c.elemtype, sg, ep) sg.elem = nil } // 接收者对应的goroutine gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) sg.success = true if sg.releasetime != 0 { sg.releasetime = cputicks() } // 使接收者goroutine变成runnable状态,唤醒goroutine goready(gp, skip+1) } // 将src值复制到dst中 func typedmemmove(typ *_type, dst, src unsafe.Pointer) { if dst == src { return } ... memmove(dst, src, typ.size) ... } ``` 1. 调用sendDirect()函数将数据拷贝到接受变量的内存地址上 2. 调用goready()将等待接收的阻塞goroutine的状态改变成Grunnable。下一轮调度时会唤醒这个接收的goroutine #### 3.3 异步发送(写时无阻塞读协程但环形缓冲区仍有空间) ##### 3.3.1 流程图 ![02.go并发之channel底层实现原理07.jpg](https://lilinchao.com/usr/uploads/2024/01/1818115244.jpg) ##### 3.3.2 源码 ```go // 缓冲队列中的元素个数小于队列的大小 // 说明缓冲队列中还有空间 if c.qcount < c.dataqsiz { // qp指向循环数组中未使用的位置. qp := chanbuf(c, c.sendx) if raceenabled { racenotify(c, c.sendx, nil) } // 将发送的数据写入到qp指向的循环数组中的位置 typedmemmove(c.elemtype, qp, ep) // 更新sendx c.sendx++ //当循环队列中最后一个元素已经使用,此时循环队列将再次从0开始 if c.sendx == c.dataqsiz { c.sendx = 0 } // 队列中元素计数+1 c.qcount++ // 释放锁 unlock(&c.lock) return true } ``` **说明** + 如果qcount还没有满,则调用 `chanbuf()` 获取sendx索引的元素指针值。 + 调用 `typedmemmove()` 方法将发送的值拷贝到缓冲区buf中。 + 拷贝完成,需要维护sendx索引下标值和qcount个数。 + 这里将buf缓冲区设计成环形的,索引值如果到了队尾,下一个位置重新回到队头。 #### 3.4 阻塞发送(写时无阻塞读协程且环形缓冲区无空间) ##### 3.4.1 流程图 ![02.go并发之channel底层实现原理09.jpg](https://lilinchao.com/usr/uploads/2024/01/991704397.jpg) ##### 3.4.2 源码 ```go // 获取当前的goroutine,用于绑定给一个sudog gp := getg() // 获取一个sudog mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // 设置sudog发送的数据 mysg.elem = ep mysg.waitlink = nil // 设置sudog绑定的goroutine mysg.g = gp mysg.isSelect = false // 设置sudog绑定的channel mysg.c = c gp.waiting = mysg gp.param = nil // 将发送者入队sendq c.sendq.enqueue(mysg) gp.parkingOnChan.Store(true) // 调用gopark方法挂起当前goroutine,状态为waitReasonChanSend,阻塞等待channel。 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) // 最后,KeepAlive()确保发送的值保持活跃状态,直到接收者将其复制出来 KeepAlive(ep) // someone woke us up. if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil gp.activeStackChans = false closed := !mysg.success gp.param = nil if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } mysg.c = nil releaseSudog(mysg) if closed { if c.closed == 0 { throw("chansend: spurious wakeup") } panic(plainError("send on closed channel")) } return true ``` **说明** - 加锁; - 构造封装当前 goroutine 的 sudog 对象; - 完成指针指向,建立 sudog、goroutine、channel 之间的指向关系; - 把 sudog 添加到当前 channel 的阻塞写协程队列中; - park 当前协程; - 倘若协程从 park 中被唤醒,则回收 sudog(sudog能被唤醒,其对应的元素必然已经被读协程取走); - 解锁,返回 #### 3.5 chansend()源码 发送操作在编译时转换为 `chansend` 函数: > `chansend` 接收 4 个参数: > > + `c` 是一个指向 `hchan` 类型的指针,表示要接收数据的通道; > + `ep` 是一个 `unsafe.Pointer` 类型的指针,用于接收接收到的数据; > + `block` 表示接收操作的模式。如果 `block` 为 `true`,为阻塞模式,即发送操作将会阻塞,直到有接收者接收元素;如果 `block` 为 `false`,为非阻塞模式,即发送操作不会阻塞,如果通道已满,发送操作会立即返回; > + `callerpc`:发送操作的调用者的程序计数器值。 ```go func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { if c == nil { if !block { return false } gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) throw("unreachable") } if debugChan { print("chansend: chan=", c, "\n") } if raceenabled { racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend)) } // Fast path: check for failed non-blocking operation without acquiring the lock. // // After observing that the channel is not closed, we observe that the channel is // not ready for sending. Each of these observations is a single word-sized read // (first c.closed and second full()). // Because a closed channel cannot transition from 'ready for sending' to // 'not ready for sending', even if the channel is closed between the two observations, // they imply a moment between the two when the channel was both not yet closed // and not ready for sending. We behave as if we observed the channel at that moment, // and report that the send cannot proceed. // // It is okay if the reads are reordered here: if we observe that the channel is not // ready for sending and then observe that it is not closed, that implies that the // channel wasn't closed during the first observation. However, nothing here // guarantees forward progress. We rely on the side effects of lock release in // chanrecv() and closechan() to update this thread's view of c.closed and full(). if !block && c.closed == 0 && full(c) { return false } var t0 int64 if blockprofilerate > 0 { t0 = cputicks() } lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) } if sg := c.recvq.dequeue(); sg != nil { // Found a waiting receiver. We pass the value we want to send // directly to the receiver, bypassing the channel buffer (if any). send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } if c.qcount < c.dataqsiz { // Space is available in the channel buffer. Enqueue the element to send. qp := chanbuf(c, c.sendx) if raceenabled { racenotify(c, c.sendx, nil) } typedmemmove(c.elemtype, qp, ep) c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true } if !block { unlock(&c.lock) return false } // Block on the channel. Some receiver will complete our operation for us. gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil c.sendq.enqueue(mysg) // Signal to anyone trying to shrink our stack that we're about // to park on a channel. The window between when this G's status // changes and when we set gp.activeStackChans is not safe for // stack shrinking. gp.parkingOnChan.Store(true) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) // Ensure the value being sent is kept alive until the // receiver copies it out. The sudog has a pointer to the // stack object, but sudogs aren't considered as roots of the // stack tracer. KeepAlive(ep) // someone woke us up. if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil gp.activeStackChans = false closed := !mysg.success gp.param = nil if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } mysg.c = nil releaseSudog(mysg) if closed { if c.closed == 0 { throw("chansend: spurious wakeup") } panic(plainError("send on closed channel")) } return true } ``` ### 四、接收数据 #### 4.1 接收数据流程 接收数据的流程主要由两大块组成:**边界检查**和**接收数据**,其中接收数据的处理逻辑如下。 + 如果 channel 的写等待队列存在发送者 goroutine,此为**同步接收** + 如果是无缓冲 channel,**直接**从第一个发送者 goroutine 那里把数据拷贝给接收变量,**唤醒发送的 goroutine**; + 如果是有缓冲 channel(已满),将循环数组 buf 的队首元素拷贝给接收变量,将第一个发送者 goroutine 的数据拷贝到 buf 循环数组队尾,**唤醒发送的 goroutine**; + 如果 channel 的写等待队列不存在发送者 goroutine: + 如果循环数组 buf 非空,将循环数组 buf 的队首元素拷贝给接收变量,此为**异步接收** + 如果循环数组 buf 为空,将当前 goroutine 加入读等待队列,并**挂起等待唤醒**,此为**阻塞接收** **接收数据总流程图** ![02.go并发之channel底层实现原理10.jpg](https://lilinchao.com/usr/uploads/2024/01/769135211.jpg) #### 4.2 同步接收(读时有阻塞的写协程) ##### 4.2.1 流程图 ![02.go并发之channel底层实现原理11.jpg](https://lilinchao.com/usr/uploads/2024/01/1072085879.jpg) ##### 4.2.2 源码 ```go // 如果发送队列中存在发送者,返回(true,true) lock(&c.lock) if sg := c.sendq.dequeue(); sg != nil { // 找到一个等待的发送者,如果缓冲区大小为0,则直接接收发送者的值 // 否则,从队列头部接收,并将发送者的值添加到队列的尾部 recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true } ``` **同步接收源码1** ```go func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { // 创建的channel是无缓存buf if c.dataqsiz == 0 { if raceenabled { racesync(c, sg) } // 直接将数据从发送者复制过去,即直接接收发送者的数据 if ep != nil { // copy data from sender recvDirect(c.elemtype, sg, ep) } } else { // 返回buf中待被接收的数据的指针 qp := chanbuf(c, c.recvx) if raceenabled { racenotify(c, c.recvx, nil) racenotify(c, c.recvx, sg) } // 复制队列中的数据给接收者 if ep != nil { typedmemmove(c.elemtype, ep, qp) } // 将发送者的数据加入到buf中 typedmemmove(c.elemtype, qp, sg.elem) // 更新recvx c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } // 更新sendx c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz } sg.elem = nil gp := sg.g // 释放锁 unlockf() gp.param = unsafe.Pointer(sg) sg.success = true if sg.releasetime != 0 { sg.releasetime = cputicks() } // 唤醒发送者,下一轮参与调度 goready(gp, skip+1) } ``` **同步接收源码2** ```go //返回buffer中下标为i的指针 func chanbuf(c *hchan, i uint) unsafe.Pointer { return add(c.buf, uintptr(i)*uintptr(c.elemsize)) } ``` **说明** - 加锁; - 从阻塞写协程队列中获取到一个写协程; - 倘若 channel 无缓冲区,则直接读取写协程元素,并唤醒写协程; - 倘若 channel 有缓冲区,则读取缓冲区头部元素,并将写协程元素写入缓冲区尾部后唤醒写写成; - 解锁,返回。 #### 4.3 异步接收(读时无阻塞写协程且缓冲区有元素) ##### 4.3.1 流程图 ![02.go并发之channel底层实现原理12.jpg](https://lilinchao.com/usr/uploads/2024/01/3530528617.jpg) ##### 4.3.2 源码 ```go if c.qcount > 0 { // 待接收数据的指针 qp := chanbuf(c, c.recvx) if raceenabled { racenotify(c, c.recvx, nil) } // 直接从队列中接收数据 if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) // 更新recvx c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } // 更新qcount c.qcount-- // 释放锁 unlock(&c.lock) // 返回(true,true) return true, true } ``` **说明** - 加锁; - 获取到 recvx 对应位置的元素; - recvx++ - qcount-- - 解锁,返回 #### 4.4 阻塞接收(读时无阻塞写协程且缓冲区无元素) ##### 4.4.1 流程图 ![02.go并发之channel底层实现原理13.jpg](https://lilinchao.com/usr/uploads/2024/01/2154452785.jpg) ##### 4.4.2 源码 ```go // 获取当前的goroutine,用于绑定给一个sudog gp := getg() // 返回一个sudog mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // sudog绑定接收的数据 mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg // sudog绑定当前的goroutine mysg.g = gp mysg.isSelect = false // sudog绑定当前的channel mysg.c = c gp.param = nil // 将接收者入队recvq中 c.recvq.enqueue(mysg) gp.parkingOnChan.Store(true) // 挂起当前的goroutine gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) // someone woke us up if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil gp.activeStackChans = false if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } success := mysg.success gp.param = nil mysg.c = nil releaseSudog(mysg) // 进行接收 return true, success ``` **说明** - 加锁; - 构造封装当前 goroutine 的 sudog 对象; - 完成指针指向,建立 sudog、goroutine、channel 之间的指向关系; - 把 sudog 添加到当前 channel 的阻塞读协程队列中; - park 当前协程; - 倘若协程从 park 中被唤醒,则回收 sudog(sudog能被唤醒,其对应的元素必然已经被写入); - 解锁,返回 #### 4.5 chanrecv()源码 接收操作在编译时转换为 `chanrecv` 函数。 `chanrecv` 的参数跟 `chansend` 几乎一致,返回值有 2 个,分别是 `selected`,`received`。`selected` 表示是否执行了接收操作,`received` 表示是否成功收到了数据。 - 如果 `selected = false`:表示没有进行接收操作; - 如果 `selected = true`:表示进行了接收操作: + 如果 `received = false`:表示虽然接收操作成功,但没有接收到实际的数据; + 如果 `received = true`:表示接收操作成功,并且接收到实际的数据。 ```go func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { // raceenabled: don't need to check ep, as it is always on the stack // or is new memory allocated by reflect. if debugChan { print("chanrecv: chan=", c, "\n") } if c == nil { if !block { return } gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw("unreachable") } // Fast path: check for failed non-blocking operation without acquiring the lock. if !block && empty(c) { // After observing that the channel is not ready for receiving, we observe whether the // channel is closed. // // Reordering of these checks could lead to incorrect behavior when racing with a close. // For example, if the channel was open and not empty, was closed, and then drained, // reordered reads could incorrectly indicate "open and empty". To prevent reordering, // we use atomic loads for both checks, and rely on emptying and closing to happen in // separate critical sections under the same lock. This assumption fails when closing // an unbuffered channel with a blocked send, but that is an error condition anyway. if atomic.Load(&c.closed) == 0 { // Because a channel cannot be reopened, the later observation of the channel // being not closed implies that it was also not closed at the moment of the // first observation. We behave as if we observed the channel at that moment // and report that the receive cannot proceed. return } // The channel is irreversibly closed. Re-check whether the channel has any pending data // to receive, which could have arrived between the empty and closed checks above. // Sequential consistency is also required here, when racing with such a send. if empty(c) { // The channel is irreversibly closed and empty. if raceenabled { raceacquire(c.raceaddr()) } if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } } var t0 int64 if blockprofilerate > 0 { t0 = cputicks() } lock(&c.lock) if c.closed != 0 { if c.qcount == 0 { if raceenabled { raceacquire(c.raceaddr()) } unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } // The channel has been closed, but the channel's buffer have data. } else { // Just found waiting sender with not closed. if sg := c.sendq.dequeue(); sg != nil { // Found a waiting sender. If buffer is size 0, receive value // directly from sender. Otherwise, receive from head of queue // and add sender's value to the tail of the queue (both map to // the same buffer slot because the queue is full). recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true } } if c.qcount > 0 { // Receive directly from queue qp := chanbuf(c, c.recvx) if raceenabled { racenotify(c, c.recvx, nil) } if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true, true } if !block { unlock(&c.lock) return false, false } // no sender available: block on this channel. gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil c.recvq.enqueue(mysg) // Signal to anyone trying to shrink our stack that we're about // to park on a channel. The window between when this G's status // changes and when we set gp.activeStackChans is not safe for // stack shrinking. gp.parkingOnChan.Store(true) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) // someone woke us up if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil gp.activeStackChans = false if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } success := mysg.success gp.param = nil mysg.c = nil releaseSudog(mysg) return true, success } ``` ### 五、阻塞与非阻塞模式 在上述源码分析流程中,均是以阻塞模式为主线进行讲述,忽略非阻塞模式的有关处理逻辑。 此处阐明两个问题: - 非阻塞模式下,流程逻辑有何区别? - 何时会进入非阻塞模式? #### 5.1 非阻塞模式逻辑区别 非阻塞模式下,读/写 channel 方法通过一个 bool 型的响应参数,用以标识是否读取/写入成功。 - 所有需要使得当前 goroutine 被挂起的操作,在非阻塞模式下都会返回 false; - 所有是的当前 goroutine 会进入死锁的操作,在非阻塞模式下都会返回 false; - 所有能立即完成读取/写入操作的条件下,非阻塞模式下会返回 true。 #### 5.2 何时进入非阻塞模式 默认情况下,读/写 channel 都是阻塞模式,只有在 select 语句组成的多路复用分支中,与 channel 的交互会变成非阻塞模式: ```go ch := make(chan int) select{ case <- ch: default: } ``` #### 5.3 代码 ```go func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) { return chansend(c, elem, false, getcallerpc()) } func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) { return chanrecv(c, elem, false) } ``` 在 select 语句包裹的多路复用分支中,读和写 channel 操作会被汇编为 selectnbrecv 和 selectnbsend 方法,底层同样复用 chanrecv 和 chansend 方法,但此时由于第三个入参 block 被设置为 false,导致后续会走进非阻塞的处理分支。 ### 六、两种读 channel 的协议 读取 channel 时,可以根据第二个 bool 型的返回值用以判断当前 channel 是否已处于关闭状态: ```go ch := make(chan int, 2) got1 := <- ch got2,ok := <- ch ``` 实现上述功能的原因是,两种格式下,读 channel 操作会被汇编成不同的方法: ```go func chanrecv1(c *hchan, elem unsafe.Pointer) { chanrecv(c, elem, true) } //go:nosplit func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) { _, received = chanrecv(c, elem, true) return } ``` ### 七、关闭 #### 7.1 流程图 关闭channel的总体流程如下 - 边界检查 - 从recvq释放所有的readers - 从sendq释放所有的writers(会产生panic) - 唤醒所有的readers和writers ![02.go并发之channel底层实现原理14.jpg](https://lilinchao.com/usr/uploads/2024/01/2860061092.jpg) #### 7.2 源码 ```go func closechan(c *hchan) { if c == nil { panic(plainError("close of nil channel")) } lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic(plainError("close of closed channel")) } c.closed = 1 var glist gList // release all readers for { sg := c.recvq.dequeue() if sg == nil { break } if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false glist.push(gp) } // release all writers (they will panic) for { sg := c.sendq.dequeue() if sg == nil { break } sg.elem = nil gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false glist.push(gp) } unlock(&c.lock) // Ready all Gs now that we've dropped the channel lock. for !glist.empty() { gp := glist.pop() gp.schedlink = 0 goready(gp, 3) ``` **说明** - 关闭未初始化过的 channel 会 panic; - 加锁; - 重复关闭 channel 会 panic; - 将阻塞读协程队列中的协程节点统一添加到 glist; - 将阻塞写协程队列中的协程节点统一添加到 glist; - 唤醒 glist 当中的所有协程. *附参考文章链接* https://mp.weixin.qq.com/s/QgNndPgN1kqxWh-ijSofkw https://blog.csdn.net/weixin_50850749/article/details/132133730
标签:
Golang
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/2873.html
上一篇
go并发之channel
下一篇
golang之context介绍
评论已关闭
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
NLP
4
标签云
国产数据库改造
Python
Kafka
Redis
并发线程
微服务
SQL练习题
递归
JVM
Spring
Jquery
SpringBoot
ajax
JavaScript
前端
算法
Netty
栈
数据结构
Spark Core
NIO
DataX
Java编程思想
Thymeleaf
CentOS
GET和POST
ClickHouse
二叉树
Hbase
正则表达式
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞
评论已关闭