14、Flink 的 Operator State 代码示例

发布于:2024-05-03 ⋅ 阅读:(176) ⋅ 点赞:(0)

1、CounterSource

class CounterSource extends RichParallelSourceFunction<Long> implements CheckpointedFunction, CheckpointListener {

    /** Pause between two emitted records, in milliseconds. */
    private static final long EMIT_INTERVAL_MS = 300L;

    /**
     * Next value to emit; this is the offset used for exactly-once replay. It is only advanced
     * inside the checkpoint lock in {@link #run}, so a snapshot always matches what was emitted.
     */
    private long offset = 0L;

    /** Cleared by {@link #cancel()} to stop the emit loop. */
    private volatile boolean isRunning = true;

    /** Operator list state that holds the current offset; obtained in {@link #initializeState}. */
    private transient ListState<Long> state;

    /**
     * Emits an increasing sequence of longs, one every {@link #EMIT_INTERVAL_MS} ms, until cancelled.
     *
     * @param ctx source context used to emit records and to obtain the checkpoint lock
     * @throws RuntimeException if the thread is interrupted while the source is still running
     */
    @Override
    public void run(SourceContext<Long> ctx) {
        final Object lock = ctx.getCheckpointLock();

        while (isRunning) {
            // output and state update are atomic with respect to checkpoints
            synchronized (lock) {
                ctx.collect(offset);
                offset += 1;
            }

            // Throttle OUTSIDE the lock: sleeping while holding the checkpoint lock would stall
            // checkpoints (snapshotState needs the same lock) for the whole interval.
            try {
                TimeUnit.MILLISECONDS.sleep(EMIT_INTERVAL_MS);
            } catch (InterruptedException e) {
                // Preserve the interrupt status for code further up the stack.
                Thread.currentThread().interrupt();
                if (!isRunning) {
                    // cancel() already cleared the flag, so this interrupt is part of shutdown.
                    return;
                }
                throw new RuntimeException(e);
            }
        }
    }

    /** Stops the emit loop in {@link #run} after its current iteration. */
    @Override
    public void cancel() {
        isRunning = false;
    }

    /**
     * Registers the offset list state and, when recovering, restores the offset from it.
     *
     * @param context Flink initialization context providing the operator state store
     * @throws Exception if the state cannot be accessed
     */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        state = context.getOperatorStateStore().getListState(new ListStateDescriptor<>(
                "state",
                LongSerializer.INSTANCE));

        // Restore the saved offset into memory; this method is also called during task recovery.
        if (context.isRestored()) {
            System.out.println("任务出错,source 正在从 Checkpoint 重启");
            // snapshotState writes a single element. If state was redistributed (e.g. after a
            // rescale), several entries could arrive here and the last one wins.
            // NOTE(review): only well-defined for parallelism 1, as used in this example.
            for (Long l : state.get()) {
                offset = l;
            }
        }
    }

    /**
     * Stores the current offset as the only element of the list state.
     *
     * @param context snapshot context (unused)
     * @throws Exception if the state cannot be updated
     */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        state.update(Collections.singletonList(offset));
    }

    /** Logs that the checkpoint with the given id completed. */
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        System.out.println("=====Source算子的Checkpoint " + checkpointId + " completed.");
    }

    /** Logs that the checkpoint with the given id was aborted. */
    @Override
    public void notifyCheckpointAborted(long checkpointId) throws Exception {
        System.out.println("=====Source算子的Checkpoint " + checkpointId + " Aborted.");
    }
}

2、BufferingSink

class BufferingSink implements SinkFunction<Tuple2<String, Integer>>, CheckpointedFunction, CheckpointListener {

    /** Buffer size at which all buffered records are printed and the buffer is emptied. */
    private final int threshold;

    /** Operator list state mirroring the buffer at checkpoint time; set in {@link #initializeState}. */
    private transient ListState<Tuple2<String, Integer>> checkpointedState;

    /** Records received since the last flush. */
    private List<Tuple2<String, Integer>> bufferedElements;

    /**
     * Creates a sink that prints records in batches.
     *
     * @param threshold number of buffered records that triggers a flush
     */
    public BufferingSink(int threshold) {
        this.bufferedElements = new ArrayList<>();
        this.threshold = threshold;
    }

