spark任务优化参数整理

发布于:2025-02-10 ⋅ 阅读:(35) ⋅ 点赞:(0)

以下参数中有sql字眼的一般只有spark-sql模块生效,如果你看过spark的源码,你会发现sql模块是在core模块上硬生生干了一层,所以反过来spark-sql可以复用core模块的配置,例外的时候会另行说明,此外由于总结这些参数是在不同时间段,当时使用的spark版本也不一样,因此要注意是否有效,验证的方法很简单,去下面官方文档搜一下就行。如果本博主已经踩了坑的会直接说明。看完之后如果有core模块优化参数不多的感觉,无需自扰,因为core模块的开发本身就是80%依赖代码级优化实现的,比如rdd集的分区拆分、加盐、转换等等都是在代码级别完成的,而不是任务提交参数。

这里附带官网的配置文档–》https://spark.apache.org/docs/latest/configuration.html,把官方文档放在这里有两个目的,一是官方文档里面spark参数很全,就导致里面不止有可用来任务优化的参数,还有偏集群用的配置,比如是否开始任务ui,是否在ui中就能kill任务等等这种,二是我下面写的都是我在工作中会用到的,以及一些使用经验,并且官方文档里那么多参数工作都不可能都用,你想用也记不住啊,同时大家在看其他文献时,遇到不认识的参数可以去找找看,官方文档里有参数的开始生效版本(Since Version),找不到的话就说明,参数不正确或者apache版本的spark已经删除了相关配置

1、任务使用资源限制,基本参数,注意的是这些资源配置有spark前缀是因为他们是标准的conf配置,也就是submit脚本,你调用--conf参数写的,和--driver.memory这种属于不同的优先级,--driver.memory这种优先级比它高,对于spark来讲,数据量和计算量是两个不同的概念,计算任务本身不止有单一的MR架构那样一个map一个reduce的直白执行逻辑,还有很多复杂的任务task,所以随着执行计划的不同,往往计算量要大于数据量,而且这个差距是成正比的放大,要使用的计算资源也更多,除了计算任务本身,还有伴随计算产生的附加消耗,因此往往1G的数据要付出3G的计算资源,甚至更多,在具体计算的时候使用多少资源,就需要经验了,不过初学者可以参考一个公式min(计算数据大小/容器内存+20%左右的预留=容器个数,任务开始执行后配合其他参数可支持并行task最多时的容器个数)、driver的内存(spark.driver.memory)永远大于driver可收集数据集大小(spark.driver.maxResultSize) 、单容器的内存和核数的比例是10:1、单容器的大小不应太大一般在6C/60G左右就最多了

在容器资源的估算上,除了上面提到的用数据量来估算,也有的地方使用总核数来估算,也就是用总量可用有多少,再除以其他的相关数据值,这种情况是因为拥有的集群资源不多,没有办法支撑任务完全自主扩张,虽然两种估算方法最后的目的是一样的,但是后者对于大任务所需资源来讲,肯定会影响任务的运行,具体使用的时候看情况而定即可

在具体配置的时候,容器个数最好交给动态延展去处理,这样不会造成在启动容器量和在计算数据量上的不协调,除非你容器设置的本身不够。当你的任务特别大,大到超过了容器读写性能的瓶颈,再考虑用num的方式直接指定定额的容器个数,因为随着不同集群性能的影响,过大的任务在容器动态延展上会很吃力,任务会不稳定。

至于一个集群的性能读写瓶颈,如果你能拿到当前集群的冒烟测试结果,那是最好的,但是越大的集群,冒烟测试越不好做,所以除非是私有云的小项目,否则一般很难拿到,此时最直观的观察点就是这个集群的shuffer额度,在保证任务跑通的情况下,一个集群能容纳的shuffer量越高他的读写瓶颈就越高,本作者最高操作过读写瓶颈在10Tshuffer下保证任务运行,10t以上就不太好说了

spark.driver.memory=20G   #applicationmaster启动的driver进程占用内存
spark.driver.cores=4   #applicationmaster启动的driver进程占用核数
spark.executor.cores=4     #容器占用核数
spark.executor.memory=40G    #容器占用的内存数
spark.num.executors=10    #任务用到的总容器数

2、限制sql任务运行时拉取的分区数和拉去文件总大小的上限,不过这两个配置是kyuubi的,做一个参考而已,apache spark不限制你读取的分区数和数据大小

spark.sql.watchdog.maxPartitions=2000
spark.sql.watchdog.maxFileSize=3t

3、任务过程中计算文件的大小控制,也就是聚合、拆分相关,这点可以跳过,因为这点涉及的参数不是apache spark的参数,放在这里只是做一个对比,apache 版本的core模块需要通过关注rdd集的分区数控制,见下面的36点,或者大文件控制上限,见5点,sql模块需要通过AQE,见下面的9点

