当前位置: 首页 > news >正文

Go进阶高并发(多线程)处理教程

Go进阶高并发处理教程

目录

  1. Go并发编程基础
  2. Goroutine深入理解
  3. 同步原语详解
  4. 并发模式与最佳实践
  5. 性能优化技巧
  6. 实战案例

Go并发编程基础

什么是并发?

并发是指程序能够同时处理多个任务的能力。Go语言从设计之初就将并发作为核心特性,提供了简洁而强大的并发编程模型。

Go并发模型的优势

  • 轻量级协程:Goroutine比传统线程更轻量
  • CSP模型:通过通信来共享内存,而不是通过共享内存来通信
  • 内置调度器:Go运行时自动管理goroutine的调度

Goroutine深入理解

创建和启动Goroutine

package mainimport ("fmt""time"
)func worker(id int) {fmt.Printf("Worker %d starting\n", id)time.Sleep(time.Second)fmt.Printf("Worker %d done\n", id)
}func main() {// 启动多个goroutinefor i := 1; i <= 5; i++ {go worker(i)}// 等待所有goroutine完成time.Sleep(2 * time.Second)fmt.Println("All workers completed")
}

Goroutine的生命周期

  1. 创建:使用go关键字创建
  2. 调度:由Go调度器管理
  3. 执行:在可用的OS线程上执行
  4. 结束:函数返回时自动结束

调度器工作原理

Go使用M:N调度模型:

  • M:OS线程(Machine)
  • P:处理器(Processor)
  • G:Goroutine
G1  G2  G3  G4\   |   |  /\  |   | /\ |   |/\|   |P1  P2|   |M1  M2

同步原语详解

sync.WaitGroup

用于等待一组goroutine完成:

