Go从入门到精通(27) - 并行任务处理器
系列文章目录
文章目录
- 系列文章目录
- 前言
- 一、示例代码
- 二、输出结果
- 总结
- 这个例子的核心价值(通用场景适配):
- 实际开发中的应用场景:
前言
并行任务处理器,用于处理批量任务(如批量数据处理、文件转换、API 调用等)。这种模式在实际开发中非常常见,比如批量处理订单、批量发送通知、批量图片压缩等场景。
一、示例代码
这个例子实现了一个可配置的并行任务池,支持控制并发数量,通过通道传递任务和结果,避免资源耗尽,同时支持错误处理。
package mainimport ("errors""fmt""math/rand""sync""time"
)// Task 表示一个待处理的任务
type Task struct {ID int // 任务IDData string // 任务数据Result chan<- Result // 用于返回结果的通道
}// Result 表示任务处理结果
type Result struct {TaskID intSuccess boolOutput stringErr error
}// worker 工作协程:从任务通道获取任务并处理
func worker(id int, taskChan <-chan Task, wg *sync.WaitGroup) {defer wg.Done()fmt.Printf("工作协程 #%d 启动\n", id)for task := range taskChan {fmt.Printf("工作协程 #%d 开始处理任务 #%d\n", id, task.ID)// 模拟任务处理(如数据计算、API调用、文件处理等)time.Sleep(time.Millisecond * time.Duration(rand.Intn(500)+300))// 模拟处理结果(随机成功/失败)var result Resultif rand.Float64() < 0.2 { // 20% 失败率result = Result{TaskID: task.ID,Success: false,Err: errors.New("处理失败:模拟错误"),}} else {result = Result{TaskID: task.ID,Success: true,Output: fmt.Sprintf("处理完成:%s(由工作协程 #%d 处理)", task.Data, id),}}// 将结果发送回结果通道task.Result <- resultfmt.Printf("工作协程 #%d 完成任务 #%d\n", id, task.ID)}fmt.Printf("工作协程 #%d 退出\n", id)
}// createTasks 生成一批任务
func createTasks(count int, resultChan chan<- Result) []Task {tasks := make([]Task, count)for i := 0; i < count; i++ {tasks[i] = Task{ID: i + 1,Data: fmt.Sprintf("任务数据 %d", i+1),Result: resultChan, // 所有任务共享同一个结果通道}}return tasks
}func main() {rand.Seed(time.Now().UnixNano())// 配置参数(实际开发中可通过配置文件或参数传入)const (taskCount = 10 // 总任务数workerCount = 3 // 并发工作协程数(控制并发量))// 创建通道taskChan := make(chan Task, taskCount) // 任务通道(带缓冲避免阻塞)resultChan := make(chan Result, taskCount) // 结果通道// 启动工作协程池var wg sync.WaitGroupwg.Add(workerCount)for i := 0; i < workerCount; i++ {go worker(i+1, taskChan, &wg)}// 生成任务并发送到任务通道tasks := createTasks(taskCount, resultChan)go func() {for _, task := range tasks {taskChan <- task}close(taskChan) // 所有任务发送完成后关闭通道,通知worker退出}()// 收集结果(单独的协程处理,不阻塞主逻辑)go func() {wg.Wait() // 等待所有worker完成close(resultChan) // 关闭结果通道,通知主程序结果收集完成}()// 处理结果successCount := 0failCount := 0for result := range resultChan {if result.Success {successCount++fmt.Printf("任务 #%d 成功: %s\n", result.TaskID, result.Output)} else {failCount++fmt.Printf("任务 #%d 失败: %v\n", result.TaskID, result.Err)}}// 输出汇总信息fmt.Printf("\n处理完成:总任务数=%d, 成功=%d, 失败=%d\n",taskCount, successCount, failCount)
}
二、输出结果
工作协程 #1 启动
工作协程 #2 启动
工作协程 #3 启动
工作协程 #3 开始处理任务 #3
工作协程 #1 开始处理任务 #1
工作协程 #2 开始处理任务 #2
工作协程 #1 完成任务 #1
工作协程 #1 开始处理任务 #4
任务 #1 成功: 处理完成:任务数据 1(由工作协程 #1 处理)
工作协程 #3 完成任务 #3
工作协程 #3 开始处理任务 #5
任务 #3 成功: 处理完成:任务数据 3(由工作协程 #3 处理)
工作协程 #2 完成任务 #2
工作协程 #2 开始处理任务 #6
任务 #2 成功: 处理完成:任务数据 2(由工作协程 #2 处理)
工作协程 #3 完成任务 #5
工作协程 #3 开始处理任务 #7
任务 #5 成功: 处理完成:任务数据 5(由工作协程 #3 处理)
工作协程 #2 完成任务 #6
工作协程 #2 开始处理任务 #8
任务 #6 成功: 处理完成:任务数据 6(由工作协程 #2 处理)
工作协程 #1 完成任务 #4
工作协程 #1 开始处理任务 #9
任务 #4 成功: 处理完成:任务数据 4(由工作协程 #1 处理)
工作协程 #3 完成任务 #7
工作协程 #3 开始处理任务 #10
任务 #7 成功: 处理完成:任务数据 7(由工作协程 #3 处理)
工作协程 #2 完成任务 #8
工作协程 #2 退出
任务 #8 失败: 处理失败:模拟错误
工作协程 #1 完成任务 #9
工作协程 #1 退出
任务 #9 成功: 处理完成:任务数据 9(由工作协程 #1 处理)
工作协程 #3 完成任务 #10
工作协程 #3 退出
任务 #10 成功: 处理完成:任务数据 10(由工作协程 #3 处理)
处理完成:总任务数=10, 成功=9, 失败=1
总结
这个例子的核心价值(通用场景适配):
- 并发控制:
通过 workerCount 控制并发数量,避免因任务过多导致资源耗尽(如过多的数据库连接、网络请求)。 - 任务解耦:
任务生产者(createTasks)只负责生成任务,不关心谁处理
工作协程(worker)只负责处理任务,不关心任务来源
结果处理器只负责收集结果,不关心处理过程 - 错误隔离:
单个任务处理失败不会影响其他任务,适合需要部分失败容忍的场景(如批量导入数据)。 - 可扩展性:
可根据系统负载动态调整 workerCount(如根据 CPU 核心数自动设置)
可通过修改 worker 函数的逻辑适配不同任务(如替换为文件处理、API 调用等)
可增加任务优先级队列(通过多个通道实现不同优先级)
实际开发中的应用场景:
批量处理用户数据(如数据清洗、格式转换)
并发调用第三方 API(如批量查询物流信息)
图片 / 视频批量处理(如压缩、格式转换)
日志批量分析(如统计错误率、提取关键词)
数据库批量操作(如批量插入、更新)
运行程序后,可以看到 3 个工作协程并行处理 10 个任务,任务结果会被统一收集并统计,整个过程通过通道实现了安全的通信,避免了共享内存带来的并发问题。
