news 2026/2/15 9:35:19

Java并发编程——ExecutorCompletionService原理详解

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Java并发编程——ExecutorCompletionService原理详解

一、简介

在JDK并发包中有这么一个类ExecutorCompletionService,提交任务后,可以按任务返回结果的先后顺序来获取各任务执行后的结果。

该类实现了接口CompletionService:

public interface CompletionService<V> { Future<V> submit(Callable<V> task); Future<V> submit(Runnable task, V result); Future<V> take() throws InterruptedException; Future<V> poll(); Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException; }

该接口定义了一系列方法:提交实现了Callable或Runnable接口的任务,并获取这些任务的结果。

CompletionService接口定义了一组任务管理接口:

  • submit() - 提交任务

  • take() - 获取任务结果

  • poll() - 获取任务结果

  • ExecutorCompletionService类是CompletionService接口的实现

  • ExecutorCompletionService内部管理者一个已完成任务的阻塞队列

  • ExecutorCompletionService引用了一个Executor,用来执行任务

  • submit()方法最终会委托给内部的executor去执行任务

  • take/poll方法的工作都委托给内部的已完成任务阻塞队列

  • 如果阻塞队列中有已完成的任务,take方法就返回任务的结果,否则阻塞等待任务完成

  • poll与take方法不同,poll有两个版本:

    • 无参的poll方法 --- 如果完成队列中有数据就返回, 否则返回null
    • 有参数的poll方法 --- 如果完成队列中有数据就直接返回,否则等待指定的时间,到时间后如果还是没有数据就返回null
    • ExecutorCompletionService主要用与管理异步任务 (有结果的任务,任务完成后要处理结果)

关于CompletionService和ExecutorCompletionService的类图如下:

ExecutorCompletionService实现了CompletionService,内部通过Executor以及BlockingQueue来实现接口提出的规范。其中,Executor由调用者传递进来,而Blocking可以使用默认的LinkedBlockingQueue,也可以由调用者传递。另外,该类还会将提交的任务封装成QueueingFuture,这样就可以实现FutureTask.done()方法,以便于在任务执行完毕后,将结果放入阻塞队列中。

QueueingFuture为内部类:

private static class QueueingFuture<V> extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task, BlockingQueue<Future<V>> completionQueue) { super(task, null); this.task = task; this.completionQueue = completionQueue; } private final Future<V> task; private final BlockingQueue<Future<V>> completionQueue; protected void done() { completionQueue.add(task); } }

其中,done()方法就是在任务执行完毕后,将任务放入队列中。

在提交任务时,将任务封装成QueueingFuture:

public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task); executor.execute(new QueueingFuture<V>(f, completionQueue)); return f; } public Future<V> submit(Runnable task, V result) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task, result); executor.execute(new QueueingFuture<V>(f, completionQueue)); return f; }

在调用take()、poll()方法时,会从阻塞队列中获取Future对象,以取得任务执行的结果。

二、原理

当我们向Executor提交一组任务,并且希望任务在完成后获得结果,此时可以考虑使用ExecutorCompletionService。

ExecutorCompletionService实现了CompletionService接口。ExecutorCompletionService将Executor和BlockingQueue功能融合在一起,使用它可以提交我们的Callable任务。这个任务委托给Executor执行,可以使用ExecutorCompletionService对象的take和poll方法获取结果。

ExecutorCompletionService的设计目的在于提供一个可获取线程池执行结果的功能,这个类采用了装饰器模式,需要用户提供一个自定义的线程池,在ExecutorCompletionService内部持有该线程池进行线程执行,在原有的线程池功能基础上装饰额外的功能。

下面是ExecutorCompletionService的原理图:

  • 1、在使用ExecutorCompletionService时需要提供一个自定义的线程池Executor,构造ExecutorCompletionService。同时,也可以指定一个自定义的队列作为线程执行结果的容器,当线程执行完成时,通过重写FutureTask#done()将结果压入队列中。

  • 2、当用户把所有的任务都提交了以后,可通过ExecutorCompletionService#poll方法来弹出已完成的结果,这样做的好处是可以节省获取完成结果的时间。

下面是使用队列和不使用队列的流程对比,从图中我们可以看出,在使用队列的场景下,我们可以优先获取到完成的线程,当我们要汇总所有的执行结果时,这无疑会缩减我们的汇总时间。

