标题:Trino JDBC 使用代理用户查询数据
使用代理用户,可以很好地进行权限管控。
1.使用trino-jdbc
package com.xxx;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.xxx.config.DataSourceConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
/**
 * Per-user cache of Trino JDBC connections opened through a proxy account.
 *
 * <p>Each connection authenticates with the proxy credentials and sets the
 * {@code sessionUser} connection property to the requesting user name, so one
 * cached connection exists per user name.
 *
 * <p>NOTE(review): cached connections are closed by the removal listener once
 * their entry is removed (idle for 10 seconds, replaced, or evicted by size).
 * A caller that keeps a returned connection idle for longer than that may find
 * it closed; callers should fetch the connection right before using it.
 *
 * @author sanmuchen
 * @date 2024/09/13 14:43
 */
public class TrinoClient {
    private static final Logger logger = LoggerFactory.getLogger(TrinoClient.class);

    /** Connections keyed by session user name; removed entries are closed by the listener. */
    private static final Cache<String, Connection> CONNECTION_CACHE = CacheBuilder.newBuilder()
            .expireAfterAccess(10, TimeUnit.SECONDS) // idle timeout
            .removalListener(notification -> { // close the connection whenever its entry leaves the cache
                Object key = notification.getKey();
                Connection removedConnection = (Connection) notification.getValue();
                try {
                    if (removedConnection != null) {
                        removedConnection.close();
                    }
                    // The entry may have been replaced or size-evicted, not only expired, so log the cause.
                    logger.info("user {}'s connection {} was removed from cache (cause: {}) and closed.",
                            key, removedConnection, notification.getCause());
                } catch (SQLException e) {
                    // Pass the exception itself so the stack trace is not lost.
                    logger.error("user {}'s connection close failed", key, e);
                }
            })
            .maximumSize(10000).build();

    private static volatile TrinoClient trinoClient;

    // NOTE(review): this field is never read in this class, and instances built by
    // getInstance() are created with `new`, so nothing visible here injects it.
    @Autowired
    private DataSourceConfig dataSourceConfig;

    /**
     * Returns the process-wide instance, creating it on first use.
     *
     * @return the shared {@code TrinoClient}
     */
    @Bean
    public static TrinoClient getInstance() {
        if (trinoClient == null) {
            synchronized (TrinoClient.class) {
                if (trinoClient == null) {
                    trinoClient = new TrinoClient();
                }
            }
        }
        return trinoClient;
    }

    /**
     * Returns a usable connection for the given user, reusing the cached one when it
     * is open and still answers a metadata query, and creating a new one otherwise.
     *
     * @param userName the real (session) user the queries should run as
     * @return an open connection whose {@code sessionUser} is {@code userName}
     * @throws SQLException if checking the cached connection's state or creating a new connection fails
     */
    public Connection getConnection(String userName) throws SQLException {
        logger.info("Start to get a connection for user : {}", userName);
        Connection cached = CONNECTION_CACHE.getIfPresent(userName);
        if (cached != null && !cached.isClosed()) {
            if (isAlive(cached)) {
                return cached;
            }
        } else {
            logger.info("cache connection is not available ");
        }

        synchronized (CONNECTION_CACHE) {
            // Re-read the cache under the lock: another thread may have installed a fresh
            // connection while this one was waiting. Checking the stale local variable
            // (as before) would make every waiting thread create its own connection.
            Connection current = CONNECTION_CACHE.getIfPresent(userName);
            if (current != null && current != cached && !current.isClosed()) {
                return current;
            }
            return createAndSaveConnection(userName);
        }
    }

    /**
     * Probes a connection by listing schemas of its current catalog.
     *
     * @param connection the open connection to probe
     * @return {@code true} if the metadata query succeeded, {@code false} if it threw
     */
    private boolean isAlive(Connection connection) {
        try (ResultSet resultSet = connection.getMetaData().getSchemas(connection.getCatalog(), "%")) {
            logger.info("connection is still available : {} ", resultSet.getMetaData());
            logger.info("continue use old connect, connection:{}", connection);
            return true;
        } catch (Exception e) {
            // Any failure means the connection cannot be reused; keep the cause for diagnosis.
            logger.warn("connection is not available ", e);
            return false;
        }
    }

