news 2026/1/16 4:01:04

Flink CEP Pattern API、连续性、跳过策略、超时与迟到数据一篇讲透

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Flink CEP Pattern API、连续性、跳过策略、超时与迟到数据一篇讲透

1. 快速开始:依赖与基本骨架

FlinkCEP 不是 Flink binary 的默认组件(集群跑时需要把 jar 链接/分发到集群),你的项目里通常先加 Maven 依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep</artifactId><version>2.2.0</version></dependency>

一个最小可运行的 CEP 骨架是:

  1. 输入流DataStream<Event>(建议 event time + watermark)
  2. 定义 Pattern(模式图)
  3. CEP.pattern(input, pattern)得到PatternStream
  4. process(...)/select(...)把匹配输出成业务结果流

另外有个容易忽略的要求:参与匹配的事件对象需要正确实现equals()/hashCode(),因为 CEP 内部会拿它们做比较和匹配。

2. Pattern 的本质:你在画一张“状态机图”

官方说“pattern sequence is a graph”,你可以把它理解为:
你用begin("start") -> next("middle") -> followedBy("end")这些 API,在画一个事件序列状态机;当事件流跑过来时,CEP 会维护大量“部分匹配”(partial match),直到完整走完图,才输出一个 match。

每个 pattern 节点必须有唯一名字(后面从Map<String, List<Event>>里取匹配结果就靠这个名字)。注意:pattern 名字不能包含:

3. 单个 Pattern 怎么写:条件与类型约束

3.1 where / or:事件是否能“进入”这个节点

  • where(...):必须满足条件才能被该节点接受
  • 多次where()连续调用是 AND
  • or(...)把条件变成 OR
start.where(SimpleCondition.of(e->e.getName().startsWith("foo"))).or(SimpleCondition.of(e->e.getId()==42));

3.2 SimpleCondition vs IterativeCondition:要不要看“历史”

  • SimpleCondition:只看当前事件本身(最快、最简单)
  • IterativeCondition:可以访问同一个 partial match 中先前已接受的事件(功能强,但要注意性能)
middle.oneOrMore().subtype(SubEvent.class).where(newIterativeCondition<SubEvent>(){@Overridepublicbooleanfilter(SubEventvalue,Context<SubEvent>ctx)throwsException{if(!value.getName().startsWith("foo"))returnfalse;doublesum=value.getPrice();for(Eventprev:ctx.getEventsForPattern("middle")){sum+=prev.getPrice();}returnsum<5.0;}});

经验建议:ctx.getEventsForPattern(...)的成本会随匹配复杂度上涨,能不用就不用,或者尽量减少遍历次数。

3.3 subtype:类型过滤

如果你的输入流类型是Event,但某个节点只接受SubEvent

pattern.subtype(SubEvent.class).where(SimpleCondition.of(se->se.getVolume()>=10.0));

4. 量词 Quantifier:一次、N 次、范围、可选、贪婪

CEP 里“单节点”默认只接收 1 条事件(singleton)。如果你要让一个节点接收多条(looping pattern),就用量词:

  • oneOrMore():至少 1 次(b+)
  • times(n):恰好 n 次
  • times(from, to):范围次数
  • timesOrMore(n):至少 n 次
  • optional():可出现 0 次
  • greedy():尽可能多地吃(当前只支持量词节点,不支持 group 贪婪)

你可以把它当成正则里的+ ? {m,n}

start.times(2,4).optional().greedy();

重要提醒:对 looping pattern(oneOrMore/times)强烈建议搭配within()until()来清理状态,不然在高吞吐长时间运行里,partial match 会持续增长,状态压力会很大。

5. Pattern 之间的连续性:next / followedBy / followedByAny

这是 CEP 最“容易写错”的点,因为写出来都能跑,但输出差别巨大。

5.1 next:严格连续(Strict Contiguity)

next("b")要求 b 必须紧挨着 a,中间不能有任何不匹配事件。

