news 2025/12/29 18:30:27

go.dev博客阅读-pipelines

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
go.dev博客阅读-pipelines

这篇文章 2014年3月13日发表,作者 Sameer Ajmani

通过灵活的运用chan类型,在 Go 中更高效的处理数据,这里应用领域为健壮高效的流式数据处理,并在安全性问题上做了补充,例如程序异常、内存泄漏、Gc释放等

一些开源类库也沿用了其思想,例如MapReduces、并行处理等

这篇博客要以MapReduces或者生产消费模型的思想去阅读

博客开头的示例

一个比较基础的管道使用

将一组整数通过管道依次平方,最终输出结果

// 将要计算平方的数字,依次添加到chan中,并返回该只允许读的chan// 注意:该chan是无缓冲的,gen函数运行完后,内部的goroutine会依然运行,直到处理完毕funcgen(nums...int)<-chanint{out:=make(chanint)gofunc(){for_,n:=rangenums{out<-n}close(out)}()returnout}// 从传入的只读chan中读取数据,计算平方,再返回chanfuncsq(in<-chanint)<-chanint{out:=make(chanint)gofunc(){forn:=rangein{out<-n*n}close(out)}()returnout}funcTestExample(t*testing.T){// chan数据传输:gen → sq → sq → 打印forn:=rangesq(sq(gen(2,3))){t.Log(n)}}

输出

/Users/www/zero-core/mr/mr_test.go:39: 16 /Users/www/zero-core/mr/mr_test.go:39: 81

过程中的一些说明

  1. gensq方法中分别创建了各自的 chan 变量,用于写入数据,并返回
  2. 声明 chan 类型后,要养成 close 的习惯,close 后依然可以读,有减缓 Gc 压力
  3. sq(sq(gen(2, 3)))中,三个方法,通过传入 chan 参数实现数据流转,sq方法调用了两次
  4. gensq方法中的 chan 均为无缓冲通道,互相调用时为阻塞模型,也就意味着同一时刻只可能会有一段程序在执行(无论几核)

这里就是使用 chan 类型,实现了一个简陋的 MapReduces 过程

并行处理

官方着重提到的是并行,但至于是否多核并行还是依赖于并发实现

依旧是求平方的案例

// 原始数据无阻塞写入 chan, 注意,这里返回的时候有缓冲的 chanfuncgen(nums...int)<-chanint{out:=make(chanint,len(nums))for_,n:=rangenums{out<-n}close(out)returnout}// 读取传入的 chan, 并计算平方, 写入 chanfuncsq(in<-chanint)<-chanint{out:=make(chanint)gofunc(){forn:=rangein{out<-n*n}close(out)}()returnout}// 将传入的n个 chan ,用 n 个 goroutine 读取, 并将其写入到 out chan 中funcmerge(cs...<-chanint)<-chanint{varwg sync.WaitGroup out:=make(chanint)// 读取传入的 chan, 并将其写入到 out chan 中output:=func(c<-chanint){deferwg.Done()forn:=rangec{out<-n}}wg.Add(len(cs))for_,c:=rangecs{gooutput(c)}gofunc(){wg.Wait()close(out)}()returnout}funcTestExample(t*testing.T){// 将 2, 3, 4, 9 写入有缓冲的 chan,返回的 chan 用 2 个 sq 方法去接收(2个消费者)in:=gen(2,3,4,9)c1:=sq(in)c2:=sq(in)forn:=rangemerge(c1,c2){// 输出 4 9 81 16(顺序不定)t.Log(n)}}

输出

/Users/www/zero-core/mr/mr_test.go:68: 4 /Users/www/zero-core/mr/mr_test.go:68: 9 /Users/www/zero-core/mr/mr_test.go:68: 81

说明:

  1. c1、c2 相当于2个消费任务去执行,通过内部创建的 goroutinue 去模型多线程多核并行
  2. merge 方法将多个传入的 chan 输出,合并到一个 chan,保证 Reduces 阶段只会有1个输出出口
  3. ❌这里面有个不严谨漏洞,当取数据不是采用 range 方式或者 chan 数据没有取完, chan 的发送方就会阻塞

带取消功能的 chan

并行处理的代码改进,在每个方法中都引入done

