Java并发编程:使用ExecutorService限制线程数量的教程

Java并发编程:使用ExecutorService限制线程数量的教程

本教程详细介绍了如何在java中利用`executors`框架和`executorservice`来限制并发执行的线程数量。通过将任务封装为`runnable`,并使用`executors.newfixedthreadpool()`创建固定大小的线程池,可以有效地管理资源并控制并发级别。文章还涵盖了任务提交、线程池的优雅关闭机制以及相关的最佳实践,旨在提供一个清晰、专业的并发编程指南。

引言

在Java应用程序开发中,面对需要并行处理大量任务的场景时,合理地管理并发线程至关重要。直接创建无限数量的线程可能导致系统资源耗尽、性能下降甚至程序崩溃。为了解决这一问题,Java 5引入了java.util.concurrent包,其中的Executors框架为我们提供了一套强大的工具来管理线程池,从而有效地限制和控制并发线程的数量。本教程将指导您如何使用ExecutorService来创建一个固定大小的线程池,以处理并发任务,并确保资源的有效利用。

核心概念:Executors框架与ExecutorService

Executors框架是Java并发编程的核心组件之一,它提供了一系列工厂方法来创建不同类型的ExecutorService实例。ExecutorService是一个高级接口,用于管理线程的生命周期和任务的提交。通过使用线程池,我们可以重用线程,而不是为每个任务都创建新线程,这大大降低了线程创建和销毁的开销。

为了限制并发线程的数量,最常用的方法是使用Executors.newFixedThreadPool(int nThreads)方法。这个方法会创建一个固定大小的线程池,该线程池中的线程数量始终保持不变。当有新任务提交时,如果池中所有线程都在忙碌,那么新任务将被放入一个等待队列中,直到有空闲线程可用。

定义并发任务:Runnable接口

在使用ExecutorService之前,我们需要将要并行执行的逻辑封装成一个任务。Java提供了两个核心接口来定义任务:Runnable和Callable。

立即学习“Java免费学习笔记(深入)”;

Runnable: 适用于不需要返回结果且不抛出受检异常的任务。它只包含一个run()方法。Callable: 适用于需要返回结果且可能抛出受检异常的任务。它包含一个call()方法,并返回一个Future对象。

在本教程中,我们将以一个文件序列化任务为例,使用Runnable接口来定义任务。假设我们有一个EventuelleDestination对象列表,需要为每个对象执行序列化操作,并将结果写入文件。

首先,定义一个EventuelleDestination及其相关依赖的模拟类,以便构建完整的示例:

易优微信教育培训小程序模板 易优微信教育培训小程序模板

易优微信教育培训小程序模板是基于前端开源小程序+后端易优cms+标签化API接口,是一套开源、快速搭建个性化需求的小程序CMS。轻量级TP底层框架,前后端分离,标签化API接口可对接所有小程序,支持二次开发。即使小白用户也能轻松搭建制作一套完整的线上版小程序。 微信教育培训小程序模板主要特点:1、代码开源,支持二次修改2、微信原生写法,兼容性更好,代码可读性更强3、功能接口完整,支持eyoucms

易优微信教育培训小程序模板 0 查看详情 易优微信教育培训小程序模板

