ShardingSphere源码解析:跨库查询案例分析

发布于:2025-06-14 ⋅ 阅读:(26) ⋅ 点赞:(0)

引言

在分布式系统中,数据分片是解决数据规模增长的重要手段。ShardingSphere作为分布式数据库中间件,提供了强大的分片查询能力。本文将深入ShardingSphere源码,通过一个完整的查询示例,详细解析其如何从不同库中查询数据,特别关注广播表这一关键特性。

一、ShardingSphere查询处理流程概览

ShardingSphere的查询处理遵循经典的"解析→路由→执行→归并"四阶段模型:

  1. SQL解析:将SQL语句解析为抽象语法树(AST)
  2. SQL路由:根据分片规则确定SQL要访问的数据源和表
  3. SQL执行:并行执行路由后的SQL
  4. 结果归并:合并多数据源返回的结果集

下面通过一个完整的查询示例,深入源码分析这个过程。

二、完整查询示例:跨分片查询订单数据

假设我们有一个订单系统,按用户ID分片,配置如下:

# 分片规则配置
rules:
  - !SHARDING
    tables:
      t_order:
        actualDataNodes: ds${0..1}.t_order${0..1}
        databaseStrategy:
          standard:
            shardingColumn: user_id
            shardingAlgorithmName: database-inline
        tableStrategy:
          standard:
            shardingColumn: order_id
            shardingAlgorithmName: table-inline
      t_dict:
        broadcast: true  # 配置为广播表
    shardingAlgorithms:
      database-inline:
        type: INLINE
        props:
          algorithm-expression: ds${user_id % 2}
      table-inline:
        type: INLINE
        props:
          algorithm-expression: t_order${order_id % 2}

现在我们执行一个查询:

SELECT o.*, d.dict_name 
FROM t_order o 
JOIN t_dict d ON o.status = d.dict_code 
WHERE user_id = 1001 AND order_id > 2000 
LIMIT 10;

这个查询涉及分片表t_order和广播表t_dict的关联,让我们深入源码,看看ShardingSphere如何处理。

三、源码深度解析

1. SQL解析阶段

SQL解析是查询处理的第一步,ShardingSphere使用Antlr4构建的解析引擎将SQL转换为抽象语法树。

核心类与流程

// ShardingSphere-JDBC 5.3.2版本核心解析入口
public final class SQLParserEngine {
    
    private final DatabaseType databaseType;
    private final Map<String, SQLParser> parsers;
    
    public SQLStatement parse(final String sql, final boolean useCache) {
        // 根据数据库类型选择解析器
        SQLParser sqlParser = SQLParserFactory.newInstance(databaseType, sql);
        
        // 解析SQL语句,生成AST
        SQLVisitorRule sqlVisitorRule = SQLVisitorRuleEngine.getSQLVisitorRule(databaseType, sqlParser.getSQLStatementType());
        SQLVisitor<Object> visitor = SQLVisitorFactory.newInstance(databaseType, sqlVisitorRule, sqlParser.getParseTree(), parsers);
        
        // 返回解析后的SQL语句对象
        return (SQLStatement) sqlParser.getParseTree().accept(visitor);
    }
}

对于我们的查询,解析器会生成一个包含以下信息的SelectStatement对象:

  • 查询表名:t_order, t_dict
  • 查询条件:user_id = 1001 AND order_id > 2000
  • 连接条件:o.status = d.dict_code
  • 分页信息:LIMIT 10

2. SQL路由阶段

路由阶段是ShardingSphere的核心,它决定SQL将访问哪些数据源和表。特别关注广播表的处理逻辑。

核心类与流程

// 标准分片路由引擎
public final class StandardRoutingEngine implements RoutingEngine {
    
    private final ShardingRule shardingRule;
    private final TableRule tableRule;
    private final SQLStatementContext sqlStatementContext;
    
    @Override
    public RouteResult route(final List<Object> parameters) {
        // 1. 提取分片条件
        Collection<ShardingCondition> shardingConditions = createShardingConditions(parameters);
        
        // 2. 计算数据源分片
        Collection<String> routedDataSources = routeDataSources(shardingConditions);
        
        // 3. 计算表分片
        Collection<RouteUnit> routeUnits = new LinkedList<>();
        for (String each : routedDataSources) {
            routeUnits.addAll(routeTables(each, shardingConditions));
        }
        
        return new RouteResult(routeUnits);
    }
    
