当前位置: 首页 > news >正文

golang库源码学习——Pond,小而精的工作池库

pond 是一个轻量级的 Goroutine 池库,用于高效管理并发任务。它提供了灵活的配置选项和多种策略,适合处理高并发场景。

GitHub - alitto/pond at v1

一、特点:

1.轻量级

pond 的代码库非常精简,它的V1版本仅有四个业务文件!因此它的体积小,加载速度快。

2.零依赖

只依赖于 Go 的标准库(如 sync、time 等),这个是它最大的特点,其实看代码就能看出来,基本上就是用的chan的封装,但是在这个基础上增加了动态设置的功能

3.稳定性高

因为依赖少,pond 不会因为第三方库的更新或兼容性问题而受到影响,稳定性更高。在复杂的项目环境中,零依赖的库更容易维护和调试

4.易于集成

pond基本可以无缝集成到任何 Go 项目中,无需担心依赖冲突或版本问题

二、场景:

1.嵌入式系统

在资源受限的嵌入式系统中,零依赖的库可以显著减少内存占用和二进制文件大小。

pond 的轻量级特性使其非常适合在嵌入式设备中管理并发任务。

2. 复杂框架

比如业界的负责RPC、HTTP框架,可以减少对原始框架的侵害

3. 微服务架构

在微服务架构中,每个服务通常需要独立部署和运行。零依赖的库可以避免引入不必要的依赖,减少部署复杂度。

pond 可以用于处理微服务中的高并发任务,如请求处理、数据同步等。

4. 高性能计算

在高性能计算场景中,零依赖的库可以减少额外的开销,提升计算效率。

pond 的 Goroutine 池机制可以高效管理并发任务,适合用于并行计算、数据处理等场景。

5. 库开发

如果你正在开发一个 Go 库,并且希望尽量减少对外部依赖的引入,pond 是一个理想的选择。

零依赖的特性可以确保你的库更加通用和易于集成。

三、功能:

1.动态设置 Goroutine 池的大小和工作线程:

package mainimport ("fmt""github.com/alitto/pond"
)func main() {// Create a buffered (non-blocking) pool that can scale up to 100 workers// and has a buffer capacity of 1000 taskspool := pond.New(100, 1000)// Submit 1000 tasksfor i := 0; i < 1000; i++ {n := ipool.Submit(func() {fmt.Printf("Running task #%d\n", n)})}// Stop the pool and wait for all submitted tasks to completepool.StopAndWait()
}

这里例子里面设置了一个1000协程的任务池,并有100个工作线程来处理

我们看下pond的源代码:

