news 2026/2/28 4:02:40

Flink Procedures 用 SQL 的 `CALL` 跑 Flink Job(实现、类型推断、命名参数、Catalog 集成一篇搞懂)

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Flink Procedures 用 SQL 的 `CALL` 跑 Flink Job(实现、类型推断、命名参数、Catalog 集成一篇搞懂)

1. Procedures 是什么,适合做什么

Procedure 可以理解为:SQL 世界里的“存储过程”,但执行体可以启动 Flink 作业

典型用途

  • 管理类:生成测试数据、重建/维护某些资源、触发后台作业
  • 数据操作类:一键跑一个数据准备/清洗/校验 Job,并把结果以表的形式返回
  • 平台化:把一堆“运维脚本/管理逻辑”收敛到 Catalog 中,让用户统一用 SQLCALL调用

2. 实现规则:必须实现Procedure接口 + 定义call(...)

2.1 类要求

  • 实现org.apache.flink.table.procedures.Procedure
  • 类必须public、非抽象、全局可访问
  • 不能是匿名类、非 static 内部类

2.2call(...)方法要求(最关键)

接口本身不定义方法,你需要自己定义名为call的方法:

硬性规则

  • call必须public

  • 第一个参数必须是ProcedureContext

    • context.getExecutionEnvironment()拿到StreamExecutionEnvironment
  • 返回类型必须是数组int[]String[]Row[]Long[]

而且 JVM 普通重载规则都适用

  • 支持重载:call(ctx, int)/call(ctx, String)
  • 支持 varargs:call(ctx, Integer...)
  • 支持继承入参:call(ctx, Object)

如果你用 Scala

  • varargs 需要加scala.annotation.varargs
  • 建议用装箱类型(java.lang.Integer而不是Int)以支持 NULL

3. 一个最小可用的 Procedure 示例:生成序列

下面这个示例展示了:Procedure 拿到StreamExecutionEnvironment,用fromSequence跑一个小 Job,然后把结果收集为数组返回。

importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.table.procedure.ProcedureContext;importorg.apache.flink.table.procedures.Procedure;importorg.apache.flink.util.CloseableIterator;publicclassGenerateSequenceProcedureimplementsProcedure{publiclong[]call(ProcedureContextcontext,intn)throwsException{returngenerate(context.getExecutionEnvironment(),n);}publiclong[]call(ProcedureContextcontext,Stringn)throwsException{returngenerate(context.getExecutionEnvironment(),Integer.parseInt(n));}privatelong[]generate(StreamExecutionEnvironmentenv,intn)throwsException{long[]sequenceN=newlong[n];inti=0;try(CloseableIterator<Long>it=env.fromSequence(0,n-1).executeAndCollect()){while(it.hasNext()){sequenceN[i++]=it.next();}}returnsequenceN;}}

要点

  • call可以重载
  • context.getExecutionEnvironment()获取执行环境
  • 结果必须是数组

4. 类型推断 Type Inference:为什么你有时必须加注解

Flink Table/SQL 是强类型生态,因此 Procedure 的参数/返回值需要映射成 Flink DataType。

