news 2026/2/16 23:40:38

Flink学习笔记:如何做容错

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Flink学习笔记:如何做容错

现在我们已经了解了 Flink 的状态如何定义和使用,那 Flink 是如何做容错的呢?今天我们一起来了解一下。

先来回答问题, Flink 是通过状态快照来做容错的,在 Flink 中状态快照分为 Checkpoint 和 Savepoint 两种。

Checkpoint

Checkpoint 是一种自动执行的快照,其目的是让 Flink 任务可以从故障中恢复。它可以是增量的,并且为快速恢复进行了优化。

如何开启 Checkpoint

Checkpoint 默认是关闭的,开启的方法很简单,只需要调用 enableCheckpointing() 方法即可。除了这个方法之外,Checkpoint 还有一些高级特性。我们来看几个比较常用的,更多的选项可以查看官方文档。

/* by 01022.hk - online tools website : 01022.hk/zh/checkkeyword.html */ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 每 1000ms 开始一次 checkpoint env.enableCheckpointing(1000); // 高级选项: env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); env.getCheckpointConfig().setCheckpointTimeout(60000); env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2); env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); env.getCheckpointConfig().setExternalizedCheckpointRetention( ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION); env.getCheckpointConfig().enableUnalignedCheckpoints();
  • CheckpointingMode:支持 EXACTLY_ONCE 和 AT_LEAST_ONCE 两种,精确一次有更好的数据一致性,而至少一次可以提供更低的延迟。

  • MinPauseBetweenCheckpoints:Checkpoint 之间最小间隔时间,单位是毫秒,即前一次 Checkpoint 执行完成之后必须间隔 n 毫秒之后才会开启下一次 Checkpoint。

  • CheckpointTimeout:Checkpoint 超时时间,单位为毫秒,表示 Checkpoint 必须在 n 毫秒内完成,否则就会因超时失败。

  • TolerableCheckpointFailureNumber:可容忍连续失败次数,默认是0。超过这个阈值之后,整个 Flink 作业会触发 fail over。

  • MaxConcurrentCheckpoints:Checkpoint 并发数,默认情况下是1,在同一时间只允许一个 Checkpoint 执行。这个参数不能和最小间隔时间一起使用。

  • ExternalizedCheckpointRetention:周期存储 Checkpoint 到外部存储,这样在任务失败时 Checkpoint 也不会被删除。

  • enableUnalignedCheckpoints:使用非对齐的 Checkpoint,可以减少在产生背压时 Checkpoint 的创建时间。

Checkpoint 存储

Flink 提供了两种存储类型:JobManagerCheckpointStorage 和 FileSystemCheckpointStorage。默认是 JobManagerCheckpointStorage,即将 Checkpoint 快照存储在 JobManager 的堆内存中,也可以设置 Checkpoint 目录,将快照存储在外部存储系统中。

Checkpoint 目录通过 execution.checkpointing.dir 设置项设置。其目录结构如下:

/* by 01022.hk - online tools website : 01022.hk/zh/checkkeyword.html */ /user-defined-checkpoint-dir /{job-id} | + --shared/ + --taskowned/ + --chk-1/ + --chk-2/ + --chk-3/ ...
Checkpoint 工作原理

在前文中,我们曾经提到过 Checkpoint Coordinator,它是 JobManager 的其中一个模块。它在 Checkpoint 过程中担任着重要的角色。

现在来看下 Checkpoint 的完整流程

1、Checkpoint Coordinator 触发所有 Source 节点开始 Checkpoint,Source 收到触发命令后,会将自己的 State 进行持久化(图中三角形),并且向下游发送 barrier 事件(图中的小矩形)。当 Source 节点的 State 持久化完成之后,会数据存储的地址发送给 Checkpoint Coordinator。

2、barrier 事件随着事件流传输到下游节点,当下游节点收到所有的上游 barrier 事件后,也会将自己的 State 持久化,并继续向下传播 barrier 事件。持久化完成后,也同样将数据存储地址发送给 Checkpoint Coordinator。

3、当所有的算子都完成持久化过程后,Checkpoint Coordinator 会将一些元数据进行持久化。

至此,一次完整的 Checkpoint 流程就结束了。

Savepoint

学习完 Checkpoint 之后,我们再来了解下另一种快照——Savepoint。

Savepoint 是依据 checkpoint 机制创建的一致性镜像。通常用来做 Flink 作业的重启或更新等运维操作。Savepoint 包含稳定存储上的二进制文件(作业状态的镜像)和元数据文件两部分。

使用 Savepoint

根据官方文档的提示,在我们的程序中,最好显式调用 uid() 方法来为算子指定一个 ID,这些 ID 被用来恢复每个算子的状态。如果不指定的话,Flink 任务会自动生成算子 ID,但是生成的 ID 与程序结构有关,也就是说,如果程序的结构改变了的话,就没有办法从 Savepoint 恢复对应算子的状态了。

