news 2026/6/23 5:40:19

Flink源码阅读:如何生成StreamGraph

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Flink源码阅读:如何生成StreamGraph

Flink 中有四种执行图,分别是 StreamGraph、JobGraph、ExecutionGraph 和 Physical Graph。今天我们来看下我们编写的 Flink 程序代码是如何生成 StreamGraph 的。

在开始读代码之前,我们先来简单介绍一下四种图之间的关系和区别。

StreamGraph 是根据用户用 Stream API 编写的代码生成的图,用来表示整个程序的拓扑结构。

JobGraph 是由 StreamGraph 生成的,它在 StreamGraph 的基础上,对链化了部分算子,将其合并成为一个节点,减少数据在节点之间传输时序列化和反序列化这些消耗。

ExecutionGraph 是由 JobGraph 生成的,它的主要特点是并行,将多并发的节点拆分。

PhysicalGraph 是 ExecutionGraph 实际部署后的图,它并不是一种数据结构。

StreamExecutionEnvironment

OK,了解了 Flink 四种执行图之后,我们就正式开始源码探索了。首先从 StreamExecutionEnvironment 入手,在编写 Flink 程序时,它是必不可少的一个类。它提供了一系列方法来配置流处理程序的执行环境(如并行度、Checkpoint 配置、时间属性等)。

本文我们主要关注 StreamGraph 的生成,首先是数据流的入口,即 Source 节点。在 StreamExecutionEnvironment 中有 addSource 和 fromSource 等方法,它们用来定义从哪个数据源读取数据,然后返回一个 DataStreamSource (继承自 DataStream),得到 DataStream 之后,它会在各个算子之间流转,最终到 Sink 端输出。

我们从 addSource 方法入手,addSource 方法中主要做了三件事:

1、处理数据类型,优先使用用户执行的数据类型,也可以自动推断

2、闭包清理,使用户传入的 function 能被序列化并发布到分布式环境执行

3、创建 DataStreamSource 并返回

private<OUT>DataStreamSource<OUT>addSource(finalSourceFunction<OUT>function,finalStringsourceName,@NullablefinalTypeInformation<OUT>typeInfo,finalBoundednessboundedness){checkNotNull(function);checkNotNull(sourceName);checkNotNull(boundedness);TypeInformation<OUT>resolvedTypeInfo=getTypeInfo(function,sourceName,SourceFunction.class,typeInfo);booleanisParallel=functioninstanceofParallelSourceFunction;clean(function);finalStreamSource<OUT,?>sourceOperator=newStreamSource<>(function);returnnewDataStreamSource<>(this,resolvedTypeInfo,sourceOperator,isParallel,sourceName,boundedness);}

现在我们有了 DataStream 了,那如何知道后续要进行哪些转换逻辑呢?答案在 transformations 这个变量中,它保存了后续所有的转换。

protectedfinalList<Transformation<?>>transformations=newArrayList<>();

Transformation

我们来看 Transformation 是如何生成和描述 DataStream 的转换流程的。以最常见的 map 方法为例。

public<R>SingleOutputStreamOperator<R>map(MapFunction<T,R>mapper,TypeInformation<R>outputType){returntransform("Map",outputType,newStreamMap<>(clean(mapper)));}

它调用了 transform 方法,transform 又调用了 doTransform 方法。

