一、AggregatingState的方法
AggregatingState需要和AggregateFunction配合使用
add()：添加一个元素，触发 AggregateFunction 的 add() 方法，把该元素累加到当前 key 的累加器中
get()：获取 State 的值，即对当前累加器调用 AggregateFunction 的 getResult() 得到的结果
二、AggregatingState描述器
在定义描述器时，第二个参数需要传入一个 AggregateFunction 实例；第三个参数是该 AggregateFunction 累加器（ACC）的类型信息，而不是输出类型。
// Define the state descriptor: a state name, the AggregateFunction, and the type information
// of that function's accumulator (ACC), which Flink uses to create the state serializer.
// NOTE(review): raw type kept because MyAvgTemp's IN/OUT types are not visible here; prefer
// AggregatingStateDescriptor<IN, Tuple2<Double, Integer>, OUT> once they are confirmed.
AggregatingStateDescriptor aggregatingStateDescriptor = new AggregatingStateDescriptor(
        "avg-temp",
        new SensorRecordUtils.MyAvgTemp(),
        TypeInformation.of(new TypeHint<Tuple2<Double, Integer>>() {})
);
// Obtain the AggregatingState (not a ReducingState) from the runtime context.
aggregatingState = getRuntimeContext().getAggregatingState(aggregatingStateDescriptor);
三、key 出现三次时求平均值
1、处理类
package test;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
/**
* @program: Flink_GY
* @description: AggregatingState求平均值
* @author: Mr.逗
* @create: 2021-09-09 11:35
**/
public class CountAVgWithAggregatingState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {
private AggregatingState<Tuple2<Long, Long>, Double> aggregatingState;
//初始化
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// AggregatingStateDescriptor<Tuple2<Long, Long>, Tuple2<Double, Integer>, Double> descriptor
AggregatingStateDescriptor<Tuple2<Long,Long>,Tuple2<Long,Long>,Double> descriptor = new AggregatingStateDescriptor("AggregatingDescriptor", new AggregateFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Double>() {
//初始化
@Override
public Tuple2<Long, Long> createAccumulator() {
return Tuple2.of(0L, 0L);
}
@Override
public Tuple2<Long, Long> add(Tuple2<Long, Long> value, Tuple2<Long, Long> accumulator) {
Long currentCount = accumulator.f0;
currentCount += 1;
accumulator.f0 = currentCount;
long sum=0;
return new Tuple2<>(accumulator.f0, accumulator.f1+value.f1);
}
@Override
public Double getResult(Tuple2<Long, Long> accumulator) {
double avg=0;
//当key出现三次进行求平均值
if(accumulator.f0==3)
{
avg= (double) accumulator.f1 / accumulator.f0;
}
return avg;
}
@Override
public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
}
}, Double.class);
aggregatingState = getRuntimeContext().getAggregatingState(descriptor);
}
@Override
public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Double>> out) throws Exception {
aggregatingState.add(Tuple2.of(value.f0,value.f1));
out.collect(Tuple2.of(value.f0,aggregatingState.get()));
}
}复制
2、主类
package test;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @program: bigdata_learn
* @description: 测试reduceState
* @author: Mr.逗
* @create: 2021-09-08 17:43
**/
public class Test1KeyedReduceStateMain {
public static void main(String[] args) throws Exception{
//获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//StreamExecutionEnvironment.getExecutionEnvironment();
//设置并行度
env.setParallelism(16);
//获取数据源
DataStreamSource<Tuple2<Long, Long>> dataStreamSource =
env.fromElements(
Tuple2.of(1L, 3L),
Tuple2.of(1L, 7L),
Tuple2.of(2L, 4L),
Tuple2.of(1L, 5L),
Tuple2.of(2L, 2L),
Tuple2.of(2L, 6L));
// 输出:
//(1,5.0)
//(2,4.0)
dataStreamSource
.keyBy(0)
.flatMap(new CountAVgWithAggregatingState())
.print();
String name = Test1KeyedReduceStateMain.class.getName();
env.execute(name);
}
}复制
3、结果展示
文章转载自逗先生大数据,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。
评论
相关阅读
数据库国产化替代深化:DBA的机遇与挑战
代晓磊
1195次阅读
2025-04-27 16:53:22
2025年3月国产数据库中标情况一览:TDSQL大单622万、GaussDB大单581万……
通讯员
876次阅读
2025-04-10 15:35:48
2025年4月国产数据库中标情况一览:4个千万元级项目,GaussDB与OceanBase大放异彩!
通讯员
683次阅读
2025-04-30 15:24:06
数据库,没有关税却有壁垒
多明戈教你玩狼人杀
584次阅读
2025-04-11 09:38:42
天津市政府数据库框采结果公布,7家数据库产品入选!
通讯员
573次阅读
2025-04-10 12:32:35
国产数据库需要扩大场景覆盖面才能在竞争中更有优势
白鳝的洞穴
559次阅读
2025-04-14 09:40:20
【活动】分享你的压箱底干货文档,三篇解锁进阶奖励!
墨天轮编辑部
490次阅读
2025-04-17 17:02:24
一页概览:Oracle GoldenGate
甲骨文云技术
465次阅读
2025-04-30 12:17:56
GoldenDB数据库v7.2焕新发布,助力全行业数据库平滑替代
GoldenDB分布式数据库
458次阅读
2025-04-30 12:17:50
优炫数据库成功入围新疆维吾尔自治区行政事业单位数据库2025年框架协议采购!
优炫软件
352次阅读
2025-04-18 10:01:22