在 Apache Flink 中,Pattern 是 Flink CEP(Complex Event Processing)模块 的核心概念之一。它用于定义你希望从数据流中检测出的 事件序列模式(Event Sequence Pattern)。
🎯 一、什么是 Flink Pattern?
Pattern
是对一系列事件行为的描述规则,用来匹配流中符合某种顺序、条件或时间范围的事件组合。
你可以用 Pattern
来表示:
- 用户连续登录失败
- 某个设备短时间内多次报警
- 用户点击 A → B → C 的行为路径
- 异常交易行为等
🧱 二、Pattern 的基本结构
一个完整的 Pattern
通常由以下几部分组成:
组成部分 | 描述 |
---|---|
名称(Name) | 为每个模式步骤命名,便于后续提取结果 |
条件(Condition) | 定义该步骤需满足的事件属性条件 |
数量限定(Quantifier) | 控制事件出现次数(如 oneOrMore, times(3)) |
时间限制(Time Limit) | 设置整个模式匹配的最大时间窗口(within) |
🔍 三、Pattern 示例解析
示例目标:
识别“用户在10秒内连续登录失败超过3次”的异常行为
Pattern<Event, ?> pattern = Pattern.<Event>begin("开始")
.where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
return event.getType().equals("登录失败");
}
})
.times(3)
.within(Time.seconds(10));
✅ 解释:
部分 | 含义 |
---|---|
.begin("开始") |
定义第一个匹配步骤,命名为 “开始” |
.where(...) |
匹配事件类型为“登录失败” |
.times(3) |
要求连续发生3次 |
.within(Time.seconds(10)) |
整个匹配必须在10秒内完成 |
🧩 四、Pattern 的常用方法详解
1. 起始和连接模式
方法 | 说明 |
---|---|
begin("name") |
定义模式起始条件 |
next("name") |
严格近邻:要求下一个事件紧接上一个之后 |
followedBy("name") |
非严格近邻:允许中间有其他事件 |
notNext() / notFollowedBy() |
排除某个事件出现 |
// 严格顺序:A 后面必须是 B,不能有其他事件插入
Pattern<Event, ?> strictPattern = Pattern.<Event>begin("start")
.where(new SimpleCondition<Event>() {
public boolean filter(Event event) { return event.getType().equals("A"); }
})
.next("middle")
.where(new SimpleCondition<Event>() {
public boolean filter(Event event) { return event.getType().equals("B"); }
});
2. 事件出现次数控制(Quantifiers)
方法 | 描述 |
---|---|
.times(n) |
精确匹配 n 次 |
.oneOrMore() |
至少一次 |
.times(2, 4) |
出现 2~4 次 |
.optional() |
可选匹配,可有可无 |
.greedy() |
贪婪匹配(尽可能多匹配) |
Pattern<Event, ?> optionalPattern = Pattern.<Event>begin("start")
.where(new SimpleCondition<Event>() {
public boolean filter(Event event) { return event.getType().equals("A"); }
})
.followedBy("maybe").where(new SimpleCondition<Event>() {
public boolean filter(Event event) { return event.getType().equals("B"); }
}).optional() // 可选步骤
.followedBy("end").where(new SimpleCondition<Event>() {
public boolean filter(Event event) { return event.getType().equals("C"); }
});
3. 时间约束(Time Constraints)
方法 | 描述 |
---|---|
.within(Time.time) |
模式匹配必须在这个时间窗口内完成 |
.withinWindow(Time.time) |
设置单步之间的时间间隔(仅限某些版本) |
Pattern<Event, ?> timedPattern = Pattern.<Event>begin("start")
.where(new SimpleCondition<Event>() {
public boolean filter(Event event) { return event.getType().equals("A"); }
})
.times(2)
.within(Time.seconds(5)); // 两次 A 必须在5秒内出现
📌 五、完整 Java 示例代码
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.List;
import java.util.Map;
public class FlinkPatternExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 模拟输入事件流
DataStream<Event> eventStream = env.fromElements(
new Event("userA", "登录失败", 1000L),
new Event("userA", "登录失败", 2000L),
new Event("userA", "登录失败", 3000L),
new Event("userA", "登录成功", 4000L)
);
// 定义 Pattern:连续3次登录失败,在10秒内
Pattern<Event, ?> pattern = Pattern.<Event>begin("开始")
.where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
return event.getType().equals("登录失败");
}
})
.times(3)
.within(Time.seconds(10));
// 将 Pattern 应用于数据流
PatternStream<Event> patternStream = CEP.pattern(eventStream, pattern);
// 提取并处理匹配到的事件
patternStream.select(new PatternSelectFunction<Event, String>() {
@Override
public String select(Map<String, List<Event>> patternMap) throws Exception {
List<Event> events = patternMap.get("开始");
return "发现异常!用户 [" + events.get(0).userId + "] 在 10 秒内连续登录失败:" + events.size() + " 次";
}
}).print();
env.execute("Flink Pattern Example");
}
// 事件类
public static class Event {
public String userId;
public String type;
public long timestamp;
public Event(String userId, String type, long timestamp) {
this.userId = userId;
this.type = type;
this.timestamp = timestamp;
}
public String getType() {
return type;
}
public String getUserId() {
return userId;
}
@Override
public String toString() {
return "{userId: " + userId + ", type: " + type + ", timestamp: " + timestamp + "}";
}
}
}
📊 六、运行输出示例
发现异常!用户 [userA] 在 10 秒内连续登录失败:3 次
⚙️ 七、Pattern 使用建议
场景 | 建议 |
---|---|
多步骤行为分析 | 使用 begin().next().followedBy() 构建清晰逻辑 |
异常检测 | 结合 times() 和 within() 控制频率 |
排除特定事件 | 使用 notFollowedBy() |
复杂状态流转 | 使用 begin().where(...).followedBy(...).where(...) |
性能优化 | 设置合理的时间窗口,避免状态无限增长 |
✅ 八、Pattern 的作用总结
功能 | 说明 |
---|---|
行为识别 | 如用户操作路径、漏斗转化率 |
异常检测 | 如频繁请求、登录失败、支付异常 |
业务规则匹配 | 如风控策略、营销活动触发条件 |
流式规则引擎 | 实时判断是否符合预设逻辑 |
与 Flink 状态结合 | 支持高并发、低延迟的状态化检测 |
🧠 九、Pattern 与其他组件的关系
组件 | 作用 |
---|---|
Pattern |
定义要检测的事件序列规则 |
PatternStream |
表示匹配成功的事件流 |
CEP.pattern(stream, pattern) |
将 Pattern 应用于原始流 |
select() / process() |
对匹配结果进行处理 |
📘 十、扩展学习方向
如果你希望我为你演示以下内容,请继续提问:
- Flink Pattern 与 Kafka 集成实战
- 带超时处理的 Pattern(如未完成则触发告警)
- 使用侧输出(sideOutput)处理未匹配的事件
- 多个 Pattern 的组合使用(OR、AND、NOT)
- 自定义 Pattern 匹配逻辑(使用
IterativeCondition
)
📌 一句话总结:
Flink Pattern 是一种用于描述事件序列匹配规则的 DSL,它是构建实时行为识别、风控系统、日志分析的核心工具。