Window Functions

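Windows are at the heart of processing unbounded streams. A window splits the stream into "buckets" of finite size, over which computations can be applied. This article looks at how window computations are carried out in Flink.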
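To make the "bucket" idea concrete, here is a minimal plain-Java sketch of how a timestamp can be mapped to a fixed-size (tumbling) bucket. It does not use the Flink API; the class name `TumblingBuckets`, the helper names, and the 5-second bucket size are assumptions chosen for illustration, and windows are assumed to be aligned to the epoch with no offset.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class TumblingBuckets {

    /** Start of the bucket [start, start + sizeMs) that contains the timestamp. */
    static long windowStart(long timestampMs, long sizeMs) {
        // floorMod keeps the result correct for negative timestamps as well.
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** Groups timestamps into buckets keyed by bucket start (sorted by start). */
    static Map<Long, List<Long>> assign(long[] timestampsMs, long sizeMs) {
        Map<Long, List<Long>> buckets = new TreeMap<>();
        for (long ts : timestampsMs) {
            buckets.computeIfAbsent(windowStart(ts, sizeMs), k -> new ArrayList<>()).add(ts);
        }
        return buckets;
    }

    public static void main(String[] args) {
        long[] events = {1_000L, 4_999L, 5_000L, 12_500L};
        // prints {0=[1000, 4999], 5000=[5000], 10000=[12500]}
        System.out.println(assign(events, 5_000L));
    }
}
```

Each bucket is half-open, so an element at exactly 5000 ms belongs to the second bucket rather than the first. The window functions that follow describe what is computed over the contents of each such bucket.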
(1) ReduceFunction
DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    // Incrementally sum the second field of the elements in each window.
    .reduce(new ReduceFunction<Tuple2<String, Long>>() {
        @Override
        public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
            return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
        }
    });
(2) AggregateFunction
/**
 * The accumulator holds a running sum (f0) and a count (f1).
 * getResult computes the average from them.
 */
private static class AverageAggregate
        implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {

    @Override
    public Tuple2<Long, Long> createAccumulator() {
        return new Tuple2<>(0L, 0L);
    }

    @Override
    public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
        return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
    }

    @Override
    public Double getResult(Tuple2<Long, Long> accumulator) {
        // sum / count; the cast avoids integer division.
        return ((double) accumulator.f0) / accumulator.f1;
    }

    @Override
    public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
        return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
    }
}

DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(new AverageAggregate());
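The four methods of the aggregate above can be traced by hand. The following plain-Java sketch mirrors them with a `long[] {sum, count}` accumulator instead of a Flink `Tuple2`; the class name `AverageAccumulatorSketch` and the sample values are assumptions for illustration, and nothing here calls the Flink API.

```java
final class AverageAccumulatorSketch {

    /** Empty accumulator: {sum, count}. */
    static long[] createAccumulator() {
        return new long[] {0L, 0L};
    }

    /** Folds one value into the accumulator and returns the new accumulator. */
    static long[] add(long value, long[] acc) {
        return new long[] {acc[0] + value, acc[1] + 1L};
    }

    /** Combines two partial accumulators into one. */
    static long[] merge(long[] a, long[] b) {
        return new long[] {a[0] + b[0], a[1] + b[1]};
    }

    /** Average = sum / count; the cast avoids integer division. */
    static double getResult(long[] acc) {
        return ((double) acc[0]) / acc[1];
    }

    public static void main(String[] args) {
        long[] left = add(4L, add(2L, createAccumulator())); // sum 6, count 2
        long[] right = add(9L, createAccumulator());         // sum 9, count 1
        // prints 5.0
        System.out.println(getResult(merge(left, right)));
    }
}
```

Merging the two partial accumulators gives the same average as adding 2, 4 and 9 to a single accumulator, which is the property `merge` has to preserve when windows are combined (for example with merging window assigners such as session windows).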
(3) ProcessWindowFunction
DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(t -> t.f0)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .process(new MyProcessWindowFunction());

/* ... */

public class MyProcessWindowFunction
        extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {

    @Override
    public void process(String key,
                        Context context,
                        Iterable<Tuple2<String, Long>> input,
                        Collector<String> out) {
        // Count all elements buffered for this key and window.
        long count = 0;
        for (Tuple2<String, Long> in : input) {
            count++;
        }
        out.collect("Window: " + context.window() + " count: " + count);
    }
}
(4) Combining ProcessWindowFunction with ReduceFunction
DataStream<SensorReading> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce(new MyReduceFunction(), new MyProcessWindowFunction());

// Function definitions

private static class MyReduceFunction implements ReduceFunction<SensorReading> {

    @Override
    public SensorReading reduce(SensorReading r1, SensorReading r2) {
        // Keep the reading with the smaller value.
        return r1.value() > r2.value() ? r2 : r1;
    }
}

private static class MyProcessWindowFunction
        extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {

    @Override
    public void process(String key,
                        Context context,
                        Iterable<SensorReading> minReadings,
                        Collector<Tuple2<Long, SensorReading>> out) {
        // The iterable holds only the single pre-reduced minimum reading.
        SensorReading min = minReadings.iterator().next();
        out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));
    }
}
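The combination above keeps one reduced value per window and only attaches the window start when the result is emitted. A rough plain-Java sketch of that idea follows; it is not Flink code, and the class name `IncrementalMinSketch`, the 5-second window size, and the sample readings are assumptions for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

final class IncrementalMinSketch {

    /**
     * Keeps a single running minimum per window instead of buffering every
     * reading; the map key plays the role of the window start in the output.
     */
    static Map<Long, Double> minPerWindow(long[] timestampsMs, double[] values, long sizeMs) {
        Map<Long, Double> minByWindowStart = new TreeMap<>();
        for (int i = 0; i < timestampsMs.length; i++) {
            long start = timestampsMs[i] - Math.floorMod(timestampsMs[i], sizeMs);
            // merge() stores the value if the window is new, else keeps the smaller one.
            minByWindowStart.merge(start, values[i], Math::min);
        }
        return minByWindowStart;
    }

    public static void main(String[] args) {
        long[] timestamps = {1_000L, 2_000L, 6_000L};
        double[] values = {3.5, 1.5, 7.0};
        // prints {0=1.5, 5000=7.0}
        System.out.println(minPerWindow(timestamps, values, 5_000L));
    }
}
```

The state per window is a single value regardless of how many readings arrive, which is the practical reason for pairing an incremental function with a ProcessWindowFunction rather than buffering all elements.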
(5) Combining ProcessWindowFunction with AggregateFunction
DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(new AverageAggregate(), new MyProcessWindowFunction());

// Function definitions

/**
 * The accumulator holds a running sum and a count.
 * The {@code getResult} method computes the average.
 */
private static class AverageAggregate
        implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {

    @Override
    public Tuple2<Long, Long> createAccumulator() {
        return new Tuple2<>(0L, 0L);
    }

    @Override
    public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
        return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
    }

    @Override
    public Double getResult(Tuple2<Long, Long> accumulator) {
        return ((double) accumulator.f0) / accumulator.f1;
    }

    @Override
    public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
        return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
    }
}

private static class MyProcessWindowFunction
        extends ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> {

    @Override
    public void process(String key,
                        Context context,
                        Iterable<Double> averages,
                        Collector<Tuple2<String, Double>> out) {
        // The iterable holds only the single pre-aggregated average.
        Double average = averages.iterator().next();
        out.collect(new Tuple2<>(key, average));
    }
}
(1) ProcessFunction
public class MyProcessFunction extends ProcessFunction<Event, Result> {

    @Override
    public void processElement(Event event, Context context, Collector<Result> collector)
            throws Exception {
        // Write the processing logic here. Any operation the use case needs
        // can go here, such as transformation, filtering, or aggregation.
        // Use the collector to emit results downstream.

        // Example: only handle events whose type is "click".
        if (event.getType().equals("click")) {
            // Convert the event into a Result object and emit it downstream.
            Result result = new Result(event.getUser(), event.getTimestamp());
            collector.collect(result);
        }
    }
}
(2) CoProcessFunction
public class MyCoProcessFunction extends CoProcessFunction<InputType1, InputType2, OutputType> {

    @Override
    public void processElement1(InputType1 value, Context ctx, Collector<OutputType> out)
            throws Exception {
        // Handle an element from the first input stream.
        // Use ctx to access context such as timestamps and timers.
        // Use out.collect() to emit results.
    }

    @Override
    public void processElement2(InputType2 value, Context ctx, Collector<OutputType> out)
            throws Exception {
        // Handle an element from the second input stream.
        // Use ctx to access context such as timestamps and timers.
        // Use out.collect() to emit results.
    }
}

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<InputType1> inputStream1 = ...; // create the first input stream
DataStream<InputType2> inputStream2 = ...; // create the second input stream

DataStream<OutputType> outputStream = inputStream1
    .connect(inputStream2)
    .process(new MyCoProcessFunction());

outputStream.print();
env.execute("CoProcessFunction Example");
(3) KeyedCoProcessFunction
public class MergeStreamsFunction
        extends KeyedCoProcessFunction<KeyType, InputType1, InputType2, OutputType> {

    private ValueState<Integer> sumState;
    private ValueState<Integer> incrementState;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ValueStateDescriptor<Integer> sumStateDescriptor =
                new ValueStateDescriptor<>("sumState", Integer.class);
        sumState = getRuntimeContext().getState(sumStateDescriptor);
        ValueStateDescriptor<Integer> incrementStateDescriptor =
                new ValueStateDescriptor<>("incrementState", Integer.class);
        incrementState = getRuntimeContext().getState(incrementStateDescriptor);
    }

    @Override
    public void processElement1(InputType1 input1, Context context, Collector<OutputType> collector)
            throws Exception {
        // Handle an element from the first input stream.
        Integer increment = input1.getIncrement();
        incrementState.update(increment);
        Integer sum = sumState.value();
        if (sum == null) {
            sum = 0;
        }
        sum += increment;
        sumState.update(sum);
        collector.collect(new OutputType(input1.getKey(), sum, increment));
    }

    @Override
    public void processElement2(InputType2 input2, Context context, Collector<OutputType> collector)
            throws Exception {
        // Handle an element from the second input stream.
        Integer increment = incrementState.value();
        Integer sum = sumState.value();
        // The state is empty until the first stream has delivered an element
        // for this key, so guard against null before unboxing.
        if (sum == null) {
            sum = 0;
        }
        Integer newSum = sum + input2.getValue();
        collector.collect(new OutputType(input2.getKey(), newSum, increment));
    }
}
(4) KeyedProcessFunction
DataStream<Tuple2<String, String>> stream = ...;

// apply the process function onto a keyed stream
DataStream<Tuple2<String, Long>> result = stream
    .keyBy(value -> value.f0)
    .process(new CountWithTimeoutFunction());

/**
 * The data type stored in the state
 */
public class CountWithTimestamp {
    public String key;
    public long count;
    public long lastModified;
}

/**
 * The implementation of the ProcessFunction that maintains the count and timeouts.
 * The key type is String because the stream is keyed by value.f0.
 */
public class CountWithTimeoutFunction
        extends KeyedProcessFunction<String, Tuple2<String, String>, Tuple2<String, Long>> {

    /** The state that is maintained by this process function */
    private ValueState<CountWithTimestamp> state;

    @Override
    public void open(Configuration parameters) throws Exception {
        state = getRuntimeContext().getState(
                new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
    }

    @Override
    public void processElement(Tuple2<String, String> value,
                               Context ctx,
                               Collector<Tuple2<String, Long>> out) throws Exception {
        // retrieve the current count
        CountWithTimestamp current = state.value();
        if (current == null) {
            current = new CountWithTimestamp();
            current.key = value.f0;
        }

        // update the state's count
        current.count++;

        // set the state's timestamp to the record's assigned event time timestamp
        current.lastModified = ctx.timestamp();

        // write the state back
        state.update(current);

        // schedule the next timer 60 seconds from the current event time
        ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
    }

    @Override
    public void onTimer(long timestamp,
                        OnTimerContext ctx,
                        Collector<Tuple2<String, Long>> out) throws Exception {
        // get the state for the key that scheduled the timer
        CountWithTimestamp result = state.value();

        // check if this is an outdated timer or the latest timer
        if (timestamp == result.lastModified + 60000) {
            // emit the state on timeout
            out.collect(new Tuple2<String, Long>(result.key, result.count));
        }
    }
}
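The check in `onTimer` is what turns "one timer per element" into "emit once after 60 seconds of silence". The plain-Java sketch below replays that check without Flink; the class name `TimeoutCheckSketch`, the helper `isLatestTimer`, and the sample timestamps are assumptions for illustration.

```java
final class TimeoutCheckSketch {

    static final long TIMEOUT_MS = 60_000L;

    /** A timer is still current only if no later element has moved lastModified forward. */
    static boolean isLatestTimer(long timerTimestamp, long lastModified) {
        return timerTimestamp == lastModified + TIMEOUT_MS;
    }

    public static void main(String[] args) {
        // Two elements for the same key, at event times 10 s and 30 s.
        long firstTimer = 10_000L + TIMEOUT_MS;   // registered by the first element
        long secondTimer = 30_000L + TIMEOUT_MS;  // registered by the second element
        long lastModified = 30_000L;              // state after the second element

        System.out.println(isLatestTimer(firstTimer, lastModified));  // prints false
        System.out.println(isLatestTimer(secondTimer, lastModified)); // prints true
    }
}
```

The first timer still fires at 70 s, but it is recognised as outdated and ignored; only the timer belonging to the most recent element emits the count.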
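This article is reprinted from 大数据技能圈. To report suspected infringement, email contact@modb.pro with supporting evidence; once verified, 墨天轮 will remove the content promptly.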




