暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

窗口的低语:解锁 Flink 窗口函数的秘密

大数据技能圈 2023-09-04
38

Window Function

窗口函数

窗口是处理无限流的核心。Windows将流分成有限大小的“桶”,我们可以对其进行计算。今天我们来解密如何在Flink中执行窗口。

1

Window Function

(1)

ReduceFunction

ReduceFunction用于对数据流中的元素进行逐个聚合操作。它接收两个相同类型的输入参数,并将它们聚合为一个输出结果。ReduceFunction通常用于在滚动窗口中进行增量聚合操作,例如计算总和或最大值。
ReduceFunction内部通常会计算一次就输出一次结果。当ReduceFunction收到一批数据时,它会对这批数据进行计算和聚合操作,并在计算完成后生成一个输出结果。这个输出结果可以直接作为ReduceFunction的输出,也可以作为下一步计算的输入。在分布式计算中,多个ReduceFunction的输出结果可以再次聚合起来,得到最终的计算结果。因此,ReduceFunction通常是实时计算结果,并逐步输出的。
代码举例:
    DataStream<Tuple2<String, Long>> input = ...;


    input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce(new ReduceFunction<Tuple2<String, Long>>() {
    public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
    return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
    }
        });

    上面的示例聚合了窗口中所有元素的元组的第二个字段。

    (2)

    AggregateFunction

    AggregateFunction用于对数据流中的元素进行一些聚合操作,并可以返回多个聚合结果。它接收一个输入元素并返回多个中间结果(Accumulator),然后将所有中间结果合并为一个最终结果。AggregateFunction通常用于在滑动窗口中进行聚合操作,例如计算平均值或计数。
    AggregateFunction是一种更通用的函数类型,在处理输入数据流时也可以进行计算和聚合操作。与ReduceFunction不同的是,AggregateFunction通常是在整个数据集上进行计算和聚合,而不是在每个数据到达时就输出一个结果。
    具体来说,AggregateFunction会收集和处理整个数据集,将数据集分为不同的子集进行计算和聚合,最终生成一个或多个聚合结果。这些聚合结果可以通过用户定义的方式进行组合,或者作为计算的中间结果传递给其他函数。AggregateFunction适用于对大规模数据集进行复杂的聚合操作,例如计算平均值、求和、最大值、最小值等。
    代码示例:
      private static class AverageAggregate
      implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
      @Override
      public Tuple2<Long, Long> createAccumulator() {
      return new Tuple2<>(0L, 0L);
      }


      @Override
      public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
      return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
      }


      @Override
      public Double getResult(Tuple2<Long, Long> accumulator) {
      return ((double) accumulator.f0) accumulator.f1;
      }


      @Override
      public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
      return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
      }
      }


      DataStream<Tuple2<String, Long>> input = ...;


      input
      .keyBy(<key selector>)
      .window(<window assigner>)
          .aggregate(new AverageAggregate());
      上面的示例计算窗口中元素的第二个字段的平均值。

      (3)

      ProcessWindowFunction

      ProcessWindowFunction获取一个包含窗口所有元素的Iterable,以及一个访问时间和状态信息的Context对象,这使它能够提供比其他窗口函数更大的灵活性。这是以性能和资源消耗为代价的,因为不能增量地聚合元素,而是需要在内部缓冲,直到窗口被认为可以处理为止。
      在使用ProcessWindowFunction时,需要实现其抽象方法process(),该方法接收输入数据、上下文对象和Collector,并输出结果数据。通过重写process()方法,开发者可以自定义窗口函数的操作逻辑。
      值得注意的是,ProcessWindowFunction只能在KeyedStream上使用,它基于每个键(key)的窗口执行计算。每个键都会有一个对应的窗口并进行独立处理。
      代码示例:
        DataStream<Tuple2<String, Long>> input = ...;


        input
        .keyBy(t -> t.f0)
        .window(TumblingEventTimeWindows.of(Time.minutes(5)))
        .process(new MyProcessWindowFunction());


        /* ... */


        public class MyProcessWindowFunction
        extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {


        @Override
        public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
        long count = 0;
        for (Tuple2<String, Long> in: input) {
        count++;
        }
        out.collect("Window: " + context.window() + "count: " + count);
        }
        }
        该示例显示了一个ProcessWindowFunction,它对窗口中的元素进行计数。此外,窗口函数将窗口的信息添加到输出中。

        (4)

        ProcessWindowFunction 和 ReduceFunction 组合 

        下面的示例展示了如何将增量ReduceFunction与ProcessWindowFunction结合起来,以返回窗口中最小的事件以及窗口的开始时间。
        代码示例:
          DataStream<SensorReading> input = ...;


          input
          .keyBy(<key selector>)
          .window(<window assigner>)
          .reduce(new MyReduceFunction(), new MyProcessWindowFunction());


          // Function definitions


          private static class MyReduceFunction implements ReduceFunction<SensorReading> {


          public SensorReading reduce(SensorReading r1, SensorReading r2) {
          return r1.value() > r2.value() ? r2 : r1;
          }
          }


          private static class MyProcessWindowFunction
          extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {


          public void process(String key,
          Context context,
          Iterable<SensorReading> minReadings,
          Collector<Tuple2<Long, SensorReading>> out) {
          SensorReading min = minReadings.iterator().next();
          out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));
          }
          }


          (5)

          ProcessWindowFunction 和 AggregateFunction 组合 

          下面的示例展示了如何将增量AggregateFunction与ProcessWindowFunction结合起来计算平均值,并在平均值的同时发出键和窗口。
          代码示例:
            DataStream<Tuple2<String, Long>> input = ...;


            input
            .keyBy(<key selector>)
            .window(<window assigner>)
            .aggregate(new AverageAggregate(), new MyProcessWindowFunction());


            // Function definitions


            /**
            * 累加器用于保存累加和和计数。
            * The {@code getResult} method
            * computes the average.
            */
            private static class AverageAggregate
            implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
            @Override
            public Tuple2<Long, Long> createAccumulator() {
            return new Tuple2<>(0L, 0L);
            }


            @Override
            public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
            return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
            }


            @Override
            public Double getResult(Tuple2<Long, Long> accumulator) {
            return ((double) accumulator.f0) accumulator.f1;
            }


            @Override
            public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
            return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
            }
            }


            private static class MyProcessWindowFunction
            extends ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> {


            public void process(String key,
            Context context,
            Iterable<Double> averages,
            Collector<Tuple2<String, Double>> out) {
            Double average = averages.iterator().next();
            out.collect(new Tuple2<>(key, average));
            }
            }

            2

            ProcessFunction

            (1)

            ProcessFunction

            ProcessFunction是一个低级的流处理操作,允许访问所有(无循环)流应用程序的基本构建块:
            events  (流元素)
            state  (容错、一致、仅在keyed stream上)
            timers  (事件时间和处理时间,仅在keyed stream上)
            ProcessFunction可以看作是一个可以访问keyed state和计时器的FlatMapFunction。它通过调用输入流中接收到的每个事件来处理事件。
            对于容错状态,ProcessFunction允许访问Flink的keyed state,可以通过RuntimeContext访问,类似于其他有状态函数访问keyed state的方式。
            timers 允许应用程序对处理时间和事件时间的变化作出反应。对函数processElement(…)的每次调用都会获得一个Context对象,该对象可以访问元素的事件时间戳和TimerService。TimerService可用于为未来的事件/处理时间瞬间注册回调。对于event-time timers,onTimer(…)方法在当前水印被推进到或超过定时器的时间戳时被调用,而对于 processing-time timers,onTimer(…)方法在时钟时间达到指定时间时被调用。在该调用期间,所有状态再次被绑定到为创建计时器时使用的键,从而允许计时器操纵keyed state。
            示例代码:
              public class MyProcessFunction extends ProcessFunction<Event, Result> {


              @Override
              public void processElement(Event event, Context context, Collector<Result> collector) throws Exception {
              // 在这里编写处理逻辑
              // 可以根据业务需求进行各种操作,如数据转换、过滤、聚合等
              // 使用 collector 发送结果到下游


              // 示例:假设只处理 event.type 为 "click" 的事件
              if (event.getType().equals("click")) {
              // 将事件进行转换,生成 Result 对象,并发出到下游
              Result result = new Result(event.getUser(), event.getTimestamp());
              collector.collect(result);
              }
              }
              }

              (2)

              CoProcessFunction

              CoProcessFunction用于处理两个流之间的连接和联合操作。与 ProcessFunction 类似,CoProcessFunction 也可以执行一系列的转换、过滤、聚合和侧输出等操作,但它可以同时处理两个输入流。
              CoProcessFunction 提供了以下几个重要的方法:
              processElement1: 对第一个输入流的每个事件调用一次该方法,可以用于执行一些计算和处理操作,并将结果发出到下游。
              processElement2: 对第二个输入流的每个事件调用一次该方法,可以用于执行一些计算和处理操作,并将结果发出到下游。
              onTimer: 当定时器触发时,会调用该方法,可以在此处编写定时任务的逻辑。
              使用 CoProcessFunction 可以处理多流之间的复杂关系,例如实现流的连接、联合、合并、分流等操作。常见的应用场景包括实时数据分析、数据关联、事件匹配等。
              示例代码:
                public class MyCoProcessFunction extends CoProcessFunction<InputType1, InputType2, OutputType> {


                @Override
                public void processElement1(InputType1 value, Context ctx, Collector<OutputType> out) throws Exception {
                // 处理第一个输入流的元素
                // 使用 ctx 来访问时间戳、计时器等上下文信息
                // 使用 out.collect() 发送处理结果
                }


                @Override
                public void processElement2(InputType2 value, Context ctx, Collector<OutputType> out) throws Exception {
                // 处理第二个输入流的元素
                // 使用 ctx 来访问时间戳、计时器等上下文信息
                // 使用 out.collect() 发送处理结果
                }
                }


                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


                DataStream<InputType1> inputStream1 = ...; // 创建第一个输入流
                DataStream<InputType2> inputStream2 = ...; // 创建第二个输入流


                DataStream<OutputType> outputStream = inputStream1
                .connect(inputStream2)
                .process(new MyCoProcessFunction());


                outputStream.print();


                env.execute("CoProcessFunction Example");

                (3)

                KeyedCoProcessFunction

                KeyedCoProcessFunction 和 CoProcessFunction 在使用场景上有一些区别,下面是它们的主要区别:
                键控操作:KeyedCoProcessFunction 是基于事件的键进行操作的,它可以按照键对事件进行处理。每个事件都会与一个键相关联,KeyedCoProcessFunction 可以根据键的值对事件进行分组和连接操作。在处理同一个键的事件时,可以使用状态和定时器进行相关操作。而 CoProcessFunction 则没有这样的键控操作,它是在两个输入流之间进行自由连接和联合操作的。
                状态管理:KeyedCoProcessFunction 可以使用键控状态(Keyed State)来存储和获取与特定键关联的状态信息。这使得可以在 KeyedCoProcessFunction 中跟踪和管理与特定键相关的状态。而 CoProcessFunction 可以使用 Operator State(算子状态),但它是全局状态,没有与特定键关联。
                实时数据关联:KeyedCoProcessFunction 适用于实时数据关联的场景,特别是在处理两个输入流之间的关联操作时非常有用。例如,根据用户 ID 将点击事件与购买事件进行关联,或者根据某个 ID 将两个数据流进行连接。CoProcessFunction 则适用于更一般的场景,可以灵活地处理两个输入流之间的各种联合、连接和关联操作。
                KeyedCoProcessFunction 更适用于需要基于键进行操作和状态管理的场景,它提供了键控操作和键控状态的能力。而 CoProcessFunction 则更适用于一般的流处理场景,可以处理各种自由的流连接和联合操作。选择适合的函数取决于具体的业务需求和数据处理需求。
                  public class MergeStreamsFunction extends KeyedCoProcessFunction<KeyType, InputType1, InputType2, OutputType> {

                  private ValueState<Integer> sumState;
                  private ValueState<Integer> incrementState;

                  @Override
                  public void open(Configuration parameters) throws Exception {
                  super.open(parameters);
                  ValueStateDescriptor<Integer> sumStateDescriptor = new ValueStateDescriptor<>("sumState", Integer.class);
                  sumState = getRuntimeContext().getState(sumStateDescriptor);

                  ValueStateDescriptor<Integer> incrementStateDescriptor = new ValueStateDescriptor<>("incrementState", Integer.class);
                  incrementState = getRuntimeContext().getState(incrementStateDescriptor);
                  }

                  @Override
                  public void processElement1(InputType1 input1, Context context, Collector<OutputType> collector) throws Exception {
                  // 处理第一个输入流的元素
                  Integer increment = input1.getIncrement();
                  incrementState.update(increment);

                  Integer sum = sumState.value();
                  if (sum == null) {
                  sum = 0;
                  }

                  sum += increment;
                  sumState.update(sum);

                  collector.collect(new OutputType(input1.getKey(), sum, increment));
                  }

                  @Override
                  public void processElement2(InputType2 input2, Context context, Collector<OutputType> collector) throws Exception {
                  // 处理第二个输入流的元素
                  Integer increment = incrementState.value();
                  Integer sum = sumState.value();

                  Integer newSum = sum + input2.getValue();

                  collector.collect(new OutputType(input2.getKey(), newSum, increment));
                  }
                  }

                  (4)

                  KeyedProcessFunction

                  KeyedProcessFunction用于处理流数据。它可以根据指定的键值对每个输入流的元素进行单独处理。用户可以根据具体需求自定义处理逻辑。
                  KeyedProcessFunction的核心概念是"键",它将流数据按照键值进行分组,并将相同键值的元素分配给同一个处理实例。KeyedProcessFunction有三个泛型参数:K、I、O,分别表示输入流的键类型、输入流的元素类型和输出流的元素类型。
                  KeyedProcessFunction提供了一系列的方法用于实现自定义的处理逻辑,主要包括以下几个:
                  processElement(I value, Context ctx, Collector out):该方法会对输入流的每个元素进行处理。通过value参数可以获取输入流的元素值,可以根据具体需求进行操作,并使用Collector.collect()方法发送处理结果。
                  onTimer(long timestamp, OnTimerContext ctx, Collector out):该方法用于处理定时器触发的操作。可以根据具体需求实现相应的逻辑,例如定时触发的计算或清理操作。
                  timerService():该方法可以获取定时器服务的引用,通过该引用可以注册和删除定时器。
                  在下面的示例中,keyyedprocessfunction维护每个键的计数,并且每当一分钟过去(在事件时间中)而没有更新该键时,就会发出一个键/计数对:
                  代码示例:
                    DataStream<Tuple2<String, String>> stream = ...;


                    // apply the process function onto a keyed stream
                    DataStream<Tuple2<String, Long>> result = stream
                    .keyBy(value -> value.f0)
                    .process(new CountWithTimeoutFunction());


                    /**
                    * The data type stored in the state
                    */
                    public class CountWithTimestamp {


                    public String key;
                    public long count;
                    public long lastModified;
                    }


                    /**
                    * The implementation of the ProcessFunction that maintains the count and timeouts
                    */
                    public class CountWithTimeoutFunction
                    extends KeyedProcessFunction<Tuple, Tuple2<String, String>, Tuple2<String, Long>> {


                    /** The state that is maintained by this process function */
                    private ValueState<CountWithTimestamp> state;


                    @Override
                    public void open(Configuration parameters) throws Exception {
                    state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
                    }


                    @Override
                    public void processElement(
                    Tuple2<String, String> value,
                    Context ctx,
                    Collector<Tuple2<String, Long>> out) throws Exception {


                    // retrieve the current count
                    CountWithTimestamp current = state.value();
                    if (current == null) {
                    current = new CountWithTimestamp();
                    current.key = value.f0;
                    }


                    // update the state's count
                    current.count++;


                    // set the state's timestamp to the record's assigned event time timestamp
                    current.lastModified = ctx.timestamp();


                    // write the state back
                    state.update(current);


                    // schedule the next timer 60 seconds from the current event time
                    ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
                    }


                    @Override
                    public void onTimer(
                    long timestamp,
                    OnTimerContext ctx,
                    Collector<Tuple2<String, Long>> out) throws Exception {


                    // get the state for the key that scheduled the timer
                    CountWithTimestamp result = state.value();


                    // check if this is an outdated timer or the latest timer
                    if (timestamp == result.lastModified + 60000) {
                    // emit the state on timeout
                    out.collect(new Tuple2<String, Long>(result.key, result.count));
                    }
                    }
                    }


                    动动小手

                    关注我们

                    文章转载自大数据技能圈,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                    评论