【MogDB】一种基于ctid分片并发查询以提升大表查询性能的方式

发布于:2025-07-13 ⋅ 阅读:(15) ⋅ 点赞:(0)

【MogDB】一种基于ctid分片并发查询以提升大表查询性能的方式

在oracle里有见过这样一种用法,大意是基于行所在的文件号、块号、行号拼成一个数字,除以10取余数,这样这个余数可能为0~9这10个数字,应用并发执行10个SQL,让每个线程中SQL条件的这个余数分别等于0~9,即可将一个大表通过10个并发进行查询,提高数据查询效率

select * from t_test where 
MOD(Dbms_Rowid.Rowid_Relative_Fno(t.ROWID)||Dbms_Rowid.Rowid_Block_Number(t.ROWID)||Dbms_Rowid.ROWID_ROW_NUMBER(t.ROWID),'10') = '9'

这个方案不是官方提供的,而且很明显存在一些问题,它这么查出来行在每个线程里并不是连续的,而且数字直接拼接没有考虑位数的差异。本文暂不深究这种方式的问题,ORACLE有官方的解决方案,即在11.2版本中新增的DBMS_PARALLEL_EXECUTE。

在OG/PG系中,其实也可以通过ctid分片并发查询来提升查询效率。

验证方案可行性

drop table t_test;
create table t_test(id int,a text);
insert into t_test select id,'xxxxxxxxxx' from pg_catalog.generate_series (1,10000000) id;

--查询块数
select relpages from pg_class where oid='t_test'::regclass; --54055

--测试常规全表扫描时间
explain analyze select count(1) from t_test; --1430ms

--分6片并发,每片10000个page (没有平均分片,存在一些误差)
explain analyze select count(1) from t_test where ctid >= '(0,0)' and ctid< '(10000,0)'; --282ms
explain analyze select count(1) from t_test where ctid >= '(10000,0)' and ctid< '(20000,0)'; --282ms
explain analyze select count(1) from t_test where ctid >= '(20000,0)' and ctid< '(30000,0)'; --285ms
explain analyze select count(1) from t_test where ctid >= '(30000,0)' and ctid< '(40000,0)'; --280ms 
explain analyze select count(1) from t_test where ctid >= '(40000,0)' and ctid< '(50000,0)'; --281ms
explain analyze select count(1) from t_test where ctid >= '(50000,0)' and ctid< '(54056,0)'; --337ms  最后一个分片时间稍长一点点,多次测试都一样,暂未分析原因

--测试5000个page,时间开销大概只有10000个page的一半(耗时和数据量基本成正比)
explain analyze select count(1) from t_test where ctid >= '(0,0)' and ctid< '(5000,0)'; --148ms
QUERY PLAN                                                                                                            |
----------------------------------------------------------------------------------------------------------------------+
Aggregate  (cost=899.00..899.01 rows=1 width=8) (actual time=148.119..148.119 rows=1 loops=1)                         |
  ->  Tid Range Scan on t_test  (cost=0.01..774.00 rows=49999 width=0) (actual time=0.010..63.138 rows=925000 loops=1)|
        TID Cond: ((ctid >= '(0,0)'::tid) AND (ctid < '(5000,0)'::tid))                                               |
Total runtime: 148.201 ms                                                                                             |

--测试自带的并行查询,耗时和单个分片差不多
explain analyze select /*+set(query_dop 6)*/ count(1) from t_test; --282ms

--nestloop下可能走不到tidscan,使用场景有局限
explain analyze select /*+leading((b a))*/count(1) from t_test a,t_test b where a.id=b.id and a.ctid >= '(0,0)' and a.ctid< '(5000,0)' and b.id=1000000; --1468ms