spark.sql.optimizer.insertRepartitionBeforeWriteIfNoShuffle.enabled=true    #是否对不发生shuffer的stage做聚合
spark.sql.optimizer.insertRepartitionBeforeWrite.enabled=true   #是否在写入文件之间聚合
spark.sql.optimizer.finalStageConfigIsolation.enabled=true    #最后任务的最后阶段文件聚合,会有一个落盘前聚合的执行计划

上面这三个文件聚合是kyuubi的参数,apache spark开源团队对离线文件聚合与拆分这方面,由于被白嫖太多所以有些许摆烂

但是国内大厂有自己的开发做出了自己的东西,比如阿里基础架构计算引擎3.2.1升级点概要中提到的支持非动态分区支持合并小文件
在这里插入图片描述
诸如此类的参数各家引擎提供商都不一样,因此如果你用的不是开源就问一下提供方是否有相关参数,同时这也更加突出了开发的重要,尤其是在这个AI满天飞的时代,很多公共的姿势资源差距被拉平了,所以大家努力共勉提升自己的技术水平还是很有必要的

4、任务最后阶段消耗资源多少,通常是配合压缩和自适应分区的相关配置来做任务优化,这两个参数同上也是kyuubi的,放在这里再次向告诉大家突出不同发行版本下的spark架构有着不同的特点,需要灵活应用

spark.sql.finalWriteStage.executorMemory=10g
spark.sql.finalWriteStage.executorCores=2

5、大文件拆分,从apache spark 2.1.0 版本开始core模块处理大文件时,可以不通过修改rdd集的分区数来控制计算过程中生成的文件大小,因为大文件的上限是一个未知数,直接通过分区数来控制,可能会发生需多次尝试的情况,最终本末倒置,因此,apache spark支持修改计算中生成文件的上限,来限制大文件对任务的影响,默认是128M(134217728字节)

spark.files.maxPartitionBytes=134217728    #一般情况用这个

不过当你使用的是sql模块,并且读取Parquet、JSON和ORC文件时,apache版本下的spark,提供了两个比较鸡肋的同类型配置,说是鸡肋是因为sql模块有AQE,因此了解知道就行

spark.sql.files.maxPartitionBytes=128MB   #这个配置在你读取的数据文件类型是Parquet、JSON和ORC时使用,⭐️⭐️⭐️注意这个参数2.0.0开始起效
spark.sql.files.maxPartitionNum=null     #当你的数据文件类型是Parquet、JSON和ORC时使用,用来调整最大分区数,⭐️⭐️⭐️注意这个参数3.5.0开始起效

配置的时候注意,spark.sql.files.maxPartitionNum的起效版本是3.5.0,而apache spark源码里面的默认值就是空的,官方文档中提到如果不配置时默认采用spark.sql.shuffle.partitions的值,并且需要开启AQE以及AQE的分区跟踪

6、数据文件压缩

mapreduce.output.fileoutputformat.compress=true    #是否对任务的输出数据压缩
mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec     #用到的压缩类
mapreduce.map.output.compress=true     #是否对map阶段的输出进行压缩
mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.GzipCodec     #同上
hive.exec.compress.output=true    #hive端数据压缩,⭐️⭐️3.x的spark之后共用上面map的压缩策略,但是2.x的版本有一个mapred.map.output.compress.codec,使用的时候注意版本
spark.sql.parquet.compression.codec=snappy    #如果表数据存储类型是parquet,那就要另行制定压缩方式,默认是snappy,可以选择gzip、lzo、或者uncompressed不压缩

下面的压缩一般不用
spark.rdd.compress=true     #是否对rdd集合的数据进行压缩
spark.checkpoint.compress=true    #是否对任务检查点数据压缩,默认false
spark.broadcast.compress=true     #spark任务的广播数据是否压缩,默认true,压缩格式复用 spark.io.compression.codec

7、spark-io压缩用到的编码器,很多spark内部涉及的压缩编码器默认复用的都是这个配置,如RDD分区、事件日志、广播变量和洗牌输出等。默认是lz4

spark.io.compression.codec=lz4

默认情况下,Spark提供四种编解码器:lz4、lzf、snappy和zstd。
它们对应的,以及你还可以使用完全限定的类名来指定编解码器,如下:
org.apache.spark.io.LZ4CompressionCodec
org.apache.sark.io.LZFCompressionCoded
org.apache.spark.io.SnappyCompressionCodec
org.apache.spark.io.ZStdCompressionCoded

8、sql任务的shuffer任务的默认分区数,但是这个配置的生效前提是其他算子在执行计划中的分区数失效了才使用它,所以大多情况下没有作用,默认值200

spark.sql.shuffle.partitions=200

9、sql任务自适应分区数查询(AQE),注意同时配置了AQE的合并分区相关和倾斜时,会先合并,再调整倾斜,设计到的分区大小,建议50~300M