5.2 followedBy:宽松连续(Relaxed Contiguity)

允许中间插入无关事件,语义更像“跳过不匹配直到下一个匹配”。

5.3 followedByAny:非确定性宽松(Non-deterministic Relaxed)

不仅允许插入无关事件,还会产生更多组合匹配(同一个 start 可以对应多个 middle/end),匹配数量可能爆炸式增长。

经典对比:pattern “a b”,输入a, c, b1, b2

  • next:无匹配(c 破坏连续)
  • followedBy:只匹配{a b1}
  • followedByAny:匹配{a b1}{a b2}

5.4 NOT 模式:notNext / notFollowedBy

  • notNext("x"):紧接着不能出现 x,否则丢弃该 partial match
  • notFollowedBy("x"):在两段之间任意位置不能出现 x

注意两条限制:

  • pattern sequence 如果末尾是notFollowedBy(),必须配within()
  • NOT pattern 不能跟在 optional pattern 后面

6. looping pattern 内部连续性:consecutive 与 allowCombinations

当你写oneOrMore()这种“多次”节点时,节点内部默认是 relaxed contiguity。

如果你希望“这些 repeated 事件必须紧挨着”,用consecutive()

.oneOrMore().consecutive()

如果你希望“重复节点内部也产生更多组合”(类似 followedByAny 的组合爆炸),用allowCombinations()

.oneOrMore().allowCombinations()

工程上要谨慎:allowCombinations()很容易导致匹配结果数量急剧上升,尤其在高基数 key 或热点 key 下会放大状态与 CPU。

7. within:给整个 pattern sequence 加时间窗口

within(Duration.ofSeconds(10))表示:从该 partial match 开始到完成匹配,必须在 10 秒内,否则丢弃(并且你可以捕获“超时 partial match”,后面会讲)。

一个 pattern sequence 只能有一个时间约束,如果你在不同节点上写多个,最终会取最小的那个。

8. AfterMatchSkipStrategy:控制“一个事件被复用到多少个匹配”

CEP 的默认行为是:同一条事件可以参与多个成功匹配。为了控制结果数量与业务语义,需要 skip strategy。

常用五种:

  • noSkip():全输出(最多)
  • skipToNext():输出一个 match 后,丢掉“和这个 match 共享同一起点事件”的其他 partial match(适合避免同起点产生多结果)
  • skipPastLastEvent():输出一个 match 后,丢掉“在该 match 覆盖范围内启动的所有 partial match”(最激进,结果最少)
  • skipToFirst("patternName"):跳到某节点第一次出现的位置
  • skipToLast("patternName"):跳到某节点最后一次出现的位置

设置方式:

AfterMatchSkipStrategyskip=AfterMatchSkipStrategy.skipPastLastEvent();Pattern<Event,?>pattern=Pattern.begin("start",skip).where(...).followedBy("middle").where(...).followedBy("end").where(...);

实战建议:

  • 你只想要“最典型的一条告警”,别让同一起点产生一堆结果:优先考虑skipToNext()
  • 你只想要“完全不重叠的匹配”:优先考虑skipPastLastEvent()
  • 如果你的模式里有oneOrMore(),默认noSkip()可能会让结果量很夸张,务必明确选择策略

9. 输出与处理:推荐用 PatternProcessFunction(并处理超时)

9.1 processMatch:每次完整匹配触发一次

processMatch收到的是:

Map<String, List<IN>> match

key 是 pattern 名字,value 是该节点接收的事件列表(因为 looping 节点可能接收多条)。

DataStream<Alert>result=patternStream.process(newPatternProcessFunction<Event,Alert>(){@OverridepublicvoidprocessMatch(Map<String,List<Event>>pattern,Contextctx,Collector<Alert>out){Eventstart=pattern.get("start").get(0);Eventend=pattern.get("end").get(0);out.collect(newAlert(start,end));}});

Context 里还能拿到时间信息(processing time / timestamp 等),并支持 side output。

