Java中利用CompletableFuture高效并行处理大型列表数据

Java中利用CompletableFuture高效并行处理大型列表数据

本文深入探讨了在Java中如何利用CompletableFuture和ExecutorService高效并行处理大型列表数据。针对将耗时操作并行化的常见需求,文章分析了在并行处理中可能遇到的陷阱,特别是过早调用CompletableFuture::join导致任务串行执行的问题。通过提供正确的并行处理策略和示例代码,指导读者实现真正的并发执行,并有效聚合结果,从而显著提升数据处理性能。

1. 引言:并行处理大型列表的必要性

在现代数据处理场景中,我们经常需要对包含数万甚至数十万条记录的大型列表执行耗时操作,例如网络请求、数据库查询、复杂计算或文件i/o。如果采用传统的顺序处理方式,即使单条记录的处理时间很短,累积起来也可能导致整个流程耗时数小时,严重影响系统吞吐量和用户体验。

Java 8引入的CompletableFuture为异步编程和并行处理提供了强大的支持,它能够帮助我们有效地将这些耗时任务分解并并行执行,从而显著缩短总处理时间。然而,不恰当的使用方式也可能导致并行能力无法充分发挥,甚至退化为串行执行。

2. 初始尝试与常见陷阱分析

许多开发者在尝试使用CompletableFuture进行并行处理时,可能会遇到一个常见问题:尽管代码看起来像是并行的,但实际执行却仍然是串行的。以下是一个典型的错误示例:

import com.google.common.collect.Lists;import java.util.List;import java.util.Optional;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.stream.Collectors;// 假设存在以下辅助类和方法// class ListItem { /* ... */ }// class ProcessResult { /* ... */ }// class OutputBean { /* ... */ }// class MyService { public Optional methodA(ListItem item) { /* ... */ } }// class MyProcessor {//     private MyService service = new MyService();//     private OutputBean mapToBean(ProcessResult result, ListItem originalItem) { /* ... */ }//     public List executeListPart(List subList) {//         return subList.stream()//                       .map(listItem -> service.methodA(listItem)//                                               .map(result -> mapToBean(result, listItem)))//                       .flatMap(Optional::stream)//                       .collect(Collectors.toList());//     }// }public class ParallelProcessingIncorrect {    // 假设这是您的列表和处理器实例    private static List largeList = /* 初始化一个包含50k ListItem的列表 */;    private static MyProcessor processor = new MyProcessor();    public static void main(String[] args) {        int noOfCores = Runtime.getRuntime().availableProcessors();        ExecutorService service = Executors.newFixedThreadPool(noOfCores - 1);        try {            long startTime = System.currentTimeMillis();            List results = Lists.partition(largeList, 500).stream()                    .map(item -> CompletableFuture.supplyAsync(() -> processor.executeListPart(item), service))                    // 核心问题:在这里调用 CompletableFuture::join                    .map(CompletableFuture::join)                    .flatMap(List::stream)                    .collect(Collectors.toList());            long endTime = System.currentTimeMillis();            System.out.println("Incorrect approach total time: " + (endTime - startTime) + " ms");            System.out.println("Processed " + results.size() + " items.");        } finally {            service.shutdown();        }    }}

上述代码的问题在于 .map(CompletableFuture::join) 这一行。CompletableFuture.join() 方法是一个阻塞操作,它会等待当前 CompletableFuture 完成并返回其结果。这意味着,当 Stream 处理第一个分区的 CompletableFuture 时,它会立即阻塞并等待该分区的所有任务完成,然后才能继续处理下一个分区的 CompletableFuture。结果是,尽管每个分区内部的任务可能在单独的线程中执行,但不同分区之间的处理却是严格串行的,从而失去了并行处理的优势。

3. 正确的CompletableFuture并行处理策略

要实现真正的并行执行,关键在于将异步任务的创建和结果的等待(join)分离。我们应该首先创建并提交所有异步任务,将它们的CompletableFuture实例收集到一个列表中,然后在一个单独的步骤中等待所有这些CompletableFuture完成。

立即学习“Java免费学习笔记(深入)”;

以下是正确的并行处理代码示例:

用Apache Spark进行大数据处理 用Apache Spark进行大数据处理