QUERY PLAN                                                                                                           |
---------------------------------------------------------------------------------------------------------------------+
Aggregate  (cost=408105.81..408105.82 rows=1 width=8) (actual time=1467.898..1467.899 rows=1 loops=1)                |
  ->  Nested Loop  (cost=0.00..408105.81 rows=1 width=0) (actual time=1467.893..1467.893 rows=0 loops=1)             |
        ->  Seq Scan on t_test b  (cost=0.00..179053.25 rows=1 width=4) (actual time=65.282..642.095 rows=1 loops=1) |
              Filter: (id = 1000000)                                                                                 |
              Rows Removed by Filter: 9999999                                                                        |
        ->  Seq Scan on t_test a  (cost=0.00..229052.55 rows=1 width=4) (actual time=825.782..825.782 rows=0 loops=1)|
              Filter: ((ctid >= '(0,0)'::tid) AND (ctid < '(5000,0)'::tid) AND (id = 1000000))                       |
              Rows Removed by Filter: 10000000                                                                       |
Total runtime: 1468.117 ms                                                                                           |

简单化使用方案

根据以上测试,如果不使用内核自带的并行查询能力(在openGauss金融版里,query_dop是禁止修改的),基于ctid切片来实现多连接的并发查询,在部分场景下可以提高全表扫描场景的查询效率(需要自行确保查询快照的一致性)。

但是提前计算好每个线程里的ctid范围再去组装SQL,会比较麻烦,因此我想了一种方案,通过创建两个自定义函数,仅需传入要并行查的表、分片数、当前线程查第几片即可。

CREATE OR REPLACE FUNCTION public.ctid_split_max(i_tableoid oid,  i_mod integer, i_n integer)
 RETURNS tid
 LANGUAGE sql
 IMMUTABLE NOT FENCED NOT SHIPPABLE
AS $function$select ('('||i_n*(trunc((select relpages  from pg_class where oid=i_tableoid)/i_mod)+1)||',0)')::tid ;
$function$;


CREATE OR REPLACE FUNCTION public.ctid_split_min(i_tableoid oid, i_mod integer, i_n integer)
 RETURNS tid
 LANGUAGE sql
 IMMUTABLE NOT FENCED NOT SHIPPABLE
AS $function$select ('('||(i_n-1)*(trunc((select relpages  from pg_class where oid=i_tableoid)/i_mod)+1)||',0)')::tid  ;
$function$;

使用时,应用开启多个并发同时查询该表,带上where条件,比如查询t_test表,6个并发

select * from t_test where ctid between ctid_split_min('t_test'::regclass::oid,6,1) and ctid_split_max('t_test'::regclass::oid,6,1);
select * from t_test where ctid between ctid_split_min('t_test'::regclass::oid,6,2) and ctid_split_max('t_test'::regclass::oid,6,2);
select * from t_test where ctid between ctid_split_min('t_test'::regclass::oid,6,3) and ctid_split_max('t_test'::regclass::oid,6,3);
select * from t_test where ctid between ctid_split_min('t_test'::regclass::oid,6,4) and ctid_split_max('t_test'::regclass::oid,6,4);
select * from t_test where ctid between ctid_split_min('t_test'::regclass::oid,6,5) and ctid_split_max('t_test'::regclass::oid,6,5);
select * from t_test where ctid between ctid_split_min('t_test'::regclass::oid,6,6) and ctid_split_max('t_test'::regclass::oid,6,6);

注意函数入参全部都是常量,而且由于是sql immutable,因此能常量折叠,提前算好,不会每行都去执行这两个函数。

验证方案准确性

--查之前必须收集一次统计信息,否则pg_class里的relpages不准,会导致部分数据未查询到
analyze t_test;
--验证是否存在数据遗漏
select count(1) from t_test;

select sum(c) from (
 select count(1) c from t_test where ctid between ctid_split_min('t_test'::regclass::oid,6,1) and ctid_split_max('t_test'::regclass::oid,6,1)
 union all
 select count(1) from t_test where ctid between ctid_split_min('t_test'::regclass::oid,6,2) and ctid_split_max('t_test'::regclass::oid,6,2)
  union all
select count(1) from t_test where ctid between ctid_split_min('t_test'::regclass::oid,6,3) and ctid_split_max('t_test'::regclass::oid,6,3)
 union all
 select count(1) from t_test where ctid between ctid_split_min('t_test'::regclass::oid,6,4) and ctid_split_max('t_test'::regclass::oid,6,4)
 union all
 select count(1) from t_test where ctid between ctid_split_min('t_test'::regclass::oid,6,5) and ctid_split_max('t_test'::regclass::oid,6,5)
 union all
 select count(1) from t_test where ctid between ctid_split_min('t_test'::regclass::oid,6,6) and ctid_split_max('t_test'::regclass::oid,6,6)
);

