跳转至

CheckpointConfig

package com.atguigu.checkpoint;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.time.Duration;

import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH;

public class CheckpointConfigDemo {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();

        // TODO 最终检查点:1.15开始,默认是true
//        configuration.set(ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, false);

        configuration.set(RestOptions.PORT, 8083);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        env.setParallelism(1);

        // TODO 开启 Changelog
        // 要求checkpoint的最大并发必须为1,其他参数建议在flink-conf配置文件中去指定
        env.enableChangelogStateBackend(true);

        // 代码中用到hdfs,需要导入hadoop依赖、指定访问hdfs的用户名
//        System.setProperty("HADOOP_USER_NAME", "atguigu");

        // TODO 检查点常用配置
        // 1、启用检查点: 默认是barrier对齐的,周期为5s, 精准一次
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        // 2、指定检查点的存储位置
        // checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/chk");
        checkpointConfig.setCheckpointStorage("file:///D:/Dev/code/java/FlinkTutorial/output");
        // 3、checkpoint的超时时间: 默认10分钟
        checkpointConfig.setCheckpointTimeout(60000);
        // 4、同时运行中的checkpoint的最大数量
        checkpointConfig.setMaxConcurrentCheckpoints(1);
        // 5、最小等待间隔: 上一轮checkpoint结束 到 下一轮checkpoint开始 之间的间隔
        checkpointConfig.setMinPauseBetweenCheckpoints(1000);
        // 6、取消作业时,checkpoint的数据 是否保留在外部系统
        // DELETE_ON_CANCELLATION:主动cancel时,删除存在外部系统的chk-xx目录 (如果是程序突然挂掉,不会删)
        // RETAIN_ON_CANCELLATION:主动cancel时,外部系统的chk-xx目录会保存下来
        checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 7、允许 checkpoint 连续失败的次数,默认0--》表示checkpoint一失败,job就挂掉
        checkpointConfig.setTolerableCheckpointFailureNumber(10);

        // TODO 开启 非对齐检查点(barrier非对齐)
        // 开启的要求: Checkpoint模式必须是精准一次,最大并发必须设为1
        checkpointConfig.enableUnalignedCheckpoints();
        // 开启非对齐检查点才生效: 默认0,表示一开始就直接用 非对齐的检查点
        // 如果大于0, 一开始用 对齐的检查点(barrier对齐), 对齐的时间超过这个参数,自动切换成 非对齐检查点(barrier非对齐)
        checkpointConfig.setAlignedCheckpointTimeout(Duration.ofSeconds(1));

        env.socketTextStream("192.168.1.7", 9091)
                .flatMap(
                        (String value, Collector<Tuple2<String, Integer>> out) -> {
                            String[] words = value.split(" ");
                            for (String word : words) {
                                out.collect(Tuple2.of(word, 1));
                            }
                        }
                )
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(value -> value.f0)
                .sum(1)
                .print();

        env.execute();
    }
}

/**
 * TODO 检查点算法的总结
 * 1、Barrier对齐: 一个Task 收到 所有上游 同一个编号的 barrier之后,才会对自己的本地状态做 备份
 *      精准一次: 在barrier对齐过程中,barrier后面的数据 阻塞等待(不会越过barrier)
 *      至少一次: 在barrier对齐过程中,先到的barrier,其后面的数据 不阻塞 接着计算
 *
 * 2、非Barrier对齐: 一个Task 收到 第一个 barrier时,就开始 执行备份,能保证 精准一次(flink 1.11出的新算法)
 *      先到的barrier,将 本地状态 备份, 其后面的数据接着计算输出
 *      未到的barrier,其 前面的数据 接着计算输出,同时 也保存到 备份中
 *      最后一个barrier到达 该Task时,这个Task的备份结束
 */