// EventuelleDestination.java - 模拟业务对象package com.example.concurrency;import java.util.Objects;public class EventuelleDestination {    private int id;    private Acceuillant eventuelAcceuillant;    public EventuelleDestination(int id, Acceuillant acceuillant) {        this.id = id;        this.eventuelAcceuillant = acceuillant;    }    public int getId() { return id; }    public Acceuillant getEventuelAcceuillant() { return eventuelAcceuillant; }    @Override    public String toString() {        return "EventuelleDestination{" + "id=" + id + ", acceuillantId=" + eventuelAcceuillant.getId() + '}';    }    @Override    public boolean equals(Object o) {        if (this == o) return true;        if (o == null || getClass() != o.getClass()) return false;        EventuelleDestination that = (EventuelleDestination) o;        return id == that.id && Objects.equals(eventuelAcceuillant, that.eventuelAcceuillant);    }    @Override    public int hashCode() {        return Objects.hash(id, eventuelAcceuillant);    }}// Acceuillant.java - 模拟嵌套对象package com.example.concurrency;import java.util.Objects;public class Acceuillant {    private int id;    public Acceuillant(int id) { this.id = id; }    public int getId() { return id; }    @Override    public String toString() {        return "Acceuillant{" + "id=" + id + '}';    }    @Override    public boolean equals(Object o) {        if (this == o) return true;        if (o == null || getClass() != o.getClass()) return false;        Acceuillant that = (Acceuillant) o;        return id == that.id;    }    @Override    public int hashCode() {        return Objects.hash(id);    }}// EmployeDao.java - 模拟数据访问层package com.example.concurrency;public class EmployeDao {    public Employe getEmploye() { return new Employe(100); }}// Employe.java - 模拟员工对象package com.example.concurrency;public class Employe {    private int id;    public Employe(int id) { this.id = id; }    public int getId() { return id; }}// EntrepriseDao.java - 模拟数据访问层package com.example.concurrency;public class EntrepriseDao {    public int retrouveEmplacementIdParDepartementId(int departmentId) {        // 模拟耗时操作或业务逻辑        try {            Thread.sleep(5 + (int)(Math.random() * 95)); // 模拟随机耗时 5-100ms        } catch (InterruptedException e) {            Thread.currentThread().interrupt();            throw new RuntimeException("Thread interrupted during mock DAO call", e);        }        return departmentId + 500;    }}

接下来,我们将序列化逻辑封装到SerializationTask类中,它实现了Runnable接口:

// SerializationTask.java - 封装序列化任务package com.example.concurrency;import com.google.gson.Gson;import java.io.FileWriter;import java.io.IOException;import java.io.Writer;import java.nio.file.Path;import java.nio.file.Files;public class SerializationTask implements Runnable {    private final EventuelleDestination destination;    private final Path outputDirectory;    private final EmployeDao employeDao;    private final EntrepriseDao entrepriseDao;    public SerializationTask(EventuelleDestination destination, Path outputDirectory, EmployeDao employeDao, EntrepriseDao entrepriseDao) {        this.destination = destination;        this.outputDirectory = outputDirectory;        this.employeDao = employeDao;        this.entrepriseDao = entrepriseDao;    }    @Override    public void run() {        Gson gson = new Gson();        try {            // 确保输出目录存在            Files.createDirectories(outputDirectory);            String filename = employeDao.getEmploye().getId() + "_" +                              entrepriseDao.retrouveEmplacementIdParDepartementId(destination.getEventuelAcceuillant().getId()) + "_" +                              destination.getEventuelAcceuillant().getId() + ".json";            Path filePath = outputDirectory.resolve(filename);            try (Writer writer = new FileWriter(filePath.toFile())) {                gson.toJson(destination, writer);                System.out.println(Thread.currentThread().getName() + ": " + destination + " has been serialized to " + filePath);            }        } catch (IOException e) {            System.err.println(Thread.currentThread().getName() + ": Error serializing " + destination + ": " + e.getMessage());            e.printStackTrace();        } catch (RuntimeException e) { // 捕获模拟DAO中可能抛出的RuntimeException            System.err.println(Thread.currentThread().getName() + ": Runtime error during serialization of " + destination + ": " + e.getMessage());            e.printStackTrace();        }    }}

使用ExecutorService管理线程池

现在我们有了定义好的任务,接下来将使用ExecutorService来创建固定大小的线程池,并提交这些任务。

