MySQL 连接池的实现

发布于:2024-04-06 ⋅ 阅读:(15) ⋅ 点赞:(0)

池化技术

  • 池化技术能够减少资源对象的创建次数,提高程序的响应性能,特别是在高并发下这种提高更明显。
  • 共同特征
    • 对象创建时间长。
    • 对象创建需要大量资源。
    • 对象创建后可被重复使用。

数据库连接池

  • 数据库连接池(Connection pooling)是程序启动时建立足够的数据库连接,并将这些连接组成一个连接池,由程序动态地对池中的连接进行申请,使用,释放。
  • 数据库连接:服务器需要跟数据库建立 TCP 连接
    • TCP 连接需要三次握手建立连接,然后服务器就可以通过这条连接进行收发数据了。当这条连接使用完毕后,需要通过四次挥手断开连接
    • 收发数据的规律:请求回应模式
    • 收发的数据要符合 MySQL 协议,发数据的时候要通过 MySQL 协议进行编码,编码为二进制发送给对端,收数据的时候要通过 MySQL 协议进行解码
    • 所有跟 MySQL 进行交互的服务器都需要安装 MySQL 驱动 → 具备编码和解码的能力。
      • libmysqlclient.lib/.dll(Windows)、libmysqlclient.a/.so(Linux)、mysql.h官方实现的驱动采用的是阻塞 IO:当 IO 未就绪的时候会阻塞线程,比如当协议栈中的接收缓冲区没有数据时,而 read 是把内核态的接收缓冲区中的数据拷贝到用户态来,那么执行 read 的线程就会发生阻塞
      • 自己实现 MySQL 协议的解码和编码。workflow、openresty 实现的是非阻塞 IO
  • MySQL 的网络模型
    • 有一个主线程,它会调用 select 监听 listenfd。(MySQL 会被绑定在 3306 端口上)
      • 使用 select 的原因:select 可以跨平台。
    • 只要有服务器与 MySQL 建立连接,select 就会响应一个读事件,MySQL 会为每一条连接分配一个线程;MySQL 最大连接数通常为 151。(my.cnf)
    • 在这个线程中有一个 while 循环,不断地去 read 数据,当界定出一个完整的数据包后,就会执行 SQL 的逻辑,最后把执行 SQL 的结果 write 给服务器。
      while (true) {
      	read
      	界定数据包
      	执行 SQL 的逻辑
      	write
      }
      
    • 当建立多条连接时,MySQL 内部会有多个线程并发执行 SQL 语句
  • 短连接:不复用连接,每次执行 SQL 语句都会建立连接、断开连接。
    • 优点:当数据库访问频次不高的时候,节省资源,每次用完就会销毁。
    • 缺点:当数据库访问频次高的时候,会不断地创建资源、销毁资源。
  • 长连接:会维持 TCP 连接,不让它断开,会复用这条连接去执行 SQL 语句。
    • 数据库连接既是 TCP 连接,也是长连接
    • 没有 SQL 语句执行,如何维持 TCP 连接 ?
      • TCP 中的 keepalive 选项只是保持服务器和 MySQL 的内核态网络协议栈是网络畅通的。
      • 通过定时发送心跳的方式维持 TCP 连接:服务器发送一个数据包到达 MySQL,MySQL 通过系统调用取出数据包,然后再回一个数据包到达服务器,服务器通过系统调用取出数据包。调用 mysql_pingmysql_pong 接口。
      • 定时发送心跳与 keepalive 区别:定时发送心跳除了可以探知连接是否可用,还可以确认处理 SQL 语句的线程是否活跃、是否发生阻塞。
        在这里插入图片描述

同步连接和异步连接

  • 同步连接:同步等待连接的返回,会阻塞当前线程。
  • 异步连接
    • 通过回调函数去处理返回结果,异步获取连接的返回。
      void callback(SQLResult &res) {
      
      }
      DBImpl->AsyncQuery(sqlstr, callback);
      ...
      // 一段时间后执行
      callback(res);
      
    • 官方实现的 MySQL 驱动采用的是阻塞 IO,底层使用的是同步连接,需要占用一个线程;为了不阻塞当前线程,那么就需要另启线程去等待返回结果,这样一来阻塞的是另起的线程,而不是当前线程,从而实现异步连接。

