trino jdbc使用代理用户查询数据

发布于:2024-09-18 ⋅ 阅读:(143) ⋅ 点赞:(0)

标题trino jdbc使用代理用户查询数据

使用代理用户,可以很好的进行权限管控

1.使用trino-jdbc


package com.xxx;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.xxx.config.DataSourceConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

/**
 * @author sanmuchen
 * @date 2024/09/13 14:43
 */
public class TrinoClient {
    private static final Logger logger = LoggerFactory.getLogger(TrinoClient.class);

    private static final Cache<String, Connection> CONNECTION_CACHE = CacheBuilder.newBuilder()
            .expireAfterAccess(10, TimeUnit.SECONDS)//设置过期时间
            .removalListener(notification -> {  //过期行为:关闭连接清理缓存
                Object key = notification.getKey();
                Connection removedConnection = (Connection) notification.getValue();
                try {
                    if (removedConnection != null) {
                        removedConnection.close(); // 当连接从缓存中移除时关闭连接
                    }
                    logger.info("user {}'s connection {} have expired and already closed.", key, removedConnection);
                } catch (SQLException e) {
                    // 处理关闭异常
                    logger.error("user {}'s connection close failed, exception: {}", key, e.getMessage());
                }
            })
            .maximumSize(10000).build();

    private static volatile TrinoClient trinoClient;

    @Autowired
    private DataSourceConfig dataSourceConfig;

    @Bean
    public static TrinoClient getInstance() {
        if (trinoClient == null) {
            synchronized (TrinoClient.class) {
                if (trinoClient == null) {
                    trinoClient = new TrinoClient();
                }
            }
        }
        return trinoClient;
    }

    public Connection getConnection(String userName) throws SQLException {
        logger.info("Start to get a connection for user : {}", userName);

        Connection connection = CONNECTION_CACHE.getIfPresent(userName);
        if (connection == null || connection.isClosed()) {
            synchronized (CONNECTION_CACHE) {
                if (connection == null || connection.isClosed()) {
                    logger.info("cache connection is not available ");
                    connection = createAndSaveConnection(userName);
                }
            }
        } else {
            try (ResultSet resultSet = connection.getMetaData().getSchemas(connection.getCatalog(), "%")) {
                logger.info("connection is still available : {} ", resultSet.getMetaData());
                logger.info("continue use old connect, connection:{}", connection);
            } catch (Exception e) {
                logger.info("connection is not available ");
                connection = createAndSaveConnection(userName);
            }
        }
        return connection;
    }

    public Connection createAndSaveConnection(String userName) throws SQLException {
        logger.info("start to create a new connection for user '{}'", userName);
        Properties properties = new Properties();
        String username = "daili";//代理用户名
        String password = "mima";//密码
        String url = "jdbc:trino://trino-xxx.cn:443";
        String clientInfo = "xx";
        properties.setProperty("user", username);
        properties.setProperty("password", password);
        properties.setProperty("sessionUser", userName);
        properties.setProperty("SSL", "true");
        properties.setProperty("clientInfo", clientInfo);
        Connection connection = DriverManager.getConnection(url, properties);
        CONNECTION_CACHE.put(userName, connection);
        logger.info("connection for user'{}' has been created and saved, {}", userName, connection);
        return connection;
    }

}


2.使用clientSession

需求:
使用trino查询hive,clickhouse,paimon,mysql等,并使用trino自带的权限系统进行管控。

package com.xxx;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.xxx.config.TrinoConfig;
import io.trino.jdbc.$internal.airlift.units.Duration;
import io.trino.jdbc.$internal.client.*;
import io.trino.jdbc.$internal.okhttp3.OkHttpClient;
import io.trino.jdbc.$internal.okhttp3.internal.connection.ConnectInterceptor;
import org.springframework.beans.factory.annotation.Autowired;