Flink 会通过反射自动推断(Automatic Type Inference),但在以下情况经常需要你“补注解”

  • 小数精度/scale(DECIMAL)
  • 嵌套 Row 类型(ROW<…>)
  • RAW/自定义序列化对象
  • 一个call想吃多种输入类型、但希望统一输出类型(用@ProcedureHint

4.1@DataTypeHint:给参数或返回值补充类型信息

注意:call的返回值必须是T[],如果你给返回值加@DataTypeHint,其实标注的是数组元素类型 T

importorg.apache.flink.table.annotation.DataTypeHint;importorg.apache.flink.table.annotation.InputGroup;importorg.apache.flink.table.procedure.ProcedureContext;importorg.apache.flink.table.procedures.Procedure;importorg.apache.flink.types.Row;importjava.math.BigDecimal;importjava.nio.ByteBuffer;importjava.time.Instant;publicclassOverloadedProcedureimplementsProcedure{publicLong[]call(ProcedureContextcontext,longa,longb){returnnewLong[]{a+b};}public@DataTypeHint("DECIMAL(12, 3)")BigDecimal[]call(ProcedureContextcontext,doublea,doubleb){returnnewBigDecimal[]{BigDecimal.valueOf(a+b)};}@DataTypeHint("ROW<s STRING, t TIMESTAMP_LTZ(3)>")publicRow[]call(ProcedureContextcontext,inti){returnnewRow[]{Row.of(String.valueOf(i),Instant.ofEpochSecond(i))};}@DataTypeHint(value="RAW",bridgedTo=ByteBuffer.class)publicByteBuffer[]call(ProcedureContextcontext,@DataTypeHint(inputGroup=InputGroup.ANY)Objecto){returnnewByteBuffer[]{MyUtils.serializeToByteBuffer(o)};}}

4.2@ProcedureHint:把“输入类型 -> 输出类型”的映射说清楚

适合场景

  • 一个call想统一处理多输入类型(例如Object...
  • 多个重载call的输出类型一致,想全局声明一次
importorg.apache.flink.table.annotation.DataTypeHint;importorg.apache.flink.table.annotation.ProcedureHint;importorg.apache.flink.table.procedure.ProcedureContext;importorg.apache.flink.table.procedures.Procedure;importorg.apache.flink.types.Row;@ProcedureHint(output=@DataTypeHint("ROW<s STRING, i INT>"))publicclassSumProcedureimplementsProcedure{publicRow[]call(ProcedureContextcontext,inta,intb){returnnewRow[]{Row.of("Sum",a+b)};}publicRow[]call(ProcedureContextcontext){returnnewRow[]{Row.of("Empty args",-1)};}}

更极端的用法:完全用 hint 决定类型,call只要 JVM 能调用即可。

5. 命名参数 Named Parameters:让 CALL 更可读,也能省参数

调用 procedure 时可以用“命名参数”,好处

  • 不怕参数顺序写错
  • 可选参数可省略(默认补null
  • 可读性强(适合平台化)

通过@ArgumentHint标注参数名、类型、是否可选。

参数级别标注示例

importorg.apache.flink.table.annotation.ArgumentHint;importorg.apache.flink.table.annotation.DataTypeHint;importorg.apache.flink.table.procedure.ProcedureContext;importorg.apache.flink.table.procedures.Procedure;publicclassNamedParameterProcedureimplementsProcedure{public@DataTypeHint("INT")Integer[]call(ProcedureContextcontext,@ArgumentHint(name="a",isOptional=true)Integera,@ArgumentHint(name="b")Integerb){returnnewInteger[]{a+(b==null?0:b)};}}

重要限制

  • 命名参数仅在没有重载、没有可变参数(varargs)时生效
  • @ArgumentHint已包含@DataTypeHint,在某些组合场景下不能混用(按文档要求)

6. 把 Procedure 放进 Catalog:getProcedure+listProcedures

Procedure 必须存在于 Catalog 才能被CALL

你需要在 Catalog 中实现:

  • Catalog.getProcedure(ObjectPath procedurePath):返回 procedure 实例
  • Catalog.listProcedures(String dbName):列出该库下有哪些 procedure

示例(内存 catalog 内置 procedure)

importorg.apache.flink.table.catalog.GenericInMemoryCatalog;importorg.apache.flink.table.catalog.ObjectPath;importorg.apache.flink.table.catalog.exceptions.CatalogException;importorg.apache.flink.table.catalog.exceptions.DatabaseNotExistException;importorg.apache.flink.table.catalog.exceptions.ProcedureNotExistException;importorg.apache.flink.table.procedures.Procedure;importjava.util.*;importjava.util.stream.Collectors;publicclassCatalogWithBuiltInProcedureextendsGenericInMemoryCatalog{privatestaticfinalMap<ObjectPath,Procedure>PROCEDURE_MAP=newHashMap<>();static{PROCEDURE_MAP.put(ObjectPath.fromString("system.generate_n"),newGenerateSequenceProcedure());}publicCatalogWithBuiltInProcedure(Stringname){super(name);}@OverridepublicList<String>listProcedures(StringdbName)throwsDatabaseNotExistException,CatalogException{if(!databaseExists(dbName)){thrownewDatabaseNotExistException(getName(),dbName);}returnPROCEDURE_MAP.keySet().stream().filter(p->p.getDatabaseName().equals(dbName)).map(ObjectPath::getObjectName).collect(Collectors.toList());}@OverridepublicProceduregetProcedure(ObjectPathprocedurePath)throwsProcedureNotExistException,CatalogException{Procedurep=PROCEDURE_MAP.get(procedurePath);if(p!=null){returnp;}thrownewProcedureNotExistException(getName(),procedurePath);}}

7. SQL 调用:CALL catalog.db.proc(args...)

注册 Catalog 后就能调用:

TableEnvironmenttEnv=TableEnvironment.create(...);tEnv.registerCatalog("my_catalog",newCatalogWithBuiltInProcedure("my_catalog"));// 调用tEnv.executeSql("CALL my_catalog.`system`.generate_n(5)");

SQL 侧一般就是

  • CALL my_catalog.\system.generate_n(5)
  • 或者用命名参数(如果你的 procedure 支持且没有重载/varargs)

8. 实战建议:什么时候用 Procedure,什么时候别用

推荐用 Procedure

  • 平台里做“管理命令”:一键生成数据、触发离线/流式任务、数据质量检查
  • 把复杂逻辑隐藏在 Procedure 里,让用户只写CALL ...

不太推荐(或要谨慎)

  • 你只是想做查询内的行级/集合级变换:那是 UDF/PTF 的领域
  • Procedure 内部启动长周期作业时,要考虑资源、权限、隔离和可观测性(日志/指标/审计)
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/2/25 10:03:17

云计算运维VS网络安全:2025年最值得入行的IT赛道,3个真实转行故事it职业规划告诉你如何选择好就业前景好薪资高待遇好?

在数字化浪潮席卷全球的2025年&#xff0c;IT行业始终站在变革最前沿。当传统岗位逐渐被自动化取代&#xff0c;两个赛道却逆势崛起——云计算运维与网络安全&#xff0c;成为企业数字化转型的“刚需双雄”。据工信部数据&#xff0c;2025年我国云计算市场规模突破1万亿元&…

作者头像 李华
网站建设 2026/2/22 6:36:54

15、Windows Azure 存储服务入门与 REST API 详解

Windows Azure 存储服务入门与 REST API 详解 1. Windows Azure 存储服务概述 在 Windows Azure 中,有多种存储服务可供选择,不同的服务有不同的特点和适用场景。 1.1 表存储(Table Storage) 应用开发者可以使用分区键精确控制数据的物理分区。选择合适的分区键至关重要…

作者头像 李华
网站建设 2026/2/28 4:58:41

17、探索Windows Azure存储:从基础到应用

探索Windows Azure存储:从基础到应用 1. 构建存储客户端的挑战与实现 在为新的语言或平台实现存储客户端库时,可能会遇到一些难题。部分主流语言不支持SHA - 256(不过HMAC部分实现起来较为简单)。例如,若要实现该库的Erlang版本,就需要自行研究SHA - 256和HMAC的实现(…

作者头像 李华
网站建设 2026/2/22 11:32:32

20、高速网络中的缓冲区管理策略解析

高速网络中的缓冲区管理策略解析 1. 缓冲区管理概述 缓冲区管理是一种决定何时以及如何丢弃数据包以避免网络拥塞的策略。其性能可通过在拥塞期间公平且高效地控制流量的能力来衡量。通常,数据包丢弃决策要么在新数据包到达时做出,要么在拥塞开始时做出,此时可能会丢弃当前…

作者头像 李华
网站建设 2026/2/22 17:49:34

21、ATM网络与互联网缓冲管理技术解析

ATM网络与互联网缓冲管理技术解析 1. ATM网络的信元处理 在ATM网络中,对于相同损失优先级(LP)的服务类别,信元丢失率存在一定情况。虽然可以分别精确计算这些服务类别的信元丢失率,但这会增加实现复杂度。由于差异较小,为简化实现,可将I类和III类的信元丢失情况合并,…

作者头像 李华
网站建设 2026/2/27 13:04:01

22、网络缓冲区管理机制深度解析

网络缓冲区管理机制深度解析 在网络通信中,缓冲区管理是确保网络高效、稳定运行的关键环节。不同的缓冲区管理机制各有特点,适用于不同的网络场景。下面将详细介绍几种常见的缓冲区管理机制。 1. RED与尾丢弃路由器对比 尾丢弃(Tail Drop)路由器在处理TCP连接时存在一些…

作者头像 李华