背景
最近在做 Flink 任务数据源切换时遇到 offset 消费问题,遂写篇文章记录下来。
切换时只修改了 source 算子的 topic,uid 等其他信息保持不变:
- 发布时,发现算子的消费者点位重置为earliest,导致消息积压。
- 消息积压后,打算通过时间戳重置点位到发布前,但是发现点位重置失效。
原因分析
source算子点位初始化模式
source算子点位初始化有两种方式:1)消费者组偏移量:setStartFromGroupOffsets;2)时间戳:setStartFromTimestamp。
消费组偏移量(FromGroupOffsets)
该方式会将 startupMode 初始化为 StartupMode.GROUP_OFFSETS:
startupMode枚举:
时间戳(FromTimestamp)
该方式会将 startupMode 初始化为 StartupMode.TIMESTAMP:
source 算子初始化
示例代码:
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
// configuration.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
// configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "xxx");
configuration.setString("execution.savepoint.path", "xxx");
configuration.setBoolean("execution.savepoint.ignore-unclaimed-state", true);
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
// 启用checkpoint
env.enableCheckpointing(5000);
env.setParallelism(1);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
ParameterTool argTools = ParameterTool.fromArgs(args);
env.getConfig().setGlobalJobParameters(argTools);
// 添加数据源
// "old_topic", "new_topic"
FlinkKafkaConsumer consumer = KafkaConfig.getConsumer();
consumer.setStartFromGroupOffsets();
DataStream<String> stream = env.addSource(consumer)
.uid("kafka-source")
.name("kafka-source");
SingleOutputStreamOperator<HeartEntity> heart = stream.map(new MapFunction<String, HeartEntity>() {
@Override
public HeartEntity map(String value) throws Exception {
HeartEntity heartEntity = JSON.parseObject(value, HeartEntity.class);
return heartEntity;
}
}).uid("map-heart").name("map-heart");
// 使用状态计数
DataStream<Long> countStream = heart.keyBy(HeartEntity::getCommandNo)
.map(new RichMapFunction<HeartEntity, Long>() {
private transient ValueState<Long> countState;
private long count = 0;
@Override
public void open(Configuration parameters) throws Exception {
// 初始化状态
ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("count-state", TypeInformation.of(Long.class));
countState = getRuntimeContext().getState(descriptor);
}
@Override
public Long map(HeartEntity value) throws Exception {
count++;
countState.update(count);
return countState.value();
}
})
.uid("count-map")
.name("count-map");
// 打印计数结果
countStream.print().uid("print").name("print");
// 启动Flink任务
env.execute("Flink Kafka State Count Example");
}
从状态启动
initializeState
状态初始化时,FlinkKafkaConsumerBase 执行 initializeState 方法中:当 source topic 从 sg_lock_heart_msg_topic 切换为 sg_tw_common_com_lock_heart_report_topic 时,可以看到新 topic 绑定的 source 算子仍然是从老 topic 的算子状态启动的,因为 uid 没变。
initializeState 往下走可以看到,restoreState 的是老 topic 分区的状态;
open
算子初始化时,如果状态不为空且 topic 分区不在状态中,那么就会把新的 topic 分区加入到状态中,并设置算子消费新分区的 startupMode 为 EARLIEST_OFFSET,即从最早的消息开始消费。
老的 topic 分区不会再消费,会被移除订阅。
订阅的 topic 分区
从指定时间戳启动
setStartFromTimestamp 设置启动模式为时间戳
然而在算子初始化时,由于从状态启动,新 topic分区 仍然会从 earliest 消费:
也就是说,checkpoint/savepoint 中存储的 source 点位状态在恢复时大于设置的时间戳。
解决方案
尝试一(修改 uid)
从 source 算子初始化的 open 过程可知,既然从状态启动时会将已存在 source 算子(uid在状态中)的新 topic 点位设置为最早,那么如果将新 topic 的 uid 改成与老 topic 的 uid 不一致,是否就能避免从 earliest 恢复:因为从状态恢复时新的 uid 并不在状态中,那么就不会走 open 中将新 topic 点位置为 earliest 的流程。
FlinkKafkaConsumer consumer = KafkaConfig.getConsumer("sg_tw_common_com_lock_heart_report_topic");
DataStream<String> stream = env.addSource(consumer)
.uid("kafka-source-new")
.name("kafka-source-new");
可以看到在状态初始化阶段(initializeState),source 算子的状态 (restoreState)被置为空集合,而不是 null。为什么?
当在算子初始化时,因为 restoreState 不为 null,仍然会进入点位重置的流程:
可以看到这里将新 topic 分区放入了 restoreState 中,且点位置为 earliest(StarupMode 枚举中,EARLIEST_OFFSET = -915623761775L)。
再往下走,restoreState 会将其中的新 topic 分区放入订阅的分区中
从此,新 topic 又从最早开始消费😓。那么方案尝试一是失败的!
在线上实际操作时,消费点位确实被重置到了 earliest,又导致积压了😦。
尝试二(修改消费者组)
有没有办法让 restoreState 置为 null 呢,那就真的不会走到点位重置的流程了🎊
突然看到 restoreState 的注释:
如果消费者从状态恢复,就会设置 restoreState。那怎么让消费者不从状态恢复?无状态启动肯定是不行的,不能让其他算子的状态丢了。那我直接换个消费组名!试一试呢
Properties props = new Properties();
props.put("bootstrap.servers", "uat-kafka1.ttbike.com.cn:9092,uat-kafka2.ttbike.com.cn:9092,uat-kafka3.ttbike.com.cn:9092");
props.put("group.id", "flink-label-engine-new");
还是不行,直到目前发现只要从状态启动,context 上下文会让代码走进给 restoreState 赋值的位置。
isRestored分析
尝试三(新增拓扑图)
根据算子状态恢复可知,只要新增的 source 算子跟其他已有算子形成了算子链,如果以状态启动,那么 source 的点位就会被置为 earliest。
- 新增一个新 topic 的 source 算子和 sink 算子(要保证新增的算子与已有算子隔离,不会形成算子链),然后修改老 source 算子的 uid 和 topic 与新的一致。
// old: sg_lock_heart_msg_topic
FlinkKafkaConsumer consumer = KafkaConfig.getConsumer("sg_lock_heart_msg_topic");
consumer.setStartFromGroupOffsets();
DataStream<String> stream = env.addSource(consumer)
.uid("kafka-source-old")
.name("kafka-source-old");
// new: sg_tw_common_com_lock_heart_report_topic
FlinkKafkaConsumer consumer_new = KafkaConfig.getConsumer("sg_tw_common_com_lock_heart_report_topic");
consumer_new.setStartFromGroupOffsets();
DataStream<String> stream_old = env.addSource(consumer_new)
.uid("kafka-source-new")
.name("kafka-source-new");
stream_old.print().uid("print-new").name("print-new");
由于从状态启动,且新加入的算子与其他算子隔离,老 source 算子的点位从状态启动;新 source 算子的点位被置为 GROUP_OFFSET。
1. 暂停并保存状态;
2. 修改老 source 算子的 uid 和 topic 与 新算子保持一致,同时删除新算子;
3. 然后从状态启动(/061c986d19612ae413ba794f68ff7727/chk-9),修改后的 source 算子点位从状态恢复:
4. 下游 “count-map”的状态是否正常:发送测试消息,可以看出状态没丢失