暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flink操练(三十一)之自定义键控状态(一)之ValueState

逗先生大数据 2021-09-17
1931

1、介绍

ValueState[T]保存单个的值,值的类型为T。

  • get操作: ValueState.value()

  • set操作: ValueState.update(value: T)

2、求当key出现了三次进行平均值计算

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

/**
* @program: bigdata_learn
* @description: 当key出现了三次进行求平均值
* @author: Mr.逗
* @create: 2021-09-07 17:40
**/

public class CountAverageWithValueState extends RichFlatMapFunction<Tuple2<Long,Long>,Tuple2<Long,Double>> {
/**
* 定义一个state:keyed state
*
* 1. ValueState里面只能存一条数据,如果来了第二条,就会覆盖第一条。
* 2. Tuple2<Long, Long>
* Long:
* 当前的key出现多少次 count 3
* Long:
* 当前的value的总和 sum
*
* sum/count = avg
*
*
* 如果我们想要使用这个state,首先要对state进行注册(初始化),固定的套路
*
*
*/

private ValueState<Tuple2<Long,Long>> countAndSum;
/**
* 这个方法其实是一个初始化的方法,只会执行一次
* 我们可以用来注册我们的状态
* @param parameters
* @throws Exception
*/

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
//注册状态
ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>(
"avg_age",//状态名字
Types.TUPLE(Types.LONG, Types.LONG));//状态存储的数据类型
countAndSum=getRuntimeContext().getState(descriptor);
}

/**
* 每来一条数据,都会调用这个方法
* key相同
* @param element
* @param out
* @throws Exception
*/

@Override
public void flatMap(Tuple2<Long, Long> element, Collector<Tuple2<Long, Double>> out) throws Exception {
//拿到当前key的状态值
Tuple2<Long, Long> currentState = countAndSum.value();
//如果状态值没有初始化,则进行初始化
if (currentState==null)
{
currentState=Tuple2.of(0L,0L);
}
//更新状态值中元素的个数
currentState.f0+=1;
//更新状态值中的总值
currentState.f1+=element.f1;
//更新状态
countAndSum.update(currentState);
// 判断,如果当前的 key 出现了 3 次,则需要计算平均值,并且输出
if (currentState.f0==3)
{
double avg= (double)currentState.f1 currentState.f0;
//输出key及对应的平均值
out.collect(Tuple2.of(element.f0,avg));
//清空状态值
countAndSum.clear();
}
}
}


文章转载自逗先生大数据,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论