
Flink的两阶段提交(Two-Phase Commit)是一种分布式事务处理机制,用于保证跨多个节点的分布式事务的原子性。分布式系统中的一个事务往往涉及多个节点的操作,因此很容易出现部分节点已经执行了事务、而其他节点尚未执行的情况,从而导致数据不一致。为了解决这个问题,需要使用一种同步机制,确保在同一个事务中,所有节点要么全部执行,要么全部回滚。Flink的两阶段提交正是这样一种机制。
import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.common.functions.RichMapFunction;import org.apache.flink.api.common.state.ValueState;import org.apache.flink.api.common.state.ValueStateDescriptor;import org.apache.flink.api.java.functions.KeySelector;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.CheckpointingMode;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;import org.apache.flink.streaming.api.functions.source.SourceFunction;import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.ArrayList;import java.util.List;import java.util.Random;public class TwoPhaseCommitExample {private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitExample.class);public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 开启检查点,每5000ms进行一次env.enableCheckpointing(5000);env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 数据源env.addSource(new SourceFunction<String>() {private volatile boolean running = true;private String[] data = new String[] {"1", "2", "3", "4", "5", "6", "7", "8", "9", "10"};@Overridepublic void run(SourceContext<String> ctx) throws Exception {final Random rand = new Random(System.currentTimeMillis());while(running){int idx = rand.nextInt(data.length);ctx.collect(data[idx]);Thread.sleep(1000);}}@Overridepublic void cancel() {running = false;}})// 进行转换操作.keyBy(new KeySelector<String, String>() {@Overridepublic String getKey(String value) throws Exception {return value;}}).map(new TransactionMapper())// 写入输出.addSink(new TransactionSink());env.execute("TwoPhaseCommitExample");}/*** 
事务对象*/public static class Transaction {private String id;private String data;private TransactionState state;public Transaction() {}public Transaction(String id, String data) {this.id = id;this.data = data;this.state = TransactionState.New;}public String getId() {return id;}public void setId(String id) {this.id = id;}public String getData() {return data;}public void setData(String data) {this.data = data;}public TransactionState getState(){return state;}public void setState(TransactionState state){this.state = state;}@Overridepublic String toString() {return "Transaction{" +"id='" + id + '\'' +", data='" + data + '\'' +", state=" + state +'}';}}/*** 事务状态枚举类*/public static enum TransactionState {New,Prepared,Committed,Aborted}/*** 事务转换操作*/public static class TransactionMapper extends RichMapFunction<String, Transaction> {private transient ValueState<Transaction> state;@Overridepublic void open(Configuration config) throws Exception {super.open(config);ValueStateDescriptor<Transaction> descriptor =new ValueStateDescriptor<>("transaction", Transaction.class);state = getRuntimeContext().getState(descriptor);}@Overridepublic Transaction map(String value) throws Exception {Transaction transaction = state.value();if(transaction == null){transaction = new Transaction(value, "");state.update(transaction);}transaction.setData(value);transaction.setState(TransactionState.Prepared);return transaction;}}/*** 事务输出操作*/public static class TransactionSink extends RichSinkFunction<Transaction> {private static final long serialVersionUID = 1L;private static final Logger LOG = LoggerFactory.getLogger(TransactionSink.class);private List<Transaction> transactions = new ArrayList<>();private transient ValueState<Byte> state;@Overridepublic void open(Configuration config) throws Exception {super.open(config);ValueStateDescriptor<Byte> descriptor =new ValueStateDescriptor<>("state", Byte.class);state = getRuntimeContext().getState(descriptor);}@Overridepublic void invoke(Transaction 
transaction) throws Exception {switch (transaction.getState()) {case Prepared:transactions.add(transaction);break;case Aborted:transaction.setState(TransactionState.Aborted);break;case Committed:for (Transaction t : transactions) {t.setState(TransactionState.Committed);}state.update((byte)0);transactions.clear();break;default:break;}}}}
文章转载自大数据技能圈,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