protected<R>SingleOutputStreamOperator<R>doTransform(StringoperatorName,TypeInformation<R>outTypeInfo,StreamOperatorFactory<R>operatorFactory){// read the output type of the input Transform to coax out errors about MissingTypeInfotransformation.getOutputType();OneInputTransformation<T,R>resultTransform=newOneInputTransformation<>(this.transformation,operatorName,operatorFactory,outTypeInfo,environment.getParallelism(),false);@SuppressWarnings({"unchecked","rawtypes"})SingleOutputStreamOperator<R>returnStream=newSingleOutputStreamOperator(environment,resultTransform);getExecutionEnvironment().addOperator(resultTransform);returnreturnStream;}

在 doTransform 方法中,就是创建 Transformation 和 SingleOutputStreamOperator(DataStream 的一个子类),然后调用 addOperator 方法将 transform 存到 StreamExecutionEnviroment 中的 transformations 变量中。

每个 Transformation 都有 id、name、parallelism 和 slotSharingGroup 等信息。其子类也记录有输入信息,如 OneInputTransformation 和 TwoInputTransformation。

StreamOperator

我们在调用 map 方法时,会传入一个自定义的处理函数,它也会保存在 Transformation 中。在 Flink 中定义了 StreamOperator 方法来抽象这类处理函数。在 map 方法中,它将我们传入的函数转成了 StreamMap,它继承了 AbstractUdfStreamOperator,同时实现了 OneInputStreamOperator 接口。

StreamOperator 定义了对算子生命周期管理的函数。

voidopen()throwsException;voidfinish()throwsException;voidclose()throwsException;OperatorSnapshotFuturessnapshotState(longcheckpointId,longtimestamp,CheckpointOptionscheckpointOptions,CheckpointStreamFactorystorageLocation)throwsException;voidinitializeState(StreamTaskStateInitializerstreamTaskStateManager)throwsException;

OneInputStreamOperator 是 StreamOperator 的子接口。在其基础上增加了对具体元素的处理,主要是对 key 的提取。

defaultvoidsetKeyContextElement(StreamRecord<IN>record)throwsException{setKeyContextElement1(record);}

AbstractUdfStreamOperator 则是提供了对自定义函数生命周期管理的实现。

@Overridepublicvoidopen()throwsException{super.open();FunctionUtils.openFunction(userFunction,DefaultOpenContext.INSTANCE);}@Overridepublicvoidfinish()throwsException{super.finish();if(userFunctioninstanceofSinkFunction){((SinkFunction<?>)userFunction).finish();}}@Overridepublicvoidclose()throwsException{super.close();FunctionUtils.closeFunction(userFunction);}

到这里,我们就知道了 Flink 中 DataStream 是如何转换的。处理逻辑保存在 Transformation 中。下面我们来看一组 Transformation 是如何生成 StreamGraph 的。

StreamGraph

生成 StreamGraph 的入口在org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#generateStreamGraph

在 generate 方法中,会遍历所有 Transformation 并调用 transform 方法。在调用节点的 transform 方法之前,会先确保它的输入节点都已经转换成功。

目前定义了以下 Transformation:

static{@SuppressWarnings("rawtypes")Map<Class<?extendsTransformation>,TransformationTranslator<?,?extendsTransformation>>tmp=newHashMap<>();tmp.put(OneInputTransformation.class,newOneInputTransformationTranslator<>());tmp.put(TwoInputTransformation.class,newTwoInputTransformationTranslator<>());tmp.put(MultipleInputTransformation.class,newMultiInputTransformationTranslator<>());tmp.put(KeyedMultipleInputTransformation.class,newMultiInputTransformationTranslator<>());tmp.put(SourceTransformation.class,newSourceTransformationTranslator<>());tmp.put(SinkTransformation.class,newSinkTransformationTranslator<>());tmp.put(GlobalCommitterTransform.class,newGlobalCommitterTransformationTranslator<>());tmp.put(LegacySinkTransformation.class,newLegacySinkTransformationTranslator<>());tmp.put(LegacySourceTransformation.class,newLegacySourceTransformationTranslator<>());tmp.put(UnionTransformation.class,newUnionTransformationTranslator<>());tmp.put(StubTransformation.class,newStubTransformationTranslator<>());tmp.put(PartitionTransformation.class,newPartitionTransformationTranslator<>());tmp.put(SideOutputTransformation.class,newSideOutputTransformationTranslator<>());tmp.put(ReduceTransformation.class,newReduceTransformationTranslator<>());tmp.put(TimestampsAndWatermarksTransformation.class,newTimestampsAndWatermarksTransformationTranslator<>());tmp.put(BroadcastStateTransformation.class,newBroadcastStateTransformationTranslator<>());tmp.put(KeyedBroadcastStateTransformation.class,newKeyedBroadcastStateTransformationTranslator<>());tmp.put(CacheTransformation.class,newCacheTransformationTranslator<>());translatorMap=Collections.unmodifiableMap(tmp);}

Flink 会根据不同的 Transformation 类调用其 translateInternal 方法。在 translateInternal 方法中就会去添加节点和边。

streamGraph.addOperator(transformationId,slotSharingGroup,transformation.getCoLocationGroupKey(),operatorFactory,inputType,transformation.getOutputType(),transformation.getName());for(IntegerinputId:context.getStreamNodeIds(parentTransformations.get(0))){streamGraph.addEdge(inputId,transformationId,0);}

在 addOperator 方法中,它通过调用 addNode 来创建 StreamNode。

protectedStreamNodeaddNode(IntegervertexID,@NullableStringslotSharingGroup,@NullableStringcoLocationGroup,Class<?extendsTaskInvokable>vertexClass,@NullableStreamOperatorFactory<?>operatorFactory,StringoperatorName){if(streamNodes.containsKey(vertexID)){thrownewRuntimeException("Duplicate vertexID "+vertexID);}StreamNodevertex=newStreamNode(vertexID,slotSharingGroup,coLocationGroup,operatorFactory,operatorName,vertexClass);streamNodes.put(vertexID,vertex);isEmpty=false;returnvertex;}

在 addEdgeInternal 方法中,对于 sideOutput 和 partition 这类虚拟节点,会先解析出原始节点,再建立实际的边。

privatevoidaddEdgeInternal(IntegerupStreamVertexID,IntegerdownStreamVertexID,inttypeNumber,StreamPartitioner<?>partitioner,List<String>outputNames,OutputTagoutputTag,StreamExchangeModeexchangeMode,IntermediateDataSetIDintermediateDataSetId){if(virtualSideOutputNodes.containsKey(upStreamVertexID)){intvirtualId=upStreamVertexID;upStreamVertexID=virtualSideOutputNodes.get(virtualId).f0;if(outputTag==null){outputTag=virtualSideOutputNodes.get(virtualId).f1;}addEdgeInternal(upStreamVertexID,downStreamVertexID,typeNumber,partitioner,null,outputTag,exchangeMode,intermediateDataSetId);}elseif(virtualPartitionNodes.containsKey(upStreamVertexID)){intvirtualId=upStreamVertexID;upStreamVertexID=virtualPartitionNodes.get(virtualId).f0;if(partitioner==null){partitioner=virtualPartitionNodes.get(virtualId).f1;}exchangeMode=virtualPartitionNodes.get(virtualId).f2;addEdgeInternal(upStreamVertexID,downStreamVertexID,typeNumber,partitioner,outputNames,outputTag,exchangeMode,intermediateDataSetId);}else{createActualEdge(upStreamVertexID,downStreamVertexID,typeNumber,partitioner,outputTag,exchangeMode,intermediateDataSetId);}}

最后根据两个物理节点创建 StreamEdge 进行连接。

privatevoidcreateActualEdge(IntegerupStreamVertexID,IntegerdownStreamVertexID,inttypeNumber,StreamPartitioner<?>partitioner,OutputTagoutputTag,StreamExchangeModeexchangeMode,IntermediateDataSetIDintermediateDataSetId){StreamNodeupstreamNode=getStreamNode(upStreamVertexID);StreamNodedownstreamNode=getStreamNode(downStreamVertexID);// If no partitioner was specified and the parallelism of upstream and downstream// operator matches use forward partitioning, use rebalance otherwise.if(partitioner==null&&upstreamNode.getParallelism()==downstreamNode.getParallelism()){partitioner=dynamic?newForwardForUnspecifiedPartitioner<>():newForwardPartitioner<>();}elseif(partitioner==null){partitioner=newRebalancePartitioner<Object>();}if(partitionerinstanceofForwardPartitioner){if(upstreamNode.getParallelism()!=downstreamNode.getParallelism()){if(partitionerinstanceofForwardForConsecutiveHashPartitioner){partitioner=((ForwardForConsecutiveHashPartitioner<?>)partitioner).getHashPartitioner();}else{thrownewUnsupportedOperationException("Forward partitioning does not allow "+"change of parallelism. Upstream operation: "+upstreamNode+" parallelism: "+upstreamNode.getParallelism()+", downstream operation: "+downstreamNode+" parallelism: "+downstreamNode.getParallelism()+" You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");}}}if(exchangeMode==null){exchangeMode=StreamExchangeMode.UNDEFINED;}/** * Just make sure that {@link StreamEdge} connecting same nodes (for example as a result of * self unioning a {@link DataStream}) are distinct and unique. Otherwise it would be * difficult on the {@link StreamTask} to assign {@link RecordWriter}s to correct {@link * StreamEdge}. */intuniqueId=getStreamEdges(upstreamNode.getId(),downstreamNode.getId()).size();StreamEdgeedge=newStreamEdge(upstreamNode,downstreamNode,typeNumber,partitioner,outputTag,exchangeMode,uniqueId,intermediateDataSetId);getStreamNode(edge.getSourceId()).addOutEdge(edge);getStreamNode(edge.getTargetId()).addInEdge(edge);}

通过 StreamNode 和 StreamEdge,就可以得到所有的节点和边,也就是我们的 StreamGraph 就创建完成了。

总结

本文先介绍了 Flink 的四种执行图以及它们之间的关系。接着又通过源码探索了 StreamGraph 的生成逻辑,Flink 将处理 逻辑保存在 Transformation 中,又由 Transformation 生成了 StreamGraph。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/23 15:56:24

10.8 总结

10.8 总结 作业回顾 1.1 索引练习节选 s hello 1 world 2 hello 3 Python # 获取s的长度 print(len(s)) # 30 # 获取第4个字符 print(s[3]) # l # 获取最后一个字符 print(s[-1]) # n # 获取第7个字符 print(s[6]) # 1 # 获取倒数第7个字符 print(s[-7]) # 空格【不显…

作者头像 李华
网站建设 2026/6/23 15:18:13

列车售票|基于springboot 列车售票系统(源码+数据库+文档)

列车售票目录 基于springboot vue列车售票系统 一、前言 二、系统功能演示 三、技术选型 四、其他项目参考 五、代码参考 六、测试参考 七、最新计算机毕设选题推荐 八、源码获取&#xff1a; 基于springboot vue列车售票系统 一、前言 博主介绍&#xff1a;✌️大厂…

作者头像 李华
网站建设 2026/6/22 22:33:30

AI驱动的手动测试变革:赋能而非替代

随着大语言模型和智能自动化技术的飞速发展&#xff0c;软件测试领域正迎来前所未有的变革浪潮。传统手动测试作为软件质量保障的基石&#xff0c;面临着效率提升与价值重塑的双重挑战。 AI时代手动测试的困境与机遇 传统手动测试的局限性 手动测试长期面临着测试覆盖率低、…

作者头像 李华
网站建设 2026/6/23 5:58:29

【奶茶Beta专项】【LVGL9.4源码分析】09-core-group

【奶茶Beta专项】【LVGL9.4源码分析】09-core-group焦点组管理1 概述1.1 文档目的1.2 代码版本与范围2 设计意图与总体定位2.1 为什么需要 lv_group2.2 lv_group 在架构中的位置2.3 与全局输入/焦点状态的关系3 使用方式与典型 DEMO3.1 创建 group 并绑定编码器输入3.2 在菜单/…

作者头像 李华