【gRPC】clientPool 客户端连接池简单实现与go案例

发布于:2025-02-10 ⋅ 阅读:(130) ⋅ 点赞:(0)

什么是 gRPC 客户端连接池?

  • 在 gRPC 中,创建和维护一个到服务器的连接是非常消耗资源的(比如 TCP 连接建立和 TLS 握手)。

  • 而在高并发场景下,如果每次请求都创建新的连接,不仅会导致性能下降,还可能耗尽系统资源。

  • 因此,客户端连接池的作用是复用一定数量的连接,提高资源利用率和性能。


gRPC 客户端连接池的原理

  1. 连接复用,池子里的连接使用时取出,用完放回
  2. 控制连接数,可以固定数量或动态调整,防止建太多连接
  3. 并发安全

先展示一个基于sync.pool创建的clientPool

  • 实际上,企业不推荐使用sync包里的无锁机制,
  • 因为sync包里的无锁设计适用于高并发,短暂资源的情况,
  • 而gRPC本身设计初衷是客户端连接是长生命周期,需要稳定管理的资源,与sync.pool的特性不完全匹配
因此为了更好实现,可以自己加锁设计,或者使用第三方库这里举例github:go-grpc-pool

type ClientPool interface {
	Get() *grpc.ClientConn
	Put(conn *grpc.ClientConn)
}

type clientPool struct {
	pool sync.Pool
}

func GetPool(target string, opts ...grpc.DialOption) (ClientPool, error) {
	return &clientPool{
		pool: sync.Pool{
			New: func() any {
				conn, err := grpc.Dial(target, opts...)
				if err != nil {
					log.Println(err)
					return nil
				}
				return conn
			},
		},
	}, nil
}

func (c *clientPool) Get() *grpc.ClientConn {
	conn := c.pool.Get().(*grpc.ClientConn)
	if conn.GetState() == connectivity.Shutdown || conn.GetState() == connectivity.TransientFailure {
		conn.Close()
		conn = c.pool.New().(*grpc.ClientConn)
	}
	return conn
}

func (c *clientPool) Put(conn *grpc.ClientConn) {
	if conn.GetState() == connectivity.Shutdown || conn.GetState() == connectivity.TransientFailure {
		conn.Close()
		return
	}
	c.pool.Put(conn)
}

自己加锁设计

package main

import (
	"log"
	"sync"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/connectivity"
)

// ClientPool 定义接口
type ClientPool interface {
	Get() (*grpc.ClientConn, error)
	Put(conn *grpc.ClientConn)
	Close()
}

// clientPool 是 ClientPool 的实现
type clientPool struct {
	mu          sync.Mutex
	connections chan *grpc.ClientConn
	maxSize     int
	idleTimeout time.Duration
	target      string
	opts        []grpc.DialOption
	closed      bool
}

// NewClientPool 创建一个新的客户端连接池
func NewClientPool(target string, maxSize int, idleTimeout time.Duration, opts ...grpc.DialOption) (ClientPool, error) {
	if maxSize <= 0 {
		return nil, ErrInvalidMaxSize
	}

	pool := &clientPool{
		connections: make(chan *grpc.ClientConn, maxSize),
		maxSize:     maxSize,
		idleTimeout: idleTimeout,
		target:      target,
		opts:        opts,
	}

	// 预填充池
	for i := 0; i < maxSize; i++ {
		conn, err := pool.createConnection()
		if err != nil {
			return nil, err
		}
		pool.connections <- conn
	}

	return pool, nil
}

// createConnection 创建新连接
func (p *clientPool) createConnection() (*grpc.ClientConn, error) {
	conn, err := grpc.Dial(p.target, p.opts...)
	if err != nil {
		return nil, err
	}
	return conn, nil
}

// Get 从连接池获取一个连接
func (p *clientPool) Get() (*grpc.ClientConn, error) {
	p.mu.Lock()
	defer p.mu.Unlock()

	if p.closed {
		return nil, ErrPoolClosed
	}

	select {
	case conn := <-p.connections:
		// 检查连接状态
		if conn.GetState() == connectivity.Shutdown || conn.GetState() == connectivity.TransientFailure {
			conn.Close()
			return p.createConnection()
		}
		return conn, nil
	default:
		// 如果没有空闲连接,尝试创建新的连接
		return p.createConnection()
	}
}

// Put 将连接放回池中
func (p *clientPool) Put(conn *grpc.ClientConn) {
	if conn == nil {
		return
	}

	// 检查连接状态
	if conn.GetState() == connectivity.Shutdown || conn.GetState() == connectivity.TransientFailure {
		conn.Close()
		return
	}

	select {
	case p.connections <- conn:
		// 放回池中
	default:
		// 如果池已满,直接关闭连接
		conn.Close()
	}
}

// Close 关闭连接池
func (p *clientPool) Close() {
	p.mu.Lock()
	defer p.mu.Unlock()

	if p.closed {
		return
	}

	p.closed = true
	close(p.connections)

	for conn := range p.connections {
		conn.Close()
	}
}

// 错误定义
var (
	ErrInvalidMaxSize = log.New("invalid max size")
	ErrPoolClosed     = log.New("connection pool is closed")
)

// 示例使用
func main() {
	pool, err := NewClientPool("localhost:50051", 10, time.Minute, grpc.WithInsecure())
	if err != nil {
		log.Fatalf("Failed to create pool: %v", err)
	}

	conn, err := pool.Get()
	if err != nil {
		log.Fatalf("Failed to get connection: %v", err)
	}

	// 使用连接
	// client := pb.NewYourServiceClient(conn)

	// 放回连接
	pool.Put(conn)

	// 程序退出时关闭连接池
	pool.Close()
}