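This article introduces the keyBy, aggregation, and reduce operators among Flink's Transformations, with examples showing how each one is used.
I. The keyBy operator
keyBy groups a stream using some attribute of the event, or some field of the record, as the key. Elements with the same key are routed to the same group and handled together by the downstream operators.
Example:
Task: group the readings by sensor id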
// keyBy returns a KeyedStream, not a SingleOutputStreamOperator
KeyedStream<SensorReading, String> keyedStream =
        stream1.keyBy(new KeySelector<SensorReading, String>() {
            @Override
            public String getKey(SensorReading sensorReading) throws Exception {
                // Use the sensor id as the grouping key
                return sensorReading.sensorId;
            }
        });
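To make "elements with the same key are routed together" concrete, here is a minimal plain-Java sketch with no Flink dependency. The class `KeyPartitionSketch` and its methods are hypothetical, and the `floorMod(hashCode, parallelism)` rule is a deliberate simplification: Flink hashes keys into key groups before assigning them to subtasks, so real subtask numbers will differ. The only point is that the target depends on nothing but the key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: this is not Flink code.
class KeyPartitionSketch {

    // Simplified routing rule: the same key always yields the same subtask index.
    static int subtaskFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Groups the incoming keys by the subtask they would be routed to.
    static Map<Integer, List<String>> route(List<String> keys, int parallelism) {
        Map<Integer, List<String>> bySubtask = new LinkedHashMap<>();
        for (String key : keys) {
            bySubtask
                .computeIfAbsent(subtaskFor(key, parallelism), k -> new ArrayList<>())
                .add(key);
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        List<String> keys =
            Arrays.asList("sensor_1", "sensor_6", "sensor_1", "sensor_7", "sensor_1");
        // Every "sensor_1" ends up in the same bucket.
        route(keys, 4).forEach((subtask, ks) -> System.out.println(subtask + "> " + ks));
    }
}
```

This is also why, in the outputs further down, every sensor_1 record carries the same subtask prefix.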
II. Aggregation operators
The commonly used aggregation operators are sum, min, and max. Each of them takes one argument that names the field to aggregate on.
Sample data:
sensor_6,1608112830l,15.4
sensor_7,1608112837l,6.7
sensor_10,1608112842l,38.1
sensor_1,1608112731l,35.8
sensor_1,1608112851l,32
sensor_1,1608112731l,36.2
sensor_1,1608112837l,30.9
sensor_1,1608112842l,29.7
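The article does not show the SensorReading class, so the following is only a sketch of what it might look like. The field names sensorId, timeStamp, and temperature come from the code and output in this article; the field types, the `parse` helper, and keeping the timestamp as text (including the trailing "l") are assumptions. Field-name aggregations such as min("temperature") need Flink to recognise the type as a POJO, which is why the sketch uses public fields and a public no-argument constructor.

```java
// Hypothetical sketch of the record type used by the examples.
// In a real Flink job this class would be declared public in its own file.
class SensorReading {
    public String sensorId;
    public String timeStamp;    // assumption: kept as text, e.g. "1608112830l"
    public Double temperature;

    // A public no-argument constructor is part of Flink's POJO rules.
    public SensorReading() {}

    public SensorReading(String sensorId, String timeStamp, Double temperature) {
        this.sensorId = sensorId;
        this.timeStamp = timeStamp;
        this.temperature = temperature;
    }

    // Parses one comma-separated line such as "sensor_6,1608112830l,15.4".
    static SensorReading parse(String line) {
        String[] parts = line.split(",");
        return new SensorReading(
            parts[0].trim(), parts[1].trim(), Double.valueOf(parts[2].trim()));
    }

    @Override
    public String toString() {
        return "SensorReading{sensorId='" + sensorId + "', timeStamp=" + timeStamp
            + ", temperature=" + temperature + "}";
    }

    public static void main(String[] args) {
        System.out.println(parse("sensor_6,1608112830l,15.4"));
    }
}
```

With something like this in place, stream1 could be produced by mapping each input line through `SensorReading.parse`.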
Examples:
1. Task: use the min operator to aggregate per sensor id and output each sensor's minimum temperature
SingleOutputStreamOperator<SensorReading> aggStream = stream1.keyBy(new KeySelector<SensorReading, String>() {
    @Override
    public String getKey(SensorReading sensorReading) throws Exception {
        return sensorReading.sensorId;
    }
}).min("temperature"); // rolling minimum of the temperature field, per key
aggStream.print("aggStream");
Output:
aggStream:7> SensorReading{sensorId='sensor_7', timeStamp=1608112837l, temperature=6.7}
aggStream:6> SensorReading{sensorId='sensor_6', timeStamp=1608112830l, temperature=15.4}
aggStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112842l, temperature=29.7}
aggStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112842l, temperature=29.7}
aggStream:4> SensorReading{sensorId='sensor_10', timeStamp=1608112842l, temperature=38.1}
aggStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112842l, temperature=29.7}
aggStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112842l, temperature=29.7}
aggStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112842l, temperature=29.7}
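The min operator computes the minimum of the temperature field and stores the result in the temperature field. It makes no guarantee about the values of the other fields (for example, timeStamp).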
2. Task: use the minBy operator to aggregate per sensor id and output each sensor's minimum temperature
SingleOutputStreamOperator<SensorReading> aggStream = stream1.keyBy(new KeySelector<SensorReading, String>() {
    @Override
    public String getKey(SensorReading sensorReading) throws Exception {
        return sensorReading.sensorId;
    }
}).minBy("temperature"); // emits the whole element that holds the minimum
aggStream.print("aggStream");
Output:
aggStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112851l, temperature=29.7}
aggStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112851l, temperature=29.7}
aggStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112837l, temperature=29.7}
aggStream:4> SensorReading{sensorId='sensor_10', timeStamp=1608112842l, temperature=38.1}
aggStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112837l, temperature=29.7}
aggStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112842l, temperature=29.7}
aggStream:7> SensorReading{sensorId='sensor_7', timeStamp=1608112837l, temperature=6.7}
aggStream:6> SensorReading{sensorId='sensor_6', timeStamp=1608112830l, temperature=15.4}
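The difference between minBy and min: min computes the minimum of the temperature field only, whereas minBy returns the whole element that holds the minimum, so the other fields (for example, timeStamp) come from that element.
max and maxBy work like min and minBy but select the maximum instead; examples are omitted here.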
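The min/minBy difference is easier to see side by side. Below is a minimal plain-Java simulation for a single key, with no Flink dependency. The class `MinVsMinBySketch` and the `Reading` type are hypothetical. Treat the rule that min keeps the first record's other fields as an assumption of this sketch: the text above only says those fields are not guaranteed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java simulation of the two rolling aggregations for one key; not Flink code.
class MinVsMinBySketch {

    static class Reading {
        final String timeStamp;
        final double temperature;

        Reading(String timeStamp, double temperature) {
            this.timeStamp = timeStamp;
            this.temperature = temperature;
        }

        @Override
        public String toString() {
            return timeStamp + "/" + temperature;
        }
    }

    // min-style (assumed): keep the first record, overwrite only the aggregated field.
    static List<Reading> rollingMin(List<Reading> input) {
        List<Reading> out = new ArrayList<>();
        Reading acc = null;
        for (Reading r : input) {
            acc = (acc == null)
                ? r
                : new Reading(acc.timeStamp, Math.min(acc.temperature, r.temperature));
            out.add(acc); // one result per incoming element
        }
        return out;
    }

    // minBy-style: keep the whole record that holds the smallest value so far.
    static List<Reading> rollingMinBy(List<Reading> input) {
        List<Reading> out = new ArrayList<>();
        Reading acc = null;
        for (Reading r : input) {
            acc = (acc == null || r.temperature < acc.temperature) ? r : acc;
            out.add(acc);
        }
        return out;
    }

    public static void main(String[] args) {
        // The sensor_1 rows of the sample data, in file order.
        List<Reading> sensor1 = Arrays.asList(
            new Reading("1608112731l", 35.8),
            new Reading("1608112851l", 32.0),
            new Reading("1608112731l", 36.2),
            new Reading("1608112837l", 30.9),
            new Reading("1608112842l", 29.7));
        System.out.println("min:   " + rollingMin(sensor1));
        System.out.println("minBy: " + rollingMinBy(sensor1));
    }
}
```

In this simulation both variants end at 29.7, but only the minBy-style result carries the timestamp of the 29.7 reading.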
3. Task: use the sum operator to add up the temperature field and store the result in the temperature field
SingleOutputStreamOperator<SensorReading> aggStream = stream1.keyBy(new KeySelector<SensorReading, String>() {
    @Override
    public String getKey(SensorReading sensorReading) throws Exception {
        return sensorReading.sensorId;
    }
}).sum("temperature"); // rolling sum of the temperature field, per key
aggStream.print("aggStream");
Output:
aggStream:4> SensorReading{sensorId='sensor_10', timeStamp=1608112842l, temperature=38.1}
aggStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112731l, temperature=35.8}
aggStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112731l, temperature=67.8}
aggStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112731l, temperature=98.69999999999999}
aggStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112731l, temperature=128.39999999999998}
aggStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112731l, temperature=164.59999999999997}
aggStream:7> SensorReading{sensorId='sensor_7', timeStamp=1608112837l, temperature=6.7}
aggStream:6> SensorReading{sensorId='sensor_6', timeStamp=1608112830l, temperature=15.4}
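Values such as 98.69999999999999 in the output above are ordinary double rounding error accumulating across the running sum, not a fault in the aggregation. The following plain-Java sketch (no Flink dependency; `RollingSumSketch` and its methods are hypothetical names) adds the sensor_1 temperatures in the arrival order implied by the partial sums above, and shows one possible way to round a value for display with BigDecimal.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java simulation of a rolling sum for one key; not Flink code.
class RollingSumSketch {

    // Emits every partial sum, the way a rolling aggregation emits one result per element.
    static List<Double> rollingSum(List<Double> temperatures) {
        List<Double> out = new ArrayList<>();
        double sum = 0.0;
        for (double t : temperatures) {
            sum += t;      // plain double addition, so rounding error can build up
            out.add(sum);
        }
        return out;
    }

    // Rounds to one decimal place for display only.
    static BigDecimal rounded(double value) {
        return BigDecimal.valueOf(value).setScale(1, RoundingMode.HALF_UP);
    }

    public static void main(String[] args) {
        // Arrival order inferred from the differences between the partial sums above.
        List<Double> sensor1 = Arrays.asList(35.8, 32.0, 30.9, 29.7, 36.2);
        for (double partial : rollingSum(sensor1)) {
            System.out.println(partial + " -> " + rounded(partial));
        }
    }
}
```

If exact decimal totals matter, accumulating in BigDecimal inside a custom reduce function is one option; rounding only at print time, as here, leaves the stream values untouched.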
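III. The reduce operator
reduce operates on a keyed stream, that is, a stream grouped by one field. It takes two inputs of the same type and produces one output of that type: it merges the current element with the result of the previous aggregation to produce a new value. The returned stream contains the result of every aggregation step, not just the final result.
Example:
1. Task: aggregate per sensor id and output each sensor's minimum temperature, together with the timestamp of the reading that produced it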
DataStream<SensorReading> resultStream = stream1.keyBy(new KeySelector<SensorReading, String>() {
    @Override
    public String getKey(SensorReading sensorReading) throws Exception {
        return sensorReading.sensorId;
    }
}).reduce(new ReduceFunction<SensorReading>() {
    @Override
    public SensorReading reduce(SensorReading t0, SensorReading t1) throws Exception {
        // t0 is the result so far, t1 is the newly arrived element.
        // compareTo only guarantees the sign of its result, so test > 0 rather than == 1.
        if (t0.temperature.compareTo(t1.temperature) > 0) {
            return new SensorReading(t1.sensorId, t1.timeStamp, t1.temperature);
        }
        return t0;
    }
});
resultStream.print("resultReduceStream");
Output:
resultReduceStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112842l, temperature=29.7}
resultReduceStream:6> SensorReading{sensorId='sensor_6', timeStamp=1608112830l, temperature=15.4}
resultReduceStream:7> SensorReading{sensorId='sensor_7', timeStamp=1608112837l, temperature=6.7}
resultReduceStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112842l, temperature=29.7}
resultReduceStream:4> SensorReading{sensorId='sensor_10', timeStamp=1608112842l, temperature=38.1}
resultReduceStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112842l, temperature=29.7}
resultReduceStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112842l, temperature=29.7}
resultReduceStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112842l, temperature=29.7}
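The "one result per incoming element" behaviour of reduce can be reproduced without Flink. The sketch below is a plain-Java simulation under stated assumptions: `RollingReduceSketch`, `Reading`, and `keyedReduce` are hypothetical names, and per-key state is modelled with a simple HashMap. The first element of each key is emitted as is; every later element is merged with the stored result and the merged value is emitted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java simulation of a keyed rolling reduce; not Flink code.
class RollingReduceSketch {

    static class Reading {
        final String sensorId;
        final String timeStamp;
        final double temperature;

        Reading(String sensorId, String timeStamp, double temperature) {
            this.sensorId = sensorId;
            this.timeStamp = timeStamp;
            this.temperature = temperature;
        }

        @Override
        public String toString() {
            return sensorId + "@" + timeStamp + "=" + temperature;
        }
    }

    // Applies the reducer per key and records every intermediate result.
    static List<Reading> keyedReduce(List<Reading> input, BinaryOperator<Reading> reducer) {
        Map<String, Reading> state = new HashMap<>(); // last result per key
        List<Reading> out = new ArrayList<>();
        for (Reading r : input) {
            Reading previous = state.get(r.sensorId);
            Reading merged = (previous == null) ? r : reducer.apply(previous, r);
            state.put(r.sensorId, merged);
            out.add(merged);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Reading> input = Arrays.asList(
            new Reading("sensor_1", "1608112731l", 35.8),
            new Reading("sensor_6", "1608112830l", 15.4),
            new Reading("sensor_1", "1608112851l", 32.0),
            new Reading("sensor_1", "1608112731l", 36.2));
        // Same rule as the ReduceFunction above: keep the reading with the lower temperature.
        BinaryOperator<Reading> keepColder =
            (acc, next) -> acc.temperature > next.temperature ? next : acc;
        keyedReduce(input, keepColder).forEach(System.out::println);
    }
}
```

Swapping in a different BinaryOperator, for example one that also carries the later timestamp forward, changes the aggregation without touching the keyed loop, which mirrors how ReduceFunction is the only part that varies in the Flink version.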
This concludes the examples for the keyBy, aggregation, and reduce operators.