
Ants: A Goroutine Pool for Go

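This section covers how ants, the well-known goroutine pool library for Go, is implemented. Because the implementation relies on several tools from the sync package, you should be familiar with them. They are also covered in my earlier articles: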

  • sync.Mutex: Locks (lock)
  • sync.Pool: sync.Pool
  • sync.Cond: Cond and Once
  • sync/atomic: Atomic operations (atomic)

If you are not familiar with these yet, read up on them first; knowing their common methods is enough. Without further ado, let's begin.

1. Introduction to Ants

1.1 A Spin Lock Implementing sync.Locker

type Locker interface {
	Lock()
	Unlock()
}

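This is the lock interface provided by the sync package; any type with Lock and Unlock methods satisfies it, so you can implement your own lock. Ants does exactly that: rather than using the comparatively heavyweight sync.Mutex, the author implemented a lightweight spin lock.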

package sync

import (
	"runtime"
	"sync"
	"sync/atomic"
)

type spinLock uint32

const maxBackoff = 16

func (sl *spinLock) Lock() {
	backoff := 1
	for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {
		// Leverage the exponential backoff algorithm, see https://en.wikipedia.org/wiki/Exponential_backoff.
		for i := 0; i < backoff; i++ {
			runtime.Gosched()
		}
		if backoff < maxBackoff {
			// Shift left by one bit, i.e. backoff *= 2
			backoff <<= 1
		}
	}
}

func (sl *spinLock) Unlock() {
	atomic.StoreUint32((*uint32)(sl), 0)
}

// NewSpinLock instantiates a spin-lock.
func NewSpinLock() sync.Locker {
	return new(spinLock)
}

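How the lock works:

(1) An integer state value records the lock's state: 0 = unlocked, 1 = locked;

(2) Acquiring the lock means changing 0 to 1, and releasing it means changing 1 back to 0; both writes go through the atomic package, so they are safe under concurrency;

(3) Lock spins in a for loop around a CAS operation, so acquiring it never requires parking the goroutine;

(4) The variable backoff reflects how contended the lock is: after each failed attempt, the goroutine yields its CPU time slice (runtime.Gosched) backoff times; backoff doubles with every failure, capped at 16.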

1.2 Why Use a Goroutine Pool?

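Anyone who knows the object pool sync.Pool will recognize the goal: reuse. Reusing goroutines improves performance, makes goroutine lifecycles easier to manage, and prevents goroutines from being created and destroyed without limit.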

1.3 Ants' Underlying Data Structures

type poolCommon struct {
	// capacity of the pool, a negative value means that the capacity of pool is limitless, an infinite pool is used to
	// avoid potential issue of endless blocking caused by nested usage of a pool: submitting a task to pool
	// which submits a new task to the same pool.
	capacity int32

	// running is the number of the currently running goroutines.
	running int32

	// lock for protecting the worker queue.
	lock sync.Locker

	// workers is a slice that store the available workers.
	workers workerQueue

	// state is used to notice the pool to closed itself.
	state int32

	// cond for waiting to get an idle worker.
	cond *sync.Cond

	// done is used to indicate that all workers are done.
	allDone chan struct{}

	// once is used to make sure the pool is closed just once.
	once *sync.Once

	// workerCache speeds up the obtainment of a usable worker in function:retrieveWorker.
	workerCache sync.Pool

	// waiting is the number of goroutines already been blocked on pool.Submit(), protected by pool.lock
	waiting int32

	purgeDone int32
	purgeCtx  context.Context
	stopPurge context.CancelFunc

	ticktockDone int32
	ticktockCtx  context.Context
	stopTicktock context.CancelFunc

	now atomic.Value

	options *Options
}

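The main thing to note here is the difference between workers (a workerQueue) and workerCache. The former is the list of reusable goWorker objects, i.e. live goroutines ready to take work; the latter is a sync.Pool that caches goWorker objects whose goroutines have already exited (for example after sitting idle too long), so the structs can be reused instead of reallocated.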

(Note: the figure in the original post shows an older version; workerArray has since been renamed workerQueue.)

goWorker

First, the goWorker struct:

type goWorker struct {
	worker

	// pool who owns this worker.
	pool *Pool

	// task is a job should be done.
	task chan func()

	// lastUsed will be updated when putting a worker back into queue.
	lastUsed time.Time
}

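A goWorker can be thought of as a long-running goroutine that is not reclaimed and repeatedly processes the asynchronous tasks submitted by users. Its core fields are:

(1) pool: the pool this goWorker belongs to

(2) task: the channel through which the goWorker receives task functions

(3) lastUsed: the time the goWorker was last put back into the pool

The embedded worker is an interface describing "a worker": it declares methods such as run, finish, lastUsedTime, setLastUsedTime and inputFunc, and goWorker embeds it and implements these methods.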

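So how does it receive tasks from outside?

Through the task chan func() field: the worker's goroutine loops over this task channel and executes each function it receives.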

Pool

Next, the Pool struct in ants:

type Pool struct {
	*poolCommon
}

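Pool is simply a wrapper around poolCommon (shown above). A brief rundown of its fields:

(1) capacity: the pool's capacity (a negative value means unlimited)

(2) running: the number of goroutines currently running

(3) lock: the custom spin lock, which keeps taking goWorkers out of the pool concurrency-safe

(4) workers: the list of goWorkers, i.e. the "goroutine pool" proper

(5) state: the pool state flag: 0 = open, 1 = closed

(6) cond: a concurrency coordinator used in blocking mode to suspend and wake goroutines waiting for a worker

(7) workerCache: an object pool of goWorkers that caches released goWorker structs for reuse. Don't confuse this object pool with the goroutine pool: a goWorker in the goroutine pool is still alive, while a goWorker in the object pool has, strictly speaking, already been destroyed;

(8) waiting: the number of goroutines currently blocked waiting for a worker;

(9) purgeDone: marks whether the purge (cleanup) goroutine has exited (called heartbeatDone in older versions);

(10) stopPurge: the cancel function that stops the purge goroutine (called stopHeartbeat in older versions);

(11) options: custom configuration.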

Options

Now let's look at the configuration options it provides:

type Options struct {
	ExpiryDuration   time.Duration
	PreAlloc         bool
	MaxBlockingTasks int
	Nonblocking      bool
	PanicHandler     func(any)
	Logger           Logger
	DisablePurge     bool
}
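  1. ExpiryDuration: the scan interval of the purge goroutine. Every ExpiryDuration, it scans the workers and removes those that have been idle for longer than ExpiryDuration.
  2. PreAlloc: whether to preallocate memory when the pool is created. If true, the worker queue is allocated up front (as a loop queue of the pool's size) to reduce dynamic allocation at run time.
  3. MaxBlockingTasks: the maximum number of goroutines allowed to block in pool.Submit. Once the limit is reached, Submit returns ErrPoolOverload; 0 means no limit.
  4. Nonblocking: if true, pool.Submit never blocks. If a task cannot be submitted immediately (no idle worker and the pool is full), it returns ErrPoolOverload directly.
  5. PanicHandler: handles panics raised in worker goroutines. If it is not set, the worker still recovers the panic, logs it with a stack trace through Logger, and exits (see goWorker.run below).
  6. Logger: a custom logger for the pool's runtime information (such as worker creation and cleanup).
  7. DisablePurge: if true, workers are never purged automatically and stay resident even after idling longer than ExpiryDuration.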

Ants sets these options using the functional options pattern. We'll come back to it later, so there's no need to worry about it yet.

workerQueue

Back to goWorker: the pool needs a collection that holds its idle goWorkers, and that collection is workerQueue.

type workerQueue interface {
	len() int
	isEmpty() bool
	insert(worker) error
	// detach takes an idle goWorker out of the queue
	detach() worker
	// refresh removes and returns goWorkers that have been idle for too long
	refresh(duration time.Duration) []worker
	reset()
}

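It has two implementations: a stack (the default) and a loop queue (used when PreAlloc is enabled, as newPool below shows).

For reclamation, recall the lastUsed time.Time field mentioned earlier: it records when the worker was last put back into the pool after use. refresh(duration) removes and returns the goWorkers whose lastUsed is older than duration.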

1.4 Ants' Core APIs

Ants offers two core APIs: NewPool and Submit.

NewPool: creates a goroutine pool.

Submit: submits a task to the pool, where a worker goroutine runs it.

// NewPool instantiates a Pool with customized options.
func NewPool(size int, options ...Option) (*Pool, error) {
	pc, err := newPool(size, options...)
	if err != nil {
		return nil, err
	}

	pool := &Pool{poolCommon: pc}
	pool.workerCache.New = func() any {
		return &goWorker{
			pool: pool,
			task: make(chan func(), workerChanCap),
		}
	}

	return pool, nil
}

func newPool(size int, options ...Option) (*poolCommon, error) {
	if size <= 0 {
		size = -1
	}

	opts := loadOptions(options...)

	if !opts.DisablePurge {
		if expiry := opts.ExpiryDuration; expiry < 0 {
			return nil, ErrInvalidPoolExpiry
		} else if expiry == 0 {
			opts.ExpiryDuration = DefaultCleanIntervalTime
		}
	}

	if opts.Logger == nil {
		opts.Logger = defaultLogger
	}

	p := &poolCommon{
		capacity: int32(size),
		allDone:  make(chan struct{}),
		lock:     syncx.NewSpinLock(),
		once:     &sync.Once{},
		options:  opts,
	}
	if p.options.PreAlloc {
		if size == -1 {
			return nil, ErrInvalidPreAllocSize
		}
		p.workers = newWorkerQueue(queueTypeLoopQueue, size)
	} else {
		p.workers = newWorkerQueue(queueTypeStack, 0)
	}

	p.cond = sync.NewCond(p.lock)

	p.goPurge()
	p.goTicktock()

	return p, nil
}

Next, let's look at how Submit is implemented.

Essentially, it takes an available goWorker out of the Pool and sends the user's task into that goWorker's channel.

func (p *Pool) Submit(task func()) error {
	if p.IsClosed() {
		return ErrPoolClosed
	}

	w, err := p.retrieveWorker()
	if w != nil {
		w.inputFunc(task)
	}
	return err
}

func (w *goWorker) inputFunc(fn func()) {
	w.task <- fn
}

// retrieveWorker returns an available worker to run the tasks.
func (p *poolCommon) retrieveWorker() (w worker, err error) {
	p.lock.Lock()

retry:
	// First try to fetch the worker from the queue.
	if w = p.workers.detach(); w != nil {
		p.lock.Unlock()
		return
	}

	// If the worker queue is empty, and we don't run out of the pool capacity,
	// then just spawn a new worker goroutine.
	if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
		p.lock.Unlock()
		w = p.workerCache.Get().(worker)
		w.run()
		return
	}

	// Bail out early if it's in nonblocking mode or the number of pending callers reaches the maximum limit value.
	if p.options.Nonblocking || (p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks) {
		p.lock.Unlock()
		return nil, ErrPoolOverload
	}

	// Otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.
	p.addWaiting(1)
	p.cond.Wait() // block and wait for an available worker
	p.addWaiting(-1)

	if p.IsClosed() {
		p.lock.Unlock()
		return nil, ErrPoolClosed
	}

	goto retry
}

Earlier we only looked at goWorker's struct; now let's look at the methods it provides:

package ants

import (
	"runtime/debug"
	"time"
)

type goWorker struct {
	worker
	pool     *Pool
	task     chan func()
	lastUsed time.Time
}

func (w *goWorker) run() {
	w.pool.addRunning(1)
	go func() {
		defer func() {
			if w.pool.addRunning(-1) == 0 && w.pool.IsClosed() {
				w.pool.once.Do(func() {
					close(w.pool.allDone)
				})
			}
			w.pool.workerCache.Put(w)
			if p := recover(); p != nil {
				if ph := w.pool.options.PanicHandler; ph != nil {
					ph(p)
				} else {
					w.pool.options.Logger.Printf("worker exits from panic: %v\n%s\n", p, debug.Stack())
				}
			}
			// Call Signal() here in case there are goroutines waiting for available workers.
			w.pool.cond.Signal()
		}()

		for fn := range w.task {
			if fn == nil {
				return
			}
			fn()
			if ok := w.pool.revertWorker(w); !ok {
				return
			}
		}
	}()
}

// finish sends nil into the task channel, telling the worker goroutine to exit
func (w *goWorker) finish() {
	w.task <- nil
}

// lastUsedTime returns when the worker was last put back into the pool
func (w *goWorker) lastUsedTime() time.Time {
	return w.lastUsed
}

// setLastUsedTime records when the worker was put back into the pool
func (w *goWorker) setLastUsedTime(t time.Time) {
	w.lastUsed = t
}

// inputFunc sends a task into the worker's channel
func (w *goWorker) inputFunc(fn func()) {
	w.task <- fn
}

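The key method is run:

(1) It loops, blocking until it receives a task submitted by the user, then executes it;

(2) After finishing the task, it returns itself to the pool;

(3) If returning to the pool fails, or it receives a nil task (such as the one sent by finish), the goWorker is destroyed: its goroutine exits and the deferred function puts the goWorker struct back into the pool's object pool, workerCache. That deferred function also calls cond.Signal to wake one blocked caller.

The code above calls revertWorker, which is how the pool takes back a goWorker after it has been used.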

// revertWorker puts a worker back into free pool, recycling the goroutines.
func (p *poolCommon) revertWorker(worker worker) bool {
	if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() {
		p.cond.Broadcast()
		return false
	}

	worker.setLastUsedTime(p.nowTime())

	p.lock.Lock()
	// To avoid memory leaks, add a double check in the lock scope.
	// Issue: https://github.com/panjf2000/ants/issues/113
	if p.IsClosed() {
		p.lock.Unlock()
		return false
	}
	if err := p.workers.insert(worker); err != nil {
		p.lock.Unlock()
		return false
	}
	// Notify the invoker stuck in 'retrieveWorker()' of there is an available worker in the worker queue.
	p.cond.Signal()
	p.lock.Unlock()

	return true
}

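Pool.revertWorker returns a goWorker to the pool:

(1) If the pool is closed, or more goWorkers are running than the capacity allows (e.g. after the capacity has been reduced), it broadcasts on cond and returns false, so the goWorker exits;

(2) It updates the goWorker's lastUsed time, which the periodic cleanup relies on;

(3) Under the lock, it inserts the goWorker back into the worker queue;

(4) It wakes the next blocked caller through cond, then unlocks.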

One last question: how are goWorkers reclaimed periodically?

func (p *poolCommon) purgeStaleWorkers() {
	ticker := time.NewTicker(p.options.ExpiryDuration)

	defer func() {
		ticker.Stop()
		atomic.StoreInt32(&p.purgeDone, 1)
	}()

	purgeCtx := p.purgeCtx // copy to the local variable to avoid race from Reboot()
	for {
		select {
		case <-purgeCtx.Done():
			return
		case <-ticker.C:
		}

		if p.IsClosed() {
			break
		}

		var isDormant bool
		p.lock.Lock()
		staleWorkers := p.workers.refresh(p.options.ExpiryDuration)
		n := p.Running()
		isDormant = n == 0 || n == len(staleWorkers)
		p.lock.Unlock()

		// Clean up the stale workers.
		for i := range staleWorkers {
			staleWorkers[i].finish()
			staleWorkers[i] = nil
		}

		// There might be a situation where all workers have been cleaned up (no worker is running),
		// while some invokers still are stuck in p.cond.Wait(), then we need to awake those invokers.
		if isDormant && p.Waiting() > 0 {
			p.cond.Broadcast()
		}
	}
}

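(1) purgeStaleWorkers starts a ticker and, at the user-configured expiry interval, reclaims expired goWorkers;

(2) Reclamation works by sending a nil value into the goWorker's channel (finish); the goWorker then exits and puts itself back into the pool's object pool, workerCache;

(3) If no worker is left running (or every running worker was just purged) while some goroutines are still blocked waiting, it wakes all of them with cond.Broadcast.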
