
点击蓝字 关注我们

Flink全局参数案例实战(十七)

01
全局参数及案例实战
Flink允许将自定义配置值传递到ExecutionConfig,以供自定义配置全局可用。
注意:
GlobalConfig与params的区别在于,后者只是在局部代码中设置的Config并没有影响到env,所以在代码的后面要加上withParameters (config);但是GlobalConfig不同,它改变了env的环境里面的config。所以,在代码后面,不需要显式的指定加上config。
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import scala.Tuple3;
public class GloballyParameters {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Configuration configuration = new Configuration();
configuration.setInteger("limit",10000);
//设置全局变量
env.getConfig().setGlobalJobParameters(configuration);
DataSet<empPOJO> dataSource = env.readCsvFile("file:///Users/zhangjingyu/Desktopflink/src/main/resources/employee.csv")
.pojoType(empPOJO.class,"dept_id","emp_name","emp_salary");
DataSet<empPOJO> dataresult = dataSource.filter(new RichFilterFunction<empPOJO>() {
int limit = 0;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
//引入全局参数
ExecutionConfig.GlobalJobParameters globalJobParameters = getRuntimeContext()
.getExecutionConfig().getGlobalJobParameters();
Configuration globalConfig = (Configuration) globalJobParameters;
limit = globalConfig.getInteger("limit",0);
}
@Override
public boolean filter(empPOJO empPOJO) throws Exception {
return empPOJO.emp_salary > limit;
}
});
DataSet<Tuple3<String,String,Integer>> result = dataresult.map(new MapFunction<empPOJO, Tuple3<String, String, Integer>>() {
@Override
public Tuple3<String, String, Integer> map(empPOJO empPOJO) throws Exception {
return new Tuple3<>(empPOJO.dept_id,empPOJO.emp_name,empPOJO.emp_salary);
}
});
result.print();
}
public static class empPOJO {
public String dept_id;
public String emp_name;
public int emp_salary;
public empPOJO(String dept_id, String emp_name, int emp_salary) {
this.dept_id = dept_id;
this.emp_name = emp_name;
this.emp_salary = emp_salary;
}
public empPOJO() {
}
@Override
public String toString() {
return "empPOJO{" +
"dept_id='" + dept_id + '\'' +
", emp_name='" + emp_name + '\'' +
", emp_salary=" + emp_salary +
'}';
}
}
}
复制
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
public class GloballyParameters1 {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Configuration configuration = new Configuration();
configuration.setString("limit","leo");
//使用全局变量
env.getConfig().setGlobalJobParameters(configuration);
DataSet<empPOJO> dataSource = env.readCsvFile("file:///Users/zhangjingyu/Desktop/flink/src/main/resources/employee.csv")
.pojoType(empPOJO.class,"dept_id","emp_name","emp_salary");
DataSet<empPOJO> datasult = dataSource.filter(new MyFilter());
DataSet<Tuple3<String,String,Integer>> result = datasult.map(new MapFunction<empPOJO, Tuple3<String, String, Integer>>() {
@Override
public Tuple3<String, String, Integer> map(empPOJO empPOJO) throws Exception {
return new Tuple3<>(empPOJO.dept_id,empPOJO.emp_name,empPOJO.emp_salary);
}
});
result.print();
}
public static class MyFilter extends RichFilterFunction<empPOJO>{
String limit = null;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
//引入全局变量
ExecutionConfig.GlobalJobParameters globalJobParameters = getRuntimeContext()
.getExecutionConfig().getGlobalJobParameters();
Configuration globalConfig = (Configuration) globalJobParameters;
//设施初始值
limit = globalConfig.getString("limit","leo");
}
@Override
public boolean filter(empPOJO empPOJO) throws Exception {
return !empPOJO.emp_name.equals(limit);
}
}
public static class empPOJO {
public String dept_id;
public String emp_name;
public int emp_salary;
public empPOJO(String dept_id, String emp_name, int emp_salary) {
this.dept_id = dept_id;
this.emp_name = emp_name;
this.emp_salary = emp_salary;
}
public empPOJO() {
}
@Override
public String toString() {
return "empPOJO{" +
"dept_id='" + dept_id + '\'' +
", emp_name='" + emp_name + '\'' +
", emp_salary=" + emp_salary +
'}';
}
}
}
复制


扫描二维码
关注我们
微信号 : BIGDT_IN
文章转载自数据信息化,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。
评论
相关阅读
金仓数据库26套!宁波市司法局信息系统适配改造(一期)采购项目
天下观查
321次阅读
2025-03-21 10:33:59
达梦数据与法本信息签署战略合作协议
达梦数据
294次阅读
2025-03-06 09:26:57
国产化+性能王炸!这套国产方案让 3.5T 数据 5 小时“无感搬家”
YMatrix
279次阅读
2025-03-13 09:51:26
大连农商40万,采购Greenplum数据库原厂订阅服务
天下观查
278次阅读
2025-03-13 09:52:29
从湖仓分离到湖仓一体,四川航空基于 SelectDB 的多源数据联邦分析实践
SelectDB
185次阅读
2025-03-03 11:23:24
国产数据库高光时刻!天翼云TeleDB荣登TPC-DS全球测评总榜第二
天翼云开发者社区
184次阅读
2025-03-13 17:24:48
神州数码携手云原生数据库 PolarDB,共筑国产数据库新生态
神州数码集团
173次阅读
2025-03-03 18:04:27
DBAIOPS社区将在知衍平台上推出数据库运维智能体
白鳝的洞穴
172次阅读
2025-03-07 10:29:18
为什么总是很难客观评价某个国产数据库产品
白鳝的洞穴
157次阅读
2025-03-19 11:21:09
史诗级革新 | Apache Flink 2.0 正式发布
严少安
153次阅读
2025-03-25 00:55:05