Flink-Source算子点位提交问题(Earliest)

发布于:2025-07-06 ⋅ 阅读:(23) ⋅ 点赞:(0)

背景

最近在做 Flink 任务数据源切换时遇到 offset 消费问题,遂写篇文章记录下来。

切换时只修改了 source 算子的 topic,uid 等其他信息保持不变:

  1. 发布时,发现算子的消费者点位重置为earliest,导致消息积压。
  2. 消息积压后,打算通过时间戳重置点位到发布前,但是发现点位重置失效。

原因分析

source算子点位初始化模式

source算子点位初始化有两种方式:1)消费者组偏移量:setStartFromGroupOffsets;2)时间戳:setStartFromTimestamp。

消费组偏移量(FromGroupOffsets)

该方式会将 startupMode 初始化为 StartupMode.GROUP_OFFSETS:

startupMode枚举:

时间戳(FromTimestamp)

该方式会将 startupMode 初始化为 StartupMode.TIMESTAMP:

source 算子初始化

示例代码:

public static void main(String[] args) throws Exception {
    Configuration configuration = new Configuration();
    // configuration.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
    // configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "xxx");
    configuration.setString("execution.savepoint.path", "xxx");
    configuration.setBoolean("execution.savepoint.ignore-unclaimed-state", true);

    // 创建执行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
    // 启用checkpoint
    env.enableCheckpointing(5000);
    env.setParallelism(1);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

    ParameterTool argTools = ParameterTool.fromArgs(args);
    env.getConfig().setGlobalJobParameters(argTools);

    // 添加数据源
    // "old_topic", "new_topic"
    FlinkKafkaConsumer consumer = KafkaConfig.getConsumer();
    consumer.setStartFromGroupOffsets();
    DataStream<String> stream = env.addSource(consumer)
    .uid("kafka-source")
    .name("kafka-source");

    SingleOutputStreamOperator<HeartEntity> heart = stream.map(new MapFunction<String, HeartEntity>() {
        @Override
        public HeartEntity map(String value) throws Exception {
            HeartEntity heartEntity = JSON.parseObject(value, HeartEntity.class);
            return heartEntity;
        }
    }).uid("map-heart").name("map-heart");

    // 使用状态计数
    DataStream<Long> countStream = heart.keyBy(HeartEntity::getCommandNo)
    .map(new RichMapFunction<HeartEntity, Long>() {
        private transient ValueState<Long> countState;
        private long count = 0;

        @Override
        public void open(Configuration parameters) throws Exception {
            // 初始化状态
            ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("count-state", TypeInformation.of(Long.class));
            countState = getRuntimeContext().getState(descriptor);
        }

        @Override
        public Long map(HeartEntity value) throws Exception {
            count++;
            countState.update(count);
            return countState.value();
        }
    })
    .uid("count-map")
    .name("count-map");

    // 打印计数结果
    countStream.print().uid("print").name("print");

    // 启动Flink任务
    env.execute("Flink Kafka State Count Example");

}
从状态启动
initializeState

状态初始化时,FlinkKafkaConsumerBase 执行 initializeState 方法中:当 source topic 从 sg_lock_heart_msg_topic 切换为 sg_tw_common_com_lock_heart_report_topic 时,可以看到新 topic 绑定的 source 算子仍然是从老 topic 的算子状态启动的,因为 uid 没变。

initializeState 往下走可以看到,restoreState 的是老 topic 分区的状态;

open

算子初始化时,如果状态不为空且 topic 分区不在状态中,那么就会把新的 topic 分区加入到状态中,并设置算子消费新分区的 startupMode 为 EARLIEST_OFFSET,即从最早的消息开始消费。

老的 topic 分区不会再消费,会被移除订阅。

订阅的 topic 分区

从指定时间戳启动

setStartFromTimestamp 设置启动模式为时间戳

然而在算子初始化时,由于从状态启动,新 topic分区 仍然会从 earliest 消费:

