【calcite】calcite实现SQL列级数据血缘 data lineage 查询

发布于:2024-05-08 ⋅ 阅读:(34) ⋅ 点赞:(0)

一、背景

大数据场景下的数据血缘分析,内部实现十分复杂,一般需要依赖成熟的 SQL 解析框架。Calcite 是 Apache 顶级项目,属于 Java 生态,被 Flink、Hive、Drill 等多个项目所使用。Calcite 对 MySQL、Oracle、PostgreSQL 以及其他大数据平台的支持较好,对 SQL Server 的支持较弱(只有 MssqlSqlDialect、SqlConformanceEnum.SQL_SERVER_2008 等少量相关代码)。
另外,Python 生态推荐使用 sqlglot(DataHub 即采用了它)。

calcite官方文档

二、 实现方式

gradle添加依赖:

dependencies {
    // Calcite: SQL parser, validator, planner and RelMetadataQuery used for the lineage analysis.
    testImplementation('org.apache.calcite:calcite-core:1.32.0')
    // MySQL JDBC driver: the examples load com.mysql.cj.jdbc.Driver at runtime.
    testRuntimeOnly('com.mysql:mysql-connector-j:8.0.33')
}

以下代码均由 Scala 实现,并在 MySQL 5.7 上测试通过:

-- Source tables st01/st02 and target table st03 used by the lineage examples.
-- All three are dropped first so that the script can be re-run.
drop table if exists test.st01;
drop table if exists test.st02;
drop table if exists test.st03;
CREATE TABLE test.st01(
s_id BIGINT comment '主键',
s_name VARCHAR(20)  comment '姓名',
s_age INT comment '年龄',
s_sex VARCHAR(10) comment '性别',
s_part  VARCHAR(10) comment '分区字段',
ts TIMESTAMP comment '创建时间'
);
-- Insert into the tables that were actually created (test.st01 / test.st02),
-- which are also the tables referenced by the queries below.
insert into test.st01 values(1,'zhangsan',10,'male','student','2020-01-01 18:01:01.666');
insert into test.st01 values(2,'lisi',66,'female','teacher','2020-01-01 10:01:01.666');
insert into test.st01 values(3,'sunlirong',50,'male','student','2020-01-01 10:01:01.666');
insert into test.st01 values(4,'laoliu',38,'female','teacher','2020-01-01 10:01:01.666');

create table test.st02 like test.st01;
insert into test.st02 values(2,'wangwu',66,'male','teacher','2020-01-01 10:01:01.666');
insert into test.st02 values(3,'zhaoliu',66,'female','student','2020-01-01 10:01:01.666');

-- Target table of the INSERT ... SELECT lineage example.
create table test.st03 like test.st01;

首先准备两条 SQL 语句:

  /**
   * Simple test query: select every column of test.st01.
   *
   * The table is qualified with the database name because the planner resolves tables
   * from the root schema, in which the JDBC schema is registered under the database name;
   * an unqualified `st01` would not be found.
   */
  val MYSQL_SQL1 =
    """
      |select * from `test`.`st01` where 1=1
      |""".stripMargin.trim

  /**
   * Lineage test query. It covers:
   *  (1) INSERT INTO ... SELECT,
   *  (2) the MySQL-specific (non-standard) CONCAT function,
   *  (3) an inner join,
   *  (4) a WHERE filter.
   * The statement is assembled line by line and joined with '\n'.
   */
  val MYSQL_SQL2 = Seq(
    "insert into `test`.`st03`",
    "select s_id,combined_name s_name,s_age,s_sex,s_part,ts from (",
    "select",
    "a.s_id as s_id",
    ",CONCAT(a.s_name,'-',b.s_name) as combined_name",
    ",a.s_age+b.s_age as s_age",
    ",a.s_sex as s_sex",
    ",'none' as s_part",
    ",current_timestamp as ts",
    "from `test`.`st01` a inner join `test`.`st02` b on a.s_id=b.s_id",
    "where a.s_sex='male'",
    ") t0 order by ts limit 2"
  ).mkString("\n")