而不使用队列时,我们需要对FutureTask进行遍历,因为我们不知道哪个线程先执行完了,只能挨个去获取结果,这样已经完成的线程会因为前面未完成的线程的耗时而无法提前进行汇总。

如果算上汇总结果的耗时时间:

在使用队列的场景下,我们可以在其他任务线程执行的过程中汇总已完成的结果,节省汇总时间。不使用队列的场景下,只用等到当前的线程执行完成才能汇总。

代码演示

public class ExecutorCompletionServiceTest { public static void main(String[] args) throws Exception { ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5); ExecutorCompletionService ecs = new ExecutorCompletionService(fixedThreadPool); Future future = ecs.submit(new Callable() { @Override public Integer call() throws Exception { return ThreadLocalRandom.current().nextInt(100); } }); System.out.println("future:" + future.get()); //用于取出最新的线程执行结果,注意这里是阻塞的 //Future future1 = ecs.take(); //System.out.println("future1:" + future1.get()); //用于取出最新的线程执行结果,是非阻塞的,如果没有结果就返回null Future future2 = ecs.poll(); if (future2.isDone()) { System.out.println(future2.get()); } } } //多个线程,先执行完的进阻塞队列,然后可以按执行顺序获取结果 public class ExecutorCompletionServiceDemo { public static void main(String[] args) { //这里只是为了方便,真正项目中不要这样创建线程池 ExecutorService executorService = Executors.newFixedThreadPool(5); ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(executorService); completionService.submit(() -> { System.out.println("执行任务1开始"); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("执行任务1结束"); return "任务1执行成功"; }); completionService.submit(() -> { System.out.println("执行任务2开始"); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("执行任务2结束"); return "任务2执行成功"; }); completionService.submit(() -> { System.out.println("执行任务3开始"); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("执行任务3结束"); return "任务3执行成功"; }); for (int i = 0; i < 3; i++) { try { String result = completionService.take().get(); System.out.println(result); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } executorService.shutdown(); } }

源码

public class ExecutorCompletionService<V> implements CompletionService<V> { //执行任务的线程池 private final Executor executor; //用于调用AbstractExecutorService的newTaskFor方法,来实例化一个实现了RunnableFuture接口的对象 //如果executor继承了AbstractExecutorService ,则直接调用executor的newTaskFor方法 //否则直接创建一个FutureTask对象 private final AbstractExecutorService aes; //任务完成后放入该阻塞队列中 private final BlockingQueue<Future<V>> completionQueue; /** * 用于放入执行完成的任务 */ private static class QueueingFuture<V> extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task, BlockingQueue<Future<V>> completionQueue) { super(task, null); this.task = task; this.completionQueue = completionQueue; } private final Future<V> task; private final BlockingQueue<Future<V>> completionQueue; //重写了FutureTask的done方法,任务完成后,将任务放入阻塞队列中 protected void done() { completionQueue.add(task); } } //将传入的Callable包装为RunnableFuture private RunnableFuture<V> newTaskFor(Callable<V> task) { if (aes == null) return new FutureTask<V>(task); else return aes.newTaskFor(task); } //将传入的Callable包装为RunnableFuture private RunnableFuture<V> newTaskFor(Runnable task, V result) { if (aes == null) return new FutureTask<V>(task, result); else return aes.newTaskFor(task, result); } /** * Creates an ExecutorCompletionService using the supplied * executor for base task execution and a * {@link LinkedBlockingQueue} as a completion queue. * * @param executor the executor to use * @throws NullPointerException if executor is {@code null} */ public ExecutorCompletionService(Executor executor) { if (executor == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; //completionQueue默认为LinkedBlockingQueue this.completionQueue = new LinkedBlockingQueue<Future<V>>(); } /** * Creates an ExecutorCompletionService using the supplied * executor for base task execution and the supplied queue as its * completion queue. * * @param executor the executor to use * @param completionQueue the queue to use as the completion queue * normally one dedicated for use by this service. This * queue is treated as unbounded -- failed attempted * {@code Queue.add} operations for completed tasks cause * them not to be retrievable. * @throws NullPointerException if executor or completionQueue are {@code null} */ public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue) { if (executor == null || completionQueue == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = completionQueue; } /** * 提交任务,任务被包装为QueueingFuture对象,主要重写FutureTask的done方法, * 使得任务执行完毕后被执行任务的线程放入到阻塞队列中 * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task); executor.execute(new QueueingFuture<V>(f, completionQueue)); return f; } /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public Future<V> submit(Runnable task, V result) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task, result); executor.execute(new QueueingFuture<V>(f, completionQueue)); return f; } //从阻塞队列中获取任务 public Future<V> take() throws InterruptedException { return completionQueue.take(); } //如果完成队列中有数据就返回, 否则返回null public Future<V> poll() { return completionQueue.poll(); } //如果完成队列中有数据就直接返回, 否则等待指定的时间, 到时间后如果还是没有数据就返回null public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException { return completionQueue.poll(timeout, unit); } }

AI大模型学习福利

作为一名热心肠的互联网老兵,我决定把宝贵的AI知识分享给大家。 至于能学习到多少就看你的学习毅力和能力了 。我已将重要的AI大模型资料包括AI大模型入门学习思维导图、精品AI大模型学习书籍手册、视频教程、实战学习等录播视频免费分享出来。

一、全套AGI大模型学习路线

AI大模型时代的学习之旅:从基础到前沿,掌握人工智能的核心技能!

因篇幅有限,仅展示部分资料,需要点击文章最下方名片即可前往获取

二、640套AI大模型报告合集

这套包含640份报告的合集,涵盖了AI大模型的理论研究、技术实现、行业应用等多个方面。无论您是科研人员、工程师,还是对AI大模型感兴趣的爱好者,这套报告合集都将为您提供宝贵的信息和启示。

因篇幅有限,仅展示部分资料,需要点击文章最下方名片即可前往获

三、AI大模型经典PDF籍

随着人工智能技术的飞速发展,AI大模型已经成为了当今科技领域的一大热点。这些大型预训练模型,如GPT-3、BERT、XLNet等,以其强大的语言理解和生成能力,正在改变我们对人工智能的认识。 那以下这些PDF籍就是非常不错的学习资源。


因篇幅有限,仅展示部分资料,需要点击文章最下方名片即可前往获

四、AI大模型商业化落地方案

因篇幅有限,仅展示部分资料,需要点击文章最下方名片即可前往获

作为普通人,入局大模型时代需要持续学习和实践,不断提高自己的技能和认知水平,同时也需要有责任感和伦理意识,为人工智能的健康发展贡献力量

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/2/10 18:52:17

优化算法matlab实现(一)相关matlab基础

之前实现优化算法用的java、python、c&#xff0c;matlab使用较少&#xff0c;接下来会用matlab来实现。此处记录了一些matlab中常用的函数和需要注意的地方。 Matlab版本2015b1&#xff0e;向量Matlab的名称就是矩阵实验室&#xff0c;其中大多数运算都是矩阵运算。不过&#…

作者头像 李华
网站建设 2026/2/15 1:34:50

降本增效利器!腾讯云云服务器成本优势全解析

对于企业而言&#xff0c;数字化转型的核心诉求之一便是降本增效。传统自建数据中心模式存在前期投入大、运维成本高、资源利用率低等诸多痛点&#xff0c;而腾讯云云服务器通过创新的计费模式、硬件优化和资源调度能力&#xff0c;大幅降低了企业的IT总体拥有成本&#xff08;…

作者头像 李华
网站建设 2026/2/13 0:51:59

如何5分钟搭建跨平台窗口监控系统:终极工具完全指南

如何5分钟搭建跨平台窗口监控系统&#xff1a;终极工具完全指南 【免费下载链接】active-win Get metadata about the active window (title, id, bounds, owner, etc) 项目地址: https://gitcode.com/gh_mirrors/ac/active-win 在前100字内&#xff0c;我们为您介绍一款…

作者头像 李华
网站建设 2026/2/11 13:00:14

NotchDrop:让MacBook刘海变身智能文件中转站

NotchDrop&#xff1a;让MacBook刘海变身智能文件中转站 【免费下载链接】NotchDrop Use your MacBooks notch like Dynamic Island for temporary storing files and AirDrop 项目地址: https://gitcode.com/gh_mirrors/no/NotchDrop 你是否曾想过&#xff0c;那个占据…

作者头像 李华
网站建设 2026/2/15 1:37:19

如何快速解决Nacos数据库升级冲突:5个实用技巧

如何快速解决Nacos数据库升级冲突&#xff1a;5个实用技巧 【免费下载链接】nacos Nacos是由阿里巴巴开源的服务治理中间件&#xff0c;集成了动态服务发现、配置管理和服务元数据管理功能&#xff0c;广泛应用于微服务架构中&#xff0c;简化服务治理过程。 项目地址: https…

作者头像 李华