Golang 并发快速上手
文章目录
- 1. 为什么要用协程?
- 1.1 进程与线程
- 1.2 协程
- 1.3 线程和协程的区别
- 线程
- 协程
- 1.4 Go 协程(goroutines)和协程(coroutines)
- 2.Go 协程基本内容
- 2.1 channel
- 2.2 select
- 2.3 future 模式
- 3. 实践示例
- 3.1 并发处理多个网络请求
- 3.2 工作池(Worker Pool)
- 3.3 协程使用最佳实践
- 4. 并发安全
- 4.1 happens-before 原则
- 4.2 并发安全类型
- 5. GMP 模型
- 5.1 GMP
- 5.2 协程数量
- 6. 协程池 ants
- 6.1 特性
- 6.2 运行流程
- 参考文献
不要通过共享内存来通信,而通过通信来共享内存。
1. 为什么要用协程?
线程先出现还是协程先出现?
1.1 进程与线程
进程是对运行时程序的封装,是系统进行资源调度和分配的的基本单位,实现了操作系统的并发。
线程是进程的子任务,作为CPU调度和分配的基本单元,它们既能确保程序实时响应,又实现了进程内部的并发执行。同时,线程也是操作系统能够识别的最小执行和调度单位。
用户态线程和内核态线程是操作系统层面的概念,而CPU线程数属于硬件层面的逻辑概念。CPU线程数指的是单个物理核心通过超线程技术虚拟出的逻辑核心数量,每个物理核心最多支持两个线程,因此系统显示的线程总数不会超过物理核心数的两倍。
1.2 协程
协程是早于线程出现的。协程哪怕没有操作系统干预也可以实现,毕竟任何编程语言自身就能够实现这个结构。早期的多任务大多来自于此。
协程是非抢占式多任务,线程是抢占式多任务。
协程需要编写代码者主动让出控制权,而线程可以无需规划让出控制权的时间点。
协程实现在用户态,线程实现在内核态。
以函数为例:
void func() {print("a")print("b")print("c")
}def A():co = func() # 得到该协程print("in function A") # do something
void func() {print("a")yield #java中的yield是请求释放cpu资源print("b")yieldprint("c")
}def A():co = func() # 得到该协程next(co) # 调用协程print("in function A") # do somethingnext(co) # 再次调用该协程
线程也可以被暂停,操作系统保存线程运行状态然后去调度其它线程,此后该线程再次被分配CPU时还可以继续运行,就像没有被暂停过一样。只不过线程的调度是操作系统实现的,这些对程序员都不可见,而协程是在用户态实现的,对程序员可见。这就是为什么有的人说可以把协程理解为用户态线程的原因。
1.3 线程和协程的区别
线程
- 共享变量(解决了通讯麻烦的问题,但是对于变量的访问需要加锁)
- 调度由操作系统完成
- 一个进程可以有多个线程,每个线程会共享父进程的资源(创建线程开销占用比进程小很多,可创建的数量也会很多)
- 通讯除了可使用进程间通讯的方式,还可以通过共享内存的方式进行通信(通过共享内存通信比通过内核要快很多)
- 线程的使用会给系统带来上下文切换的额外负担。
协程
- 调度完全由用户控制
- 一个线程(进程)可以有多个协程
- 每个线程(进程)循环按照指定的任务清单顺序完成不同的任务(当任务被堵塞时,执行下一个任务;当恢复时,再回来执行这个任务;任务间切换只需要保存任务的上下文,没有内核的开销,可以不加锁的访问全局变量)
- 协程需要保证是非堵塞的且没有相互依赖
- 协程基本上不能同步通讯,多采用异步的消息通讯,效率比较高
1.4 Go 协程(goroutines)和协程(coroutines)
应用程序在运行时会创建进程,这些进程独立运行但共享内存空间。通过多线程技术,程序能够同时处理多个请求,从而显著提升性能。不过,多线程访问共享内存时容易出现竞态条件问题。Go语言通过轻量级的协程(goroutines)和通信机制(channels)来实现并发,避免了直接的内存共享,有效降低了系统复杂度。
协程在多线程间灵活调度,实现高效的并发处理,同时管理栈内存,适合处理大量任务。Go 的并发模型支持确定性并发,通过go关键字启动协程,其栈动态伸缩,无需开发者干预。
在其他语言中,比如 C#,Lua 或者 Python 都有协程的概念。这个名字表明它和 Go协程有些相似,不过有两点不同:
- Go 协程意味着并行(或者可以以并行的方式部署),协程一般来说不是这样的
- Go 协程通过通道来通信;协程通过让出和恢复操作来通信。
性能对比(协程 vs 线程):
特性 | Goroutine | OS Thread |
---|---|---|
创建开销 | ~2KB 内存,微秒级 | ~1MB 内存,毫秒级 |
切换成本 | 用户态调度,纳秒级 | 内核态切换,微秒级 |
最大数量 | 轻松支持 10 万+ | 通常限制在数千 |
协程由 Go 运行时调度(GMP 模型),在用户态实现高效并发,适合 I/O 密集型任务。计算密集型任务需结合 runtime.GOMAXPROCS
控制线程数。
2.Go 协程基本内容
2.1 channel
- 在任何给定时间,一个数据被设计为只有一个协程可以对其访问,所以不会发生数据竞争。 数据的所有权(可以读写数据的能力)也因此被传递。
- 通道的发送和接收都是原子操作。
- 发送操作(协程或者函数中的),在接收者准备好之前是阻塞的;如果通道中没有数据,接收者就阻塞。使用带缓冲通道可以实现异步非阻塞。
var send_only chan<- int // 只接收通道,无法关闭
var recv_only <-chan int // 只发送通道func main() {ch := make(chan string)buf := 100channelWithBuffer := make(chan string, buf) #带缓冲通道go sendMsg(ch)go receiveMsg(ch)time.Sleep(1e9)
}func sendMsg(ch chan string) {ch <- "hello"ch <- "world"
}func receiveMsg(ch chan string) {var msg stringfor {msg = <-chfmt.Println(msg)}
}
- 如果在程序结束之前,向通道写值的协程未完成工作,则这个协程不会被垃圾回收。
一个通道被其发送数据协程队列和接收数据协程队列中的所有协程引用着。因此,如果一个通道的这两个队列只要有一个不为空(可达),则此通道肯定不会被垃圾回收。另一方面,如果一个协程处于一个通道的某个协程队列之中,则此协程也肯定不会被垃圾回收,即使此通道仅被此协程所引用。一个协程只有在退出后才能被垃圾回收。
- 通道在创建后(通常使用make函数)会持有一定量的内存。只有在以下两种情况下,该内存才会被释放:
- 通道关闭并且没有其他引用(包括发送和接收操作)。
- 通道变得不可达。
如何优雅的关闭通道可参考这里。
2.2 select
select 是 Go 语言中处理多通道操作的核心控制结构,专为并发编程设计。它允许 goroutine 同时等待多个通道操作,类似于 switch 语句,但专门用于通道(channel)。
select {case u := <-ch1:...case v := <-ch2:......default: // no value ready to be received...
}
select 做的就是:选择处理列出的多个通信情况中的一个。
- 如果都阻塞了,会等待直到其中一个可以处理。
- 如果多个可以处理,随机选择一个。
- 如果没有通道操作可以处理并且写了 default 语句,它就会执行:default 永远是可运行的。
- 如果没有 default,select 就会一直阻塞。
2.3 future 模式
Future 模式是一种并发设计模式,它允许你启动一个异步任务并立即返回一个"占位符"(Future对象),你可以在稍后需要结果时从这个对象中获取计算结果。
Go 语言中没有内置的 Future 类型,但可以通过 goroutine 和 channel 轻松实现。
func InverseProduct(a Matrix, b Matrix) {a_inv_future := InverseFuture(a) // start as a goroutineb_inv_future := InverseFuture(b) // start as a goroutinea_inv := <-a_inv_futureb_inv := <-b_inv_futurereturn Product(a_inv, b_inv)
}func InverseFuture(a Matrix) chan Matrix {future := make(chan Matrix)go func() {future <- Inverse(a)}()return future
} public static Future<Matrix> inverseFuture(Matrix matrix) { ExecutorService executor = Executors.newSingleThreadExecutor(); Callable<Matrix> task = () -> Matrix.Inverse(matrix); Future<Matrix> future = executor.submit(task); // 注意:通常不关闭executor,因为这里只是单个任务 // 但如果你有一个共享的executor,你可能需要管理它的生命周期 return future; } public static Matrix inverseProduct(Matrix a, Matrix b) throws InterruptedException, ExecutionException { Future<Matrix> aInvFuture = inverseFuture(a); Future<Matrix> bInvFuture = inverseFuture(b); Matrix aInv = aInvFuture.get(); // 等待获取a的逆 Matrix bInv = bInvFuture.get(); // 等待获取b的逆 return Matrix.Product(aInv, bInv); }
3. 实践示例
3.1 并发处理多个网络请求
需求:同时请求多个 API 接口,汇总结果后继续处理。
package mainimport ("fmt""io/ioutil""net/http""sync""time"
)func main() {urls := []string{"https://jsonplaceholder.typicode.com/posts/1","https://jsonplaceholder.typicode.com/posts/2","https://jsonplaceholder.typicode.com/posts/3",}// 使用 WaitGroup 等待所有协程完成var wg sync.WaitGroupresults := make(chan string, len(urls)) // 缓冲通道存储结果start := time.Now()// 为每个 URL 启动一个协程for _, url := range urls {wg.Add(1) // 计数器 +1go func(u string) {defer wg.Done() // 协程结束时计数器 -1resp, err := http.Get(u)if err != nil {results <- fmt.Sprintf("Error fetching %s: %v", u, err)return}defer resp.Body.Close()body, _ := ioutil.ReadAll(resp.Body)results <- fmt.Sprintf("Response from %s: %d bytes", u, len(body))}(url) // 注意:显式传递 url 避免闭包陷阱}// 等待所有协程完成go func() {wg.Wait() // 阻塞直到计数器归零close(results) // 关闭通道,通知主协程}()// 从通道读取结果for res := range results {fmt.Println(res)}fmt.Printf("Total time: %v\n", time.Since(start))
}
关键知识点解析:
-
协程创建
go func() { ... }
启动协程,轻量级(初始仅 2KB 栈)。 -
同步控制
sync.WaitGroup
等待协程组完成(Add(), Done(), Wait())。 -
通道 chan
协程间通信(此处用缓冲通道避免阻塞)。 -
闭包陷阱
循环中启动协程时,通过参数传递当前值(url),避免共享变量问题。 -
资源释放
defer resp.Body.Close()
确保 HTTP 响应体关闭。
3.2 工作池(Worker Pool)
需求:限制并发数,避免资源耗尽。
func worker(id int, jobs <-chan int, results chan<- int) {for j := range jobs {fmt.Printf("Worker %d processing job %d\n", id, j)time.Sleep(time.Second) // 模拟耗时任务results <- j * 2}
}func main() {const numJobs = 10const numWorkers = 3jobs := make(chan int, numJobs)results := make(chan int, numJobs)// 启动固定数量的工作协程for w := 1; w <= numWorkers; w++ {go worker(w, jobs, results)}// 发送任务for j := 1; j <= numJobs; j++ {jobs <- j}close(jobs) // 关闭通道,通知 worker 退出// 收集结果for r := 1; r <= numJobs; r++ {<-results}
}
3.3 协程使用最佳实践
- 控制并发量
用带缓冲的通道或信号量(sem := make(chan struct{}, maxConcurrency))限制协程数量。
- 避免泄漏
确保协程能正常退出(如通过 context 取消)。
使用 defer 释放资源(文件句柄、网络连接)。
- 错误处理
在协程内部捕获 panic:
go func() {defer func() {if r := recover(); r != nil {log.Println("Recovered in goroutine:", r)}}()// 业务代码...
}()
- 优先用通道通信
遵循 Go 哲学:“不要通过共享内存来通信,而要通过通信来共享内存”。
4. 并发安全
4.1 happens-before 原则
与 goroutine 有关的 happens-before 保证场景有:
- goroutine的创建happens before其执行
- goroutine的完成不保证happens-before任何代码
4.2 并发安全类型
安全 | 不安全 |
---|---|
字节、布尔、整型、浮点型、字符型、atomic.Value(乐观锁)、指针、函数 | string、struct、复数型、数组、切片、映射、通道、接口 |
5. GMP 模型
5.1 GMP
Goroutine:是对 Go 中代码片段的封装,其实是一种轻量级的用户线程。
Machine:一个 machine 对应一个内核线程,相当于内核线程在 Go 进程中的映射。
Processor:一个 prcessor 表示执行 Go 代码片段的所必需的上下文环境,可以理解为用户代码逻辑的处理器。
每一个 M 都会以一个内核线程绑定,M 和 P 之间也是一对一的关系,而 P 和 G 的关系则是一对多。在运行过程中,M 和 内核线程之间对应关系的不会变化,在 M 的生命周期内,它只会与一个内核线程绑定,而 M 和 P 以及 P 和 G 之间的关系都是动态可变的。
M 和 P 必须组合使用才能为 G 提供有效的运行环境。多个可执行的 G 会按顺序排成队列,挂载在某个P上等待调度执行。
M 的创建一般是因为没有足够的 M 来和 P 组合以为 G 提供运行环境,在很多时候 M 的数量可能会比 P 要多。在单个 Go 进程中,P 的最大数量决定了程序的并发规模,且 P 的最大数量是由程序决定的。可以通过修改环境变量 GOMAXPROCS 和 调用函数 runtime#GOMAXPROCS
来设定 P 的最大值。
M 和 P 会适时的组合和断开,保证 P 中的待执行 G 队列能够得到及时运行。比如说上图中的 G0 此时因为网络 I/O 而阻塞了 M,那么 P 就会携带剩余的 G 投入到其他 M 的怀抱中。这个新的 M1 可能是新创建的,也可能是从调度器空闲 M 列表中获取的,取决于此时的调度器空闲 M 列表中是否存在 M,从而避免 M 的过多创建。
当 M 对应的内核线程被唤醒时,M 将会尝试为 G0 捕获一个 P 上下文,可能是从调度器的空闲 P 列表中获取,如果获取不成功,M 会被 G0 放入到调度器的可执行 G 队列中,等待其他 P 的查找。为了保证 G 的均衡执行,非空闲的 P 会运行完自身的可执行 G 队列中,会周期性从调度器的可执行 G 队列中获取代执行的 G,甚至从其他的 P 的可执行 G 队列中掠夺 G。
5.2 协程数量
CPU 密集型: 如果是CPU密集型应用,并且持续的时间很长,这时CPU就会优先达到瓶颈。因此,应当限制goroutine的数量,以避免过多的上下文切换。
IO密集型: 如果是IO密集型应用,则可以开启大量的goroutine,理论上内存会首先成为瓶颈(比如程序执行空操作)。因为 IO 操作相对较慢,goroutine在等待IO时会被阻塞,减少了CPU的使用。
6. 协程池 ants
ants 是一个高性能的 Golang 协程池库,通过复用协程(goroutine)显著减少频繁创建销毁的开销,特别适合高并发场景。
6.1 特性
- 自动管理和回收大量 goroutine。
- 定期清除过期的 goroutine。
- 丰富的API:提交任务,获取运行 goroutine 的数量,动态调整池的容量,释放池,重新启动池
- 优雅地处理死机以防止程序崩溃。
- 高效的内存使用,甚至比 Golang 中的无限 goroutine 实现了更高的性能。
- 非阻塞机制。
6.2 运行流程
- Pool :Ants 协程池核心结构。
- WorkerArray:Pool池中的worker队列,存放所有的 Worker。
- goWorker:运行任务的实际执行者,它启动一个 goroutine 来接受任务并执行函数调用。
- sync.Pool:golang 标准库下并发安全的对象池,缓存申请用于之后的重用,以减轻 GC 的压力。
- spinLock:基于CAS机制和指数退避算法实现的一种自旋锁。
参考文献
panjf2000/ants
如何优雅地关闭 channel?