暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

MapReduce-原理剖析之shuffle(下)

码农大腿哥 2021-03-11
229
前言:

分区的逻辑是什么?环形缓冲区的结构又是什么样?combiner隐藏着那些玄机?

这一切的背后,是人性的扭曲还是道德的沦丧,是玄学的操纵还是科技的支配

请收看大腿大数据之《走进shuffle-下》

本篇目标:

掌握分区、环形缓冲区、combiner的运行原理



01

分区

(Key , Value)格式数据进入环形缓冲区,会先进行一个分区的操作,得到每个数据的分区值,然后以(Key , Value , Partitions)形式返回

同一个分区的数据最终会发到同一个ReduceTask中进行聚合。所以分区的作用就是将数据分类划分,然后交给不同的ReduceTask去执行

1.默认分区

默认分区的逻辑是:用key的hashCode对numReduceTasks取余,这样每个key值就分到一个区号
 
numReduceTasks是设置的ReduceTasks的数量,默认为1,也可以手动设置

分区代码如下:

    public class HashPartitioner<K,Vextends Partitioner<K,V>{
      public int getPartition(K key , V value , int numReduceTasks ){  
        return  (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks ;
      }
    }
    复制

    2.自定义分区

    采用默认的分区模式,无法控制哪些key会被分到同一个分区,所以我们可以自定义分区逻辑

    不过自定义分区开发中很少用到,这里就不深入讲了



    02

    环形缓冲区

    1.环形缓冲区是什么

    每个MapTask都会在内存开启一个环形缓冲区,用于存储map任务的输出。环形缓冲区是一种数据结构,本质是个字节数组,又叫做KvBuffer

    环形缓冲区默认大小100MB,一旦达到阈值0.8,也就是80MB,一个后台线程就把数据溢写到磁盘

    环形缓冲区的大小可以通过io.sort.mb属性进行设置

    2.运行机制

    1)数据写入

    数据写入环形缓冲区时,不仅写入数据,还会写入元(索引)数据,放置元数据的区域被称为Kvmeta

    数据区域和元数据区域在KvBuffer中是相邻但不重叠的两个区域,通过一个分界点来划分。分界点是会变的,每次溢写之后都会更新一次


    数据包括两部分内容:

    key: 数据的偏移量
    value:数据内容

    元数据包括四部分内容:

    index:序号
    partition:数据的分区值
    keyStart:内存中key的起始位置
    valueStart:内存中value起始位置


    每读取一条数据,就会在数组里插入一条数据,另一方向写一条元数据。然后一直不停向下写

    2)反向写入


    数据一直往环形缓冲区里写,迟早会写满

    所以,当环形缓冲区写满80%时,将剩余的20%的中间设置为新的分界点。数据从该点继续往回写


    同时,环形缓冲区开始往磁盘溢写数据

    3)溢写

    环形缓冲区大小虽然可以通过参数设置,但数据和元数据一直在写,总会有写满的时候。这时就要把数据从内存刷到磁盘上再接着往内存写数据

    写磁盘时先创建一个名称类似"spill3.out"的临时文件,把数据写到这个文件中

    然后再将溢写数据的元信息写到内存数据结构SpillRecord中,元信息记录了临时文件的:partition的起始位置、原始数据大小和压缩后数据大小。如果元信息超过1MB,则写入"spill3.out.index"的磁盘文件中

    4)排序

    溢写之前要对数据进行排序,先根据环形缓冲区中数据的partition编号排序,然后按照key进行排序两个关键字按字典序排序

    排序移动的只是元数据,只要修改元数据记录的key起始位置和value起始位置即可实现排序

    排序结果是数据按照partition为单位聚集在一起,同一partition内的按照key有序



    03

    combiner

    combiner又称为预聚合,它是可选的,也可以不使用

    1.combiner作用

    像WordCount这样的需求,数据在MapTask端排好顺序,然后在ReduceTask端统一进行聚合,如下图:



    在这种情况下 "(a , 1)" 要多次传输到ReduceTask进行聚合。为了提高效率,可以提前在MapTask里做一次聚合,再进行传输


    如上图,只要传输2次 "(a , 3)" ,就可以最终实现同样的结果

    Combiner的作用就是对每个MapTask的数据进行局部汇总,减少网络传输,提高程序效率

    2.combiner作用时机

    combiner在MapTask中有两次执行的时机:

    第一次是环形缓冲区数据溢写前。数据分区排序后要溢写到磁盘,此时会进行一次combiner

    第二次是环形缓冲区溢写文件合并时。同一个分区的多个溢写文件需要进行合并,如果溢写文件数量>=3个,则合并时会再次运行Combiner(文件少于3个不会触发combiner,因为数据量少,启动Combiner不划算)

    combiner在ReduceTask中merge数据时也会执行,前提是在程序中开启了combiner

    3.combiner使用限制

    并非所有情况都可以用Combiner,如求平均值的时候,使用Combiner机制可能会导致数据错误


    如上图,如果在求平均值时提前使用combiner就会导致结果错误



    至此,我们已经把shuffle机制梳理清楚了。shuffle虽然复杂,但是仔细体会这其中的原理与机制后,不由得让人感叹:

    这迷人的shuffle,真TM精辟

    你学废了吗?

    — END  —


    文章转载自码农大腿哥,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

    评论