    /**
     * Buffers {@code value} and flushes the buffer once it holds at least {@code threshold} records.
     *
     * @param value incoming record
     * @param ctx sink context (unused)
     */
    @Override
    public void invoke(Tuple2<String, Integer> value, Context ctx) throws Exception {
        bufferedElements.add(value);
        if (bufferedElements.size() < threshold) {
            return;
        }
        flushBuffer();
    }

    /** Prints every buffered record, in arrival order, then empties the buffer. */
    private void flushBuffer() {
        bufferedElements.forEach(
                element -> System.out.println("当前阈值为=>" + threshold + ",输出元素为=>" + element));
        bufferedElements.clear();
    }

    /**
     * Replaces the list state's contents with the records currently buffered.
     *
     * @param context snapshot context (unused)
     * @throws Exception if the state cannot be updated
     */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.update(bufferedElements);
    }

    /**
     * Registers the buffered-elements list state and, on recovery, reloads its records into the buffer.
     *
     * @param context Flink initialization context providing the operator state store
     * @throws Exception if the state cannot be accessed
     */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        TypeInformation<Tuple2<String, Integer>> elementType =
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {});
        ListStateDescriptor<Tuple2<String, Integer>> stateDescriptor =
                new ListStateDescriptor<>("buffered-elements", elementType);

        checkpointedState = context.getOperatorStateStore().getListState(stateDescriptor);

        if (!context.isRestored()) {
            return;
        }
        System.out.println("任务出错,sink 正在从 Checkpoint 重启");
        checkpointedState.get().forEach(bufferedElements::add);
    }

    /** Logs that the checkpoint with the given id completed. */
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        System.out.println("=====Sink算子的Checkpoint " + checkpointId + " completed.");
    }

    /** Logs that the checkpoint with the given id was aborted. */
    @Override
    public void notifyCheckpointAborted(long checkpointId) throws Exception {
        System.out.println("=====Sink算子的Checkpoint " + checkpointId + " Aborted.");
    }
}

3、完整测试用例(下面的 import 需放在文件开头,上面的 CounterSource、BufferingSink 与主类可放在同一个 .java 文件中)

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * Operator-state demo.
 *
 * <ul>
 *   <li>The source emits one record every 300 ms.</li>
 *   <li>The sink prints once it has buffered 10 records, so the pipeline prints roughly every 3 s.</li>
 *   <li>Checkpoints are taken every 3 s.</li>
 * </ul>
 *
 * <p>A map operator simulates failures at the values in {@link #FAILURE_POINTS}. Failure point
 * {@code i} fires only on execution attempt {@code i}. After a restore, the source replays from the
 * last checkpointed offset, which is never past the failing value. A purely value-based failure
 * would therefore hit the same value on every attempt, and the job would restart forever.
 */
public class _03_OperatorState {

    /** Values at which a failure is simulated; entry {@code i} fires only on attempt {@code i}. */
    private static final long[] FAILURE_POINTS = {30L, 50L};

    /**
     * Builds and runs the job: CounterSource → failure-injecting map → BufferingSink.
     *
     * @param args unused
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
        env.setParallelism(1);
        // NOTE(review): relies on the job's restart strategy allowing at least
        // FAILURE_POINTS.length restarts; none is configured explicitly here.

        DataStreamSource<Long> myExactlyOnceSource = env.addSource(new CounterSource());

        // Simulate failures: each configured value fails exactly once, on its own attempt.
        SingleOutputStreamOperator<Tuple2<String, Integer>> map =
                myExactlyOnceSource.map(new RichMapFunction<Long, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(Long value) throws Exception {
                        int attempt = getRuntimeContext().getAttemptNumber();
                        if (attempt < FAILURE_POINTS.length && value == FAILURE_POINTS[attempt]) {
                            throw new IllegalStateException(
                                    "Simulated failure at value " + value + " (attempt " + attempt + ")");
                        }
                        return new Tuple2<>("res", value.intValue());
                    }
                });

        map.addSink(new BufferingSink(10));

        env.execute();
    }
}