分库分表代码实操

发布于:2025-04-02 ⋅ 阅读:(70) ⋅ 点赞:(0)

在分库分表架构中,Java代码的查询逻辑会因是否使用中间件(如ShardingSphere)而有所不同。


1. 使用中间件(如ShardingSphere)的代码示例

中间件自动处理分片路由,代码与普通查询几乎无差异,只需配置分片规则即可。

1.1 依赖配置(pom.xml)
<!-- ShardingSphere JDBC 依赖 -->
<dependency>
    <groupId>org.apache.shardingsphere</groupId>
    <artifactId>shardingsphere-jdbc-core</artifactId>
    <version>5.3.2</version>
</dependency>
1.2 配置文件(application-sharding.yml)
dataSources:
  ds0: # 分库0
    url: jdbc:mysql://db0:3306/db?useSSL=false
    username: root
    password: 123456
  ds1: # 分库1
    url: jdbc:mysql://db1:3306/db?useSSL=false
    username: root
    password: 123456

rules:
  sharding:
    tables:
      user_info:
        actualDataNodes: ds$->{0..1}.user_info_$->{0..1} # 分库分表名规则(2库 × 2表)
        databaseStrategy: # 分库规则
          standard:
            shardingColumn: user_id
            shardingAlgorithmName: db_hash
        tableStrategy: # 分表规则
          standard:
            shardingColumn: user_id
            shardingAlgorithmName: table_hash
    shardingAlgorithms:
      db_hash:
        type: HASH_MOD # 哈希取模
        props:
          sharding-count: 2 # 分库数量
      table_hash:
        type: HASH_MOD
        props:
          sharding-count: 2 # 分表数量
1.3 Java查询代码(与普通JDBC一致)
// 通过用户ID查询(自动路由到对应分片)
public User getUserById(Long userId) {
    String sql = "SELECT * FROM user_info WHERE user_id = ?";
    try (Connection conn = dataSource.getConnection();
         PreparedStatement pstmt = conn.prepareStatement(sql)) {
        pstmt.setLong(1, userId);
        ResultSet rs = pstmt.executeQuery();
        if (rs.next()) {
            return new User(rs.getLong("user_id"), rs.getString("username"));
        }
    } catch (SQLException e) {
        e.printStackTrace();
    }
    return null;
}

// 跨分片查询(如查询所有用户)
public List<User> getAllUsers() {
    List<User> users = new ArrayList<>();
    String sql = "SELECT * FROM user_info"; // 中间件自动合并所有分片结果
    try (Connection conn = dataSource.getConnection();
         Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery(sql)) {
        while (rs.next()) {
            users.add(new User(rs.getLong("user_id"), rs.getString("username")));
        }
    } catch (SQLException e) {
        e.printStackTrace();
    }
    return users;
}

2. 手动实现分片路由的代码示例

若未使用中间件,需在代码中自行计算分片位置并指定表名。

2.1 分片工具类
public class ShardingUtil {
    // 分库数量
    private static final int DB_COUNT = 2;
    // 分表数量(每个库内的表数)
    private static final int TABLE_COUNT_PER_DB = 2;

    // 根据user_id计算分库位置
    public static String getDataSourceName(Long userId) {
        int dbIndex = (int) (userId % DB_COUNT);
        return "ds" + dbIndex; // 如 ds0, ds1
    }

    // 根据user_id计算分表名
    public static String getTableName(Long userId) {
        int tableIndex = (int) ((userId / DB_COUNT) % TABLE_COUNT_PER_DB);
        return "user_info_" + tableIndex; // 如 user_info_0, user_info_1
    }
}
2.2 查询代码(手动路由)
public User getUserByIdManual(Long userId) {
    // 1. 计算分库分表信息
    String dataSourceName = ShardingUtil.getDataSourceName(userId);
    String tableName = ShardingUtil.getTableName(userId);

    // 2. 根据dataSourceName获取对应数据源(需提前配置多数据源)
    DataSource dataSource = getDataSource(dataSourceName);

    // 3. 动态拼接SQL
    String sql = "SELECT * FROM " + tableName + " WHERE user_id = ?";
    try (Connection conn = dataSource.getConnection();
         PreparedStatement pstmt = conn.prepareStatement(sql)) {
        pstmt.setLong(1, userId);
        ResultSet rs = pstmt.executeQuery();
        if (rs.next()) {
            return new User(rs.getLong("user_id"), rs.getString("username"));
        }
    } catch (SQLException e) {
        e.printStackTrace();
    }
    return null;
}

// 跨分片查询(需遍历所有分片)
public List<User> getAllUsersManual() {
    List<User> users = new ArrayList<>();
    for (int dbIndex = 0; dbIndex < DB_COUNT; dbIndex++) {
        String dataSourceName = "ds" + dbIndex;
        DataSource dataSource = getDataSource(dataSourceName);
        for (int tableIndex = 0; tableIndex < TABLE_COUNT_PER_DB; tableIndex++) {
            String tableName = "user_info_" + tableIndex;
            String sql = "SELECT * FROM " + tableName;
            try (Connection conn = dataSource.getConnection();
                 Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery(sql)) {
                while (rs.next()) {
                    users.add(new User(rs.getLong("user_id"), rs.getString("username")));
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
    return users;
}

3. 关键注意事项

3.1 分片键处理
  • 必须携带分片键
    精准查询(如WHERE user_id = 123)需明确分片键,否则中间件可能广播到所有分片。
  • 避免非分片键查询
    如必须按非分片键(如手机号)查询,需建立异构索引(如通过Elasticsearch)。
3.2 事务管理
  • 本地事务
    单分片操作可使用本地事务(如更新同一用户的订单和账户)。
  • 分布式事务
    跨分片操作需引入Seata等框架:
@GlobalTransactional // Seata的全局事务注解
public void transfer(Long fromUserId, Long toUserId, BigDecimal amount) {
    // 扣减fromUserId的余额(可能分片A)
    deductBalance(fromUserId, amount);
    // 增加toUserId的余额(可能分片B)
    addBalance(toUserId, amount);
}
3.3 性能优化
  • 避免全表扫描
    跨分片查询(如SELECT * FROM user_info)会遍历所有分片,性能极差,需通过分页、异步导出等方式优化。
  • 批量操作
    按分片键分组后批量提交:
Map<String, List<Long>> shardedUserIds = userIds.stream()
    .collect(Collectors.groupingBy(ShardingUtil::getDataSourceName));

shardedUserIds.forEach((dataSourceName, ids) -> {
    DataSource ds = getDataSource(dataSourceName);
    String sql = "INSERT INTO user_info (...) VALUES (...)";
    // 使用JDBC批量插入
});

4. 总结

  • 核心原则
    1. 查询尽量携带分片键。
    2. 避免跨分片事务和全表扫描。
    3. 优先通过业务设计减少分片间依赖。

实际开发中,推荐优先使用成熟中间件(如ShardingSphere),仅在特殊场景下手动实现路由逻辑。


网站公告

今日签到

点亮在社区的每一天
去签到