暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Spark #2 RDD编程(下)

别动我的月亮啊 2021-04-22
128


  • 键值对操作

  • Transformations操作

  • Actions操作

  • Shuffle操作(洗牌)

    • 性能影响

  • RDD持久化

    • 如何选择存储级别

    • 溢出数据

  • Shared Variables共享变量

    • Broadcast Variables广播变量

    • Accumulators累加器


RDD操作

键值对操作

虽然大多数的Spark操作可以在包含任何类型的RDD上运行,但有一些特殊操作仅在键值对的RDD上可用。最常见的是shuffle
操作,例如根据键对元素进行分组和聚合。

在Scala中,这些操作可以顺其自然的在包含Tuple2
对象的RDD上操作

这个Tuple2是只包含两个元素的Tuple对象。可以简单的理解成C++中的pair。

val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)

复制

Transformations操作

Actions操作

上面两个操作前文多次提及这里不再赘述

值得一提的是,Spark RDD API还公开了一些操作的异步版本。它们会立刻返回给调用者而不是在操作完成之后阻塞

Shuffle操作(洗牌)

Spark某些操作会触发一个叫做shuffle(洗牌)的事件。shuffle是Spark中在分区之间进行分组的一种机制

由于这些操作常常涉及到执行器和机器之间的数据拷贝,shuffle操作是复杂且费时的

为了理解shuffle发生了些什么,我们以reduceByKey
为例子进行考虑。reduceByKey
操作会产生一个新的RDD——由同一种key而被合并到同一个元组的value集合。并非单个键的所有值都在同一分区,同一机器,但必须将它们放在同一位置以计算结果

在Spark中,为了一些特定的操作,数据通常不会跨分区分布,而是在一些特定的位置。在计算时,一个任务只会操作一个分区。因此,要组织单个reduceByKey
任务要执行的所有数据,Spark需要进行一个all to all
操作。需要读取所有key的所有value,然后将所有分区的值集合起来计算每个键的最终结果,这个操作就是shuffle

尽管shuffled产生的各个分区中的元素集合是确定的,然而分区之间、分区内的元素之间的顺序并不是确定的。如果希望能够得到确定的顺序,可以像下面这样用

  • mapPartitions
    来进行对分区进行排序
  • repartitionAndSortWithinPartitions
    在重新分区的同时有效地进行分区内部地排序
  • sortBy
    使RDD全局有序

能够导致shuffle操作的有repartition
coalesce
形成的repartition
groupByKey
reduceByKey
形成的ByKey
操作,以及cogroup
join
这样的join
操作

性能影响

由于涉及到磁盘IO和网络IO,Shuffle的开销是非常大的。为了处理Shuffle中的数据,Spark生成一组任务——map任务来处理数据,并且又采用一组reduce任务来统计它们。这个术语来自MapReduce
,和Spark自己的map
reduce
操作没什么关系

在内部,若非内存空间不足,每个map中的结果会一直保存到内存中。然后将对分区里面的数据进行排序并写入文件当中。在reduce
阶段,将会读取这些已经排序好的块。因为shuffle阶段是在内存中对数据进行记录,因此其操作会占用大量的内存空间。特别的,reduceBByKey
aggregateByKey
是在map阶段创建内存中的数据结构,ByKey
会在reduce阶段创建数据结构。当内存中无法存放数据时,Spark将会将数据溢出到磁盘中。故会产生一些额外的磁盘IO以及垃圾收集的开销。

Shuffle也会在磁盘上产生大量的中间文件。在Spark1.3中,这些文件会一直存在直到对应的RDD不再使用而被垃圾收集。如果程序一直保留着RDDs的引用或者GC频率很低,可能会让垃圾收集很久之后才会发生。这也就意味着,一个Spark程序运行一段时间之后,会占用大量的磁盘空间。

临时存储目录在配置Spark Context时由spark.local.dir
参数指定。

Shuffle行为也可以通过配置参数来进行调整。

RDD持久化

相比于Hadoop,Spark可以让操作之间的数据集持久化存储。当持久化了一个RDD时,每个节点将会存储所有的分区的计算结果,并在其他对这个数据集的actions操作时重用这些数据。缓存是用于迭代算法和快速交互的关键工具

用户可以使用persist()
或者cache()
方法标记一个RDD
,让其在内存中缓存。当其第一次在action操作中计算时,会被保留在内存中。Spark’s cache是容错的——如果RDD中的一个分区丢失了,就会自动从创建它的transformation
操作中重新计算

除此之外,被缓存的RDD有不同的存储级别。允许用户控制存储,例如存储在磁盘、存储在内存、复制到其他节点上。这些级别可以通过传递StorageLevel
对象给persist()
方法来控制。cache
方法只有默认级别(StorageLevel.MEMORY_ONLY
, 在内存中存储序列化对象)的存储级别。

