
本文探讨了在Python中对NumPy密集型计算进行多进程加速时遇到的常见性能瓶颈。通过分析数据序列化和复制的开销,我们揭示了为何传统的process_map可能适得其反。文章提供了一种基于multiprocessing.Manager共享内存的优化方案,有效避免了重复数据复制,从而显著提升了计算效率,并给出了详细的实现代码和最佳实践。
理解Python多进程/多线程加速的挑战
在python中处理大量计算密集型任务时,利用多核cpu进行并行计算是提高效率的常见方法。对于cpu密集型任务,由于python的全局解释器锁(gil)限制,多线程通常无法实现真正的并行计算,而多进程(multiprocessing)则通过创建独立的python解释器进程来绕过gil,从而实现并行执行。
然而,在使用multiprocessing库或其高级封装(如tqdm.contrib.concurrent.process_map)时,开发者有时会发现性能不升反降,尤其是在处理大型数据结构(如NumPy数组)时。这背后的主要原因在于进程间通信(IPC)的开销,特别是数据序列化和反序列化(即所谓的“pickling”和“unpickling”)过程。
当我们将一个Python对象作为参数传递给一个新创建的子进程时,该对象不会直接在进程间共享内存。相反,它会被序列化(pickled),然后复制到子进程的内存空间中,子进程再对其进行反序列化(unpickled)。对于小型数据,这个开销可以忽略不计,但对于像numpy.ndarray这样的大型数据结构,每次任务调用都进行这种复制操作会消耗大量CPU时间和内存带宽,最终成为整个并行计算的瓶颈。
考虑以下一个模拟NumPy密集型计算的例子,它展示了process_map在处理大型数组时的效率问题:
import timeimport numpy as npfrom tqdm.auto import tqdmfrom tqdm.contrib.concurrent import process_map, thread_map# 模拟生成大型数据集def mydataset(size, length): for ii in range(length): yield np.random.rand(*size)# 模拟耗时计算函数def calc(mat): # 模拟一些耗时的NumPy计算 for ii in range(1000): avg = np.mean(mat) std = np.std(mat) return avg, stddef main_original_test(): ds = list(mydataset((500, 500), 100)) # 100个500x500的NumPy数组 print("--- 原始测试结果 ---") t0 = time.time() res1 = [] for mat in tqdm(ds): res1.append(calc(mat)) print(f'for loop: {time.time() - t0:.2f}s') t0 = time.time() res2 = list(map(calc, tqdm(ds))) print(f'native map: {time.time() - t0:.2f}s') t0 = time.time() res3 = process_map(calc, ds) # 使用process_map print(f'process map: {time.time() - t0:.2f}s') t0 = time.time() res4 = thread_map(calc, ds) # 使用thread_map print(f'thread map: {time.time() - t0:.2f}s')if __name__ == '__main__': main_original_test()
上述代码在某些环境下可能产生如下结果:
立即学习“Python免费学习笔记(深入)”;
for loop: 51.88snative map: 52.49sprocess map: 71.06s # 明显慢于for循环thread map: 42.04s # 略快,但未充分利用多核
可以看到,process_map的执行时间甚至超过了简单的for循环,这正是由于每次调用calc函数时,整个NumPy数组mat都需要被序列化并复制到子进程,导致了巨大的性能开销。thread_map虽然略快,但由于GIL的存在,其加速效果有限。
优化策略:利用共享内存避免数据复制
解决上述问题的关键在于避免在每次任务调用时重复复制大型数据。Python的multiprocessing模块提供了一种解决方案:Manager。Manager对象可以创建一个服务进程,该进程管理共享的Python对象,并允许其他进程通过代理对象来访问这些共享对象。这样,大型数据只需复制一次到Manager的内存中,后续的子进程通过引用来访问,大大减少了进程间通信的开销。
对于我们的NumPy数组列表,我们可以使用Manager().list()来创建一个共享列表。然后,子进程通过列表的索引来访问特定的NumPy数组,而不是直接传递整个数组。
以下是使用multiprocessing.Manager和Pool.starmap进行优化的示例代码:
import timeimport numpy as npfrom multiprocessing import Pool, Manager# 模拟生成大型数据集def mydataset(size, length): for ii in range(length): yield np.random.rand(*size)# 适应共享内存的计算函数# 现在接收数据索引和共享列表作为参数def calc_optimized(idx, mat_list): # 从共享列表中获取NumPy数组 mat = mat_list[idx] # 模拟一些耗时的NumPy计算 for ii in range(1000): avg = np.mean(mat) std = np.std(mat) return avg, stddef main_optimized_test(): ds = list(mydataset((500, 500), 100)) # 原始数据集 # 1. 创建进程池 # 建议根据CPU核心数设置,例如os.cpu_count() num_processes = 4 mypool = Pool(num_processes) # 2. 创建Manager并生成共享列表 manager = Manager() # 将原始数据集一次性复制到Manager管理的共享列表中 mylist = manager.list(ds) print(f"n--- 优化后测试结果 ({num_processes} 进程) ---") t0 = time.time() # 使用starmap传递多个参数:数据索引和共享列表 # zip(range(len(ds)), [mylist]*len(ds)) 为每个任务生成 (索引, 共享列表) 对 res_optimized = mypool.starmap(calc_optimized, zip(range(len(ds)), [mylist]*len(ds))) print(f"map with manager: {time.time() - t0:.2f}s") # 关闭进程池 mypool.close() mypool.join() manager.shutdown() # 关闭Manager进程if __name__ == '__main__': main_optimized_test()
运行上述优化后的代码,其输出结果可能如下:
map with manager: 1.94s
与原始的for循环和process_map相比,性能提升是巨大的。这验证了通过Manager实现共享内存,避免重复数据复制,是解决此类问题的有效途径。
实现细节与注意事项
multiprocessing.Manager: Manager创建了一个单独的进程,该进程负责管理共享对象(如列表、字典等)。其他进程通过代理对象与Manager进程通信来访问这些共享对象。Manager().list(): 当你将一个可迭代对象(如ds)传递给manager.list()时,Manager会将ds中的所有元素一次性复制到其管理的共享列表中。此后,子进程通过mylist[idx]访问数据时,无需再次进行序列化和复制。Pool.starmap(): starmap适用于需要向目标函数传递多个参数的情况。在我们的例子中,calc_optimized函数需要idx(数据索引)和mat_list(共享列表)。zip(range(len(ds)), [mylist]*len(ds))生成了一个迭代器,其中每个元素都是一个元组(idx, mylist),starmap会将这些元组解包作为calc_optimized的参数。进程数量: Pool(num_processes)中的num_processes应根据你的CPU核心数进行调整。对于CPU密集型任务,通常设置为CPU的核心数或核心数减一可以获得最佳性能。资源管理: 在使用Pool和Manager后,务必调用mypool.close()、mypool.join()和manager.shutdown()来正确关闭进程池和Manager进程,释放系统资源。数据可变性: Manager管理的共享对象是可变的。如果多个进程需要修改同一个共享对象,需要额外考虑同步机制(如锁),以避免竞态条件。在我们的例子中,calc_optimized只是读取数据,因此无需额外同步。适用场景: 这种方法最适用于:处理大量相同结构但数据不同的任务。每个任务都需要访问一个或多个大型数据集。数据在任务执行期间是只读的或修改后不需要立即同步回主进程的。
总结
在Python中对NumPy等库进行计算密集型任务的并行加速时,简单地使用multiprocessing.Pool或process_map可能因数据序列化和反序列化的开销而导致性能下降。通过深入理解其背后的机制,我们发现对于大型数据集,利用multiprocessing.Manager创建共享内存是避免重复数据复制、显著提升并行计算效率的关键。这种方法将数据一次性加载到共享内存,后续子进程通过索引访问,从而消除了主要的性能瓶颈,实现了高效的并行处理。在实际应用中,务必根据任务特性和数据规模选择合适的并行策略。
以上就是加速Python中NumPy密集型计算的多进程优化策略的详细内容,更多请关注创想鸟其它相关文章!
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/1372954.html
微信扫一扫
支付宝扫一扫