该文档仅提供性能调优的参考
资源调优
Spark 可以通过参数配置资源分配。资源分配不合理会导致 job 运行过慢甚至失败。资源调优就是为当前 job 分配合适的资源,提高资源利用率最终加快任务运行速度。
Driver
配置项 |
默认值 |
参考值 |
备注 |
spark.driver.memory |
1g |
不 OOM 下越小越好 |
如果数据需收集到 driver ,那么需要根据数据大小配置内存,防止 OOM |
spark.driver.memoryOverhead |
max (driverMemory * 0.1, 384) |
/ |
|
spark.driver.cores |
1 |
/ |
Executor
内存
-
太大的内存会导致 JVM 垃圾回收变慢,尽量小于 64 g
-
executor 申请的总内存不能超过 node/container 的总内存,申请的内存大小为以下参数总和
- spark.executor.memory
- spark.executor.memoryOverhead
- spark.memory.offHeap.size
- spark.executor.pyspark.memory
CPU
- 1 core 会导致无法利用 JVM 多线程,以及会使一些 broadcast 相关的参数失效
- core 设置太多可能会使 job 速度运行变慢
配置项 |
默认值 |
建议值 |
备注 |
spark.executor.memory |
1g |
4-8 g |
在不 OOM 情况下设小 |
spark.executor.memoryOverhead |
max (executor * 0.1, 384) |
/ |
用于虚拟机的开销、内部的字符串、本地开销等 |
spark.memory.offHeap.enabled |
false |
/ |
当有需要堆外内存的操作时才配置,一般默认 false 即可 |
spark.memory.offHeap.size |
0 |
/ |
offHeap.enabled true 才生效 |
spark.executor.cores |
|
4 |
|
Executor 数量 |
无 |
总 core / executor.cores |
Executor 内存分配
指 spark.executor.memory 的内存分配
Spark 的内存分为两大类:执行内存和存储内存。
- 执行内存:在shuffle, join, aggregation 等计算中使用的内存。
- 存储内存:集群中缓存和 broadcast 使用的内存。
spark.memory.fraction (默认 0.6)
用于执行内存和存储内存的百分比,剩余是为用户数据结构,Spark metadata 等预留的。在预留大小足够下提高此值,可以减少溢写磁盘。
存储内存和执行内存共享同一块空间,且有动态占用机制
- 双方基础内存占比由 spark.memory.storageFraction 决定
- 一方空闲时,另一方可以占用
- 当执行内存不足,且存在被占用内存时:可要求存储内存归还占用部分。存储内存会将占用部分转存到磁盘
- 当存储内存不足,且存在被占用内存时:不可要求执行内存归还
spark.memory.storageFraction(默认0.5):
不受驱逐的存储内存百分比,即这个占比的内存一定不会被驱逐到磁盘中
用于 task 的执行内存大小可以计算得出
spark.executor.memory * spark.memory.fraction *(1-spark.memory.storageFraction)/ spark.executor.cores
总结:
- 发生磁盘溢写时:可尝试调大 spark.executor.memory 或提高 spark.memory.fraction
- spark.memory.storageFraction 一般取默认值即可,不太推荐在溢写时调小该值
动态内存分配
Spark 提供了 Dynamic Executor Allocation ,它能够动态调整 executor 数量,以下场景可以考虑配置
- 和其他团队共享集群
- 在乎 cost
- 某一个 application 有若干不同大小的 job
主要参数如下
spark.dynamicAllocation.enable false
spark.dynamicAllocation.executorIdleTimeout 60s // 如果任务执行时间普遍短,可以调小 timeout
spark.dynamicAllocation.initialExecutors minExecutors // 对于大的 job,调大 initialExecutors
spark.dynamicAllocation.minExecutors 1
spark.dynamicAllocation.maxExecutors infinity //共享的 spark 集群最好配置 maxExecutors
并行度
Spark 并行度 = min( 任务数 = 分区数,总核数 )
一个参考值:分区数 = 总核心数的 2-3 倍
分区初始数量
分区数会影响 Spark 集群的并行度,下面有两种方式来计算分区数量
- 内存资源紧缺时:
Math.round(inputDataSize/availableTaskMemoryMB()).toInt
其中 inputDataSize 为每个 task 的数据大小,可以从 Spark UI 上查看;availableTaskMemoryMB 即为上文计算的用于 task 的执行内存大小
- 内存资源足够时:分区数量先设置为集群可用总 cores *2,然后逐步往上调,寻找一个最佳分区数(core 的整数倍)
什么是最佳分区数呢?执行时间最短就是最佳,此外还可以根据 Spark UI 判断
- 分区数量太多的表现:executor cpu 内存利用率过低,过多 pending 的 task
- 分区数量太少的表现:executor 空闲
分区调整
- 分区数量调整:使用
repartition()
可以调整分区数量,但会发生 shuffle,若减少分区,可以尝试使用coalesce()
来避免 shuffle (一些特殊场景 repartition 更优,其增加的 shuffle 可能会减少其他地方的 shuffle,降低整体的时间) - 分区策略调整:若发生数据倾斜,可以通过调整合适的分区策略避免
Shuffle 调优
Shuffle 调优的目的是:避免 spill 到 disk 导致任务速度变慢
当在 Spark UI 观察到存在溢写时,一般有以下手段
- 增加内存
- 配置堆外内存
- 增加分区以减少每个任务的数据量
- 调整 shuffle 参数
相关配置如下
配置项 |
默认值 |
推荐 |
备注 |
spark.executor.memory |
1g |
增加 |
内存足够增加内存是最好的方式 |
spark.sql.shuffle.partitions |
200 |
增加 |
调大分区数可以减少每个分区的数据量防止 spill |
|
关闭 |
打开 |
配置堆外内存减少 shuffle |
spark.memory.fraction |
0.6 |
增加 |
增加存储内存和执行内存的总额 |
spark.shuffle.file.buffer |
32k |
64k |
shuffle write 时,会先写到 BufferedOutputStream 缓冲区中,然后再溢写到磁盘。增加此值可以减少 IO 次数,推荐 64k |
spark.shuffle.service.index.cache.size |
100m |
减少 |
缓存的 shuffle 索引文件中索引的数量,减少该值可以防止内存爆炸 |
spark.io.compression.lz4.blockSize |
32k |
增加 |
增大此配置以减少 shuffle 文件的大小 |
spark.shuffle.service.enabled |
false |
/ |
启用外部 shuffle 服务,这样 spark shuffle file 不会保存在 executor |
spark.shuffle.io.backLog |
-1 |
/ |
启用 shuffle.service 时,控制 accept queue |
spark.shuffle.registration.timeout |
5000 |
/ |
启用 shuffle.service 时,注册的超时时间,推荐增大 |
另外还可以优化代码(SQL or RDD API)防止 shuffle
- Join 时广播小表(如使用 Broadcast Hash Join)
- 尽量使用窄依赖而不是宽依赖
- 使用 ReduceByKey 而不是 GroupbyKey
- 在要进行宽依赖之前,或者进行完一系列复杂操作后,或进行完某些耗时操作后,persist RDD 进行缓存
Spark 调优工具
推荐使用一些 Spark 调优工具来帮助调优
- Sparklens
- Sparklint
- Dr Elephant