// AppExecutorDemo.java - 主应用程序package com.example.concurrency;import java.nio.file.Path;import java.nio.file.Paths;import java.time.Instant;import java.util.ArrayList;import java.util.List;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;import java.util.stream.IntStream;public class AppExecutorDemo {    // 定义输出目录    private final Path outputDir = Paths.get("serialized_data");    public static void main(String[] args) {        AppExecutorDemo app = new AppExecutorDemo();        app.runDemo();    }    private void runDemo() {        // 准备模拟数据和依赖        EmployeDao employeDao = new EmployeDao();        EntrepriseDao entrepriseDao = new EntrepriseDao();        // 创建20个 EventuelleDestination 对象作为任务数据        List destinations = IntStream.rangeClosed(1, 20)                                                        .mapToObj(i -> new EventuelleDestination(i, new Acceuillant(i * 10)))                                                        .toList();        // 创建一个固定大小为3的线程池        ExecutorService executorService = Executors.newFixedThreadPool(3);        System.out.println("ExecutorService created with 3 threads. Submitting tasks...");        // 提交每个序列化任务到线程池        for (EventuelleDestination dest : destinations) {            executorService.submit(new SerializationTask(dest, outputDir, employeDao, entrepriseDao));        }        System.out.println("All tasks submitted. Awaiting termination...");        // 优雅地关闭线程池        shutdownAndAwaitTermination(executorService);        System.out.println("ExecutorService terminated. All tasks completed or cancelled.");    }    /**     * 优雅地关闭ExecutorService,等待已提交任务完成。     * 此方法基于JavaDoc中ExecutorService的推荐关闭模式。     *     * @param executorService 要关闭的ExecutorService实例     */    private void shutdownAndAwaitTermination(ExecutorService executorService) {        executorService.shutdown(); // 禁用新任务提交        try {            // 等待已提交任务在指定时间内完成            if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {                executorService.shutdownNow(); // 如果超时,则取消当前正在执行的任务                // 再次等待,确保任务响应中断                if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {                    System.err.println("Executor service did not terminate completely after forced shutdown. " + Instant.now());                }            }        } catch (InterruptedException ex) {            // 如果当前线程在等待期间被中断,则重新取消任务            executorService.shutdownNow();            // 重新设置中断状态            Thread.currentThread().interrupt();        }    }}

运行上述AppExecutorDemo类,您将看到类似以下的输出(具体的线程ID和时间戳会有所不同,但关键是pool-1-thread-X的数量不会超过3):

ExecutorService created with 3 threads. Submitting tasks...All tasks submitted. Awaiting termination...pool-1-thread-1: EventuelleDestination{id=1, acceuillantId=10} has been serialized to serialized_data/100_510_10.jsonpool-1-thread-2: EventuelleDestination{id=2, acceuillantId=20} has been serialized to serialized_data/100_520_20.jsonpool-1-thread-3: EventuelleDestination{id=3, acceuillantId=30} has been serialized to serialized_data/100_530_30.jsonpool-1-thread-1: EventuelleDestination{id=4, acceuillantId=40} has been serialized to serialized_data/100_540_40.jsonpool-1-thread-2: EventuelleDestination{id=5, acceuillantId=50} has been serialized to serialized_data/100_550_50.jsonpool-1-thread-3: EventuelleDestination{id=6, acceuillantId=60} has been serialized to serialized_data/100_560_60.json... (输出将继续,但始终只有3个线程在活跃地执行任务)ExecutorService terminated. All tasks completed or cancelled.

从输出中可以看到,尽管我们提交了20个任务,但实际执行任务的线程(例如pool-1-thread-1、pool-1-thread-2、pool-1-thread-3)只有3个,这正是newFixedThreadPool(3)所实现的效果。

优雅关闭ExecutorService

正确关闭ExecutorService是并发编程中的一个重要环节。如果不在应用程序退出前关闭线程池,可能会导致程序无法正常终止,或者资源泄漏。shutdownAndAwaitTermination方法提供了一种优雅的关闭机制:

executorService.shutdown(): 启动有序关闭,不再接受新任务,但会允许已提交的任务(包括等待队列中的任务)完成执行。executorService.awaitTermination(timeout, unit): 阻塞当前线程,直到所有任务完成执行,或者超时发生,或者当前线程被中断。executorService.shutdownNow(): 如果awaitTermination超时,表示任务未能及时完成,此时可以调用shutdownNow()尝试立即停止所有正在执行的任务,并清空等待队列。此方法会向所有正在执行的线程发送中断信号。

注意事项与最佳实践

选择合适的线程池大小: newFixedThreadPool()的线程数量应根据CPU核心数、任务类型(CPU密集型或I/O密集型)和系统资源进行权衡。对于CPU密集型任务,通常建议线程数接近CPU核心数;对于I/O密集型

以上就是Java并发编程:使用ExecutorService限制线程数量的教程的详细内容,更多请关注创想鸟其它相关文章!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/1058850.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2025年12月2日 05:14:06
下一篇 2025年12月2日 05:14:27

