FileOutputCommitter中提交mapreduce.fileoutputcommitter.algorithm.version有v1和v2两个版本。
v1版本Spark写入文件的流程:
1.当task完成的时候,会将task的结果文件先写入到临时目录下面。
2.所有的task完成后,将所有的结果文件写入到结果目录。
3.删除临时目录,生成标记文件。
v1的弊端:第二步写入操作是在driver端进行的,而且是单线程进行。当结果文件数量很多的时候,耗时就会线性增加。
v2版本Spark写入文件的流程:
1.当task完成的时候,将task的结果文件直接写入结果目录。
2.所有的task完成后,删除临时目录,生成标记文件。
可以看的v2比v1少了一个rename的过程。写入结果目录是发生在Executor上,task是同时进行的,相当于多线程,速度更快。但是存在一致性问题,在所有task写入结果目录过程,用户可以看到部分数据。
FileOutputCommitter源码解析
对应的类是org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
commit task
Executor端完成task会执行commitTask,可以看到当algorithmVersion == 1的时候会执行rename操作,将task结果文件到committedTaskPath(临时文件)。
algorithmVersion不是1的时候,使用mergePaths将结果文件直接写入到outputPath
commit job
在driver执行,当job完成(所有的task完成),执行commitJob
algorithmVersion == 1,执行mergePaths将结果文件直接写入到outputPath,这里就是性能瓶颈所在的地方,这里是单线程进行的,要是单个文件耗时20ms,1000个文件就是20000ms(20s),它是线性增加的。
algorithmVersion == 2的话,在commitTask中已经写入到结果目录了。
最后删除临时目录,生成标记文件。
和hive结合使用
上面提到使用v2的时候,很有可能出现结果目录中出现部分文件的情况,这要是hive的表目录就能在这个表读取到部分数据,这是不对的。应该保证原子性,要么读取的数据是旧的数据,要么是新的数据,不能读取部分数据。
入口类:org.apache.spark.sql.hive.execution.InsertIntoHiveTable
hive的临时目录生成
类似这种:{hive表数据目录}/.hive-staging_hive_2025-06-26_17-33-07_088_701862708150638596-1/-ext-10000/
processInsert
这里就一个简单的场景,就是写入到非分区表。
首先是先将数据写入到hive的临时目录中,在load到hive表中。
写入hive临时目录
可以看到commiter变量,这个就是上面的FileOutputCommitter,有两种提交方式。
runJob就是spark提交了任务,开始执行。这里执行task,task完成回调commitTask方法,runJob是阻塞的,会等到所有task完成。然后回调commitJob方法。根据上面FileOutputCommitter,此时数据已经全部写入了hive的临时目录下面了。
loadTable
loadTable是使用反射的方式执行_loadTableMethod_方法,对应就是Hive的loadTable方法。
hive的loadTable中如果是替换就是replaceFiles,否则是copyFiles。
replaceFiles
对应sparksql的 insert overwrite
首先将原有的目录下面的文件都放到回收站,即删除,再将结果文件从临时目录移动到正式目录。
moveFiles 移动结果文件。可以看到使用线程池(默认是25个线程)进行结果文件的重命名来移动文件的。不同于FileOutputCommitter的v1中单线程进行rename,Hive是多线程来rename的,效率更高。
copyFiles
对应sparksql的 insert into
可以看到它也是使用线程池进行并行操作的。
整体流程
v2流程,建议使用v2,效率更高。