本文档主要讲述的是用Apache Spark进行大数据处理——第一部分:入门介绍;Apache Spark是一个围绕速度、易用性和复杂分析构建的大数据处理框架。最初在2009年由加州大学伯克利分校的AMPLab开发,并于2010年成为Apache的开源项目之一。 在这个Apache Spark文章系列的第一部分中,我们将了解到什么是Spark,它与典型的MapReduce解决方案的比较以及它如何为大数据处理提供了一套完整的工具。希望本文档会给有需要的朋友带来帮助;感

用Apache Spark进行大数据处理 0 查看详情 用Apache Spark进行大数据处理

import com.google.common.collect.Lists;import java.util.ArrayList;import java.util.List;import java.util.Optional;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;import java.util.stream.Collectors;// 辅助类定义(与上述示例相同,此处省略以保持简洁)// class ListItem { /* ... */ }// class ProcessResult { /* ... */ }// class OutputBean { /* ... */ }// class MyService { /* ... */ }// class MyProcessor { /* ... */ }public class ParallelProcessingCorrect {    private static List largeList; // 假设已初始化,例如:    static {        largeList = new ArrayList();        for (int i = 0; i < 50000; i++) {            largeList.add(new ListItem("item_" + i));        }    }    private static MyProcessor processor = new MyProcessor();    public static void main(String[] args) throws InterruptedException {        int noOfCores = Runtime.getRuntime().availableProcessors();        ExecutorService service = Executors.newFixedThreadPool(noOfCores - 1); // 推荐线程池大小为核心数-1或根据IO/CPU密集型任务调整        try {            long startTime = System.currentTimeMillis();            // 1. 创建并提交所有异步任务,收集CompletableFuture实例            List<CompletableFuture<List>> futures = Lists.partition(largeList, 500).stream()                    .map(itemPart -> CompletableFuture.supplyAsync(() -> processor.executeListPart(itemPart), service))                    .collect(Collectors.toList());            // 2. 等待所有CompletableFuture完成并获取结果            // 使用 CompletableFuture.allOf() 可以等待所有Future完成,但其本身不返回结果            // 更好的做法是遍历futures列表,逐个join或使用allof().join()后,再map获取结果            // 方法一:遍历futures列表,逐个join(更直接,但仍然是顺序join)            // List results = futures.stream()            //                                  .map(CompletableFuture::join) // 此时join是等待所有任务提交后才开始            //                                  .flatMap(List::stream)            //                                  .collect(Collectors.toList());            // 方法二:使用 CompletableFuture.allOf() 结合 thenApply/thenCompose(更优雅,推荐)            CompletableFuture allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));            List results = allOf.thenApply(v -> futures.stream()                                                .map(CompletableFuture::join) // 此时所有future都已完成,join是非阻塞的                                                .flatMap(List::stream)                                                .collect(Collectors.toList()))                                        .join(); // 等待所有结果收集完成            long endTime = System.currentTimeMillis();            System.out.println("Correct approach total time: " + (endTime - startTime) + " ms");            System.out.println("Processed " + results.size() + " items.");        } finally {            // 确保线程池关闭            service.shutdown();            if (!service.awaitTermination(60, TimeUnit.SECONDS)) {                System.err.println("ExecutorService did not terminate in the specified time.");                service.shutdownNow();            }        }    }}

工作原理:

任务提交: Lists.partition(largeList, 500).stream().map(…) 这部分会遍历所有分区,并为每个分区创建一个 CompletableFuture 任务。CompletableFuture.supplyAsync() 会将任务提交给 ExecutorService 立即执行,而不会阻塞当前的流处理。Future收集: 所有的 CompletableFuture 实例被收集到一个 List<CompletableFuture<List>> futures 中。此时,所有任务可能已经开始并行执行了。统一等待:CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) 创建了一个新的 CompletableFuture,它会在所有传入的 CompletableFuture 都完成后才完成。.thenApply(…) 定义了在 allOf 完成后执行的逻辑,即遍历 futures 列表,对每个 CompletableFuture 调用 join()。此时,由于 allOf 已经确保所有子任务都已完成,因此这些 join() 调用将是非阻塞的,能够立即获取结果。最后的 .join() 是等待整个结果聚合过程完成。

通过这种方式,所有分区的处理任务几乎同时开始执行,只有在需要聚合所有结果时,主线程才会被阻塞,从而实现了真正的并行加速。

4. ExecutorService的管理与考量

ExecutorService 是管理线程池的核心组件。在并行处理中,它的配置和生命周期管理至关重要。