初始化数据库连接参数:

  // Connection settings for the MySQL instance backing the examples.
  // The database name stays fixed because the example SQL hard-codes `test`.
  val MYSQL_DATABASE = "test"
  // Credentials and host can be overridden via environment variables so they do not have
  // to be hard-coded in source; the literals are only placeholders/defaults.
  val MYSQL_USERNAME: String = sys.env.getOrElse("MYSQL_USERNAME", "root")
  val MYSQL_PASSWORD: String = sys.env.getOrElse("MYSQL_PASSWORD", "你的密码")
  val MYSQL_HOST_PORT: String = sys.env.getOrElse("MYSQL_HOST_PORT", "192.168.100.100:3306")
  val MYSQL_JDBC_URL = s"jdbc:mysql://${MYSQL_HOST_PORT}/${MYSQL_DATABASE}?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai&useSSL=false&rewriteBatchedStatements=true"
// Prints every row of a result set laid out like the test.st01 table
// (s_id, s_name, s_age, s_sex, s_part, ts). Column positions are hard-coded to that layout.
  def printStandardRs(rs: ResultSet): Unit =
    Iterator
      .continually(rs.next())
      .takeWhile(hasRow => hasRow)
      .foreach { _ =>
        // Columns are read in positional order 1..6.
        val id = rs.getLong(1)
        val name = rs.getString(2)
        val age = rs.getInt(3)
        val sex = rs.getString(4)
        val part = rs.getString(5)
        val createdAt = rs.getTimestamp(6)
        println(s"Get result s_id:${id},s_name:${name},s_age:${age},s_sex:${sex},s_part:${part},ts:${createdAt}")
      }
// Prints the column-level lineage contained in a RelRoot. For each output field it prints
// the fully qualified source columns (e.g. test.st01.s_id) reported by
// RelMetadataQuery.getColumnOrigins, as "field <- col1,col2".
// Fields whose origin set is null or empty are skipped.
  def printRelRoot(relRoot: RelRoot): Unit = {
    // Explicit converters instead of relying on implicit JavaConversions.
    import scala.collection.JavaConverters._
    val mq: RelMetadataQuery = relRoot.rel.getCluster.getMetadataQuery
    for (field <- relRoot.fields.asScala) {
      val destFieldName = field.getValue
      // RelRoot.fields maps an ordinal in relRoot.rel's row type to an output name. That
      // ordinal need not equal the position in the list, so use the key, not a loop index.
      val origins: util.Set[RelColumnOrigin] = mq.getColumnOrigins(relRoot.rel, field.getKey.intValue)
      if (origins != null && !origins.isEmpty) {
        val oriStr = origins.asScala.toSeq
          .map { ori =>
            val depTbl: RelOptTable = ori.getOriginTable
            val depFldName: String = depTbl.getRowType.getFieldNames.get(ori.getOriginColumnOrdinal)
            // qualified table name parts + column name, e.g. [test, st01] + s_id -> test.st01.s_id
            (depTbl.getQualifiedName.asScala :+ depFldName).mkString(".")
          }
          .sorted // origins come from a Set; sort so the output is reproducible
          .mkString(",")
        println(s"${destFieldName} <- ${oriStr}")
      }
    }
  }

本文均使用 Calcite 的 RelMetadataQuery 类所提供的血缘信息查询功能。
需要注意的是:
(1)SQL 中的表名都需要写成 db名.tbl名 的形式,否则无法找到表,因为 Calcite 是从 SchemaPlus 提供的元数据中查找表的。
(2)CREATE TABLE AS 语句无法被 planner 识别。如果需要分析 CREATE TABLE AS 的血缘,可能需要自己用正则拆分 SQL,把其中的 SELECT 部分单独提取出来再进行识别。

