news 2026/2/2 3:30:18

Flink源码阅读:Mailbox线程模型

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Flink源码阅读:Mailbox线程模型

本文我们来梳理 Flink 的线程模型——Mailbox。

写在前面

在以前的线程模型中,Flink 通过 checkpointLock 来隔离保证不同线程在修改内部状态时的正确性。通过 checkpointLock 控制并发会在代码中出现大量的synchronize(lock)这样非常不利于阅读和调试。Flink 也提供了一些 API 将锁对象暴露给用户,如果没有正确使用锁,很容易导致线程安全问题。

为了解决这些问题,Flink 社区提出了基于 Mailbox 的线程模型。它是通过单线程加阻塞队列来实现。这样内部状态的修改就由单线程来完成了。

旧的线程模型中,checkpointLock 主要用在三个地方:

  • Event Process:包括 event、watermark、barrier 的处理和发送

  • Checkpoint:包括 Checkpoint 的触发和完成通知

  • ProcessTime Timer:ProcessTime 的回调通常涉及对状态的修改

在 Mailbox 模型中,将所有需要处理的事件都封装成 Mail 投递到 Mailbox 中,然后由单线程按照顺序处理。

相关定义

下面我们来看 Mailbox 的具体实现,具体涉及到以下这些类。

我们来逐个看一下这些类的定义和作用。

Mail

在 Mailbox 线程模型中,Mail 是最基础的一个类,它用来封装需要处理的消息和执行的动作。Checkpoint Trigger 和 ProcessTime Trigger 都是通过 Mail 来触发的。Mail 中包含以下属性:

// 选项,包括两个选项:isUrgent 和 deferrableprivatefinalMailOptionsImplmailOptions;// 要执行的动作privatefinalThrowingRunnable<?extendsException>runnable;// 优先级,这里的优先级不决定执行顺序,而是避免上下游之间的死锁问题privatefinalintpriority;// 描述信息privatefinalStringdescriptionFormat;privatefinalObject[]descriptionArgs;// 用于执行 runnable 的执行器privatefinalStreamTaskActionExecutoractionExecutor;

TaskMailbox

有了 Mail 之后,Flink 用 TaskMailbox 来存储它,在需要执行时,再从 TaskMailbox 中取出。具体的处理逻辑在 TaskMailboxImpl 中。

// 内部对于 queue 和 state 的并发访问都需要被这个锁保护privatefinalReentrantLocklock=newReentrantLock();// 实际存储 Mail 的队列@GuardedBy("lock")privatefinalDeque<Mail>queue=newArrayDeque<>();// 与 lock 关联的 Condition,主要用于队列从空变为非空时唤醒等待获取 Mail 的线程@GuardedBy("lock")privatefinalConditionnotEmpty=lock.newCondition();// 状态,包括 OPEN/QUIESCED/CLOSED@GuardedBy("lock")privateStatestate=OPEN;// 指定的邮箱线程的引用@NonnullprivatefinalThreadtaskMailboxThread;// 用于性能优化的设计privatefinalDeque<Mail>batch=newArrayDeque<>();// queue队列是否为空,用于性能优化,避免频繁访问主队列privatevolatilebooleanhasNewMail=false;// 是否有紧急邮件,同样用于性能优化,减少检查队列中是否有紧急邮件的次数privatevolatilebooleanhasNewUrgentMail=false;

通过上面的属性,我们知道 TaskMailbox 底层是用 ArrayDeque 来存储 Mail 的,它内部包含了一个状态字段 state,state 的状态包括:

  • OPEN:可以正常访问,接收和发送 Mail。

  • QUIESCED:处于静默状态,不接收新的 Mail,已有的 Mail 仍然可以被取出。

  • CLOSED:关闭状态,不能进行任何操作。

在 TaskMailbox 内部,并发访问 queue 队列和 state 状态都需要 lock 这个锁的保护。此外 TaskMailbox 内部还做了一些性能优化,比如增加了 batch 队列,在处理 Mail 时,先将一批 Mail 从 queue 队列转移到 batch,之后会优先从 batch 队列中取,这样就减少了访问 queue 队列的次数,缓解了锁竞争压力。

MailboxProcessor

MailboxProcessor 可以认为是 Mailbox 相关的核心入口,MailboxProcessor 的核心方法就是事件循环,这个循环中主要是从 TaskMailbox 中取出 Mail 执行相应动作和执行默认动作(MailboxDefaultAction)。