    // 广播表路由特殊处理
    private Collection<RouteUnit> routeBroadcastTable(final String logicTable, final String dataSource) {
        Collection<RouteUnit> result = new LinkedList<>();
        // 广播表在每个数据源都有完整副本
        // 直接使用逻辑表名作为物理表名
        result.add(new RouteUnit(new DataSourceName(dataSource), new TableUnit(logicTable, logicTable)));
        return result;
    }
    
    // 处理多表关联查询
    private Collection<RouteUnit> routeTables(final String routedDataSource, final Collection<ShardingCondition> shardingConditions) {
        Collection<RouteUnit> result = new LinkedList<>();
        
        // 遍历SQL中涉及的所有表
        for (String logicTable : sqlStatementContext.getTablesContext().getTableNames()) {
            if (isBroadcastTable(logicTable)) {
                // 广播表处理:直接路由到当前数据源
                result.addAll(routeBroadcastTable(logicTable, routedDataSource));
            } else {
                // 分片表处理
                TableRule tableRule = shardingRule.getTableRule(logicTable);
                ShardingAlgorithm tableShardingAlgorithm = tableRule.getTableShardingStrategy().getShardingAlgorithm();
                Collection<String> availableTargetNames = tableRule.getActualTables(routedDataSource);
                
                // 执行分片算法
                Collection<String> routedTables = tableShardingAlgorithm.doSharding(availableTargetNames, shardingConditions);
                
                // 创建路由单元
                for (String each : routedTables) {
                    result.add(new RouteUnit(new DataSourceName(routedDataSource), new TableUnit(logicTable, each)));
                }
            }
        }
        
        return result;
    }
    
    // 判断是否为广播表
    private boolean isBroadcastTable(final String tableName) {
        return shardingRule.getBroadcastTables().contains(tableName);
    }
}

对于我们的查询,路由过程如下:

分片表t_order的路由

  1. 提取分片键值:user_id=1001, order_id>2000
  2. 计算数据源:1001 % 2 = 1,路由到ds1
  3. 由于order_id是范围条件,无法精确计算,需要扫描ds1中的所有表:t_order0和t_order1

广播表t_dict的路由

  1. 识别t_dict为广播表
  2. 直接路由到当前数据源ds1,物理表名为t_dict

最终路由单元

  • ds1.t_order0, ds1.t_dict
  • ds1.t_order1, ds1.t_dict

3. SQL执行阶段

执行阶段负责将路由后的SQL发送到对应的数据源执行。

核心类与流程

// SQL执行引擎
public final class StatementExecutor {
    
    private final ConnectionMode connectionMode;
    private final ShardingRuntimeContext runtimeContext;
    private final List<Object> parameters;
    
    public <T> List<T> executeQuery(final ExecuteCallback<T> executeCallback) throws SQLException {
        // 1. 创建执行单元
        Collection<ExecutionUnit> executionUnits = createExecutionUnits();
        
        // 2. 执行查询
        return executeGroup(executionUnits, executeCallback);
    }
    
    private Collection<ExecutionUnit> createExecutionUnits() {
        Collection<ExecutionUnit> result = new LinkedList<>();
        // 遍历路由单元
        for (RouteUnit each : routeContext.getRouteUnits()) {
            // 创建物理SQL
            String sql = sqlRewriteContext.getSql();
            Map<String, Object> attributes = sqlRewriteContext.getAttributes();
            
            // 创建执行单元
            result.add(new ExecutionUnit(each.getDataSourceMapper().getActualName(), 
                new SQLUnit(sql, parameters, attributes)));
        }
        return result;
    }
    
    private <T> List<T> executeGroup(final Collection<ExecutionUnit> executionUnits, final ExecuteCallback<T> executeCallback) throws SQLException {
        // 分组执行(按数据源分组)
        Map<String, List<SQLUnit>> sqlUnitGroups = groupSQLUnitsByDataSource(executionUnits);
        List<T> result = new ArrayList<>(sqlUnitGroups.size());
        
        // 并行执行各组SQL
        for (Entry<String, List<SQLUnit>> entry : sqlUnitGroups.entrySet()) {
            result.addAll(executeGroupInternal(entry.getKey(), entry.getValue(), executeCallback));
        }
        return result;
    }
    
    private <T> List<T> executeGroupInternal(final String dataSourceName, final List<SQLUnit> sqlUnits, 
                                            final ExecuteCallback<T> executeCallback) throws SQLException {
        // 获取数据源
        DataSource dataSource = runtimeContext.getDataSourceMap().get(dataSourceName);
        
        // 创建JDBC连接
        try (Connection connection = dataSource.getConnection()) {
            // 执行SQL并获取结果
            List<T> result = new ArrayList<>(sqlUnits.size());
            for (SQLUnit each : sqlUnits) {
                try (PreparedStatement preparedStatement = connection.prepareStatement(each.getSql())) {
                    // 设置参数
                    setParameters(preparedStatement, each.getParameters());
                    
                    // 执行查询
                    result.add(executeCallback.execute(preparedStatement));
                }
            }
            return result;
        }
    }
}