spark.sql.adaptive.enabled=true                               # 开启aqe
spark.sql.adaptive.forceApply=true                      #强制开启AQE,一般不带这个参数,当发现aqe的效果不明显的时候再用
spark.sql.adaptive.logLevel=info            #aqe的日志级别,一般保持默认,不用改
spark.sql.adaptive.coalescePartitions.enabled=true             # 自动合并分区
spark.sql.adaptive.coalescePartitions.initialPartitionNum=100    # 初始的分区数。默认为spark.sql.shuffle.partitions的值
spark.sql.adaptive.coalescePartitions.minPartitionNum=1     # 最小的分区数。默认为spark.sql.shuffle.partitions的值,一般不另行配置
spark.sql.adaptive.coalescePartitions.maxPartitionNum=1     # 最大的分区数。AQE的常用配置,一般不要太大
spark.sql.adaptive.advisoryPartitionSizeInBytes=128M           # 每个分区建议大小(默认单位字节)
spark.sql.adaptive.shuffle.targetPostShuffleInputSize=128M     #设置shuffer阶段后下一阶段任务输入预期的数据大小,一般不另行配置
spark.sql.adaptive.fetchShuffleBlocksInBatch=true        #默认是true,当获取连续的shuffle分区时,对于同一个map的shuffle block可以批量获取,而不是一个接一个的获取,来提升io提升性能
spark.sql.adaptive.localShuffleReader.enabled=true	  #允许在自适应时采用本地进程优化shuffer,分险是如果报错,这部分日志无法聚合到yarn
spark.sql.adaptive.skewJoin.enabled=true                         # 开启join时的数据倾斜检测
spark.sql.adaptive.skewJoin.skewedPartitionFactor=5          # 默认5,当某个分区大小大于所有分区的大小中间值5倍,就打散数据
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256M   #通过直接指定分区大小的阈值来决定是否打散分区, 默认256M,和上面的参数一起生效,用来因对不同的情况
spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin=0.2         #参与join的表非空分区对于整体任务而言占比小于该比例,那么该表不会被作为广播表去使用,默认0.2,一般不改,因为通常广播能力是禁用掉的,广播会非常耗driver的内存,尤其是在TB级数据处理中,随意广播是一个比较危险的操作
spark.sql.autoBroadcastJoinThreshold=-1        #这个配置通常保持-1,它是指给一个字节大小,小于这个字节的表都会在join操作时被广播,-1表示禁用广播能力

spark.sql.mergesmallfilesize=256M  #分区建议大小  ⭐️⭐️注意这个配置在3.x之后废弃,2.x需要试一下你用的版本

对于AQE的分区数,一定要知道不是说你设置多少,web页面上就能直观的体现出来多少,执行计划上是以task的形式展示执行计划的,而task和分区是两个东西,因此你要有那种感觉,来调整分区数,这种感觉只能靠经验去喂,就和神枪手一样。

而且AQE的coalescePartitions对数据集倾斜的作用很明显,但是对join是发生的热点key倾斜就不太有力了,所以如果有join倾斜一定要开启skewJoin

10、任务容器的动态伸缩,建议一般情况下不要使用直接指定,而是尽量使用动态扩容,来规定任务的容器个数,因为直接指定时,很容易造成资源的倾斜,除非你的任务特别大,这个时候动态扩容的能力会成为累赘,至于如何判断你可以在任务启动之后spark页面的excutor页面看到实际启动了多少个容器,如果total个数和单个容器使用资源综合考虑后的结果,反馈出任务最终使用的计算资源超过了有预留后的总资源,或者该任务的shuffer无论你如何调整,它的shuffer大小都较高,甚至逼近当前集群的shuffer能力上限,这种情况就不建议用动态容器扩展了

spark.dynamicAllocation.enabled=true                  # 开启动态收缩容器资源默认false
spark.dynamicAllocation.shuffleTracking.enabled=true	 # shuffle动态跟踪,默认true
spark.dynamicAllocation.initialExecutors=10	     # 初始化申请资源
spark.dynamicAllocation.maxExecutors=100	         # 最大容器个数
spark.dynamicAllocation.minExecutors=10	         # 最小容器个数
spark.dynamicAllocation.executorAllocationRatio=1	 # 这个用来设置动态容器资源模式下,任务可尝试的最多资源占比,默认为1,本身是个浮点数值也就是0.1到~,一般不另行配置

11、是否对分区做动态裁剪,默认true,这个配置一般不关,它目的就是优化执行,开启后你可以在spark的任务web界面看到有的执行计划就被skip了,当然skip不全是因为它,容器的动态伸缩和自适应分区数也会造成。

spark.sql.optimizer.dynamicPartitionPruning.enabled=true

12、spark-sql提供了Hint,需要你去查看官方文档https://spark.apache.org/docs/3.5.2/sql-ref-syntax-qry-select-hints.html#content,看的时候注意你用的版本,这个就是在写sql的时候加入建议的执行计划,比如当你希望sql执行的时候直接指定希望的分区数,你可以写成如下的格式,但是这种方式其实就是嫌少了用户使用时的代码量,一般用的不多

