GO中常见并发模式总结
本文对Go语言中常见并发模式的详细总结,包括适用场景、代码示例和优缺点分析,帮助你在不同业务场景中做出合适的选择。
1. 任务分批处理(Batch Processing)
模式描述:
将大任务拆分为多个独立批次,每个批次由一个协程处理,适合任务可静态分割且批间无依赖的场景。
适用场景:
- 数据批量导入/导出(如CSV文件分批处理)
- 分页数据并行查询(如多页API数据拉取)
- 离线计算任务(如按日期范围分批统计)
代码示例:
func batchProcessing(items []int, batchSize int) {var wg sync.WaitGroupbatches := chunk(items, batchSize) // 自定义分批次函数for _, batch := range batches {wg.Add(1)go func(batch []int) {defer wg.Done()for _, item := range batch {process(item) // 处理单个任务}}(batch)}wg.Wait()
}
优点:
- 实现简单,无需复杂同步机制
- 批大小可控,避免内存溢出
缺点:
- 批处理粒度固定,难以动态调整
- 若单批任务耗时不均,可能导致整体效率下降
2. 生产者-消费者模式(Producer-Consumer)
模式描述:
通过通道连接生产者和消费者,生产者负责生成数据,消费者负责处理数据,实现解耦和并发控制。
适用场景:
- IO密集型任务(如文件读取与处理分离)
- 任务队列处理(如消息队列消费)
- 数据管道(如爬虫抓取与解析分离)
代码示例:
func producerConsumer(items []int, numWorkers int) {itemCh := make(chan int)resultCh := make(chan string)var wg sync.WaitGroup// 生产者go func() {defer close(itemCh)for _, item := range items {itemCh <- item}}()// 消费者wg.Add(numWorkers)for i := 0; i < numWorkers; i++ {go func() {defer wg.Done()for item := range itemCh {result := process(item)resultCh <- result}}()}// 关闭结果通道go func() {wg.Wait()close(resultCh)}()// 处理结果for result := range resultCh {fmt.Println(result)}
}
优点:
- 生产与消费解耦,可独立扩展
- 支持动态调整消费者数量
- 利用通道缓冲平衡速率差异
缺点:
- 需管理多个通道和等待组
- 错误处理需在通道中传递
3. 协程池(Goroutine Pool)
模式描述:
预先创建固定数量的协程,通过任务队列分配工作,避免频繁创建/销毁协程的开销。
适用场景:
- CPU密集型任务(限制并发数以避免CPU过载)
- 资源受限环境(如数据库连接数限制)
- 高频短任务(如HTTP请求处理)
代码示例:
func goroutinePool(items []int, poolSize int) {pool := workerpool.New(poolSize) // 使用第三方协程池库for _, item := range items {item := item // 避免闭包捕获循环变量pool.Submit(func() {process(item)})}pool.StopWait() // 等待所有任务完成
}
如果不愿意引入第三方库,可以简单通过channel进行携程数量限制:
var limit = make(chan int, 3)func main() {for _, w := range work {go func() {limit <- 1w()<-limit}()}select{}
}
优点:
- 严格控制并发上限,防止资源耗尽
- 减少协程创建开销,提升性能
- 适合长时间运行的服务
缺点:
- 需引入第三方库(如
go-workerpool、ants pool
) - 任务处理顺序不确定
- 池大小需根据实际场景调优
4. Stream流水线模式(Pipeline)
模式描述:
将任务分解为多个阶段,每个阶段通过通道连接,可并行处理不同阶段。
适用场景:
- 复杂数据处理流程(如ETL管道)
- 多步骤计算任务(如数据清洗→分析→存储)
- 响应式数据流处理
代码示例:
func streamPipeline(items []int) {fx.From(func(source chan<- any) {for _, item := range items {source <- item}}).Map(func(item any) any {return processStage1(item.(int)) // 第一阶段处理}, fx.WithWorkers(5)).Filter(func(item any) bool {return item.(int) > 10 // 过滤条件}).Map(func(item any) any {return processStage2(item.(int)) // 第二阶段处理}, fx.WithWorkers(3)).ForEach(func(item any) {fmt.Println(item) // 结果消费})
}
优点:
- 高可扩展性,各阶段可独立优化
- 支持并行和串行混合处理
- 代码结构清晰,便于维护
缺点:
- 学习曲线较陡,需理解流处理范式
- 调试复杂,需关注通道阻塞问题
- 错误传播需特殊处理
5. 扇入/扇出模式(Fan-in/Fan-out)
模式描述:
- 扇出(Fan-out):一个生产者将任务分发给多个消费者
- 扇入(Fan-in):多个生产者的结果汇总到一个消费者
适用场景:
- 并行计算(如多API结果合并)
- 分布式任务分发(如MapReduce)
- 实时数据聚合(如多传感器数据汇总)
代码示例:
// 扇出:单个生产者,多个消费者
func fanOut(items []int, numWorkers int) {itemCh := make(chan int)resultCh := make(chan string)// 生产者go func() {defer close(itemCh)for _, item := range items {itemCh <- item}}()// 多个消费者(扇出)for i := 0; i < numWorkers; i++ {go func() {for item := range itemCh {result := process(item)resultCh <- result}}()}// 扇入:单独协程收集结果go func() {for result := range resultCh {fmt.Println(result)}}()
}
优点:
- 充分利用多核资源,提升并行度
- 适合计算密集型任务的并行加速
- 可灵活调整生产者/消费者数量
缺点:
- 需处理多个通道的关闭逻辑
- 结果顺序可能不确定
- 资源消耗较大,需注意控制并发度
6. 并行获取不同服务合并(go-zero/mr)
模式描述:
使用go-zero框架的mr
包,并行调用多个独立服务并合并结果,简化错误处理和超时控制。
适用场景:
- 微服务聚合(如同时调用用户服务、订单服务、商品服务)
- 多数据源查询(如数据库+缓存+文件系统)
- 外部API并行调用(如同时查询多个第三方接口)
代码示例:
func parallelFetch() (userInfo, orderInfo, productInfo, error) {var (user userInfoorder orderInfoproduct productInfo)err := mr.Finish(func() error { // 第一个并行任务var err erroruser, err = userService.GetUser()return err},func() error { // 第二个并行任务var err errororder, err = orderService.GetOrder()return err},func() error { // 第三个并行任务var err errorproduct, err = productService.GetProduct()return err},)if err != nil {return userInfo{}, orderInfo{}, productInfo{}, err}return user, order, product, nil
}
优点:
- 一行代码实现多任务并行
- 自动处理错误聚合(首个错误优先返回)
- 支持超时控制(通过context)
缺点:
- 依赖go-zero框架
- 适用于简单并行场景,复杂流程仍需组合其他模式
- 结果收集方式较固定
7. 并行扫描(Parallel Scan)
模式描述:
将数据集分片,每个协程独立扫描一部分数据,最终合并结果。
适用场景:
- 大数据集搜索(如全文检索)
- 分布式系统扫描(如多节点数据汇总)
- 并发文件处理(如多文件并行解析)
代码示例:
func parallelScan(data [][]int) int {resultCh := make(chan int)var wg sync.WaitGroup// 为每个分片启动协程for i, chunk := range data {wg.Add(1)go func(idx int, chk []int) {defer wg.Done()sum := 0for _, num := range chk {sum += num}resultCh <- sum}(i, chunk)}// 关闭结果通道go func() {wg.Wait()close(resultCh)}()// 汇总结果total := 0for sum := range resultCh {total += sum}return total
}
优点:
- 线性扩展性能,适合大规模数据处理
- 实现简单,易于理解
- 可结合扇入模式处理结果
缺点:
- 数据需可静态分片
- 负载均衡依赖分片策略
- 不适合动态数据集
8. 令牌桶限流(Token Bucket)
模式描述:
通过令牌生成和消费机制控制并发请求速率,防止资源过载。
适用场景:
- 外部API限流(如第三方服务QPS限制)
- 数据库连接池保护
- 资源有限场景下的公平调度
代码示例:
func tokenBucket(items []int, rate int, burst int) {limiter := rate.NewLimiter(rate.Limit(rate), burst)var wg sync.WaitGroupfor _, item := range items {wg.Add(1)go func(i int) {defer wg.Done()limiter.Wait(context.Background()) // 获取令牌process(i)}(item)}wg.Wait()
}
优点:
- 精确控制请求速率
- 平滑突发流量(利用令牌桶容量)
- 防止系统被瞬时高流量压垮
缺点:
- 需引入
golang.org/x/time/rate
包 - 可能导致任务排队等待
- 配置参数需根据服务性能调优
综合对比表
模式 | 核心思想 | 适用场景 | 并发控制方式 | 代码复杂度 | 典型框架/库 |
---|---|---|---|---|---|
任务分批 | 将任务静态分片并行处理 | 批量数据处理、分页查询 | 批数固定 | 低 | 标准库 |
生产者-消费者 | 通过通道解耦生产和消费逻辑 | IO密集型任务、消息队列处理 | 动态调整消费者数 | 中高 | 标准库 |
协程池 | 固定数量协程处理任务队列 | CPU密集型任务、资源受限场景 | 池大小固定 | 中 | go-workerpool |
Stream流水线 | 将任务分解为多阶段并行处理 | 复杂数据流程、响应式数据流 | 各阶段独立配置 | 高 | go-zero/fx |
扇入/扇出 | 任务分发与结果汇总分离 | 并行计算、分布式任务 | 动态调整生产者/消费者 | 中高 | 标准库 |
并行服务合并 | 并行调用多个独立服务并聚合结果 | 微服务聚合、多数据源查询 | 自动并行 | 低 | go-zero/mr |
并行扫描 | 数据分片并行处理 | 大数据集搜索、分布式系统扫描 | 分片数固定 | 中 | 标准库 |
令牌桶限流 | 通过令牌控制请求速率 | API限流、资源保护 | 令牌生成速率 | 低 | golang.org/x/time/rate |
选择建议
-
IO密集型任务:
- 优先考虑生产者-消费者模式(解耦生产消费)
- 结合令牌桶限流控制外部API请求速率
-
CPU密集型任务:
- 使用协程池限制并发数,避免CPU过载
- 考虑并行扫描模式处理大数据集
-
复杂数据流程:
- 采用Stream流水线模式,提高代码可维护性
- 结合扇入/扇出模式提升并行度
-
服务聚合场景:
- 使用go-zero/mr并行调用多个服务,简化错误处理
-
动态负载场景:
- 优先选择支持动态调整并发度的模式(如生产者-消费者)
在实际应用中,往往需要组合多种并发模式以满足复杂需求。例如,一个完整的微服务可能同时使用生产者-消费者处理请求队列,用协程池限制数据库连接数,并用go-zero/mr并行调用其他服务。理解各种模式的核心思想和适用场景,是构建高效并发系统的关键。