提升Python数据处理性能:从多线程到多进程的优化实践

提升Python数据处理性能:从多线程到多进程的优化实践

本文探讨了在Python中处理大规模数据列表匹配和筛选时的性能瓶颈。针对传统多线程在CPU密集型任务中受限于GIL的局限性,文章提出并详细阐述了如何利用Python的multiprocessing模块,通过创建独立的进程来并行化任务,从而显著提升数据处理效率。文章提供了完整的代码示例和专业解析,帮助读者理解并应用多进程技术优化Python程序的性能。

1. Python并发编程的挑战:GIL与CPU密集型任务

python中,处理大量数据(例如超过23,000条json记录与3,000多个标记进行匹配)往往会面临性能挑战。当需要对这些数据进行复杂计算或字符串相似度比较等cpu密集型操作时,程序的执行时间可能会非常长。

一个常见的优化思路是使用并发编程,例如Python的threading模块。然而,对于CPU密集型任务,Python的全局解释器锁(Global Interpreter Lock, GIL)是一个显著的限制。GIL确保在任何给定时刻,只有一个线程能够执行Python字节码。这意味着即使创建了多个线程,它们也无法真正地并行执行CPU密集型任务,因为它们必须轮流获取GIL,导致多线程在CPU密集型场景下并不能带来显著的性能提升,甚至可能因为线程切换的开销而略微降低性能。

2. 问题场景与初始尝试

假设我们有两个列表:一个包含字典的json_list(例如用户数据,每个字典含有一个”code”字段),另一个是marking列表(包含需要匹配的字符串)。我们的目标是从json_list中找出与marking列表中的每个元素具有高相似度”code”的项,并将匹配到的标记和数据收集起来。

初始的尝试可能如下所示,使用threading模块来尝试并行化匹配过程:

import mathimport threadingfrom difflib import SequenceMatcher# 示例数据(实际数据量远大于此)json_list = [    {"code": "001123", "phone_number": "...", "email": "...", "address": "...", "note": ""},    {"code": "654564", "phone_number": "...", "email": "...", "address": "...", "note": ""},    {"code": "876890", "phone_number": "...", "email": "...", "address": "...", "note": ""},    {"code": "hj876", "phone_number": "...", "email": "...", "address": "...", "note": ""},    # ... 更多数据]marking = ["654564", "hj876", "8768"] # ... 更多标记def find_marking(x, y):    """    比较标记x与数据y的'code'字段的相似度。    """    text_match = SequenceMatcher(None, x, y.get('code')).ratio()    if text_match == 1 or (0.98 <= text_match < 0.99):        return y    return Nonedef eliminate_marking_threaded(marking_list, json_list):    result, result_mark = [], []    # 这里的内部函数及对data_scrap的修改存在并发问题和GIL限制    # 实际场景中,对共享列表的pop/remove操作需要更复杂的同步机制    # 且因为GIL,多线程在此处并不能带来性能提升    def __process_eliminate(marking_item, data_scrap_copy):        for data in data_scrap_copy: # 遍历副本            result_data = find_marking(marking_item, data)            if result_data:                # 注意:这里的append操作如果直接对外部result/result_mark进行,                # 需要加锁。且data_scrap_copy的remove只影响副本。                # 实际代码中需要更严谨的共享数据处理。                # 此处仅为说明多线程尝试的局限性。                # result_mark.append(marking_item)                # result.append(result_data)                return    threads = []    # 针对每个marking创建线程,但由于GIL,实际不会并行执行    for m in marking_list:        # 传递json_list的副本以避免部分并发问题,但仍受GIL限制        th = threading.Thread(target=__process_eliminate, args=(m, json_list[:]))        th.start()        threads.append(th)    for thread in threads:        thread.join()    return result_mark, result # 在这个简单的多线程示例中,result/result_mark不会被正确填充# 运行此代码会发现性能提升不明显,甚至可能更慢# eliminated_markings, eliminated_data = eliminate_marking_threaded(marking, json_list)

如上所述,尽管使用了threading,但由于GIL的存在,这种方法在CPU密集型任务中无法实现真正的并行计算,耗时依然较长。

