协程池介绍
协程池简单理解就是有一个池子一样的东西,里面装着固定数量的goroutine,当有一个任务到来的时候,会将这个任务交给池子里的一个空闲的goroutine去处理,如果池子里没有空闲的goroutine了,任务就会阻塞等待。所以协程池有三个角色Worker,Task,Pool。
相关角色和方法
协程池所有的角色:task 任务,poll 池子,worker 任务执行单元
属性定义:
Task:具体的任务
Pool:池子
worker:用于执行任务的 goroutine
方法定义:
NewTask:创建任务
NewPool:创建协程池
AddTask:向协程池添加任务
run:worker 的逻辑,开始执行
incRunning:增加 worker 的数量
decRunning:减少 worker 的数量
getRunningWorkers:获得当前正在工作的 worker 数量
getCap:获得当前 worker 的容量
注意:用 for range 遍历 channel 时,若通道未关闭且无数据就会被阻塞;若通道已关闭且无数据则不会阻塞,会直接退出循环。
Coding
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
type Task struct {
f func() error //具体的任务逻辑
}
type Pool struct {
RunningWorkers int64 //运行着的 worker 数量
Capacity int64 //协程池 worker 数量
JobCh chan *Task //任务存放的位置
sync.Mutex
}
func NewTask(funcArg func() error) *Task {
return &Task{
f: funcArg,
}
}
func NewPool(capacity int64, taskNum int64) *Pool {
return &Pool{
Capacity: capacity,
JobCh: make(chan *Task, taskNum),
}
}
func (p *Pool) GetCap() int64 {
return p.Capacity
}
func (p *Pool) incRunning() {
atomic.AddInt64(&p.RunningWorkers, 1)
}
func (p *Pool) decRunning() {
atomic.AddInt64(&p.RunningWorkers, -1)
}
func (p *Pool) getRunningWorkers() int64 {
return atomic.LoadInt64(&p.RunningWorkers)
}
func (p *Pool) run() {
p.incRunning()
go func() {
defer func() {
p.decRunning()
}()
// 如果 channel 没有 task,并且也没有被 close()
// 那么会一直阻塞在这里,为了正确的退出,我们在 main 函数里设置了 3s 的等待时间,确保程序正确退出
for task := range p.JobCh {
task.f()
}
}()
}
// AddTask 往协程池添加任务
func (p *Pool) AddTask(task *Task) {
//加锁,防止启动多个 worker,导致下面的判断出问题
p.Lock()
defer p.Unlock()
if p.getRunningWorkers() < p.GetCap() {
//启动创建一个 worker
p.run()
}
//将任务放入 channel,等待消费
p.JobCh <- task
}
func main() {
//创建协程池:3个协程,容量为 10 的任务队列 channel
pool := NewPool(3, 10)
for i := 0; i < 20; i++ {
//将任务放入池中
pool.AddTask(NewTask(func() error {
fmt.Printf("I am Task \n")
return nil
}))
}
//确保 20 个任务都执行完成
time.Sleep(time.Second * 3)
}