    /**
     * Opens a new connection for the given user and stores it in the cache, replacing
     * any previous entry for that user (the replaced connection is closed by the
     * cache's removal listener).
     *
     * @param userName the real (session) user the queries should run as
     * @return the newly created connection
     * @throws SQLException if the driver cannot create the connection
     */
    public Connection createAndSaveConnection(String userName) throws SQLException {
        logger.info("start to create a new connection for user '{}'", userName);
        // NOTE(review): proxy credentials and URL are hard-coded placeholders; they
        // should come from configuration rather than source code.
        String proxyUser = "daili"; // proxy account name
        String proxyPassword = "mima"; // proxy account password
        String url = "jdbc:trino://trino-xxx.cn:443";
        String clientInfo = "xx";

        Properties properties = new Properties();
        properties.setProperty("user", proxyUser);
        properties.setProperty("password", proxyPassword);
        properties.setProperty("sessionUser", userName);
        properties.setProperty("SSL", "true");
        properties.setProperty("clientInfo", clientInfo);

        Connection connection = DriverManager.getConnection(url, properties);
        CONNECTION_CACHE.put(userName, connection);
        logger.info("connection for user'{}' has been created and saved, {}", userName, connection);
        return connection;
    }
}
2.使用clientSession
需求:
使用 Trino 查询 Hive、ClickHouse、Paimon、MySQL 等数据源,并使用 Trino 自带的权限系统进行管控。
package com.xxx;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.xxx.config.TrinoConfig;
import io.trino.jdbc.$internal.airlift.units.Duration;
import io.trino.jdbc.$internal.client.*;
import io.trino.jdbc.$internal.okhttp3.OkHttpClient;
import io.trino.jdbc.$internal.okhttp3.internal.connection.ConnectInterceptor;
import org.springframework.beans.factory.annotation.Autowired;
import java.net.URI;
import java.time.ZoneId;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class TrinoClient {
@Autowired
private TrinoConfig trinoConfig;
private static volatile TrinoClient trinoClient;
public static TrinoClient getInstance() {
if (trinoClient == null) {
synchronized (TrinoClient.class) {
if (trinoClient == null) {
trinoClient = new TrinoClient();
}
}
}
return trinoClient;
}
private static final Cache<String, ClientSession> CLIENT_SESSION_CACHE = CacheBuilder.newBuilder()
.expireAfterAccess(600, TimeUnit.SECONDS)
.maximumSize(10000).build();
private final OkHttpClient okHttpClient = new OkHttpClient.Builder()
//此处使用trino代理用户名和密码
.addInterceptor(OkHttpUtil.basicAuth(trinoConfig.getUsername(), trinoConfig.getPassword()))
.connectTimeout(60L, TimeUnit.SECONDS)
.readTimeout(60L, TimeUnit.SECONDS)
.build();
public ClientSession borrowTrinoClientSession(String user, String catalog, String schema) {
ClientSession trinoClientSession = CLIENT_SESSION_CACHE.getIfPresent(user);
if (Objects.isNull(trinoClientSession)) {//创建一个新的
trinoClientSession = createTrinoClientSession(user);
}
ClientSession newClientSession = ClientSession.builder(trinoClientSession)
.withCatalog(catalog)
.withSchema(schema)
.build();
CLIENT_SESSION_CACHE.put(user, newClientSession);
return newClientSession;
}
//此处使用真实用户去执行
public List<Object> executeSql(String user, String sqlCode, String catalog, String schema) {
ClientSession trinoClientSession = borrowTrinoClientSession(user, catalog, schema);
List<Object> result = new ArrayList<>();
try (StatementClient statement = StatementClientFactory.newStatementClient(okHttpClient, trinoClientSession, sqlCode)) {
while (statement.isRunning()) {
Iterable<List<Object>> data = statement.currentData().getData();
if (Objects.nonNull(data)) {
for (List<Object> datum : data) {
result.add(datum);
}
}
statement.advance();
}
}
return result;
}
private ClientSession createTrinoClientSession(String user) {
String catalog = " ";
String schema = " ";
String url = "https://trino-xxx.cn:443/";
// String url = trinoConfig.getUrl();
URI uri = URI.create(url);
String principal = null;
String source = "global";
Optional<String> traceToken = Optional.empty();
Set<String> clientTags = Collections.emptySet();
String clientInfo = "xxx";
String path = null;
ZoneId timeZonId = TimeZone.getDefault().toZoneId();
Locale locale = Locale.getDefault();
Map<String, String> resourceEstimates = Collections.emptyMap();
Map<String, String> properties = Collections.emptyMap();
Map<String, String> preparedStatements = Collections.emptyMap();
Map<String, ClientSelectedRole> roles = Collections.emptyMap();
Map<String, String> extraCredentials = Collections.emptyMap();
String transactionId = null;
Duration clientRequestTimeout = new Duration(0, TimeUnit.MILLISECONDS);
return new ClientSession(uri, principal, Optional.of(user), source, traceToken, clientTags, clientInfo, catalog, schema,
path, timeZonId, locale, resourceEstimates, properties, preparedStatements, roles, extraCredentials,
transactionId, clientRequestTimeout, true);
}
public static void main(String[] args) {
TrinoClient trinoClient = TrinoClient.getInstance();
String sqlCode = "show catalogs";
String sqlCode1 = "show schemas";
String sqlCode2 = "show tables";
System.out.println("-------------------0");
List<Object> result = trinoClient.executeSql("user", sqlCode, "", "");
result.forEach(System.out::println);
System.out.println("-------------------1");
List<Object> result1 = trinoClient.executeSql("user", sqlCode1, "hive", "");
result1.forEach(System.out::println);
System.out.println("-------------------2");
List<Object> result2 = trinoClient.executeSql("user", sqlCode2, "hive", "zone_ods");
result2.forEach(System.out::println);
}
}