redigo连接池源码解析

发布于:2022-11-09 ⋅ 阅读:(14) ⋅ 点赞:(0) ⋅ 评论:(0)

何为连接池

连接池是负责分配、管理和释放连接,它允许应用程序重复使用池中的空闲的连接,而不是每次都重新建立一个连接。 本质就是管理了一堆长链接,提供给需求方相应的句柄使用。

连接池有何用

  • 减少网络io开销
    减少了每次连接三次握手和四次挥手的开销。连接复用,自然减少了创建,关闭套接字等流程。提升系统性能

  • 控制资源
    如果没有连接池管理,如每次请求,协程都创建一个连接,那么当请求量巨大时,产生非常大的浪费并且可能会导致高负载下的异常发生,最终导致所有服务都不可用。这就是为什么很多存储都会有一层proxy来管理,不让业务服务直接和存储连接。

  • 简化编程
    使用者只需关心如何获取和返回的方法,无需关心底层连接、避免资源泄漏等问题

redigo是如何实现v1.8.4

首先redigo不支持cluster,作者也不打算支持,所以建议还是选择go-redis

package main
import (
	"fmt"
	red "github.com/gomodule/redigo/redis"
	"time"
)

type Redis struct {
	pool *red.Pool
}

var redis *Redis
func Exec(cmd string, key interface{}, args ...interface{}) (interface{}, error) {
	con := redis.pool.Get()
	// connct
	if err := con.Err(); err != nil {
		return nil, err
	}
	defer con.Close()
	parmas := make([]interface{}, 0)
	parmas = append(parmas, key)

	if len(args) > 0 {
		for _, v := range args {
			parmas = append(parmas, v)
		}
	}
	return con.Do(cmd, parmas...)
}

func initRedis() {
	redis = new(Redis)
	redis.pool = &red.Pool{
		MaxIdle:     2, //空闲数
		IdleTimeout: 240 * time.Second,
		MaxActive:   0, //最大数
		Dial: func() (red.Conn, error) {
			c, err := red.Dial("tcp", "127.0.0.1:6379")
			if err != nil {
				return nil, err
			}
			return c, err
		},
		TestOnBorrow: func(c red.Conn, t time.Time) error {
			_, err := c.Do("PING")
			return err
		},
	}
}

func main() {
	initRedis()
	Exec("set", "dandy", "hello")
	result, err := Exec("get", "dandy")
	if err != nil {
		fmt.Print(err.Error())
	}
	str, _ := red.String(result, err)
	fmt.Print(str)
	redis.pool.Close()
}

初始化redis.pool

type Pool struct {
	// Dial conn中Dial调用初始化
	Dial func() (Conn, error)
	// 带有context的Dial,2选1即可
	DialContext func(ctx context.Context) (Conn, error)
	// 获取连接池中,校验连接是否可用,一般和PING、PONG使用
	TestOnBorrow func(c Conn, t time.Time) error
	// 连接池中最大空闲数
	MaxIdle int
	// 连接池中保持活跃的数,0没有限制
	MaxActive int
	// 空闲检查时间
	IdleTimeout time.Duration
	// wait设置为true并且pool中活跃数到达设置的最大值,直到连接池中有可用连接,get()才返回 
	Wait bool
	//  设置连接最大存活时间 0无限制
	MaxConnLifetime time.Duration
	// 统计、队列等使用
	mu           sync.Mutex    // mu protects the following fields
	closed       bool          // set to true when the pool is closed.
	active       int           // the number of open connections in the pool
	initOnce     sync.Once     // the init ch once func
	ch           chan struct{} // limits open connections when p.Wait is true
	idle         idleList      // idle connections
	waitCount    int64         // total number of connections waited for.
	waitDuration time.Duration // total time waited for new connections.
}

Pool获取连接

Get获取

源码 pool.go
func (p *Pool) Get() Conn 
  • wait设置等待
select {
  case <-p.ch://当连接池满时,会阻塞等待,直到有空闲连接
    select {
    case <-ctx.Done():
      p.ch <- struct{}{}
      return 0, ctx.Err()
    default:
    }
  case <-ctx.Done():
    return 0, ctx.Err()
}

当pool中设置了Wait,当连接满时(p.ch获取不到数据),会等待直到池中有空闲连接,就会通知ch

看activeConn.close()会调用Pool.put(),此时连接池将会有空闲连接,并且通知刚才等待的Wait ch

if p.ch != nil && !p.closed {
        // 通知等待ch
	p.ch <- struct{}{}
}
  • 空闲时间判断
if p.IdleTimeout > 0 {
        n := p.idle.count
        // 只需从尾部back判断即可验证是否过期
        // 如果过期,删除尾部,释放该连接,并且继续遍历
        for i := 0; i < n && p.idle.back != nil && p.idle.back.t.Add(p.IdleTimeout).Before(nowFunc()); i++ {
            pc := p.idle.back
            p.idle.popBack()
            p.mu.Unlock()
            pc.c.Close()
            p.mu.Lock()
            p.active--
        }
}

我们可以先大致先看内部连接池双向链表的管理点击跳转,也许这样你会很容易的理解。