立即学习“Python免费学习笔记(深入)”;

3. 利用multiprocessing实现真正的并行计算

为了克服GIL的限制,Python提供了multiprocessing模块。它允许程序创建独立的进程,每个进程都有自己的Python解释器和内存空间,因此它们可以真正地并行执行CPU密集型任务,不受GIL的影响。

3.1 核心思路

进程而非线程:使用Process代替Thread。数据共享:由于每个进程有独立的内存空间,共享数据需要特殊机制,例如multiprocessing.Manager来创建可在进程间共享的数据结构(如列表、字典)。任务分发:将大型任务(如marking_list)分割成更小的块(chunk),然后将这些块分发给不同的进程进行处理。

3.2 multiprocessing实现示例

import mathfrom difflib import SequenceMatcherfrom multiprocessing import Process, Managerimport time # 用于计时演示# 模拟大规模数据# 注意:实际运行时请替换为您的真实数据json_list_large = []for i in range(25000):    json_list_large.append({"code": f"{i:06d}", "phone_number": "...", "email": "...", "address": "...", "note": ""})json_list_large.append({"code": "654564", "phone_number": "...", "email": "...", "address": "...", "note": ""})json_list_large.append({"code": "hj876", "phone_number": "...", "email": "...", "address": "...", "note": ""})json_list_large.append({"code": "876890", "phone_number": "...", "email": "...", "address": "...", "note": ""})marking_large = []for i in range(3500):    marking_large.append(f"{i:06d}")marking_large.extend(["654564", "hj876", "8768"])def find_marking(x, y):    """    比较标记x与数据y的'code'字段的相似度。    """    text_match = SequenceMatcher(None, x, y.get('code')).ratio()    if text_match == 1 or (0.98 <= text_match < 0.99):        return y    return Nonedef eliminate_marking_multiprocess(marking_list, json_list):    """    使用多进程并行处理标记列表,从json_list中查找匹配项。    """    manager = Manager()    result_mark = manager.list() # 共享列表,用于存储匹配的标记    result = manager.list()      # 共享列表,用于存储匹配的数据    def __process_eliminate_chunk(sub_marking_list, data_scrap_copy, shared_result_mark, shared_result):        """        每个进程执行的函数,处理一部分标记列表。        data_scrap_copy 是 json_list 的一个副本,进程对其的修改不会影响原始 json_list。        """        for marking_item in sub_marking_list:            for data in data_scrap_copy: # 遍历json_list的副本                result_data = find_marking(marking_item, data)                if result_data:                    # 将结果添加到共享列表中                    shared_result_mark.append(marking_item)                    shared_result.append(result_data)                    # 注意:这里从data_scrap_copy中移除元素,只影响当前进程的副本,                    # 且为了避免重复匹配,一旦找到一个匹配就跳出内层循环。                    # 如果需要从原始json_list中“消除”,则需要更复杂的同步机制或在主进程中处理。                    # data_scrap_copy.remove(data) # 如果需要确保每个标记只匹配一次,且从副本中移除                    break # 找到匹配后,当前marking_item处理完毕,检查下一个marking_item    processes = []    # 根据CPU核心数或经验值设置chunk_size和num_processes    # chunk_size决定了每个进程处理多少个marking    chunk_size = max(1, len(marking_list) // (2 * (len(marking_list) // 1000 + 1))) # 动态调整chunk_size    num_processes = math.ceil(len(marking_list) / chunk_size)    print(f"Total markings: {len(marking_list)}, Chunk size: {chunk_size}, Number of processes: {num_processes}")    for i in range(num_processes):        start_idx = i * chunk_size        end_idx = min((i + 1) * chunk_size, len(marking_list))        sub_marking_list = marking_list[start_idx:end_idx]        if not sub_marking_list:            continue        p = Process(            target=__process_eliminate_chunk,            # 传递json_list的副本给每个进程,避免进程间直接修改原始大列表的复杂同步问题            args=(sub_marking_list, json_list[:], result_mark, result)        )        processes.append(p)        p.start() # 启动进程    for p in processes:        p.join() # 等待所有进程完成    manager.shutdown() # 关闭Manager,释放资源    return list(result_mark), list(result) # 将Manager.list转换为普通Python列表# 运行多进程版本print("Starting multiprocessing elimination...")start_time = time.time()eliminated_markings, eliminated_data = eliminate_marking_multiprocess(marking_large, json_list_large)end_time = time.time()print(f"Multiprocessing finished in {end_time - start_time:.2f} seconds.")print(f"Found {len(eliminated_markings)} matches.")# print("Eliminated Markings:", eliminated_markings[:5]) # 打印前5个示例# print("Eliminated Data:", eliminated_data[:5]) # 打印前5个示例

