池式结构之连接池

发布于:2025-08-18 ⋅ 阅读:(18) ⋅ 点赞:(0)

一、初识MySQL连接池

问:什么是数据库连接池?

答:维持管理一定数量连接的池式结构。

问:他解决了什么问题?

答:复用资源,而且提升了MySQl并发处理sql的能力。因为一次性建立多个连接,在MySQL内部也会创建多个线程对应多个连接,相比较一个连接的一个线程,并发度更高。

问:同步连接池和异步连接池的区别?

答:同步连接池:当服务端核心业务线程发起MySQL用户请求,该线程被阻塞。遍历同步连接池所有连接找到未加锁的连接,给他加锁然后执行SQL。收到答案后,解锁改连接并且返回结果,唤醒线程。应用场景:服务器刚刚启动,还未对外提供连接的时候,利用同步连接池初始化资源。

异步连接池:解决核 心业务线程阻塞问题,需要先实现一个线程池。阻塞的将是线程池线程,而非核心业务线程。线程池线程接收到返回结果后,通过future和promise机制将返回值给到核心业务线程。

 

问:MySQL官方提供的 c/c++驱动(接口库)需要实现哪些内容?(方便服务器发送用户请求)

答:connect、recv、send、read、write等(都是阻塞IO的实现方法),并且需要实现一个mysql协议(确定如何解决粘包问题,数据包首部加长度或者用特殊字符分隔包)。

二、代码思路

     0.对于传入的数据库名称“db1”创建一个唯一的连接池对象,并且通过map对映。初始化连接池(创建任务队列对象,创建pool_size个MySQLConn连接对象,并且创建MySQLWorker对象和连接对象绑定,启动工作线程等待阻塞队列),通过连接对象的Open()建立物理连接。

  1. 用户通过MySQLConnPool::Query发起查询sql
  2. sql的操作对象SQLOperation被创建,通过GetFuture(),使操作对象关联一个future后,把该操作对象放入BlockingQueue
  3. MySQLWorker工作对象从队列获取操作对象,通过线程绑定的连接,执行sql并且将结果存入promise,通过promise.set_value()将结果传递给关联的future

  4. AsyncProcessor通过future_wait_for发现future已就绪,调用用户代码层传入的回调函数处理结果。

双重等待:

小范围:工作对象在等操作对象被创建。创建后工作对象执行SQL,拿到promise值,传给future.

大范围:回调对象在等future值。

三、代码实现

1.连接池对象

//MySQLConnPool.cpp

#include "MySQLConnPool.h"
#include "MySQLConn.h"
#include "SQLOperation.h"
#include "QueryCallback.h"
#include <cppconn/resultset.h>
#include "BlockingQueue.h"

std::unordered_map<std::string, MySQLConnPool *> MySQLConnPool::instances_;

MySQLConnPool *MySQLConnPool::GetInstance(const std::string &db) {
    if (instances_.find(db) == instances_.end()) {
        instances_[db] = new MySQLConnPool(db);
    }
    return instances_[db];
}

void MySQLConnPool::InitPool(const std::string &url, int pool_size) {
    task_queue_ = new BlockingQueue<SQLOperation *>();
    for (int i = 0; i < pool_size; ++i) {
        MySQLConn *conn = new MySQLConn(url, database_, *task_queue_);
        conn->Open();
        pool_.push_back(conn);
    }
}

MySQLConnPool::~MySQLConnPool() {
    if (task_queue_)
        task_queue_->Cancel();
    for (auto conn : pool_) {
        delete conn;
    }
    if (task_queue_) {
        delete task_queue_;
        task_queue_ = nullptr;
    }
    pool_.clear();
}

第二个参数是用户传入的回调函数   在future有值后执行
QueryCallback MySQLConnPool::Query(const std::string &sql, std::function<void(std::unique_ptr<sql::ResultSet>)> &&cb) {
    SQLOperation *op = new SQLOperation(sql);
    auto future = op->GetFuture();
    task_queue_->Push(op);
    return QueryCallback(std::move(future), std::move(cb));
}

2.连接对象

//MySQLConn.cpp

#include "MySQLConn.h"
#include "QueryCallback.h"
#include "MySQLWorker.h"
#include "BlockingQueue.h"

#include <cppconn/driver.h>
#include <cppconn/connection.h>
#include <cppconn/exception.h>
#include <cppconn/statement.h>
#include <cppconn/resultset.h>

#include <vector>
#include <string>

// "tcp://127.0.0.1:3306;root;123456"
static std::vector<std::string_view>
Tokenize(std::string_view str, char sep, bool keepEmpty)
{
    //划分上面的指令
}

MySQLConnInfo::MySQLConnInfo(const std::string &info, const std::string &db)
{
    auto tokens = Tokenize(info, ';', false);
    if (tokens.size() != 3)
        return;

    url.assign(tokens[0]);
    user.assign(tokens[1]);
    password.assign(tokens[2]);
    database.assign(db);
}