SELECT /*+ COALESCE(3) */ * FROM t;

13、core任务中rdd的默认分区数,这个配置一般不直接在任务外配置,有需要的话调用算子的parallelism方法了,具体见36点

spark.default.parallelism=10

14、修改系列化方式,这里的序列化是针对shuffle、广播和rdd cache的序列化方式,默认使用java的序列化方式org.apache.spark.serializer.JavaSerializer性能比较低,所以一般都使用org.apache.spark.serializer.KryoSerializer ,至于spark task的序列化由参数spark.closure.serializer配置,目前只支持JavaSerializer。

spark.serializer=org.apache.spark.serializer.KryoSerializer

15、kryo序列化时使用的缓存区大小,这个配置是当driver调用collect等算子回收大量数据到driver端,Kryo可能会抛buffer limit exceeded异常,这个时候就要调大该参数

spark.kryoserializer.buffer=64k

16、第15点设置的是缓存大小,这个配置设置的是driver收集数据使用的内存资源最大是多少,默认1g,0表示不限制

spark.driver.maxResultSize=1g

17、下一个数据块定位次数,在数据落盘的时候如果网络延迟等极端原因会导致driver定位数据块写入位置时,收不到任何datanode的回馈,这个时候可以尝试调大这个值,一般不会遇到,博主只遇到过一次,出现问题的时候会抛出Unable to close file because the last block does not have enough number of replicas异常,对应的bug在spark2.7.4已修复,往后的版本这个参数就以及废弃了,这个配置的默认值是5,挂了就设置为6

dfs.client.block.write.locateFollowingBlock.retries=5

18、shuffle write时,会先写到BufferedOutputStream缓冲区中,然后再写到磁盘,该参数就是缓存区大小,默认32k,建议设置为64k,这个配置是数据量不较大的时候,减少一些系列化次数,和让小文件聚合异曲同工,设置的时候注意要和17平衡

spark.shuffle.file.buffer=32k

19、shuffle溢写磁盘过程中需要将数据序列化和反序列化,这个参数是一个批次处理的条数,默认是10000,需要的话调大该值,2万5万都可以,但是一定要成比例的设置16的配置值

spark.shuffle.spill.batchSize=10000

20、shuffle read拉取数据时,由于网络异常或者gc导致拉取失败,会自动重试,该参数配置重试次数,在数据量达到十亿、百亿级别的时候,最好调大该参数以增加稳定性,默认是3次,建议设置为10到20。

spark.shuffle.io.maxRetries=3

21、该参数是 spark.shuffle.io.maxRetries的重试间隔,默认是0.5s。

spark.shuffle.io.retryWait=500

22、shuffle read拉取数据时的缓存区大小,也就是一次拉取的数据大小,默认64,计算单位是M,要注意的它是从n个节点上,一次总共拉取64M数据,而不是从单个节点获取64M。并且它拉取数据时,并行的发送n个请求,每个请求拉取的最大长度是 64M / n,但是实际拉取时都是以block为最小单位的,所以实际获取的有可能会大于64M / n。所以这个配置就有点迷,属于理论上不行,但实际由于block大小而不得不行的配置

spark.reducer.maxSizeInFlight=64

在你对上面缓存区的大小做修改的时候,不要设置的太大,因为要考虑下面的这个配置

spark.reducer.maxReqsInFlight=Int.MaxValue

该配置用来限制每个批次拉数据时,能够发出的请求数上限,默认是scala中Int类型的最大值,一般不另行改动,但是如果你缓存区大小设置的不合理,或者碰上任务生产的中间文件普遍不大,造成spark为了靠近你设置的缓存区大小文件请求一次性发出去很多,这就会造成大量的网络IO导致任务失败,遇到这种情况,要先使用文件聚合,然后考虑AQE、最后调整任务资源,因为前面两个对资源的消耗是有一定的影响的,总之再次就是想告诉你有这种顾虑存在,至于这个上限限制一般不改

23、spark允许你限制每个reduce任务,能够对执行计划中的某个datanode上获取最多多少个数据块,不过一般遇不到改的情况,和上面缓存区面临的请求数一样是一个要知道的概念

spark.reducer.maxBlocksInFlightPerAddress=Int.MaxValue

注意:列出14-23的配置是为了引对一种极端情况,如果你的上游开发不是你,但是上游表生成了巨量的小文件,导致你的任务在执行计划中看到的情况明显具体计算参与率很低,大量的开销都耗在了拉数据和数据倾斜上,同时伴随着网络套接字断开的问题,联系上游人家不鸟你,那你只能注意文件聚合、调整任务资源、配置数据倾斜之外,在把拉取的数据批次大小放低,重试和重试间隔放大,最后就阿弥陀佛吧

24、存储内存占用比例,这个配置越大留给shuffer和计算本身的内存就越少,反之越小跑任务的时候数据暂时落盘的次数就越频繁,默认值0.5

