DruidDataSource 封clickhouse实现数据操作

发布于:2024-10-26 ⋅ 阅读:(71) ⋅ 点赞:(0)

根据配置的服务器信息生成ClickHouse集群连接池

import cn.hutool.core.util.StrUtil;
import com.cloudwise.dcim.clickhouse.utils.ClickHouseClusterPool;
import com.cloudwise.dcim.clickhouse.utils.ClickHouseConnectionPool;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 **/
@Configuration
public class ClickHouseDruidConfig {
    
    @Value("${chServers}")
    private String servers;
    
    @Value("${chHttpPort:18100}")
    private Integer port;
    
    @Value("${chUsername:default}")
    private String user;
    
    @Value("${chPassword:Rootmaster@777}")
    private String password;
    
    /**
     * 根据配置的服务器信息生成ClickHouse集群连接池
     * 该方法用于初始化一个ClickHouseClusterPool实例,该实例包含了配置中所有ClickHouse服务器的连接池
     * 主要用于提高ClickHouse数据库的连接效率和管理连接资源
     *
     * @return ClickHouseClusterPool 返回一个包含所有ClickHouse服务器连接池的集群连接池对象
     */
    @Bean("clickHouseClusterPool")
    public ClickHouseClusterPool generationClusterPool() {
        // 将配置的服务器字符串按逗号分割成字符串数组
        String[] ipArray = servers.split(StrUtil.COMMA);
        // 创建一个ClickHouse集群连接池对象用于存储各个服务器的连接池
        ClickHouseClusterPool clickHouseClusterPool = new ClickHouseClusterPool();
        // 遍历服务器IP数组
        for (String ip : ipArray) {
            // 拼接ClickHouse数据库的JDBC连接URL
            String clickHouseUrl = "jdbc:clickhouse://" + ip + StrUtil.COLON + port;
            // 根据拼接的URL、用户名和密码创建ClickHouse连接池
            ClickHouseConnectionPool connectionPool = new ClickHouseConnectionPool(clickHouseUrl, user, password);
            // 将创建的连接池添加到集群连接池中
            clickHouseClusterPool.add(connectionPool);
        }
        // 返回包含所有服务器连接池的集群连接池对象
        return clickHouseClusterPool;
    }

    
   
    
}

import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/**
 **/
public class ClickHouseClusterPool {
    private  static final List<ClickHouseConnectionPool> CLUSTER_POOL= new ArrayList<>();
    
    /**
     * 随机获取一个ClickHouse连接池
     *
     * 本方法通过随机选择的方式从预定义的连接池集群中选取一个连接池,这样做的目的是为了均衡负载
     * 和提高系统的稳定性和可用性。通过这种方式,可以避免单点故障,并且使得对ClickHouse数据库
     * 的操作更加高效和可靠。
     *
     * @return ClickHouseConnectionPool 随机选择的ClickHouse连接池实例
     */
    public ClickHouseConnectionPool genClickHouseConnectionPool(){
        int index = new SecureRandom().nextInt(CLUSTER_POOL.size());
        return CLUSTER_POOL.get(index);
    }
    
    /**
     * ClickHouse连接池中添加一个新的连接池
     *
     * @param clickHouseConnectionPool 要添加的点击屋连接池对象
     */
    public void add(ClickHouseConnectionPool clickHouseConnectionPool){
        CLUSTER_POOL.add(clickHouseConnectionPool);
    }
    
    
}

import com.alibaba.druid.pool.DruidDataSource;

import java.sql.Connection;
import java.sql.SQLException;

/**
 **/
public class ClickHouseConnectionPool {
    private final DruidDataSource dataSource;
    
    public ClickHouseConnectionPool(String clickHouseUrl,String user,String paassword) {
         dataSource = new DruidDataSource();
    
        // 基本属性
        dataSource.setUrl(clickHouseUrl);
        dataSource.setUsername(user);
        dataSource.setPassword(paassword);
    
        // 配置初始化大小/最小/最大
        dataSource.setInitialSize(1);
        dataSource.setMinIdle(1);
        dataSource.setMaxActive(5);
    
        // 配置获取连接等待超时的时间
        dataSource.setMaxWait(600000);
    
        // 配置间隔多久进行一次检测,检测需要关闭的空闲连接,单位是毫秒
        dataSource.setTimeBetweenEvictionRunsMillis(60000);
    
        // 配置一个连接在池中最小生存的时间,单位是毫秒
        dataSource.setMinEvictableIdleTimeMillis(300000);
    
        // 校验SQL,必要时会执行
        dataSource.setValidationQuery("SELECT 1");
        dataSource.setTestWhileIdle(true);
        dataSource.setTestOnBorrow(false);
        dataSource.setTestOnReturn(false);
    
        // 配置ClickHouse的Druid特有属性
        dataSource.setPoolPreparedStatements(false);
    
        try {
            dataSource.init();
        } catch (Exception e) {
            e.printStackTrace();
        }
        
    }
    
