概述

Spark(Spark SQL)在离线计算场景应用广泛,为了保证Spark应用更好地满足业务场景需求,同时能够在线上稳定地运行,我们需要关注Spark的调优工作。首先,需要了解Spark对外的接口并如何高效地使用;其次要搞清楚内部的运行机制以及参数配置体系;最后是要能够深入分析spark的日志信息。进一步来讲,对于Spark的深度使用者,需要关注社区各个版本的迭代、bug修复以及性能优化的情况,才能更好地打开思路,提高解决问题的效率。主要途径有:spark的release-note、databricks官方博客、源码。

为了方便Spark相关性能问题的排查,本文记录了日常Spark使用过程中遇到的问题和解决思路,用于积累过程中进行复盘总结,强化Spark的深入理解和实战经验。

FAQ

  1. 如何优化Spark集群CPU利用率?

    产生CPU利用率低一般有两种原因导致:Executor上线程挂起和Task任务数太多调度开销大。需要以下三方面来考虑:

    • 并行度:拆分任务的粒度,涉及参数有:spark.default.parallelism、spark.sql.shuffle.partitions、spark.sql.files.maxPartitionBytes、spark.sql.files.openCostInBytes
    • 并发度N:集群总体cores,spark.executor.instances、spark.executor.cores、spark.task.cpus
    • 执行内存M:executor在可获得执行内存,下限是spark.executor.memory spark.memory.fraction (1- spark.memory.storageFraction),上限是spark.executor.memory * spark.memory.fraction

    在给定执行内存 M、线程池大小 N 和数据总量 D 的时候,想要有效地提升 CPU 利用率,我们就要计算出最佳并行度 P,计算方法是让数据分片的平均大小 D/P 坐落在(M/N/2, M/N)区间。这样,在运行时 CPU 利用率往往不会太差。

    总而言之,需要平衡“并行度、并发度、执行内存”,去提升CPU利用率,所以更需要使用系统工具、监控工具,比如ganglia、Prometheus、Grafana、nmon、htop等等这些工具,去观察你的CPU利用率,然后回过头来平衡三者,然后再去观察CPU利用率是不是提升了,通过这种方式来去调优。

  2. 针对CET达到几百规模的大SQL在上千规模Hadoop集群的执行性能调优?

    SparkSQL thriftserver 侧的优化:

    • 元数据读写一些锁的优化,从比较大的锁粒度降到比较小的锁粒度;
    • 引入多线程,提高解析每个互不依赖的子查询的并行度。

    DAGScheduler侧的优化:

    • 引入线程池提高Task被调度到Executor的效率,降低调度延迟;
    • 适当调小spark.locality.wait.node,降低延迟调度的时间,提高调度的效率;
    • 适当调大spark.resultGetter.threads的数值,提高处理返回结果的效率。
  3. 如何利用到ReuseExchange的特性?

    比如有这样一个场景,读取一批parque文件,都是按照A字段做group by后,进行两个场景的计算。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    //方式1
    val filePath: String = _
    val df = spark.read.parquet(filePath)
    val dfPV = df.groupBy("userId").agg(count("page").alias("value")).withColumn("metrics", lit("PV"))
    val dfUV = df.groupBy("userId").agg(countDistinct("page").alias("value")).withColumn("metrics ", lit("UV"))
    dfPV.union(dfUV).show()

    //方式2
    val filePath: String = _
    //预先分好组,当然如果dfPV和dfUV有过滤的操作的话(结果集较小),强行reuse就不太适合了
    val df = spark.read.parquet(filePath).repartition($"userId")
    //下面两个dfPV和dfUV,就会复用df
    val dfPV = df.groupBy("userId").agg(count("page").alias("value")).withColumn("metrics", lit("PV"))
    val dfUV = df.groupBy("userId").agg(countDistinct("page").alias("value")).withColumn("metrics ", lit("UV"))
    dfPV.union(dfUV).show()

    前提条件:

    • 多个查询所依赖的分区规则要与 Shuffle 中间数据的分区规则保持一致
    • 多个查询所涉及的字段(Attributes)要保持一致
  4. Spark集群机器选型一般依据是什么?

    • 如果你的计算场景涉及到大量的聚合、排序、哈希计算、数值计算等等,那么你的机器配置就要加强CPU;
    • 如果你的计算场景需要反复消耗同一份或是同一批数据集,比如机器学习、数据分析、图计算,那么为了把需要频繁访问的数据缓存进内存,你自然需要加大内存配置;
    • 如果你的计算场景会引入大量shuffle,又不能通过广播来消除Shuffle,那么你就需要配置足够的SSD以及高吞吐网络。
  5. 建模分析时,判断一批IP地址是否在原始海量的IP段范围中,性能不理想。抽象为非等值JOIN的优化?

    分析物理算子org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec,扩展出一个BroadcastRangeJoinExec算子,通过给那个小表做索引并排序,Join时候就不需要每一条都扫描,只扫描一部分就可以了。SPARK-24020

  6. 建模分析时,少量重点人员账号与原始数据进行碰撞,性能不理想。在大小表基于非分区字段join时,大表读取的数据过多,执行性能较差?

    可以采用Runtime Filter的方式,它的原理和DPP(动态分区裁剪)类似,因为DPP要求你的Join条件中包含了分区字段才会开启DPP,Runtime Filter可以把一些非分区字段条件形成一个filter放到大表上,类似传统数据库Query Rewrite,如果表大表能够过滤较多数据,从而可以提高JOIN的性能。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    case class IndexJoinRule(sparkSession: SparkSession) 
    extends Rule[LogicalPlan]
    with Logging {

    override def apply(plan: LogicalPlan):LogicalPlan = {
    val convertedPlan = applyIndexJoinRule(Plan)
    convertedPlan
    }

    private def applyIndexJoinRule(plan: LogicalPlan) = plan transform {
    // select a.col1, b.col2
    case j@Join(
    left@Project(_,lFilter@Filter(lConditon,_))),
    right@Proejct(_, rFilter@Filter(rCondition, _)),_,
    Some(joinCond),_) =>
    convertInnerJoin(j, left, lFilter,lConditon,
    right, rFilter,rConditon, joinCond)
    // select a.*, b.col2
    case j@Join(
    left@Filter(lCondition, _),
    right@Project(_, rFilter@Filter(rCondtion, _)),_,
    Some(joinCond), _) =>
    convertInnerJoin(j, left, lFilter,lConditon,
    right, rFilter,rConditon, joinCond)
    //其他计划规则

    }

    private def convertInnerJoin(j: Join,
    left: LogicalPlan,
    lFilter: Filter,
    lConditon: EXpression
    right: LogicalPlan,
    rFilter: Filter,
    rCondition: Expression,
    joinCond: Expression) : LogicalPlan = {

    //基于统计信息和配置来确定是否转换logicalplan

    //将小表的索引列值查询出来追加到大表的过滤条件
    //小表结果
    val result = {
    val qe = sparkSession.sessionState.executePlan(planToQuery)
    qe.assertAnalyzed()
    new DataSet[Row](qe, RowEncoder(qe.analyzed.schema))
    }.collect().map{row=>Literal.create(row.get(0), joinCol.dataType)}.distinct

    val newJoin = {
    //转为in条件
    val inCond = In(getJoinAttr(joinCond, right).get, result)
    val newCond = And(rCondition, inCond)
    val newRight = right.transform{
    case f@Filter(_,_) => f.copy(condition = newCond)
    }
    j.copy(right = newRight)
    }
    newJoin.copy(left, right)
    }

    }
  1. 1
    2
    org.apache.spark.shuffle.FetchFailedException: 
    Failed to send RPC XXX to /xxx:7337:java.nio.channels.ColsedChannelException
    • 原因:external shuffle服务将数据发送给container时,发现container已经关闭连接,出现该异常应该和org.apache.spark.shuffle.FetchFailedException: Connection from /xxx:7337 closed同时出现;
    • 解决方案:参考org.apache.spark.shuffle.FetchFailedException: Connection from /xxx:7337 closed的解决方案。
    • 进一步补充:
      • 在验证中发现关闭参数spark.shuffle.readHostLocalDisk,可以规避该异常的出现;
      • 顺着上述参数发现在spark3.0.0中org.apache.spark.network.shuffle.ExternalBlockStoreClient#getHostLocalDirs指定rpc操作后默认关闭了RPC client,导致后续其他任务使用该client时出现已经关闭的情况。排查发现社区在SPARK-32663已经修复了该问题。
  2. 如何加快netty堆外内存的回收?snappy+parquet格式数据会导致,netty堆外内存增长太快,导致netty使用过多direct memory报错?

    首先,io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 7633633280, max: 7635730432),这个问题,往往出现在Shuffle read阶段,spark用netty的client/server来拉取远端节点数据,并且透过java.nio.DirectByteBuffers来缓存接收到的数据块。当数据分布存在比较严重的倾斜问题的时候,就会导致某些Block过大,从而导致单个线程占用的Direct Buffer超过16MB,从而报出上面的错误。

    因此,要从根本上解决问题,可以先搞定数据倾斜的问题,如果数据倾斜消除了,那么这个问题大概率自己就会消失掉。关于消除数据倾斜的方法,可以参考后面AQE那几讲,以及两阶段Shuffle那一讲。

    接下来,假设你消除了Data Skew之后,这个报错还在,那么就继续用下面的办法。DirectByteBuffers默认的大小就是spark.executor.memory的大小,也就是说,它在逻辑上,会“计入”Executor memory内存的消耗。spark.executor.memory这玩意其实指定的JVM heap堆内的内存大小,而DirectByteBuffers是堆外内存,按理说两者应该区别对待,然而默认情况下,并没有。因此,如果DirectByteBuffers消耗非常的过分,那么我们可以在spark.executor.extraJavaOptions当中,特意地去指定-XX:MaxDirectMemorySize这个参数,这个参数,就是用来指定DirectByteBuffers的内存大小,可以把它设置的大一些。

    再者,假设上面的设置,还不能解决问题,那么接下来,我们就得做进一步的精细化调优。首先,把spark.reducer.maxSizeInFlight,设置成-XX:MaxDirectMemorySize / spark.executor.cores ,这个设置的意图,是降低每个线程需要缓存的数据量。然后,把spark.maxRemoteBlockSizeFetchToMem,设置成spark.reducer.maxSizeInFlight / 5,这个设置的意图,是为了把大的Block直接落盘,从而迅速释放线程占用的Direct buffer,降低Direct buffer(也就是堆外内存)的消耗,从而降低OOM的风险。

  3. java.lang.OutOfMemoryError: GC overhead limit exceeded

    • 原因:数据量太大,内存不够。
    • 解决方案:
      • 增大spark.executor.memory的值,减小spark.executor.cores
      • 减少输入数据量,将原来的数据量分几次任务完成,每次读取其中一部分
  4. ERROR An error occurred while trying to connect to the Java server (127.0.0.1:57439) Connection refused

    • 原因:
      • 节点上运行的container多,每个任务shuffle write到磁盘的量大,导致磁盘满,节点重启
      • 节点其他服务多,抢占内存资源,NodeManager处于假死状态
    • 解决方案:
      • 确保节点没有过多其他服务进程
      • 扩大磁盘容量
      • 降低内存可分配量,比如为总内存的90%,可分配内存少了,并发任务数就少了,出现问题概率降低
      • 增大NodeManager的堆内存
  5. org.apache.spark.shuffle.FetchFailedException: Failed to connect to /9.4.36.40:7337

    • 背景:shuffle过程包括shuffle read和shuffle write两个过程。对于spark on yarn,shuffle write是container写数据到本地磁盘(路径由core-site.xml中hadoop.tmp.dir指定)过程; shuffle read是container请求external shuffle服务获取数据过程,external shuffle是NodeManager进程中的一个服务,默认端口是7337,或者通过spark.shuffle.service.port指定。
    • 定位过程:拉取任务运行日志,查看container日志;查看对应ip上NodeManager进程运行日志,路径由yarn-env.sh中YARN_LOG_DIR指定。
    • 原因:container请求NodeManager上external shufflle服务,不能正常connect,说明NodeManager可能挂掉了,原因可能是:
      • 节点上运行的container多,每个任务shuffle write到磁盘的量大,导致磁盘满,节点重启
      • 节点其他服务多,抢占内存资源,NodeManager处于假死状态
    • 解决方案:
      • 确保节点没有过多其他服务进程
      • 扩大磁盘容量
      • 降低内存可分配量,比如为总内存的90%,可分配内存少了,并发任务数就少了,出现问题概率降低
      • 增大NodeManager的堆内存
  6. spark任务中stage有retry

    • 原因:
      • 下一个stage获取上一个stage没有获取到全部输出结果,只获取到部分结果,对于没有获取的输出结果retry stage以产出缺失的结果;
      • 部分输出结果确实已经丢失 ,部分输出结果没有丢失,只是下一个stage获取结果超时,误认为输出结果丢失。
    • 解决方案:
      • 针对原因(1),查看进程是否正常,查看机器资源是否正常,比如磁盘是否满或者其他;
      • 针对原因(2),调大超时时间,如调大spark.network.timeout值。
  7. Final app status: FAILED, exitCode: 11, (reason: Max number of executor failures (200) reached)

    • 原因:executor失败重试次数达到阈值
    • 解决方案:
      • 调整运行参数,减少executor失败次数;
      • 调整spark.yarn.max.executor.failures的值,可在spark-defaults.conf中调整。确定方式,在日志中搜索”Final app status:”,确定原因,在日志统计”Container marked as failed:”出现次数。
  8. task反复调度到有问题的executor?

    通过这些黑名单的设置可以避免由于 task 反复调度在有问题的 executor/node (坏盘,磁盘满了,shuffle fetch 失败,环境错误等)上,进而导致整个 Application 运行失败的情况。

扩展阅读