MailboxProcessor 还对外提供了 MailboxExecutor,其他组件可以利用 MailboxExecutor 来提交事件。

MailboxExecutor

我们接着来看 MailboxExecutor,它的实现类是 MailboxExecutorImpl。包括以下属性:

// 实际存储的 mailbox 实例@NonnullprivatefinalTaskMailboxmailbox;// 优先级,MailboxExecutor 提供的默认优先级,提交 mail 时会带上这个字段privatefinalintpriority;// 执行器,运行 mail 的动作privatefinalStreamTaskActionExecutoractionExecutor;// 执行 MailboxProcessor,主要用于 isIdle 方法privatefinalMailboxProcessormailboxProcessor;

MailboxExecutor 的主要作用是向 TaskMailbox 中投递 mail,核心方法是 execute。这个方法可以在任意线程中执行,因为 mailbox 内部控制了并发。

publicvoidexecute(MailOptionsmailOptions,finalThrowingRunnable<?extendsException>command,finalStringdescriptionFormat,finalObject...descriptionArgs){try{mailbox.put(newMail(mailOptions,command,priority,actionExecutor,descriptionFormat,descriptionArgs));}catch(MailboxClosedExceptionmbex){thrownewRejectedExecutionException(mbex);}}

除了 execute 方法以外,MailboxExecutor 中还有一个重要的方法,就是 yield。

publicvoidyield()throwsInterruptedException{Mailmail=mailbox.take(priority);try{mail.run();}catch(Exceptionex){throwWrappingRuntimeException.wrapIfNecessary(ex);}}

这个方法的主要目的是为了让出对当前事件的处理。这么做的原因有二:

  1. 如果不考虑优先级的因素,Mailbox 队列是 FIFO 的顺序处理,如果当前事件依赖后面的事件完成,则有可能造成”死锁“。

  2. 当前事件处理事件较长,会阻塞其他事件。因此需要让出执行权,让相同或更高优先级的事件有机会执行。

需要注意的是 yield 方法只能有 mailbox 线程自身调用。另外,Flink 也提供了非阻塞版本的方法,就是 tryYield。

执行流程

主流程

在创建 StreamTask 时,会创建 mailboxProcessor,同时也会持有 mainMailboxExecutor。

newTaskMailboxImpl(Thread.currentThread()));...this.mailboxProcessor=newMailboxProcessor(this::processInput,mailbox,actionExecutor,mailboxMetricsControl);...this.mainMailboxExecutor=mailboxProcessor.getMainMailboxExecutor();

可以看到这里将 processInput 作为 MailboxDefaultAction 传入 MailboxProcessor。在 StreamTask 启动时,会调用 MailboxProcessor 的核心方法。