9.2 超时 partial match:TimedOutPartialMatchHandler(用 side output 旁路)

只要你用了within(...),就可能发生“开始了但没完成就超时”的 partial match。可以用 mixin 方式实现TimedOutPartialMatchHandler

OutputTag<TimeoutEvent>timeoutTag=newOutputTag<>("timeout"){};SingleOutputStreamOperator<Alert>main=patternStream.process(newPatternProcessFunction<Event,Alert>()implementsTimedOutPartialMatchHandler<Event>{@OverridepublicvoidprocessMatch(Map<String,List<Event>>match,Contextctx,Collector<Alert>out){out.collect(createAlert(match));}@OverridepublicvoidprocessTimedOutMatch(Map<String,List<Event>>match,Contextctx){Eventstart=match.get("start").get(0);ctx.output(timeoutTag,newTimeoutEvent(start));}});DataStream<TimeoutEvent>timeoutStream=main.getSideOutput(timeoutTag);

注意:processTimedOutMatch不能写主输出,只能用 side output。

9.3 旧 API:select / flatSelect 仍可用,但底层会转成 PatternProcessFunction

新项目建议直接用process(...),逻辑更直观,能力也更完整。

10. Event Time 下迟到数据:CEP 假设 watermark 正确

CEP 对 event time 的处理逻辑是:

  • 元素先进入 buffer,按 timestamp 排序
  • watermark 到来时,处理 timestamp < watermark 的元素
  • timestamp 小于“最后看到的 watermark”的事件,被认为是 late element,不再参与匹配

如果你不想让迟到数据悄悄丢掉,可以用sideOutputLateData

OutputTag<Event>lateTag=newOutputTag<>("late-data"){};SingleOutputStreamOperator<Alert>out=patternStream.sideOutputLateData(lateTag).select(newPatternSelectFunction<Event,Alert>(){...});DataStream<Event>late=out.getSideOutput(lateTag);

工程建议:如果业务允许一定乱序,一定要把 watermark 策略和 allowed lateness 设计好;CEP 本身是“以 watermark 为分界线”的。

11. 性能与内存:SharedBuffer cache 参数什么时候有用

CEP 内部维护 SharedBuffer 来保存 partial matches 与事件引用。官方给了三项 cache 配置:

  • pipeline.cep.sharedbuffer.cache.entry-slots
  • pipeline.cep.sharedbuffer.cache.event-slots
  • pipeline.cep.sharedbuffer.cache.statistics-interval

关键点是:这些 cache 主要在 state backend = RocksDB 时用于限制纯内存占用,超过 cache 的部分会被“换出”到 RocksDB state 里。

反过来,如果你不是 RocksDB(例如 heap hashmap state),开启 cache 反而可能拖慢性能(copy-on-write 等开销变重)。

一句话策略:

  • RocksDB:可以用 cache slots 控制内存并换取可控的吞吐
  • 非 RocksDB:谨慎开启,先压测再决定

12. 一个更完整的示例:keyBy + within + 超时 + 迟到旁路

下面示意一个常见告警:同一个 id 的事件流中,出现error -> critical,要求 10 秒内完成。

StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Event>input=...;// 建议设置 watermarkDataStream<Event>keyed=input.keyBy(Event::getId);AfterMatchSkipStrategyskip=AfterMatchSkipStrategy.skipPastLastEvent();Pattern<Event,?>pattern=Pattern.<Event>begin("start",skip).next("middle").where(SimpleCondition.of(e->"error".equals(e.getName()))).followedBy("end").where(SimpleCondition.of(e->"critical".equals(e.getName()))).within(Duration.ofSeconds(10));PatternStream<Event>ps=CEP.pattern(keyed,pattern);OutputTag<Event>lateTag=newOutputTag<>("late"){};OutputTag<TimeoutEvent>timeoutTag=newOutputTag<>("timeout"){};SingleOutputStreamOperator<Alert>alerts=ps.sideOutputLateData(lateTag).process(newPatternProcessFunction<Event,Alert>()implementsTimedOutPartialMatchHandler<Event>{@OverridepublicvoidprocessMatch(Map<String,List<Event>>match,Contextctx,Collector<Alert>out){Eventerr=match.get("middle").get(0);Eventcri=match.get("end").get(0);out.collect(newAlert(err,cri));}@OverridepublicvoidprocessTimedOutMatch(Map<String,List<Event>>match,Contextctx){Eventerr=match.get("middle").get(0);ctx.output(timeoutTag,newTimeoutEvent(err));}});DataStream<Event>late=alerts.getSideOutput(lateTag);DataStream<TimeoutEvent>timeouts=alerts.getSideOutput(timeoutTag);

