go语言实现协程池
代码:
package mainimport ("sync"
)// GoPool 协程池结构
type GoPool struct {taskChannel chan func() // 任务存放的队列taskMutex sync.Mutex // 用于保证计数正确性的锁maxWorkerNum int64 // 协程最大数curWorkerNum int64 // 当前协程数waitGroup sync.WaitGroup // 用于等待所有任务完成
}// NewGoPoolWithCapacity 创建指定协程数量和最大任务容量的协程池
func NewGoPoolWithCapacity(workerNum int64, maxTask int) *GoPool {if workerNum < 1 || maxTask < 1 {panic("invalid go pool param")}return &GoPool{taskChannel: make(chan func(), maxTask),maxWorkerNum: workerNum,curWorkerNum: 0,}
}// NewGoPool 创建指定协程数量的协程池,默认最大任务容量为 1024
func NewGoPool(workerNum int64) *GoPool {return NewGoPoolWithCapacity(workerNum, 1024)
}// Go 提交一个任务到协程池
func (p *GoPool) Go(newTask func()) {if newTask == nil {return}p.taskMutex.Lock()// 对原有任务添加释放 wait group 功能p.waitGroup.Add(1)innerTask := func() {defer p.waitGroup.Done()newTask()}if p.curWorkerNum < p.maxWorkerNum {// 如果当前协程数量小于最大协程数量,则新建 goroutinep.curWorkerNum++go func() {defer func() {p.taskMutex.Lock()p.curWorkerNum--p.taskMutex.Unlock()}()innerTask() // 执行当前任务// 执行完当前任务之后,去队列拉一下最新任务for channelTask := range p.taskChannel {channelTask()}}()} else {// 如果当前协程数量大于等于最大协程数量,则当前任务需要排队p.taskChannel <- innerTask}p.taskMutex.Unlock()
}// CloseAndWait 关闭协程池,并等待所有任务执行完成
func (p *GoPool) CloseAndWait() {close(p.taskChannel)p.waitGroup.Wait()
}// CloseWithoutWait 关闭协程池,但是不等待所有任务执行完成
func (p *GoPool) CloseWithoutWait() {close(p.taskChannel)
}
使用:
func main() {// 创建一个最大 5 个 worker 的协程池pool := NewGoPool(5)// 提交 10 个任务for i := 0; i < 10; i++ {taskID := ipool.Go(func() {// 模拟任务执行// fmt.Printf("Task %d is running\n", taskID)// time.Sleep(time.Second)// fmt.Printf("Task %d is done\n", taskID)})}// 关闭协程池并等待所有任务完成pool.CloseAndWait()
}