funcgen(done<-chanstruct{},nums...int)<-chanint{out:=make(chanint)gofunc(){deferclose(out)for_,n:=rangenums{select{caseout<-n:case<-done:return}}}()returnout}funcsq(done<-chanstruct{},in<-chanint)<-chanint{out:=make(chanint)gofunc(){deferclose(out)forn:=rangein{select{caseout<-n*n:case<-done:return}}}()returnout}funcmerge(done<-chanstruct{},cs...<-chanint)<-chanint{varwg sync.WaitGroup out:=make(chanint)output:=func(c<-chanint){deferwg.Done()forn:=rangec{select{caseout<-n:case<-done:return}}}wg.Add(len(cs))for_,c:=rangecs{gooutput(c)}gofunc(){wg.Wait()close(out)}()returnout}funcTestExample(t*testing.T){done:=make(chanstruct{})deferclose(done)// 保证所有 goroutine 收到取消信号in:=gen(done,2,3,4,9)c1:=sq(done,in)c2:=sq(done,in)out:=merge(done,c1,c2)// 只消费2个值就退出t.Log(<-out)t.Log(<-out)// 此时 done 被 defer 关闭,所有 goroutine 安全退出}
  1. 在每个方法中,都加入了done,内部使用select来监听是否关闭,并return 释放协程
  2. 如果chan没有取完,通过 close 通知 done 的方式,保证不会存在僵尸协程泄漏

但,这个案例还有改进的一步,比如,chan 中有3个值,现在只取了1个就进行了 close 关闭,chan 随是释放了,但内部剩余的2个值可能会发生逃逸现象,等待系统 Gc 释放

如追求性能,一种写法是 close 后,通过手动读取释放,来减缓 Gc 的压力

// 不仅仅 close 还空读取deferfunc(){close(done)forrangeout{}}()

额外注意的点

在多任务消费读取生产数据时

funcgen(nums...int)<-chanint{out:=make(chanint)gofunc(){for_,n:=rangenums{out<-n}close(out)}()returnout}funcgen2(nums...int)<-chanint{out:=make(chanint,len(nums))for_,n:=rangenums{out<-n}close(out)returnout}

这两种方式实现过程结果一样,不同之处在于将生产数据变快,还是读取速度变快

gen循序渐进的放入生产计划中,gen2是一口气家在到生产计划中,具体采用哪种适业务而定

🧠🧠🧠🧠

对官方这篇博客,我的理解是

  1. 每个使用了 chan 的地方,应在适当的时候关闭且释放掉
  2. 每个使用了 chan 的地方应持续从输入 channel 读取,直到关闭或收到取消信号,而不是一口气读一口气写
  3. 不要完全依赖有缓冲的 chan 的 size 解决阻塞问题,缓冲的大小是一个容错作用
  4. 使用关闭的 channel 作为广播取消信号,通知所有上游 goroutine 停止工作。
  5. 使用 WaitGroup 时,务必确保所有任务完成后再关闭输出 channel,先 wait,再 close

原文出处 https://go.dev/blog/pipelines

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2025/12/23 1:10:50

Go Mod vs 传统依赖管理:效率提升300%

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容&#xff1a; 创建一个对比测试项目&#xff0c;分别用GOPATH和go mod方式管理相同的依赖集。要求&#xff1a;1) 统计初始化时间 2) 记录构建时间 3) 分析依赖解析效率。使用AI自动生成测试脚本…

作者头像 李华
网站建设 2025/12/26 14:57:20

YUM707新手入门指南:从零开始学AI编程

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容&#xff1a; 使用YUM707生成一个简单的Python程序&#xff0c;实现‘Hello World’功能&#xff0c;并扩展为交互式问候程序&#xff0c;能够根据用户输入的名字输出个性化问候语。请提供详细的…

作者头像 李华
网站建设 2025/12/28 13:31:43

HslControls:工业级UI控件库的终极指南

HslControls&#xff1a;工业级UI控件库的终极指南 【免费下载链接】HslControlsDemo HslControls控件库的使用demo&#xff0c;HslControls是一个工业物联网的控件库&#xff0c;基于C#开发&#xff0c;配套HslCommunication组件可以实现工业上位机软件的快速开发&#xff0c;…

作者头像 李华
网站建设 2025/12/27 20:32:21

零基础学MoviePy:用Python做第一个视频剪辑

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容&#xff1a; 创建一个面向初学者的MoviePy教学项目&#xff0c;包含&#xff1a;1.分步安装指南 2.基础代码示例&#xff08;加载视频、简单剪辑&#xff09;3.常见错误解决方法 4.一个完整的示…

作者头像 李华
网站建设 2025/12/24 11:58:03

3分钟学会用手机实时调试Android应用:LogcatViewer完整使用指南

3分钟学会用手机实时调试Android应用&#xff1a;LogcatViewer完整使用指南 【免费下载链接】LogcatViewer Android Logcat Viewer 项目地址: https://gitcode.com/gh_mirrors/lo/LogcatViewer 还在为Android应用调试需要连接电脑而烦恼吗&#xff1f;LogcatViewer这款革…

作者头像 李华