Flink暴露了所有udf函数的接口,实现方式为接口或者抽象类。
实现MapFunction接口示例:
实现温度传感器实例转换成(传感器Id-温度)字符串描述。
自定义MapFunction类
public class CustomMapFunction implements MapFunction<SensorReading,String> {@Overridepublic String map(SensorReading sensorReading) throws Exception {return sensorReading.sensorId+"-"+sensorReading.temperature;}}
转换实现
// 创建流处理的执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 从文件中读取数据String inputPath = "F:\\Projects\\BigData\\Flink\\FlinkTutorial\\src\\main\\resources\\sensor.txt";// 获取数据DataStreamSource<String> dataStream = env.readTextFile(inputPath);// 1、先转换成SensorReading类型(简单转换操作)DataStream<SensorReading> stream1 = dataStream.map(new MapFunction<String, SensorReading>() {@Overridepublic SensorReading map(String data) throws Exception {String[] arr = data.split(",");return new SensorReading(arr[0], arr[1], Double.valueOf(arr[2].toString()));}});// 调用自定义CustomMapFunction类的,转换输出SingleOutputStreamOperator<String> dataStreamMap = stream1.map(new CustomMapFunction());dataStreamMap.print("CustomFilterFunction");env.execute("Function test");
在idea 执行main 运行效果

实现FilterFunction接口示例:
实现过滤温度大于30度的温度传感器。
自定义FilterFunction类
public class CustomFilterFunction implements FilterFunction<SensorReading> {@Overridepublic boolean filter(SensorReading sensorReading) throws Exception {return sensorReading.temperature>30.0;}}
过滤实现
// 创建流处理的执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 从文件中读取数据String inputPath = "F:\\Projects\\BigData\\Flink\\FlinkTutorial\\src\\main\\resources\\sensor.txt";// 获取数据DataStreamSource<String> dataStream = env.readTextFile(inputPath);// 1、先转换成SensorReading类型(简单转换操作)DataStream<SensorReading> stream1 = dataStream.map(new MapFunction<String, SensorReading>() {@Overridepublic SensorReading map(String data) throws Exception {String[] arr = data.split(",");return new SensorReading(arr[0], arr[1], Double.valueOf(arr[2].toString()));}});// 调用自定义CustomFilterFunction类的,实现过滤DataStream<SensorReading> dataStreamFilter = stream1.filter(new CustomFilterFunction());dataStreamFilter .print("CustomFilterFunction");env.execute("Function test");
执行main 运行效果

文章转载自大数据技术天涯,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




