
[Big Data Analytics] Flink Global Parameters in Practice (Part 17)

数据信息化 2020-06-30




01

Global Parameters and a Worked Example

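    Flink lets you pass custom configuration values to the ExecutionConfig of the environment. Because the ExecutionConfig is reachable from every rich user function, values stored there are available globally across the job.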


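Note:

    GlobalConfig differs from params in scope. With params, the Configuration is set only for a local piece of code and does not affect env, so withParameters(config) has to be appended to the operator that needs it. GlobalConfig, by contrast, changes the configuration held by env itself, so there is no need to attach the config explicitly to operators later in the code.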
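The scoping difference described in the note can be modeled without Flink. The following is a minimal plain-Java sketch, not Flink code: `Env`, `Operator`, `localLimit` and `globalLimit` are hypothetical stand-ins invented for illustration, and only the method name `withParameters` mirrors the real API.

```java
import java.util.HashMap;
import java.util.Map;

public class ParameterScopeSketch {

    /** Hypothetical stand-in for the execution environment and its job-wide config. */
    public static class Env {
        public final Map<String, Integer> globalParams = new HashMap<>();

        /** Every operator created from this environment shares the same global map. */
        public Operator newOperator() {
            return new Operator(globalParams);
        }
    }

    /** Hypothetical stand-in for a single operator such as a filter. */
    public static class Operator {
        private final Map<String, Integer> globalParams;
        private Map<String, Integer> localParams = new HashMap<>();

        Operator(Map<String, Integer> globalParams) {
            this.globalParams = globalParams;
        }

        /** Models withParameters(config): only this operator receives the config. */
        public Operator withParameters(Map<String, Integer> config) {
            this.localParams = config;
            return this;
        }

        /** Models reading "limit" from the local config, with 0 as the default. */
        public int localLimit() {
            return localParams.getOrDefault("limit", 0);
        }

        /** Models reading "limit" from the global job parameters, with 0 as the default. */
        public int globalLimit() {
            return globalParams.getOrDefault("limit", 0);
        }
    }

    public static void main(String[] args) {
        Env env = new Env();
        env.globalParams.put("limit", 10000);   // set once on the environment

        Map<String, Integer> config = new HashMap<>();
        config.put("limit", 5000);              // local config, must be attached explicitly

        Operator attached = env.newOperator().withParameters(config);
        Operator plain = env.newOperator();     // no withParameters call

        System.out.println("attached, local:  " + attached.localLimit());  // 5000
        System.out.println("plain, local:     " + plain.localLimit());     // 0 (default)
        System.out.println("attached, global: " + attached.globalLimit()); // 10000
        System.out.println("plain, global:    " + plain.globalLimit());    // 10000
    }
}
```

In Flink's DataSet API, the local case corresponds to calling withParameters(config) on an operator and reading the value from the Configuration passed to open(). The global case corresponds to env.getConfig().setGlobalJobParameters(...), which is the approach used in this article.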

The first example stores an integer limit in the global job parameters and reads it back inside an anonymous RichFilterFunction:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;

public class GloballyParameters {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        Configuration configuration = new Configuration();
        configuration.setInteger("limit", 10000);

        // Register the configuration as global job parameters
        env.getConfig().setGlobalJobParameters(configuration);

        DataSet<empPOJO> dataSource = env.readCsvFile("file:///Users/zhangjingyu/Desktop/flink/src/main/resources/employee.csv")
                .pojoType(empPOJO.class, "dept_id", "emp_name", "emp_salary");

        DataSet<empPOJO> dataresult = dataSource.filter(new RichFilterFunction<empPOJO>() {
            int limit = 0;

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // Read the global job parameters from the runtime context
                ExecutionConfig.GlobalJobParameters globalJobParameters = getRuntimeContext()
                        .getExecutionConfig().getGlobalJobParameters();
                Configuration globalConfig = (Configuration) globalJobParameters;
                // Fall back to 0 if "limit" is not set
                limit = globalConfig.getInteger("limit", 0);
            }

            @Override
            public boolean filter(empPOJO empPOJO) throws Exception {
                // Keep only employees whose salary exceeds the limit
                return empPOJO.emp_salary > limit;
            }
        });

        DataSet<Tuple3<String, String, Integer>> result = dataresult.map(new MapFunction<empPOJO, Tuple3<String, String, Integer>>() {
            @Override
            public Tuple3<String, String, Integer> map(empPOJO empPOJO) throws Exception {
                return new Tuple3<>(empPOJO.dept_id, empPOJO.emp_name, empPOJO.emp_salary);
            }
        });
        result.print();
    }

    public static class empPOJO {
        public String dept_id;
        public String emp_name;
        public int emp_salary;

        public empPOJO(String dept_id, String emp_name, int emp_salary) {
            this.dept_id = dept_id;
            this.emp_name = emp_name;
            this.emp_salary = emp_salary;
        }

        // Flink requires a public no-argument constructor for POJO types
        public empPOJO() {
        }

        @Override
        public String toString() {
            return "empPOJO{" +
                    "dept_id='" + dept_id + '\'' +
                    ", emp_name='" + emp_name + '\'' +
                    ", emp_salary=" + emp_salary +
                    '}';
        }
    }
}

The second example passes a string parameter instead and moves the filter into a named class, MyFilter:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;

public class GloballyParameters1 {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        Configuration configuration = new Configuration();
        configuration.setString("limit", "leo");

        // Register the configuration as global job parameters
        env.getConfig().setGlobalJobParameters(configuration);

        DataSet<empPOJO> dataSource = env.readCsvFile("file:///Users/zhangjingyu/Desktop/flink/src/main/resources/employee.csv")
                .pojoType(empPOJO.class, "dept_id", "emp_name", "emp_salary");

        DataSet<empPOJO> dataResult = dataSource.filter(new MyFilter());

        DataSet<Tuple3<String, String, Integer>> result = dataResult.map(new MapFunction<empPOJO, Tuple3<String, String, Integer>>() {
            @Override
            public Tuple3<String, String, Integer> map(empPOJO empPOJO) throws Exception {
                return new Tuple3<>(empPOJO.dept_id, empPOJO.emp_name, empPOJO.emp_salary);
            }
        });
        result.print();
    }

    public static class MyFilter extends RichFilterFunction<empPOJO> {
        String limit = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // Read the global job parameters from the runtime context
            ExecutionConfig.GlobalJobParameters globalJobParameters = getRuntimeContext()
                    .getExecutionConfig().getGlobalJobParameters();
            Configuration globalConfig = (Configuration) globalJobParameters;
            // Fall back to "leo" if "limit" is not set
            limit = globalConfig.getString("limit", "leo");
        }

        @Override
        public boolean filter(empPOJO empPOJO) throws Exception {
            // Drop the employee whose name equals the configured value
            return !empPOJO.emp_name.equals(limit);
        }
    }

    public static class empPOJO {
        public String dept_id;
        public String emp_name;
        public int emp_salary;

        public empPOJO(String dept_id, String emp_name, int emp_salary) {
            this.dept_id = dept_id;
            this.emp_name = emp_name;
            this.emp_salary = emp_salary;
        }

        // Flink requires a public no-argument constructor for POJO types
        public empPOJO() {
        }

        @Override
        public String toString() {
            return "empPOJO{" +
                    "dept_id='" + dept_id + '\'' +
                    ", emp_name='" + emp_name + '\'' +
                    ", emp_salary=" + emp_salary +
                    '}';
        }
    }
}

