1. Procedures 是什么,适合做什么
Procedure 可以理解为:SQL 世界里的“存储过程”,但执行体可以启动 Flink 作业。
典型用途
- 管理类:生成测试数据、重建/维护某些资源、触发后台作业
- 数据操作类:一键跑一个数据准备/清洗/校验 Job,并把结果以表的形式返回
- 平台化:把一堆“运维脚本/管理逻辑”收敛到 Catalog 中,让用户统一用 SQL
CALL调用
2. 实现规则:必须实现Procedure接口 + 定义call(...)
2.1 类要求
- 实现
org.apache.flink.table.procedures.Procedure - 类必须
public、非抽象、全局可访问 - 不能是匿名类、非 static 内部类
2.2call(...)方法要求(最关键)
接口本身不定义方法,你需要自己定义名为call的方法:
硬性规则
call必须public第一个参数必须是
ProcedureContext- 用
context.getExecutionEnvironment()拿到StreamExecutionEnvironment
- 用
返回类型必须是数组:
int[]、String[]、Row[]、Long[]等
而且 JVM 普通重载规则都适用
- 支持重载:
call(ctx, int)/call(ctx, String) - 支持 varargs:
call(ctx, Integer...) - 支持继承入参:
call(ctx, Object)等
如果你用 Scala
- varargs 需要加
scala.annotation.varargs - 建议用装箱类型(
java.lang.Integer而不是Int)以支持 NULL
3. 一个最小可用的 Procedure 示例:生成序列
下面这个示例展示了:Procedure 拿到StreamExecutionEnvironment,用fromSequence跑一个小 Job,然后把结果收集为数组返回。
importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.table.procedure.ProcedureContext;importorg.apache.flink.table.procedures.Procedure;importorg.apache.flink.util.CloseableIterator;publicclassGenerateSequenceProcedureimplementsProcedure{publiclong[]call(ProcedureContextcontext,intn)throwsException{returngenerate(context.getExecutionEnvironment(),n);}publiclong[]call(ProcedureContextcontext,Stringn)throwsException{returngenerate(context.getExecutionEnvironment(),Integer.parseInt(n));}privatelong[]generate(StreamExecutionEnvironmentenv,intn)throwsException{long[]sequenceN=newlong[n];inti=0;try(CloseableIterator<Long>it=env.fromSequence(0,n-1).executeAndCollect()){while(it.hasNext()){sequenceN[i++]=it.next();}}returnsequenceN;}}要点
call可以重载- 用
context.getExecutionEnvironment()获取执行环境 - 结果必须是数组
4. 类型推断 Type Inference:为什么你有时必须加注解
Flink Table/SQL 是强类型生态,因此 Procedure 的参数/返回值需要映射成 Flink DataType。
Flink 会通过反射自动推断(Automatic Type Inference),但在以下情况经常需要你“补注解”
- 小数精度/scale(DECIMAL)
- 嵌套 Row 类型(ROW<…>)
- RAW/自定义序列化对象
- 一个
call想吃多种输入类型、但希望统一输出类型(用@ProcedureHint)
4.1@DataTypeHint:给参数或返回值补充类型信息
注意:call的返回值必须是T[],如果你给返回值加@DataTypeHint,其实标注的是数组元素类型 T。
importorg.apache.flink.table.annotation.DataTypeHint;importorg.apache.flink.table.annotation.InputGroup;importorg.apache.flink.table.procedure.ProcedureContext;importorg.apache.flink.table.procedures.Procedure;importorg.apache.flink.types.Row;importjava.math.BigDecimal;importjava.nio.ByteBuffer;importjava.time.Instant;publicclassOverloadedProcedureimplementsProcedure{publicLong[]call(ProcedureContextcontext,longa,longb){returnnewLong[]{a+b};}public@DataTypeHint("DECIMAL(12, 3)")BigDecimal[]call(ProcedureContextcontext,doublea,doubleb){returnnewBigDecimal[]{BigDecimal.valueOf(a+b)};}@DataTypeHint("ROW<s STRING, t TIMESTAMP_LTZ(3)>")publicRow[]call(ProcedureContextcontext,inti){returnnewRow[]{Row.of(String.valueOf(i),Instant.ofEpochSecond(i))};}@DataTypeHint(value="RAW",bridgedTo=ByteBuffer.class)publicByteBuffer[]call(ProcedureContextcontext,@DataTypeHint(inputGroup=InputGroup.ANY)Objecto){returnnewByteBuffer[]{MyUtils.serializeToByteBuffer(o)};}}4.2@ProcedureHint:把“输入类型 -> 输出类型”的映射说清楚
适合场景
- 一个
call想统一处理多输入类型(例如Object...) - 多个重载
call的输出类型一致,想全局声明一次
importorg.apache.flink.table.annotation.DataTypeHint;importorg.apache.flink.table.annotation.ProcedureHint;importorg.apache.flink.table.procedure.ProcedureContext;importorg.apache.flink.table.procedures.Procedure;importorg.apache.flink.types.Row;@ProcedureHint(output=@DataTypeHint("ROW<s STRING, i INT>"))publicclassSumProcedureimplementsProcedure{publicRow[]call(ProcedureContextcontext,inta,intb){returnnewRow[]{Row.of("Sum",a+b)};}publicRow[]call(ProcedureContextcontext){returnnewRow[]{Row.of("Empty args",-1)};}}更极端的用法:完全用 hint 决定类型,call只要 JVM 能调用即可。
5. 命名参数 Named Parameters:让 CALL 更可读,也能省参数
调用 procedure 时可以用“命名参数”,好处
- 不怕参数顺序写错
- 可选参数可省略(默认补
null) - 可读性强(适合平台化)
通过@ArgumentHint标注参数名、类型、是否可选。
参数级别标注示例
importorg.apache.flink.table.annotation.ArgumentHint;importorg.apache.flink.table.annotation.DataTypeHint;importorg.apache.flink.table.procedure.ProcedureContext;importorg.apache.flink.table.procedures.Procedure;publicclassNamedParameterProcedureimplementsProcedure{public@DataTypeHint("INT")Integer[]call(ProcedureContextcontext,@ArgumentHint(name="a",isOptional=true)Integera,@ArgumentHint(name="b")Integerb){returnnewInteger[]{a+(b==null?0:b)};}}重要限制
- 命名参数仅在没有重载、没有可变参数(varargs)时生效
@ArgumentHint已包含@DataTypeHint,在某些组合场景下不能混用(按文档要求)
6. 把 Procedure 放进 Catalog:getProcedure+listProcedures
Procedure 必须存在于 Catalog 才能被CALL。
你需要在 Catalog 中实现:
Catalog.getProcedure(ObjectPath procedurePath):返回 procedure 实例Catalog.listProcedures(String dbName):列出该库下有哪些 procedure
示例(内存 catalog 内置 procedure)
importorg.apache.flink.table.catalog.GenericInMemoryCatalog;importorg.apache.flink.table.catalog.ObjectPath;importorg.apache.flink.table.catalog.exceptions.CatalogException;importorg.apache.flink.table.catalog.exceptions.DatabaseNotExistException;importorg.apache.flink.table.catalog.exceptions.ProcedureNotExistException;importorg.apache.flink.table.procedures.Procedure;importjava.util.*;importjava.util.stream.Collectors;publicclassCatalogWithBuiltInProcedureextendsGenericInMemoryCatalog{privatestaticfinalMap<ObjectPath,Procedure>PROCEDURE_MAP=newHashMap<>();static{PROCEDURE_MAP.put(ObjectPath.fromString("system.generate_n"),newGenerateSequenceProcedure());}publicCatalogWithBuiltInProcedure(Stringname){super(name);}@OverridepublicList<String>listProcedures(StringdbName)throwsDatabaseNotExistException,CatalogException{if(!databaseExists(dbName)){thrownewDatabaseNotExistException(getName(),dbName);}returnPROCEDURE_MAP.keySet().stream().filter(p->p.getDatabaseName().equals(dbName)).map(ObjectPath::getObjectName).collect(Collectors.toList());}@OverridepublicProceduregetProcedure(ObjectPathprocedurePath)throwsProcedureNotExistException,CatalogException{Procedurep=PROCEDURE_MAP.get(procedurePath);if(p!=null){returnp;}thrownewProcedureNotExistException(getName(),procedurePath);}}7. SQL 调用:CALL catalog.db.proc(args...)
注册 Catalog 后就能调用:
TableEnvironmenttEnv=TableEnvironment.create(...);tEnv.registerCatalog("my_catalog",newCatalogWithBuiltInProcedure("my_catalog"));// 调用tEnv.executeSql("CALL my_catalog.`system`.generate_n(5)");SQL 侧一般就是
CALL my_catalog.\system.generate_n(5)- 或者用命名参数(如果你的 procedure 支持且没有重载/varargs)
8. 实战建议:什么时候用 Procedure,什么时候别用
推荐用 Procedure
- 平台里做“管理命令”:一键生成数据、触发离线/流式任务、数据质量检查
- 把复杂逻辑隐藏在 Procedure 里,让用户只写
CALL ...
不太推荐(或要谨慎)
- 你只是想做查询内的行级/集合级变换:那是 UDF/PTF 的领域
- Procedure 内部启动长周期作业时,要考虑资源、权限、隔离和可观测性(日志/指标/审计)