spark.memory.storageFraction=0.5

对于落盘的阈值,在整个spark中有个spark.reducer.maxReqSizeShuffleToMem参数,用来直接用数据大小来控制落盘时机,但是该参数变动很频繁,不同版本名称也不一样,所以一般不用,这里的落盘是指计算过程中随着计算任务的跑数,或者input来的数据太多,使得容器内部用来存储数据的那部分堆内存到了上限之后,数据就会被暂时写在文件里面,当恢复平衡之后会逐步再读取出来处理

25、如果你的数据类型是Parquet,且使用spark计算引擎处理hive数据,要注意这个配置,用来决定是否采用spark自己的Serde来解析Parquet文件;Spark SQL为了更好的性能,在读取hive metastore创建的parquet文件时,会采用自己Parquet Serde,而不是采用hive的Parquet Serde来序列化和反序列化,由于两者底层实现差异比较大,所以很容易造成null值和decimal精度问题,默认为true,设为false即可(会采用与hive相同的Serde)。

spark.sql.hive.convertMetastoreParquet=false

当你操作spark要对hive表的Parquet类型数据写入的时候一定要注意下面的配置。

spark.sql.parquet.writeLegacyFormat=true

这个参数用来决定是否使用hive的方式来写Parquet文件,这是由于对数据精度的把控上,两个计算框架不一样,比如decimal精度上两者的实现就有差别,导致hive读取spark创建的Parquet文件会报错,在hive中decimal类型是固定的用int32来表示,而标准的parquet规范约定,根据精度的不同会采用int32和int64来存储,而spark就是采用的标准的parquet格式,所以对于精度不同decimal的,底层的存储类型有变化,所以使用spark存储的parquet文件,在使用hive读取时报错,因此要将spark.sql.parquet.writeLegacyFormat(默认false)配置设为true,即采用与hive相同的format类来写parquet文件

26、和上面的Parquet一样,orc数据spark和hive的底层实现也不太一样,因此如果你用spark处理hive的orc数据,要注意下面的配置

spark.sql.hive.convertMetaStoreOrc=false

上面这个配置用来决定spark读取orc的时候是否转换成spark内部的Parquet格式的表,如果你的orc数据来自于hive,就要设置为false,如果为true发生兼容性问题的概率很大

orc.force.positional.evolution=true

上面这个配置决定spark读取orc时,是否强制解析orc数据,这里的强制说的是,由于orc是列式存储,在不同版本之间很容易发生字段底层存储的顺序不同,或其他不兼容问题,为true时,意味着强制解析这些数据,为数据分析提供了一定的兼容性保证。一种常见的需要强制解析场景就是,当你对orc格式的表,修改了字段名,或者增加列,并且你没有刷新数据的话,不强制解析的情况下,select出来的列名就是数据里面真实存储的字段名,也就是原来的字段名

spark.hadoop.hive.exec.orc.split.strategy=BI

上面这个配置是用来决定,spark读取orc文件时的切分策略,有三种可选值,分别为BI(文件级别)、ETL(stripe条带级别)、‌HYBRID(混合默认),在网上能找到的其他文献中说这个配置的默认值是BI,但本博主在使用spark3.2.1的时候遇到了一次读取orc数据报数组下标越界问题,规避了空值影响之后,发现ETL模式在参与计算的数据切片较大时不太稳定,而在spark3.2.1的源码里面在读取orc数据时数据切分用的是混合模式,因此在发生同样问题的时候,直接指定BI就行,注意这个问题如果大家遇到了,那如果上游任务数据生成时的文件切片数可以放大,让每个文件的大小缩小,也是可以解决的
在这里插入图片描述

hive.exec.orc.default.stripe.size=67108864

上面这个配置用来决定spark读取orc文件时混合切分策略的阈值,默认是256MB,如果你任然向使用混合模式,哪就调小至64M。 ⭐️⭐️⭐️但是要注意,这个参数在3.x之后失效了,去更改分区大小来控制

spark.sql.orc.impl=hive

上面该配置决定spark用那种方式写入orc数据,默认是native,即内置的类库,但是如果数要流向hive,就要配置成hive

hive.merge.orcfile.stripe.level=true

上面这个参数用来控制orc数据写入时进行的合并策略,为true使用stripe级别进行,而当该参数设置为false时,合并操作会在文件级别进行。这种合并操作是通过启动一个独立的map-reduce任务来实现的,旨在优化存储和提升查询效率。具体来说,ORC文件的合并有助于减少小文件的数量,从而避免因大量小文件而导致的处理效率低下问题。此外,ORC文件的索引结构(如行组级别、条带级别的索引)使得查询时能够快速定位到需要的数据块,避免了不必要的磁盘和网络I/O,进而提高了查询效率。

注意:操作orc和Parquet格式时,一定要合理的带上上面的参数,否则轻则数据精度丢失,重则不可识别数据文件导致任务在读取文件阶段就失败了

