1、代码逻辑实现
package day04;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.sql.Timestamp;
import java.util.Calendar;
import java.util.Random;
/**
 * Uses an incremental aggregate function to count the page views (PV) of each user
 * in every 5-second tumbling processing-time window.
 *
 * @program: bigData_learn
 * @author: Mr.逗
 * @create: 2021-09-23 11:27
 */
public class IncAgg {

    /**
     * Mock click-stream source that emits one random {@link Event} roughly every second
     * until it is cancelled.
     *
     * <p>A plain {@link SourceFunction} always runs with parallelism 1; a custom source that
     * should run in parallel has to implement {@code ParallelSourceFunction} instead.
     */
    public static class ClickSource implements SourceFunction<Event> {
        // volatile: cancel() is invoked from a different thread than run(), so the write must
        // be visible to the emitting loop, otherwise the source may never stop.
        private volatile boolean running = true;
        private final String[] userArr = {"Mary", "Bob", "Alice", "Liz"};
        private final String[] urlArr = {"./home", "./cart", "./fav", "./prod?id=1", "./prod?id=2"};
        private final Random random = new Random();

        /**
         * Emits a random click event about once per second while the source is running.
         *
         * @param ctx context used to send records downstream
         * @throws Exception if the emitting thread is interrupted while sleeping
         */
        @Override
        public void run(SourceContext<Event> ctx) throws Exception {
            while (running) {
                // collect() forwards the record to the downstream operators
                ctx.collect(
                        new Event(
                                userArr[random.nextInt(userArr.length)],
                                urlArr[random.nextInt(urlArr.length)],
                                System.currentTimeMillis()));
                Thread.sleep(1000L);
            }
        }

        /** Asks the emitting loop in {@link #run} to stop after its current iteration. */
        @Override
        public void cancel() {
            running = false;
        }
    }

    /** A single click: which user visited which URL, and when (epoch milliseconds). */
    public static class Event {
        public String user;
        public String url;
        public Long timestamp;

        /** Public no-arg constructor; together with the public fields it lets Flink treat this as a POJO. */
        public Event() {
        }

        /**
         * @param user      the user who clicked
         * @param url       the URL that was visited
         * @param timestamp click time in epoch milliseconds
         */
        public Event(String user, String url, Long timestamp) {
            this.user = user;
            this.url = url;
            this.timestamp = timestamp;
        }

        /** Renders the event; a missing timestamp is printed as {@code null} instead of throwing. */
        @Override
        public String toString() {
            return "Event{" +
                    "user='" + user + '\'' +
                    ", url='" + url + '\'' +
                    ", timestamp=" + (timestamp == null ? null : new Timestamp(timestamp)) +
                    '}';
        }
    }

    /**
     * Incremental aggregate that counts the events falling into a window.
     * The accumulator is the running count and is also the final result.
     */
    public static class CountAgg implements AggregateFunction<Event, Integer, Integer> {

        /** Starts every window's count at zero. */
        @Override
        public Integer createAccumulator() {
            return 0;
        }

        /** Adds one to the running count for each incoming event. */
        @Override
        public Integer add(Event event, Integer accumulator) {
            return accumulator + 1;
        }

        /** Returns the final count for the window. */
        @Override
        public Integer getResult(Integer accumulator) {
            return accumulator;
        }

        /**
         * Combines two partial counts. Tumbling windows never call this, but merging window
         * assigners (e.g. session windows) do, so it must return a real sum rather than null.
         */
        @Override
        public Integer merge(Integer a, Integer b) {
            return a + b;
        }
    }

    /**
     * Builds and runs the job: key clicks by user, count them per 5-second tumbling
     * processing-time window, and print each count.
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Event> source = env.addSource(new ClickSource());
        source.keyBy(v -> v.user)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .aggregate(new CountAgg())
                .print();

        env.execute(IncAgg.class.getName());
    }
}
2、结果展示(每一行是某个用户在一个 5 秒滚动窗口内的 PV 数;由于只打印了聚合结果,输出中不包含用户名)
2
1
1
2
2
1
3
1
1
2
1
2
2
3
2
1
1
1
1
1
3
1
1
2
2
2
1
2
1
1
1
1
1
2
1
2
1
2
3
2
文章转载自逗先生大数据,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。