背景与问题概述
这一周(2025-05-26-2026-05-30)我在搞数据拟合修复优化的任务,有大量的数据需要进行数据处理及回写,大概一个表一天一分区有五六千万数据,大约一百多列的字段。 具体是这样的我先取档案,关联对应表hive对应分区的数据,然后进行算法一系列逻辑处理后,将结果输出到hive,然后再从hive回写一份到oracle里面。
spark资源大概我给了不小,数据大概一天40左右吧,大概12个excutor,每一个12G内存,2core吧,拟合完数据,将数据入hive时候,进行了整体去重。 包括且不限于如下操作
1、.distinct(),
2、对应主键的去重.dropDuplicates(id),
3、row_number对id,type主键字段开窗取first
4、对id,type主键字段开窗,取后续字段的max()
经过以上操作,我的数据得以在没有主键冲突的情况下顺利的入库到hive中,并且我对入库数据进行group by id,type having count(1) >1时数据也没有出现重复的情况。
OK。鬼知道我对上述数据验证进行多少次跑批总结出来的上面的操作。以上是我写入hive的操作。 下面即将是从hive入到oracle艰辛的探索之路。 正常来讲经过上面的数据操作,我从hive入到oracle是不应该出现主键冲突的情况了,因为我有一部分表已经处理入库了,但有一个表就是死活入不进去,我impala都快查烂了,资源监控的同事都给我致电了。
为什么调了一天呢,因为跑一个 程序就要个吧小时,代码都快被我调抑郁了。
Hive数据写入阶段的去重策略
经过多次实验和验证,我总结出一套有效的去重方法,确保数据在写入Hive时不出现主键冲突:
1. 整体去重 - distinct()
val distinctDF = originalDF.distinct()
这种方法简单直接,但性能开销较大,适合小数据集或初步去重。
2. 基于主键的去重 - dropDuplicates()
val dedupByKeyDF = originalDF.dropDuplicates("id")
比整体去重更高效,只针对指定列进行去重。
3. 开窗函数取第一条记录
import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions._ val windowSpec = Window.partitionBy("id", "type").orderBy("timestamp") val firstRecordDF = originalDF.withColumn("rn", row_number().over(windowSpec)) .filter("rn = 1") .drop("rn")
这种方法在有多条相同主键记录时,可以按指定排序条件保留一条。
4. 开窗函数取最大值记录
val maxValueDF = originalDF.groupBy("id", "type") .agg(max("value1").as("value1"), max("value2").as("value2"), /* 其他字段的max操作 */)
对于需要保留最大值的场景,这种聚合方式非常有效。
Hive到Oracle的数据的迁移问题结局
尽管Hive中的数据已经严格去重,但在迁移到Oracle时仍遇到了两个主要问题:
问题1:NULL值导致的主键冲突
-- 问题发现查询
SELECT id, type, COUNT(1)
FROM hive_table
WHERE id IS NULL
GROUP BY id, type
HAVING COUNT(1) > 1;
解决方案:
// 在写入Oracle前增加NULL值处理
val cleanDF = processedDF.na.fill("NULL", Seq("id"))
.filter("id IS NOT NULL") // 或者直接过滤
问题2:资源不足导致的作业失败
最初配置:
12个Executor
每个Executor 12G内存,2个核心
一个表一天的分区大概处理约40GB数据
作业在运行10-20分钟后失败,经过多次调整,最终稳定运行的配置:
每个Executor 45G内存,这个我觉得得看集群资源,我们集群资源很紧张,大概10TB的内存,都不太够用
适当增加核心数(根据集群情况)我一般都设置2
性能优化经验总结
1. 内存配置黄金法则
对于大规模数据处理,Executor内存配置应遵循:
基础内存 = 数据分区大小 × 安全系数(2-3)
考虑序列化开销和中间数据结构
2. 高效去重策略选择
方法 | 适用场景 | 优点 | 缺点 |
---|---|---|---|
distinct() | 小数据集或全字段去重 | 简单 | 性能差 |
dropDuplicates() | 已知主键字段 | 高效 | 仅针对指定列 |
开窗函数 | 需要按条件保留记录 | 灵活可控 | 计算开销大 |
聚合函数 | 需要保留极值 | 高效 | 只能处理数值字段 |
3. NULL值处理最佳实践
在数据处理的早期阶段识别和处理NULL值
对于主键字段,NULL值应被替换或过滤
考虑使用COALESCE或NVL函数提供默认值
4. 资源监控与调优技巧
观察GC时间和频率,内存不足时GC会频繁发生
监控Executor心跳丢失情况
适当增加
spark.memory.fraction
(默认0.6)考虑启用
spark.memory.offHeap.enabled
使用堆外内存
优化Demo示例代码
/**
* @date 2025-05-30
* @author hebei_xidaocun_laoli
*/
// 1. 读取原始数据
val rawDF = spark.table("source_table")
.where("dt = '20250530'") // 按分区过滤
// 2. 多阶段去重处理
val stage1DF = rawDF.dropDuplicates("id") // 初步去重
val windowSpec = Window.partitionBy("id", "type").orderBy(col("update_time").desc)
val stage2DF = stage1DF
.withColumn("rn", row_number().over(windowSpec))
.filter("rn = 1")
.drop("rn")
// 3. NULL值处理
val cleanDF = stage2DF.na.fill(Map(
"id" -> "NULL_ID",
"type" -> "DEFAULT"
)).filter("id != 'NULL_ID'") // 或者保留但确保不冲突
// 4. 写入Hive
cleanDF.write
.mode("overwrite")
.partitionBy("dt")
.saveAsTable("result_hive_table")
// 5. 配置优化后写入Oracle
cleanDF.write
.format("jdbc")
.option("url", "jdbc:oracle:thin:@//host:port/service")
.option("dbtable", "target_table")
.option("user", "username")
.option("password", "password")
.option("batchsize", 10000) // 调整批量大小
.option("isolationLevel", "NONE") // 对于大数据量写入可提高性能
.mode("append")
.save()
通过这次项目,总结了以下经验:
数据质量优先:在数据处理早期阶段解决NULL值、重复数据等问题
渐进式调优:从较小资源开始,逐步增加直至作业稳定运行
监控驱动:密切监控作业执行情况,特别是GC和内存使用指标
文档记录:记录每次调整的参数和效果,形成知识库
大数据处理中的问题往往不是单一因素导致的,需要综合考虑数据特性、处理逻辑和集群资源。希望诸君避免类似的"坑",更高效地完成大数据处理任务。
这个资源调优是真的恶心,代码没问题,就是和资源有问题,跑着跑着就突然报错了,唉,还好这个端午节前解决了