27、任务重试次数,这个配置在工作中不同的开发版技术环境是不同的值,如果有需要可以更改,默认的原生栈是2

spark.yarn.maxAppAttempts=2

28、有的时候spark任务,当你不太愿意给再多的资源时候,但任务缺失由于数据太多,比如数据块拉去比较耗时之类的,会触发任务超时,这个时候你可以设置下面的两个参数,把超时时间延长,或者设置为0不预防超时

spark.network.timeout=600s    #网络超时时间,默认单位毫秒,默认值10000
spark.task.timeout=100     #task任务超时时间,默认单位秒,默认值3

29、推测执行,这个配置和hive任务的推测执行一样的目的,启动另一个相同的task并行,那个先成功就用那个,另一个关闭,通常情况下为了任务的稳步运行和资源的择优,要确保是关闭的,但如果集群部分节点的状态不佳,导致任务执行缓慢等,就开启这个配置,开启后再执行任务会看到job有skip的就是一个成功后另一个暂停的表现

spark.speculation=false

30、这个参数是容器和driver的心跳机制间隔,

spark.executor.heartbeatInterval=20s   #默认单位毫秒,默认值在 Spark 1.6 版本之前,默认值是 10000 ,在 1.6 及之后的版本中,默认值是 3000

31、如果你的sql要使用模糊匹配字符,涉及了字母的大小写,你可以设置下面这个参数为false,忽略大小写,这样就不需要LOWER或UPPER函数了

spark.sql.caseSensitive=false

32、在spark3.2之前sparksql操作数据,能向一个非空路径运行CREATE TABLE AS SELECT,但3.2时候会抛出运行异常,通过设置下面的参数可以继续使用这个能力,但其实一般也没有人怎么用

spark.sql.legacy.allowNonEmptyLocationInCTAS=true

下面是源码里面的原话

In Spark 3.2, `CREATE TABLE AS SELECT` with non-empty `LOCATION` will throw `AnalysisException`. To restore the behavior before Spark 3.2, you can set `spark.sql.legacy.allowNonEmptyLocationInCTAS` to `true`.

33、当select的表,路径下存在非数据文件的路径时,在数据分析的时候会报错,这个时候需要用下面的参数,让spark强制对目录递归读取

mapred.input.dir.recursive=true;
mapreduce.input.fileinputformat.input.dir.nonrecursive.ignore.subdirs=true;
mapreduce.input.fileinputformat.input.dir.recursive=true;

34、关闭sql计算时的全段代码融合能力,默认值是true,在sql计算的时候,如果你时常关注任务的web执行计划,你会发现spark的sql架构常常会将多个有关联可以并行的执行计划融合成一个阶段执行,这种能力可以优化内部算子的调用开销,但是在大任务处理的时候会发生编译错误,因此如果你的任务很大,那么最好把这个能力关掉,虽然这会影响一些任务的性能和增加一些计算开销

spark.sql.codegen.wholeStage=false
spark.sql.codegen.aggregate.map.twolevel.enabled=false

35、节点黑名单,该功能在spark2.2开始支持,这些配置通常可以被配置在spark-defaults.conf中做为默认参数存在,这是一种保障机制,当你的集群中存在某些节点状态异常,你可以配置黑名单的方式,使得调度器开始记录任务失败情况,达到一个阈值的时候尽量不再向该节点上运行task,注意是尽量!!除非达到黑名单阈值的上限,并且当你有黑名单能力需求的时候,通常会一起打开推测执行。不过要注意的是spark的节点黑名单最小的管控单位是以执行器为单位的也就是executor,这就导致很容易出现一种情况就是同一节点重启了一个executor,这种时候就会越过executor的黑名单拦截,不过spark还提供了对executor失败使得节点进入黑名单的设置

spark.excludeOnFailure.enabled
spark.excludeOnFailure.application.maxFailedTasksPerExecutor=2
在一个application中本该成功的taskset运行时,也就是不发生Executor本身错误的taskset能够成功的话,那么单个执行器上失败多少个不同的task时,执行器进入黑名单,注意用词不同的!!!不同的!!!!源码里的原文就是How many different tasks must fail on one executor, in successful task sets,所以有时候你认为要进入黑名单的执行器,不是你以为的你以为

spark.excludeOnFailure.application.maxFailedExecutorsPerNode=2
在一个节点上,因为进入黑名单的Executor达到多少个时,这个节点进入黑名单

spark.excludeOnFailure.stage.maxFailedTasksPerExecutor=2
spark.excludeOnFailure.stage.maxFailedExecutorsPerNode=2
这两个和前面两个一样的作用,不同的是失败的task,要属于同一个stage

spark.excludeOnFailure.task.maxTaskAttemptsPerExecutor=1
对于计算计划已经是给定的任务,也就是任务上一秒拿到这个执行资源,但紧接着这个资源进入了黑名单的这种极端情况,在排除该任务执行计划中的执行器之前,可以在对应的执行器上尝试调度多少次,通常保持默认的1

