暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flink实现UDF函数之FilterFunction、MapFunction函数

大数据技术天涯 2020-12-25
632

Flink暴露了所有udf函数的接口,实现方式为接口或者抽象类。

实现MapFunction接口示例:

实现温度传感器实例转换成(传感器Id-温度)字符串描述。

自定义MapFunction类

public class CustomMapFunction implements MapFunction<SensorReading,String> {
@Override
public String map(SensorReading sensorReading) throws Exception {
return sensorReading.sensorId+"-"+sensorReading.temperature;
}
}

转换实现

 // 创建流处理的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


// 从文件中读取数据
String inputPath = "F:\\Projects\\BigData\\Flink\\FlinkTutorial\\src\\main\\resources\\sensor.txt";
// 获取数据
DataStreamSource<String> dataStream = env.readTextFile(inputPath);


// 1、先转换成SensorReading类型(简单转换操作)
DataStream<SensorReading> stream1 = dataStream.map(new MapFunction<String, SensorReading>() {
@Override
public SensorReading map(String data) throws Exception {
String[] arr = data.split(",");
return new SensorReading(arr[0], arr[1], Double.valueOf(arr[2].toString()));
}
});


// 调用自定义CustomMapFunction类的,转换输出
SingleOutputStreamOperator<String> dataStreamMap = stream1.map(new CustomMapFunction());


dataStreamMap.print("CustomFilterFunction");

env.execute("Function test");

在idea 执行main 运行效果

实现FilterFunction接口示例:

实现过滤温度大于30度的温度传感器。

自定义FilterFunction类

public class CustomFilterFunction implements FilterFunction<SensorReading> {
@Override
public boolean filter(SensorReading sensorReading) throws Exception {
return sensorReading.temperature>30.0;
}
}

过滤实现

 // 创建流处理的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


// 从文件中读取数据
String inputPath = "F:\\Projects\\BigData\\Flink\\FlinkTutorial\\src\\main\\resources\\sensor.txt";
// 获取数据
DataStreamSource<String> dataStream = env.readTextFile(inputPath);


// 1、先转换成SensorReading类型(简单转换操作)
DataStream<SensorReading> stream1 = dataStream.map(new MapFunction<String, SensorReading>() {
@Override
public SensorReading map(String data) throws Exception {
String[] arr = data.split(",");
return new SensorReading(arr[0], arr[1], Double.valueOf(arr[2].toString()));
}
});


// 调用自定义CustomFilterFunction类的,实现过滤
DataStream<SensorReading> dataStreamFilter = stream1.filter(new CustomFilterFunction());


dataStreamFilter .print("CustomFilterFunction");

env.execute("Function test");

执行main 运行效果


文章转载自大数据技术天涯,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论