当前位置: 首页 > news >正文

GO中常见并发模式总结

本文对Go语言中常见并发模式的详细总结,包括适用场景、代码示例和优缺点分析,帮助你在不同业务场景中做出合适的选择。

1. 任务分批处理(Batch Processing)

模式描述
将大任务拆分为多个独立批次,每个批次由一个协程处理,适合任务可静态分割且批间无依赖的场景。

适用场景

  • 数据批量导入/导出(如CSV文件分批处理)
  • 分页数据并行查询(如多页API数据拉取)
  • 离线计算任务(如按日期范围分批统计)

代码示例

func batchProcessing(items []int, batchSize int) {var wg sync.WaitGroupbatches := chunk(items, batchSize) // 自定义分批次函数for _, batch := range batches {wg.Add(1)go func(batch []int) {defer wg.Done()for _, item := range batch {process(item) // 处理单个任务}}(batch)}wg.Wait()
}

优点

  • 实现简单,无需复杂同步机制
  • 批大小可控,避免内存溢出

缺点

  • 批处理粒度固定,难以动态调整
  • 若单批任务耗时不均,可能导致整体效率下降

2. 生产者-消费者模式(Producer-Consumer)

模式描述
通过通道连接生产者和消费者,生产者负责生成数据,消费者负责处理数据,实现解耦和并发控制。

适用场景

  • IO密集型任务(如文件读取与处理分离)
  • 任务队列处理(如消息队列消费)
  • 数据管道(如爬虫抓取与解析分离)

代码示例

func producerConsumer(items []int, numWorkers int) {itemCh := make(chan int)resultCh := make(chan string)var wg sync.WaitGroup// 生产者go func() {defer close(itemCh)for _, item := range items {itemCh <- item}}()// 消费者wg.Add(numWorkers)for i := 0; i < numWorkers; i++ {go func() {defer wg.Done()for item := range itemCh {result := process(item)resultCh <- result}}()}// 关闭结果通道go func() {wg.Wait()close(resultCh)}()// 处理结果for result := range resultCh {fmt.Println(result)}
}

优点

  • 生产与消费解耦,可独立扩展
  • 支持动态调整消费者数量
  • 利用通道缓冲平衡速率差异

缺点

  • 需管理多个通道和等待组
  • 错误处理需在通道中传递

3. 协程池(Goroutine Pool)

模式描述
预先创建固定数量的协程,通过任务队列分配工作,避免频繁创建/销毁协程的开销。

适用场景

  • CPU密集型任务(限制并发数以避免CPU过载)
  • 资源受限环境(如数据库连接数限制)
  • 高频短任务(如HTTP请求处理)

代码示例

func goroutinePool(items []int, poolSize int) {pool := workerpool.New(poolSize) // 使用第三方协程池库for _, item := range items {item := item // 避免闭包捕获循环变量pool.Submit(func() {process(item)})}pool.StopWait() // 等待所有任务完成
}

如果不愿意引入第三方库,可以简单通过channel进行携程数量限制:

var limit = make(chan int, 3)func main() {for _, w := range work {go func() {limit <- 1w()<-limit}()}select{}
}

优点

  • 严格控制并发上限,防止资源耗尽
  • 减少协程创建开销,提升性能
  • 适合长时间运行的服务