相关推荐

  • Python 包管理:你知道它们在哪里吗?

    另一天,我在那里高兴地用 pip 安装一些 python 包,突然我想到了:我刚刚下载了 3gb 的数据,但我不知道它去了哪里!如果您发现自己处于这种情况,欢迎来到俱乐部。让我们一起揭开python包缺失的谜团,并学习如何更好地管理我们宝贵的磁盘空间。 python 包的迷人世界 在我们深入探讨“我…

    2025年12月13日
    000
  • python爬虫怎么过滤超链接

    如何使用 Python 爬虫过滤超链接?有多种方法可以过滤 Python 爬虫中的超链接:正则表达式:使用正则表达式匹配特定模式的 URL。Xpath 查询:使用 Xpath 根据特定的 XML 或 HTML 条件进行选择。CSS 选择器:使用 CSS 选择器从 HTML 文档中选择超链接。函数过滤…

    2025年12月13日
    000
  • 怎么是python爬虫停下来

    有四种方法可以停止 Python 爬虫:通过键盘中断 (Ctrl+C 或 Ctrl+Break)、使用信号处理、使用事件或使用条件变量。 如何停止 Python 爬虫 在进行网络爬取时,有必要能够控制爬虫的运行,包括让它停止。以下是实现此目的的几种方法: 1. 通过键盘中断 最简单的方法是通过键盘中…

    2025年12月13日
    000
  • 如何在共享主机的子目录中托管 Laravel 项目而不在 URL 中暴露“/public”

    在共享主机上托管 laravel 项目时,一个常见的挑战是确保 url 不需要 /public 目录。这是在子目录中托管 laravel 应用程序同时保持 url 干净的分步指南。 第 1 步:将 laravel 项目上传到服务器 登录您的托管帐户并访问您的文件管理器。导航到 public_html…

    2025年12月13日
    000
  • python爬虫怎么拼接网址

    在 Python 爬虫中,网址拼接可通过以下步骤实现:导入库并连接路径组件;添加查询参数,使用 urlparse 和 urlencode 函数;处理特殊字符,使用 quote() 函数进行编码。 如何使用 Python 爬虫拼接网址 拼接网址是指将多个字符串连接起来形成一个完整的网址。在 Pytho…

    2025年12月13日
    000
  • python爬虫怎么保存图片

    Python爬虫保存图片步骤:导入requests、Image、io库。发送请求获取图片。检查响应状态码,200表示成功。转换响应内容为Image对象。保存图片,指定文件名。 Python爬虫图片保存 如何使用Python爬虫保存图片? 使用Python爬虫保存图片可以遵循以下步骤: 1. 导入必要…

    2025年12月13日
    000
  • Python人脸匹配:如何使用百度人脸识别接口进行人脸匹配?

    python人脸匹配推荐 问题: 如何使用python进行人脸匹配?是否有哪些可用的接口,最好附带python调用示例或演示? 答案: 立即学习“Python免费学习笔记(深入)”; 推荐使用百度的人脸识别接口。该接口集成了先进的人脸识别技术,并支持多种人脸识别功能,包括人脸检测、人脸特征提取和人脸…

    2025年12月13日
    000
  • C# 开发者转行:学 Python 还是 Go 更适合未来发展?

    Python vs. Go:哪种语言更适合未来发展? 问题:对于希望转行的 C# 开发人员来说,选择 Python 还是 Go 更好? 回答: Python: 立即学习“Python免费学习笔记(深入)”; 拥有庞大且活跃的社区,丰富的扩展和资源。广泛应用于爬虫、人工智能和机器学习领域。在就业市场上…

    2025年12月13日
    000
  • C#程序员转行:Python和Go,哪个更适合你?

    Python和Go:转行者该如何抉择职业发展? 对于打算转行的程序员来说,Python和Go无疑是备受关注的两种语言。作为C#开发人员,如何在这两者之间做出选择? Python Python以其庞大的生态系统和完善的社区而著称。它在人工智能、机器学习和数据科学等领域应用广泛。Python的易用性和对…

    2025年12月13日
    000
  • Python 线程加锁:范围越小越好,但这真的总是对的吗?

    python线程中加锁范围越小越好 问题说明 在python线程中,使用锁保证多个线程对共享数据的并发访问时,加锁的范围是一个重要的问题,是将锁放在循环外面还是里面。 对比两种情况 立即学习“Python免费学习笔记(深入)”; 把锁放在循环外面: 代码如下: from threading impo…

    2025年12月13日
    000
  • 为什么我在 PyCharm 中无法使用 nltk 包?

    pycharm 下 nltk 包无法使用 在 pycharm 中即使下载了 nltk 包,却发现无法调用其功能。例如,运行以下代码会导致错误: import nltkfor_test = ‘Xi is the chairman of China in the year 2013.’print(nlt…

    2025年12月13日
    000
  • 在 PyCharm 中安装 NLTK 后,为何无法使用 word_tokenize 函数?

    无法调用 nltk 包 在 pycharm 中安装 nltk 后,但无法调用其功能怎么办? 问题详情: import nltkfor_test = ‘xi is the chairman of china in the year 2013.’print(nltk.word_tokenize(for_…

    2025年12月13日
    000
  • Python 类中调用自身时,为什么每次输出的 id 都不相同?

    python类的内存分配问题:类调用自身时为什么id不同? 在python类中,当调用自身并返回时,每次输出的id都不同,这与我们的直观理解相悖。 问题所在是Chain(‘%s/%s’%(self._path, path))这一行代码。它在调用自身时创建了一个新的Chain实…

    2025年12月13日
    000
  • Python 类中的链式调用:为什么每次输出对象的 id 都不同?

    类中的内存分配问题 在 python 中创建类时,每个对象的内存分配方式可能令初学者感到困惑。例如,在链式调用的情况下,每次输出对象的 id 时都会看到不同的值。 让我们从一个示例代码开始: # 链式调用class chain(object): def __init__(self, path=”)…

    2025年12月13日
    000
  • Python 类链式调用时,为何每次输出的 id 都不一样?

    python 类的内存分配问题 在 python 中,当使用类创建对象时,每个对象都占据着独立的内存空间。但是,有时在对类进行链式调用时,会发现每次输出的 id 都不同,这是为什么呢? 让我们看一个代码示例: class chain(object): def __init__(self, path=…

    2025年12月13日
    000
  • 了解工厂和工厂方法设计模式

    什么是工厂类?工厂类是一种创建一个或多个不同类的对象的类。 工厂模式可以说是软件工程中最常用的设计模式。在本文中,我将使用一个简单的示例问题深入解释简单工厂和工厂方法设计模式。 简单工厂模式 假设我们要创建一个支持两种动物(例如狗和猫)的系统,每个动物类别都应该有一个方法来发出动物的声音类型。现在,…

    2025年12月13日
    000
  • Python 类中链式调用为何导致 ID 改变?

    Python 类中 ID 变更的原因:链式调用 在类中,调用自身并返回时,发现每次输出的 ID 不同。这是因为在 Python 中,链式调用会创建一个新实例。 在提供的代码示例中,Chain 类使用特殊方法 __getattr__ 实现链式调用。当调用 chain.Wenzhou 时,__getat…

    2025年12月13日
    000
  • Python 链式调用中,为什么每次调用__getattr__都会创建新的实例?

    pyhton类的内存分配问题:链式调用 在使用__getattr__魔术方法实现链式调用时,每次调用返回的chain实例的id会不同。这是因为每当调用__getattr__时,都会创建一个新的chain实例。 原因分析 在提供的代码示例中,__getattr__方法中的代码会创建一个新的chain实…

    2025年12月13日
    000
  • Python 中“can’t set attribute”错误:如何解决属性设置问题?

    python中“can’t set attribute”错误的解决 在python开发中,有时会遇到类似“can’t set attribute”这样的错误。这种错误通常与属性的设置或访问相关。 你的代码片段中出现了以下问题: 属性名称不一致:你定义的属性是“gettest1…

    2025年12月13日
    000
  • Django 部署时无法识别自定义模板标签:如何解决?

    django 部署时出现自定义模板标签无法识别错误 在使用 uwsgi 和 nginx 部署 django 项目时,可能会遇到无法识别自定义模板标签的错误。 错误信息 错误信息通常类似于: templatesyntaxerror: could not parse the remainder: ‘{x…

    2025年12月13日
    000

发表回复

登录后才能评论
关注微信