Spark性能调优实战
概述
Spark(Spark SQL)在离线计算场景应用广泛,为了保证Spark应用更好地满足业务场景需求,同时能够在线上稳定地运行,我们需要关注Spark的调优工作。首先,需要了解Spark对外的接口并如何高效地使用;其次要搞清楚内部的运行机制以及参数配置体系;最后是要能够深入分析spark的日志信息。进一步来讲,对于Spark的深度使用者,需要关注社区各个版本的迭代、bug修复以及性能优化的情况,才能更好地打开思路,提高解决问题的效率。主要途径有:spark的release-note、databricks官方博客、源码。
为了方便Spark相关性能问题的排查,本文记录了日常Spark使用过程中遇到的问题和解决思路,用于积累过程中进行复盘总结,强化Spark的深入理解和实战经验。
FAQ
如何优化Spark集群CPU利用率?
产生CPU利用率低一般有两种原因导致:Executor上线程挂起和Task任务数太多调度开销大。需要以下三方面来考虑:
- 并行度:拆分任务的粒度,涉及参数有:spark.default.parallelism、spark.sql.shuffle.partitions、spark.sql.files.maxPartitionBytes、spark.sql.files.openCostInBytes
- 并发度N:集群总体cores,spark.executor.instances、spark.executor.cores、spark.task.cpus
- 执行内存M:executor在可获得执行内存,下限是spark.executor.memory spark.memory.fraction (1- spark.memory.storageFraction),上限是spark.executor.memory * spark.memory.fraction
在给定执行内存 M、线程池大小 N 和数据总量 D 的时候,想要有效地提升 CPU 利用率,我们就要计算出最佳并行度 P,计算方法是让数据分片的平均大小 D/P 坐落在(M/N/2, M/N)区间。这样,在运行时 CPU 利用率往往不会太差。
总而言之,需要平衡“并行度、并发度、执行内存”,去提升CPU利用率,所以更需要使用系统工具、监控工具,比如ganglia、Prometheus、Grafana、nmon、htop等等这些工具,去观察你的CPU利用率,然后回过头来平衡三者,然后再去观察CPU利用率是不是提升了,通过这种方式来去调优。
针对CET达到几百规模的大SQL在上千规模Hadoop集群的执行性能调优?
SparkSQL thriftserver 侧的优化:
- 元数据读写一些锁的优化,从比较大的锁粒度降到比较小的锁粒度;
- 引入多线程,提高解析每个互不依赖的子查询的并行度。
DAGScheduler侧的优化:
- 引入线程池提高Task被调度到Executor的效率,降低调度延迟;
- 适当调小spark.locality.wait.node,降低延迟调度的时间,提高调度的效率;
- 适当调大spark.resultGetter.threads的数值,提高处理返回结果的效率。
如何利用到ReuseExchange的特性?
比如有这样一个场景,读取一批parque文件,都是按照A字段做group by后,进行两个场景的计算。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15//方式1
val filePath: String = _
val df = spark.read.parquet(filePath)
val dfPV = df.groupBy("userId").agg(count("page").alias("value")).withColumn("metrics", lit("PV"))
val dfUV = df.groupBy("userId").agg(countDistinct("page").alias("value")).withColumn("metrics ", lit("UV"))
dfPV.union(dfUV).show()
//方式2
val filePath: String = _
//预先分好组,当然如果dfPV和dfUV有过滤的操作的话(结果集较小),强行reuse就不太适合了
val df = spark.read.parquet(filePath).repartition($"userId")
//下面两个dfPV和dfUV,就会复用df
val dfPV = df.groupBy("userId").agg(count("page").alias("value")).withColumn("metrics", lit("PV"))
val dfUV = df.groupBy("userId").agg(countDistinct("page").alias("value")).withColumn("metrics ", lit("UV"))
dfPV.union(dfUV).show()前提条件:
- 多个查询所依赖的分区规则要与 Shuffle 中间数据的分区规则保持一致
- 多个查询所涉及的字段(Attributes)要保持一致
Spark集群机器选型一般依据是什么?
- 如果你的计算场景涉及到大量的聚合、排序、哈希计算、数值计算等等,那么你的机器配置就要加强CPU;
- 如果你的计算场景需要反复消耗同一份或是同一批数据集,比如机器学习、数据分析、图计算,那么为了把需要频繁访问的数据缓存进内存,你自然需要加大内存配置;
- 如果你的计算场景会引入大量shuffle,又不能通过广播来消除Shuffle,那么你就需要配置足够的SSD以及高吞吐网络。
建模分析时,判断一批IP地址是否在原始海量的IP段范围中,性能不理想。抽象为非等值JOIN的优化?
分析物理算子org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec,扩展出一个BroadcastRangeJoinExec算子,通过给那个小表做索引并排序,Join时候就不需要每一条都扫描,只扫描一部分就可以了。SPARK-24020
建模分析时,少量重点人员账号与原始数据进行碰撞,性能不理想。在大小表基于非分区字段join时,大表读取的数据过多,执行性能较差?
可以采用Runtime Filter的方式,它的原理和DPP(动态分区裁剪)类似,因为DPP要求你的Join条件中包含了分区字段才会开启DPP,Runtime Filter可以把一些非分区字段条件形成一个filter放到大表上,类似传统数据库Query Rewrite,如果表大表能够过滤较多数据,从而可以提高JOIN的性能。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60case class IndexJoinRule(sparkSession: SparkSession)
extends Rule[LogicalPlan]
with Logging {
override def apply(plan: LogicalPlan):LogicalPlan = {
val convertedPlan = applyIndexJoinRule(Plan)
convertedPlan
}
private def applyIndexJoinRule(plan: LogicalPlan) = plan transform {
// select a.col1, b.col2
case j (
left (_,lFilter (lConditon,_))),
right (_, rFilter (rCondition, _)),_,
Some(joinCond),_) =>
convertInnerJoin(j, left, lFilter,lConditon,
right, rFilter,rConditon, joinCond)
// select a.*, b.col2
case j (
left (lCondition, _),
right (_, rFilter (rCondtion, _)),_,
Some(joinCond), _) =>
convertInnerJoin(j, left, lFilter,lConditon,
right, rFilter,rConditon, joinCond)
//其他计划规则
}
private def convertInnerJoin(j: Join,
left: LogicalPlan,
lFilter: Filter,
lConditon: EXpression,
right: LogicalPlan,
rFilter: Filter,
rCondition: Expression,
joinCond: Expression) : LogicalPlan = {
//基于统计信息和配置来确定是否转换logicalplan
//将小表的索引列值查询出来追加到大表的过滤条件
//小表结果
val result = {
val qe = sparkSession.sessionState.executePlan(planToQuery)
qe.assertAnalyzed()
new DataSet[Row](qe, RowEncoder(qe.analyzed.schema))
}.collect().map{row=>Literal.create(row.get(0), joinCol.dataType)}.distinct
val newJoin = {
//转为in条件
val inCond = In(getJoinAttr(joinCond, right).get, result)
val newCond = And(rCondition, inCond)
val newRight = right.transform{
case f (_,_) => f.copy(condition = newCond)
}
j.copy(right = newRight)
}
newJoin.copy(left, right)
}
}
1
2org.apache.spark.shuffle.FetchFailedException:
Failed to send RPC XXX to /xxx:7337:java.nio.channels.ColsedChannelException- 原因:external shuffle服务将数据发送给container时,发现container已经关闭连接,出现该异常应该和org.apache.spark.shuffle.FetchFailedException: Connection from /xxx:7337 closed同时出现;
- 解决方案:参考org.apache.spark.shuffle.FetchFailedException: Connection from /xxx:7337 closed的解决方案。
- 进一步补充:
- 在验证中发现关闭参数spark.shuffle.readHostLocalDisk,可以规避该异常的出现;
- 顺着上述参数发现在spark3.0.0中org.apache.spark.network.shuffle.ExternalBlockStoreClient#getHostLocalDirs指定rpc操作后默认关闭了RPC client,导致后续其他任务使用该client时出现已经关闭的情况。排查发现社区在SPARK-32663已经修复了该问题。
如何加快netty堆外内存的回收?snappy+parquet格式数据会导致,netty堆外内存增长太快,导致netty使用过多direct memory报错?
首先,io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 7633633280, max: 7635730432),这个问题,往往出现在Shuffle read阶段,spark用netty的client/server来拉取远端节点数据,并且透过java.nio.DirectByteBuffers来缓存接收到的数据块。当数据分布存在比较严重的倾斜问题的时候,就会导致某些Block过大,从而导致单个线程占用的Direct Buffer超过16MB,从而报出上面的错误。
因此,要从根本上解决问题,可以先搞定数据倾斜的问题,如果数据倾斜消除了,那么这个问题大概率自己就会消失掉。关于消除数据倾斜的方法,可以参考后面AQE那几讲,以及两阶段Shuffle那一讲。
接下来,假设你消除了Data Skew之后,这个报错还在,那么就继续用下面的办法。DirectByteBuffers默认的大小就是spark.executor.memory的大小,也就是说,它在逻辑上,会“计入”Executor memory内存的消耗。spark.executor.memory这玩意其实指定的JVM heap堆内的内存大小,而DirectByteBuffers是堆外内存,按理说两者应该区别对待,然而默认情况下,并没有。因此,如果DirectByteBuffers消耗非常的过分,那么我们可以在spark.executor.extraJavaOptions当中,特意地去指定-XX:MaxDirectMemorySize这个参数,这个参数,就是用来指定DirectByteBuffers的内存大小,可以把它设置的大一些。
再者,假设上面的设置,还不能解决问题,那么接下来,我们就得做进一步的精细化调优。首先,把spark.reducer.maxSizeInFlight,设置成-XX:MaxDirectMemorySize / spark.executor.cores ,这个设置的意图,是降低每个线程需要缓存的数据量。然后,把spark.maxRemoteBlockSizeFetchToMem,设置成spark.reducer.maxSizeInFlight / 5,这个设置的意图,是为了把大的Block直接落盘,从而迅速释放线程占用的Direct buffer,降低Direct buffer(也就是堆外内存)的消耗,从而降低OOM的风险。
java.lang.OutOfMemoryError: GC overhead limit exceeded
- 原因:数据量太大,内存不够。
- 解决方案:
- 增大spark.executor.memory的值,减小spark.executor.cores
- 减少输入数据量,将原来的数据量分几次任务完成,每次读取其中一部分
ERROR An error occurred while trying to connect to the Java server (127.0.0.1:57439) Connection refused
- 原因:
- 节点上运行的container多,每个任务shuffle write到磁盘的量大,导致磁盘满,节点重启
- 节点其他服务多,抢占内存资源,NodeManager处于假死状态
- 解决方案:
- 确保节点没有过多其他服务进程
- 扩大磁盘容量
- 降低内存可分配量,比如为总内存的90%,可分配内存少了,并发任务数就少了,出现问题概率降低
- 增大NodeManager的堆内存
- 原因:
org.apache.spark.shuffle.FetchFailedException: Failed to connect to /9.4.36.40:7337
- 背景:shuffle过程包括shuffle read和shuffle write两个过程。对于spark on yarn,shuffle write是container写数据到本地磁盘(路径由core-site.xml中hadoop.tmp.dir指定)过程; shuffle read是container请求external shuffle服务获取数据过程,external shuffle是NodeManager进程中的一个服务,默认端口是7337,或者通过spark.shuffle.service.port指定。
- 定位过程:拉取任务运行日志,查看container日志;查看对应ip上NodeManager进程运行日志,路径由yarn-env.sh中YARN_LOG_DIR指定。
- 原因:container请求NodeManager上external shufflle服务,不能正常connect,说明NodeManager可能挂掉了,原因可能是:
- 节点上运行的container多,每个任务shuffle write到磁盘的量大,导致磁盘满,节点重启
- 节点其他服务多,抢占内存资源,NodeManager处于假死状态
- 解决方案:
- 确保节点没有过多其他服务进程
- 扩大磁盘容量
- 降低内存可分配量,比如为总内存的90%,可分配内存少了,并发任务数就少了,出现问题概率降低
- 增大NodeManager的堆内存
spark任务中stage有retry
- 原因:
- 下一个stage获取上一个stage没有获取到全部输出结果,只获取到部分结果,对于没有获取的输出结果retry stage以产出缺失的结果;
- 部分输出结果确实已经丢失 ,部分输出结果没有丢失,只是下一个stage获取结果超时,误认为输出结果丢失。
- 解决方案:
- 针对原因(1),查看进程是否正常,查看机器资源是否正常,比如磁盘是否满或者其他;
- 针对原因(2),调大超时时间,如调大spark.network.timeout值。
- 原因:
Final app status: FAILED, exitCode: 11, (reason: Max number of executor failures (200) reached)
- 原因:executor失败重试次数达到阈值
- 解决方案:
- 调整运行参数,减少executor失败次数;
- 调整spark.yarn.max.executor.failures的值,可在spark-defaults.conf中调整。确定方式,在日志中搜索”Final app status:”,确定原因,在日志统计”Container marked as failed:”出现次数。
task反复调度到有问题的executor?
通过这些黑名单的设置可以避免由于 task 反复调度在有问题的 executor/node (坏盘,磁盘满了,shuffle fetch 失败,环境错误等)上,进而导致整个 Application 运行失败的情况。
扩展阅读
- Apache Spark 完全替代传统数仓的技术挑战及实践,马刚@eBay,大数据团队成员
- Spark性能调优实战,吴磊,FreeWheel机器学习团队负责人
- https://databricks.com/blog
- Spark性能调优指南笔记,笔者