粗粮厂的基于spark的通用olap之间的同步工具项目

发布于:2025-08-19 ⋅ 阅读:(13) ⋅ 点赞:(0)


说的比较简单,有需要的可以留言,我不断补充完善

1 项目背景

我们公司内部的需要一款,能在不同的olap之间做数据传递与拷贝,例如 iceberg到doris,到mysql,甚至到kafka的,这么一个数据同步工具,要尽可能简单,尽可能维护容易。所以有了这么一个项目的诞生,目前可以实现,通过一条简短的shell命令,实现不同数据库与存储之间的数据拷贝。

目前这套方案,在公司内部已经部署4个数据团队,服务对象产品+数据研发 超过100人,直接使用使用的业务对象,超过 4000人。

2 项目实现

2.1 实现原理

目前的实现是通过spark来实现,分为两个部分:

  • 1 写入同步信息:同步任务记录,是一个很简单的,通过shell 传参,调用一个spark任务,执行一个简单的,数据插入动作,将 数据源表,目标,持有人,过滤或想要保留的字段,数据筛选项等一些信息,传入spark任务中,并将数据写入一个mysql中保存。其中源表与目标表,通过 catalog__database__schema__tablename的方式保存,并维护了一套catalog,通过前缀就可以知道数据在哪个引擎的哪个表中。
  • 2 读取同步信息:一个常驻的,死循环的spark任务,会定期遍历mysql,会筛选出目前符合条件的,未过期的,同步任务,使用ExecutionContext 和 Future ,来并发执行同步任务,通过源信息,与反射 ,维护一个连接的配置项,来做隔离,保证数据传入时,不会涉及隐私信息

2.2 细节要点

  • 并发部分,可以通过【读取同步信息】 任务部分启动时,动态传参,来控制数据流量
  • 在任务中,维护了3个列表,分别保证,同一个任务只会执行一次,同一个目标表,同一时间只有一个任务在写入,任务执行超过配置时间,会自动杀死,并允许新的任务调起,这样就可以保证不会触发目标的锁,并控制重复提交
  • 通过对不同传入参数解析,对于每个目标引擎单独部署独立的同步任务,做到资源隔离
  • spark任务 每个并发执行有做到很好的异常捕获,发生问题时,可以调用报警接口,发送信息到持有人飞书中;对于常驻的 【读取同步信息】整体任务监控,做到2天杀死重新启动,并每5分钟pid判活,保证任务的执行中
  • 任务监控与判断,对目标数据与原始数据做数据量校验,对数据过程中的日志做接受,扫描错误日志等,保证要给

3 抽样说明

这里抽样说明一下 ,iceberg 同步数据到hologres 时的要点,其实整体的使用都相同,不过在开发的时候,可以根据不同的引擎做不同的细节调整 : 例入hologres

  • 使用spark-connector-hologres的连接器写入数据,连接器会先在hologres引擎中创建临时表,数据写入完成后,再做insert overwrite动作,因为分布式存储的问题,所以就需要在代码里手动执行set hg_experimental_force_sync_replay = on; 来保证元数据在不同节点的同步
  • 使用hologres连接器,对原始数据量做判断,超过1千万的,执行serverless,也就是后被隐藏能源!
  • 增加1次的任务重试,减少因为元数据不同步导致的表不存在的bug
  • 目标数据是视图的方式,也有分区表,可以在代码中做判断并刷新视图,保证数据插入可以兼容

总的来说,可以根据不同的业务目标库与使用方法,做单独的优化迭代,保证到每次的同步都是最优的选项

4 项目运行状态

还是以iceberg 到 hologres 为例,某个实例的spark资源情况为 180个 Executor,每个4G,16G的DM,参数配置为:

--conf spark.sql.catalog.iceberg_zjyprc_hadoop.cache-enabled=false
--conf spark.sql.adaptive.coalescePartitions.initialPartitionNum=30
--conf spark.network.timeout=180000
--conf spark.slow.shuffle.fetch.time.blacklist.threshold=60000
--conf spark.speculation=false
--conf spark.excludeOnFailure.enabled=false
--conf spark.task.maxFailures=1

4.1 运行速度

目前 5000w条数级别的数据量,大概需要 16-17分钟,而且这里面有一半的时间时因为hologres连接器在内部重新shuffle,如果目标是mysql之类的,速度会提高至少一半

4.2 项目吞吐

目前每日同步 9000张表,总数据量大概 1-2T左右,基本可以满足业务需求

4.3 稳定性

通过上述的监控与定期重启,配合计算引擎的升级,同步迭代工具的使用,例如hologres 支持了insert overwrite 命令,可以实现写cpu打满也不会影响读的使用,同步迭代最新版本,可以保证业务的高可用。


网站公告

今日签到

点亮在社区的每一天
去签到