Go语言手搓协程池

发布于:2025-05-01 ⋅ 阅读:(41) ⋅ 点赞:(0)

协程池介绍

协程池简单理解就是有一个池子一样的东西,里面装着固定数量的goroutine,当有一个任务到来的时候,会将这个任务交给池子里的一个空闲的goroutine去处理,如果池子里没有空闲的goroutine了,任务就会阻塞等待。所以协程池有三个角色Worker,Task,Pool。

相关角色和方法

协程池所有的角色:task 任务,poll 池子,worker 任务执行单元

属性定义:

Task:具体的任务

Pool:池子

worker:用于执行任务的 goroutine

方法定义:

  • NewTask:创建任务

  • NewPool:创建协程池

  • AddTask:向协程池添加任务

  • run:worker 的逻辑,开始执行

  • incRunning:增加 worker 的数量

  • decRunning:减少 worker 的数量

  • getRunningWorkers:获得当前正在工作的 worker 数量

  • getCap:获得当前 worker 的容量

注意:用 for range 遍历 channel 时,若通道未关闭且无数据就会被阻塞;若通道已关闭且无数据则不会阻塞,会直接退出循环。

Coding

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type Task struct {
	f func() error //具体的任务逻辑
}

type Pool struct {
	RunningWorkers int64      //运行着的 worker 数量
	Capacity       int64      //协程池 worker 数量
	JobCh          chan *Task //任务存放的位置
	sync.Mutex
}

func NewTask(funcArg func() error) *Task {
	return &Task{
		f: funcArg,
	}
}

func NewPool(capacity int64, taskNum int64) *Pool {
	return &Pool{
		Capacity: capacity,
		JobCh:    make(chan *Task, taskNum),
	}
}

func (p *Pool) GetCap() int64 {
	return p.Capacity
}

func (p *Pool) incRunning() {
	atomic.AddInt64(&p.RunningWorkers, 1)
}

func (p *Pool) decRunning() {
	atomic.AddInt64(&p.RunningWorkers, -1)
}

func (p *Pool) getRunningWorkers() int64 {
	return atomic.LoadInt64(&p.RunningWorkers)
}

func (p *Pool) run() {
	p.incRunning()
	go func() {
		defer func() {
			p.decRunning()
		}()

		// 如果 channel 没有 task,并且也没有被 close()
		// 那么会一直阻塞在这里,为了正确的退出,我们在 main 函数里设置了 3s 的等待时间,确保程序正确退出
		for task := range p.JobCh {
			task.f()
		}
	}()
}

// AddTask 往协程池添加任务
func (p *Pool) AddTask(task *Task) {
	//加锁,防止启动多个 worker,导致下面的判断出问题
	p.Lock()
	defer p.Unlock()

	if p.getRunningWorkers() < p.GetCap() {
		//启动创建一个 worker
		p.run()
	}

	//将任务放入 channel,等待消费
	p.JobCh <- task
}

func main() {
	//创建协程池:3个协程,容量为 10 的任务队列 channel
	pool := NewPool(3, 10)

	for i := 0; i < 20; i++ {
		//将任务放入池中
		pool.AddTask(NewTask(func() error {
			fmt.Printf("I am Task \n")
			return nil
		}))
	}

	//确保 20 个任务都执行完成
	time.Sleep(time.Second * 3)
}


网站公告

今日签到

点亮在社区的每一天
去签到