spark.excludeOnFailure.task.maxTaskAttemptsPerNode=2
作用和含义和上一条一样,不过是在同节点的其他执行器上尝试调度机会

spark.excludeOnFailure.timeout=1h
节点或者执行器进入黑名单后的时间,默认1h,结束后从新进入可用资源队列

3.1.0开始在源码中有三个默认是关闭的配置,知道就行尽量不要用,毕竟按照我的经验和直觉,搞不好这三个配置功能还不稳定,官方保存关闭应该有点说法
spark.excludeOnFailure.killExcludedExecutors=false         #是否kill掉调度失败的执行器
spark.excludeOnFailure.killExcludedExecutors.decommission=false       #先尝试停用执行器而不是直接杀死
spark.excludeOnFailure.application.fetchFailure.enabled=false

⭐️⭐️注意上面这些黑名单的配置项,早些的版本我没有特意关注不好说,但确定的是3.1.0开始已经用上面的了!!!所以如果你用的是3.1.0之前的版本尽量用下面这一版,因为在源码中可以看到失效的映射
在这里插入图片描述

spark.blacklist.enabled
spark.blacklist.timeout
spark.blacklist.application.maxFailedTasksPerExecutor
spark.blacklist.application.maxFailedExecutorsPerNode
spark.blacklist.stage.maxFailedTasksPerExecutor
spark.blacklist.stage.maxFailedExecutorsPerNode
spark.blacklist.task.maxTaskAttemptsPerExecutor
spark.blacklist.task.maxTaskAttemptsPerNode

36、上面提到对于任务执行中分区文件的大小控制,在sql模块上用的是aqe去把控,但是core模块只有从新调整分区数的方法,或者大文件拆分。而对于分区数来说,总体上core模块中提供了五种不同情况下的分区个数修改方法

a、使用repartition()方法:
repartition()方法可以用于重新分区RDD,并返回一个新的RDD。
它允许用户指定新的分区数,并会触发shuffle操作来重新分配数据。
示例代码:newRDD = rdd.repartition(numPartitions),其中numPartitions是新的分区数。

b、使用coalesce()方法:
coalesce()方法也可以用于调整RDD的分区数,但它主要用于减少分区数,以减少小任务的数量和降低调度开销。
与repartition()不同,coalesce()在减少分区数时默认不会触发shuffle操作(除非设置了shuffle=true),这在处理大数据集时更为高效。
示例代码:newRDD = rdd.coalesce(numPartitions),其中numPartitions是新的分区数。

c、在读取数据时指定分区数:
当使用Spark的API(如textFile()、parquetFile()等)读取外部数据源时,可以通过可选的参数minPartitions来指定最小的分区数。
这有助于在数据读取阶段就控制RDD的分区数,以便后续处理。

d、配置Spark的默认并行度:
可以通过设置Spark配置参数spark.default.parallelism来指定默认的并行度(即RDD的分区数)。
这个参数会影响所有没有显式指定分区数的RDD操作。
示例配置:val conf = new SparkConf().setAppName("appName").setMaster("masterURL").set("spark.default.parallelism", "numPartitions")。

e、使用自定义分区器:
对于需要基于特定逻辑进行分区的场景,可以使用自定义分区器。
自定义分区器需要继承Partitioner类并实现numPartitions和getPartition方法。
使用自定义分区器可以精确地控制数据的分区方式,以满足特定的业务需求。

而至于sql模块的aqe在实际使用中会发现分区具体大小生效的优先级高于分区个数。

37、在spark3.x之后,sql模块对底层处理时间格式化的类不在使用之前的simpledateformat,因为simple它对时间数据的校验不严谨,造成数据生成会携带后缀。但类库的变动,导致用spark3.x去处理2.x的时间数据就会报错,因此spark提供了如下配置,在运行时修复此问题。但是效果不太理想最直接的方法是用substr函数截取对应位数的时间字符串数据

spark.sql.legacy.timeParserPolicy=LEGACY

38、executor和driver的堆外内存,默认值占用自身整体内存的百分之10和384字节取最小值,这个配置一般不改动,它是用来存放driver除了堆空间之外的,比如指针栈等这种数据,一般默认的就够

spark.driver.memoryOverhead=driverMemory * spark.driver.memoryOverheadFactor, with minimum of 384
spark.driver.memoryOverheadFactor=0.1
spark.executor.memoryOverhead=executorMemory * spark.executor.memoryOverheadFactor, with minimum of 384
spark.executor.memoryOverheadFactor=0.1

39、如果你用的是pyspark,那么在设置executor的内存时一定要携带如下的配置,和spark.executor.memory保持一致,因为源码中没有默认值,而pyspark和其他的模块是解偶的,所以我在使用的时候都会配置,担心没有限制的情况发生

spark.executor.pyspark.memory