有了这个前提条件之后,我们就可以使用命令来操作 Savepoint 了。

// 触发 savepoint $ bin/flink savepoint :jobId [:targetDirectory] // 触发 savepoint, 指定 type,默认是 canonical $ bin/flink savepoint --type [native/canonical] :jobId [:targetDirectory] // 触发 savepoint,客户端拿到 trigger id 后立即返回 $ bin/flink savepoint :jobId [:targetDirectory] -detached // 使用 savepoint 停止作业 $ bin/flink stop --type [native/canonical] --savepointPath [:targetDirectory] :jobId // 从 savepoint 恢复 $ bin/flink run -s :savepointPath [:runArgs] // 删除 savepoint $ bin/flink savepoint -d :savepointPath

在 触发 savepoint 时,我们可以指定格式,两种格式的区别是:

  • canonical(标准格式):在任何存储都保持统一格式,重在保证兼容性。

  • native(原生格式):标准格式创建和恢复都很慢,原生格式是以特定的状态后端的格式生成,可以更快的创建和恢复。

Checkpoint 与 Savepoint 区别

这是面试最常见的问题之一,有了 checkpoint,为什么还需要 savepoint?或者说两者之间有什么区别?

从概念上来讲,Checkpoint 类似数据库的恢复日志,而 Savepoint 类似数据库的备份。Checkpoint 主要用于作业故障的恢复,它的管理和删除也都是 Flink 内部处理,用户不需要过多关注。Savepoint 主要用于有计划的手动运维,例如升级 Flink 版本。它的创建、删除操作都需要用户手动执行。

下面是官方文档给出的 Checkpoint 和 Savepoint 支持的操作。✓表示完全支持,x表示不支持,!表示目前有效,但没有正式保证支持,使用时存在一定风险。

操作标准 Savepoint原生 Savepoint对齐 Checkpoint非对齐 Checkpoint
更换状态后端xxx
State Processor API (写)xxx
State Processor API (读)!!x
自包含和可移动xx
Schema 变更!!!
任意 job 升级x
非任意 job 升级
Flink 小版本升级x
Flink bug/patch 版本升级
扩缩容

总结

本文我们介绍了 Flink 是如何做容错的,分别介绍了 Checkpoint 和 Savepoint,以及它们之间的区别。本文多次提到了 Checkpoint 和 Savepoint 依赖的稳定存储,我会在下一篇文章进行详细的介绍。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/2/15 16:29:52

小程序基于springboot的乡镇普法知识科普宣传系统 律师预约系统设计与实现_qf4cwws6(java毕业设计项目源码)

目录具体实现截图项目介绍论文大纲核心代码部分展示项目运行指导结论源码获取详细视频演示 :文章底部获取博主联系方式!同行可合作具体实现截图 本系统(程序源码数据库调试部署讲解)同时还支持java、ThinkPHP、Node.js、Spring B…

作者头像 李华
网站建设 2026/2/15 16:29:49

面向对象编程三大特性:封装、继承、多态的核心要义

封装 封装是面向对象编程的基本原则,封装要求在编写类时,对外隐藏内部的数据状态和实现细节,仅对外暴露接口,用于外部调用访问。 类的封装,通常类的属性私有(用private修饰符),不对外…

作者头像 李华
网站建设 2026/2/15 16:29:15

leetcode 2147. 分隔长廊的方案数 困难

在一个图书馆的长廊里,有一些座位和装饰植物排成一列。给你一个下标从 0 开始,长度为 n 的字符串 corridor ,它包含字母 S 和 P ,其中每个 S 表示一个座位,每个 P 表示一株植物。在下标 0 的左边和下标 n - 1 的右边 已…

作者头像 李华
网站建设 2026/2/15 16:29:07

学生党必备!这款桌面课表工具太省心了

上课前翻遍手机找课表?担心走神错过上课时间?对于学生党和老师来说,一款顺手的课表工具能省不少事!今天电脑天空要给大家安利的「桌面课表 Class Widget」,就是这样一款精准解决课程管理痛点的桌面小工具。它最戳人的点…

作者头像 李华
网站建设 2026/2/15 16:28:57

深度学习实验14代码

实验前准备 数据集 # make data import random import numpy as np import os # 固定随机种子 random.seed(0) np.random.seed(0)def generate_data(length, k, save_path):if length < 3:raise ValueError("The length of data should be greater than 2.")if k …

作者头像 李华
网站建设 2026/2/15 16:28:46

优化及性能-–-behaviac

原文 优化及性能 宏BEHAVIAC_RELEASE定义的时候是最终版&#xff0c;BEHAVIAC_RELEASE没有定义的时候是为开发版。 在debug版中&#xff0c;BEHAVIAC_RELEASE缺省下没有定义的。诸如logging、socketing、热加载等开发功能是有效的。可以通过behaviac::Config::IsLogging和be…

作者头像 李华