1、代码逻辑实现
package day02;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import java.util.Calendar;
import java.util.Random;
/**
* @program: Flink_learn
* @description: 自定义filter
* @author: Mr.逗
* @create: 2021-09-17 10:04
**/
public class SelfFilter {
//1、事件PrJo
public static class Event {
public String user;
public String url;
public Long timestamp;
public Event() {
}
public Event(String user, String url, Long timestamp) {
this.user = user;
this.url = url;
this.timestamp = timestamp;
}
@Override
public String toString() {
return "Event{" +
"user='" + user + '\'' +
", url='" + url + '\'' +
", timestamp=" + timestamp +
'}';
}
}
//2、自定义数据源
public static class ClickSource implements SourceFunction<Event>
{
private boolean running = true;
private String[] userArr = {"Mary", "Bob", "Alice", "Liz"};
private String[] urlArr = {"./home", "./cart", "./fav", "./prod?id=1", "./prod?id=2"};
private Random random = new Random();
@Override
public void run(SourceContext<Event> ctx) throws Exception {
while (running)
{
ctx.collect(new Event(
userArr[random.nextInt(userArr.length)]
,urlArr[random.nextInt(urlArr.length)]
, Calendar.getInstance().getTimeInMillis()
));
Thread.sleep(1000);
}
}
@Override
public void cancel() {
running=false;
}
}
//3、自定义filter
public static class MyFilter implements FilterFunction<Event>
{
@Override
public boolean filter(Event v) throws Exception {
return v.user.equals("Mary");
}
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Event> streamSource = env.addSource(new ClickSource());
//1、filter(r -> r.user.equals("Mary"))
streamSource.filter(v->v.user.equals("Mary")).print();
//2、FilterFunction
streamSource.filter(new FilterFunction<Event>() {
@Override
public boolean filter(Event v) throws Exception {
return v.user.equals("Mary");
}
}).print();
//3、new MyFilter
streamSource.filter(new MyFilter()).print();
//4、flatMap
streamSource.flatMap(new FlatMapFunction<Event, Event>() {
@Override
public void flatMap(Event event, Collector<Event> out) throws Exception {
if (event.user.equals("Mary"))
out.collect(event);
}
}).print();
String name = SelfFilter.class.getName();
env.execute(name);
}
}复制
2、结果之展示
5> Event{user='Mary', url='./home', timestamp=1631933148157}
5> Event{user='Mary', url='./home', timestamp=1631933148157}
8> Event{user='Mary', url='./home', timestamp=1631933148157}
2> Event{user='Mary', url='./home', timestamp=1631933148157}复制
文章转载自逗先生大数据,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。
评论
相关阅读
2025年4月中国数据库流行度排行榜:OB高分复登顶,崖山稳驭撼十强
墨天轮编辑部
2540次阅读
2025-04-09 15:33:27
数据库国产化替代深化:DBA的机遇与挑战
代晓磊
1184次阅读
2025-04-27 16:53:22
2025年3月国产数据库中标情况一览:TDSQL大单622万、GaussDB大单581万……
通讯员
857次阅读
2025-04-10 15:35:48
2025年4月国产数据库中标情况一览:4个千万元级项目,GaussDB与OceanBase大放异彩!
通讯员
678次阅读
2025-04-30 15:24:06
数据库,没有关税却有壁垒
多明戈教你玩狼人杀
583次阅读
2025-04-11 09:38:42
天津市政府数据库框采结果公布,7家数据库产品入选!
通讯员
569次阅读
2025-04-10 12:32:35
国产数据库需要扩大场景覆盖面才能在竞争中更有优势
白鳝的洞穴
547次阅读
2025-04-14 09:40:20
【活动】分享你的压箱底干货文档,三篇解锁进阶奖励!
墨天轮编辑部
487次阅读
2025-04-17 17:02:24
一页概览:Oracle GoldenGate
甲骨文云技术
464次阅读
2025-04-30 12:17:56
GoldenDB数据库v7.2焕新发布,助力全行业数据库平滑替代
GoldenDB分布式数据库
457次阅读
2025-04-30 12:17:50