3.3 代码解析与注意事项

multiprocessing.Manager:Manager() 创建一个管理器对象,它允许你创建可在不同进程间共享的Python对象。manager.list() 创建一个可以在多个进程中安全访问和修改的列表。这解决了传统列表在多进程环境下修改时可能出现的竞争条件和数据不一致问题。result_mark 和 result 就是通过这种方式创建的共享列表。任务分块 (chunk_size):marking_list 被分割成若干个子列表(sub_marking_list)。每个进程负责处理一个子列表。合理设置 chunk_size 很重要。过小的块可能导致进程创建和管理的开销过大;过大的块可能导致某些进程负载不均。可以根据实际CPU核心数和任务特性进行调整。进程创建与执行:Process(target=__process_eliminate_chunk, args=(…)) 创建一个新进程,并指定其执行的函数和传递的参数。p.start() 启动进程。p.join() 等待子进程完成。主进程会阻塞,直到所有子进程都执行完毕。json_list[:] 的作用:在 args=(sub_marking_list, json_list[:], …) 中,json_list[:] 创建了 json_list 的一个浅拷贝。这意味着每个子进程都会收到 json_list 的一个独立副本。重要提示:如果子进程内部对 data_scrap_copy(即 json_list 的副本)进行 remove 操作,这只会影响该进程自身的副本,而不会修改原始的 json_list。如果目标是实际从原始 json_list 中移除匹配项,则需要更复杂的策略,例如让每个进程返回其匹配到的项的索引,然后在主进程中统一处理移除,或者使用 Manager().list() 来包装 json_list 并进行同步操作,但这会引入更多的复杂性和潜在的性能瓶颈。在当前示例中,我们主要关注的是收集匹配的标记和数据,而不是原地修改原始 json_list。manager.shutdown():在所有进程完成工作后,调用 manager.shutdown() 来关闭管理器并释放其资源。结果转换:Manager().list() 返回的对象是特殊的代理对象。为了在主进程中像普通列表一样操作它们,通常需要将其转换为标准的Python列表,例如 list(result_mark)。

4. 总结

当Python程序遇到CPU密集型任务,且多线程无法带来性能提升时,multiprocessing模块是更优的选择。通过创建独立的进程,multiprocessing能够绕过GIL的限制,实现真正的并行计算,从而显著缩短程序的执行时间。在使用multiprocessing时,需要注意进程间数据共享的机制(如Manager)以及任务分发策略,以确保程序的正确性和高效性。合理地应用多进程技术,可以有效提升Python在处理大规模数据和计算密集型任务时的性能表现。

以上就是提升Python数据处理性能:从多线程到多进程的优化实践的详细内容,更多请关注创想鸟其它相关文章!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/1375098.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2025年12月14日 14:42:55
下一篇 2025年12月14日 14:43:12

