暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flink Window Trigger

李孟的博客 2021-01-23
293

一.简介

在window操作时,有三个重要点:

  • 窗口分配器(assigner),决定着流入flink的数据,该属于哪个窗口。
  • 时间戳抽取器/watermark生成器,抽取时间戳并驱动着程序正常执行。
  • trigger,决定着数据啥时候落地。

flink 有很多内置的触发器,对于基于事件事件窗口触发器叫做EventTimeTrigger,其实,我们要实现基于事件时间的窗口随意输出,比如1000个元素触发一次输出,那么我们就可以通过修改这个触发器来实现。

二.实现

在window使用过程中,我们发现即使我们不指定trigger也是可以,这个时候assigner会自动为我们创建一个默认trigger,类型由TimeCharacteristic决定。

Trigger

@PublicEvolving
public abstract class Trigger<TW extends Windowimplements Serializable {
 private static final long serialVersionUID = -4104633972991191369L;
 /**
  * Called for every element that gets added to a pane. The result of this will determine
  * whether the pane is evaluated to emit results.
  *
  * @param element The element that arrived.
  * @param timestamp The timestamp of the element that arrived.
  * @param window The window to which the element is being added.
  * @param ctx A context object that can be used to register timer callbacks.
  */

 public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;
 /**
  * Called when a processing-time timer that was set using the trigger context fires.
  *
  * @param time The timestamp at which the timer fired.
  * @param window The window for which the timer fired.
  * @param ctx A context object that can be used to register timer callbacks.
  */

 public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;
 /**
  * Called when an event-time timer that was set using the trigger context fires.
  *
  * @param time The timestamp at which the timer fired.
  * @param window The window for which the timer fired.
  * @param ctx A context object that can be used to register timer callbacks.
  */

 public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;
 /**
  * Returns true if this trigger supports merging of trigger state and can therefore
  * be used with a
  * {@link org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner}.
  *
  * <p>If this returns {@code true} you must properly implement
  * {@link #onMerge(Window, OnMergeContext)}
  */

 public boolean canMerge() {
  return false;
 }
 /**
  * Called when several windows have been merged into one window by the
  * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}.
  *
  * @param window The new window that results from the merge.
  * @param ctx A context object that can be used to register timer callbacks and access state.
  */

 public void onMerge(W window, OnMergeContext ctx) throws Exception {
  throw new UnsupportedOperationException("This trigger does not support merging.");
 }
 /**
  * Clears any state that the trigger might still hold for the given window. This is called
  * when a window is purged. Timers set using {@link TriggerContext#registerEventTimeTimer(long)}
  * and {@link TriggerContext#registerProcessingTimeTimer(long)} should be deleted here as
  * well as state acquired using {@link TriggerContext#getPartitionedState(StateDescriptor)}.
  */

 public abstract void clear(W window, TriggerContext ctx) throws Exception;
}

复制

抽象方法

  • onElement ,进入窗口的每个元素都会调用该方法。
  • onEventTime,事件时间timer触发的时候被调用。
  • onProcessingTime,处理时间timer触发的时候会被调用。
  • onMerge,由状态的触发器相关,并在它们相应的窗口合并时,合并两个触发器的状态,例如使用会话窗口。
  • clear,窗口销毁时调用(窗口结束期加上最大延迟时间)。

返回类TriggerResult

  • CONTINUE,什么都不做。
  • FIRE,触发计算。
  • PURE,清除窗口的元素。
  • FIRE_AND_PURE,触发计算和清除窗口元素。

通过参考内置的EventTimeTrigger类可以看出,窗口触发计算的逻辑就是由onElement/onEventTime/onProcessingTime几个重要方法内、根据具体的业务逻辑来控制返回TriggerResult以达到相应的窗口控制。