40、spark任务的执行路径,为了执行效率它应该是一个服务器的物理路径,放在这里只是让大家和前面的拉数据请求上限数一样知道就行,因为在spark1.0之后就不需要特意的手改了,而是会在不同的资源架构下采用对应的默认值或者配置项决定,standalone或者master下的需要在spark-defaults.conf中配置spark.local.dir,yarn 下由yarn.nodemanager.local-dirs拖管

spark.local.dir

41、在任务启动后是否在日志中打印所有有效的任务配置,默认false

spark.logConf

42、提交到的框架以及模式,通常会使用--master这种优先级最高的配置方式

spark.master
spark.submit.deployMode
spark.yarn.queue=xx   #on yarn时指定队列

43、对于shuffer压力较大任务,spark提供了外部shuffer的能力。spark默认的shuffer是在任务中进行的,也就是多个input/output之后的任务会跟随着shuffer任务,如下面这种任务ui
在这里插入图片描述
这种shuffer模式对于日常任务没有问题,但是如果任务较大,那么缺点就很明显了资源利用率低,因为等shuffer任务会占用当前任务的进程,再shuffer任务返回结果的途中,后面的任务不动,而外部shuffer的好处相对的可以提高资源利用率,增加动态资源调度的弹性,避免开启了动态资源调度时,因为shuffle数据的占用,导致executor无法回收的问题,或者因为executor被回收了,造成shuffle数据被销毁,从而触发该子任务的重算流程,还可以减少executor内部之间网络带宽和本地带宽的占用

spark.shuffle.service.enabled=true    #开启外部shuffer,默认是false
spark.shuffle.service.port=7337     #外部shuffer的端口,默认是7337
spark.shuffle.service.name=spark_shuffle    #外部shuffer服务的名字,在spark的自有体系中会自带,但如果你是on yarn,则必须是yarn.nodemanager.aux-services中有的
spark.shuffle.service.index.cache.size=100m     #shuffer服务对存储索引的空间大小,默认是100m,适当的提高可以减少io和磁盘的开销
spark.shuffle.sort.bypassMergeThreshold=200    #这个配置是当你的任务shuffer是用来排序的,那么如果你的任务没有另外配置聚合或者动态跟踪分区数时,生成的task并行默认最多200
spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO   #这个参数3.0.0开始起效,用来另行指定排序shuffer的实现类,保持默认就行不用动
spark.shuffle.spill.compress=true     #shuffer时数据是否压缩,默认true,压缩用到的类库采用spark.io.compression.codec的值
spark.shuffle.accurateBlockThreshold=100 * 1024 * 1024     #这个配置是一个默认的shuffer数据快安全阈值,默认是100 * 1024 * 1024,单位字节,防止shuffer服务低估shuffer数据的大小,从而爆发OOM,超过这个配置shuffer服务会高度关注数据的大小
spark.shuffle.registration.timeout=5000    #任务注册外部shuffle服务的超时时间,单位毫秒
spark.shuffle.registration.maxAttempts=3    #注册外部shuffer服务的重试次数

spark.shuffle.maxChunksBeingTransferred=Long.MAX_VALUE  #这个是shuffer服务的最大传输并行度,知道就行不要随便改,默认Long.MAX_VALUE,根据官方的文档到达这个上限时新的传输会被拒绝导致失败

44、spark日志相关的任务配置,在商用环境下,日志的成本是很高的,因此有些时候会有紧急情况需要把日志策略做临时更改,所以除特殊情况之外一般在集群中默认配置,不要任务自己瞎写

spark.eventLog.enabled=false    #是否开启日志聚合,好处是任务解释后在spark历史服务器中也可以打开执行计划等关键信息
spark.eventLog.logBlockUpdates.enabled=false   #是否记录数据快更新的日志,默认false,spark2.3.0之后开始支持
spark.eventLog.longForm.enabled=false    #是否用长表格显示日志,默认false
spark.eventLog.compress=false       #日志是否压缩,默认false
spark.eventLog.dir=file:///tmp/spark-events    #日志存放的父路径,默认在本地路径,这个配置通常会在集群配置文件中配置,目的路径通常是hdfs
spark.eventLog.overwrite=false     #是否覆盖的方式写入,一定要保持false,除非你只保留最新任务的日志,但反过来思考,你只保留最新任务的没大作用
spark.eventLog.buffer.kb=100k      #日志输出流的缓冲区大小

spark.ui.dagGraph.retainedRootRDDs=Int.MaxValue      #垃圾回收前,记录的DAG节点状态数量,一般不改,默认就行

45、广播变量支持的参数调整

spark.broadcast.blockSize=4m    #广播数据集数据快的大小,不要太大或太小,会影响传输效率
spark.broadcast.checksum=true   #是否启用广播数据的饱和式传输,好处是防止数据快有损坏,坏处就是增加传输成本
spark.broadcast.UDFCompressionThreshold=1m  #对用户自定义函数和python rdd数据的广播数据压缩阈值

网站公告

今日签到

点亮在社区的每一天
去签到