暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flink操练(二十五)之自定义filter

逗先生大数据 2021-11-09
1066

1、代码逻辑实现

package day02;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

import java.util.Calendar;
import java.util.Random;

/**
* @program: Flink_learn
* @description: 自定义filter
* @author: Mr.逗
* @create: 2021-09-17 10:04
**/

public class SelfFilter {
//1、事件POJO
/**
 * A single click record: who clicked, which URL, and when.
 *
 * <p>The fields are public and a public no-argument constructor is provided;
 * the no-argument constructor leaves every field {@code null}.
 */
public static class Event {
    /** Name of the user who produced the click. */
    public String user;
    /** URL that was clicked. */
    public String url;
    /** Time of the click as a numeric timestamp. */
    public Long timestamp;

    /** Creates an event with all fields unset ({@code null}). */
    public Event() {
    }

    /**
     * Creates a fully populated event.
     *
     * @param user      name of the user
     * @param url       clicked URL
     * @param timestamp time of the click
     */
    public Event(String user, String url, Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    /**
     * Renders the event as {@code Event{user='…', url='…', timestamp=…}}.
     * Unset fields are rendered as the text {@code null}.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Event{");
        text.append("user='").append(user).append('\'');
        text.append(", url='").append(url).append('\'');
        text.append(", timestamp=").append(timestamp);
        text.append('}');
        return text.toString();
    }
}
//2、自定义数据源
/**
 * Demo source that emits one randomly generated click {@link Event} per second
 * until it is cancelled.
 *
 * <p>Each event combines a user and a URL picked at random from two small fixed
 * lists with the current wall-clock time in epoch milliseconds.
 */
public static class ClickSource implements SourceFunction<Event> {
    /**
     * Loop guard for {@link #run}. It is {@code volatile} because Flink calls
     * {@link #cancel()} from a different thread than the one executing
     * {@code run()}; without it the emitting loop is not guaranteed to ever
     * observe the write and could keep running after cancellation.
     */
    private volatile boolean running = true;

    /** Candidate user names. */
    private final String[] userArr = {"Mary", "Bob", "Alice", "Liz"};
    /** Candidate URLs. */
    private final String[] urlArr = {"./home", "./cart", "./fav", "./prod?id=1", "./prod?id=2"};
    /** Random generator used to pick a user and a URL for each event. */
    private final Random random = new Random();

    /**
     * Emits a new random event every 1000 ms for as long as the source is running.
     *
     * @param ctx context used to hand events to the stream
     * @throws Exception if the sleeping thread is interrupted
     */
    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        while (running) {
            String user = userArr[random.nextInt(userArr.length)];
            String url = urlArr[random.nextInt(urlArr.length)];
            // Same epoch-millisecond value as Calendar.getInstance().getTimeInMillis(),
            // without allocating a Calendar for every event.
            ctx.collect(new Event(user, url, System.currentTimeMillis()));
            Thread.sleep(1000);
        }
    }

    /** Asks the {@link #run} loop to stop after its current iteration. */
    @Override
    public void cancel() {
        running = false;
    }
}
//3、自定义filter
/**
 * Filter that keeps only the events whose {@code user} is exactly {@code "Mary"}.
 */
public static class MyFilter implements FilterFunction<Event> {

    /**
     * Decides whether an event is kept.
     *
     * @param v the event to test
     * @return {@code true} if the event's user equals {@code "Mary"};
     *         {@code false} otherwise, including when the user is {@code null}
     * @throws Exception declared by the {@link FilterFunction} interface; not thrown here
     */
    @Override
    public boolean filter(Event v) throws Exception {
        // Constant-first comparison: an Event created through its no-arg
        // constructor has a null user, which would otherwise throw a
        // NullPointerException instead of simply being filtered out.
        return "Mary".equals(v.user);
    }
}
/**
 * Builds and runs the demo job: a {@link ClickSource} stream is filtered down
 * to Mary's events in four equivalent ways, and each result is printed.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if job execution fails
 */
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    final DataStreamSource<Event> clicks = env.addSource(new ClickSource());

    // Variant 1: lambda predicate.
    clicks.filter(click -> click.user.equals("Mary")).print();

    // Variant 2: anonymous FilterFunction.
    clicks.filter(new FilterFunction<Event>() {
        @Override
        public boolean filter(Event click) throws Exception {
            return click.user.equals("Mary");
        }
    }).print();

    // Variant 3: the reusable MyFilter class.
    clicks.filter(new MyFilter()).print();

    // Variant 4: flatMap that forwards matching events and emits nothing for the rest.
    clicks.flatMap(new FlatMapFunction<Event, Event>() {
        @Override
        public void flatMap(Event click, Collector<Event> collector) throws Exception {
            if (!click.user.equals("Mary")) {
                return;
            }
            collector.collect(click);
        }
    }).print();

    // The job is named after this class.
    env.execute(SelfFilter.class.getName());
}
}

复制

2、结果之展示

5> Event{user='Mary', url='./home', timestamp=1631933148157}
5> Event{user='Mary', url='./home', timestamp=1631933148157}
8> Event{user='Mary', url='./home', timestamp=1631933148157}
2> Event{user='Mary', url='./home', timestamp=1631933148157}

复制


文章转载自逗先生大数据,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论