当前位置: 首页 > news >正文

Go从入门到精通(27) - 并行任务处理器

系列文章目录


文章目录

  • 系列文章目录
  • 前言
  • 一、示例代码
  • 二、输出结果
  • 总结
    • 这个例子的核心价值(通用场景适配):
    • 实际开发中的应用场景:


前言

并行任务处理器,用于处理批量任务(如批量数据处理、文件转换、API 调用等)。这种模式在实际开发中非常常见,比如批量处理订单、批量发送通知、批量图片压缩等场景。


一、示例代码

这个例子实现了一个可配置的并行任务池,支持控制并发数量,通过通道传递任务和结果,避免资源耗尽,同时支持错误处理。

package mainimport ("errors""fmt""math/rand""sync""time"
)// Task 表示一个待处理的任务
type Task struct {ID     int    // 任务IDData   string // 任务数据Result chan<- Result // 用于返回结果的通道
}// Result 表示任务处理结果
type Result struct {TaskID  intSuccess boolOutput  stringErr     error
}// worker 工作协程:从任务通道获取任务并处理
func worker(id int, taskChan <-chan Task, wg *sync.WaitGroup) {defer wg.Done()fmt.Printf("工作协程 #%d 启动\n", id)for task := range taskChan {fmt.Printf("工作协程 #%d 开始处理任务 #%d\n", id, task.ID)// 模拟任务处理(如数据计算、API调用、文件处理等)time.Sleep(time.Millisecond * time.Duration(rand.Intn(500)+300))// 模拟处理结果(随机成功/失败)var result Resultif rand.Float64() < 0.2 { // 20% 失败率result = Result{TaskID:  task.ID,Success: false,Err:     errors.New("处理失败:模拟错误"),}} else {result = Result{TaskID:  task.ID,Success: true,Output:  fmt.Sprintf("处理完成:%s(由工作协程 #%d 处理)", task.Data, id),}}// 将结果发送回结果通道task.Result <- resultfmt.Printf("工作协程 #%d 完成任务 #%d\n", id, task.ID)}fmt.Printf("工作协程 #%d 退出\n", id)
}// createTasks 生成一批任务
func createTasks(count int, resultChan chan<- Result) []Task {tasks := make([]Task, count)for i := 0; i < count; i++ {tasks[i] = Task{ID:     i + 1,Data:   fmt.Sprintf("任务数据 %d", i+1),Result: resultChan, // 所有任务共享同一个结果通道}}return tasks
}func main() {rand.Seed(time.Now().UnixNano())// 配置参数(实际开发中可通过配置文件或参数传入)const (taskCount  = 10    // 总任务数workerCount = 3    // 并发工作协程数(控制并发量))// 创建通道taskChan := make(chan Task, taskCount)   // 任务通道(带缓冲避免阻塞)resultChan := make(chan Result, taskCount) // 结果通道// 启动工作协程池var wg sync.WaitGroupwg.Add(workerCount)for i := 0; i < workerCount; i++ {go worker(i+1, taskChan, &wg)}// 生成任务并发送到任务通道tasks := createTasks(taskCount, resultChan)go func() {for _, task := range tasks {taskChan <- task}close(taskChan) // 所有任务发送完成后关闭通道,通知worker退出}()// 收集结果(单独的协程处理,不阻塞主逻辑)go func() {wg.Wait() // 等待所有worker完成close(resultChan) // 关闭结果通道,通知主程序结果收集完成}()// 处理结果successCount := 0failCount := 0for result := range resultChan {if result.Success {successCount++fmt.Printf("任务 #%d 成功: %s\n", result.TaskID, result.Output)} else {failCount++fmt.Printf("任务 #%d 失败: %v\n", result.TaskID, result.Err)}}// 输出汇总信息fmt.Printf("\n处理完成:总任务数=%d, 成功=%d, 失败=%d\n",taskCount, successCount, failCount)
}

二、输出结果

工作协程 #1 启动
工作协程 #2 启动
工作协程 #3 启动
工作协程 #3 开始处理任务 #3
工作协程 #1 开始处理任务 #1
工作协程 #2 开始处理任务 #2
工作协程 #1 完成任务 #1
工作协程 #1 开始处理任务 #4
任务 #1 成功: 处理完成:任务数据 1(由工作协程 #1 处理)
工作协程 #3 完成任务 #3
工作协程 #3 开始处理任务 #5
任务 #3 成功: 处理完成:任务数据 3(由工作协程 #3 处理)
工作协程 #2 完成任务 #2
工作协程 #2 开始处理任务 #6
任务 #2 成功: 处理完成:任务数据 2(由工作协程 #2 处理)
工作协程 #3 完成任务 #5
工作协程 #3 开始处理任务 #7
任务 #5 成功: 处理完成:任务数据 5(由工作协程 #3 处理)
工作协程 #2 完成任务 #6
工作协程 #2 开始处理任务 #8
任务 #6 成功: 处理完成:任务数据 6(由工作协程 #2 处理)
工作协程 #1 完成任务 #4
工作协程 #1 开始处理任务 #9
任务 #4 成功: 处理完成:任务数据 4(由工作协程 #1 处理)
工作协程 #3 完成任务 #7
工作协程 #3 开始处理任务 #10
任务 #7 成功: 处理完成:任务数据 7(由工作协程 #3 处理)
工作协程 #2 完成任务 #8
工作协程 #2 退出
任务 #8 失败: 处理失败:模拟错误
工作协程 #1 完成任务 #9
工作协程 #1 退出
任务 #9 成功: 处理完成:任务数据 9(由工作协程 #1 处理)
工作协程 #3 完成任务 #10
工作协程 #3 退出
任务 #10 成功: 处理完成:任务数据 10(由工作协程 #3 处理)
处理完成:总任务数=10, 成功=9, 失败=1

总结

这个例子的核心价值(通用场景适配):

  1. 并发控制
    通过 workerCount 控制并发数量,避免因任务过多导致资源耗尽(如过多的数据库连接、网络请求)。
  2. 任务解耦:
    任务生产者(createTasks)只负责生成任务,不关心谁处理
    工作协程(worker)只负责处理任务,不关心任务来源
    结果处理器只负责收集结果,不关心处理过程
  3. 错误隔离:
    单个任务处理失败不会影响其他任务,适合需要部分失败容忍的场景(如批量导入数据)。
  4. 可扩展性:
    可根据系统负载动态调整 workerCount(如根据 CPU 核心数自动设置)
    可通过修改 worker 函数的逻辑适配不同任务(如替换为文件处理、API 调用等)
    可增加任务优先级队列(通过多个通道实现不同优先级)

实际开发中的应用场景:

批量处理用户数据(如数据清洗、格式转换)
并发调用第三方 API(如批量查询物流信息)
图片 / 视频批量处理(如压缩、格式转换)
日志批量分析(如统计错误率、提取关键词)
数据库批量操作(如批量插入、更新)

运行程序后,可以看到 3 个工作协程并行处理 10 个任务,任务结果会被统一收集并统计,整个过程通过通道实现了安全的通信,避免了共享内存带来的并发问题。

http://www.dtcms.com/a/568669.html

相关文章:

  • Claude Code 使用 MiniMax M2 模型
  • Auto CAD二次开发——复制和旋转图形对象
  • 全屏响应式网站模板网站seo综合公司
  • php做简单网站教程视频教程企业门户网站模板 下载
  • Rust开发实战之WebSocket通信实现(tokio-tungstenite)
  • 编译缓存利器 ccahce、sccahce
  • Rust开发实战之使用 Reqwest 实现 HTTP 客户端请求
  • 各大公司开源网站广州出台21条措施扶持餐饮住宿
  • gmt_create为啥叫gmt
  • 从 NGINX 到 Kubernetes Ingress:现代微服务流量管理实战
  • 【C++】继承(2):继承与友元,静态成员,多继承黑/白盒复用
  • css实战:常用伪元素选择器介绍
  • 4.4 路由算法与路由协议【2013统考真题】
  • 营销型网站建设需要备案吗上饶网站建设企业
  • 福建网站建设科技有限公司品牌建设还需持续力
  • 工业CMOS相机的原理及基础知识
  • 无人机电气隔离与抗干扰技术概述
  • Elasticsearch的学习
  • GitHub 热榜项目 - 日榜(2025-11-04)
  • SAP 概述
  • 深圳家具网站建设做网站需要会写代码6
  • 常见的网站文件后缀名
  • 18、docker-macvlan-2-示例
  • ICCV2025 | GLEAM:通过全局-局部变换增强的面向视觉-语言预训练模型的可迁移对抗性攻击
  • Visual Studio 编程工程设置
  • 自我系统更新
  • 【数据结构】双向链表的实现
  • 《Linux系统编程之开发工具》【版本控制器 + 调试器】
  • C++ :C宏函数的升级:内联函数inline
  • 青海网站建设费用织梦后台怎么建设网站