
本教程详细介绍了如何在java中利用`executors`框架和`executorservice`来限制并发执行的线程数量。通过将任务封装为`runnable`,并使用`executors.newfixedthreadpool()`创建固定大小的线程池,可以有效地管理资源并控制并发级别。文章还涵盖了任务提交、线程池的优雅关闭机制以及相关的最佳实践,旨在提供一个清晰、专业的并发编程指南。
引言
在Java应用程序开发中,面对需要并行处理大量任务的场景时,合理地管理并发线程至关重要。直接创建无限数量的线程可能导致系统资源耗尽、性能下降甚至程序崩溃。为了解决这一问题,Java 5引入了java.util.concurrent包,其中的Executors框架为我们提供了一套强大的工具来管理线程池,从而有效地限制和控制并发线程的数量。本教程将指导您如何使用ExecutorService来创建一个固定大小的线程池,以处理并发任务,并确保资源的有效利用。
核心概念:Executors框架与ExecutorService
Executors框架是Java并发编程的核心组件之一,它提供了一系列工厂方法来创建不同类型的ExecutorService实例。ExecutorService是一个高级接口,用于管理线程的生命周期和任务的提交。通过使用线程池,我们可以重用线程,而不是为每个任务都创建新线程,这大大降低了线程创建和销毁的开销。
为了限制并发线程的数量,最常用的方法是使用Executors.newFixedThreadPool(int nThreads)方法。这个方法会创建一个固定大小的线程池,该线程池中的线程数量始终保持不变。当有新任务提交时,如果池中所有线程都在忙碌,那么新任务将被放入一个等待队列中,直到有空闲线程可用。
定义并发任务:Runnable接口
在使用ExecutorService之前,我们需要将要并行执行的逻辑封装成一个任务。Java提供了两个核心接口来定义任务:Runnable和Callable。
立即学习“Java免费学习笔记(深入)”;
Runnable: 适用于不需要返回结果且不抛出受检异常的任务。它只包含一个run()方法。Callable: 适用于需要返回结果且可能抛出受检异常的任务。它包含一个call()方法,并返回一个Future对象。
在本教程中,我们将以一个文件序列化任务为例,使用Runnable接口来定义任务。假设我们有一个EventuelleDestination对象列表,需要为每个对象执行序列化操作,并将结果写入文件。
首先,定义一个EventuelleDestination及其相关依赖的模拟类,以便构建完整的示例:
易优微信教育培训小程序模板
易优微信教育培训小程序模板是基于前端开源小程序+后端易优cms+标签化API接口,是一套开源、快速搭建个性化需求的小程序CMS。轻量级TP底层框架,前后端分离,标签化API接口可对接所有小程序,支持二次开发。即使小白用户也能轻松搭建制作一套完整的线上版小程序。 微信教育培训小程序模板主要特点:1、代码开源,支持二次修改2、微信原生写法,兼容性更好,代码可读性更强3、功能接口完整,支持eyoucms
0 查看详情
// EventuelleDestination.java - 模拟业务对象package com.example.concurrency;import java.util.Objects;public class EventuelleDestination { private int id; private Acceuillant eventuelAcceuillant; public EventuelleDestination(int id, Acceuillant acceuillant) { this.id = id; this.eventuelAcceuillant = acceuillant; } public int getId() { return id; } public Acceuillant getEventuelAcceuillant() { return eventuelAcceuillant; } @Override public String toString() { return "EventuelleDestination{" + "id=" + id + ", acceuillantId=" + eventuelAcceuillant.getId() + '}'; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; EventuelleDestination that = (EventuelleDestination) o; return id == that.id && Objects.equals(eventuelAcceuillant, that.eventuelAcceuillant); } @Override public int hashCode() { return Objects.hash(id, eventuelAcceuillant); }}// Acceuillant.java - 模拟嵌套对象package com.example.concurrency;import java.util.Objects;public class Acceuillant { private int id; public Acceuillant(int id) { this.id = id; } public int getId() { return id; } @Override public String toString() { return "Acceuillant{" + "id=" + id + '}'; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Acceuillant that = (Acceuillant) o; return id == that.id; } @Override public int hashCode() { return Objects.hash(id); }}// EmployeDao.java - 模拟数据访问层package com.example.concurrency;public class EmployeDao { public Employe getEmploye() { return new Employe(100); }}// Employe.java - 模拟员工对象package com.example.concurrency;public class Employe { private int id; public Employe(int id) { this.id = id; } public int getId() { return id; }}// EntrepriseDao.java - 模拟数据访问层package com.example.concurrency;public class EntrepriseDao { public int retrouveEmplacementIdParDepartementId(int departmentId) { // 模拟耗时操作或业务逻辑 try { Thread.sleep(5 + (int)(Math.random() * 95)); // 模拟随机耗时 5-100ms } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException("Thread interrupted during mock DAO call", e); } return departmentId + 500; }}
接下来,我们将序列化逻辑封装到SerializationTask类中,它实现了Runnable接口:
// SerializationTask.java - 封装序列化任务package com.example.concurrency;import com.google.gson.Gson;import java.io.FileWriter;import java.io.IOException;import java.io.Writer;import java.nio.file.Path;import java.nio.file.Files;public class SerializationTask implements Runnable { private final EventuelleDestination destination; private final Path outputDirectory; private final EmployeDao employeDao; private final EntrepriseDao entrepriseDao; public SerializationTask(EventuelleDestination destination, Path outputDirectory, EmployeDao employeDao, EntrepriseDao entrepriseDao) { this.destination = destination; this.outputDirectory = outputDirectory; this.employeDao = employeDao; this.entrepriseDao = entrepriseDao; } @Override public void run() { Gson gson = new Gson(); try { // 确保输出目录存在 Files.createDirectories(outputDirectory); String filename = employeDao.getEmploye().getId() + "_" + entrepriseDao.retrouveEmplacementIdParDepartementId(destination.getEventuelAcceuillant().getId()) + "_" + destination.getEventuelAcceuillant().getId() + ".json"; Path filePath = outputDirectory.resolve(filename); try (Writer writer = new FileWriter(filePath.toFile())) { gson.toJson(destination, writer); System.out.println(Thread.currentThread().getName() + ": " + destination + " has been serialized to " + filePath); } } catch (IOException e) { System.err.println(Thread.currentThread().getName() + ": Error serializing " + destination + ": " + e.getMessage()); e.printStackTrace(); } catch (RuntimeException e) { // 捕获模拟DAO中可能抛出的RuntimeException System.err.println(Thread.currentThread().getName() + ": Runtime error during serialization of " + destination + ": " + e.getMessage()); e.printStackTrace(); } }}
使用ExecutorService管理线程池
现在我们有了定义好的任务,接下来将使用ExecutorService来创建固定大小的线程池,并提交这些任务。
// AppExecutorDemo.java - 主应用程序package com.example.concurrency;import java.nio.file.Path;import java.nio.file.Paths;import java.time.Instant;import java.util.ArrayList;import java.util.List;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;import java.util.stream.IntStream;public class AppExecutorDemo { // 定义输出目录 private final Path outputDir = Paths.get("serialized_data"); public static void main(String[] args) { AppExecutorDemo app = new AppExecutorDemo(); app.runDemo(); } private void runDemo() { // 准备模拟数据和依赖 EmployeDao employeDao = new EmployeDao(); EntrepriseDao entrepriseDao = new EntrepriseDao(); // 创建20个 EventuelleDestination 对象作为任务数据 List destinations = IntStream.rangeClosed(1, 20) .mapToObj(i -> new EventuelleDestination(i, new Acceuillant(i * 10))) .toList(); // 创建一个固定大小为3的线程池 ExecutorService executorService = Executors.newFixedThreadPool(3); System.out.println("ExecutorService created with 3 threads. Submitting tasks..."); // 提交每个序列化任务到线程池 for (EventuelleDestination dest : destinations) { executorService.submit(new SerializationTask(dest, outputDir, employeDao, entrepriseDao)); } System.out.println("All tasks submitted. Awaiting termination..."); // 优雅地关闭线程池 shutdownAndAwaitTermination(executorService); System.out.println("ExecutorService terminated. All tasks completed or cancelled."); } /** * 优雅地关闭ExecutorService,等待已提交任务完成。 * 此方法基于JavaDoc中ExecutorService的推荐关闭模式。 * * @param executorService 要关闭的ExecutorService实例 */ private void shutdownAndAwaitTermination(ExecutorService executorService) { executorService.shutdown(); // 禁用新任务提交 try { // 等待已提交任务在指定时间内完成 if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { executorService.shutdownNow(); // 如果超时,则取消当前正在执行的任务 // 再次等待,确保任务响应中断 if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { System.err.println("Executor service did not terminate completely after forced shutdown. " + Instant.now()); } } } catch (InterruptedException ex) { // 如果当前线程在等待期间被中断,则重新取消任务 executorService.shutdownNow(); // 重新设置中断状态 Thread.currentThread().interrupt(); } }}
运行上述AppExecutorDemo类,您将看到类似以下的输出(具体的线程ID和时间戳会有所不同,但关键是pool-1-thread-X的数量不会超过3):
ExecutorService created with 3 threads. Submitting tasks...All tasks submitted. Awaiting termination...pool-1-thread-1: EventuelleDestination{id=1, acceuillantId=10} has been serialized to serialized_data/100_510_10.jsonpool-1-thread-2: EventuelleDestination{id=2, acceuillantId=20} has been serialized to serialized_data/100_520_20.jsonpool-1-thread-3: EventuelleDestination{id=3, acceuillantId=30} has been serialized to serialized_data/100_530_30.jsonpool-1-thread-1: EventuelleDestination{id=4, acceuillantId=40} has been serialized to serialized_data/100_540_40.jsonpool-1-thread-2: EventuelleDestination{id=5, acceuillantId=50} has been serialized to serialized_data/100_550_50.jsonpool-1-thread-3: EventuelleDestination{id=6, acceuillantId=60} has been serialized to serialized_data/100_560_60.json... (输出将继续,但始终只有3个线程在活跃地执行任务)ExecutorService terminated. All tasks completed or cancelled.
从输出中可以看到,尽管我们提交了20个任务,但实际执行任务的线程(例如pool-1-thread-1、pool-1-thread-2、pool-1-thread-3)只有3个,这正是newFixedThreadPool(3)所实现的效果。
优雅关闭ExecutorService
正确关闭ExecutorService是并发编程中的一个重要环节。如果不在应用程序退出前关闭线程池,可能会导致程序无法正常终止,或者资源泄漏。shutdownAndAwaitTermination方法提供了一种优雅的关闭机制:
executorService.shutdown(): 启动有序关闭,不再接受新任务,但会允许已提交的任务(包括等待队列中的任务)完成执行。executorService.awaitTermination(timeout, unit): 阻塞当前线程,直到所有任务完成执行,或者超时发生,或者当前线程被中断。executorService.shutdownNow(): 如果awaitTermination超时,表示任务未能及时完成,此时可以调用shutdownNow()尝试立即停止所有正在执行的任务,并清空等待队列。此方法会向所有正在执行的线程发送中断信号。
注意事项与最佳实践
选择合适的线程池大小: newFixedThreadPool()的线程数量应根据CPU核心数、任务类型(CPU密集型或I/O密集型)和系统资源进行权衡。对于CPU密集型任务,通常建议线程数接近CPU核心数;对于I/O密集型
以上就是Java并发编程:使用ExecutorService限制线程数量的教程的详细内容,更多请关注创想鸟其它相关文章!
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/1058850.html
微信扫一扫
支付宝扫一扫