也就是说,checkpoint/savepoint 中存储的 source 点位状态在恢复时大于设置的时间戳。

解决方案

尝试一(修改 uid)

从 source 算子初始化的 open 过程可知,既然从状态启动时会将已存在 source 算子(uid在状态中)的新 topic 点位设置为最早,那么如果将新 topic 的 uid 改成与老 topic 的 uid 不一致,是否就能避免从 earliest 恢复:因为从状态恢复时新的 uid 并不在状态中,那么就不会走 open 中将新 topic 点位置为 earliest 的流程。

FlinkKafkaConsumer consumer = KafkaConfig.getConsumer("sg_tw_common_com_lock_heart_report_topic");
DataStream<String> stream = env.addSource(consumer)
                .uid("kafka-source-new")
                .name("kafka-source-new");

可以看到在状态初始化阶段(initializeState),source 算子的状态 (restoreState)被置为空集合,而不是 null。为什么?

当在算子初始化时,因为 restoreState 不为 null,仍然会进入点位重置的流程:

可以看到这里将新 topic 分区放入了 restoreState 中,且点位置为 earliest(StarupMode 枚举中,EARLIEST_OFFSET = -915623761775L)。

再往下走,restoreState 会将其中的新 topic 分区放入订阅的分区中

从此,新 topic 又从最早开始消费😓。那么方案尝试一是失败的!

在线上实际操作时,消费点位确实被重置到了 earliest,又导致积压了😦。

尝试二(修改消费者组)

有没有办法让 restoreState 置为 null 呢,那就真的不会走到点位重置的流程了🎊

突然看到 restoreState 的注释:

如果消费者从状态恢复,就会设置 restoreState。那怎么让消费者不从状态恢复?无状态启动肯定是不行的,不能让其他算子的状态丢了。那我直接换个消费组名!试一试呢

Properties props = new Properties();
        props.put("bootstrap.servers", "uat-kafka1.ttbike.com.cn:9092,uat-kafka2.ttbike.com.cn:9092,uat-kafka3.ttbike.com.cn:9092");
        props.put("group.id", "flink-label-engine-new");

还是不行,直到目前发现只要从状态启动,context 上下文会让代码走进给 restoreState 赋值的位置。

isRestored分析

isRestore分析

尝试三(新增拓扑图)

根据算子状态恢复可知,只要新增的 source 算子跟其他已有算子形成了算子链,如果以状态启动,那么 source 的点位就会被置为 earliest。

  1. 新增一个新 topic 的 source 算子和 sink 算子(要保证新增的算子与已有算子隔离,不会形成算子链),然后修改老 source 算子的 uid 和 topic 与新的一致。
// old: sg_lock_heart_msg_topic
FlinkKafkaConsumer consumer = KafkaConfig.getConsumer("sg_lock_heart_msg_topic");
consumer.setStartFromGroupOffsets();
DataStream<String> stream = env.addSource(consumer)
        .uid("kafka-source-old")
        .name("kafka-source-old");

// new: sg_tw_common_com_lock_heart_report_topic
FlinkKafkaConsumer consumer_new = KafkaConfig.getConsumer("sg_tw_common_com_lock_heart_report_topic");
consumer_new.setStartFromGroupOffsets();
DataStream<String> stream_old = env.addSource(consumer_new)
        .uid("kafka-source-new")
        .name("kafka-source-new");
stream_old.print().uid("print-new").name("print-new");

由于从状态启动,且新加入的算子与其他算子隔离,老 source 算子的点位从状态启动;新 source 算子的点位被置为 GROUP_OFFSET。

1. 暂停并保存状态;

2. 修改老 source 算子的 uid 和 topic 与 新算子保持一致,同时删除新算子;

3. 然后从状态启动(/061c986d19612ae413ba794f68ff7727/chk-9),修改后的 source 算子点位从状态恢复:

4. 下游 “count-map”的状态是否正常:发送测试消息,可以看出状态没丢失


网站公告

今日签到

点亮在社区的每一天
去签到