MySQLConn::MySQLConn(const std::string &info, const std::string &db, BlockingQueue<SQLOperation *> &task_queue)
  : info_(info, db)
{
    worker_ = new MySQLWorker(this, task_queue);//创建工作对象  并且和this指向的当前连接对象绑定
    worker_->Start();
}

MySQLConn::~MySQLConn()
{
    if (worker_) {
        worker_->Stop();
        delete worker_;
        worker_ = nullptr;
    }

    if (conn_) {
        delete conn_;
    }
}

int MySQLConn::Open()
{
    int err = 0;
    try {
        driver_ = get_driver_instance();
        conn_ = driver_->connect(info_.url, info_.user, info_.password);
        if (!conn_) {
            return -1;
        }
    
        conn_->setSchema(info_.database);
    } catch (sql::SQLException &e) {
        HandlerException(e);
        err = e.getErrorCode();
    }
    return err;
}

void MySQLConn::Close()
{
    if (conn_) {
        conn_->close();
        delete conn_;
        conn_ = nullptr;
    }
}

sql::ResultSet* MySQLConn::Query(const std::string &sql)
{//底层的执行
    try {
        sql::Statement *stmt = conn_->createStatement();//MYSQL原生的api
        return stmt->executeQuery(sql);
    } catch (sql::SQLException &e) {
        HandlerException(e);
    }
    return nullptr;
}

void MySQLConn::HandlerException(sql::SQLException &e)
{
    if (e.getErrorCode() != 0)
    {
        std::cerr << "# ERR: SQLException in " << __FILE__;
        std::cerr << "(" << __FUNCTION__ << ") on line " << __LINE__ << std::endl;
        std::cerr << "# ERR: " << e.what();
        std::cerr << " (MySQL error code: " << e.getErrorCode();
        std::cerr << ", SQLState: " << e.getSQLState() << " )" << std::endl;
    }
}

3.工作对象

职责:拿到操作对象后,执行SQL,将结果存入promise

//MySQLWorker.cpp
#include "MySQLWorker.h"

#include "BlockingQueue.h"
#include "SQLOperation.h"
#include "MySQLConn.h"

MySQLWorker::MySQLWorker(MySQLConn *conn, BlockingQueue<SQLOperation *> &task_queue)
    : conn_(conn), task_queue_(task_queue)
{
}

MySQLWorker::~MySQLWorker()
{
    Stop();
}

void MySQLWorker::Start()//start一次 创建一个线程
{
    worker_ = std::thread(&MySQLWorker::Worker, this);//this表示该线程可以执行工作对象所有函数 比如下面的Worker执行函数
}

void MySQLWorker::Stop()
{
    if (worker_.joinable()) {
        worker_.join();
    }
}

void MySQLWorker::Worker() {
    while (true) {
        SQLOperation *op = nullptr;
        if (!task_queue_.Pop(op)) {
            break;
        }
        op->Execute(conn_);
        delete op;
    }
}

4.sql操作对象

//SQLOperation.cpp
#include "SQLOperation.h"
#include "MySQLConn.h"

void SQLOperation::Execute(MySQLConn *conn)
{
    auto result = conn->Query(sql_);    走连接对象的底层的查询
    
    把promise的值传给future
    promise_.set_value(std::unique_ptr<sql::ResultSet>(result));
}

5.回调管理对象

//AsyncProcessor.cpp

#include "AsyncProcessor.h"
#include "QueryCallback.h"

把用户调用query后生成的回调对象移动到管理对象内部的vertor中管理
void AsyncProcessor::AddQueryCallback(QueryCallback &&query_callback)
{
    pending_queries_.emplace_back(std::move(query_callback));
}

检测vector集合中的回调对象是否有就绪的
void AsyncProcessor::InvokeIfReady()
{
    for (auto it = pending_queries_.begin(); it != pending_queries_.end();)
    {
        if (it->InvokeIfReady())
            it = pending_queries_.erase(it);
        else
            ++it;
    }
}

6.回调对象

//QueryCallback.h
#pragma once

#include <future>
#include <functional>
#include <memory>
#include <cppconn/resultset.h>

namespace sql    //MYSQL提供
{
    class ResultSet;
}

class QueryCallback {
public:
    QueryCallback(std::future<std::unique_ptr<sql::ResultSet>> &&future, std::function<void(std::unique_ptr<sql::ResultSet>)> &&cb)
        : future_(std::move(future)), cb_(std::move(cb))
    {
    }

    检测future值 判断是否就绪
    bool InvokeIfReady() {
        if (future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready) {
            cb_(std::move(future_.get()));  执行用户回调
            return true;
        }
        return false;
    }
private:
    std::future<std::unique_ptr<sql::ResultSet>> future_;
    std::function<void(std::unique_ptr<sql::ResultSet>)> cb_;
};


网站公告

今日签到

点亮在社区的每一天
去签到