2.1 通用版本

    val sql = MYSQL_SQL2
    // Connection properties: see org.apache.calcite.config.CalciteConnectionProperty
    // Available SQL functions: see org.apache.calcite.sql.fun.SqlLibraryOperators
    val conn = DriverManager.getConnection("jdbc:calcite:fun=mysql;lex=MYSQL;model=inline:" + getDefaultMysqlConnConfig)
    try {
      // Optional sanity check: run the statement through Calcite and print the rows it returns.
      // Failures are only reported so that the lineage analysis below still runs.
      // NOTE(review): for an INSERT this may actually execute the DML against MySQL;
      // confirm that is intended before pointing this at a real database.
      val stmt = conn.createStatement()
      try {
        val rs = stmt.executeQuery(sql)
        try printStandardRs(rs) finally rs.close()
      } catch {
        case scala.util.control.NonFatal(ex) => println(ex.getMessage)
      } finally {
        stmt.close()
      }

      // The prepare context exposes the root schema (built from the inline model) that the
      // planner uses as its default schema for table lookup.
      val serverStmt = conn.createStatement()
      try {
        val cxt = serverStmt.unwrap(classOf[CalciteServerStatement]).createPrepareContext()

        val mysqlValidateConfig: SqlValidator.Config = SqlValidator.Config.DEFAULT.withConformance(SqlConformanceEnum.MYSQL_5)

        // Parser config used by planner.parse
        val mysqlParseConfig = SqlParser.config()
          .withLex(Lex.MYSQL)
          .withConformance(SqlConformanceEnum.MYSQL_5)

        // Operator table (set of operators) used by planner.validate.
        // Option 1: contains no SQL functions, unusable
        // val calciteCatalogReader=new CalciteCatalogReader(cxt.getRootSchema,List(MYSQL_DATABASE),cxt.getTypeFactory,CalciteConnectionConfig.DEFAULT)
        // Option 2: every built-in SQL function, not recommended
        // val sqlFuncs: Seq[SqlFunction] = classOf[SqlLibraryOperators].getFields.toSeq.map(f => f.get(null)).filter(v => v.isInstanceOf[SqlFunction]).map(f=>f.asInstanceOf[SqlFunction])
        // val sqlOperatorTable=SqlOperatorTables.of(sqlFuncs)
        // Option 3 (recommended): load operators by scanning the library annotations on the
        // members of SqlLibraryOperators.
        // SqlLibrary.STANDARD is required, otherwise even "=" cannot be resolved.
        val mysqlOperatorTable = SqlLibraryOperatorTableFactory.INSTANCE.getOperatorTable(util.EnumSet.of(SqlLibrary.STANDARD, SqlLibrary.MYSQL))

        val frameworkConfig = Frameworks.newConfigBuilder()
          .parserConfig(mysqlParseConfig)
          .defaultSchema(cxt.getRootSchema.plus())
          .sqlValidatorConfig(mysqlValidateConfig)
          .operatorTable(mysqlOperatorTable)
          .build()

        val planner = Frameworks.getPlanner(frameworkConfig)
        try {
          val parsedNode = planner.parse(sql)
          val validatedNode = planner.validate(parsedNode)
          val relRoot: RelRoot = planner.rel(validatedNode)
          // println(s"get RelNode:${relRoot}")
          printRelRoot(relRoot)
        } finally {
          planner.close()
        }
      } finally {
        serverStmt.close()
      }
    } finally {
      conn.close()
    }

2.2 代码版本