同步连接池

  • 当前线程从连接池(线程安全)中获取可用连接(未被锁定的连接)。
  • 同步连接池的大小:最多允许几个线程同时使用连接
  • 应用:服务器启动时,初始化资源
  • 每一条连接都对应一把锁
    在这里插入图片描述

异步连接池

  • 任意线程向连接池投递执行 SQL 语句的请求,连接池依次从队列中取出任务执行。
  • 异步连接池的大小:最多允许几个连接同时执行 SQL 语句
  • 应用:服务器启动后,业务处理
  • 每一条连接都对应一个线程
  • 请求和回应如何对应 ?
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

MySQL 连接驱动使用

  • sudo apt install libmysqlclient-dev

  • 加载 mysql.h 以及动态库或静态库。

  • 初始化连接驱动 mysql_library_init

  • 使用 MySQL 连接驱动与 MySQL 进行交互(执行 SQL 语句)。

  • 释放连接资源 mysql_library_end

  • 裸 SQL 语句每一条 SQL 语句都需要进行词法句法分析、制定执行计划

    void Execute(char const* sql);
    
  • 预处理 SQL 语句只需要执行一次词法句法分析、制定执行计划,下次再执行相同的 SQL 语句时,会直接将执行计划放到存储引擎中执行,效率更高

    void Execute(PreparedStatement<T>* stmt);
    
  • 同步连接池和异步连接池跟具体的数据库绑定。


接口封装

  • 同步接口使用
    • DirectExecute 不需要结果。
    • Query 需要结果。
  • 异步接口使用
    • Execute 不需要结果。
    • AsyncExecute 需要结果。
    • DelayQueryHolder 异步执行多个 SQL 语句。
    • AsyncCallbackProcessor.h 封装了异步获取结果的接口。
      • AddCallback 保存 callback 以及 future
      • 请求线程循环检测是否收到数据库的返回,并进行处理(执行回调函数)ProcessReadyCallbacks
        在这里插入图片描述
  • 异步接口封装
    • chain责任链模式
    • delay(holder)pipline 模式,异步执行多个 SQL 语句。
    • transaction 事务模式
void ProcessReadyCallbacks()
{
     if (_callbacks.empty())
         return;

     std::vector<T> updateCallbacks{ std::move(_callbacks) };

     updateCallbacks.erase(std::remove_if(updateCallbacks.begin(), updateCallbacks.end(), [](T& callback)
     {
         return callback.InvokeIfReady();
     }), updateCallbacks.end());

     _callbacks.insert(_callbacks.end(), std::make_move_iterator(updateCallbacks.begin()), std::make_move_iterator(updateCallbacks.end()));
}
bool QueryCallback::InvokeIfReady()
{
    QueryCallbackData& callback = _callbacks.front();
    auto checkStateAndReturnCompletion = [this]()
    {
        _callbacks.pop();
        bool hasNext = !_isPrepared ? _string.valid() : _prepared.valid();
        if (_callbacks.empty())
        {
            ASSERT(!hasNext);
            return true;
        }

        // abort chain
        if (!hasNext)
            return true;

        ASSERT(_isPrepared == _callbacks.front()._isPrepared);
        return false;
    };

    if (!_isPrepared)
    {
        if (_string.valid() && _string.wait_for(std::chrono::seconds(0)) == std::future_status::ready)
        {
            QueryResultFuture f(std::move(_string));
            std::function<void(QueryCallback&, QueryResult)> cb(std::move(callback._string));
            cb(*this, f.get());
            return checkStateAndReturnCompletion();
        }
    }
    else
    {
        if (_prepared.valid() && _prepared.wait_for(std::chrono::seconds(0)) == std::future_status::ready)
        {
            PreparedQueryResultFuture f(std::move(_prepared));
            std::function<void(QueryCallback&, PreparedQueryResult)> cb(std::move(callback._prepared));
            cb(*this, f.get());
            return checkStateAndReturnCompletion();
        }
    }

    return false;
}


网站公告

今日签到

点亮在社区的每一天
去签到