package mainimport ("fmt""sync""time"
)func worker(id int, wg *sync.WaitGroup) {defer wg.Done() // 完成时调用Done()fmt.Printf("Worker %d starting\n", id)time.Sleep(time.Second)fmt.Printf("Worker %d done\n", id)
}func main() {var wg sync.WaitGroupfor i := 1; i <= 5; i++ {wg.Add(1) // 增加等待计数go worker(i, &wg)}wg.Wait() // 等待所有goroutine完成fmt.Println("All workers completed")
}

sync.Mutex

互斥锁用于保护共享资源:

package mainimport ("fmt""sync"
)type Counter struct {mu    sync.Mutexvalue int
}func (c *Counter) Increment() {c.mu.Lock()defer c.mu.Unlock()c.value++
}func (c *Counter) Value() int {c.mu.Lock()defer c.mu.Unlock()return c.value
}func main() {counter := &Counter{}var wg sync.WaitGroup// 启动100个goroutine同时增加计数器for i := 0; i < 100; i++ {wg.Add(1)go func() {defer wg.Done()for j := 0; j < 1000; j++ {counter.Increment()}}()}wg.Wait()fmt.Printf("Final counter value: %d\n", counter.Value())
}

sync.RWMutex

读写锁允许多个读操作同时进行:

type SafeMap struct {mu sync.RWMutexdata map[string]int
}func (sm *SafeMap) Get(key string) (int, bool) {sm.mu.RLock()defer sm.mu.RUnlock()val, ok := sm.data[key]return val, ok
}func (sm *SafeMap) Set(key string, value int) {sm.mu.Lock()defer sm.mu.Unlock()sm.data[key] = value
}

sync.Once

确保某个操作只执行一次:

package mainimport ("fmt""sync"
)var once sync.Once
var instance *Singletontype Singleton struct {data string
}func GetInstance() *Singleton {once.Do(func() {fmt.Println("Creating singleton instance")instance = &Singleton{data: "singleton"}})return instance
}func main() {var wg sync.WaitGroupfor i := 0; i < 10; i++ {wg.Add(1)go func(id int) {defer wg.Done()s := GetInstance()fmt.Printf("Goroutine %d got instance: %s\n", id, s.data)}(i)}wg.Wait()
}

并发模式与最佳实践

Worker Pool模式

package mainimport ("fmt""sync""time"
)type Job struct {ID   intData string
}type Result struct {Job    JobOutput string
}func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {defer wg.Done()for job := range jobs {fmt.Printf("Worker %d processing job %d\n", id, job.ID)time.Sleep(time.Millisecond * 100) // 模拟工作result := Result{Job:    job,Output: fmt.Sprintf("Processed by worker %d", id),}results <- result}
}func main() {const numWorkers = 3const numJobs = 10jobs := make(chan Job, numJobs)results := make(chan Result, numJobs)var wg sync.WaitGroup// 启动workerfor i := 1; i <= numWorkers; i++ {wg.Add(1)go worker(i, jobs, results, &wg)}// 发送任务for i := 1; i <= numJobs; i++ {jobs <- Job{ID: i, Data: fmt.Sprintf("data-%d", i)}}close(jobs)// 等待所有worker完成go func() {wg.Wait()close(results)}()// 收集结果for result := range results {fmt.Printf("Job %d result: %s\n", result.Job.ID, result.Output)}
}

扇入扇出模式

// 扇出:将工作分发给多个goroutine
func fanOut(input <-chan int, workers int) []<-chan int {outputs := make([]<-chan int, workers)for i := 0; i < workers; i++ {output := make(chan int)outputs[i] = outputgo func(out chan<- int) {defer close(out)for n := range input {out <- n * n // 计算平方}}(output)}return outputs
}// 扇入:将多个channel的结果合并
func fanIn(inputs ...<-chan int) <-chan int {output := make(chan int)var wg sync.WaitGroupfor _, input := range inputs {wg.Add(1)go func(in <-chan int) {defer wg.Done()for n := range in {output <- n}}(input)}go func() {wg.Wait()close(output)}()return output
}

性能优化技巧

1. 合理设置GOMAXPROCS

import "runtime"func init() {// 设置使用的CPU核心数runtime.GOMAXPROCS(runtime.NumCPU())
}

2. 避免goroutine泄漏

// 错误示例:可能导致goroutine泄漏
func badExample() {ch := make(chan int)go func() {ch <- 1 // 如果没有接收者,这个goroutine会永远阻塞}()// 函数返回,但goroutine仍在运行
}// 正确示例:使用context控制goroutine生命周期
func goodExample(ctx context.Context) {ch := make(chan int, 1) // 使用缓冲channelgo func() {select {case ch <- 1:case <-ctx.Done():return}}()
}

3. 使用对象池减少GC压力

import "sync"var pool = sync.Pool{New: func() interface{} {return make([]byte, 1024)},
}func processData(data []byte) {buf := pool.Get().([]byte)defer pool.Put(buf)// 使用buf处理数据
}

实战案例

并发HTTP客户端

package mainimport ("fmt""net/http""sync""time"
)type Result struct {URL        stringStatusCode intDuration   time.DurationError      error
}func fetchURL(url string, results chan<- Result, wg *sync.WaitGroup) {defer wg.Done()start := time.Now()resp, err := http.Get(url)duration := time.Since(start)result := Result{URL:      url,Duration: duration,Error:    err,}if err == nil {result.StatusCode = resp.StatusCoderesp.Body.Close()}results <- result
}func main() {urls := []string{"https://www.google.com","https://www.github.com","https://www.stackoverflow.com","https://www.golang.org",}results := make(chan Result, len(urls))var wg sync.WaitGroup// 并发请求所有URLfor _, url := range urls {wg.Add(1)go fetchURL(url, results, &wg)}// 等待所有请求完成go func() {wg.Wait()close(results)}()// 处理结果for result := range results {if result.Error != nil {fmt.Printf("Error fetching %s: %v\n", result.URL, result.Error)} else {fmt.Printf("%s: %d (%v)\n", result.URL, result.StatusCode, result.Duration)}}
}

总结

Go语言的并发编程提供了强大而简洁的工具:

  1. Goroutine:轻量级协程,易于创建和管理
  2. Channel:类型安全的通信机制
  3. sync包:提供各种同步原语
  4. 并发模式:Worker Pool、扇入扇出等经典模式

掌握这些概念和技巧,能够帮助您构建高性能、可扩展的并发应用程序。记住Go的并发哲学:通过通信来共享内存,而不是通过共享内存来通信

参考资源

  • Go官方文档 - 并发
  • Go并发模式
  • Go内存模型
http://www.dtcms.com/a/295373.html

相关文章:

  • 中小企业安全落地:低成本漏洞管理与攻击防御方案
  • 新手操作steam搬砖项目,应该如何快速起步
  • 图机器学习(19)——金融数据分析
  • 深度分析Java类加载机制
  • 医疗AI轻量化部署方案的深度梳理与优化路径判研
  • k8s把某个secret挂在某命名空间下
  • MySQL深度理解-MySQL事务优化
  • 现代C++的一般编程规范
  • 【CMake】CMake 常用语法总结
  • SSP通过SDK对接流量的原理与实现
  • SSM之表现层数据封装-统一响应格式全局异常处理
  • 主要分布在背侧海马体(dHPC)CA1区域(dCA1)的位置细胞对NLP中的深层语义分析的积极影响和启示
  • 大模型处理私有数据的核心技术
  • 《R 矩阵》
  • 基础NLP | 02 深度学习基本原理
  • Unity 多人游戏框架学习系列九
  • RocketMQ搭建及测试(Windows环境)
  • 基于深度学习的图像分类:使用MobileNet实现高效分类
  • 路径总和Ⅲ(树)C++
  • 网络编程基石:TCP 原理全解析
  • AbMole小课堂 | Nivolumab(BMS-936558):PD-1人源化单抗的作用机制与抗肿瘤应用
  • 给定一个长度为n的数组,和一个长度为w的滑动窗口,w < n, 窗口沿着数组每次滑动一个位置,求出每次滑动后,滑动窗口内的最大值。 C++实现高效代码
  • 数据库底层索引讲解-排序和数据结构
  • Ethereum: 从零到一为DApp开发搭建专属的私有测试网络
  • Compose 适配 - 键鼠模式
  • Ethereum: 从 1e+21 到千枚以太币:解密 Geth 控制台的余额查询
  • Day30| 452. 用最少数量的箭引爆气球、435. 无重叠区间、763.划分字母区间
  • 风险分级响应管理分析系统
  • 基于 PIC16 系列的多功能电子烟(温控 + 电压控制 + 多模式)方案
  • 亚马逊云科技 EC2 部署 Dify,集成 Amazon Bedrock 构建生成式 AI 应用