暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flink操练(二十二)之自定义Map

逗先生大数据 2021-10-28
1385

1、代码逻辑实现

package day02;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import java.util.Random;
/**
* @program: Flink_learn
* @description: 自定义map
* @author: Mr.逗
* @create: 2021-09-14 16:11
**/

public class SelfMap {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//1、.map(r -> Tuple2.of(r, r))
env.addSource(new SourceFunction<Integer>() {
private boolean running = true;
private Random random = new Random();
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
while (running) {
ctx.collect(random.nextInt(1000));
Thread.sleep(1000);
}
}

@Override
public void cancel() {
running = false;
}
}).map(r -> Tuple2.of(r, r))
// 会被擦除成Tuple2<Object, Object>
// 需要returns方法来标注一下map函数的输出类型
.returns(Types.TUPLE(Types.INT, Types.INT)).print();
//2、new MapFunction
env.addSource(new SourceFunction<Integer>() {
private boolean running = true;
private Random random = new Random();
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
while (running) {
ctx.collect(random.nextInt(1000));
Thread.sleep(1000);
}
}
@Override
public void cancel() {
running = false;
}
}).map(new MapFunction<Integer, Tuple2<Integer, Integer>>() {
@Override
public Tuple2<Integer, Integer> map(Integer value) throws Exception {
return Tuple2.of(value, value);
}
}).print();
//3、.map(new MyMap())
env.addSource(new SourceFunction<Integer>() {
private boolean running = true;
private Random random = new Random();
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
while (running) {
ctx.collect(random.nextInt(1000));
Thread.sleep(1000);
}
}

@Override
public void cancel() {
running = false;
}
}).map(new MyMap()).print();
//4、flatMap(new FlatMapFunction<Integer, Tuple2<Integer, Integer>>()
env.addSource(new SourceFunction<Integer>() {
private boolean running = true;
private Random random = new Random();
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
while (running) {
ctx.collect(random.nextInt(1000));
Thread.sleep(1000);
}
}

@Override
public void cancel() {
running = false;
}
}).flatMap(new FlatMapFunction<Integer, Tuple2<Integer, Integer>>() {
@Override
public void flatMap(Integer value, Collector<Tuple2<Integer, Integer>> out) throws Exception {
out.collect(Tuple2.of(value, value));
}
}).print();

env.execute();
}

public static class MyMap implements MapFunction<Integer, Tuple2<Integer, Integer>> {
@Override
public Tuple2<Integer, Integer> map(Integer value) throws Exception {
return Tuple2.of(value, value);
}
}
}

2、结果展示

(124,124)
(208,208)
(496,496)
(252,252)
(845,845)
(567,567)
(507,507)
(839,839)


文章转载自逗先生大数据,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论