用go从零构建写一个RPC(仿gRPC,tRPC)--- 版本2

发布于:2025-05-10 ⋅ 阅读:(7) ⋅ 点赞:(0)

在版本1中,虽然系统能够满足基本需求,但随着连接数的增加和处理请求的复杂度上升,性能瓶颈逐渐显现。为了进一步提升系统的稳定性、并发处理能力以及资源的高效利用,版本2引入了三个重要功能:客户端连接池、服务器长连接以及服务器处理业务逻辑时引入的协程池,主要是为了更好的利用资源和提高系统的稳定性。这些功能的引入,使得系统在面对大规模连接和高并发请求时,能够更好地应对。

为了更好的验证两个版本的效果,文章末尾也增加了两个版本的压测效果
代码地址:https://github.com/karatttt/MyRPC

版本2新增特性

分别解释一下每个功能引入的原因和实际需求。

客户端连接池的引入

背景:

在之前的版本中,每次与服务器的通信都会创建和销毁连接,频繁的连接创建和销毁不仅浪费了系统资源,每一次都需要经过TCP的握手和挥手,同时也导致了连接响应时间的不稳定,我们希望引入一个连接池,更好的管理和复用连接。

// 原版本
// 实现Send方法
func (c *clientTransport) Send(ctx context.Context, reqBody interface{}, rspBody interface{}, opt *ClientTransportOption) error {
	// 获取连接
	// TODO 这里的连接后续可以优化从连接池获取
	conn, err := net.Dial("tcp", opt.Address)
	if err != nil {
		return err
	}
	defer conn.Close()
实现思路:

连接池和协程池有相似的地方也有不同的地方,首先他们都是池化机制,旨在重用已有资源(连接或协程),避免频繁创建和销毁资源的性能开销。都有空闲资源的回收、最大并发数的限制,以及对请求的排队和等待机制,同时都需要处理资源的生命周期管理(如超时、关闭等)。

但是他们的对象不同,一个是TCP连接,一个是系统的协程。我们可以借鉴协程池的实现来实现这个连接池。

这里还需要注意一个点。我们往往对于一个service的请求是同一目的 IP + 端口(如并发调用多次Hello方法,访问server也是同一个同一目的 IP + 端口,只是源端口不同),不同的方法通过协议数据中的方法名来进行service内的路由。而我们创建的这个连接池,是对于这个目的 IP + 端口的池化处理,针对这个目的 IP + 端口创建多个连接并复用,所以我们系统中对于不同的service应有不同的连接池,故做一个poolManager来统一管理。

PoolManager
先来看看这个poolManager,首先是getPoolManager,获取一个全局唯一的poolManager


	type PoolManager struct {
		mu       sync.RWMutex
		pools    map[string]*ConnPool // key是目的ip加端口,v是实际连接池
		ctx      context.Context
		cancel   context.CancelFunc
		sigChan  chan os.Signal
	}

	// GetPoolManager 获取全局唯一的 PoolManager 实例
	func GetPoolManager() *PoolManager {
		poolManagerOnce.Do(func() {
			ctx, cancel := context.WithCancel(context.Background())
			globalPoolManager = &PoolManager{
				pools:    make(map[string]*ConnPool),
				ctx:      ctx,
				cancel:   cancel,
				sigChan:  make(chan os.Signal, 1),
			}
			// 启动信号处理
			go globalPoolManager.handleSignals()
		})
		return globalPoolManager
	}
  • 首先这个struct持有一个pools,以及sigChan用于监听程序退出时,优雅关闭所有连接池,这个优雅关闭后续再看。
  • poolManagerOnce.Do(func() 保证只有一个manager实例
  • 我们下面看看如何创建一个连接池
// GetPool 获取指定地址的连接池,如果不存在则创建
func (pm *PoolManager) GetPool(addr string) *ConnPool {
	pm.mu.RLock()
	if pool, exists := pm.pools[addr]; exists {
		pm.mu.RUnlock()
		return pool
	}
	pm.mu.RUnlock()

	// 创建连接池
	pm.mu.Lock()
	defer pm.mu.Unlock()

	// 双重检查,防止其他goroutine已经创建
	if pool, exists := pm.pools[addr]; exists {
		return pool
	}

	pool := NewConnPool(
		addr,           // 服务器地址
		1000,             // 最大活跃连接数
		1000,              // 最小空闲连接数
		60*time.Second, // 空闲连接超时时间
		60*time.Second, // 建立连接最大生命周期
		func(address string) (net.Conn, error) {
			return net.DialTimeout("tcp", address, 60*time.Second)
		},
	)
	pm.pools[addr] = pool
	return pool
}
// handleSignals 处理系统信号
func (pm *PoolManager) handleSignals() {
	// 注册信号
	signal.Notify(pm.sigChan, syscall.SIGINT, syscall.SIGTERM)

	// 等待信号
	sig := <-pm.sigChan
	fmt.Printf("\nReceived signal: %v\n", sig)

	// 创建一个带超时的上下文用于关闭
	shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// 优雅关闭连接池
	fmt.Println("Shutting down connection pools...")
	if err := pm.Shutdown(shutdownCtx); err != nil {
		fmt.Printf("Error during shutdown: %v\n", err)
	}
	fmt.Println("Connection pools shut down successfully")
}
// Shutdown 优雅关闭所有连接池
func (pm *PoolManager) Shutdown(ctx context.Context) error {
	// 发送关闭信号
	pm.cancel()

	// 等待所有连接池关闭
	done := make(chan struct{})
	go func() {
		pm.mu.Lock()
		for _, pool := range pm.pools {
			pool.Close()
		}
		pm.pools = make(map[string]*ConnPool)
		pm.mu.Unlock()
		close(done)
	}()

	// 等待关闭完成或超时
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
  • GetPool中,先获取读锁判断是否有pool已经创建,若没有获取写锁创建连接池
  • handleSignals是创建manager时创建的一个协程监听系统信号,如果进程结束,则调用Shutdown,关闭所有的pool。

ConnectPool
获得了连接池后,我们看看如何获取连接,即get操作


func (p *ConnPool) Get() (net.Conn, error) {
	p.mu.Lock()
	defer p.mu.Unlock()

	// 如果连接池已关闭或正在关闭,拒绝新连接
	if p.closed || p.closing {
		return nil, ErrPoolClosed
	}

	// 设置等待超时
	var startWait time.Time
	if p.maxWait > 0 {
		startWait = time.Now()
	}

	for {
		// 检查空闲连接
		if len(p.idleConns) > 0 {
			conn := p.idleConns[len(p.idleConns)-1]
			p.idleConns = p.idleConns[:len(p.idleConns)-1]
			atomic.AddUint64(&p.stats.Hits, 1)

			// 连接健康检查
			if !p.isHealthy(conn) {
				conn.conn.Close()
				atomic.AddInt32(&p.activeCount, -1)
				continue
			}

			// 重置超时设置
			conn.conn.SetDeadline(time.Time{})
			conn.conn.SetReadDeadline(time.Time{})
			conn.conn.SetWriteDeadline(time.Time{})

			conn.lastUsed = time.Now()
			atomic.AddInt32(&p.activeCount, 1)
			p.wg.Add(1)
			p.mu.Unlock()
			return &pooledConnWrapper{conn, p}, nil
		}

		// 检查是否可以创建新连接
		if int(atomic.LoadInt32(&p.activeCount)) < p.maxActive {
			atomic.AddInt32(&p.activeCount, 1)
			p.wg.Add(1)
			atomic.AddUint64(&p.stats.Misses, 1)
			p.mu.Unlock()

			conn, err := p.dialFunc(p.addr)
			if err != nil {
				atomic.AddInt32(&p.activeCount, -1)
				p.wg.Done()
				atomic.AddUint64(&p.stats.Errors, 1)
				p.cond.Signal()
				return nil, err
			}

			pooledConn := &PooledConn{
				conn:      conn,
				createdAt: time.Now(),
				lastUsed:  time.Now(),
			}
			return &pooledConnWrapper{pooledConn, p}, nil
		}

		// 等待连接释放
		if p.maxWait > 0 {
			atomic.AddUint64(&p.stats.Timeouts, 1)
			p.cond.Wait()

			// 检查是否超时
			if time.Since(startWait) >= p.maxWait {
				p.mu.Unlock()
				return nil, fmt.Errorf("连接池获取连接超时,等待时间: %v", time.Since(startWait))
			}
		} else {
			p.cond.Wait()
		}
	}
}

  • 连接池的各个连接有两种状态,空闲连接,活跃连接(正在处理IO的连接),空闲连接通过Put方法归还连接,若超过最大空闲连接数,则该连接被close,活跃连接通过Get方法将其置为活跃状态,若超过最大活跃连接数,则需要排队等待连接池释放连接
  • 获取到连接有几种可能,一种是有空闲连接直接获取返回,一种是无空闲连接,但是活跃连接数未达到最大阈值,则创建新连接,一种是无空闲连接且达到了最大活跃连接数,则通过cond.Wait等待,直到被唤醒的时候,在通过上面几种方式尝试获取
  • 同样有三种情况无法调用get的时候立即创建新连接,一个就是连接池正在关闭或者已经关闭,一个是获取的空闲连接健康检测不通过(这里的检测就是简单的调用conn.Read读,如果读到数据或者返回err则说明不健康,这个连接仍在被使用),还有一个就是无空闲连接且达到了最大活跃连接数,cond.wait等待的时间超过了最大等待时间
  • 接下来看看Put方法

func (p *ConnPool) Put(conn net.Conn) {
	pc, ok := conn.(*pooledConnWrapper)
	if !ok {
		conn.Close()
		return
	}

	p.mu.Lock()
	defer p.mu.Unlock()

	// 减少活跃计数
	atomic.AddInt32(&p.activeCount, -1)
	p.wg.Done()

	// 如果连接池正在关闭或已关闭,直接关闭连接
	if p.closing || p.closed {
		pc.conn.conn.Close()
		p.cond.Signal()
		return
	}

	// 检查连接是否健康
	if !p.isHealthy(pc.conn) {
		pc.conn.conn.Close()
		p.cond.Signal()
		return
	}

	// 检查是否超过最大空闲连接数
	if len(p.idleConns) >= p.maxIdle {
		pc.conn.conn.Close()
		p.cond.Signal()
		return
	}

	pc.conn.lastUsed = time.Now()
	p.idleConns = append(p.idleConns, pc.conn)
	p.cond.Signal()
}
  • Put方法较为简单,当客户端处理完请求的时候,就 defer一下pool的Put方法归还连接,将该连接的状态置为空闲,如果超过了最大空闲数则close这个连接
  • 最后看看close方法,连接池的优雅关闭
func (p *ConnPool) Shutdown(timeout time.Duration) error {
	p.mu.Lock()
	// 标记为正在关闭,不再接受新连接
	p.closing = true
	p.mu.Unlock()

	// 关闭所有空闲连接
	p.mu.Lock()
	for _, conn := range p.idleConns {
		conn.conn.Close()
	}
	p.idleConns = nil
	p.mu.Unlock()

	// 等待活跃连接完成或超时
	done := make(chan struct{})
	go func() {
		p.wg.Wait() // 等待所有活跃连接归还
		close(done)
	}()

	select {
	case <-done:
		// 所有活跃连接已完成
		p.mu.Lock()
		p.closed = true
		p.mu.Unlock()
		return nil
	case <-time.After(timeout):
		// 超时,强制关闭
		p.mu.Lock()
		p.closed = true
		p.mu.Unlock()
		return fmt.Errorf("连接池关闭超时,仍有 %d 个活跃连接", atomic.LoadInt32(&p.activeCount))
	}
}

func (p *ConnPool) Close() {
	// 默认给5秒超时
	if err := p.Shutdown(5 * time.Second); err != nil {
		fmt.Println("连接池关闭警告:", err)
	}
}


  • 首先为什么需要优雅关闭? 如果没有任何协程监听信号(无 signal.Notify)当 SIGINT(Ctrl+C)或 SIGTERM(kill)发生时,进程会立即退出,所有协程(包括 main 和子协程)会被强制终止。操作系统回收所有资源包括socket连接,这就意味着所有未完成的 net.Conn 会被强制关闭,服务端会收到 RST(强制关闭连接,不进行四次挥手) 而非 FIN。我们希望这些正在进行的连接能够正常处理完再关闭,避免server收到不完整的数据从而引发其他意外发生
  • 当然我们需要为这个等待活跃连接处理完设置一个超时时间,所以再Close中,调用shutdown方法并设置了5秒超时时间,shutdown中关闭所有空闲连接,并wg.wait等待活跃连接处理完毕

至此客户端的连接池做好了,但是需要考虑的是,版本1的server端的连接是一次性的,处理完业务逻辑返回后立即close连接。如果不改server的短连接为长连接,那么客户端的连接池则没有意义,即使是空闲的连接仍然会被server立即close掉,所以我们引入server端的长连接

服务器长连接的引入

背景:

在旧版本中,客户端与服务器之间的每次请求都需要进行连接和断开,频繁的建立和关闭连接增加了系统的负担。长连接机制能够让服务器保持与客户端的连接,在连接周期内不断发送和接收数据,减少了连接频繁创建的开销,提高了通信效率,尤其是在需要频繁请求的场景下。

技术思路:

使用for循环监听长连接,不断读取请求帧,并根据上下文状态决定是否关闭连接。同时使用超时机制来防止长时间未操作的连接占用资源。主要更改handleConnection这个方法,即listen到新连接后,go出去的一个协程处理这个方法


// handleConnection 处理单个连接
func (t *serverTransport) handleConnection(ctx context.Context, conn net.Conn) {
	// 设置连接超时
	idleTimeout := 30 * time.Second
	if t.opts != nil && t.opts.IdleTimeout > 0 {
		idleTimeout = t.opts.IdleTimeout
	}

	// 设置读取超时
	conn.SetReadDeadline(time.Now().Add(idleTimeout))

	// 处理连接
	fmt.Printf("New connection from %s\n", conn.RemoteAddr())

	// 循环读取请求,即长连接
	for {
		select {
		// 1. 如果上下文被取消,则关闭连接
		case <-ctx.Done():
			fmt.Printf("Context cancelled, closing connection from %s\n", conn.RemoteAddr())
			return
		default:
			frame, err := codec.ReadFrame(conn)
			if err != nil {
				// 2. 如果读取帧失败,如客户端断开连接,则关闭连接
				if err == io.EOF {
					fmt.Printf("Client %s disconnected normally\n", conn.RemoteAddr())
					return
				}
				// 3. 如果连接超时,超过设置的idletime,关闭连接
				if e, ok := err.(net.Error); ok && e.Timeout() {
					fmt.Printf("Connection from %s timed out after %v\n", conn.RemoteAddr(), idleTimeout)
					return
				}
				// 4. 处理强制关闭的情况
				if strings.Contains(err.Error(), "forcibly closed") {
					fmt.Printf("Client %s forcibly closed the connection\n", conn.RemoteAddr())
					return
				}
				fmt.Printf("Read error from %s: %v\n", conn.RemoteAddr(), err)
				return
			}

			// 重置读取超时
			conn.SetReadDeadline(time.Now().Add(idleTimeout))

			// 使用协程池处理请求
			frameCopy := frame // 创建副本避免闭包问题
			err = t.pool.Submit(func() {
				// 处理请求
				response, err := t.ConnHandler.Handle(context.Background(), frameCopy)
				if err != nil {
					fmt.Printf("Handle error for %s: %v\n", conn.RemoteAddr(), err)
					return
				}

				// 发送响应
				if _, err := conn.Write(response); err != nil {
					fmt.Printf("Write response error for %s: %v\n", conn.RemoteAddr(), err)
				}
			})

			if err != nil {
				fmt.Printf("Submit task to pool error for %s: %v\n", conn.RemoteAddr(), err)
				// 协程池提交失败,直接处理
				response, err := t.ConnHandler.Handle(ctx, frame)
				if err != nil {
					fmt.Printf("Handle error for %s: %v\n", conn.RemoteAddr(), err)
					continue
				}

				if _, err := conn.Write(response); err != nil {
					fmt.Printf("Write response error for %s: %v\n", conn.RemoteAddr(), err)
					return
				}
			}
		}
	}
}

  • 可见现在处理读取帧的逻辑变成了for循环,即长连接处理多次的请求,当遇到客户端连接断开或者长时间没有数据传输,则server关闭这个连接

服务端协程池的引入

背景:

在之前的版本中,服务器需要为每一个请求分配一个独立的协程处理,随着请求量的增加,创建大量的协程会导致资源浪费和上下文切换的开销。 通过引入协程池,可以限制并发协程的数量,避免系统因为过多的协程而出现性能瓶颈。协程池帮助复用已有的协程,减少了每个请求创建新协程的开销,提升了处理能力。

技术思路:

使用一个协程池管理并发任务,限制并发请求的数量。如果协程池中的协程数量已达到上限,新请求将被等待或者直接失败,避免过度的并发资源竞争。这里主要还是handleConnection这个方法

// handleConnection部分代码
// 使用协程池处理请求
			frameCopy := frame // 创建副本避免闭包问题
			err = t.pool.Submit(func() {
				// 处理请求
				response, err := t.ConnHandler.Handle(context.Background(), frameCopy)
				if err != nil {
					fmt.Printf("Handle error for %s: %v\n", conn.RemoteAddr(), err)
					return
				}

				// 发送响应
				if _, err := conn.Write(response); err != nil {
					fmt.Printf("Write response error for %s: %v\n", conn.RemoteAddr(), err)
				}
			})
  • 这里使用了t.pool.Submit(func() 来把业务处理逻辑交由协程池,这里协程池用的是"github.com/panjf2000/ants/v2" 里的ants.Pool
  • 其实这里想和netty的多路复用模型做一个比对,可以更好的理解这种go的模型:
  1. Go 的 net
    • 全局 netpoller 管理所有 socket(监听 + 连接)。实际上go的事件循环包括监听和连接事件,也就是epoll监听的事件循环中,包括监听连接事件(就绪则创建连接),和处理连接事件(就绪则读请求,这个事件是在我们调用conn.Read的时候注册到事件循环中,协程进入等待状态,当数据到达内核的时候则事件就绪,go的调度器会唤醒这个协程开始read数据),实际上和redis的单reactor单线程有点像
    • 但是这里处理业务逻辑就不是单线程了, 我们这里用了协程池来处理业务逻辑
  2. Netty
    • 主 Reactor 负责 Accept,从 Reactor 负责 Read/Write。相对于go的模型做了职责的分类,处理业务逻辑同样是线程池来做,主从reactor多线程模型
      ![[Pasted image 20250510184039.png]]

压测

进行了以上的改进,我们试着写一个例子进行压测,和版本1进行比对

package main

import (
	"MyRPC/core/client"
	"MyRPC/pb"
	"context"
	"fmt"
	"net/http"
	_ "net/http/pprof"
	"runtime"
	"sync"
	"sync/atomic"
	"time"
)

var (
	success int64
	wg      sync.WaitGroup
)

func main() {
	// 启动 pprof 服务(用于可视化内存分析)
	go func() {
		fmt.Println("[pprof] listening on :6060")
		_ = http.ListenAndServe(":6060", nil)
	}()

	// 创建 RPC 客户端
	c := pb.NewHelloClientProxy(client.WithTarget("121.40.244.59:8001"))
	if c == nil {
		fmt.Println("Failed to create client")
		return
	}

	printMemStats("Before requests")
	// 20000并发
	// 客户端连接池是1000最大活跃连接数,1000最小空闲连接数
	const N = 20000
	start := time.Now()
	for i := 0; i < N; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			rsp, err := c.Hello(context.Background(), &pb.HelloRequest{Msg: "world"})
			if err == nil && rsp != nil {
				atomic.AddInt64(&success, 1)
			} else {
				fmt.Printf("Request %d error: %v\n", i, err)
			}
		}(i)
	}
	wg.Wait()
	elapsed := time.Since(start)
	printMemStats("After requests")

	fmt.Println("\n------ Benchmark Summary ------")
	fmt.Printf("Total requests: %d\n", N)
	fmt.Printf("Success count:  %d\n", success)
	fmt.Printf("Total time:     %v\n", elapsed)
	fmt.Printf("Avg per call:   %v\n", elapsed/time.Duration(N))

	// 休眠3s
	time.Sleep(3 * time.Second)
}

// 打印内存状态
func printMemStats(label string) {
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	fmt.Printf("\n=== %s ===\n", label)
	fmt.Printf("Alloc = %v KB\n", m.Alloc/1024)
	fmt.Printf("TotalAlloc = %v KB\n", m.TotalAlloc/1024)
	fmt.Printf("Sys = %v KB\n", m.Sys/1024)
	fmt.Printf("NumGC = %v\n", m.NumGC)
}

对于4核CPU的压测结果如下:
![[Pasted image 20250510184147.png]]

同样对于10000并发和30000并发,平均处理时间都在2至4ms左右,
而对于版本一,当并发量到达万级别时,已经处理不完了…笔者等了好长时间也没等到结果,tcp连接已经达到了机器极限,太多时间阻塞在连接的关闭和建立上

总结

该版本实际上与http1.1的特性有相似之处,同样是支持了长连接,但是同样会存在队头阻塞的情况,虽然是复用同一 TCP 连接发送多个请求,但是请求对于一个TCP的连接连接仍然是串行,必须等上一个请求完成,如果上一个请求耗时长就会阻塞了。所以后续版本可以考虑引入http2的多路复用的特性。同时也可以考虑实现rpc的异步发送和流式传输


网站公告

今日签到

点亮在社区的每一天
去签到