这里idletimeout遍历整个链表,因为idle.back.t为最早插入的时间,所以只需要检查尾部back即可。

  • 从连接池头部后取空闲连接
for p.idle.front != nil {
    pc := p.idle.front
    // 取出头部
    p.idle.popFront()
    p.mu.Unlock()
    // 校验连接是否正常,一般我们设置回调ping取检验,
    // 自然每次都多了一次请求,性能消耗
    if (p.TestOnBorrow == nil || p.TestOnBorrow(pc.c, pc.t) == nil) &&
      (p.MaxConnLifetime == 0 || nowFunc().Sub(pc.created) < p.MaxConnLifetime) {
      // 返回可用的连接
      return &activeConn{p: p, pc: pc}, nil
    }
    // 校验不通过,自然释放该连接
    pc.c.Close()
    p.mu.Lock()
    p.active--
}

因为上面的条件如果都校验成功,说明链表头部有数据,我们只需pop出来,之后返回activeConn,即我们成功后去了一个连接。注意,这里activeConn很关键,里头将最早初始化创建的p(pool)存入,并且将pc(poolConn)即从链表中取出的数据存起来。基本上ac(activeConn)涵盖了后续所有可以操作的数据。 详细pc我们可以看下面

  • 开始链表肯定是空的,如何获取连接
p.active++
p.mu.Unlock()
// 这里拨号,即调用我们一开始注册的回调pool中的Dial
// 这里就是创建线程池开始创建连接,即conn的管理
c, err := p.dial(ctx)
if err != nil {
    p.mu.Lock()
    p.active--
    if p.ch != nil && !p.closed {
        p.ch <- struct{}{}
    }
    p.mu.Unlock()
    // 返回错误连接
    return errorConn{err}, err
}
// pc中c为conn.go中conn的存储。
return &activeConn{p: p, pc: &poolConn{c: c, created: nowFunc()}}, nil

这里就是一开始,我们调用dial创建连接,并返回activConn得到连接。后续我们会分析conn.go

pool 中链表put的存放

func (p *Pool) put(pc *poolConn, forceClose bool) error {
   p.mu.Lock()
   if !p.closed && !forceClose {
       pc.t = nowFunc()
       // 报存队列
       p.idle.pushFront(pc)
       // 超过设置,pop出时间有效时间最小的back连接
       if p.idle.count > p.MaxIdle {
           pc = p.idle.back
           p.idle.popBack()
       } else {
           pc = nil
       }
   }
   // back该连接不保存,直接关闭
   if pc != nil {
       p.mu.Unlock()
       pc.c.Close()
       p.mu.Lock()
       p.active--
   }
   // 上述以说明,配合wait设置使用
   if p.ch != nil && !p.closed {
       p.ch <- struct{}{}
   }
   p.mu.Unlock()
   return nil
}

当Pool.get获取的连接,并没有保存在连接池中,而是当activeConn.Close()时,才调用put,保存连接。至此,pool中核心功能都已准备完毕。


idleList连接池管理

  • pushFront链表存储

连接池插入.png

func (l *idleList) pushFront(pc *poolConn) {
    // 这里记住,idleList中front和back始终指向
    // 的是连接池中的头部和尾部
    // 1 新的pc尾指针指向链表头部
    pc.next = l.front
    pc.prev = nil
    if l.count == 0 {
        // 0.当连接池为空,头尾都指向改连接
        l.back = pc
    } else {
        // 2. 链表头前驱指针指向pc
        l.front.prev = pc
    }
    // 3.修改l中front指向为新插入的pc
    l.front = pc
    l.count++
}
  • popFront删除链表
    连接池删除.png

conn.go

  • 连接创建
// 调用net/dial.go库进行连接
netConn, err := do.dialContext(ctx, network, address)
if err != nil {
    return nil, err
}
c := &conn{
    // 暂时我们研究的是返回:TCPConn
    conn:         netConn,
    // bufio写的也很好,后续对其分析
    bw:           bufio.NewWriter(netConn),
    br:           bufio.NewReader(netConn),
    readTimeout:  do.readTimeout,
    writeTimeout: do.writeTimeout,
}
  • 之后的activeConn调用的Do方法就是调用conn中的Do
if cmd != "" {
    // RESP协议组包
    if err := c.writeCommand(cmd, args); err != nil {
        return nil, c.fatal(err)
    }
}
// bufio用法,里头Write为interface实际为TCPConn的操作
if err := c.bw.Flush(); err != nil {
    return nil, c.fatal(err)
}

var deadline time.Time
if readTimeout != 0 {
    deadline = time.Now().Add(readTimeout)
}
// read过期检测
if err := c.conn.SetReadDeadline(deadline); err != nil {
    return nil, c.fatal(err)
}
var err error
var reply interface{}
for i := 0; i <= pending; i++ {
    var e error
    // 获取redis服务回包数据
    if reply, e = c.readReply(); e != nil {
        return nil, c.fatal(e)
    }
    if e, ok := reply.(Error); ok && err == nil {
        err = e
    }
}

Do方法调用的是DoWithTimeout,这里发起RESP协议组包,并发送数据给redis服务端,之后读取redis服务器返回的数据。


大家如果觉得有啥疑惑或者不正确,都可以在评论或者加微信(dandyhzh)一起谈论。