作者:禅与计算机程序设计艺术
1.简介
Apache Flink is an open source stream processing framework for distributed computing. It provides a high-level abstraction for dataflows that can be executed on multiple nodes to process large volumes of streaming data with low latency. With its fault tolerance mechanism based on the Resilient Distributed Datasets (RDD) architecture, it ensures that applications continue running even if some nodes fail or recover quickly without losing any data. However, it does not guarantee exactly-once semantics among all outputs when there are failures. In this article, we will explore how exactly-once semantics can be achieved using Apache Flink's checkpointing and recovery mechanisms.
Apache Flink has two main mechanisms for ensuring exactly-once message delivery:
Checkpointing: Checkpoints provide a point at which the state of the system is persisted so that all tasks can resume execution from the same consistent snapshot. The checkpoint creates a full snapshot of the application’s state, including all operators and their states. A checkpoint ensures that all messages have been processed before restoring from the checkpoint.
Recovery: When a failure occurs, Flink restores the latest completed checkpoint and restarts the failed task(s) to continue processing after the last successful checkpoint. During recovery, Flink replays unacknowledged messages to ensure that no duplicates are produced by the output streams.
In addition to these core mechanisms, Flink also includes various features such as state partitioning, metrics collection, windowing support, timers, type safety, etc., which make development simpler and more efficient. We assume readers are familiar with Flink concepts like JobManager, TaskManagers, dataflow, operator, datastream API, RDD, etc.
We will use Java APIs in this article to illustrate our approach. We will also demonstrate how different checkpointing strategies impact the level of fault tolerance and performance of Flink. Finally, we will discuss design considerations and tradeoffs involved in achieving exactly-once semantics in Flink.
2.基本概念术语说明
Before diving into the details of implementing exactly-once semantics in Flink, let us first understand the basic concepts and terminologies used in Flink:
2.1 Apache Flink Architecture
The following diagram shows the overall architecture of Apache Flink: Flink consists of three main components - JobManager, TaskManagers, and the Cluster. Each TaskManager runs one or many Tasks that execute parts of a parallel data flow graph called jobs. The JobManager acts as the central scheduler that assigns tasks to available TaskManagers and coordinates their execution. The cluster can consist of standalone machines or virtual machines deployed across multiple physical hosts depending on the size and complexity of the deployment requirements.
Each job contains one or more dataflows consisting of operators that transform input data into output data. Operators can be divided into two categories - sinks and sources. Sinks write the transformed data to external systems while sources read the input data from external systems. The connection between the data flows is called a channel. Flink supports a variety of programming languages like Java, Scala, Python, etc., allowing developers to easily create dataflows using their preferred language.
The runtime environment within each TaskManager is responsible for executing user code and managing local state. The runtime environment also handles communication with other TaskManagers for data exchanges and coordination purposes.
2.2 State Management in Apache Flink
State management refers to the storage of data structures and variables during the processing of streams. In Flink, state is typically stored in key-value stores known as state backends. Keyed state allows users to store values associated with keys in state and retrieve them later based on the key value. Non-keyed state allows storing state without associating it with specific keys.
There are several types of state backends supported in Flink:
Memory State Backend
This backend stores the state in memory and retains it only during the duration of the ApplicationMaster’s life cycle. As long as the application continues running, the state remains retained even if the TaskManager fails unexpectedly. This is suitable for scenarios where the required state retention time is very short or the computation is highly volatile. However, since the state is lost upon failure, it cannot be recovered if the machine crashes.
Memory state backends do not support exactly-once semantic guarantees due to the nature of non-persistent state. Any restart of the ApplicationMaster results in loss of state information. Therefore, they should only be used when absolutely necessary to retain state for extended periods of time.
FsStateBackend
FsStateBackend is a distributed file system based state backend that writes the state to a file system (such as HDFS). This state backend supports both keyed and non-keyed state and provides exactly once semantics. On recovery, the most recent complete checkpoint is restored, leading to exact duplicate free recovery. This makes FsStateBackend ideal for applications requiring strict consistency guarantees. However, the availability of a reliable file system may require careful configuration of replication and failover settings to minimize data loss.
RocksDB State Backend
RocksDB State Backend uses RocksDB as the underlying storage engine for the key-value pairs in state. This backend provides strong consistency guarantees along with high throughput for reads and writes. Additionally, RocksDB State Backend can perform compactions to merge small updates and reduce the space usage of state. To achieve exactly-once guarantees, the RocksDB State Backend takes periodic snapshots of the state and maintains a log of changes made to the state over time. These logs are then applied to the new state during recovery to restore exactly the same view of the state as existed at the end of the previous successful checkpoint. This makes RocksDB State Backend particularly suited for applications with high concurrency and extremely fast recovery times.
However, note that RocksDB State Backend currently requires using static partitioning (i.e., pre-defining the set of partitions ahead of time), which limits scalability and flexibility compared to the dynamic partitioning offered by other state backends. Furthermore, RocksDB State Backend is still under active development and may change in future versions until a stable release is made. For production deployments, we recommend using RocksDB State Backend with care and avoid using it for critical workloads until a stable version becomes available.
2.3 Windowing and Triggering Mechanism in Apache Flink
Windowing enables grouping events together based on certain characteristics such as time, count, content, or location. Windows define a finite size period of time within which a subset of data records is processed together. By doing this, windowing helps to eliminate redundant computations, improve efficiency, and optimize downstream processing.
Windows in Apache Flink are defined using either Time Windows or Count Windows. In Time Windows, windows are defined based on a time interval such as every minute, hour, or day. In Count Windows, windows are defined based on a predefined number of records in each window. Windows also need to specify a trigger that defines when a window will be emitted and aggregated. Different triggers determine whether a record falls within a particular window or not. Common triggers include time-based and count-based triggers. Once a trigger fires, a window will be created and aggregated accordingly.
To implement exactly-once semantics in Flink, we need to carefully choose the triggering strategy and the way data is grouped inside a window. Some common patterns for grouping data inside a window are:
Sliding Window Pattern
Sliding Window pattern involves defining fixed-sized windows that move along with the incoming data. In this pattern, each event belongs to only one window at a given point of time but can belong to multiple subsequent windows.
For example, suppose we want to group data into hourly windows and emit them every five minutes. In this case, we would configure a sliding window with a length of 1 hour and a slide of 5 minutes. Every record that arrives within the first 5 minutes of a new hour would be added to the current window. If another record arrives after 5 minutes, it would be added to the next hour window instead. This means that records from different hours might end up in the same window because they overlap slightly.
Another example of this pattern could be grouping sales transactions based on product categories, with a sliding window of 1 day and a slide of 1 hour. Transactions that occur within the first hour of a new day would be placed in the current window. After 1 hour passes, a new daily window would be started and all transactional data from the previous day would be moved to it.
The downside of this pattern is that it introduces unnecessary latencies in processing. Since each event is assigned to multiple windows, aggregations are performed for each window separately. Moreover, data from older windows is kept around longer than necessary, potentially causing resource wastage.
Session Window Pattern
Session Window pattern involves grouping continuous sequences of events into sessions. Events that are separated by a gap larger than the session timeout are considered to belong to separate sessions.
A typical example of this pattern is monitoring web page activity. In this scenario, we might want to track website visitors who spend more than X minutes on a single page or browse more than Y pages per day. If a visitor spends more than X minutes on a single page, we close out the session and start a new one. Similarly, if a visitor browses more than Y pages per day, we close out the corresponding session and start a new one.
By closing out sessions periodically, we limit the amount of data stored in memory and prevent slowdowns caused by continually accumulating large amounts of data. However, this approach can lead to incorrect results if a batch job is not triggered at the correct time.
An alternative approach to this problem is to assign a unique identifier to each visitor and keep track of the last activity timestamp for each visitor. Whenever a visitor performs an action, we update the timestamp in our state. Periodically, we can scan through our state table and terminate old sessions by checking for those whose last activity was more than N minutes ago. Using this approach, we can enforce exactly-once semantics across multiple batches of data regardless of interruption.