注意,此方案不适用于分区表,因为在openGauss中,同一个表的不同分区,ctid是可能存在重复值的,对于分区表应该通过分区子句去指定,让每个线程只查一个分区,并且统计信息也得改从pg_partition中获取。

另外,该方案还依赖于tidrangescan算子的支持,MogDB在2024-03-30发布的5.0.6版本为了提升gs_dump的效率,支持并行导出,新增了tidrangescan的算子,因此该方案才能有效果,而在MogDB 5.0.5及之前的版本只能使用seq scan然后再filter,该方案并不能带来性能的提升。另外,在openGauss 6.0.0版本中,该能力也未引入,但似乎在7.0.0 RC1中做了适配(官方下载的包仍然不支持)。实测GaussDB 506.0版本也没有tidrangescan

应用测试

思路和简单SQL验证都符合预期,本来懒得再用java测了,不过现在AI工具非常厉害了,思路完整就可以让AI自动写出程序。于是我直接把这篇文章灌给Cursor,并下达指令:

读一下这篇文章,写段JAVA程序,验证文章中这种拆数据分片的并行方式是否的确可以加快数据查询速度。思考如何设计测试用例,能在无并行和有并行时,输入的SQL相同、输出的产物也相同,注意保持日志输出的整洁

import java.sql.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
import java.util.logging.Level;

/**
 * 优化的CTID并行查询测试
 * 减少日志输出,优化连接管理,更准确的性能测试
 */
public class OptimizedCtidTest {
    
    // 数据库连接配置
    private static final String DB_URL = "jdbc:opengauss://192.168.163.134:52000/postgres";
    private static final String DB_USER = "admin";
    private static final String DB_PASSWORD = "Admin@123";
    
    // 测试配置
    private static final String TEST_TABLE = "t_test_optimized";
    private static final int THREAD_COUNT = 8;  // 线程数
    private static final int DATA_SIZE = 10000000; // 1000万条数据
    private static final int TEST_ROUNDS = 10;    // 测试轮数
    
