1. 快速开始:依赖与基本骨架
FlinkCEP 不是 Flink binary 的默认组件(集群跑时需要把 jar 链接/分发到集群),你的项目里通常先加 Maven 依赖:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep</artifactId><version>2.2.0</version></dependency>一个最小可运行的 CEP 骨架是:
- 输入流
DataStream<Event>(建议 event time + watermark) - 定义 Pattern(模式图)
CEP.pattern(input, pattern)得到PatternStream- 用
process(...)/select(...)把匹配输出成业务结果流
另外有个容易忽略的要求:参与匹配的事件对象需要正确实现equals()/hashCode(),因为 CEP 内部会拿它们做比较和匹配。
2. Pattern 的本质:你在画一张“状态机图”
官方说“pattern sequence is a graph”,你可以把它理解为:
你用begin("start") -> next("middle") -> followedBy("end")这些 API,在画一个事件序列状态机;当事件流跑过来时,CEP 会维护大量“部分匹配”(partial match),直到完整走完图,才输出一个 match。
每个 pattern 节点必须有唯一名字(后面从Map<String, List<Event>>里取匹配结果就靠这个名字)。注意:pattern 名字不能包含:。
3. 单个 Pattern 怎么写:条件与类型约束
3.1 where / or:事件是否能“进入”这个节点
where(...):必须满足条件才能被该节点接受- 多次
where()连续调用是 AND or(...)把条件变成 OR
start.where(SimpleCondition.of(e->e.getName().startsWith("foo"))).or(SimpleCondition.of(e->e.getId()==42));3.2 SimpleCondition vs IterativeCondition:要不要看“历史”
SimpleCondition:只看当前事件本身(最快、最简单)IterativeCondition:可以访问同一个 partial match 中先前已接受的事件(功能强,但要注意性能)
middle.oneOrMore().subtype(SubEvent.class).where(newIterativeCondition<SubEvent>(){@Overridepublicbooleanfilter(SubEventvalue,Context<SubEvent>ctx)throwsException{if(!value.getName().startsWith("foo"))returnfalse;doublesum=value.getPrice();for(Eventprev:ctx.getEventsForPattern("middle")){sum+=prev.getPrice();}returnsum<5.0;}});经验建议:ctx.getEventsForPattern(...)的成本会随匹配复杂度上涨,能不用就不用,或者尽量减少遍历次数。
3.3 subtype:类型过滤
如果你的输入流类型是Event,但某个节点只接受SubEvent:
pattern.subtype(SubEvent.class).where(SimpleCondition.of(se->se.getVolume()>=10.0));4. 量词 Quantifier:一次、N 次、范围、可选、贪婪
CEP 里“单节点”默认只接收 1 条事件(singleton)。如果你要让一个节点接收多条(looping pattern),就用量词:
oneOrMore():至少 1 次(b+)times(n):恰好 n 次times(from, to):范围次数timesOrMore(n):至少 n 次optional():可出现 0 次greedy():尽可能多地吃(当前只支持量词节点,不支持 group 贪婪)
你可以把它当成正则里的+ ? {m,n}:
start.times(2,4).optional().greedy();重要提醒:对 looping pattern(oneOrMore/times)强烈建议搭配within()或until()来清理状态,不然在高吞吐长时间运行里,partial match 会持续增长,状态压力会很大。
5. Pattern 之间的连续性:next / followedBy / followedByAny
这是 CEP 最“容易写错”的点,因为写出来都能跑,但输出差别巨大。
5.1 next:严格连续(Strict Contiguity)
next("b")要求 b 必须紧挨着 a,中间不能有任何不匹配事件。
5.2 followedBy:宽松连续(Relaxed Contiguity)
允许中间插入无关事件,语义更像“跳过不匹配直到下一个匹配”。
5.3 followedByAny:非确定性宽松(Non-deterministic Relaxed)
不仅允许插入无关事件,还会产生更多组合匹配(同一个 start 可以对应多个 middle/end),匹配数量可能爆炸式增长。
经典对比:pattern “a b”,输入a, c, b1, b2
next:无匹配(c 破坏连续)followedBy:只匹配{a b1}followedByAny:匹配{a b1}和{a b2}
5.4 NOT 模式:notNext / notFollowedBy
notNext("x"):紧接着不能出现 x,否则丢弃该 partial matchnotFollowedBy("x"):在两段之间任意位置不能出现 x
注意两条限制:
- pattern sequence 如果末尾是
notFollowedBy(),必须配within() - NOT pattern 不能跟在 optional pattern 后面
6. looping pattern 内部连续性:consecutive 与 allowCombinations
当你写oneOrMore()这种“多次”节点时,节点内部默认是 relaxed contiguity。
如果你希望“这些 repeated 事件必须紧挨着”,用consecutive():
.oneOrMore().consecutive()如果你希望“重复节点内部也产生更多组合”(类似 followedByAny 的组合爆炸),用allowCombinations():
.oneOrMore().allowCombinations()工程上要谨慎:allowCombinations()很容易导致匹配结果数量急剧上升,尤其在高基数 key 或热点 key 下会放大状态与 CPU。
7. within:给整个 pattern sequence 加时间窗口
within(Duration.ofSeconds(10))表示:从该 partial match 开始到完成匹配,必须在 10 秒内,否则丢弃(并且你可以捕获“超时 partial match”,后面会讲)。
一个 pattern sequence 只能有一个时间约束,如果你在不同节点上写多个,最终会取最小的那个。
8. AfterMatchSkipStrategy:控制“一个事件被复用到多少个匹配”
CEP 的默认行为是:同一条事件可以参与多个成功匹配。为了控制结果数量与业务语义,需要 skip strategy。
常用五种:
noSkip():全输出(最多)skipToNext():输出一个 match 后,丢掉“和这个 match 共享同一起点事件”的其他 partial match(适合避免同起点产生多结果)skipPastLastEvent():输出一个 match 后,丢掉“在该 match 覆盖范围内启动的所有 partial match”(最激进,结果最少)skipToFirst("patternName"):跳到某节点第一次出现的位置skipToLast("patternName"):跳到某节点最后一次出现的位置
设置方式:
AfterMatchSkipStrategyskip=AfterMatchSkipStrategy.skipPastLastEvent();Pattern<Event,?>pattern=Pattern.begin("start",skip).where(...).followedBy("middle").where(...).followedBy("end").where(...);实战建议:
- 你只想要“最典型的一条告警”,别让同一起点产生一堆结果:优先考虑
skipToNext() - 你只想要“完全不重叠的匹配”:优先考虑
skipPastLastEvent() - 如果你的模式里有
oneOrMore(),默认noSkip()可能会让结果量很夸张,务必明确选择策略
9. 输出与处理:推荐用 PatternProcessFunction(并处理超时)
9.1 processMatch:每次完整匹配触发一次
processMatch收到的是:
Map<String, List<IN>> match
key 是 pattern 名字,value 是该节点接收的事件列表(因为 looping 节点可能接收多条)。
DataStream<Alert>result=patternStream.process(newPatternProcessFunction<Event,Alert>(){@OverridepublicvoidprocessMatch(Map<String,List<Event>>pattern,Contextctx,Collector<Alert>out){Eventstart=pattern.get("start").get(0);Eventend=pattern.get("end").get(0);out.collect(newAlert(start,end));}});Context 里还能拿到时间信息(processing time / timestamp 等),并支持 side output。
9.2 超时 partial match:TimedOutPartialMatchHandler(用 side output 旁路)
只要你用了within(...),就可能发生“开始了但没完成就超时”的 partial match。可以用 mixin 方式实现TimedOutPartialMatchHandler:
OutputTag<TimeoutEvent>timeoutTag=newOutputTag<>("timeout"){};SingleOutputStreamOperator<Alert>main=patternStream.process(newPatternProcessFunction<Event,Alert>()implementsTimedOutPartialMatchHandler<Event>{@OverridepublicvoidprocessMatch(Map<String,List<Event>>match,Contextctx,Collector<Alert>out){out.collect(createAlert(match));}@OverridepublicvoidprocessTimedOutMatch(Map<String,List<Event>>match,Contextctx){Eventstart=match.get("start").get(0);ctx.output(timeoutTag,newTimeoutEvent(start));}});DataStream<TimeoutEvent>timeoutStream=main.getSideOutput(timeoutTag);注意:processTimedOutMatch不能写主输出,只能用 side output。
9.3 旧 API:select / flatSelect 仍可用,但底层会转成 PatternProcessFunction
新项目建议直接用process(...),逻辑更直观,能力也更完整。
10. Event Time 下迟到数据:CEP 假设 watermark 正确
CEP 对 event time 的处理逻辑是:
- 元素先进入 buffer,按 timestamp 排序
- watermark 到来时,处理 timestamp < watermark 的元素
- timestamp 小于“最后看到的 watermark”的事件,被认为是 late element,不再参与匹配
如果你不想让迟到数据悄悄丢掉,可以用sideOutputLateData:
OutputTag<Event>lateTag=newOutputTag<>("late-data"){};SingleOutputStreamOperator<Alert>out=patternStream.sideOutputLateData(lateTag).select(newPatternSelectFunction<Event,Alert>(){...});DataStream<Event>late=out.getSideOutput(lateTag);工程建议:如果业务允许一定乱序,一定要把 watermark 策略和 allowed lateness 设计好;CEP 本身是“以 watermark 为分界线”的。
11. 性能与内存:SharedBuffer cache 参数什么时候有用
CEP 内部维护 SharedBuffer 来保存 partial matches 与事件引用。官方给了三项 cache 配置:
pipeline.cep.sharedbuffer.cache.entry-slotspipeline.cep.sharedbuffer.cache.event-slotspipeline.cep.sharedbuffer.cache.statistics-interval
关键点是:这些 cache 主要在 state backend = RocksDB 时用于限制纯内存占用,超过 cache 的部分会被“换出”到 RocksDB state 里。
反过来,如果你不是 RocksDB(例如 heap hashmap state),开启 cache 反而可能拖慢性能(copy-on-write 等开销变重)。
一句话策略:
- RocksDB:可以用 cache slots 控制内存并换取可控的吞吐
- 非 RocksDB:谨慎开启,先压测再决定
12. 一个更完整的示例:keyBy + within + 超时 + 迟到旁路
下面示意一个常见告警:同一个 id 的事件流中,出现error -> critical,要求 10 秒内完成。
StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Event>input=...;// 建议设置 watermarkDataStream<Event>keyed=input.keyBy(Event::getId);AfterMatchSkipStrategyskip=AfterMatchSkipStrategy.skipPastLastEvent();Pattern<Event,?>pattern=Pattern.<Event>begin("start",skip).next("middle").where(SimpleCondition.of(e->"error".equals(e.getName()))).followedBy("end").where(SimpleCondition.of(e->"critical".equals(e.getName()))).within(Duration.ofSeconds(10));PatternStream<Event>ps=CEP.pattern(keyed,pattern);OutputTag<Event>lateTag=newOutputTag<>("late"){};OutputTag<TimeoutEvent>timeoutTag=newOutputTag<>("timeout"){};SingleOutputStreamOperator<Alert>alerts=ps.sideOutputLateData(lateTag).process(newPatternProcessFunction<Event,Alert>()implementsTimedOutPartialMatchHandler<Event>{@OverridepublicvoidprocessMatch(Map<String,List<Event>>match,Contextctx,Collector<Alert>out){Eventerr=match.get("middle").get(0);Eventcri=match.get("end").get(0);out.collect(newAlert(err,cri));}@OverridepublicvoidprocessTimedOutMatch(Map<String,List<Event>>match,Contextctx){Eventerr=match.get("middle").get(0);ctx.output(timeoutTag,newTimeoutEvent(err));}});DataStream<Event>late=alerts.getSideOutput(lateTag);DataStream<TimeoutEvent>timeouts=alerts.getSideOutput(timeoutTag);你可以按业务需要,把next/followedBy调整为更严格或更宽松的连续性,并且用 skip strategy 控制输出爆炸。
13. 迁移提示:老版本 savepoint
如果你需要从 Flink <= 1.5 的 savepoint 恢复,官方策略是:先迁移到 1.6–1.12,重新打 savepoint,再用 Flink >= 1.13 恢复(因为 1.13 起不再兼容 <=1.5 的 savepoint)。
结尾:写 CEP 最容易翻车的 3 件事
- 连续性没想清楚:
followedByAny直接把输出量放大好几倍 - looping pattern 不加
within()/until():状态长期累积 - event time watermark 设计不当:迟到数据悄悄被 CEP 当 late 丢掉(建议 sideOutputLateData 兜底)