func New(maxWorkers, maxCapacity int, options ...Option) *WorkerPool {// Instantiate the poolpool := &WorkerPool{maxWorkers:   maxWorkers,maxCapacity:  maxCapacity,idleTimeout:  defaultIdleTimeout,strategy:     Eager(),panicHandler: defaultPanicHandler,}// Apply all optionsfor _, opt := range options {opt(pool)}// Make sure options are consistentif pool.maxWorkers <= 0 {pool.maxWorkers = 1}if pool.minWorkers > pool.maxWorkers {pool.minWorkers = pool.maxWorkers}if pool.maxCapacity < 0 {pool.maxCapacity = 0}if pool.idleTimeout < 0 {pool.idleTimeout = defaultIdleTimeout}// Initialize base context (if not already set)if pool.context == nil {Context(context.Background())(pool)}// Create tasks channelpool.tasks = make(chan func(), pool.maxCapacity)

本质上就是创建了一个长度为1000的chan

而启动并执行工作线程的前提这里主要做一个“正在工作的线程”数目的比较,如果

runningWorkerCount大于等于设置的线程,或者还有空闲工作线程,则不再生成新的工作线程

func (p *WorkerPool) incrementWorkerCount() bool {p.mutex.Lock()defer p.mutex.Unlock()runningWorkerCount := p.RunningWorkers()// Reached max workers, do not create a new oneif runningWorkerCount >= p.maxWorkers {return false}// Idle workers available, do not create a new oneif runningWorkerCount >= p.minWorkers && runningWorkerCount > 0 && p.IdleWorkers() > 0 {return false}// Execute the resizing strategy to determine if we should create more workersif resize := p.strategy.Resize(runningWorkerCount, p.minWorkers, p.maxWorkers); !resize {return false}// Increment worker countatomic.AddInt32(&p.workerCount, 1)// Increment wait groupp.workersWaitGroup.Add(1)return true
}

    当然这里还有一个策略问题后面会讲到

    2.动态设置 Goroutine 池的大小和工作线程及最小的工作协程数:

    package mainimport ("fmt""github.com/alitto/pond"
    )func main() {// Create an unbuffered (blocking) pool with a fixed // number of workerspool := pond.New(10, 0, pond.MinWorkers(10))// Submit 1000 tasksfor i := 0; i < 1000; i++ {n := ipool.Submit(func() {fmt.Printf("Running task #%d\n", n)})}// Stop the pool and wait for all submitted tasks to completepool.StopAndWait()
    }

    这样设置确保池中始终至少有 10 个 Goroutine,即使没有任务需要处理。当任务到来时,这些 Goroutine 可以立即处理任务,而不需要等待新的,这种设置适合的场景为:

    1. 高并发场景:

    如果你的应用需要处理大量并发任务,设置 MinWorkers 可以确保有足够的 Goroutine 来处理任务,避免任务堆积。

    2. 低延迟场景:

    如果你的应用对响应速度要求较高,设置 MinWorkers 可以减少任务处理的时间,提高整体性能。

    3. 资源敏感场景:

    如果你的应用需要严格控制资源使用,设置 MinWorkers 可以确保 Goroutine 的数量不会低于某个阈值,从而避免资源不足的问题

    其中,这里有一个很好的设计模式,设计模式:函数式选项模式(Functional Options Pattern)

    看下源码:

    func New(maxWorkers, maxCapacity int, options ...Option) *WorkerPool {// Instantiate the poolpool := &WorkerPool{maxWorkers:   maxWorkers,maxCapacity:  maxCapacity,idleTimeout:  defaultIdleTimeout,strategy:     Eager(),panicHandler: defaultPanicHandler,}// Apply all optionsfor _, opt := range options {opt(pool)}...
    }
    
    func MinWorkers(minWorkers int) Option {return func(pool *WorkerPool) {pool.minWorkers = minWorkers}
    }

    可以看出来New方法这里传参都是函数式的,并通过opt进行执行

    这样就是典型的函数式选项模式(Functional Options Pattern)

    这种模式的核心思想是:

    ·通过传递函数来配置对象,而不是直接传递参数

    ·每个函数负责设置对象的一个特定属性

    为什么使用函数式选项模式?
    可扩展性

    如果直接在 New 函数中传递参数,当需要新增配置选项时,必须修改 New 函数的签名,这会导致破坏性变更(Breaking Change)。

    使用函数式选项模式,可以通过新增函数来扩展配置选项,而无需修改 New 函数的签名。

    灵活性

    函数式选项模式允许用户只设置需要的选项,而忽略其他选项。

    例如,pond.New 可以接受任意数量的配置函数,用户可以根据需求选择性地传递这些函数。

    可读性

    通过函数式选项模式,代码的可读性更高。每个配置函数都有一个明确的名称,可以直观地表达其作用。

    例如,pond.MinWorkers(10) 比直接传递一个 10 更容易理解。

    默认值

    函数式选项模式可以方便地为配置选项提供默认值。如果用户没有传递某个配置函数,则使用默认值。

    3.动态设置 Goroutine 池的大小和工作线程及任务组和上下文:

      单独创建组:

    package mainimport ("fmt""github.com/alitto/pond"
    )func main() {// Create a poolpool := pond.New(10, 1000)defer pool.StopAndWait()// Create a task groupgroup := pool.Group()// Submit a group of tasksfor i := 0; i < 20; i++ {n := igroup.Submit(func() {fmt.Printf("Running group task #%d\n", n)})}// Wait for all tasks in the group to completegroup.Wait()
    }

    创建组及设置组内上下文:

    package mainimport ("context""fmt""net/http""github.com/alitto/pond"
    )func main() {// Create a worker poolpool := pond.New(10, 1000)defer pool.StopAndWait()// Create a task group associated to a contextgroup, ctx := pool.GroupContext(context.Background())var urls = []string{"https://www.golang.org/","https://www.google.com/","https://www.github.com/",}// Submit tasks to fetch each URLfor _, url := range urls {url := urlgroup.Submit(func() error {req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)resp, err := http.DefaultClient.Do(req)if err == nil {resp.Body.Close()}return err})}// Wait for all HTTP requests to complete.err := group.Wait()if err != nil {fmt.Printf("Failed to fetch URLs: %v", err)} else {fmt.Println("Successfully fetched all URLs")}
    }

    此功能为共同任务的子任务提供同步、错误传播和上下文取消功能。类似于 golang.org/x/sync/errgroup 软件包中的 errgroup.Group,并发性受 Worker 池约束。

    这里主要是便于业务管理

    1. 一些灵活的设置:

    比如这是工作线程的自动销毁,为闲时降低工作负载

    // This will create a pool that will remove workers 100ms after they become idle 
    pool := pond.New(10, 1000, pond.IdleTimeout(100 * time.Millisecond))

    比如做一些panic的收集

    
    // Custom panic handler function
    panicHandler := func(p interface{}) {fmt.Printf("Task panicked: %v", p)
    }// This will create a pool that will handle panics using a custom panic handler
    pool := pond.New(10, 1000, pond.PanicHandler(panicHandler)))

    2. 工作线程策略:

    这里是一个比较有意思的地方

    池大小调整策略:预设了三种常见场景的策略:进取型、均衡型和懒惰型

    进取型:以提高资源使用率为代价最大化响应速度,在某些情况下可能会降低吞吐量。该策略适用于在大部分时间内以小部分容量运行,偶尔会收到突发任务的工人池。这是默认策略。

    均衡型:试图在响应速度和吞吐量之间找到平衡。它适用于一般用途的工作池,或那些大部分时间都以接近 50%的容量运行的工作池。

    懒惰型:以牺牲响应速度为代价最大化吞吐量。这种策略适用于大部分时间都将以接近最大容量运行的工人池。

    默认是Eager

    下图说明了随着提交任务数量的增加,不同池大小调整策略的行为。每条线代表池中工作进程的数量(池规模),X 轴代表已提交任务的数量(累计)。

    我们看看源码实现:

    var maxProcs = runtime.GOMAXPROCS(0)// Preset pool resizing strategies
    var (// Eager maximizes responsiveness at the expense of higher resource usage,// which can reduce throughput under certain conditions.// This strategy is meant for worker pools that will operate at a small percentage of their capacity// most of the time and may occasionally receive bursts of tasks. It's the default strategy.Eager = func() ResizingStrategy { return RatedResizer(1) }// Balanced tries to find a balance between responsiveness and throughput.// It's suitable for general purpose worker pools or those// that will operate close to 50% of their capacity most of the time.Balanced = func() ResizingStrategy { return RatedResizer(maxProcs / 2) }// Lazy maximizes throughput at the expense of responsiveness.// This strategy is meant for worker pools that will operate close to their max. capacity most of the time.Lazy = func() ResizingStrategy { return RatedResizer(maxProcs) }
    )// ratedResizer implements a rated resizing strategy
    type ratedResizer struct {rate uint64hits uint64
    }// RatedResizer creates a resizing strategy which can be configured
    // to create workers at a specific rate when the pool has no idle workers.
    // rate: determines the number of tasks to receive before creating an extra worker.
    // A value of 3 can be interpreted as: "Create a new worker every 3 tasks".
    func RatedResizer(rate int) ResizingStrategy {if rate < 1 {rate = 1}return &ratedResizer{rate: uint64(rate),}
    }func (r *ratedResizer) Resize(runningWorkers, minWorkers, maxWorkers int) bool {if r.rate == 1 || runningWorkers == 0 {return true}r.hits++return r.hits%r.rate == 1
    }

    可以看到三种策略的本质实现是:基于当前可以运行的CPU核数来判断的

    1.进取型默认为一,即主要需要工作线程,就增加

    2.均衡型为CPU核数的一半,即如果在一个16核的机器上,每增加8个任务,增加一个工作线程

    3.懒惰型为CPU核数,即如果在一个16核的机器上,每增加16个任务,增加一个工作线程

    这里就是为什么进取型适合前端页面API的类型,有时猛的过来一堆任务需要完成,但很多时候并不会有线程过来

    3. 异步工作:

    那提交任务是同步还是异步的?

    答案是可以同步、也可以异步

    分别是TrySubmit和Submit

    func (p *WorkerPool) TrySubmit(task func()) bool {return p.submit(task, false)
    }func (p *WorkerPool) submit(task func(), mustSubmit bool) (submitted bool) {if task == nil {return}if p.Stopped() {// Pool is stopped and caller must submit the taskif mustSubmit {panic(ErrSubmitOnStoppedPool)}return}// Increment submitted and waiting task counters as soon as we receive a taskatomic.AddUint64(&p.submittedTaskCount, 1)atomic.AddUint64(&p.waitingTaskCount, 1)p.tasksWaitGroup.Add(1)defer func() {if !submitted {// Task was not sumitted to the pool, decrement submitted and waiting task countersatomic.AddUint64(&p.submittedTaskCount, ^uint64(0))atomic.AddUint64(&p.waitingTaskCount, ^uint64(0))p.tasksWaitGroup.Done()}}()// Start a worker as long as we haven't reached the limitif submitted = p.maybeStartWorker(task); submitted {return}if !mustSubmit {// Attempt to dispatch to an idle worker without blockingselect {case p.tasks <- task:submitted = truereturndefault:// Channel is full and can't wait for an idle worker, so need to exitreturn}}// Submit the task to the tasks channel and wait for it to be picked up by a workerp.tasks <- tasksubmitted = truereturn
    }

    通过源码可以知道:

    异步和同步的区别在于,提交任务后,是否要等提交成功再返回

    刚才说到pond本质上是一个chan,长度固定没,如果任务满了,再提交任务,chan会堵塞

    所以如果是异步提交就不会堵塞

    这里保证的服务不会卡在这里

    在很多程序中推荐使用TrySubmit

     可观测

    能看的数据:

    pool.RunningWorkers() int: Current number of running workers

    pool.IdleWorkers() int: Current number of idle workers

    pool.MinWorkers() int: Minimum number of worker goroutines

    pool.MaxWorkers() int: Maxmimum number of worker goroutines

    pool.MaxCapacity() int: Maximum number of tasks that can be waiting in the queue at any given time (queue capacity)

    pool.SubmittedTasks() uint64: Total number of tasks submitted since the pool was created

    pool.WaitingTasks() uint64: Current number of tasks in the queue that are waiting to be executed

    pool.SuccessfulTasks() uint64: Total number of tasks that have successfully completed their exection since the pool was created

    pool.FailedTasks() uint64: Total number of tasks that completed with panic since the pool was created

    pool.CompletedTasks() uint64: Total number of tasks that have completed their exection either successfully or with panic since the pool was created

    所以虽然代码量很少,但依然有做指标监控,使用的时候可以做日志上报

    相关文章:

  • 移动端前端调试调研纪实:从痛点出发,到 WebDebugX 的方案落地
  • 【C++ 真题】P1075 [NOIP 2012 普及组] 质因数分解
  • 论文篇-1.4.一篇好的论文是改出来的
  • 【18. 四数之和 】
  • 内存屏障指令
  • 人工智能价值:技术革命下的职业新坐标
  • 信息系统项目进度管理实践:从规划到控制的全流程解析
  • 【笔记】快速安装Poetry
  • 趣味编程:抽象图(椭圆组成)
  • 【Python-Day 16】代码复用基石:详解 Python 函数的定义与调用
  • C++继承:从生活实例谈面向对象的精髓
  • 学习Android(十二)Thread
  • 深度解析 Element Plus
  • java上机测试错题回顾(1)
  • Ubuntu学习记录
  • EXIST与JOIN连表比较
  • Flink基本理解
  • 缓存穿透、缓存击穿、缓存雪崩解决方案
  • MySQL 索引详解与原理分析
  • Typescript总结篇——配置TS、基础知识(类型、接口、类型别名、泛型、extendsinfer关键字)
  • 手机网站怎么打开/直接进网站的浏览器
  • 查询网站whois/外贸建站网站推广
  • 网页源代码在线查看/seo网站优化是什么
  • 湘潭房产网站建设/北京网站建设公司哪家好