创建: Executors.newFixedThreadPool(noOfCores – 1) 是一个常见的选择,它创建一个固定大小的线程池。线程池的大小应根据任务类型(CPU密集型或I/O密集型)和系统可用资源进行调整。对于CPU密集型任务,通常设置为 CPU核心数 – 1 或 CPU核心数;对于I/O密集型任务,可以设置得更大,因为线程在等待I/O时不会占用CPU。生命周期: ExecutorService 是一个重量级资源,应该在不再需要时显式关闭。service.shutdown():启动有序关闭,不再接受新任务,但会完成已提交的任务。service.awaitTermination(timeout, unit):等待已提交任务完成,或直到超时。这通常与 shutdown() 配合使用,以确保所有任务在程序退出前完成。service.shutdownNow():尝试立即停止所有正在执行的任务,并停止等待中的任务。这通常在 awaitTermination 超时后作为强制关闭的手段。

在应用程序的整个生命周期中,如果会频繁地进行并行处理,通常推荐复用同一个 ExecutorService 实例,而不是每次都创建和关闭新的实例,以减少资源开销。

5. 注意事项与性能优化

任务粒度: 适当的任务分块大小(如 Lists.partition(list, 500))非常重要。如果分块过小,会增加任务提交和线程调度的开销;如果分块过大,则可能导致某些线程负载过重,无法充分利用并行性。最佳分块大小通常需要根据实际任务的复杂度和执行时间进行测试和调整。异常处理: 在并行任务中,异常处理变得更为复杂。CompletableFuture 提供了 exceptionally() 和 handle() 等方法来处理异步任务中可能抛出的异常。在 join() 或 get() 时,如果任务抛出异常,它们会将异常重新抛出(通常是 CompletionException 或 ExecutionException),因此需要捕获并处理。结果聚合: CompletableFuture.allOf() 结合 thenApply 是一个优雅的聚合方式。如果需要聚合不同类型的 CompletableFuture 结果,可以使用 CompletableFuture.supplyAsync(() -> future1.join()).thenCombine(future2, (r1, r2) -> …) 等组合方法。异步上下文: 确保传递给 CompletableFuture.supplyAsync() 的 ExecutorService 是合适的,避免使用默认的 ForkJoinPool.commonPool(),因为它可能被其他库或系统任务占用,导致资源争抢。

6. 总结

通过 CompletableFuture 进行大型列表的并行处理是提升Java应用性能的有效手段。核心在于避免在任务提交阶段就阻塞性地等待结果。正确的做法是先将所有任务异步提交并收集其 CompletableFuture 实例,待所有任务均已启动或完成时,再统一进行结果的聚合。合理配置 ExecutorService、选择合适的任务粒度以及完善异常处理机制,将确保您的并行处理方案既高效又健壮。

以上就是Java中利用CompletableFuture高效并行处理大型列表数据的详细内容,更多请关注创想鸟其它相关文章!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/750889.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2025年11月25日 20:12:18
下一篇 2025年11月25日 20:17:11

