这篇文章 2014年3月13日发表,作者 Sameer Ajmani
通过灵活的运用chan类型,在 Go 中更高效的处理数据,这里应用领域为健壮高效的流式数据处理,并在安全性问题上做了补充,例如程序异常、内存泄漏、Gc释放等
一些开源类库也沿用了其思想,例如MapReduces、并行处理等
这篇博客要以MapReduces或者生产消费模型的思想去阅读
博客开头的示例
一个比较基础的管道使用
将一组整数通过管道依次平方,最终输出结果
// 将要计算平方的数字,依次添加到chan中,并返回该只允许读的chan// 注意:该chan是无缓冲的,gen函数运行完后,内部的goroutine会依然运行,直到处理完毕funcgen(nums...int)<-chanint{out:=make(chanint)gofunc(){for_,n:=rangenums{out<-n}close(out)}()returnout}// 从传入的只读chan中读取数据,计算平方,再返回chanfuncsq(in<-chanint)<-chanint{out:=make(chanint)gofunc(){forn:=rangein{out<-n*n}close(out)}()returnout}funcTestExample(t*testing.T){// chan数据传输:gen → sq → sq → 打印forn:=rangesq(sq(gen(2,3))){t.Log(n)}}输出
/Users/www/zero-core/mr/mr_test.go:39: 16 /Users/www/zero-core/mr/mr_test.go:39: 81过程中的一些说明
gensq方法中分别创建了各自的 chan 变量,用于写入数据,并返回- 声明 chan 类型后,要养成 close 的习惯,close 后依然可以读,有减缓 Gc 压力
sq(sq(gen(2, 3)))中,三个方法,通过传入 chan 参数实现数据流转,sq方法调用了两次gensq方法中的 chan 均为无缓冲通道,互相调用时为阻塞模型,也就意味着同一时刻只可能会有一段程序在执行(无论几核)
这里就是使用 chan 类型,实现了一个简陋的 MapReduces 过程
并行处理
官方着重提到的是并行,但至于是否多核并行还是依赖于并发实现
依旧是求平方的案例
// 原始数据无阻塞写入 chan, 注意,这里返回的时候有缓冲的 chanfuncgen(nums...int)<-chanint{out:=make(chanint,len(nums))for_,n:=rangenums{out<-n}close(out)returnout}// 读取传入的 chan, 并计算平方, 写入 chanfuncsq(in<-chanint)<-chanint{out:=make(chanint)gofunc(){forn:=rangein{out<-n*n}close(out)}()returnout}// 将传入的n个 chan ,用 n 个 goroutine 读取, 并将其写入到 out chan 中funcmerge(cs...<-chanint)<-chanint{varwg sync.WaitGroup out:=make(chanint)// 读取传入的 chan, 并将其写入到 out chan 中output:=func(c<-chanint){deferwg.Done()forn:=rangec{out<-n}}wg.Add(len(cs))for_,c:=rangecs{gooutput(c)}gofunc(){wg.Wait()close(out)}()returnout}funcTestExample(t*testing.T){// 将 2, 3, 4, 9 写入有缓冲的 chan,返回的 chan 用 2 个 sq 方法去接收(2个消费者)in:=gen(2,3,4,9)c1:=sq(in)c2:=sq(in)forn:=rangemerge(c1,c2){// 输出 4 9 81 16(顺序不定)t.Log(n)}}输出
/Users/www/zero-core/mr/mr_test.go:68: 4 /Users/www/zero-core/mr/mr_test.go:68: 9 /Users/www/zero-core/mr/mr_test.go:68: 81说明:
- c1、c2 相当于2个消费任务去执行,通过内部创建的 goroutinue 去模型多线程多核并行
- merge 方法将多个传入的 chan 输出,合并到一个 chan,保证 Reduces 阶段只会有1个输出出口
- ❌这里面有个不严谨漏洞,当取数据不是采用 range 方式或者 chan 数据没有取完, chan 的发送方就会阻塞
带取消功能的 chan
对并行处理的代码改进,在每个方法中都引入done
funcgen(done<-chanstruct{},nums...int)<-chanint{out:=make(chanint)gofunc(){deferclose(out)for_,n:=rangenums{select{caseout<-n:case<-done:return}}}()returnout}funcsq(done<-chanstruct{},in<-chanint)<-chanint{out:=make(chanint)gofunc(){deferclose(out)forn:=rangein{select{caseout<-n*n:case<-done:return}}}()returnout}funcmerge(done<-chanstruct{},cs...<-chanint)<-chanint{varwg sync.WaitGroup out:=make(chanint)output:=func(c<-chanint){deferwg.Done()forn:=rangec{select{caseout<-n:case<-done:return}}}wg.Add(len(cs))for_,c:=rangecs{gooutput(c)}gofunc(){wg.Wait()close(out)}()returnout}funcTestExample(t*testing.T){done:=make(chanstruct{})deferclose(done)// 保证所有 goroutine 收到取消信号in:=gen(done,2,3,4,9)c1:=sq(done,in)c2:=sq(done,in)out:=merge(done,c1,c2)// 只消费2个值就退出t.Log(<-out)t.Log(<-out)// 此时 done 被 defer 关闭,所有 goroutine 安全退出}- 在每个方法中,都加入了
done,内部使用select来监听是否关闭,并return 释放协程 - 如果
chan没有取完,通过 close 通知 done 的方式,保证不会存在僵尸协程泄漏
但,这个案例还有改进的一步,比如,chan 中有3个值,现在只取了1个就进行了 close 关闭,chan 随是释放了,但内部剩余的2个值可能会发生逃逸现象,等待系统 Gc 释放
如追求性能,一种写法是 close 后,通过手动读取释放,来减缓 Gc 的压力
// 不仅仅 close 还空读取deferfunc(){close(done)forrangeout{}}()额外注意的点
在多任务消费读取生产数据时
funcgen(nums...int)<-chanint{out:=make(chanint)gofunc(){for_,n:=rangenums{out<-n}close(out)}()returnout}funcgen2(nums...int)<-chanint{out:=make(chanint,len(nums))for_,n:=rangenums{out<-n}close(out)returnout}这两种方式实现过程结果一样,不同之处在于将生产数据变快,还是读取速度变快
gen循序渐进的放入生产计划中,gen2是一口气家在到生产计划中,具体采用哪种适业务而定
🧠🧠🧠🧠
对官方这篇博客,我的理解是
- 每个使用了 chan 的地方,应在适当的时候关闭且释放掉
- 每个使用了 chan 的地方应持续从输入 channel 读取,直到关闭或收到取消信号,而不是一口气读一口气写
- 不要完全依赖有缓冲的 chan 的 size 解决阻塞问题,缓冲的大小是一个容错作用
- 使用关闭的 channel 作为广播取消信号,通知所有上游 goroutine 停止工作。
- 使用 WaitGroup 时,务必确保所有任务完成后再关闭输出 channel,先 wait,再 close
原文出处 https://go.dev/blog/pipelines