import java.net.URI;
import java.time.ZoneId;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TrinoClient {

    @Autowired
    private TrinoConfig trinoConfig;
    
    private static volatile TrinoClient trinoClient;

    public static TrinoClient getInstance() {
        if (trinoClient == null) {
            synchronized (TrinoClient.class) {
                if (trinoClient == null) {
                    trinoClient = new TrinoClient();
                }
            }
        }
        return trinoClient;
    }

    private static final Cache<String, ClientSession> CLIENT_SESSION_CACHE = CacheBuilder.newBuilder()
            .expireAfterAccess(600, TimeUnit.SECONDS)
            .maximumSize(10000).build();


    private final OkHttpClient okHttpClient = new OkHttpClient.Builder()
    		 //此处使用trino代理用户名和密码
            .addInterceptor(OkHttpUtil.basicAuth(trinoConfig.getUsername(), trinoConfig.getPassword()))
            .connectTimeout(60L, TimeUnit.SECONDS)
            .readTimeout(60L, TimeUnit.SECONDS)
            .build();


    public ClientSession borrowTrinoClientSession(String user, String catalog, String schema) {

        ClientSession trinoClientSession = CLIENT_SESSION_CACHE.getIfPresent(user);

        if (Objects.isNull(trinoClientSession)) {//创建一个新的
            trinoClientSession = createTrinoClientSession(user);
        }

        ClientSession newClientSession = ClientSession.builder(trinoClientSession)
                .withCatalog(catalog)
                .withSchema(schema)
                .build();

        CLIENT_SESSION_CACHE.put(user, newClientSession);
        return newClientSession;
    }

	//此处使用真实用户去执行
    public List<Object> executeSql(String user, String sqlCode, String catalog, String schema) {
        ClientSession trinoClientSession = borrowTrinoClientSession(user, catalog, schema);
        List<Object> result = new ArrayList<>();
        try (StatementClient statement = StatementClientFactory.newStatementClient(okHttpClient, trinoClientSession, sqlCode)) {
            while (statement.isRunning()) {
                Iterable<List<Object>> data = statement.currentData().getData();
                if (Objects.nonNull(data)) {
                    for (List<Object> datum : data) {
                        result.add(datum);
                    }
                }
                statement.advance();
            }
        }
        return result;
    }

    private ClientSession createTrinoClientSession(String user) {
        String catalog = " ";
        String schema = " ";
        String url = "https://trino-xxx.cn:443/";
//        String url = trinoConfig.getUrl();
        URI uri = URI.create(url);
        String principal = null;
        String source = "global";
        Optional<String> traceToken = Optional.empty();
        Set<String> clientTags = Collections.emptySet();
        String clientInfo = "xxx";
        String path = null;
        ZoneId timeZonId = TimeZone.getDefault().toZoneId();
        Locale locale = Locale.getDefault();
        Map<String, String> resourceEstimates = Collections.emptyMap();
        Map<String, String> properties = Collections.emptyMap();
        Map<String, String> preparedStatements = Collections.emptyMap();
        Map<String, ClientSelectedRole> roles = Collections.emptyMap();
        Map<String, String> extraCredentials = Collections.emptyMap();
        String transactionId = null;
        Duration clientRequestTimeout = new Duration(0, TimeUnit.MILLISECONDS);

        return new ClientSession(uri, principal, Optional.of(user), source, traceToken, clientTags, clientInfo, catalog, schema,
                path, timeZonId, locale, resourceEstimates, properties, preparedStatements, roles, extraCredentials,
                transactionId, clientRequestTimeout, true);
    }

    public static void main(String[] args) {

        TrinoClient trinoClient = TrinoClient.getInstance();
        String sqlCode = "show catalogs";
        String sqlCode1 = "show schemas";
        String sqlCode2 = "show tables";

		System.out.println("-------------------0");
        List<Object> result = trinoClient.executeSql("user", sqlCode, "", "");
        result.forEach(System.out::println);

        System.out.println("-------------------1");
        List<Object> result1 = trinoClient.executeSql("user", sqlCode1, "hive", "");
       result1.forEach(System.out::println);

        System.out.println("-------------------2");
        List<Object> result2 = trinoClient.executeSql("user", sqlCode2, "hive", "zone_ods");
        result2.forEach(System.out::println);
    }

}


网站公告

今日签到

点亮在社区的每一天
去签到