李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
GO
正文
go并发之channel
Leefs
2024-01-18 PM
2468℃
0条
[TOC] ### 一、channel的发送与接收特性 Go 语言中最常见的、也是经常被人提及的设计模式就是:`不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存`。 在很多主流的编程语言中,多个线程传递数据的方式一般都是共享内存,为了解决线程竞争,我们需要限制同一时间能够读写这些变量的线程数量,然而这与 Go 语言鼓励的设计并不相同。 下面是多线程之间使用共享内存实现传递数据图示。 ![01.go并发之channel01.png](https://lilinchao.com/usr/uploads/2024/01/2322204636.png) 虽然我们在 Go 语言中也能使用共享内存加互斥锁进行通信,但是 Go 语言提供了一种不同的并发模型,即通信顺序进程(Communicating sequential processes,CSP)。 Goroutine 和 Channel 分别对应 CSP 中的实体和传递信息的媒介,Goroutine 之间会通过 Channel 传递数据。 ![01.go并发之channel02.png](https://lilinchao.com/usr/uploads/2024/01/2802556021.png) 上图中的两个 Goroutine,一个会向 Channel 中发送数据,另一个会从 Channel 中接收数据,它们两者能够独立运行并不存在直接关联,但是能通过 Channel 间接完成通信。 > 发送和接收的基本特性 + 对于同一个通道,发送操作之间是互斥的,接收操作之间也是互斥的。 + 发送操作和接收操作中对元素值的处理都是不可分割的。 + 发送操作在完全完成之前会被阻塞。接收操作也是如此。 下面是对三个特性详细的分析和说明: > **通道的第一个基本特性**` 在同一时刻,Go语言的运行时系统只会执行对同一个通道的任意个接收操作中的某一个。直到这个元素值被完全复制进该通道之后,其他针对该通道的发送操作才可能被执行。 类似的,在同一时刻,运行时系统也只会执行,对同一个通道的任意个操作中的某一个。直到这个元素值完全被移出该通道之后,其他针对该通道的接收操作才可能被执行。即使这些操作是并发执行的也是如此。 这里所谓的并发执行,可以这样认为,多个代码块分别在不同的 goroutine 之中,并有机会在同一个时间段内被执行。 另外,对于通道中的同一个元素值来说,发送操作和接收操作之间也是互斥的。例如,虽然会出现,正在被复制进通道但还未复制完成的元素值,但是这时它绝不会被想接收它的一方看到和取走。 `这里要注意的一个细节是`,元素值从外界进入通道时会被复制。更具体地说,进入通道的并不是在接收操作符右边的那个元素值,而是它的副本。 另一方面,元素值从通道进入外界时会被移动。这个移动操作实际上包含了两步, - 第一步是生成正在通道中的这个元素值的副本,并准备给到接收方; - 第二步是删除在通道中的这个元素值; > **通道的第二个基本特性** 这里的“不可分割”的意思是,它们处理元素值时都是一气呵成的,绝不会被打断。 1. 例如,发送操作要么还没复制元素值,要么已经复制完毕,绝不会出现只复制了一部分的情况。 2. 例如,接收操作在准备好元素值的副本之后,一定会删除掉通道中的原值,绝不会出现通道中仍有残留的情况。 这既是为了保证通道中元素值的完整性,也是为了保证通道操作的唯一性。对于通道中的同一个元素值来说,它只可能是某一个发送操作放入的,同时也只可能被某一个接收操作取出。 > **通道的第三个基本特性** 一般情况下,发送操作包括了“复制元素值”和“放置副本到通道内部”这两个步骤。在这两个步骤完全完成之前,发起这个发送操作的那句代码会一直阻塞在那里。也就是说,在它之后的代码不会有执行的机会,直到这句代码的阻塞解除。 更细致地说,在通道完成发送操作之后,运行时系统会通知这句代码所在的 goroutine,以使它去争取继续运行代码的机会。另外,接收操作通常包含了“复制通道内的元素值”“放置副本到接收方”“删掉原值”三个步骤。 在所有这些步骤完全完成之前,发起该操作的代码也会一直阻塞,直到该代码所在的 goroutine 收到了运行时系统的通知并重新获得运行机会为止。说到这里,可能已经感觉到,如此阻塞代码其实就是为了实现操作的互斥和元素值的完整。 ### 二、应用实践 #### 2.1 并发编程基础操作 ```go import ( "fmt" "time" ) /** 并发编程示例 */ type Tasks struct { A int B int } var ch = make(chan Tasks,100) // 生产者 func producer(){ for i := 0; i <= 10; i++{ ch <- Tasks{i+10,i-2} // 向 chan 管道中放入数据 } } // 消费者 func consumer(){ for i := 0; i <= 10; i++ { task := <-ch // 写出数据 sum := task.A + task.B fmt.Println(task.A,"+",task.B,"=",sum) } } func main() { go producer() // 调用生产者 go consumer() // 调用消费者 time.Sleep(time.Second) // 睡眠一秒钟 } ``` #### 2.2 交叉打印数字和字母 ```go import ( "fmt" "sync" ) /** 数字和字母交叉打印 */ func CrossPrinting(){ var wg sync.WaitGroup letter := make(chan string,26) number := make(chan int,27) wg.Add(3) // 记录字母 go func(){ defer wg.Done() for i := 'A'; i <= 'Z' ;i++ { letter <- string(i) } close(letter) }() // 记录数字 go func(){ defer wg.Done() for i := 1; i <= 26 ; i++ { number <- i } close(number) }() // 打印数字和字母 go func(){ defer wg.Done() for i := range number{ fmt.Printf("%d%s\t",i,<-letter) } }() wg.Wait() fmt.Println("\n","") } func main() { CrossPrinting() } ``` #### 2.3 通过管道来合并文件 ```go import ( "bufio" "fmt" "io" "os" ) /** 通过管道来合并文件 借助管道来作为数据交换的媒介 */ var( content = make(chan string,1000) // 文件内容缓存管道 readFileCh = make(chan struct{},3) // 监控文件是否读取完毕 方便生产者和消费者之间进行协同 writeFileCh = make(chan struct{},0) // 监控主线程退出时间 ) // 读文件 // infile:需要读取的文件路径 func readFile(infile string){ fin,err := os.Open(infile) // 打开要读取的文件 if err != nil{ // 打开文件报错输出 fmt.Println(err) } // 退出时关闭文件 defer fin.Close() // 读取文件 // fin : 表示文件句柄 reader := bufio.NewReader(fin) // 通过一个死循环不断的去读 // 一个 for 循环结束代表一个文件读取完毕 for{ // 读取整行文件 line,err := reader.ReadString('\n') // 每次遇到换行符停止读取文件 if err == nil { // 如果最后一行有换行符 // 将读取的内容写入到管道中 content <- line } else{ if err == io.EOF { // 读取到文件末尾 if len(line) > 0 { // 输入文件的最后一行没有换行符 content <- (line + "\n") } break }else { // 读取文件报错 fmt.Println(err) } } } // 当一个文件读取完毕后则取走一个元素 <- readFileCh // 当管道变空之后,将 content 关闭掉 if len(readFileCh) == 0 { close(content) } } // 写文件(合并文件) // mergedFile 合并的文件地址 func writeFile(mergedFile string){ // 打开文件 fout,err := os.OpenFile(mergedFile,os.O_CREATE|os.O_TRUNC|os.O_RDWR,0666) if err != nil { fmt.Println(err) } defer fout.Close() // 方法完成后关闭文件 writer := bufio.NewWriter(fout) // 带缓冲的写入信息 // 从管道中读取内容写入到磁盘中 // for range 遍历并取走管道里的元素。当 content 管道为空且被close时,for循环才会退出 for line := range content{ writer.WriteString(line) // 写入文件内容 } writer.Flush() // 强制将内存中的数据写入磁盘 // 消费者结束后向管道中放入一个元素 writeFileCh <- struct{}{} } func main() { // 将管道填满 for i := 0; i < 3 ; i++ { readFileCh <- struct{}{} } go readFile("datas/1.txt") go readFile("datas/2.txt") go readFile("datas/3.txt") go writeFile("datas/big.txt") <- writeFileCh // 阻塞 只有当写入完毕数据向管道中插入一个数据后,此方法才能取出数据结束阻塞 } ``` ### 三、通道实践的几大坑 在使用 channel 进行 goroutine 之间的通信时,有时候场面会变得十分复杂,以至于写出难以觉察、难以定位的偶现 bug,而且上线的时候往往跑得好好的,直到某一天深夜收到服务挂了、OOM 了之类的告警…… > 来梳理一下使用 channel 中常见的三大坑:**panic**、**死锁**、**内存泄漏**,做到防患于未然。 #### 3.1 死锁 go 语言新手在编译时很容易碰到这个死锁的问题: ``` fatal error: all goroutines are asleep - deadlock! ``` go 语言中造成「死锁」的原因是两个 goroutine 互相等待,导致程序耗在那里,无法继续跑下去。 看了很多死锁的案例后,channel 导致的死锁可以归纳为以下几类案例(先讨论 unbuffered channel 的情况) > **(1)只有生产者,没有消费者,或者反过来** channel 的生产者和消费者必须成对出现,如果缺乏一个,就会造成死锁,例如: ```go // 只有生产者,没有消费者 func f1() { ch := make(chan int) ch <- 1 } // 只有消费者,没有生产者 func f2() { ch := make(chan int) <-ch } ``` > **(2)生产者和消费者出现在同一个 goroutine 中** 除了需要成对出现,还需要出现在不同的 goroutine 中,例如: ```go // 同一个 goroutine 中同时出现生产者和消费者 func f3() { ch := make(chan int) ch <- 1 // 由于消费者还没执行到,这里会一直阻塞住 <-ch } ``` 对于 buffered channel 则是下面这种情况 > **(3)buffered channel 已满,且在同一个goroutine中** buffered channel 会将收到的元素先存在 hchan 结构体的 ringbuffer 中,继而才会发生阻塞。而当发生阻塞时,如果阻塞了主 goroutine ,则也会出现死锁 所以实际使用中,推荐尽量使用 buffered channel ,使用起来会更安全,在下文的「内存泄漏」相关内容也会提及。 #### 3.2 内存泄漏 内存泄漏一般都是通过 OOM(Out of Memory) 告警或者发布过程中对内存的观察发现的,服务内存往往都是缓慢上升,直到被系统 OOM 掉清空内存再周而复始。 在 go 语言中,错误地使用 channel 会导致 goroutine 泄漏,进而导致内存泄漏。 > **(1)如何实现 goroutine 泄漏呢?** 让 goroutine 泄漏的核心就是:生产者/消费者所在的 goroutine 已经退出,而其对应的消费者/生产者 所在的 goroutine 会永远阻塞住,直到进程退出 > **(2)生产者阻塞导致泄漏** 一般会用 channel 来做一些超时控制,例如下面这个例子: ```go func leak1() { ch := make(chan int) // g1 go func() { time.Sleep(2 * time.Second) // 模拟 io 操作 ch <- 100 // 模拟返回结果 }() // g2 // 阻塞住,直到超时或返回 select { case <-time.After(500 * time.Millisecond): fmt.Println("timeout! exit...") case result := <-ch: fmt.Printf("result: %d\n", result) } } ``` **解析** 这里用 goroutine g1 来模拟 io 操作,用 goroutine g2 来模拟客户端的处理逻辑。 ①. 假设客户端超时为 500ms,而实际请求耗时为 2s,则 select 会走到 timeout 的逻辑,这时 g2 退出,channel ch 没有消费者,会一直在等待状态,输出如下: ``` Goroutine num: 1 timeout! exit... Goroutine num: 2 ``` 如果这是在 server 代码中,这个请求处理完后,g1 就会挂起、发生泄漏了,就等着 OOM 吧 。 ②. 假设客户端超时调整为 5000ms,实际请求耗时 2s,则 select 会进入获取 result 的分支,输出如下: ``` Goroutine num: 1 result: 100 Goroutine num: 1 ``` > **(3)消费者阻塞导致泄漏** 如果生产者不继续生产,消费者所在的 goroutine 也会阻塞住,不会退出,例如: ```go func leak2() { ch := make(chan int) // 消费者 g1 go func() { for result := range ch { fmt.Printf("result: %d\n", result) } }() // 生产者 g2 ch <- 1 ch <- 2 time.Sleep(time.Second) // 模拟耗时 fmt.Println("main goroutine g2 done...") } ``` 这种情况下,只需要增加 close(ch) 的操作即可,for-range 操作在收到 close 的信号后会退出、goroutine 不再阻塞,能够被回收。 > **(4)如何预防内存泄漏** 预防 goroutine 泄漏的核心就是:创建 goroutine 时就要想清楚它什么时候被回收。 具体到执行层面,包括: + 当 goroutine 退出时,需要考虑它使用的 channel 有没有可能阻塞对应的生产者、消费者的 goroutine; + 尽量使用 buffered channel使用 buffered channel 能减少阻塞发生、即使疏忽了一些极端情况,也能降低 goroutine 泄漏的概率; #### 3.3 panic panic 就更刺激了,一般是测试的时候没发现,上线之后偶现,程序挂掉,服务出现一个超时毛刺后触发告警。 channel 导致的 panic 一般是以下几个原因: > **(1)向已经 close 掉的 channel 继续发送数据** ```go func p1() { ch := make(chan int, 1) close(ch) ch <- 1 } // panic: send on closed channel ``` 在实际开发过程中,处理多个 goroutine 之间协作时,可能存在一个 goroutine 已经 close 掉 channel 了,另外一个不知道,也去 close 一下,就会 panic 掉,例如: ```go func p1() { ch := make(chan int, 1) done := make(chan struct{}, 1) go func() { <- time.After(2*time.Second) println("close2") close(ch) close(done) }() go func() { <- time.After(1*time.Second) println("close1") ch <- 1 close(ch) }() <-done } ``` 万恶之源就是在 go 语言里,是无法知道一个 channel 是否已经被 close 掉的,所以在尝试做 close 操作的时候,就应该做好会 panic 的准备…… > **(2)多次 close 同一个 channel** 同上,在尝试往 channel 里发送数据时,就应该考虑 - 这个 channel 已经关了吗? - 这个 channel 什么时候、在哪个 goroutine 里关呢? - 谁来关呢?还是干脆不关? #### 3.4 如何优雅地 close channel > **(1)需要检查 channel 是否关闭吗?** 刚遇到上面说的 panic 问题时,也试过去找一个内置的 closed 函数来检查关闭状态,结果发现,并没有这样一个函数…… 那么,如果有这样的函数,真能彻底解决 panic 的问题么?答案是不能。 因为 channel 是在一个并发的环境下去做收发操作,就算当前执行 closed(ch) 得到的结果是 false,还是不能直接去关,例如代码: ```go if !closed(ch) { // 返回 false // 在这中间出了幺蛾子! close(ch) // 还是 panic 了…… } ``` 遵循 less is more 的原则,这个 closed 函数是要不得了 > **(2)需要 close 吗?为什么?** 结论:除非必须关闭 chan,否则不要主动关闭。关闭 chan 最优雅的方式,就是不要关闭 chan~ 当一个 chan 没有 sender 和 receiver 时,即不再被使用时,GC 会在一段时间后标记、清理掉这个 chan。那么什么时候必须关闭 chan 呢? 比较常见的是将 close 作为一种通知机制,尤其是生产者与消费者之间是 1:M 的关系时,通过 close 告诉下游:我收工了,你们别读了。 > (3)谁来关? chan 关闭的原则: 1. Don’t close a channel from the receiver side 不要在消费者端关闭 chan 2. Don’t close a channel if the channel has multiple concurrent senders 有多个并发写的生产者时也别关 只要遵循这两条原则,就能避免两种 panic 的场景,即:向 closed chan 发送数据,或者是 close 一个 closed chan。 **按照生产者和消费者的关系可以拆解成以下几类情况:** 1. **一写一读**:生产者关闭即可 2. **一写多读**:生产者关闭即可,关闭时下游全部消费者都能收到通知 3. **多写一读**:多个生产者之间需要引入一个协调 channel 来处理信号 4. **多写多读**:与 3 类似,核心思路是引入一个中间层以及使用 try-send 的套路来处理非阻塞的写入. **代码示例** ```go import ( "log" "math/rand" "strconv" "sync" "time" ) func main() { rand.Seed(time.Now().UnixNano()) log.SetFlags(0) const Max = 100000 const NumReceivers = 10 const NumSenders = 1000 wgReceivers := sync.WaitGroup{} wgReceivers.Add(NumReceivers) dataCh := make(chan int) stopCh := make(chan struct{}) // stopCh 是额外引入的一个信号 channel. // 它的生产者是下面的 toStop channel, // 消费者是上面 dataCh 的生产者和消费者 toStop := make(chan string, 1) // toStop 是拿来关闭 stopCh 用的,由 dataCh 的生产者和消费者写入 // 由下面的匿名中介函数(moderator)消费 // 要注意,这个一定要是 buffered channel (否则没法用 try-send 来处理了) var stoppedBy string // moderator go func() { stoppedBy = <-toStop close(stopCh) }() // senders for i := 0; i < NumSenders; i++ { go func(id string) { for { value := rand.Intn(Max) if value == 0 { // try-send 操作 // 如果 toStop 满了,就会走 default 分支啥也不干,也不会阻塞 select { case toStop <- "sender#" + id: default: } return } // try-receive 操作,尽快退出 // 如果没有这一步,下面的 select 操作可能造成 panic select { case <- stopCh: return default: } // 如果尝试从 stopCh 取数据的同时,也尝试向 dataCh // 写数据,则会命中 select 的伪随机逻辑,可能会写入数据 select { case <- stopCh: return case dataCh <- value: } } }(strconv.Itoa(i)) } // receivers for i := 0; i < NumReceivers; i++ { go func(id string) { defer wgReceivers.Done() for { // 同上 select { case <- stopCh: return default: } // 尝试读数据 select { case <- stopCh: return case value := <-dataCh: if value == Max-1 { select { case toStop <- "receiver#" + id: default: } return } log.Println(value) } } }(strconv.Itoa(i)) } wgReceivers.Wait() log.Println("stopped by", stoppedBy) } ``` *附参考原文链接* https://blog.csdn.net/qq_41893274/article/details/130785190
标签:
Golang
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/2858.html
上一篇
HBase过滤器介绍
下一篇
go并发之channel底层实现原理
评论已关闭
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
NLP
4
标签云
Typora
二叉树
Flink
pytorch
Hadoop
SQL练习题
SpringBoot
MyBatisX
队列
MyBatis
Git
高并发
机器学习
nginx
Spark RDD
Yarn
Java
Map
Sentinel
Azkaban
微服务
数据结构和算法
ajax
Elasticsearch
Livy
FileBeat
Spark SQL
BurpSuite
Jenkins
Hbase
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞
评论已关闭