当前位置: 首页 > news >正文

go资深之路笔记(三) sync.WaitGroup, sync.errgroup和 sync.go-multierror

一、sync.WaitGroup

作用:协程结束后子协程会被立刻销毁,sync.WaitGroup 可以让协程等待子协程执行完,再执行下一步
常见场景:并行处理,初始化资源,多协程结束
主要接口:

var wg sync.WaitGroup
wg.Add(3)	// 增加三个等待计数
wg.Done() //减少一个等待计数
wg.Wait() // 阻塞,直到等待计数== 0

多协程结束代码:

func wg22() {var wg sync.WaitGroupwg.Add(3) // 增加三个等待计数ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(404*time.Millisecond))go func() error {defer wg.Done()	// 减少等待计数,最好用defer,不然中途panic就出问题了// ps:扩展下,如果这里不加defer的时候panic了,不会执行wg.Done,分情况 recover的时候会死锁。否则panic会导致所有协程结束time.Sleep(100 * time.Millisecond)fmt.Println("hello world")return nil}()go func() error {fmt.Println("hello world22")time.Sleep(200 * time.Millisecond)wg.Done()return fmt.Errorf("err")}()go func() error {fmt.Println("hello world33")time.Sleep(300 * time.Millisecond)select {case <-ctx.Done():	// 超时后触发fmt.Println("调用成功")default:fmt.Println("调用失败")return fmt.Errorf("err2")}wg.Done()return nil}()wg.Wait()	// 等待三个子协程返回
}

注意事项:
1.Add 必须在 Goroutine 外部调用,不然可能会出现wgWait()直接返回的情况
2. defer wg.Done() 可以防止panic导致没有执行,引发错误

wg.Add(1)	//步骤1
go func() error {defer wg.Done()	// 步骤2
}
  1. WaitGroup 不能被复制:
    和 sync.Mutex 一样,sync.WaitGroup 是值类型,但不应被复制。在函数间传递时,必须使用指针。
func process(wg *sync.WaitGroup) { // 传指针defer wg.Done()
}

4.复用与清零:
WaitGroup 在计数器归零后可以被再次使用。但是你必须确保它已经wait返回后,再执行wait

二、sync.errgroup

原理:在 sync.WaitGroup 的基础上,增加了错误传播和上下文(Context)取消功能。如果任一子任务返回错误,它可以自动取消所有其他正在执行的任务(通过 Context)。
除此之外还有控制并发数量的功能(通过SetLimit)
代码示例:

