The Ants Goroutine Pool in Go


This section covers the implementation principles of the well-known ants goroutine pool library for Go. Since the implementation relies on several tools from the sync package, you should have some familiarity with them; you can also refer to my earlier articles on these topics.

If you are not yet familiar with them, take a look first; knowing the common methods is enough. Without further ado, let's get started.

1. Introduction to Ants

1.1 A spin lock built on sync.Locker

type Locker interface {
	Lock()
	Unlock()
}

This is the lock interface provided by the sync package; you can implement your own lock against it. ants does exactly that: instead of the heavyweight Mutex, the author implements a custom, lightweight spin lock.

package sync

import (
	"runtime"
	"sync"
	"sync/atomic"
)

type spinLock uint32

const maxBackoff = 16

func (sl *spinLock) Lock() {
	backoff := 1
	for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {
		// Leverage the exponential backoff algorithm, see https://en.wikipedia.org/wiki/Exponential_backoff.
		for i := 0; i < backoff; i++ {
			runtime.Gosched()
		}
		if backoff < maxBackoff {
			// shifting left by one bit doubles backoff (backoff *= 2)
			backoff <<= 1
		}
	}
}

func (sl *spinLock) Unlock() {
	atomic.StoreUint32((*uint32)(sl), 0)
}

How this lock works:

(1) An integer state value marks the lock status: 0 = unlocked, 1 = locked.

(2) Acquiring the lock flips 0 to 1, and unlocking flips 1 back to 0; both updates go through the atomic package, which keeps them concurrency-safe.

(3) Locking spins via a for loop plus CAS, so the operating system never has to park the goroutine.

(4) The backoff variable reflects how contended the lock is: after every failed attempt the goroutine yields its CPU time slice backoff times (via runtime.Gosched), and backoff doubles with each failure, capped at 16 (see the usage sketch below).
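To see the lock in action, here is a minimal usage sketch. Assumptions are called out explicitly: the spinLock type from the listing above is compiled into the same package as this snippet (in ants it lives in an internal package imported as syncx, which exposes it through the NewSpinLock constructor that newPool calls later).

package main

import (
	"fmt"
	"sync"
)

// NewSpinLock exposes the spin lock behind the standard sync.Locker interface.
func NewSpinLock() sync.Locker {
	return new(spinLock)
}

func main() {
	var (
		lock    = NewSpinLock()
		wg      sync.WaitGroup
		counter int
	)
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			lock.Lock() // spins with exponential backoff instead of parking the goroutine
			counter++
			lock.Unlock()
		}()
	}
	wg.Wait()
	fmt.Println(counter) // always 1000
}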

1.2 Why use a goroutine pool?

If you have looked at the object pool sync.Pool before, the idea will feel familiar: the goal is reuse and better performance, along with convenient management of goroutine lifecycles, preventing goroutines from being created and destroyed without limit.

1.3 The underlying data structures of Ants

type poolCommon struct {
	// capacity of the pool, a negative value means that the capacity of pool is limitless, an infinite pool is used to
	// avoid potential issue of endless blocking caused by nested usage of a pool: submitting a task to pool
	// which submits a new task to the same pool.
	capacity int32

	// running is the number of the currently running goroutines.
	running int32

	// lock for protecting the worker queue.
	lock sync.Locker

	// workers is a slice that store the available workers.
	workers workerQueue

	// state is used to notice the pool to closed itself.
	state int32

	// cond for waiting to get an idle worker.
	cond *sync.Cond

	// done is used to indicate that all workers are done.
	allDone chan struct{}
	// once is used to make sure the pool is closed just once.
	once *sync.Once

	// workerCache speeds up the obtainment of a usable worker in function:retrieveWorker.
	workerCache sync.Pool

	// waiting is the number of goroutines already been blocked on pool.Submit(), protected by pool.lock
	waiting int32

	purgeDone int32
	purgeCtx  context.Context
	stopPurge context.CancelFunc

	ticktockDone int32
	ticktockCtx  context.Context
	stopTicktock context.CancelFunc

	now atomic.Value

	options *Options
}

The main thing to point out here is the difference between workers (the workerQueue) and workerCache: the former is the list of reusable goWorker objects whose goroutines are alive and ready to take work, while the latter is a sync.Pool that caches goWorker structs whose goroutines have already exited (for example, workers reclaimed after being idle too long), so their allocations can be reused.

(Note: earlier versions named this field workerArray; it has since been renamed to workerQueue.)

goWorker

First, let's look at the goWorker struct:

type goWorker struct {
	worker

	// pool who owns this worker.
	pool *Pool

	// task is a job should be done.
	task chan func()

	// lastUsed will be updated when putting a worker back into queue.
	lastUsed time.Time
}

A goWorker can be thought of as a long-running goroutine that is not reclaimed after each job; it repeatedly executes the asynchronous tasks submitted by users. Its core fields are:

(1) pool: the goroutine pool this goWorker belongs to

(2) task: the channel through which the goWorker receives asynchronous task closures

(3) lastUsed: the time at which the goWorker was last put back into the pool

The embedded worker here is simply the worker abstraction; think of it as "a worker". goWorker embeds it and supplies the concrete methods itself.

So how does it receive tasks from the outside?

Through the task chan func() field: tasks arrive over this channel, and the worker goroutine loops over it internally, executing each function it receives, as sketched below.
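Conceptually, each worker goroutine does nothing more than the loop below (a stripped-down sketch; the real run() method, shown later in section 1.4, adds self-recycling, panic recovery, and exit handling):

// Simplified model of a goWorker's goroutine: block on the task channel and
// execute whatever function the pool pushes into it.
go func() {
	for fn := range w.task {
		if fn == nil { // nil is the signal to exit (see finish() later)
			return
		}
		fn()
	}
}()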

Pool

Next, let's look at the Pool struct in Ants:

type Pool struct {
	*poolCommon
}

Pool is simply a thin wrapper around the poolCommon struct shown in section 1.3. A brief rundown of its fields:

(1) capacity: the capacity of the pool

(2) running: the number of goroutines currently running

(3) lock: the hand-rolled spin lock, guaranteeing concurrency safety when fetching goWorkers from the queue

(4) workers: the list of goWorkers, i.e. the "goroutine pool" in the true sense

(5) state: the pool's state flag: 0 = open, 1 = closed

(6) cond: the concurrency coordinator, used in blocking mode to park and wake goroutines waiting for a worker (see the sync.Cond sketch after this list)

(7) workerCache: an object pool (sync.Pool) holding released goWorker structs for reuse. Note the distinction from the goroutine pool: goWorkers in the goroutine pool are still alive, whereas goWorkers that have entered the object pool have, strictly speaking, already been destroyed.

(8) waiting: the number of goroutines currently blocked waiting

(9) purgeDone / stopPurge: a flag marking whether the purge goroutine has finished, and the cancel function used to stop it (older versions called these heartbeatDone / stopHeartbeat)

(10) ticktockDone / stopTicktock: the same pair for the ticktock goroutine that keeps the cached now timestamp fresh

(11) options: the customization options.
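For readers less familiar with sync.Cond, the lock + cond pair works roughly as follows. This is a generic sketch, not ants code: a consumer that finds no resource parks itself on the cond, and a producer wakes it after putting a resource back, which mirrors how retrieveWorker and revertWorker cooperate later.

package main

import (
	"fmt"
	"sync"
)

func main() {
	var (
		mu    sync.Mutex
		cond  = sync.NewCond(&mu)
		queue []int
	)

	done := make(chan struct{})
	// Consumer: waits until the queue is non-empty, like a caller of Submit
	// waiting for a goWorker to become available.
	go func() {
		mu.Lock()
		for len(queue) == 0 {
			cond.Wait() // releases mu while parked, re-acquires it on wake-up
		}
		fmt.Println("got", queue[0])
		mu.Unlock()
		close(done)
	}()

	// Producer: puts a value back and signals one waiter, like revertWorker
	// returning a goWorker to the queue.
	mu.Lock()
	queue = append(queue, 42)
	cond.Signal()
	mu.Unlock()

	<-done
}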

Options

Next, let's see what options are provided:

type Options struct {
	ExpiryDuration time.Duration
	PreAlloc bool
	MaxBlockingTasks int
	Nonblocking bool
	PanicHandler func(any)
	Logger Logger
	DisablePurge bool
}
  1. ExpiryDuration: the scan interval of the purge goroutine. Every ExpiryDuration the purge goroutine scans all workers and reclaims those that have been idle for longer than ExpiryDuration.
  2. PreAlloc: whether to pre-allocate memory when initializing the pool. If true, the pool allocates its worker queue up front to reduce dynamic allocation at runtime.
  3. MaxBlockingTasks: the maximum number of goroutines allowed to block on pool.Submit. Beyond this limit, submitting a task returns an error.
  4. Nonblocking: if true, pool.Submit never blocks. If a task cannot be accepted immediately (no idle worker and the pool is full), ErrPoolOverload is returned right away.
  5. PanicHandler: catches panics raised inside worker goroutines. If unset, a panic propagates in the worker's goroutine and may crash the program.
  6. Logger: a custom logger for recording the pool's runtime information (worker creation, purging, and so on).
  7. DisablePurge: if true, workers are never purged automatically and stay resident in memory even when their idle time exceeds ExpiryDuration.

In Ants these options follow the functional options pattern, which will come up again later, so there is no rush; a quick taste of the style is shown below.
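A usage sketch with a few of the options above (assuming the import path github.com/panjf2000/ants/v2; the option values here are arbitrary examples):

package main

import (
	"log"
	"time"

	"github.com/panjf2000/ants/v2"
)

func main() {
	// Each WithXxx call returns an Option that fills in one field of Options.
	p, err := ants.NewPool(100,
		ants.WithExpiryDuration(10*time.Second), // purge workers idle for more than 10s
		ants.WithPreAlloc(true),                 // pre-allocate the worker queue
		ants.WithNonblocking(true),              // Submit fails fast with ErrPoolOverload instead of blocking
	)
	if err != nil {
		log.Fatal(err)
	}
	defer p.Release()
	// ... submit tasks with p.Submit(...), as shown in section 1.4
}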

workerQueue

Back to our goWorkers. The pool needs a container that stores and tracks them, and that container is workerQueue:

type workerQueue interface {
	len() int
	isEmpty() bool
	insert(worker) error
	// fetch an available goWorker
	detach() worker
	// reclaim goWorkers that have been idle for too long
	refresh(duration time.Duration) []worker
	reset()
}

There are two implementations of this interface: one stack-based and one (loop) queue-based.

As for the recycling mechanism, recall the lastUsed time.Time field mentioned earlier: it records when the worker was last put back after finishing a task, and refresh uses it to decide which workers are stale, as the sketch below illustrates.
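A simplified sketch of the idea (not the actual ants code; it assumes a worker interface exposing lastUsedTime, like the method goWorker provides): because workers are always appended back in chronological order of lastUsed, everything before the first "fresh" worker is stale. ants finds this split point with a binary search.

// refreshSketch partitions workers into stale and fresh ones by idle time.
func refreshSketch(items []worker, duration time.Duration) (stale, fresh []worker) {
	expiry := time.Now().Add(-duration)
	for i, w := range items {
		if w.lastUsedTime().After(expiry) {
			return items[:i], items[i:] // first fresh worker found: split here
		}
	}
	return items, nil // every worker has been idle longer than duration
}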

1.4 The core APIs of Ants

Ants exposes two core APIs for us to use: NewPool and Submit.

NewPool: creates a goroutine pool.

Submit: submits a task to the pool, to be executed by one of the pooled goroutines (a minimal usage example follows).
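Before diving into their implementations, here is the typical way the two APIs are used together (a usage sketch; import path github.com/panjf2000/ants/v2):

package main

import (
	"fmt"
	"sync"

	"github.com/panjf2000/ants/v2"
)

func main() {
	// A pool that allows at most 10 concurrently running goroutines.
	p, err := ants.NewPool(10)
	if err != nil {
		panic(err)
	}
	defer p.Release() // close the pool and reclaim its goroutines

	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		i := i
		wg.Add(1)
		// Submit hands the closure to an idle goWorker (or spawns one if the
		// pool is not yet full); no more than 10 tasks ever run at once.
		if err := p.Submit(func() {
			defer wg.Done()
			fmt.Println("task", i, "done")
		}); err != nil {
			wg.Done()
			fmt.Println("submit failed:", err)
		}
	}
	wg.Wait()
}

With the usage picture in mind, here is how NewPool is implemented: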

// NewPool instantiates a Pool with customized options.
func NewPool(size int, options ...Option) (*Pool, error) {
	pc, err := newPool(size, options...)
	if err != nil {
		return nil, err
	}

	pool := &Pool{poolCommon: pc}
	pool.workerCache.New = func() any {
		return &goWorker{
			pool: pool,
			task: make(chan func(), workerChanCap),
		}
	}

	return pool, nil
}
func newPool(size int, options ...Option) (*poolCommon, error) {
	// A non-positive size means an unbounded pool.
	if size <= 0 {
		size = -1
	}

	opts := loadOptions(options...)

	if !opts.DisablePurge {
		if expiry := opts.ExpiryDuration; expiry < 0 {
			return nil, ErrInvalidPoolExpiry
		} else if expiry == 0 {
			opts.ExpiryDuration = DefaultCleanIntervalTime
		}
	}

	if opts.Logger == nil {
		opts.Logger = defaultLogger
	}

	p := &poolCommon{
		capacity: int32(size),
		allDone:  make(chan struct{}),
		lock:     syncx.NewSpinLock(), // the spin lock from section 1.1
		once:     &sync.Once{},
		options:  opts,
	}
	// Pre-allocation requires a fixed capacity and uses the loop-queue
	// implementation; otherwise the stack implementation is used.
	if p.options.PreAlloc {
		if size == -1 {
			return nil, ErrInvalidPreAllocSize
		}
		p.workers = newWorkerQueue(queueTypeLoopQueue, size)
	} else {
		p.workers = newWorkerQueue(queueTypeStack, 0)
	}

	p.cond = sync.NewCond(p.lock)

	// Start the background goroutines that purge stale workers and refresh
	// the cached current time.
	p.goPurge()
	p.goTicktock()

	return p, nil
}

Next, let's look at the Submit function and its implementation:

All it really does is take an available goWorker out of the Pool and push the user-submitted task closure into that goWorker's channel.

func (p *Pool) Submit(task func()) error {
	if p.IsClosed() {
		return ErrPoolClosed
	}

	w, err := p.retrieveWorker()
	if w != nil {
		w.inputFunc(task)
	}
	return err
}
func (w *goWorker) inputFunc(fn func()) {
	w.task <- fn
}
// retrieveWorker returns an available worker to run the tasks.
func (p *poolCommon) retrieveWorker() (w worker, err error) {
	p.lock.Lock()

retry:
	// First try to fetch the worker from the queue.
	if w = p.workers.detach(); w != nil {
		p.lock.Unlock()
		return
	}

	// If the worker queue is empty, and we don't run out of the pool capacity,
	// then just spawn a new worker goroutine.
	if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
		p.lock.Unlock()
		w = p.workerCache.Get().(worker)
		w.run()
		return
	}

	// Bail out early if it's in nonblocking mode or the number of pending callers reaches the maximum limit value.
	if p.options.Nonblocking || (p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks) {
		p.lock.Unlock()
		return nil, ErrPoolOverload
	}

	// Otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.
	p.addWaiting(1)
	p.cond.Wait() // block and wait for an available worker
	p.addWaiting(-1)

	if p.IsClosed() {
		p.lock.Unlock()
		return nil, ErrPoolClosed
	}

	goto retry
}

When goWorker was introduced earlier we only looked at its struct; now let's look at the methods it provides:

package ants

import (
	"runtime/debug"
	"time"
)

type goWorker struct {
	worker
	pool *Pool
	task chan func()
	lastUsed time.Time
}

func (w *goWorker) run() {
	w.pool.addRunning(1)
	go func() {
		defer func() {
			if w.pool.addRunning(-1) == 0 && w.pool.IsClosed() {
				w.pool.once.Do(func() {
					close(w.pool.allDone)
				})
			}
			w.pool.workerCache.Put(w)
			if p := recover(); p != nil {
				if ph := w.pool.options.PanicHandler; ph != nil {
					ph(p)
				} else {
					w.pool.options.Logger.Printf("worker exits from panic: %v\n%s\n", p, debug.Stack())
				}
			}
			// Call Signal() here in case there are goroutines waiting for available workers.
			w.pool.cond.Signal()
		}()

		for fn := range w.task {
			if fn == nil {
				return
			}
			fn()
			if ok := w.pool.revertWorker(w); !ok {
				return
			}
		}
	}()
}

// finish signals the worker to exit by sending a nil task into its channel
func (w *goWorker) finish() {
	w.task <- nil
}
// lastUsedTime returns when the worker was last put back into the pool
func (w *goWorker) lastUsedTime() time.Time {
	return w.lastUsed
}
// setLastUsedTime records that time
func (w *goWorker) setLastUsedTime(t time.Time) {
	w.lastUsed = t
}
// inputFunc pushes a task into the worker's channel
func (w *goWorker) inputFunc(fn func()) {
	w.task <- fn
}

The function to focus on is run:

(1) It loops and blocks until it receives a user-submitted task closure, then executes it.

(2) After finishing the task, it hands itself back to the goroutine pool.

(3) If returning to the pool fails, or a nil task is received, the goWorker is destroyed: its goroutine exits and the struct is put back into the pool's object cache workerCache. The coordinator cond is also signalled to wake one goroutine blocked waiting for a worker.
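One detail worth calling out: the deferred function in run also recovers panics thrown by the task, and this is exactly where the PanicHandler option hooks in. A small usage sketch (real ants options; the handler just logs and the pool keeps serving tasks):

package main

import (
	"fmt"
	"time"

	"github.com/panjf2000/ants/v2"
)

func main() {
	p, _ := ants.NewPool(5, ants.WithPanicHandler(func(v any) {
		// Invoked from the worker's deferred recover() shown above.
		fmt.Println("recovered from task panic:", v)
	}))
	defer p.Release()

	_ = p.Submit(func() { panic("boom") }) // the pool survives this
	_ = p.Submit(func() { fmt.Println("still serving tasks") })

	time.Sleep(100 * time.Millisecond) // give the tasks time to run
}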

The run loop above calls revertWorker, which is how the pool takes a used goWorker back:

// revertWorker puts a worker back into free pool, recycling the goroutines.
func (p *poolCommon) revertWorker(worker worker) bool {
	if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() {
		p.cond.Broadcast()
		return false
	}

	worker.setLastUsedTime(p.nowTime())

	p.lock.Lock()
	// To avoid memory leaks, add a double check in the lock scope.
	// Issue: https://github.com/panjf2000/ants/issues/113
	if p.IsClosed() {
		p.lock.Unlock()
		return false
	}
	if err := p.workers.insert(worker); err != nil {
		p.lock.Unlock()
		return false
	}
	// Notify the invoker stuck in 'retrieveWorker()' of there is an available worker in the worker queue.
	p.cond.Signal()
	p.lock.Unlock()

	return true
}

The revertWorker method returns a goWorker to the goroutine pool:

(1) It stamps the goWorker with the current time, which the periodic purge later uses.

(2) Under the lock, it inserts the goWorker back into the worker queue (with a double check that the pool has not been closed in the meantime).

(3) It signals the coordinator cond to wake the next goroutine blocked waiting for a worker, then releases the lock.

One last point: how are idle goWorkers reclaimed periodically?

func (p *poolCommon) purgeStaleWorkers() {
	ticker := time.NewTicker(p.options.ExpiryDuration)

	defer func() {
		ticker.Stop()
		atomic.StoreInt32(&p.purgeDone, 1)
	}()

	purgeCtx := p.purgeCtx // copy to the local variable to avoid race from Reboot()
	for {
		select {
		case <-purgeCtx.Done():
			return
		case <-ticker.C:
		}

		if p.IsClosed() {
			break
		}

		var isDormant bool
		p.lock.Lock()
		staleWorkers := p.workers.refresh(p.options.ExpiryDuration)
		n := p.Running()
		isDormant = n == 0 || n == len(staleWorkers)
		p.lock.Unlock()

		// Clean up the stale workers.
		for i := range staleWorkers {
			staleWorkers[i].finish()
			staleWorkers[i] = nil
		}

		// There might be a situation where all workers have been cleaned up (no worker is running),
		// while some invokers still are stuck in p.cond.Wait(), then we need to awake those invokers.
		if isDormant && p.Waiting() > 0 {
			p.cond.Broadcast()
		}
	}
}

(1) purgeStaleWorkers starts a ticker and, at the user-configured ExpiryDuration interval, polls for and reclaims expired goWorkers.

(2) Reclamation works by injecting a nil value into each stale goWorker's channel; the goWorker then exits its run loop and puts itself back into the pool's object cache workerCache on its own.

(3) If the pool has gone dormant (no workers running, or every running worker was just reclaimed) while some callers are still blocked in cond.Wait(), all of them are woken up with a Broadcast.
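To watch the purge loop in action, here is a small experiment sketch (real ants APIs; the timings are arbitrary): with a short ExpiryDuration, the number of live workers drops back to zero once they have been idle long enough.

package main

import (
	"fmt"
	"time"

	"github.com/panjf2000/ants/v2"
)

func main() {
	p, _ := ants.NewPool(10, ants.WithExpiryDuration(time.Second))
	defer p.Release()

	for i := 0; i < 5; i++ {
		_ = p.Submit(func() { time.Sleep(10 * time.Millisecond) })
	}
	time.Sleep(100 * time.Millisecond)
	fmt.Println("running right after the tasks:", p.Running()) // up to 5

	// After a few ticks of purgeStaleWorkers, the idle goWorkers receive a nil
	// task via finish() and exit.
	time.Sleep(3 * time.Second)
	fmt.Println("running after the purge:", p.Running()) // 0
}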