对于我们的查询,执行过程如下:

  1. 生成两个物理SQL:
    • SELECT o.*, d.dict_name FROM ds1.t_order0 o JOIN ds1.t_dict d ON o.status = d.dict_code WHERE user_id = 1001 AND order_id > 2000 LIMIT 10;
    • SELECT o.*, d.dict_name FROM ds1.t_order1 o JOIN ds1.t_dict d ON o.status = d.dict_code WHERE user_id = 1001 AND order_id > 2000 LIMIT 10;
  2. 从连接池获取ds1的连接
  3. 并行执行这两个SQL
  4. 获取两个结果集

4. 结果归并阶段

归并阶段负责将多个数据源返回的结果合并为一个统一的结果集。

核心类与流程

// 归并引擎
public final class MergeEngine {
    
    private final DatabaseType databaseType;
    private final SelectStatementContext selectStatementContext;
    private final List<QueryResult> queryResults;
    
    public QueryResult merge() throws SQLException {
        // 1. 创建归并器链
        List<ResultMerger> resultMergers = createResultMergers();
        
        // 2. 执行归并
        QueryResult result = queryResults.isEmpty() ? EmptyQueryResult.INSTANCE : queryResults.get(0);
        for (ResultMerger each : resultMergers) {
            result = each.merge(result, queryResults, selectStatementContext);
        }
        return result;
    }
    
    private List<ResultMerger> createResultMergers() {
        List<ResultMerger> result = new LinkedList<>();
        
        // 处理聚合函数
        if (!selectStatementContext.getProjectionsContext().getAggregationProjections().isEmpty()) {
            result.add(new AggregationDistinctMerger());
            result.add(new AggregationMerger());
        }
        
        // 处理分组
        if (!selectStatementContext.getGroupByContext().getItems().isEmpty()) {
            result.add(new GroupByMerger());
        }
        
        // 处理排序
        if (!selectStatementContext.getOrderByContext().getItems().isEmpty()) {
            result.add(new OrderByMerger());
        }
        
        // 处理分页
        if (null != selectStatementContext.getLimit()) {
            result.add(new LimitMerger());
        }
        
        return result;
    }
}

对于我们的查询,归并过程如下:

  1. 由于查询没有聚合函数和分组,直接进入排序阶段
  2. 假设查询没有指定ORDER BY,ShardingSphere会按主键排序
  3. 使用优先级队列对两个结果集进行归并排序
  4. 应用LIMIT 10,返回前10条记录

四、广播表的核心机制详解

1. 广播表的定义与用途

广播表是ShardingSphere中的一种特殊表,它在每个数据源中都有完整的副本。广播表适用于以下场景:

  • 数据量较小的字典表(如地区表、状态码表)
  • 经常与分片表进行关联查询的表
  • 数据变更不频繁的基础数据表

2. 广播表的源码实现

2.1 配置解析
// 分片规则配置解析
public final class ShardingRuleConfigurationYamlSwapper implements TypeBasedYamlSwapper<YamlShardingRuleConfiguration, ShardingRuleConfiguration> {
    
    @Override
    public ShardingRuleConfiguration swapToObject(final YamlShardingRuleConfiguration yamlConfig) {
        ShardingRuleConfiguration result = new ShardingRuleConfiguration();
        
        // 解析广播表配置
        if (null != yamlConfig.getBroadcastTables()) {
            result.getBroadcastTables().addAll(yamlConfig.getBroadcastTables());
        }
        
        // 其他配置解析...
        return result;
    }
}
2.2 广播表路由优化
// 广播表路由优化
public final class BroadcastRoutingEngine implements RoutingEngine {
    
    private final ShardingRule shardingRule;
    private final Collection<String> logicTables;
    
    @Override
    public RouteResult route(final List<Object> parameters) {
        RouteResult result = new RouteResult();
        
        // 获取所有数据源
        Collection<String> dataSources = shardingRule.getDataSourceMap().keySet();
        
        // 为每个广播表在每个数据源创建路由单元
        for (String logicTable : logicTables) {
            for (String dataSource : dataSources) {
                result.getRouteUnits().add(new RouteUnit(
                    new DataSourceName(dataSource), 
                    new TableUnit(logicTable, logicTable)));
            }
        }
        
        return result;
    }
}
2.3 广播表数据一致性保证