完整的存储级别如下:

存储级别描述
MEMORY_ONLY
将RDD以反序列化的Java对象存入JVM中,如果内存中存储空间不够,一些分区将无法存储只能在它们被调用的时候重新计算,这是默认的存储级别
MEMORY_AND_DISK
将RDD以反序列化的Java对象存入JVM中,如果内存不足,将其存入磁盘,并在需要时从磁盘调用
MEMORY_ONLY_SER
将RDD以序列化的Java对象的方式存储(每个分区存储一个字节数组)。相比于反序列的方式,这往往能够有效地利用空间,不过在读取时,CPU负担更重
MEMORY_AND_DISK_SER
和上面类似。不过是将内存存储不下的分区溢出到磁盘,而不是每次需要时重新计算
DISK_ONLY
只在磁盘内存储RDD
MEMORY_ONLY_2

MEMORY_AND_DISK_2

etc
和上面的类似,不过会在两个集群节点中复制所有的分区
OFF_HEAD(experimental)
MEMORY_ONLY_SER
类似,但是将数据存储在堆外存储器中。这需要开启堆外内存

注意:在Python中,存储对象始终使用Pickle
库进行序列化,因此它不在意是否选择序列化的方式。

Spark默认缓存Shuffle操作的中间结果(即便用户没有调用persist方法)。这是为了防止在Shuffle阶段节点故障,而导致重新计算输入。

如何选择存储级别

Spark存储级别的不同意味着内存空间和CPU性能之间的权衡。我们一般基于下面的步骤考虑来选择使用哪一个存储级别:

  • 如果由足够的空间存储RDD,就使用MEMORY_ONLY
    的存储级别。这是最有效利用CPU的选择
  • 如果没有足够的空间,尝试用MEMORY_ONLY_SER
    的隔离级别,并选择一个速度够快的序列化库,对于Java或者Scala来说是一种非常合适的途径
  • 除非计算数据集的代价非常高或者数据太多,否则不要将数据溢出到磁盘。重新计算可能比从磁盘中读取的速度更快
  • 如果希望集群容错性更强可以选择将数据复制到多个节点。用户可以从其他节点中得到数据,而不是重新计算

溢出数据

Spark监控缓存,并使用LRU策略将久的数据分区移出。如果希望手动移出RDD,可以使用RDD.unpersist
方法。注意这个方法默认不阻塞。如果要阻塞直到资源被释放,在调用方法时设置blocking=true

Shared Variables共享变量

通常来说,当一个方法传递给Spark集群的远程节点中执行时(类似map或者reduce方法)。它会在这个函数使用的所有变量的副本上面执行。这些变量复制到每一台机器中,并且远程工作的机器不会将更改过的变量带给驱动程序。跨任务支持通用的读写共享变量将是低效的。不过Spark提供了两种有效的共享变量:broadcast Variables
accumulators

Broadcast Variables广播变量

广播变量允许程序在每一台机器中缓存一个只读的共享变量。使用其可以为每个节点提供一个大型输入数据集的副本。Spark还尝试使用高效广播算法分发广播变量,以降低通信成本。

Spark Action在一组由Shuffle操作分割出的阶段执行。Spark自动每个阶段中间,广播任务需要的常见的数据。这些数据以序列化缓存在内存中,并在任务执行前反序列化。这也意味着这些广播变量只有在各个阶段任务需要相同的变量或者反向形式中缓存数据时有用。

广播变量通过调用SparkContext.broadcast
方法创建。通过调用广播变量的value
方法可以获取值

scala> val broadcastVar = sc.broadcast(Array(123))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(123)

复制

为了保证每个节点获取的广播变量是一致的,广播对象一旦创建不能修改。如果想要释放变量占用的资源——调用unpersist
方法。如果广播之后被再次使用,它将重新广播。如果想要永久的删除变量,调用destroy
方法。这些方法默认都是不会阻塞的,可以在调用方法时传入blocking=true
,来阻塞资源。

Accumulators累加器

累加器时只能进行加法的一种变量。可以用作计数器或者是求和。

用户可以通过SparkContext.longAccumulator
或者SparkContext.doubleAccumulator
创建一个Long或者Double类型的累加器。任务可以对这个累加器进行加法操作。然而,它们是不能读取这个累加器的值的。只有驱动程序才能使用value
方法得到值。

scala> val accum = sc.longAccumulator("My Accumulator")
scala> sc.parallelize(Array(1234)).foreach(x => accum.add(x))
scala> accum.value
res2: Long = 10

复制


文章转载自别动我的月亮啊,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论