相关推荐

  • 币圈行情哪里看?十大免费看行情网站必备清单

    对于加密货币投资者而言,实时掌握市场行情是做出明智决策的关键。本文精选了十个功能强大且免费的行情网站,它们不仅提供实时的价格数据,还包含丰富的图表工具和市场分析功能,帮助你轻松追踪价格动态、分析市场趋势。 十大免费行情网站推荐 1. 币安 (Binance) 作为全球交易量最大的加密货币交易所,币安…

    2025年12月11日
    000
  • 全球主流币圈行情网站大全,全部免费使用!

    在瞬息万变的加密货币市场中,及时获取准确的行情数据是做出明智决策的关键。本文为您精选了全球主流且免费使用的币圈行情网站,无论您是新手还是资深投资者,都能从中找到适合自己的工具,轻松掌握市场脉搏。 综合交易所行情平台 这些平台不仅是交易场所,更是强大的一站式行情数据中心,提供与其交易生态系统紧密集成的…

    2025年12月11日
    000
  • 币圈人都在用的免费行情工具:十大网站全解析

    在瞬息万变的加密货币市场,实时准确的行情数据是做出明智决策的基石。本文将为你全面解析币圈交易者和投资者最常使用的十大免费行情工具网站,它们不仅提供价格信息,更是强大的市场分析利器,帮助你洞察先机。 1. 币安 (Binance) 作为全球交易量最大的加密货币交易所,币安不仅是交易平台,其本身也是一个…

    2025年12月11日
    000
  • 交易者的选择:盘点币圈最实用的8个免费行情数据网站

    在信息爆炸的加密货币市场,快速获取准确、全面的行情数据是每位交易者成功的关键。本文为你整理了币圈最实用且免费的8个行情数据网站,它们不仅提供实时价格,还包含深度图表、链上数据和市场情绪分析,帮助你做出更明智的交易决策。 专业交易者的行情数据工具箱 1. 币安 (Binance) 作为全球领先的加密货…

    2025年12月11日
    000
  • 10个最佳免费加密货币行情网站推荐

    在快节奏的加密货币市场中,实时准确地获取行情信息是做出明智投资决策的关键。本文为您精选了10个功能强大且免费的加密货币行情网站,它们不仅提供价格数据,还包含丰富的图表、分析工具和市场动态,帮助您轻松掌握市场脉搏。 1. 币安 (Binance) 作为全球领先的加密货币交易所,币安不仅提供交易服务,其…

    2025年12月11日
    000
  • 币圈十大免费行情网站盘点(2025最新版)

    对于%ignore_a_1%投资者而言,一个实时、准确且全面的行情网站至关重要。本文为您精选了2025年最受欢迎的十大免费加密货币行情网站,这些平台不仅提供基础的价格信息,还包含深度图表、市场分析和海量数据,帮助您快速获取市场动态,做出明智的投资决策。 币圈十大免费行情网站 1. 币安 (Binan…

    2025年12月11日
    000
  • 虚拟币庄家怎么赚钱?庄家如何座庄收割韭菜?币圈最稳挣钱的九大方法

    庄家是%ignore_a_1%市场的一个重要概念,通常是指在加密货币市场中拥有雄厚资金实力和信息优势的主体,通过刻意制造市场涨跌节奏,以割散户韭菜为目的获利。对于投资者来说,要分析市场发展趋势、制定合理投资计划,了解虚拟币庄家怎么赚钱?至关重要,一般来说,就是通过集中建仓、拉盘、洗盘、砸盘等操作手法…

    2025年12月11日
    000
  • 币安2025最新下载_币安APP官方正版手机端入口

    欢迎来到币安2025年最新指南。在您通过官方渠道成功下载并安装币安app后,接下来的首要任务便是创建并保障您的账户安全。本教程将为您详细解析从注册到安全设置的全过程,助您轻松开启数字资产之旅。 币安官网直达: 币安官方app: 一、币安APP官方正版:新用户注册全流程 1、成功安装并打开币安官方正版…

    2025年12月11日 好文分享
    000
  • 比特币是什么?比特币买卖渠道及价格行情查看软件推荐

    binance币安交易所 注册入口: APP下载: 欧易OKX交易所 注册入口: APP下载: 火币HTX交易所: 注册入口: APP下载: 本文旨在简要介绍比特币的基本概念,并为初学者推荐一些主流可靠的交易渠道和行情查询工具,帮助您安全、高效地进入数字资产领域。 一、了解比特币(BTC) 1、比特…

    好文分享 2025年12月11日
    000
  • 比特币btc最新资讯在哪里看?比特币最新资讯查看软件大全

    及时获取准确的比特币(btc)资讯,对于市场参与者至关重要。本文将为您盘点几个主流且可靠的资讯平台,帮助您轻松掌握行业动态和价格走势,做出更明智的决策。 Binance币安交易所 注册入口: APP下载: 欧易OKX交易所 注册入口: APP下载: 火币HTX交易所: 注册入口: APP下载: 一、…

    好文分享 2025年12月11日
    000
  • 比特币交易软件有哪些平台?比特币交易所新手小白开户步骤

    比特币交易软件的选择众多,主要包括各大交易所提供的官方应用以及第三方交易工具。对于新手小白来说,选择一个安全、易用的平台至关重要,它能帮助你更好地理解市场并进行首次交易,从而踏上加密货币的投资之旅。 主流比特币交易平台 1、币安(Binance):作为全球最大的加密货币交易所之一,币安提供多种交易对…

    2025年12月11日 好文分享
    000
  • 为什么x402协议没有昙花一现? 爆火的x402还有哪些创业机会?

    x402协议作为近期备受关注的焦点,以其独特的机制和广泛的应用前景,成功避免了“昙花一现”的命运。它不仅在技术层面上实现了突破,更在社群构建、价值流通以及新颖的激励模式上展现出强大的生命力。 x402协议的成功并非偶然,它深刻洞察了用户对公平、透明和高效的需求,并通过一系列精妙的设计,将这些需求转化…

    2025年12月11日 好文分享
    000
  • 402bridge被盗事件警示:200+用户USDC遭窃,如何避免授权风险?

    近期,%ignore_a_1%领域再次响起了警钟,402bridge被盗事件牵动了无数投资者的心弦。据报道,超过200名用户的usdc在这次攻击中不幸失窃,总金额令人触目惊心。这起事件不仅仅是简单的资产损失,更深层次地揭示了去中心化金融(defi)领域中智能合约授权所带来的潜在风险。在数字资产日益普…

    好文分享 2025年12月11日
    000
  • Tectum(TET)币是什么?TET币2025年能涨到多少钱一枚?

    tet币是tectum区块链的原生代币,在其生态系统中发挥重要作用,包括治理、质押等。而tectum则是当前市场上速度最快的区块链之一,为用户提供了一个快速、高效、安全的区块链平台,对一般用户有利。简单介绍项目基本信息之后,投资者更想了解代币未来市场,想知道tet币2025年能涨到多少钱一枚?以便调…

    2025年12月11日 好文分享
    000
  • Orochi Network (ON) 币空投与上市日期:如何参加获得?

    orochi 网络 (on) 正在为其期待已久的空投和上市做准备。binance阿尔法,定于2025年10月24日举行。这次事件标志着该项目迈出了一步,专注于推进web3数据验证使用零知识证明 (zkps)您已训练至2023年10月的数据。 Binance币安 欧易OKX ️ Huobi火币️ 参与…

    好文分享 2025年12月11日
    000
  • Semantic Layer(42)币是什么?怎么样?Semantic Layer项目概述和空投领取指南

    Semantic Layer 是什么? 这是一个专注于 Web3 基础设施的协议,旨在通过创新机制提升 dApp 的运行效率与自主性。其核心在于提出了一种名为 dApp 应用程式控制执行(ACE) 的全新模式,以解决传统区块链执行层存在的固有问题。不同于以往依赖矿工或验证者来决定交易顺序的方式,Se…

    好文分享 2025年12月11日
    000
  • 什么是Ping(PING)币?它是如何工作的?Ping工作原理、主要功能和价格预测

    ping (ping) 不仅仅是一个代币,它象征着早期加密互联网的实验性和叛逆精神。在人工智能驱动和链上支付领域,它将 meme 文化与 coinbase 突破性的 x402 协议融合,开启了互联网原生、代理对代理交易的新时代。 Binance币安 欧易OKX ️ Huobi火币️ Ping (PI…

    好文分享 2025年12月11日
    000
  • 2025年加密货币交易所格局重塑:Top10排名与核心发展趋势解析

    随着数字资产市场的不断成熟,加密货币交易所的竞争格局正在被合规化、技术创新和用户体验深刻重塑。展望2025年,能够引领行业的平台不仅需要提供丰富的交易产品,更需在安全性、生态系统建设和拥抱web3趋势方面展现出卓越的领导力。本文将为您揭示预计在2025年主导市场的十大加密货币交易所,并解析其背后的核…

    好文分享 2025年12月11日
    000
  • 2025年币圈主流交易所排行榜与发展路径预测

    随着数字资产市场的不断成熟和演变,选择一个安全、可靠且功能强大的加密货币交易所对投资者至关重要。本文将为您盘点并预测2025年币圈主流交易所的格局,并探讨它们未来的发展路径,帮助您在变幻莫测的市场中找到最适合自己的交易平台。 2025年主流交易所排行榜预测 1. 币安 (Binance) 作为全球交…

    好文分享 2025年12月11日
    000
  • 抹茶交易所官网注册链接 MEXC官方网站安全登录地址

    binance币安交易所 注册入口: APP下载: 欧易OKX交易所 注册入口: APP下载: 火币交易所: 注册入口: APP下载: 抹茶交易所官网注册链接 MEXC官方网站安全登录地址在哪里?这是不少网友都关注的,接下来由PHP小编为大家带来抹茶交易所官网注册链接及安全登录入口信息,感兴趣的网友…

    好文分享 2025年12月11日
    000

发表回复

登录后才能评论
关注微信