与 2.1 的区别在于:此处不使用 inline model,而是通过代码构建 DataSource 和 SchemaPlus。

  /**
   * Builds a Calcite connection whose root schema contains the MySQL database
   * `MYSQL_DATABASE`, exposed as a JdbcSchema backed by a BasicDataSource.
   *
   * The caller owns the returned connection and should close it when done.
   * If anything fails after the Calcite connection has been opened, that connection and the
   * data source are closed before the original error is rethrown, so nothing leaks.
   *
   * @return the Calcite connection together with its root schema
   */
  def getCalciteMysqlConn(): CalciteConn = {
    Class.forName("com.mysql.cj.jdbc.Driver")
    val dataSource = new BasicDataSource
    dataSource.setUrl(MYSQL_JDBC_URL)
    dataSource.setUsername(MYSQL_USERNAME)
    dataSource.setPassword(MYSQL_PASSWORD)
    Class.forName("org.apache.calcite.jdbc.Driver")
    val connection = DriverManager.getConnection("jdbc:calcite:fun=mysql;lex=MYSQL")
    try {
      val calciteConnection = connection.unwrap(classOf[CalciteConnection])
      val rootSchema: SchemaPlus = calciteConnection.getRootSchema
      // NOTE(review): the 4th/5th arguments (null, MYSQL_DATABASE) are presumably the JDBC
      // catalog and schema. MySQL drivers commonly report databases as catalogs; if tables
      // are not found, verify which of the two the driver in use expects.
      val schema: JdbcSchema = JdbcSchema.create(rootSchema, MYSQL_DATABASE, dataSource, null, MYSQL_DATABASE)
      // Registered under the database name so that SQL can reference `test`.`st01`.
      rootSchema.add(MYSQL_DATABASE, schema)
      CalciteConn(calciteConnection, rootSchema)
    } catch {
      case scala.util.control.NonFatal(e) =>
        // Release the connection and the pool, keeping the original error as the primary one.
        try connection.close() catch { case scala.util.control.NonFatal(s) => e.addSuppressed(s) }
        try dataSource.close() catch { case scala.util.control.NonFatal(s) => e.addSuppressed(s) }
        throw e
    }
  }
    
    val targetSql = MYSQL_SQL2
    println(s"get sql:\n${targetSql}")
    // Parser config for planner.parse: MySQL lexical policy and MySQL 5 conformance.
    val parserConfig: SqlParser.Config = SqlParser.config()
      .withLex(Lex.MYSQL)
      .withConformance(SqlConformanceEnum.MYSQL_5)

    // Kept open for the whole analysis and closed in the finally below.
    val CalciteConn(calciteConnection, rootSchema) = getCalciteMysqlConn()
    try {
      // Operator table for planner.validate. SqlLibrary.STANDARD is required, otherwise even
      // "=" cannot be resolved; SqlLibrary.MYSQL adds MySQL-specific functions such as CONCAT.
      val mysqlOperatorTable = SqlLibraryOperatorTableFactory.INSTANCE.getOperatorTable(SqlLibrary.STANDARD, SqlLibrary.MYSQL)

      val mysqlConfig: SqlValidator.Config = SqlValidator.Config.DEFAULT
        .withConformance(SqlConformanceEnum.MYSQL_5)
        .withLenientOperatorLookup(true)

      val frameworkConfig: FrameworkConfig = Frameworks.newConfigBuilder
        .defaultSchema(rootSchema)
        .operatorTable(mysqlOperatorTable)
        .parserConfig(parserConfig)
        .sqlValidatorConfig(mysqlConfig)
        .build()

      val planner = Frameworks.getPlanner(frameworkConfig)
      try {
        val sqlNode: SqlNode = planner.parse(targetSql)
        val validatedNode: SqlNode = planner.validate(sqlNode)
        val relRoot: RelRoot = planner.rel(validatedNode)
        printRelRoot(relRoot)
      } finally {
        planner.close()
      }
    } finally {
      calciteConnection.close()
    }

最终打印结果如下。s_part 和 ts 分别来自常量 'none' 和 current_timestamp,没有来源列,因此不会输出。同一字段的多个来源列,其先后顺序可能不同。

s_id <- test.st01.s_id
s_name <- test.st02.s_name,test.st01.s_name
s_age <- test.st02.s_age,test.st01.s_age
s_sex <- test.st01.s_sex

参考文章:
基于Calcite解析Flink SQL列级数据血缘