暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

七、Spark调优与调试

程序猿小P 2021-06-03
209

一、使用SparkConf配置Spark

  • 在Scala中使用SparkConf创建一个应用
val conf = new SparkConf()
conf.set("spark.app.name","My Spark App")
conf.set("spark.master","local[4]")
conf.set("spark.ui.port","36000")//重载默认端口配置

//使用这个配置对象创建一个SparkContext
val sc = new SparkContext(conf)

当然你可以调用setAppName()和setMaster()来分别设置spark.app.name和spark.master的配置值。

复制
  • 更多的情况下,动态地为给定应用设置配置选项会方便很多。
$bin/spark-submit \
--class com.example.MyApp \
--master local[4] \
--name "My Spark App" \
--conf spark.ui.port=36000 \
myApp.jar

然后用户只要创建一个“空”的SparkConf,并直接传给SparkContext的构造方法就可以了。

复制
  • spark-submit也支持从文件中读取配置项的值。
    • 默认情况下,spark-submit脚本会在Spark安装目录中找到conf/spark-defaults.conf文件,尝试读取该文件中以空格隔开的键值对数据。
    • 你也可以通过spark-submit的--properties-File标记,自定义该文件的路径。
$bin/spark-submit \
--class com.example.MyApp \
--properties-File my-config.conf \
myApp.jar

## Contents of my-config.conf ##
spark.app.name  "My Spark App"
spark.master    local[4]
spark.ui.port   36000

复制
  • 配置的优先级

    • 优先级最高的是在用户代码中显式的调用set()设置的选项。
    • 其次是通过spark-submit的传递的参数
    • 再次是写在配置文件中的值,
    • 最后是系统的默认值。
  • 常用的配置项(参考125-126页)

二、Spark执行的组成部分:作业、任务和步骤

  • 在执行时,Spark会把多个操作合并为一组任务,把RDD的逻辑表示翻译为物理执行计划。

  • Spark程序会定义一个RDD对象的有向无环图(DAG),我们可以在稍后行动操作被触发时用它来进行计算。

  • 每个RDD维护了其指向一个或多个父节点的引用,以及表示其与父节点之间关系的信息。比如,当你在RDD上调用val b = a.map()时,b这个RDD就存下了对其父节点a的一个引用。这些引用使得RDD可以追踪到其所有的祖先节点。

  • Spark调度器从最终被调用行动操作的RDD出发,向上回溯所有必须计算的RDD。调度器会访问RDD的父节点,父节点的父节点,以此类推,递归向上生成计算所有必要的祖先RDD的物理计划。

1.步骤

  • 调度器为有向图中每个RDD输出计算步骤,步骤中包括RDD上需要应用于每个分区的任务。然后以相反的顺序执行这些步骤,步骤中包括RDD上需要应用于每个分区的任务。然后以相反的顺序执行这些步骤,计算得出最终所求的RDD。
  • RDD图与执行步骤的对应关系并不一定是一一对应的,比如,当调度器进行流水线执行,把多个RDD合并到一个步骤时。
  • 当RDD不需要混洗数据就可以从父节点计算出来时,调度器就会自动进行流水线执行。
  • 除了流水线执行的优化,当一个RDD已经缓存在集群内存或磁盘上时,Spark的内部调度器也会自动截短RDD谱系图。在这种情况下,Spark会"短路"求值,直接基于缓存下来的RDD进行计算。

2.作业

  • 特定的行动操作所生成的步骤的集合被称为一个作业。我们通过count()之类的方法触发行动操作,创建出由一个或多个步骤组成的作业。

3.任务

  • 一个物理步骤会启动很多任务,每个任务都是在不同数据分区上做同样的事情。
  • 任务内部的流程:
    • 从数据存储(如果该RDD是一个输入RDD)或已有RDD(如果该步骤是基于已经缓存的数据)或数据混洗的输出中获取输入数据。
    • 执行必要的操作来计算出这些操作所代表的RDD。例如,对输入数据执行filter()和map()函数,或者进行分组或规约操作。
    • 把输出写到一个数据混洗文件中,写入外部存储,或者是发回驱动器程序(如果最终RDD调用的是类似count()这样的行动操作)

查找信息

Spark网页用户界面

  • 1.作业页面:步骤与任务的进度和指标,以及更多的内容
  • 2.存储页面:已缓存的RDD的信息
  • 3.执行器页面:应用中的执行器进程列表
  • 4.环境页面:用来调试Spark配置项

三、关键性能考量

1.并行度

  • 当Spark调度并运行任务时,Spark会为每个分区中的数据创建出一个任务。该任务在默认情况下会需要集群中的一个计算核心来执行。
  • Spark也会针对RDD直接推断出合适的并行度,这对于大多数用例来说已经足够了。
  • 输入RDD一般会根据其底层的存储系统选择并行度。例如,从HDFS上读数据的输入RDD会为数据在HDFS上的每个文件区块创建一个分区。

spark提供两种方法来对操作的并行度进行调优。

  • 第一种方法是在数据混洗操作时,使用参数的方式为混洗后的RDD指定并行度。
  • 第二种方法是对任何已有的RDD,可以进行重新分区来获得更多或更少的分区数。
    • 重新分区操作通过repartition()实现,该操作会把RDD随机打乱并分成设定的分区数目。
    • 如果你确定要减少RDD分区,可以使用coalesce()操作。由于没有打乱数据,该操作比repartition()更为高效。
    • 如果你认为当前的并行度过高或者过低,可以利用这些方法对数据分布进行重新调整。
    • 例:
