暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Spark2.x进阶:深入理解Spark中的闭包

大数据开发运维架构 2020-02-18
149

    在Spark的代码里,变量及函数的作用范围和声明周期在spark的集群运行模式下是比较难理解的,尤其是对初学者来说。这里的闭包问题跟在RDD的算子中操作作用域外部的变量有关。

    Spark中的闭包变量一般指,在算子作用域的外部声明,却在算子作用域内存操作和执行的变量。

    下面通过一个代码实例来帮助你更好的理解闭包问题,假如在Spark中想求一下5(1,2,3,4,5)个数的和sum(初始值为0),这里先贴下代码:

    package com.hadoop.ljs.spark220.study.closePackage;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.VoidFunction;
    import java.util.Arrays;
    import java.util.List;
    /**
    * @author: Created By lujisen
    * @company ChinaUnicom Software JiNan
    * @date: 2020-02-18 20:08
    * @version: v1.0
    * @description: com.hadoop.ljs.spark220.study.closePackage
    */
    public class SparkClosePackage {
        public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf().setAppName("SparkClosePackage").setMaster("local[*]");
    JavaSparkContext sc = new JavaSparkContext(sparkConf);
            List<Integer> numList2 = Arrays.asList(12345);
    final int[] sum = {0};
            JavaRDD<Integer> soureData =  sc.parallelize(numList2);
    soureData.foreach(new VoidFunction<Integer>() {
    @Override
    public void call(Integer value) throws Exception {
    sum[0] +=value;
    }
    });
    System.out.println("求和结果"+sum[0]);
    sc.close();
    }
    }
    复制

    程序的输出结果:

        结果是不是跟你想象的是不太一样,sum不是15  而是0。为什么呢?

    这里就涉及到了RDD的作用域问题,对于RDD的各个算子来说,作用域只是算子的内存代码,上面的代码却操作了作用域外的变量sum,据不同的编程语言的语法,这种功能是可以做到的,而这种现象就叫做闭包,闭包简单来说,就是操作的不属于一个作用域范围的变量。

        生产上一般我们都是提交Spark的任务到集群上执行,无论是standalone/yarn-client本地模式还是standalone/yarn-cluster集群模式,任务都是转化成task分批次发送到Worker节点的Executor中运行的,每一个批次的Task执行相同的代码,处理不同的数据,闭包变量在task执行之前,肯定是需要在driver端处理,然后被序列化成多个副本,每个副本都发送到各个executor进程中,以便后期task使用。

     

        这里干涩的讲不太容易听明白,这里我从结合一个图再详细说一下:

        

        

        这里你输入了数据(1,2,3,4,5),这里有变量sum=0,想通过foreach算子,求和保存到sum中,我们将工程打包,提交到集群运行,这里肯定生产一个driver进行运行咱们的main函数,序列化sum变量,拷贝多个序列化后的副本到两个Executor中,当运行到foreach这个算子的时候,分批次发送task到已分配的Executor中执行,每个都保存了一个sum副本,这里算完以后,每个Executor会计算出自己的结果:一个是6,一个是9;最后你在driver端去打印这个sum的时候,Executor对sum的操作,driver是完全感知不到的。

        因此综上所述,在你使用集群模式运行作业的时候,切忌不要在算子内部,对作用域外面的闭包变量进行改变其值的操作,因为那没有任何意义,算子仅仅会在executor进程中,改变变量副本的值,对于driver端的变量没有任何影响,我们也获取不到executor端的变量副本的值。

        如果希望在集群模式下,对某个driver端的变量,进行分布式并行的、全局性的修改,可以使用Spark提供的全局累加器(Accumulator),后面我们会讲解一个Accumulator的高级用法,自定义Accumulator,实现任意机制和算法的全局计算器。

    文章转载自大数据开发运维架构,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

    评论