func TestErrGroup() {g, ctx := errgroup.WithContext(context.Background())urls := []string{"url1", "url2", "bad-url"} for _, url := range urls {url := url g.Go(func() error {fmt.Printf("Fetching %s...\n", url)if url == "bad-ur1l" {	//模拟一个错误 返回errorreturn errors.New("failed to fetch: bad-url")} select {case <-time.After(100 * time.Millisecond):	// 模拟工作fmt.Printf("Success: %s\n", url)return nilcase <-ctx.Done():	// 子协程报错后,errgroup 会向这个通道发送消息,从而结束子协程fmt.Printf("Cancelled fetching %s due to error elsewhere\n", url)return ctx.Err() // 返回取消原因,或者直接返回nil也可}})}// Wait 会阻塞,直到所有 goroutine 都完成。// 它会返回第一个非空的错误(如果有的话),如果所有都成功则返回nil。// 只要有一个错误,就会立刻返回,并给ctx发送消息,结束其他子线程if err := g.Wait(); err != nil {fmt.Println("Overall error:", err)} else {fmt.Println("All successes!")}
}

源码解析1: 遇到error的取消原理

// 这个函数会生成一个 errgroup 和一个可取消的上下文
func WithContext(ctx context.Context) (*Group, context.Context) {ctx, cancel := context.WithCancelCause(ctx)return &Group{cancel: cancel}, ctx
}func (g *Group) Wait() error {g.wg.Wait()if g.cancel != nil {	// 任务结束之后,这里会在检查一遍,并再次取消g.cancel(g.err)}return g.err
}func (g *Group) Go(f func() error) {...g.wg.Add(1)go func() {defer g.done()if err := f(); err != nil {g.errOnce.Do(func() {g.err = errif g.cancel != nil {g.cancel(g.err)	// 执行该函数,取消上下文}})}}()
}
// 然后就是业务代码调用
select {case <-ctx.Done():			// 通道接受消息,结束阻塞...

对于 取消上下文不太了解的,可以看我之前写的 go资深之路笔记(一) Context第3点

源码解析2: 限制并发协程数
errgroup 的 SetLimit 函数,可以限制其同时并发数量,原理其实就是用有缓存的chan,来控制协程发送给通道数据得数量,从而控制并发数量

func (g *Group) SetLimit(n int) {if n < 0 {g.sem = nilreturn}if len(g.sem) != 0 {panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem)))}g.sem = make(chan token, n)		// 有缓存的chan
}func (g *Group) done() {if g.sem != nil {<-g.sem		// 释放消息}g.wg.Done()
}func (g *Group) Go(f func() error) {if g.sem != nil {g.sem <- token{}	//// 写入消息,如果已经满了,等待。从而实现并发数量控制}
}

三、 multierror

作用:errgroup只能返回一个错误,这种适用于快速返回错误的情况。但有时候可能需要收集全部错误,这个时候 multierror就派上用场了
代码示例:

func TestMultierror() {var wg sync.WaitGroupvar mu sync.Mutex // 保护merr的并发安全var merr *multierror.Errortasks := []string{"task1", "task2", "task3_error", "task4_error"}for _, task := range tasks {wg.Add(1)go func(t string) {defer wg.Done()err := doWork(t)if err != nil {mu.Lock()merr = multierror.Append(merr, err)	// 增加errormu.Unlock()}}(task)}wg.Wait() // 等待所有任务完成if merr != nil {merr.ErrorFormat = func(errors []error) string {// 可以自定义错误输出的格式return fmt.Sprintf("All errors: %v", errors)}fmt.Println(merr.Error())// 输出: All errors: [error in task3_error error in task4_error]}
}func doWork(task string) error {if task == "task3_error" || task == "task4_error" {return errors.New("error in " + task)}return nil
}

总结:

这个没啥好讲的,只是封装了一个 []error 和一个格式化函数而已。当工具用就行。

http://www.dtcms.com/a/391567.html

相关文章:

  • Docker 与数据库环境
  • Node.js 模块系统详解
  • proxy代理应用记录
  • 基于python大数据的汽车数据分析系统设计与实现
  • WebSocket实现原理
  • 从保存到加载Docker镜像文件操作全图解
  • IDEA文件修改后改变文件名和文件夹颜色
  • 【MySQL 】MySQL 入门之旅 · 第十篇:子查询与嵌套查询
  • TM52F1376 SSOP24电子元器件 HITENX海速芯 8位微控制器MCU 芯片 深度解析
  • 基于Matlab图像处理的工件表面缺陷检测系统
  • 业务上云实践MYSQL架构改造
  • 深入解析TCP/IP协议分层与通信原理
  • 【人工智能通识专栏】第二十讲:科创项目选题
  • 数据治理系列(三):SQL2API 平台格局与发展趋势
  • 软考-系统架构设计师 软件项目管理详细讲解
  • three.js添加CSS2DRenderer对象
  • 磁共振成像原理(理论)9:射频回波 (RF Echoes)-三脉冲回波(2)
  • 栈的主要知识
  • question:使用同一请求数据且渲染顺序不确定时复用
  • Redis群集三种模式介绍和创建
  • 【LeetCode 每日一题】1935. 可以输入的最大单词数
  • eeprom和flash的区别
  • [vibe code追踪] 分支图可视化 | SVG画布 | D3.js
  • [硬件电路-264]:数字电路的电源系统的主要特性包括哪些
  • 算法题(212):01背包(空间优化)
  • TP4054和TP4056对比
  • AD5165(超低功耗逻辑电平数字电位器)芯片的详细用法
  • 38、多模态模型基础实现:视觉与语言的智能融合
  • 租赁合同管理系统如何使用?功能深度解析
  • 构建高质量RAG知识库,文档解析破解AI应用的数据质量难题