Ants, a Goroutine Pool for Go
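This section looks at how ants, the well-known goroutine pool library for Go, is implemented. The implementation uses several tools from the sync package, so you should have some familiarity with them. My earlier articles on each are listed here:
- sync.Mutex: "Lock"
- sync.Pool: "sync.Pool"
- sync.Cond: "Cond and Once"
- sync/atomic: "Atomic operations"
If you are not yet familiar with these, read up on them first; knowing their commonly used methods is enough. Without further ado, let's begin.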
1. Introducing Ants
1.1 A spin lock implementing sync.Locker
type Locker interface {
	Lock()
	Unlock()
}
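This is the lock interface provided by the sync package: any type with Lock and Unlock methods can serve as a lock. Ants implements its own. Instead of sync.Mutex, which the author considers too heavyweight here, it uses a custom, lightweight spin lock: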
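package sync

import (
	"runtime"
	"sync"
	"sync/atomic"
)

// spinLock holds the lock state: 0 = unlocked, 1 = locked.
type spinLock uint32

const maxBackoff = 16

func (sl *spinLock) Lock() {
	backoff := 1
	for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {
		// Leverage the exponential backoff algorithm, see https://en.wikipedia.org/wiki/Exponential_backoff.
		for i := 0; i < backoff; i++ {
			runtime.Gosched()
		}
		if backoff < maxBackoff {
			// Shift left by one bit, i.e. backoff *= 2.
			backoff <<= 1
		}
	}
}

func (sl *spinLock) Unlock() {
	atomic.StoreUint32((*uint32)(sl), 0)
}

// NewSpinLock instantiates a spin lock (this is what uses the "sync" import).
func NewSpinLock() sync.Locker {
	return new(spinLock)
}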
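How the lock works:
(1) An integer state value represents the lock: 0 = unlocked, 1 = locked;
(2) Acquiring the lock changes 0 to 1, and releasing it changes 1 back to 0; both writes go through the atomic package, so they are concurrency-safe;
(3) Lock spins with a for loop plus CAS, so a waiting goroutine is never parked by the scheduler;
(4) The backoff variable reflects how contended the lock is: after each failed attempt the goroutine yields its time slice (runtime.Gosched) backoff times, and backoff doubles with each failure, capped at 16.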
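1.2 Why use a goroutine pool?
Anyone who knows the object pool sync.Pool will recognize the idea: reuse for better performance. A goroutine pool also makes goroutine lifecycles easier to manage and prevents goroutines from being created and destroyed without limit.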
1.3 Ants' underlying data structures
type poolCommon struct {
	// capacity of the pool, a negative value means that the capacity of pool is limitless, an infinite pool is used to
	// avoid potential issue of endless blocking caused by nested usage of a pool: submitting a task to pool
	// which submits a new task to the same pool.
	capacity int32

	// running is the number of the currently running goroutines.
	running int32

	// lock for protecting the worker queue.
	lock sync.Locker

	// workers is a slice that store the available workers.
	workers workerQueue

	// state is used to notice the pool to closed itself.
	state int32

	// cond for waiting to get an idle worker.
	cond *sync.Cond

	// done is used to indicate that all workers are done.
	allDone chan struct{}

	// once is used to make sure the pool is closed just once.
	once *sync.Once

	// workerCache speeds up the obtainment of a usable worker in function:retrieveWorker.
	workerCache sync.Pool

	// waiting is the number of goroutines already been blocked on pool.Submit(), protected by pool.lock
	waiting int32

	purgeDone int32
	purgeCtx  context.Context
	stopPurge context.CancelFunc

	ticktockDone int32
	ticktockCtx  context.Context
	stopTicktock context.CancelFunc

	now atomic.Value

	options *Options
}
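The key distinction here is between workers (a workerQueue) and workerCache. The former is the list of reusable goWorker objects: their goroutines are alive and ready to take work. The latter is a sync.Pool that caches goWorker objects whose goroutines have already exited (for example, workers purged after staying idle too long), so those structs can be reused when a new worker goroutine is spawned.
(Note: the diagram is based on an older version; workerArray has since been renamed workerQueue.)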
goWorker
First, the goWorker struct:
type goWorker struct {
	worker

	// pool who owns this worker.
	pool *Pool

	// task is a job should be done.
	task chan func()

	// lastUsed will be updated when putting a worker back into queue.
	lastUsed time.Time
}
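A goWorker can be thought of as a long-running goroutine that is not reclaimed after each task; it repeatedly processes asynchronous tasks submitted by users. Its core fields are:
(1) pool: the goroutine pool the goWorker belongs to
(2) task: the channel through which the goWorker receives task functions
(3) lastUsed: the time the goWorker was last put back into the pool
The embedded worker is an interface (think of it as "a worker"); goWorker embeds it and provides its own implementations of its methods, such as run, finish and inputFunc, which we look at later.
So how does a goWorker receive tasks from outside?
Through the task chan func() field: callers send task functions into this channel, and the worker goroutine loops over it (for fn := range w.task) and runs each function it receives.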
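The mechanism fits in a few lines. In this sketch, miniWorker and sumWithWorker are hypothetical names: a single goroutine ranges over its task channel and runs each func() in order, and, as in goWorker.run, a nil task is the signal to exit.

```go
package main

import (
	"fmt"
	"sync"
)

// miniWorker is a hypothetical, stripped-down goWorker: one long-lived
// goroutine that runs every func() it receives on task.
type miniWorker struct {
	task chan func()
}

// start launches the worker goroutine; a nil task tells it to exit.
func (w *miniWorker) start(wg *sync.WaitGroup) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		for fn := range w.task {
			if fn == nil {
				return
			}
			fn()
		}
	}()
}

// sumWithWorker sends n tasks to a single worker (task i adds i to sum),
// then stops the worker and returns the sum.
func sumWithWorker(n int) int {
	w := &miniWorker{task: make(chan func(), 1)}
	var wg sync.WaitGroup
	w.start(&wg)
	sum := 0
	for i := 1; i <= n; i++ {
		i := i // per-iteration copy (needed before Go 1.22)
		w.task <- func() { sum += i }
	}
	w.task <- nil // ask the worker goroutine to exit
	wg.Wait()
	return sum
}

func main() {
	fmt.Println(sumWithWorker(10)) // prints 55
}
```

Only the worker goroutine touches sum, and the caller reads it after wg.Wait, so no extra locking is needed; the same goroutine serves every task, which is exactly the reuse a pool is after.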
Pool
Next, the Pool struct in Ants:
type Pool struct {
	*poolCommon
}
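Pool is simply a wrapper around poolCommon, whose full definition appears in section 1.3. A quick rundown of its fields:
(1) capacity: the pool's capacity; -1 (set when size <= 0) means unlimited
(2) running: the number of goroutines currently running
(3) lock: the custom spin lock, which keeps retrieving a goWorker concurrency-safe
(4) workers: the list of goWorkers, i.e. the goroutine pool in the true sense
(5) state: the pool status flag, 0 = open, 1 = closed
(6) cond: a concurrency coordinator; in blocking mode it suspends goroutines waiting for a worker and wakes them up
(7) workerCache: an object pool that caches released goWorker objects for reuse. Do not confuse it with the goroutine pool: a goWorker in the goroutine pool is still alive, whereas a goWorker that has entered the object pool has, strictly speaking, already been destroyed;
(8) waiting: the number of goroutines currently blocked waiting in Submit;
(9) purgeDone: marks whether the purge goroutine (which cleans up idle workers) has exited;
(10) stopPurge: the cancel function that stops the purge goroutine; ticktockDone and stopTicktock do the same for the ticktock goroutine, which keeps the cached time now up to date;
(11) options: custom configuration.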
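As a small illustration of the state field, the sketch below flips a 0/1 flag with compare-and-swap so that a pool can be closed exactly once, however many goroutines try. miniPoolState, opened and closed are hypothetical names that only follow the 0 = open, 1 = closed convention above; this is not ants' own close code.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Hypothetical constants following the 0 = open, 1 = closed convention.
const (
	opened int32 = 0
	closed int32 = 1
)

// miniPoolState is a hypothetical stand-in for poolCommon's state field.
type miniPoolState struct {
	state int32
}

// release flips opened -> closed with a CAS; only the first caller succeeds,
// and every later (or concurrent) caller sees the swap fail.
func (p *miniPoolState) release() bool {
	return atomic.CompareAndSwapInt32(&p.state, opened, closed)
}

// isClosed reads the flag atomically, so it is safe from any goroutine.
func (p *miniPoolState) isClosed() bool {
	return atomic.LoadInt32(&p.state) == closed
}

func main() {
	p := &miniPoolState{}
	fmt.Println(p.isClosed(), p.release(), p.release(), p.isClosed()) // prints "false true false true"
}
```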
Options
Now let's see which options are available:
type Options struct {
	ExpiryDuration   time.Duration
	PreAlloc         bool
	MaxBlockingTasks int
	Nonblocking      bool
	PanicHandler     func(any)
	Logger           Logger
	DisablePurge     bool
}
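- ExpiryDuration: the scan interval of the purge goroutine. Every ExpiryDuration, it scans all workers and removes those that have been idle for longer than ExpiryDuration.
- PreAlloc: whether to pre-allocate memory when the pool is initialized. If true, the pool allocates its worker queue up front, reducing dynamic allocation at run time.
- MaxBlockingTasks: the maximum number of goroutines allowed to block in pool.Submit; 0 means no limit. Once the limit is reached, further submissions return ErrPoolOverload instead of blocking.
- Nonblocking: if true, pool.Submit never blocks. If a task cannot be submitted right away (no idle worker and the pool is full), Submit returns ErrPoolOverload immediately.
- PanicHandler: handles panics raised in worker goroutines. If it is not set, the worker still recovers the panic but only logs it, with a stack trace, through Logger; in both cases the panicking worker's goroutine exits.
- Logger: a custom logger for the pool's messages; for example, run uses it to report a recovered panic when no PanicHandler is set.
- DisablePurge: if true, workers are never purged automatically and stay resident even after being idle for longer than ExpiryDuration.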
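In Ants, these options are passed in the functional options style. This comes up again later, so there is no need to worry about it for now.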
workerQueue
Back to goWorker: the pool needs a container that holds and tracks these workers, and that container is workerQueue:
type workerQueue interface {
	len() int
	isEmpty() bool
	insert(worker) error
	// detach takes out a goWorker.
	detach() worker
	// refresh removes and returns the goWorkers that have been idle for too long.
	refresh(duration time.Duration) []worker
	reset()
}
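It has two implementations: a stack (queueTypeStack, the default) and a loop queue (queueTypeLoopQueue, used when PreAlloc is enabled).
For recycling, recall the lastUsed time.Time field mentioned earlier: it records when the worker was last put back into the pool after use, and refresh uses it to pick out the workers that have been idle for longer than the given duration.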
1.4 Ants' core APIs
Ants exposes two core APIs: NewPool and Submit.
NewPool: creates a goroutine pool.
Submit: submits a task to the pool, where one of the pool's goroutines runs it.
// NewPool instantiates a Pool with customized options.
func NewPool(size int, options ...Option) (*Pool, error) {
	pc, err := newPool(size, options...)
	if err != nil {
		return nil, err
	}

	pool := &Pool{poolCommon: pc}
	pool.workerCache.New = func() any {
		return &goWorker{
			pool: pool,
			task: make(chan func(), workerChanCap),
		}
	}

	return pool, nil
}
func newPool(size int, options ...Option) (*poolCommon, error) {
	if size <= 0 {
		size = -1
	}

	opts := loadOptions(options...)

	if !opts.DisablePurge {
		if expiry := opts.ExpiryDuration; expiry < 0 {
			return nil, ErrInvalidPoolExpiry
		} else if expiry == 0 {
			opts.ExpiryDuration = DefaultCleanIntervalTime
		}
	}

	if opts.Logger == nil {
		opts.Logger = defaultLogger
	}

	p := &poolCommon{
		capacity: int32(size),
		allDone:  make(chan struct{}),
		lock:     syncx.NewSpinLock(),
		once:     &sync.Once{},
		options:  opts,
	}
	if p.options.PreAlloc {
		if size == -1 {
			return nil, ErrInvalidPreAllocSize
		}
		p.workers = newWorkerQueue(queueTypeLoopQueue, size)
	} else {
		p.workers = newWorkerQueue(queueTypeStack, 0)
	}

	p.cond = sync.NewCond(p.lock)

	p.goPurge()
	p.goTicktock()

	return p, nil
}
Next, let's look at how Submit is implemented.
Its main job is to take an available goWorker from the pool and send the user's task into that goWorker's channel.
func (p *Pool) Submit(task func()) error {
	if p.IsClosed() {
		return ErrPoolClosed
	}

	w, err := p.retrieveWorker()
	if w != nil {
		w.inputFunc(task)
	}
	return err
}
func (w *goWorker) inputFunc(fn func()) {
	w.task <- fn
}
// retrieveWorker returns an available worker to run the tasks.
func (p *poolCommon) retrieveWorker() (w worker, err error) {
	p.lock.Lock()

retry:
	// First try to fetch the worker from the queue.
	if w = p.workers.detach(); w != nil {
		p.lock.Unlock()
		return
	}

	// If the worker queue is empty, and we don't run out of the pool capacity,
	// then just spawn a new worker goroutine.
	if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
		p.lock.Unlock()
		w = p.workerCache.Get().(worker)
		w.run()
		return
	}

	// Bail out early if it's in nonblocking mode or the number of pending callers reaches the maximum limit value.
	if p.options.Nonblocking || (p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks) {
		p.lock.Unlock()
		return nil, ErrPoolOverload
	}

	// Otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.
	p.addWaiting(1)
	p.cond.Wait() // block and wait for an available worker
	p.addWaiting(-1)

	if p.IsClosed() {
		p.lock.Unlock()
		return nil, ErrPoolClosed
	}

	goto retry
}
When we introduced goWorker earlier, we only looked at its struct; now let's look at its methods:
package ants

import (
	"runtime/debug"
	"time"
)

type goWorker struct {
	worker
	pool     *Pool
	task     chan func()
	lastUsed time.Time
}

func (w *goWorker) run() {
	w.pool.addRunning(1)
	go func() {
		defer func() {
			if w.pool.addRunning(-1) == 0 && w.pool.IsClosed() {
				w.pool.once.Do(func() {
					close(w.pool.allDone)
				})
			}
			w.pool.workerCache.Put(w)
			if p := recover(); p != nil {
				if ph := w.pool.options.PanicHandler; ph != nil {
					ph(p)
				} else {
					w.pool.options.Logger.Printf("worker exits from panic: %v\n%s\n", p, debug.Stack())
				}
			}
			// Call Signal() here in case there are goroutines waiting for available workers.
			w.pool.cond.Signal()
		}()

		for fn := range w.task {
			if fn == nil {
				return
			}
			fn()
			if ok := w.pool.revertWorker(w); !ok {
				return
			}
		}
	}()
}

// finish sends nil on the task channel, which tells the worker goroutine to exit.
func (w *goWorker) finish() {
	w.task <- nil
}

// lastUsedTime returns when the worker was last put back into the pool.
func (w *goWorker) lastUsedTime() time.Time {
	return w.lastUsed
}

// setLastUsedTime records the time the worker is put back into the pool.
func (w *goWorker) setLastUsedTime(t time.Time) {
	w.lastUsed = t
}

// inputFunc sends a task into the worker's channel.
func (w *goWorker) inputFunc(fn func()) {
	w.task <- fn
}
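The key method is run:
(1) It loops, blocking until it receives a task submitted by the user, and then executes it;
(2) After finishing a task, it returns itself to the goroutine pool via revertWorker;
(3) If returning to the pool fails, it receives a nil task (which is what finish sends), or the task panics, the goWorker is destroyed: its goroutine exits, and the deferred function puts the goWorker struct into the pool's object pool workerCache and calls cond.Signal() to wake one goroutine blocked waiting for a worker.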
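The panic path of run is easy to miss: recover sits in the deferred function, so a panicking task still ends the worker's goroutine, but not the process. The sketch below isolates that behavior with a hypothetical panicSafeWorker whose onExit callback stands in for putting the struct back into workerCache and calling cond.Signal().

```go
package main

import (
	"fmt"
	"sync"
)

// panicSafeWorker is hypothetical. Like goWorker.run, it recovers a panicking
// task in a deferred function and hands the value to a handler.
type panicSafeWorker struct {
	task         chan func()
	panicHandler func(any)
	onExit       func() // stands in for workerCache.Put(w) + cond.Signal()
}

func (w *panicSafeWorker) run(wg *sync.WaitGroup) {
	wg.Add(1)
	go func() {
		defer wg.Done() // registered first, so it runs last
		defer func() {
			w.onExit()
			if p := recover(); p != nil {
				w.panicHandler(p)
			}
		}()
		for fn := range w.task {
			if fn == nil {
				return
			}
			fn() // a panic here unwinds out of the loop: this goroutine ends
		}
	}()
}

// panicDemo sends one panicking task and reports what the handler saw and
// whether the exit hook ran.
func panicDemo() (recovered any, exited bool) {
	var wg sync.WaitGroup
	w := &panicSafeWorker{
		task:         make(chan func(), 1),
		panicHandler: func(p any) { recovered = p },
		onExit:       func() { exited = true },
	}
	w.run(&wg)
	w.task <- func() { panic("boom") }
	wg.Wait()
	return recovered, exited
}

func main() {
	fmt.Println(panicDemo()) // prints "boom true"
}
```

The practical consequence is that a panicking task costs the pool one goroutine, which is why run signals the cond afterwards: a waiting caller can then spawn a replacement.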
The process above calls revertWorker, which is how the pool takes back a goroutine once it has finished a task:
// revertWorker puts a worker back into free pool, recycling the goroutines.
func (p *poolCommon) revertWorker(worker worker) bool {
	if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() {
		p.cond.Broadcast()
		return false
	}

	worker.setLastUsedTime(p.nowTime())

	p.lock.Lock()
	// To avoid memory leaks, add a double check in the lock scope.
	// Issue: https://github.com/panjf2000/ants/issues/113
	if p.IsClosed() {
		p.lock.Unlock()
		return false
	}
	if err := p.workers.insert(worker); err != nil {
		p.lock.Unlock()
		return false
	}
	// Notify the invoker stuck in 'retrieveWorker()' of there is an available worker in the worker queue.
	p.cond.Signal()
	p.lock.Unlock()

	return true
}
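Pool.revertWorker returns a goWorker to the goroutine pool:
(1) If the pool is over capacity (more goroutines running than its capacity) or already closed, it broadcasts on cond and returns false, so the calling worker goroutine exits;
(2) It updates the goWorker's lastUsed time, which the periodic cleanup relies on;
(3) It takes the lock, re-checks whether the pool has been closed in the meantime, and inserts the goWorker back into the worker queue;
(4) It wakes the next goroutine blocked waiting on cond, then unlocks.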
One last point: how are idle goWorkers recycled periodically?
func (p *poolCommon) purgeStaleWorkers() {
	ticker := time.NewTicker(p.options.ExpiryDuration)

	defer func() {
		ticker.Stop()
		atomic.StoreInt32(&p.purgeDone, 1)
	}()

	purgeCtx := p.purgeCtx // copy to the local variable to avoid race from Reboot()
	for {
		select {
		case <-purgeCtx.Done():
			return
		case <-ticker.C:
		}

		if p.IsClosed() {
			break
		}

		var isDormant bool
		p.lock.Lock()
		staleWorkers := p.workers.refresh(p.options.ExpiryDuration)
		n := p.Running()
		isDormant = n == 0 || n == len(staleWorkers)
		p.lock.Unlock()

		// Clean up the stale workers.
		for i := range staleWorkers {
			staleWorkers[i].finish()
			staleWorkers[i] = nil
		}

		// There might be a situation where all workers have been cleaned up (no worker is running),
		// while some invokers still are stuck in p.cond.Wait(), then we need to awake those invokers.
		if isDormant && p.Waiting() > 0 {
			p.cond.Broadcast()
		}
	}
}
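(1) purgeStaleWorkers starts a ticker and, at the user-configured ExpiryDuration interval, recycles expired goWorkers;
(2) A goWorker is recycled by sending a nil value into its channel (finish); on receiving nil, the goWorker's goroutine exits and its deferred function puts the goWorker into the pool's object pool workerCache;
(3) If every worker has been cleaned up (no worker is left running) while some callers are still blocked waiting, it wakes all of them with cond.Broadcast so they can retry instead of waiting forever.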
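The sketch below reproduces the purge loop's shape with hypothetical types: a ticker, a cancelable context, stale workers collected under a lock, and nil sent to each of them outside the lock. Unlike ants, it scans a plain slice, takes the tick interval and the expiry as separate parameters to keep the demo fast, and omits the dormant-pool Broadcast.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// purgeWorker is a hypothetical idle worker: a last-used time and a task channel.
type purgeWorker struct {
	lastUsed time.Time
	task     chan func()
}

// idleSet is a hypothetical idle list guarded by a mutex.
type idleSet struct {
	mu      sync.Mutex
	workers []*purgeWorker
}

// takeStale removes and returns the workers idle for longer than expiry.
func (s *idleSet) takeStale(now time.Time, expiry time.Duration) []*purgeWorker {
	s.mu.Lock()
	defer s.mu.Unlock()
	var stale, keep []*purgeWorker
	for _, w := range s.workers {
		if now.Sub(w.lastUsed) > expiry {
			stale = append(stale, w)
		} else {
			keep = append(keep, w)
		}
	}
	s.workers = keep
	return stale
}

// purgeLoop wakes up every interval until ctx is canceled, collects stale
// workers under the lock, then sends each a nil task outside the lock.
func purgeLoop(ctx context.Context, s *idleSet, interval, expiry time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
		for _, w := range s.takeStale(time.Now(), expiry) {
			w.task <- nil // the worker goroutine treats nil as "exit"
		}
	}
}

// purgeDemo starts two long-idle workers and one fresh one, runs the purge
// loop until both idle workers have exited, and returns how many remain.
func purgeDemo() int {
	s := &idleSet{}
	var exited sync.WaitGroup
	exited.Add(2)
	for _, idle := range []time.Duration{2 * time.Hour, 3 * time.Hour, 0} {
		w := &purgeWorker{lastUsed: time.Now().Add(-idle), task: make(chan func(), 1)}
		s.workers = append(s.workers, w)
		go func(w *purgeWorker) {
			for fn := range w.task {
				if fn == nil {
					exited.Done()
					return
				}
				fn()
			}
		}(w)
	}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go purgeLoop(ctx, s, 10*time.Millisecond, time.Hour)
	exited.Wait()
	s.mu.Lock()
	defer s.mu.Unlock()
	return len(s.workers)
}

func main() {
	fmt.Println(purgeDemo()) // prints 1
}
```

Sending nil only after the lock is released matters: a worker's channel send could block, and holding the pool lock during it would stall every Submit in the meantime.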