暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Spark Core-shuffle

码农大腿哥 2021-03-15
1238
前言:

shuffle作为分布式计算引擎的难点,始终是绕不过去的坎

Spark shuffle和Map Reduce shuffle有着异曲同工之妙,它们作用一致,只是具体的实现细节上略有不同

本篇目标:

1.掌握shuffle的执行过程
2.理解一种shuffle优化机制-bypass



01

Spark shuffle介绍

我们知道Spark是基于RDD进行计算的,RDD里有分区。一般情况下,前后两个RDD的分区数据是一对一进行变换,也就是发生窄依赖

如果前一个RDD的分区数据被下游RDD多个分区共享,发生宽依赖,这个过程就叫做shuffle


1.ShuffleManager


在Spark中负责shuffle过程的执行、计算和处理的主要组件是ShuffleManager,即shuffle管理器

Spark曾经有两种shuffle机制,也就是有两个ShuffleManager的实现类:

  • hashShuffleManager

  • sortShuffleManager


主要区别是前者在shuffle的过程中会生成大量溢写文件,小文件过多会影响性能

sortShuffleManager溢写后会将所有溢写文件合并成一个磁盘文件,这样下游的Task读取文件时,只要读取一个文件就可以了

由于hashShuffleManager已经被淘汰了,就不详细说了。本篇主要介绍sortShuffleManager模式的shuffle执行机制

2.sortShuffleManager

sortShuffleManager中有个方法getWriter(),该方法中会通过匹配创建不同的Writer写对象

  • UnsafeShuffleWriter

  • BypassMergeSortShuffleWriter

  • SortShuffleWriter


在不同的情况下,会创建不同的Writer

1)UnsafeShuffleWriter

使用有预聚合功能的算子 
序列化规则支持重定位操作(KRYO支持);
shuffle下游的分区数量小于或等于16777216

2)BypassMergeSortShuffleWriter

不能使用有预聚合功能的算子;
shuffle下游的分区数量小于等于200(可配);

3)SortShuffleWriter

其它情况下创建SortShuffleWriter

本篇主要介绍使用SortShuffleWriter写数据的机制




02

shuffle过程

Spark的过程可以分成两个部分,shuffle write和shuffle read。也就是前一个Shuffle Map Task写数据和后一个Result Task读数据的过程


1.shuffle Write

1)数据写入内存

数据先读取到内存中,根据shuffle算子种类的不同,采用不同的数据结构

聚合类算子:如reduceByKey,这种情况会采用Map数据结构,一边聚合一边写内存

普通算子:如join,这种情况会采用ArrayList结构,数据直接写入内存

2)排序-溢写磁盘

发生shuffle后,后一个分区要等前一个分区数据处理完毕后才能执行。那么前一个RDD的结果就要保存下来,如果数据都放在内存,内存肯定不够使

所以内存中的数据要落盘,并且shuffle的数据一定会落盘Spark写入磁盘文件是存放在本地磁盘的,而不是HDFS

在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件,默认的batch数量是10000条

每写入一条数据就会判断是否达到临界值,达到了就将内存中的数据溢到磁盘,最后清空内存数据


3)排序-合并磁盘文件

溢写到磁盘的临时文件会有多个,溢写完成后,每个Task会将所有磁盘文件合并在一起(如果内存够,就不会产生临时磁盘文件存中的数据进行合并

同时,生成一个index文件,记录了每个分区的索引、start offset和end offset

合并数据时对数据进行排序采用的是归并排序算法

最终的结果是每个Executor产生一个磁盘文件,下游的Task根据index文件读取相应的分区数据

2.shuffle read

SortShuffleWriter中有个getWriter()方法,获取到BlockStoreShuffleReader读对象执行read()方法开始读取数据

1)获取磁盘位置信息

Map task 执行完毕后会将计算状态以及磁盘小文件位置等信息封装到MapStatus对象中,然后发送给Driver进程

Result Task通过请求 Driver 端的 MapOutputTrackerMaster 询问输出的数据位置


所有的Map task执行完毕后,Result Task就掌握了所有的磁盘小文件的位置信息

2)拉取数据

当所有ShuffleMapTasks结束后开始拉取数据,Result Task默认启动5个子线程去拉取数据

刚获取来的FileSegment存放在softBuffer内存缓冲区,由于缓冲区默认48MB,所以一次拉取的数据量不能太大

如果内存空间不足时,将 records 进行 sort 后 spill(溢出)到磁盘上,等到需要它们的时候再进行归并

3)aggregate


数据可以一边拉取一边进行聚合处理,因为Spark shuffle不需要全局有序

Spark采用的是Map数据结构,每次拉取出来的数据就直接放入HashMap里。如果已经存在响应的Key,那么就可以直接进行aggregate

等到所有的records都进入Map结构,就得到最后的结果



03

BypassSortShuffle

bypass是Spark的一种优化机制,其优化的主要原理是:不会进行排序

bypass运行机制的触发条件如下:

  • shuffle Result Task的数量小于默认值200

  • 不是聚合类的shuffle算子


在bypass机制下,数据从内存溢写出来后也是要写入临时磁盘文件

但是数据是按个分区创建一个文件,然后将所有分区文件合并,创建一个单独的索引文件

在上面的shuffle write过程中,没有进行排序操作,也就节省了这部分的性能开销



你学废了吗?

— END  —


文章转载自码农大腿哥,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论