    // 连接池
    private static final List<Connection> connectionPool = new ArrayList<>();
    private static final ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT + 2);
    
    static {
        // 关闭openGauss驱动的详细日志
        Logger.getLogger("org.opengauss").setLevel(Level.WARNING);
    }
    
    public static void main(String[] args) {
        OptimizedCtidTest test = new OptimizedCtidTest();
        
        System.out.println("=== CTID并行查询测试 ===");
        System.out.println("数据量: " + DATA_SIZE + " 条");
        System.out.println("并发数: " + THREAD_COUNT + " 个线程");
        System.out.println("测试轮数: " + TEST_ROUNDS + " 轮");
        System.out.println();
        
        try {
            // 初始化连接池
            test.initConnectionPool();
            
            // 初始化测试数据
            test.setupTestData();
            
            // 性能测试
            test.performanceTest();
            
            // 详细分析
            test.detailedAnalysis();
            
        } catch (Exception e) {
            System.err.println("测试失败: " + e.getMessage());
            e.printStackTrace();
        } finally {
            test.cleanup();
        }
    }
    
    /**
     * 初始化连接池
     */
    private void initConnectionPool() throws SQLException {
        System.out.println("初始化连接池...");
        
        // 创建连接池
        for (int i = 0; i < THREAD_COUNT + 2; i++) {
            Connection conn = DriverManager.getConnection(DB_URL, DB_USER, DB_PASSWORD);
            connectionPool.add(conn);
        }
        
        System.out.println("✓ 连接池初始化完成 (" + connectionPool.size() + " 个连接)");
    }
    
    /**
     * 获取连接(复用连接池)
     */
    private synchronized Connection getConnection() {
        if (!connectionPool.isEmpty()) {
            return connectionPool.remove(0);
        }
        throw new RuntimeException("连接池已空");
    }
    
    /**
     * 归还连接
     */
    private synchronized void returnConnection(Connection conn) {
        if (conn != null) {
            connectionPool.add(conn);
        }
    }
    
    /**
     * 初始化测试数据
     */
    private void setupTestData() throws SQLException {
        System.out.println("准备测试数据...");
        
        Connection conn = getConnection();
        try (Statement stmt = conn.createStatement()) {
            // 删除并重建表
            stmt.execute("DROP TABLE IF EXISTS " + TEST_TABLE);
            stmt.execute("CREATE TABLE " + TEST_TABLE + "(id int, data text)");
            
            // 插入测试数据
            System.out.print("插入 " + DATA_SIZE + " 条数据...");
            long startTime = System.currentTimeMillis();
            stmt.execute("INSERT INTO " + TEST_TABLE + 
                " SELECT id, 'data_' || id FROM generate_series(1, " + DATA_SIZE + ") id");
            long insertTime = System.currentTimeMillis() - startTime;
            System.out.println("完成 (" + insertTime + "ms)");
            
            // 创建分片函数
            stmt.execute("""
                CREATE OR REPLACE FUNCTION ctid_split_min(i_tableoid oid, i_mod integer, i_n integer)
                RETURNS tid LANGUAGE sql IMMUTABLE AS $$
                select ('('||(i_n-1)*(trunc((select relpages from pg_class where oid=i_tableoid)/i_mod)+1)||',0)')::tid;
                $$;
                """);
            
            stmt.execute("""
                CREATE OR REPLACE FUNCTION ctid_split_max(i_tableoid oid, i_mod integer, i_n integer)
                RETURNS tid LANGUAGE sql IMMUTABLE AS $$
                select ('('||i_n*(trunc((select relpages from pg_class where oid=i_tableoid)/i_mod)+1)||',0)')::tid;
                $$;
                """);
            
            // 收集统计信息
            stmt.execute("ANALYZE " + TEST_TABLE);
             
            // 获取表信息
            ResultSet rs = stmt.executeQuery("SELECT relpages, reltuples FROM pg_class WHERE relname = '" + TEST_TABLE + "'");
            if (rs.next()) {
                System.out.println("表页面数: " + rs.getInt(1) + ", 估算行数: " + rs.getLong(2));
            }
            
        } finally {
            returnConnection(conn);
        }
    }
    
    /**
     * 性能测试
     */
    private void performanceTest() throws Exception {
        System.out.println("\n=== 性能对比测试 ===");
        
        List<Long> serialTimes = new ArrayList<>();
        List<Long> parallelTimes = new ArrayList<>();
        
        // 预热
        System.out.println("预热中...");
        for (int i = 0; i < 2; i++) {
            serialQuery();
            parallelQuery();
        }
        
        // 正式测试
        System.out.println("开始正式测试...");
        for (int round = 1; round <= TEST_ROUNDS; round++) {
            System.out.print("第 " + round + " 轮: ");
            
            // 串行查询
            long serialTime = serialQuery();
            serialTimes.add(serialTime);
            System.out.print("串行=" + serialTime + "ms ");
            
            // 等待一下避免缓存影响
            Thread.sleep(1000);
            
            // 并行查询
            long parallelTime = parallelQuery();
            parallelTimes.add(parallelTime);
            System.out.print("并行=" + parallelTime + "ms ");
            
            double speedup = (double) serialTime / parallelTime;
            System.out.println("提升=" + String.format("%.2f", speedup) + "倍");
        }
        
        // 统计结果
        long avgSerial = serialTimes.stream().mapToLong(Long::longValue).sum() / TEST_ROUNDS;
        long avgParallel = parallelTimes.stream().mapToLong(Long::longValue).sum() / TEST_ROUNDS;
        double avgSpeedup = (double) avgSerial / avgParallel;
        
        System.out.println("\n=== 测试结果汇总 ===");
        System.out.println("串行查询平均时间: " + avgSerial + "ms");
        System.out.println("并行查询平均时间: " + avgParallel + "ms");
        System.out.println("平均性能提升: " + String.format("%.2f", avgSpeedup) + "倍");
        
        if (avgSpeedup > 1.2) {
            System.out.println("✓ 并行查询有明显性能提升!");
        } else if (avgSpeedup > 0.9) {
            System.out.println("~ 并行查询效果一般");
        } else {
            System.out.println("✗ 并行查询反而更慢");
        }
    }
    
    /**
     * 串行查询
     */
    private long serialQuery() throws SQLException {
        Connection conn = getConnection();
        try {
            long startTime = System.currentTimeMillis();
            
            try (Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM " + TEST_TABLE)) {
                if (rs.next()) {
                    long count = rs.getLong(1);
                    if (count != DATA_SIZE) {
                        System.err.println("警告: 数据不一致,期望 " + DATA_SIZE + " 实际 " + count);
                    }
                }
            }
            
            return System.currentTimeMillis() - startTime;
        } finally {
            returnConnection(conn);
        }
    }
    
    /**
     * 并行查询
     */
    private long parallelQuery() throws Exception {
        long startTime = System.currentTimeMillis();
        AtomicLong totalCount = new AtomicLong(0);
        
        // 提交并行任务
        List<Future<Long>> futures = new ArrayList<>();
        for (int i = 1; i <= THREAD_COUNT; i++) {
            final int threadIndex = i;
            futures.add(executor.submit(() -> {
                Connection conn = getConnection();
                try {
                    String sql = "SELECT COUNT(*) FROM " + TEST_TABLE + 
                        " WHERE ctid BETWEEN ctid_split_min('" + TEST_TABLE + "'::regclass::oid, " + THREAD_COUNT + ", " + threadIndex + ")" +
                        " AND ctid_split_max('" + TEST_TABLE + "'::regclass::oid, " + THREAD_COUNT + ", " + threadIndex + ")";
                    
                    try (Statement stmt = conn.createStatement();
                         ResultSet rs = stmt.executeQuery(sql)) {
                        if (rs.next()) {
                            return rs.getLong(1);
                        }
                    }
                    return 0L;
                } catch (SQLException e) {
                    System.err.println("线程 " + threadIndex + " 失败: " + e.getMessage());
                    return 0L;
                } finally {
                    returnConnection(conn);
                }
            }));
        }
        
        // 等待所有任务完成并汇总结果
        for (Future<Long> future : futures) {
            totalCount.addAndGet(future.get());
        }
        
        long executionTime = System.currentTimeMillis() - startTime;
        
        // 验证结果
        if (totalCount.get() != DATA_SIZE) {
            System.err.println("警告: 并行查询结果不一致,期望 " + DATA_SIZE + " 实际 " + totalCount.get());
        }
        
        return executionTime;
    }
    
    /**
     * 详细分析
     */
    private void detailedAnalysis() throws Exception {
        System.out.println("\n=== 详细分析 ===");
        
        // 单个分片的性能分析
        System.out.println("各分片性能分析:");
        for (int i = 1; i <= THREAD_COUNT; i++) {
            final int threadIndex = i;
            
            Connection conn = getConnection();
            try {
                String sql = "SELECT COUNT(*) FROM " + TEST_TABLE + 
                    " WHERE ctid BETWEEN ctid_split_min('" + TEST_TABLE + "'::regclass::oid, " + THREAD_COUNT + ", " + threadIndex + ")" +
                    " AND ctid_split_max('" + TEST_TABLE + "'::regclass::oid, " + THREAD_COUNT + ", " + threadIndex + ")";
                
                long startTime = System.currentTimeMillis();
                try (Statement stmt = conn.createStatement();
                     ResultSet rs = stmt.executeQuery(sql)) {
                    if (rs.next()) {
                        long count = rs.getLong(1);
                        long time = System.currentTimeMillis() - startTime;
                        System.out.println("  分片 " + threadIndex + ": " + count + " 行, " + time + "ms");
                    }
                }
            } finally {
                returnConnection(conn);
            }
        }
        
        // 执行计划分析
        System.out.println("\n执行计划分析:");
        Connection conn = getConnection();
        try (Statement stmt = conn.createStatement()) {
            System.out.println("串行查询执行计划:");
            ResultSet rs = stmt.executeQuery("EXPLAIN SELECT COUNT(*) FROM " + TEST_TABLE);
            while (rs.next()) {
                System.out.println("  " + rs.getString(1));
            }
            
            System.out.println("\n并行查询执行计划:");
            rs = stmt.executeQuery("EXPLAIN SELECT COUNT(*) FROM " + TEST_TABLE + 
                " WHERE ctid BETWEEN ctid_split_min('" + TEST_TABLE + "'::regclass::oid, " + THREAD_COUNT + ", 1)" +
                " AND ctid_split_max('" + TEST_TABLE + "'::regclass::oid, " + THREAD_COUNT + ", 1)");
            while (rs.next()) {
                System.out.println("  " + rs.getString(1));
            }
        } finally {
            returnConnection(conn);
        }
    }
    
    /**
     * 清理资源
     */
    private void cleanup() {
        try {
            executor.shutdown();
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
            
            for (Connection conn : connectionPool) {
                if (conn != null && !conn.isClosed()) {
                    conn.close();
                }
            }
        } catch (Exception e) {
            System.err.println("清理资源时出错: " + e.getMessage());
        }
    }
} 
=== CTID并行查询测试 ===
数据量: 10000000 条
并发数: 8 个线程
测试轮数: 10 轮