ShardingSphere通过以下机制保证广播表的数据一致性:

// 广播表数据一致性保证
public final class BroadcastTableConsistencyChecker implements ConsistencyChecker {
    
    @Override
    public boolean check(final String dataSourceName, final String logicTableName) {
        // 获取所有数据源
        Map<String, DataSource> dataSourceMap = shardingRuntimeContext.getDataSourceMap();
        
        // 获取广播表的所有物理表
        Collection<String> actualTables = shardingRuntimeContext.getRuleMetaData().getRules().stream()
            .filter(each -> each instanceof ShardingRule)
            .map(each -> (ShardingRule) each)
            .flatMap(each -> each.getBroadcastTables().stream())
            .filter(each -> each.equalsIgnoreCase(logicTableName))
            .findFirst()
            .map(each -> dataSourceMap.keySet().stream()
                .map(dataSource -> each)
                .collect(Collectors.toList()))
            .orElse(Collections.emptyList());
        
        // 检查每个数据源中的表结构和数据一致性
        // 实际实现中会比较表结构、数据行数等
        return isSchemaConsistent(dataSourceMap, actualTables) && isDataConsistent(dataSourceMap, actualTables);
    }
    
    private boolean isSchemaConsistent(final Map<String, DataSource> dataSourceMap, final Collection<String> actualTables) {
        // 检查表结构一致性
        // 实际实现中会比较表字段、索引等
        return true;
    }
    
    private boolean isDataConsistent(final Map<String, DataSource> dataSourceMap, final Collection<String> actualTables) {
        // 检查数据一致性
        // 实际实现中会比较数据行数、数据摘要等
        return true;
    }
}

3. 广播表的优势与注意事项

优势

  • 显著减少跨库连接查询的开销
  • 简化SQL编写,无需考虑分片键关联
  • 提高查询性能,尤其是多表关联场景

注意事项

  • 广播表数据变更需要同步到所有数据源
  • 不适合大数据量表(会导致存储成本增加)
  • 需要定期检查各数据源中广播表的一致性

五、跨分片查询的优化机制

1. 广播表优化

// 广播表与分片表关联查询优化
public final class ShardingMergeEngine {
    
    public QueryResult merge(final SelectStatementContext selectStatementContext, final List<QueryResult> queryResults) {
        // 如果是广播表与分片表的关联查询
        if (isBroadcastJoin(selectStatementContext)) {
            // 优化处理:直接在各分片内完成关联
            return new BroadcastJoinMergedResult(queryResults, selectStatementContext);
        }
        
        // 其他合并逻辑...
    }
}

2. 分页优化

// 内存分页优化
public int getMemoryPageOffset() {
    if (isSameLogicSQLWithDifferentParameters()) {
        // 当SQL相同但参数不同时,需要计算总的offset
        return calculateTotalOffset();
    }
    return limit.getOffsetValue();
}

3. 并行执行优化

// 并行执行框架
public final class ExecuteEngine {
    
    private final ExecutorService executorService;
    
    public <T> List<T> execute(final Collection<? extends Callable<T>> callables) throws SQLException {
        try {
            // 使用线程池并行执行
            return executorService.invokeAll(callables).stream().map(this::getResult).collect(Collectors.toList());
        } catch (final InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new SQLException("Interrupted", ex);
        }
    }
}

六、总结与最佳实践

1. 核心机制总结

  • 分片键驱动路由:ShardingSphere通过分片键精确计算目标数据源和表
  • 广播表优化:将小表复制到所有数据源,消除跨库连接开销
  • 并行执行框架:基于线程池实现多数据源并行查询
  • 流式结果归并:通过装饰器模式实现结果集的流式处理和归并
  • 查询重写:自动调整SQL以适应分片环境

2. 最佳实践建议

  1. 合理设计分片键:选择查询频率高、分布均匀的字段作为分片键
  2. 利用广播表:对于字典表等小表,配置为广播表减少跨库查询
  3. 避免全表扫描:尽量在SQL中包含分片键条件
  4. 优化分页:避免大偏移量分页,考虑使用游标分页
  5. 监控与调优:利用ShardingSphere的监控功能,分析慢查询并优化
  6. 定期检查广播表一致性:确保各数据源中广播表数据一致

通过深入理解ShardingSphere的源码实现,特别是广播表这一关键特性,我们可以更好地利用其分片能力,避免常见的性能陷阱,构建高效、稳定的分布式数据库系统。