三.内置Trigger

  • Flink内部内置了很多Trigger,类型以及作用如下
  • EventTimeTrigger,EventTime超过Watermark时触发计算。
  • CountTrigger,个数超过maxCount时触发计算。
  • ProcessingTimeTrigger,onProcessingTime触发时触发计算。
  • ContinuousEventTimeTrigger,基于事件事件的按照指定时间间隔持续触发的触发器。
  • ContinuousProcessingTimeTrigger,基于处理时间的按照指定时间间隔持续触发的触发器。
  • PurgingTrigger,该触发器类似于一个包装器,用于将任何给定的触发器转变成purging触发器。
  • DeltaTrigger,基于DeltaFunction和一个给定的阈值触发,该触发器在最后一个到达元素和当前元素之间计算一个delta值跟给定的阈值比较,如果高于给定的阈值,则触发。

四.实现

计数通过固定窗口的消息条数,超过规定阈值输出。CountTrigger实现

public class CountTrigger extends Trigger<FlinkSourceTrigger.modelTimeWindow{
  private static final long serialVersionUID = 1L;
  private final long maxCount;
  private final  ReducingStateDescriptor<Long> stateDesc = new ReducingStateDescriptor<>("count",new Sum(), LongSerializer.INSTANCE);
  public CountTrigger(long maxCount) {
  super();
  this.maxCount = maxCount;
}
@Override
public TriggerResult onElement(FlinkSourceTrigger.model element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
  ctx.registerProcessingTimeTimer(window.maxTimestamp());
  ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
  count.add(1L);
  if(count.get() >= maxCount){
    count.clear();
    return TriggerResult.FIRE_AND_PURGE;
  }
  return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
  return TriggerResult.FIRE;
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
  return TriggerResult.FIRE;
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
  ctx.getPartitionedState(stateDesc).clear();
}
@Override
public void onMerge(TimeWindow window, OnMergeContext ctx) throws Exception {
  ctx.mergePartitionedState(stateDesc);
  long l = window.maxTimestamp();
  if(l > ctx.getCurrentProcessingTime()){
  ctx.registerProcessingTimeTimer(l);
}
}
private static class  Sum implements ReduceFunction<Long>{
  private static final long serialVersionUID = 2L;
  @Override
  public Long reduce(Long value1, Long value2) throws Exception {
    return value1 + value2;
  }
}
}

复制

主程序实现

object FlinkSourceTrigger{
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
  
    env.setParallelism(1)
    val line = env.socketTextStream("localhost",9999)
    import org.apache.flink.api.scala._
    line.filter(f=> !StringUtils.isNullOrWhitespaceOnly(f))
        .map(new LineSplitter)
        .keyBy(f=>f.id)
        .timeWindow(Time.seconds(10))
        .trigger(new CountTrigger(2))
        .process(new MyProcessWindowFunction)
        .print()

    env.execute("FlinkSourceTrigger")
  }
  case class model(id:String,time:Long,num:Intextends Serializable
  class MyProcessWindowFunction extends ProcessWindowFunction[model, model, StringTimeWindow]{
    override def process(key: String, context: Context, elements: Iterable[model], out: Collector[model]): Unit = {
      println("start")
      elements.foreach(f=>{
        out.collect(f)
      })
      println("end")
    }
  }
  class LineSplitter extends MapFunction[String,model]{
    override def map(value: String): model = {
      val arrays = value.toLowerCase.split("\\W+")
      model(arrays(0), arrays(1).toLong, 1)
    }
  }
}

复制

输入

aa 1601365080
aa 1601365080
aa 1601365080
aa 1601365080
bb 1601466080
bb 1601466080
ee 1601469080
ee 1601469080

复制

输出

start
model(aa,1601365080,1)
model(aa,1601365080,1)
end
start
model(aa,1601365080,1)
model(aa,1601365080,1)
end
start
model(bb,1601466080,1)
model(bb,1601466080,1)
end
start
model(ee,1601469080,1)
model(ee,1601469080,1)
end

复制


文章转载自李孟的博客,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论