    /**
     * 获取数据库连接
     *
     * @return 数据库连接
     * @throws SQLException 如果无法获取连接,则抛出此异常
     */
    public Connection getConnection() throws SQLException {
        try {
            // 通过数据源获取数据库连接
            return dataSource.getConnection();
        } catch (Exception e) {
            // 捕获异常并包装成SQLException抛出,以便调用者能够处理获取连接失败的情况
            throw new SQLException("Could not get a connection", e);
        }
    }
    
    /**
     * 归还数据库连接到连接池
     *
     * @param conn 要归还的数据库连接对象
     */
    public void returnConnection(Connection conn) {
        if (conn != null) {
            try {
                //归还连接到DruidDataSource
                conn.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    
}

SQL模版解析

import org.beetl.core.Configuration;
import org.beetl.core.GroupTemplate;
import org.beetl.core.Template;
import org.beetl.core.resource.StringTemplateResourceLoader;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;


/**
 **/
public class BeetlTemplateUtils {
    private static GroupTemplate gt;
    static {
        //初始化代码
        StringTemplateResourceLoader resourceLoader = new StringTemplateResourceLoader();
        Configuration cfg = null;
        try {
            cfg = Configuration.defaultConfiguration();
        } catch (IOException e) {
            e.printStackTrace();
        }
        gt = new GroupTemplate(resourceLoader, cfg);
    }
    
    
    /**
     * 根据给定的模板和参数渲染字符串
     *
     * @param template 模板名称,用于查找对应的模板
     * @param params 参数映射,包含需要替换在模板中的键值对
     * @return 渲染后的字符串
     */
    public static String render(String template, Map<String,? extends Object> params){
        // 获取模板
        Template t = gt.getTemplate(template);
        // 如果参数映射不为空且不为空,则遍历参数并将其绑定到模板中
        if(params != null && !params.isEmpty()){
            params.forEach((key,value)->{
                t.binding(key, value);
            });
        }
        // 返回渲染后的字符串
        return  t.render();
    }
    
    public static void main(String[] args) throws IOException {
        
        String template = "hello,${name}";
        Map<String,String> params = new HashMap<>();
        params.put("name","Lucy");
        // 输出渲染后的字符串
        System.out.println(BeetlTemplateUtils.render(template,params));
   
    }

}

CK具体操作

import cn.hutool.json.JSONUtil;
import com.cloudwise.dcim.common.exception.BaseException;
import com.cloudwise.dcim.common.utils.SpringHoldUtil;
import lombok.extern.slf4j.Slf4j;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 **/
@Slf4j
public class ClickHouseQueryUtils {
    /**
     * 执行SQL查询,并返回查询结果的列表
     * 该方法直接返回查询结果的Map列表,不涉及实体类转换
     *
     * @param sql 查询的SQL语句
     * @return 包含查询结果的Map列表,每条记录作为一个Map对象
     * @throws BaseException 当查询过程中发生SQLException时,抛出自定义的BaseException
     *
     */
    public static List<Map<String,Object>> querySelect(String sql){
        ClickHouseConnectionPool connectionPool =  SpringHoldUtil.getBean(ClickHouseClusterPool.class).genClickHouseConnectionPool();
        Connection connection = null;
        //输入动态参数
        //PreparedStatement preparedStatement = null;
        List<Map<String, Object>> dataMap = null;
        
        try {
            connection = connectionPool.getConnection();
            //preparedStatement = connection.prepareStatement() ;
            sql = BeetlTemplateUtils.render(sql,null);
            try(Statement statement = connection.createStatement()){
                long currentTime = System.currentTimeMillis();
                try (ResultSet resultSet = statement.executeQuery(sql)) {
                    long endTime = System.currentTimeMillis();
                    //大于10秒是慢SQL
                    if(endTime-currentTime>10000){
                        log.error("CK慢SQL用时{}毫秒SQL语句:{}",(endTime-currentTime),sql);
                    }
                    // 处理结果集
                    dataMap = convert(resultSet);
                }
            }
        }catch (SQLException e){
            throw  new BaseException(e);
        }finally {
            if(connection!=null) {
                connectionPool.returnConnection(connection);
            }
        }
        return  dataMap;
    }

    /**
     * 根据SQL模板和参数查询数据
     *
     * @param sqlTemplate SQL模板,使用Beetl模板语法编写
     * @param params 参数映射,键为模板中的变量名,值为变量的值
     * @return 查询结果列表,每个结果为一个键值对映射
     *
     * 本方法通过BeetlTemplateUtils工具类渲染SQL模板,将params参数中的键值对替换到sqlTemplate中,
     * 生成最终的SQL查询语句,然后执行该查询语句,返回查询到的数据
     *
     */
    public static List<Map<String,Object>> querySelect(String sqlTemplate, Map<String,? extends Object> params){
        // 渲染SQL模板,生成最终的SQL查询语句
        String sql = BeetlTemplateUtils.render(sqlTemplate,params);
        // 执行SQL查询,并返回查询结果
        return querySelect(sql);
    }
    /**
     * 根据提供的SQL语句和实体类类型,查询数据库并转换结果为指定实体类的列表
     * 此方法用于执行查询操作,而不直接返回数据库查询结果集,而是将结果集映射为实体对象列表
     * 它利用了JSONUtil库来实现从数据库查询结果到实体类实例列表的转换
     *
     * @param sql 查询数据库的SQL语句
     * @param cls 实体类的Class对象,用于将查询结果转换为目标实体类列表
     * @param <T> 实体类类型
     * @return 返回类型为T的实体类对象列表如果查询结果为空或转换失败,则返回null
     */
    public static  <T> List<T> querySelect(String sql,Class<T> cls) {
        // 执行SQL查询,返回结果集为List<Map<String, Object>>类型
        List<Map<String, Object>> dataMap =  querySelect(sql);
        
        // 检查查询结果是否非空且不为空列表
        if(dataMap!=null&&!dataMap.isEmpty()) {
            // 将查询结果Map列表转换为JSON字符串
            String jsonData = JSONUtil.toJsonStr(dataMap);
            // 将JSON字符串转换为指定实体类的列表并返回
            return  JSONUtil.toList(jsonData, cls);
        }
        // 如果查询结果为空或转换过程出错,则返回null
        return  null;
    }

    
    /**
     * 使用模板和参数执行SQL查询,并将结果映射到指定的实体类列表中
     * 该方法使用Beetl模板引擎将sqlTemplate与params中的参数合并生成SQL语句,
     * 然后执行该SQL语句,并将结果集转换为由泛型类型T指定的实体类列表
     *
     * @param sqlTemplate SQL模板,可以包含Beetl模板语法,用于动态生成SQL
     * @param params 参数映射,键为模板中的参数名称,值为参数的实际值
     * @param cls 实体类类型,用于将查询结果映射到该类型对象
     * @param <T> 泛型类型,表示实体类类型
     * @return 包含查询结果的实体类对象列表如果查询结果为空或没有匹配的记录,则返回空列表
     */
    public static <T> List<T> querySelect(String sqlTemplate, Map<String,? extends Object> params,Class<T> cls){
        // 根据SQL模板和参数,利用Beetl模板引擎渲染生成最终的SQL语句
        String sql = BeetlTemplateUtils.render(sqlTemplate,params);
        // 调用重载的querySelect方法,执行SQL查询,并将结果映射到指定的实体类
        return querySelect(sql,cls);
    }

    
    /**
     * 将数据库查询结果集转换为包含列-值对的列表
     * 此方法用于将 JDBC ResultSet 对象转换为 List<Map<String, Object>> 形式
     * 这种形式更方便在应用程序中处理和访问查询结果
     *
     * @param rs ResultSet 对象,包含数据库查询结果
     * @return 一个 List 对象,其中每个元素是一个 Map,表示查询结果的一行,键为列名,值为该列的值
     * @throws SQLException 如果处理 ResultSet 过程中出现错误,将抛出此异常
     */
    private static List<Map<String, Object>> convert(ResultSet rs) throws SQLException {
        // 获取结果集的元数据,元数据包含表结构的信息,如列名和数据类型
        ResultSetMetaData md = rs.getMetaData();
        // 获取结果集的列数
        int columns = md.getColumnCount();
        // 初始化一个列表,用于存储查询结果的每一行的数据
        List<Map<String, Object>> list = new ArrayList<>();
        // 遍历结果集的每一行,直到 rs.next() 返回 false 为止
        while (rs.next()) {
            // 初始化一个映射,用于存储当前行的列名和值对
            Map<String, Object> row = new HashMap<>(columns);
            // 遍历当前行的每一列,将列名和值存入映射
            for (int i = 1; i <= columns; ++i) {
                // 调用 rs.getObject 方法获取当前列的值,并以列名为键存入映射
                row.put(md.getColumnName(i), rs.getObject(i));
            }
            // 将当前行的映射添加到列表中
            list.add(row);
        }
        // 返回存储查询结果的列表
        return list;
    }

}