
Flink Practice (35): Custom Keyed State (4): AggregatingState

逗先生大数据 2021-11-22

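I. AggregatingState methods

AggregatingState must be used together with an AggregateFunction.

add() adds one element to the state and triggers the AggregateFunction to fold it into the accumulator.

get() returns the current value of the state, that is, the result the AggregateFunction computes from the accumulator.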
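The add()/get() contract described above can be sketched in plain Java, without any Flink dependency. `SimpleAggregatingState` and `AddGetDemo` are hypothetical names used only for illustration; they are not Flink classes. Returning null before the first add() is an assumption about how an empty state behaves.

```java
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical plain-Java stand-in for the add()/get() contract; not a Flink class. */
final class SimpleAggregatingState<IN, ACC, OUT> {
    private final Supplier<ACC> createAccumulator;
    private final BiFunction<IN, ACC, ACC> addFn;
    private final Function<ACC, OUT> getResult;
    private ACC accumulator; // stays null until the first add()

    SimpleAggregatingState(Supplier<ACC> createAccumulator,
                           BiFunction<IN, ACC, ACC> addFn,
                           Function<ACC, OUT> getResult) {
        this.createAccumulator = createAccumulator;
        this.addFn = addFn;
        this.getResult = getResult;
    }

    /** Folds one element into the accumulator, creating the accumulator on first use. */
    void add(IN value) {
        if (accumulator == null) {
            accumulator = createAccumulator.get();
        }
        accumulator = addFn.apply(value, accumulator);
    }

    /** Derives the result from the accumulator; null while nothing has been added (assumption). */
    OUT get() {
        return accumulator == null ? null : getResult.apply(accumulator);
    }
}

final class AddGetDemo {
    /** Builds a state that averages Long inputs; the accumulator is {sum, count}. */
    static SimpleAggregatingState<Long, long[], Double> averageState() {
        return new SimpleAggregatingState<Long, long[], Double>(
                () -> new long[] {0L, 0L},
                (value, acc) -> new long[] {acc[0] + value, acc[1] + 1},
                acc -> (double) acc[0] / acc[1]);
    }

    public static void main(String[] args) {
        SimpleAggregatingState<Long, long[], Double> state = averageState();
        state.add(3L);
        state.add(7L);
        System.out.println(state.get()); // average of 3 and 7
    }
}
```

The point of the sketch is that the input type, the accumulator type, and the result type can all differ: callers only ever see the input going in through add() and the result coming out of get().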


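II. The AggregatingState descriptor

When defining the descriptor, the second constructor argument must be an AggregateFunction instance. The third argument describes the type of the accumulator.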

// Define the descriptor: state name, AggregateFunction, accumulator type
AggregatingStateDescriptor aggregatingStateDescriptor = new AggregatingStateDescriptor(
        "avg-temp",
        new SensorRecordUtils.MyAvgTemp(),
        TypeInformation.of(new TypeHint<Tuple2<Double, Integer>>(){})
);

// Obtain the AggregatingState
aggregatingState = getRuntimeContext().getAggregatingState(aggregatingStateDescriptor);
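The source of `SensorRecordUtils.MyAvgTemp` is not shown here, so the following is only a guess at its general shape: an averaging aggregate whose accumulator is a (sum, count) pair, matching the `Tuple2<Double, Integer>` accumulator type in the descriptor. It is written in plain Java so it can run on its own. `AvgFunction`, `SumCount`, and `AvgTempSketch` are hypothetical names; `AvgFunction` only mirrors the four methods of Flink's AggregateFunction and is not the Flink interface itself.

```java
/** Hypothetical mirror of the four AggregateFunction methods; not the Flink interface. */
interface AvgFunction<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

/** Immutable (sum, count) accumulator, the plain-Java analogue of Tuple2<Double, Integer>. */
final class SumCount {
    final double sum;
    final int count;

    SumCount(double sum, int count) {
        this.sum = sum;
        this.count = count;
    }
}

/** Average-temperature aggregation; an assumed shape, since MyAvgTemp's source is not shown. */
final class AvgTempSketch implements AvgFunction<Double, SumCount, Double> {
    @Override
    public SumCount createAccumulator() {
        return new SumCount(0.0, 0);
    }

    @Override
    public SumCount add(Double value, SumCount acc) {
        return new SumCount(acc.sum + value, acc.count + 1);
    }

    @Override
    public Double getResult(SumCount acc) {
        // Guard against division by zero for an empty accumulator
        return acc.count == 0 ? 0.0 : acc.sum / acc.count;
    }

    @Override
    public SumCount merge(SumCount a, SumCount b) {
        return new SumCount(a.sum + b.sum, a.count + b.count);
    }

    public static void main(String[] args) {
        AvgTempSketch f = new AvgTempSketch();
        SumCount acc = f.add(22.0, f.add(20.0, f.createAccumulator()));
        System.out.println(f.getResult(acc));
    }
}
```

Keeping sum and count in the accumulator, rather than the average itself, is what lets merge() combine two partial results correctly.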


III. Computing the average once a key has appeared three times

1. Processing class

package test;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

/**
 * @program: Flink_GY
 * @description: Computes an average with AggregatingState
 * @author: Mr.逗
 * @create: 2021-09-09 11:35
 **/
public class CountAVgWithAggregatingState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {
    // Input: (key, value); accumulator: (count, sum); result: average
    private AggregatingState<Tuple2<Long, Long>, Double> aggregatingState;

    // Initialize the state handle
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        AggregatingStateDescriptor<Tuple2<Long, Long>, Tuple2<Long, Long>, Double> descriptor =
                new AggregatingStateDescriptor<>(
                        "AggregatingDescriptor",
                        new AggregateFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Double>() {
                            // Start with count = 0 and sum = 0
                            @Override
                            public Tuple2<Long, Long> createAccumulator() {
                                return Tuple2.of(0L, 0L);
                            }

                            // Increment the count and add the value to the sum
                            @Override
                            public Tuple2<Long, Long> add(Tuple2<Long, Long> value, Tuple2<Long, Long> accumulator) {
                                return Tuple2.of(accumulator.f0 + 1, accumulator.f1 + value.f1);
                            }

                            @Override
                            public Double getResult(Tuple2<Long, Long> accumulator) {
                                double avg = 0;
                                // Compute the average only when the key has appeared three times
                                if (accumulator.f0 == 3) {
                                    avg = (double) accumulator.f1 / accumulator.f0;
                                }
                                return avg;
                            }

                            @Override
                            public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
                                return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
                            }
                        },
                        // The third argument is the accumulator type, not the result type
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
        aggregatingState = getRuntimeContext().getAggregatingState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Double>> out) throws Exception {
        // Fold the element into the state, then emit the current result for its key
        aggregatingState.add(value);
        out.collect(Tuple2.of(value.f0, aggregatingState.get()));
    }
}


2. Main class

package test;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @program: bigdata_learn
 * @description: Test for AggregatingState
 * @author: Mr.逗
 * @create: 2021-09-08 17:43
 **/
public class Test1KeyedReduceStateMain {
    public static void main(String[] args) throws Exception {
        // Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism
        env.setParallelism(16);
        // Build the data source
        DataStreamSource<Tuple2<Long, Long>> dataStreamSource =
                env.fromElements(
                        Tuple2.of(1L, 3L),
                        Tuple2.of(1L, 7L),
                        Tuple2.of(2L, 4L),
                        Tuple2.of(1L, 5L),
                        Tuple2.of(2L, 2L),
                        Tuple2.of(2L, 6L));

        // Output: one record per input element. The average stays 0.0 until a key
        // has been seen three times. print() also prefixes each line with the
        // subtask index, and the interleaving between keys may vary.
        // (1,0.0)
        // (1,0.0)
        // (1,5.0)
        // (2,0.0)
        // (2,0.0)
        // (2,4.0)
        dataStreamSource
                // Key by the first tuple field; keyBy(0) is deprecated in recent Flink versions
                .keyBy(value -> value.f0)
                .flatMap(new CountAVgWithAggregatingState())
                .print();
        String name = Test1KeyedReduceStateMain.class.getName();
        env.execute(name);
    }
}
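To check the expected records without starting a Flink job, the per-key state can be approximated with an ordinary map. This is a minimal sketch under stated assumptions: `KeyedAverageSimulation` is a hypothetical class, the map stands in for keyed state, and the records are processed in source order on a single thread, so the interleaving between keys is fixed here even though it may differ in a parallel job.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical single-threaded simulation of the keyed average; not Flink code. */
final class KeyedAverageSimulation {
    /** Processes (key, value) records and returns the emitted records as strings. */
    static List<String> run(long[][] input) {
        // One accumulator {count, sum} per key, standing in for keyed state
        Map<Long, long[]> stateByKey = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (long[] record : input) {
            long key = record[0];
            long[] acc = stateByKey.computeIfAbsent(key, k -> new long[] {0L, 0L});
            acc[0] += 1;          // count
            acc[1] += record[1];  // sum
            // Same rule as getResult(): a real average only at the third occurrence
            double avg = acc[0] == 3 ? (double) acc[1] / acc[0] : 0.0;
            out.add("(" + key + "," + avg + ")");
        }
        return out;
    }

    public static void main(String[] args) {
        long[][] input = {{1, 3}, {1, 7}, {2, 4}, {1, 5}, {2, 2}, {2, 6}};
        run(input).forEach(System.out::println);
    }
}
```

Each key keeps its own accumulator, which is why the values of key 1 (3, 7, 5) and key 2 (4, 2, 6) never mix even though the records are interleaved in the input.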


3. Results