1.假设我们读取了大量数据,然后马上进行filter()操作筛选掉数据集中的绝大部分数据。
2.默认情况下,filter返回的RDD的分区数和其父节点一样,这样可能会产生很多空的分区或者只有很少数据的分区。在这样的情况下,可以通过合并得到分区更少的RDD来提高应用性能。
3.样例代码请参考136页例8-11

复制

2.序列化格式

  • 默认情况下,Spark会使用Java内建的序列化库。
  • Spark也支持使用第三方序列化库Kryo,可以提供比Java的序列化工具更短的序列化时间和更高压缩比的二进制表示,但不能直接序列化全部类型的对象。几乎所有的应用都在迁移到Kryo后获得了更好的性能。
  • 列化库Kryo的使用
    • 要使用Kryo序列化工具,你需要设置spark.serializer为org.apache.spark.serializer.KryoSerializer。
    • 为了获得最佳性能,还应该向Kryo注册想要序列化的类,如下所示
val conf = new SparkCond()
conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
//严格要求注册类
conf.set("spark.kryo.registrationRequired","true")
conf.registerKryoClasses(Array(classOf[MyClass],classOf[MyOtherClass]))

1.注册类可以让Kryo避免把每个对象的完整类名写下来,成千上万条记录累计节省的空间相当可观。
2.如果你想强制要求这种注册,可以把spark.kryo.registrationRequired设置为true,这样Kryo会在遇到未注册的类时抛出错误。
3.不论是选用Kryo还是Java序列化,如果代码中引用了一个没有扩展Java的Serializable接口的类,你都会遇到NotSerializableException。
这种情况下,要查出引发问题的类时比较困难的,因为用户代码会引用到许许多多不同的类。
很多JVM都支持通过一个特别的选项来帮助调试这一情况:"-Dsun.io.serialization.extended DebugInfo=true"
你可以通过设置spark-submit的--driver-java-options和--executor-java-options标记来打开这个选项。

复制

3.内存管理

在各个执行器进程中,内存的用途

  • RDD存储
    • 当调用RDD的persist()或cache()方法时,这个RDD的分区会被存储到缓存区中。
    • Spark会根据spark.storage.memoryFraction限制用来缓存的内存占整个JVM堆空间的比例大小。如果超出限制,旧的分区数据会被移出内存。
  • 数据混洗与聚合的缓存区
    • 当进行数据混洗操作时,Spark会创建出一些中间缓存区来存储数据混洗的输出数据。
    • Spark会尝试根据spark.shuffle.memoryFraction限定这种缓存区内存占总内存的比例。
  • 用户代码
    • Spark可以执行任意用户代码,所以用户的函数可以自行申请大量内存。
    • 例如,如果一个用户应用分配了巨大的数据或其他对象,那这些都会占用总的内存。
    • 用户代码可以访问JVM堆空间中除分配给RDD存储和数据混洗存储以外的全部剩余空间。
  • 默认情况下,Spark会使用60%的空间存储RDD,20%存储数据混洗操作产生的数据,剩下20%留给用户程序。
  • 用户可以自行调节这些选项来追求更好的性能表现。
    • 如果用户代码中分配了大量的对象,那么降低RDD存储和数据混洗存储所占用的空间可以有效避免程序内存不足的情况。

4.除了调整内存各区域比例,我们还可以为一些工作负载改进缓存行为的某些要素。

  • Spark默认的cache()操作会以MEMORY_ONLY的存储等级持久化数据。
    • 这意味着如果缓存新的RDD分区时空间不够,旧的分区就会直接删除。当用到这些分区数据时,再进行重算。
  • 所以有时以MEMORY_AND_DISK的存储等级调用persist()方法会获得更好的效果。
    • 因为在这种存储等级下,内存中放不下的旧分区会被写入磁盘,当再次需要用到的时候在从磁盘上读取回来。这样的代价有可能比重算各分区要低很多,也可以带来更稳定的性能表现。
  • 对默认缓存策略的另一个改进是缓存序列化后的对象而非直接缓存。
    • 我们可以通过MEMORY_ONLY_SER或者MEMORY_AND_DISK_SER的存储等级来实现这一点。
    • 缓存序列化后的对象会使缓存过程变慢,因为序列化对象也会消耗一些代价,不过这可以显著减少JVM的垃圾回收时间,因为很多独立的记录现在可以作为单个序列化的缓存而存储。
    • 垃圾回收的代价与堆里的对象数目相关,而不是和数据的字节数相关。这种缓存方式会把大量对象序列化为一个巨大的缓存区对象。
    • 如果你需要以对象的形式缓存大量数据(比如数GB的数据),或者是注意到了长时间的垃圾回收暂停,可以考虑配置这个选项。

5.硬件供给

  • 切记,"越多越好"的原则在设置执行器节点内存时并不一定适用。使用巨大的对空间可能会导致垃圾回收的长时间暂停,从而严重影响Spark作业的吞吐量。
  • Mesos和YARN本身就已经支持在同一个物理主机上运行多个较小的执行器实例,所以使用较小内存的执行器实例不代表应用所使用的总资源一定会减少。


文章转载自程序猿小P,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论