
本文旨在介绍如何利用 Java 并行流高效地处理大型列表,尤其是在每个元素的处理过程耗时较长的情况下。并行流能够将列表分割成多个子任务,并在多个线程上并发执行,从而显著提升处理速度。但同时,并发编程也带来了共享资源同步的问题,需要谨慎处理。
使用并行流并发处理列表
假设我们有一个 Foo 类,其 process 方法需要处理一个 Bar 类型的列表,并且 handle 方法的处理过程比较耗时。为了提高效率,我们可以将列表分割成多个子列表,然后使用并行流并发处理每个子列表。
import java.util.ArrayList;import java.util.List;import java.util.stream.Collectors;class Foo { private int len; public Foo(int len) { this.len = len; } public void process(List list) { List<List> sublists = new ArrayList(); for (int i = 0; i < list.size(); i += len) { sublists.add(list.subList(i, Math.min(i + len, list.size()))); } // 并行处理子列表 sublists.parallelStream() .forEach(this::handle); } private void handle(List sublist) { // 耗时的处理逻辑 System.out.println("Processing sublist: " + sublist); try { Thread.sleep(100); // 模拟耗时操作 } catch (InterruptedException e) { e.printStackTrace(); } }}class Bar { private int id; public Bar(int id) { this.id = id; } @Override public String toString() { return "Bar{" + "id=" + id + '}'; }}public class Main { public static void main(String[] args) { List list = new ArrayList(); for (int i = 0; i < 10; i++) { list.add(new Bar(i)); } Foo foo = new Foo(3); foo.process(list); }}
在这个例子中,我们首先将原始列表分割成多个大小为 len 的子列表。然后,我们使用 sublists.parallelStream().forEach(this::handle) 并行处理每个子列表。parallelStream() 方法将列表转换为并行流,forEach() 方法对流中的每个元素执行指定的操作。
收集并行处理的结果
如果 handle 方法返回一个结果,并且我们需要收集所有结果,可以使用 map 和 collect 方法。
import java.util.ArrayList;import java.util.List;import java.util.stream.Collectors;class Foo { private int len; public Foo(int len) { this.len = len; } public List process(List list) { List<List> sublists = new ArrayList(); for (int i = 0; i < list.size(); i += len) { sublists.add(list.subList(i, Math.min(i + len, list.size()))); } // 并行处理子列表并收集结果 return sublists.parallelStream() .map(this::handle) .collect(Collectors.toList()); } private String handle(List sublist) { // 耗时的处理逻辑,并返回结果 System.out.println("Processing sublist: " + sublist); try { Thread.sleep(100); // 模拟耗时操作 } catch (InterruptedException e) { e.printStackTrace(); } return "Result of " + sublist; }}class Bar { private int id; public Bar(int id) { this.id = id; } @Override public String toString() { return "Bar{" + "id=" + id + '}'; }}public class Main { public static void main(String[] args) { List list = new ArrayList(); for (int i = 0; i < 10; i++) { list.add(new Bar(i)); } Foo foo = new Foo(3); List results = foo.process(list); System.out.println("Results: " + results); }}
在这个例子中,handle 方法返回一个字符串结果。我们使用 sublists.parallelStream().map(this::handle).collect(Collectors.toList()) 并行处理每个子列表,并将结果收集到一个列表中。map 方法将流中的每个元素转换为另一个元素,collect 方法将流中的所有元素收集到一个集合中。
表单大师AI
一款基于自然语言处理技术的智能在线表单创建工具,可以帮助用户快速、高效地生成各类专业表单。
74 查看详情
并发环境下的同步问题
需要注意的是,当 handle 方法访问共享资源时,需要进行同步处理,以避免出现线程安全问题。例如,如果 handle 方法需要修改一个共享的变量,可以使用 synchronized 关键字或 java.util.concurrent 包中的并发工具类来保证线程安全。
import java.util.ArrayList;import java.util.List;import java.util.concurrent.atomic.AtomicInteger;import java.util.stream.Collectors;class Foo { private int len; private AtomicInteger counter = new AtomicInteger(0); // 使用 AtomicInteger 保证线程安全 public Foo(int len) { this.len = len; } public void process(List list) { List<List> sublists = new ArrayList(); for (int i = 0; i < list.size(); i += len) { sublists.add(list.subList(i, Math.min(i + len, list.size()))); } // 并行处理子列表 sublists.parallelStream() .forEach(this::handle); } private void handle(List sublist) { // 耗时的处理逻辑,并访问共享资源 System.out.println("Processing sublist: " + sublist); try { Thread.sleep(100); // 模拟耗时操作 } catch (InterruptedException e) { e.printStackTrace(); } // 原子性地增加计数器 counter.addAndGet(sublist.size()); } public int getCounter() { return counter.get(); }}class Bar { private int id; public Bar(int id) { this.id = id; } @Override public String toString() { return "Bar{" + "id=" + id + '}'; }}public class Main { public static void main(String[] args) { List list = new ArrayList(); for (int i = 0; i < 10; i++) { list.add(new Bar(i)); } Foo foo = new Foo(3); foo.process(list); // 等待所有任务完成 try { Thread.sleep(2000); // 确保所有任务都已完成 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Counter: " + foo.getCounter()); }}
在这个例子中,我们使用 AtomicInteger 来保证计数器的线程安全。AtomicInteger 提供了原子性的 addAndGet 方法,可以安全地增加计数器的值。
总结
并行流是 Java 中一种强大的并发处理工具,可以显著提高列表处理的效率。但是,在使用并行流时,需要注意共享资源的同步问题,并选择合适的并发工具类来保证线程安全。同时,需要合理分割任务,避免过多的线程切换带来的性能损耗。
以上就是并发处理共享列表并收集结果的方案的详细内容,更多请关注创想鸟其它相关文章!
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/211079.html
微信扫一扫
支付宝扫一扫