相关推荐

  • Python模块间全局变量共享:理解import *的陷阱与正确实践

    本文深入探讨了在Python和Pygame应用中,跨模块共享全局变量时常遇到的作用域问题。通过分析from module import *语句可能导致的变量副本创建,而非共享同一实例的机制,文章提出并详细阐述了使用import module后通过module.variable_name方式访问变量的…

    好文分享 2025年12月14日
    000
  • Python如何保存py文件

    保存py文件是通过文本编辑器或IDE将Python代码以.py扩展名存储。使用记事本或VS Code等编辑器编写代码后,选择“另存为”,输入文件名如hello.py,保存类型选“所有文件”,编码用UTF-8;在IDLE、PyCharm等IDE中,新建Python文件,编写代码后按Ctrl+S,首次保…

    2025年12月14日
    000
  • 在多台电脑上协同开发:使用Git进行代码同步

    在多台电脑上协同开发,最常见的需求就是如何在不同设备间同步代码,保证开发进度的一致性。传统的做法,例如使用Google Drive等云盘进行手动上传下载,效率低下且容易出错。Git作为一种强大的版本控制系统,可以完美解决这个问题。 使用Git进行版本控制 Git是一个分布式版本控制系统,它允许你跟踪…

    2025年12月14日
    000
  • Python跨模块全局变量管理:避免from import *陷阱

    本文深入探讨Python中跨模块共享全局变量时常见的from module import *陷阱。当使用此语法时,模块会创建变量的本地副本,导致更新不一致。正确的做法是使用import module,并通过module.variable形式直接引用原始模块中的变量,确保所有模块访问和修改的是同一个实…

    2025年12月14日
    000
  • 解决Web抓取中HTML内容显示不完整问题:终端限制与完整数据保存策略

    本教程旨在解决Web抓取过程中,终端显示HTML内容不完整的问题。当抓取到的HTML文本过长时,终端的行数限制可能导致内容截断。文章将详细介绍如何通过将抓取到的完整HTML内容保存到本地文件,从而克服这一限制,确保开发者能够查阅和分析所有抓取到的数据。 理解问题:终端输出限制 在进行web抓取时,开…

    2025年12月14日
    000
  • Python正则表达式:高效提取整数与分数

    本文详细阐述如何利用Python正则表达式从混合文本中准确提取整数和分数。通过构建d+(?:/d+)?等灵活模式,解决了传统d+无法识别分数的问题,并结合pandas和re模块进行实战演示,帮助读者掌握从非结构化文本中提取特定数值数据的专业技巧。 引言 在数据分析和处理中,我们经常需要从非结构化或半…

    2025年12月14日
    000
  • Discord Bot开发:实现交互式问卷并有效收集用户文本回复

    本教程详细指导如何在Discord机器人中实现交互式问卷功能,并确保将用户的文本回答(message.content)正确收集为字符串列表。文章涵盖了bot.wait.for的使用、消息检查机制以及答案存储的关键步骤,旨在帮助开发者构建功能完善的Discord交互应用。 在开发discord机器人时…

    2025年12月14日
    000
  • Python文件读取与字符串比较:解决意外换行符及最佳实践

    本文探讨Python文件读取时因隐含换行符导致字符串比较失败的问题,并提供strip()方法作为解决方案。同时,深入讲解了使用with语句进行文件操作的优势,强调了资源管理的最佳实践,并分享了有效的调试技巧,帮助开发者编写更健壮的代码。 在Python编程中,文件读写是常见的操作。然而,初学者在进行…

    2025年12月14日
    000
  • 使用Python构建弗洛伊德三角形:原理与高效实现

    本教程详细介绍了如何使用Python生成弗洛伊德三角形。文章首先阐释了弗洛伊德三角形的结构特点,随后分析了初学者在实现过程中可能遇到的常见逻辑错误。核心内容展示了一种简洁高效的Python实现方案,利用循环和Python的特性(如切片打印和海象运算符)来按行生成递增的数字序列,确保输出符合预期的三角…

    2025年12月14日
    000
  • python如何删除字符串的特殊字符

    使用isalnum()可保留字母数字,2. 正则表达式灵活过滤特殊字符,3. string.punctuation去除标准标点,按需选择方法。 在Python中删除字符串中的特殊字符,通常是指去除标点符号、控制字符或其他非字母数字的符号。可以通过多种方式实现,下面介绍几种常用且有效的方法。 使用字符…

    2025年12月14日
    000
  • 使用BeautifulSoup从HTML中提取特定标签并生成新页面

    本文详细介绍了如何利用Python的BeautifulSoup库,从现有HTML文档中高效、精准地提取指定标签及其内容,并将其整合到一个全新的HTML页面中。通过初始化新的HTML结构、定义目标标签列表并利用BeautifulSoup的append方法,实现了比传统字符串拼接更优雅、更健壮的解决方案…

    2025年12月14日 好文分享
    000
  • Python文件读取与字符串验证:解决换行符陷阱与优化文件操作

    本文深入探讨Python文件读取时因f.read()方法默认包含换行符,导致字符串比较验证失败的常见问题。教程将详细介绍如何使用strip()方法清除字符串末尾的空白字符,并强调利用with语句作为上下文管理器进行文件操作的最佳实践,以确保资源正确释放。同时,提供实用的调试技巧,帮助开发者编写更健壮…

    2025年12月14日
    000
  • 高效生成稀疏邻接矩阵的COO格式数据

    本文旨在教授如何高效地在Python中生成用于稀疏邻接矩阵(特别是COO格式)的行(row)和列(col)索引,以确保矩阵对角线元素为零(即无自环)。我们将探讨使用NumPy生成所有非对角线索引的方法,以及如何从已有的COO格式数据构建矩阵,并最终将其应用于Scipy的稀疏矩阵构建。 在图论和网络分…

    2025年12月14日
    000
  • Pyheif安装教程:解决缺失libheif依赖的问题

    本教程旨在解决Python pyheif库安装过程中常见的“libheif/heif.h文件未找到”错误。核心在于pyheif是libheif C库的Python接口,因此必须先正确安装libheif及其开发文件。文章将详细指导macOS、Linux用户如何通过包管理器安装libheif,并为Win…

    2025年12月14日
    000
  • 解决PyTorch GAN训练中的梯度计算错误:inplace操作与计算图分离

    本文旨在解决PyTorch GAN训练中常见的RuntimeError: one of the variables needed for gradient computation has been modified by an inplace operation错误。该错误通常源于生成器和判别器在共…

    2025年12月14日
    000
  • 解决Web抓取时HTML输出在终端被截断的问题

    本文旨在解决Web抓取过程中,当尝试在终端打印HTML结构时,内容显示不完整的问题。核心原因在于终端显示行数限制,而非抓取代码本身错误。教程将详细介绍如何通过将HTML内容保存到本地文件来完整获取并查看抓取到的网页结构,确保数据完整性。 1. 问题剖析:HTML输出为何在终端被截断? 在进行web抓…

    2025年12月14日
    000
  • 使用 Git 在多台电脑上协同开发

    本文将介绍如何利用 Git 版本控制系统,实现在多台电脑上无缝协同开发同一项目。 告别手动上传下载的繁琐,通过 Git 的推送(push)和拉取(pull)操作,轻松同步代码变更,确保在不同设备上始终保持最新的工作进度。 掌握 Git 的基本操作,提升开发效率,让编码不再受限于单一设备。 在软件开发…

    2025年12月14日
    000
  • Python fileinput模块:高效处理大文件行删除的教程

    本教程旨在解决Python中处理超大文件时,高效删除特定行的挑战。针对内存或硬盘资源受限的环境,传统方法可能效率低下甚至不可行。我们将详细介绍如何利用Python内置的fileinput模块,通过其原地修改(inplace=True)功能,以流式处理方式实现特定行的删除,从而显著减少内存占用并优化I…

    2025年12月14日
    000
  • 解决 Pyheif 安装失败:理解并安装 libheif 核心依赖

    Pyheif库在Python项目中用于处理HEIC/HEIF图像格式,但其安装常因缺少底层的C语言库libheif而失败。本文详细阐述了Pyheif与libheif的依赖关系,并提供了在macOS、Linux和Windows系统上安装libheif的具体步骤,从而解决Pyheif安装时常见的编译错误…

    2025年12月14日
    000
  • 优化滑动窗口中位数:使用惰性删除与双堆策略解决TLE问题

    本文旨在解决使用双堆法计算滑动窗口中位数时遇到的时间限制超出(TLE)问题。通过分析原始实现中元素移除操作的低效性,我们提出了一种基于惰性删除(即只标记不移除)和索引跟踪的优化方案。该方案利用lowindex动态标记过期元素,并修改堆的peek/pop操作以跳过这些标记元素,从而将移除操作的复杂度从…

    2025年12月14日
    000

发表回复

登录后才能评论
关注微信