暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

【大数据开发】Flink之广播变量概念及实例(十四)

数据信息化 2020-06-27
1562

点击蓝字 关注我们


Flink之广播变量概念及实例(十四)

01

Flink广播变量概念及应用场景

■ 广播变量

    广播变量是分布式计算框架中经常会用到的一种数据共享方式,目的是对小数据集采用网络传输的方式,在每个并行的计算节点的实例内存中存储一份该数据集,所在的计算节点实例均可以在本地内存中直接读取被广播的数据集,这样能够避免在数据计算过程中多次通过远程的方式从其他节点中读取小数据集,从而提升整体任务的计算性能。


注意:

  1. 广播出去的变量是存在于每个节点的内存中的,所以,这个数据量不宜太大,因为广播出去的数据会常驻内存,除非程序停止。

  2. 广播变量在初始化广播出去以后不支持修改,这样才能保证每个节点的数据都是一致的。

  3. 建议在数据集大几十兆或者几百兆的大小的时候进行广播,如果数据上G,就不建议采用广播变量。



02

Flink广播变量案例实战

■ 广播:广播变量通过

withBroadcastSet(DataSet,Strihg)方法进行注册。


■访问:可通过

getRuntimeContext().getBroadcastVariable(String)对变量进行访问。


import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;


import java.util.List;


public class BroadcastExample {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment
.getExecutionEnvironment();


//需要广播的变量
DataSet<empPOJO> broadcastDataSet = env
.fromElements(new empPOJO("100","leo",10000));


        DataSet<empPOJO> dataSource = env.readCsvFile("file:///Users/zhangjingyu/Desktop/flink/src/main/resources/employee.csv")
.pojoType(empPOJO.class,"dept_id","emp_name","emp_salary");


DataSet<empPOJO> result = dataSource.filter(new RichFilterFunction<empPOJO>() {
List<empPOJO> broadList = null;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);


//对广播变量进行访问
broadList = getRuntimeContext().getBroadcastVariable("broadcastDataSet");
}


@Override
public boolean filter(empPOJO empPOJO) throws Exception {


//循环判断广播变量值是否与源数据相同,并剔除相同值
for(empPOJO broad : broadList){
return !broad.emp_name.equals(empPOJO.emp_name);
}
return false;
}
//进行注册
}).withBroadcastSet(broadcastDataSet,"broadcastDataSet");


DataSet<Tuple3<String,String,Integer>> dataresult = result.map(new MapFunction<empPOJO, Tuple3<String, String, Integer>>() {
@Override
public Tuple3<String, String, Integer> map(empPOJO empPOJO) throws Exception {
return new Tuple3<>(empPOJO.dept_id,empPOJO.emp_name,empPOJO.emp_salary);
}
});


dataresult.print();
}


public static class empPOJO {
public String dept_id;
public String emp_name;
public int emp_salary;


public empPOJO(String dept_id, String emp_name, int emp_salary) {
this.dept_id = dept_id;
this.emp_name = emp_name;
this.emp_salary = emp_salary;
}


public empPOJO() {
}


@Override
public String toString() {
return "empPOJO{" +
"dept_id='" + dept_id + '\'' +
", emp_name='" + emp_name + '\'' +
", emp_salary=" + emp_salary +
'}';
}
}
}

扫描二维码

关注我们

微信号 : BIGDT_IN 


文章转载自数据信息化,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论