初始化连接池...
? 连接池初始化完成 (10 个连接)
准备测试数据...
表页面数: 63098, 估算行数: 9999252

=== 性能对比测试 ===
预热中...
开始正式测试...
第 1 轮: 串行=890ms 并行=804ms 提升=1.11倍
第 2 轮: 串行=893ms 并行=695ms 提升=1.28倍
第 3 轮: 串行=1016ms 并行=837ms 提升=1.21倍
第 4 轮: 串行=864ms 并行=595ms 提升=1.45倍
第 5 轮: 串行=989ms 并行=796ms 提升=1.24倍
第 6 轮: 串行=1163ms 并行=772ms 提升=1.51倍
第 7 轮: 串行=946ms 并行=531ms 提升=1.78倍
第 8 轮: 串行=871ms 并行=556ms 提升=1.57倍
第 9 轮: 串行=1015ms 并行=598ms 提升=1.70倍
第 10 轮: 串行=923ms 并行=743ms 提升=1.24倍

=== 测试结果汇总 ===
串行查询平均时间: 957ms
并行查询平均时间: 692ms
平均性能提升: 1.38倍
? 并行查询有明显性能提升!

=== 详细分析 ===
各分片性能分析:
  分片 1: 1387285 行, 182ms
  分片 2: 1230528 行, 422ms
  分片 3: 1230528 行, 426ms
  分片 4: 1230528 行, 162ms
  分片 5: 1230528 行, 159ms
  分片 6: 1230528 行, 162ms
  分片 7: 1230528 行, 165ms
  分片 8: 1229547 行, 534ms

执行计划分析:
串行查询执行计划:
  Aggregate  (cost=188088.65..188088.66 rows=1 width=8)
    ->  Seq Scan on t_test_optimized  (cost=0.00..163090.52 rows=9999252 width=0)

并行查询执行计划:
  Aggregate  (cost=943.96..943.97 rows=1 width=8)
    ->  Tid Range Scan on t_test_optimized  (cost=0.01..818.97 rows=49996 width=0)
          TID Cond: ((ctid >= '(0,0)'::tid) AND (ctid <= '(7888,0)'::tid))

由于我是在虚拟机上测试,所以性能抖动有点大,如果是在配置好的物理机上,每个分片查询速度差不多,那么并行所提升的性能应该更高。

总结与展望

本文通过使用基于CTID进行分片并行查询的方式,提升了全表扫描的速度,但能使用此方案的场景非常少。
尽管某些场景,OG/PG系数据库的性能强于ORACLE,但大表的全表扫描速度弱于ORACLE,是OG系一直绕不过去的坎,期望有一天OG/PG社区能在全表扫描性能上有重大突破。


网站公告

今日签到

点亮在社区的每一天
去签到