你可以按业务需要,把next/followedBy调整为更严格或更宽松的连续性,并且用 skip strategy 控制输出爆炸。

13. 迁移提示:老版本 savepoint

如果你需要从 Flink <= 1.5 的 savepoint 恢复,官方策略是:先迁移到 1.6–1.12,重新打 savepoint,再用 Flink >= 1.13 恢复(因为 1.13 起不再兼容 <=1.5 的 savepoint)。

结尾:写 CEP 最容易翻车的 3 件事

  1. 连续性没想清楚:followedByAny直接把输出量放大好几倍
  2. looping pattern 不加within()/until():状态长期累积
  3. event time watermark 设计不当:迟到数据悄悄被 CEP 当 late 丢掉(建议 sideOutputLateData 兜底)
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/1/15 9:27:57

SeedVR2-7B:让模糊视频重获新生的AI修复神器

SeedVR2-7B&#xff1a;让模糊视频重获新生的AI修复神器 【免费下载链接】SeedVR2-7B 项目地址: https://ai.gitcode.com/hf_mirrors/ByteDance-Seed/SeedVR2-7B 想要让那些模糊不清的视频文件焕发新生吗&#xff1f;SeedVR2-7B作为字节跳动推出的新一代AI视频修复模型…

作者头像 李华
网站建设 2026/1/14 7:59:12

AI万能分类器高级教程:自定义分类规则设置

AI万能分类器高级教程&#xff1a;自定义分类规则设置 1. 引言 在当今信息爆炸的时代&#xff0c;文本数据的自动化处理已成为企业提升效率的关键环节。无论是客服工单、用户反馈还是社交媒体舆情&#xff0c;如何快速准确地对海量文本进行归类&#xff0c;是智能系统面临的核…

作者头像 李华
网站建设 2026/1/15 8:35:17

AI万能分类器高级教程:自定义模型参数调优方法

AI万能分类器高级教程&#xff1a;自定义模型参数调优方法 1. 引言&#xff1a;构建智能文本分类的零样本范式 随着自然语言处理技术的发展&#xff0c;传统基于监督学习的文本分类方法面临数据标注成本高、泛化能力弱等挑战。在实际业务场景中&#xff0c;如工单系统、客服对…

作者头像 李华
网站建设 2026/1/14 11:59:40

3D Slicer终极指南:从入门到精通的医学影像处理全流程

3D Slicer终极指南&#xff1a;从入门到精通的医学影像处理全流程 【免费下载链接】Slicer Multi-platform, free open source software for visualization and image computing. 项目地址: https://gitcode.com/gh_mirrors/sl/Slicer 在当今数字化医疗时代&#xff0c;…

作者头像 李华
网站建设 2026/1/15 1:58:23

AiPPT终极配置指南:零基础快速打造智能PPT生成系统

AiPPT终极配置指南&#xff1a;零基础快速打造智能PPT生成系统 【免费下载链接】AiPPT AI 智能生成 PPT&#xff0c;通过主题/文件/网址等方式生成PPT&#xff0c;支持原生图表、动画、3D特效等复杂PPT的解析和渲染&#xff0c;支持用户自定义模板&#xff0c;支持智能添加动画…

作者头像 李华