publicfinalvoidinvoke()throwsException{// Allow invoking method 'invoke' without having to call 'restore' before it.if(!isRunning){LOG.debug("Restoring during invoke will be called.");restoreInternal();}// final check to exit early before starting to runensureNotCanceled();scheduleBufferDebloater();// let the task do its workgetEnvironment().getMetricGroup().getIOMetricGroup().markTaskStart();runMailboxLoop();// if this left the run() method cleanly despite the fact that this was canceled,// make sure the "clean shutdown" is not attemptedensureNotCanceled();afterInvoke();}publicvoidrunMailboxLoop()throwsException{mailboxProcessor.runMailboxLoop();}

runMailboxLoop 的核心逻辑是一个 while 循环,在循环中处理 mail 并执行默认动作。

publicvoidrunMailboxLoop()throwsException{suspended=!mailboxLoopRunning;finalTaskMailboxlocalMailbox=mailbox;checkState(localMailbox.isMailboxThread(),"Method must be executed by declared mailbox thread!");assertlocalMailbox.getState()==TaskMailbox.State.OPEN:"Mailbox must be opened!";finalMailboxControllermailboxController=newMailboxController(this);while(isNextLoopPossible()){// The blocking `processMail` call will not return until default action is available.processMail(localMailbox,false);if(isNextLoopPossible()){mailboxDefaultAction.runDefaultAction(mailboxController);// lock is acquired inside default action as needed}}}privatebooleanisNextLoopPossible(){// 'Suspended' can be false only when 'mailboxLoopRunning' is true.return!suspended;}

首先是做了前置检查,包括确保 TaskMailbox 是指定的 mailbox 线程,TaskMailbox 的状态是 OPEN。接着创建了 MailboxController,它用于 MailboxDefaultAction 与 MailboxProcessor 的交互。

然后就进入到while (isNextLoopPossible())循环了,循环中调用了 processMail,在这个方法中对 mail 进行处理。

privatebooleanprocessMail(TaskMailboxmailbox,booleansingleStep)throwsException{// Doing this check is an optimization to only have a volatile read in the expected hot// path, locks are only// acquired after this point.booleanisBatchAvailable=mailbox.createBatch();// Take mails in a non-blockingly and execute them.booleanprocessed=isBatchAvailable&&processMailsNonBlocking(singleStep);if(singleStep){returnprocessed;}// If the default action is currently not available, we can run a blocking mailbox execution// until the default action becomes available again.processed|=processMailsWhenDefaultActionUnavailable();returnprocessed;}

processMail 方法中先创建 batch,然后非阻塞的处理这批 mail。

privatebooleanprocessMailsNonBlocking(booleansingleStep)throwsException{longprocessedMails=0;Optional<Mail>maybeMail;while(isNextLoopPossible()&&(maybeMail=mailbox.tryTakeFromBatch()).isPresent()){if(processedMails++==0){maybePauseIdleTimer();}runMail(maybeMail.get());if(singleStep){break;}}if(processedMails>0){maybeRestartIdleTimer();returntrue;}else{returnfalse;}}privatevoidrunMail(Mailmail)throwsException{mailboxMetricsControl.getMailCounter().inc();mail.run();if(!suspended){// start latency measurement on first mail that is not suspending mailbox execution,// i.e., on first non-poison mail, otherwise latency measurement is not started to avoid// overheadif(!mailboxMetricsControl.isLatencyMeasurementStarted()&&mailboxMetricsControl.isLatencyMeasurementSetup()){mailboxMetricsControl.startLatencyMeasurement();}}}

processMailsNonBlocking 直接调用 runMail 方法,最终是调用mail.run执行具体动作。

processMailsWhenDefaultActionUnavailable 的逻辑是如果当前默认动作不可用,会接着调用 runMail 尝试处理 Mail,这里会阻塞的等待,直到有新的需要处理的 Mail 或者默认动作可用。

当默认动作可用时,就会执行默认动作,也就是Stream.processInput,这里就是处理 StreamRecord 了。

protectedvoidprocessInput(MailboxDefaultAction.Controllercontroller)throwsException{DataInputStatusstatus=inputProcessor.processInput();switch(status){caseMORE_AVAILABLE:if(taskIsAvailable()){return;}break;caseNOTHING_AVAILABLE:break;caseEND_OF_RECOVERY:thrownewIllegalStateException("We should not receive this event here.");caseSTOPPED:endData(StopMode.NO_DRAIN);return;caseEND_OF_DATA:endData(StopMode.DRAIN);notifyEndOfData();return;caseEND_OF_INPUT:// Suspend the mailbox processor, it would be resumed in afterInvoke and finished// after all records processed by the downstream tasks. We also suspend the default// actions to avoid repeat executing the empty default operation (namely process// records).controller.suspendDefaultAction();mailboxProcessor.suspend();return;}...}

当 status 是 MORE_AVAILABLE,表示还有更多数据可用立即处理,判断当前任务可用就立即返回。当 status 是 END_OF_INPUT 时,表示所有的输入都结束了,这时就会暂停循环事件的调用。

Checkpoint 流程

触发 Checkpoint 的流程是调用Stream.triggerCheckpointAsync方法。

publicCompletableFuture<Boolean>triggerCheckpointAsync(CheckpointMetaDatacheckpointMetaData,CheckpointOptionscheckpointOptions){checkForcedFullSnapshotSupport(checkpointOptions);MailboxExecutor.MailOptionsmailOptions=CheckpointOptions.AlignmentType.UNALIGNED==checkpointOptions.getAlignment()?MailboxExecutor.MailOptions.urgent():MailboxExecutor.MailOptions.options();CompletableFuture<Boolean>result=newCompletableFuture<>();mainMailboxExecutor.execute(mailOptions,()->{try{booleannoUnfinishedInputGates=Arrays.stream(getEnvironment().getAllInputGates()).allMatch(InputGate::isFinished);if(noUnfinishedInputGates){result.complete(triggerCheckpointAsyncInMailbox(checkpointMetaData,checkpointOptions));}else{result.complete(triggerUnfinishedChannelsCheckpoint(checkpointMetaData,checkpointOptions));}}catch(Exceptionex){// Report the failure both via the Future result but also to the mailboxresult.completeExceptionally(ex);throwex;}},"checkpoint %s with %s",checkpointMetaData,checkpointOptions);returnresult;}

通过调用mainMailboxExecutor.execute方法来向 Mailbox 中提交 Mail。Checkpoint 完成的通知也是一样放在 Mailbox 中执行的,不过这里提交的是一个高优先级的操作。

privateFuture<Void>notifyCheckpointOperation(RunnableWithExceptionrunnable,Stringdescription){CompletableFuture<Void>result=newCompletableFuture<>();mailboxProcessor.getMailboxExecutor(TaskMailbox.MAX_PRIORITY).execute(()->{try{runnable.run();}catch(Exceptionex){result.completeExceptionally(ex);throwex;}result.complete(null);},description);returnresult;}

总结

本文我们梳理了 Mailbox 相关的源码。Flink 通过 Mailbox 线程模型来简化相关代码逻辑。我们先了解了几个核心类:Mail、TaskMailbox、MailboxProcessor、MailboxExecutor。然后梳理了具体的事件处理和触发的流程。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/1/30 8:26:00

Dify工作流实战指南:从零到一玩转AI自动化

Dify工作流实战指南&#xff1a;从零到一玩转AI自动化 【免费下载链接】Awesome-Dify-Workflow 分享一些好用的 Dify DSL 工作流程&#xff0c;自用、学习两相宜。 Sharing some Dify workflows. 项目地址: https://gitcode.com/GitHub_Trending/aw/Awesome-Dify-Workflow …

作者头像 李华
网站建设 2026/1/30 22:47:02

教育场景实战:用DeepSeek-R1-Distill-Qwen-1.5B快速开发数学解题助手

教育场景实战&#xff1a;用DeepSeek-R1-Distill-Qwen-1.5B快速开发数学解题助手 1. 引言&#xff1a;轻量模型在教育场景中的价值 随着大模型技术的快速发展&#xff0c;如何将高性能AI能力部署到资源受限的教学环境中&#xff0c;成为教育科技领域的重要课题。传统大模型虽…

作者头像 李华
网站建设 2026/2/1 8:34:43

Citra模拟器完整使用指南:从新手到精通

Citra模拟器完整使用指南&#xff1a;从新手到精通 【免费下载链接】citra 项目地址: https://gitcode.com/GitHub_Trending/ci/citra 想要在电脑上重温任天堂3DS的经典游戏体验吗&#xff1f;Citra模拟器为你提供了完美的解决方案。这款开源模拟器不仅支持Windows、ma…

作者头像 李华
网站建设 2026/1/30 5:33:08

5分钟搞定电子课本下载:智慧教育平台PDF获取全攻略

5分钟搞定电子课本下载&#xff1a;智慧教育平台PDF获取全攻略 【免费下载链接】tchMaterial-parser 国家中小学智慧教育平台 电子课本下载工具 项目地址: https://gitcode.com/GitHub_Trending/tc/tchMaterial-parser 还在为找不到合适的电子教材而烦恼吗&#xff1f;备…

作者头像 李华
网站建设 2026/2/2 18:39:02

支持混合语言与注释优化,HY-MT1.5-7B让翻译更精准

支持混合语言与注释优化&#xff0c;HY-MT1.5-7B让翻译更精准 1. 引言&#xff1a;面向复杂场景的下一代翻译模型 随着全球化进程加速&#xff0c;跨语言交流的需求日益增长&#xff0c;传统翻译系统在面对混合语言输入、带格式文本以及专业术语密集内容时表现乏力。尽管通用…

作者头像 李华
网站建设 2026/1/25 19:09:44

Llama3-8B轻量级部署:边缘设备运行可行性分析

Llama3-8B轻量级部署&#xff1a;边缘设备运行可行性分析 1. 技术背景与部署挑战 随着大语言模型&#xff08;LLM&#xff09;在自然语言理解、代码生成和对话系统中的广泛应用&#xff0c;如何将高性能模型部署到资源受限的边缘设备成为工程落地的关键问题。传统千亿参数模型…

作者头像 李华