缺点

  • 需引入第三方库(如go-workerpool、ants pool
  • 任务处理顺序不确定
  • 池大小需根据实际场景调优

4. Stream流水线模式(Pipeline)

模式描述
将任务分解为多个阶段,每个阶段通过通道连接,可并行处理不同阶段。

适用场景

  • 复杂数据处理流程(如ETL管道)
  • 多步骤计算任务(如数据清洗→分析→存储)
  • 响应式数据流处理

代码示例

func streamPipeline(items []int) {fx.From(func(source chan<- any) {for _, item := range items {source <- item}}).Map(func(item any) any {return processStage1(item.(int)) // 第一阶段处理}, fx.WithWorkers(5)).Filter(func(item any) bool {return item.(int) > 10 // 过滤条件}).Map(func(item any) any {return processStage2(item.(int)) // 第二阶段处理}, fx.WithWorkers(3)).ForEach(func(item any) {fmt.Println(item) // 结果消费})
}

优点

  • 高可扩展性,各阶段可独立优化
  • 支持并行和串行混合处理
  • 代码结构清晰,便于维护

缺点

  • 学习曲线较陡,需理解流处理范式
  • 调试复杂,需关注通道阻塞问题
  • 错误传播需特殊处理

5. 扇入/扇出模式(Fan-in/Fan-out)

模式描述

  • 扇出(Fan-out):一个生产者将任务分发给多个消费者
  • 扇入(Fan-in):多个生产者的结果汇总到一个消费者

适用场景

  • 并行计算(如多API结果合并)
  • 分布式任务分发(如MapReduce)
  • 实时数据聚合(如多传感器数据汇总)

代码示例

// 扇出:单个生产者,多个消费者
func fanOut(items []int, numWorkers int) {itemCh := make(chan int)resultCh := make(chan string)// 生产者go func() {defer close(itemCh)for _, item := range items {itemCh <- item}}()// 多个消费者(扇出)for i := 0; i < numWorkers; i++ {go func() {for item := range itemCh {result := process(item)resultCh <- result}}()}// 扇入:单独协程收集结果go func() {for result := range resultCh {fmt.Println(result)}}()
}

优点

  • 充分利用多核资源,提升并行度
  • 适合计算密集型任务的并行加速
  • 可灵活调整生产者/消费者数量

缺点

  • 需处理多个通道的关闭逻辑
  • 结果顺序可能不确定
  • 资源消耗较大,需注意控制并发度

6. 并行获取不同服务合并(go-zero/mr)

模式描述
使用go-zero框架的mr包,并行调用多个独立服务并合并结果,简化错误处理和超时控制。

适用场景

  • 微服务聚合(如同时调用用户服务、订单服务、商品服务)
  • 多数据源查询(如数据库+缓存+文件系统)
  • 外部API并行调用(如同时查询多个第三方接口)

代码示例

func parallelFetch() (userInfo, orderInfo, productInfo, error) {var (user    userInfoorder   orderInfoproduct productInfo)err := mr.Finish(func() error { // 第一个并行任务var err erroruser, err = userService.GetUser()return err},func() error { // 第二个并行任务var err errororder, err = orderService.GetOrder()return err},func() error { // 第三个并行任务var err errorproduct, err = productService.GetProduct()return err},)if err != nil {return userInfo{}, orderInfo{}, productInfo{}, err}return user, order, product, nil
}

优点

  • 一行代码实现多任务并行
  • 自动处理错误聚合(首个错误优先返回)
  • 支持超时控制(通过context)

缺点

  • 依赖go-zero框架
  • 适用于简单并行场景,复杂流程仍需组合其他模式
  • 结果收集方式较固定

7. 并行扫描(Parallel Scan)

模式描述
将数据集分片,每个协程独立扫描一部分数据,最终合并结果。

适用场景

  • 大数据集搜索(如全文检索)
  • 分布式系统扫描(如多节点数据汇总)
  • 并发文件处理(如多文件并行解析)

代码示例

func parallelScan(data [][]int) int {resultCh := make(chan int)var wg sync.WaitGroup// 为每个分片启动协程for i, chunk := range data {wg.Add(1)go func(idx int, chk []int) {defer wg.Done()sum := 0for _, num := range chk {sum += num}resultCh <- sum}(i, chunk)}// 关闭结果通道go func() {wg.Wait()close(resultCh)}()// 汇总结果total := 0for sum := range resultCh {total += sum}return total
}

优点

  • 线性扩展性能,适合大规模数据处理
  • 实现简单,易于理解
  • 可结合扇入模式处理结果

缺点

  • 数据需可静态分片
  • 负载均衡依赖分片策略
  • 不适合动态数据集

8. 令牌桶限流(Token Bucket)

模式描述
通过令牌生成和消费机制控制并发请求速率,防止资源过载。

适用场景

  • 外部API限流(如第三方服务QPS限制)
  • 数据库连接池保护
  • 资源有限场景下的公平调度

代码示例

func tokenBucket(items []int, rate int, burst int) {limiter := rate.NewLimiter(rate.Limit(rate), burst)var wg sync.WaitGroupfor _, item := range items {wg.Add(1)go func(i int) {defer wg.Done()limiter.Wait(context.Background()) // 获取令牌process(i)}(item)}wg.Wait()
}

优点

  • 精确控制请求速率
  • 平滑突发流量(利用令牌桶容量)
  • 防止系统被瞬时高流量压垮

缺点

  • 需引入golang.org/x/time/rate
  • 可能导致任务排队等待
  • 配置参数需根据服务性能调优

综合对比表

模式核心思想适用场景并发控制方式代码复杂度典型框架/库
任务分批将任务静态分片并行处理批量数据处理、分页查询批数固定标准库
生产者-消费者通过通道解耦生产和消费逻辑IO密集型任务、消息队列处理动态调整消费者数中高标准库
协程池固定数量协程处理任务队列CPU密集型任务、资源受限场景池大小固定go-workerpool
Stream流水线将任务分解为多阶段并行处理复杂数据流程、响应式数据流各阶段独立配置go-zero/fx
扇入/扇出任务分发与结果汇总分离并行计算、分布式任务动态调整生产者/消费者中高标准库
并行服务合并并行调用多个独立服务并聚合结果微服务聚合、多数据源查询自动并行go-zero/mr
并行扫描数据分片并行处理大数据集搜索、分布式系统扫描分片数固定标准库
令牌桶限流通过令牌控制请求速率API限流、资源保护令牌生成速率golang.org/x/time/rate

选择建议

  1. IO密集型任务

    • 优先考虑生产者-消费者模式(解耦生产消费)
    • 结合令牌桶限流控制外部API请求速率
  2. CPU密集型任务

    • 使用协程池限制并发数,避免CPU过载
    • 考虑并行扫描模式处理大数据集
  3. 复杂数据流程

    • 采用Stream流水线模式,提高代码可维护性
    • 结合扇入/扇出模式提升并行度
  4. 服务聚合场景

    • 使用go-zero/mr并行调用多个服务,简化错误处理
  5. 动态负载场景

    • 优先选择支持动态调整并发度的模式(如生产者-消费者)

在实际应用中,往往需要组合多种并发模式以满足复杂需求。例如,一个完整的微服务可能同时使用生产者-消费者处理请求队列,用协程池限制数据库连接数,并用go-zero/mr并行调用其他服务。理解各种模式的核心思想和适用场景,是构建高效并发系统的关键。

相关文章:

  • 佰力博科技与您谈谈高温介电温谱仪如何保养
  • Python-ArcGIS蒸散发组分解析与GPP估算技术
  • 从中控屏看HMI设计的安全与美学博弈
  • Mac 每日磁盘写入量异常高
  • Linux 安装 Remmina
  • ubuntu 24 下使用pip 时碰到Externally Managed Environment Error的解决办法
  • Qt 多线程环境下的全局变量管理与密码安全
  • 当UI设计师遇上数字孪生:如何用设计思维重构工业流程?
  • 优秘AI短视频数字人6月功能更新预告:新增多个AIGC热门功能,智能体和知识库再升级
  • mysql安装教程--笔记
  • PXC集群
  • 判断手机屏幕上的横向滑动(左滑和右滑)
  • Elasticsearch搜索机制与分页优化策略
  • AI应用 Markdown 渲染对比与原生实现方案
  • 基于大模型预测视神经脊髓炎的技术方案大纲
  • 【AUTOSAR】时间保护(Timing Protection)概念、应用与实现源代码解析(下篇)
  • 华为HCIP-Cloud-Service认证H13-821V2.0-002
  • 大模型应用开发第三讲:大模型是Agent的“大脑”,提供通用推理能力(如GPT-4、Claude 3)
  • 记录一ubuntu22.04做开机启动mysql、nginx、redis
  • ceph recovery 相关参数
  • 网站建设公司圣辉友联/网络推广需要多少钱
  • 高唐做网站建设公司/长沙网站包年优化
  • pageadmin政府网站管理系/百度搜索指数的数据来源
  • 网站开发 页面功能布局/深圳整合营销
  • 有没有什么专门做兼职的网站/怎么创建一个网页
  • 网站建设 北京 淘宝/百度网站下拉排名