
本文旨在解决Python处理大数据量列表匹配与筛选时遇到的性能瓶颈,特别是当传统多线程方案效果不佳时。我们将深入探讨如何利用Python的multiprocessing模块,结合Manager实现进程间数据共享,以及合理的任务分块策略,显著提升CPU密集型任务的执行效率,从而将耗时数十分钟的操作缩短至可接受的范围。
1. 问题背景与挑战
在处理大规模数据集时,例如需要在一个包含数万条记录的json列表中(json_list)查找并匹配另一个包含数千个标记(marking)的列表中的元素,性能往往成为一个关键挑战。具体场景是,json_list中的每个字典包含一个code字段,我们需要将marking列表中的每个字符串与json_list中元素的code字段进行相似度匹配。匹配规则是使用difflib.sequencematcher计算相似度,当相似度为1(完全匹配)或介于0.98到0.99之间时,认为匹配成功。
原始的单线程或简单的多线程(threading)实现,在数据量庞大时(json_list超过23,000条,marking超过3,000条),可能需要20分钟甚至更长时间才能完成。这主要是因为Python的全局解释器锁(GIL)限制了多线程在CPU密集型任务上的并行执行能力。即使创建了多个线程,它们也无法同时在多个CPU核心上运行Python字节码,导致性能提升不明显。
2. 多进程(Multiprocessing)的解决方案
为了克服GIL的限制,Python提供了multiprocessing模块,它允许创建独立的进程,每个进程都有自己的Python解释器和内存空间。这意味着不同的进程可以在不同的CPU核心上真正并行执行CPU密集型任务,从而显著提高性能。
本教程将展示如何利用multiprocessing库来优化上述数据匹配和筛选过程。
2.1 核心组件介绍
multiprocessing.Process: 用于创建和管理新的进程。multiprocessing.Manager: 用于创建可以在不同进程之间共享的数据结构(如列表、字典等)。这是解决进程间通信和数据共享的关键,因为普通Python对象在进程间默认不共享。difflib.SequenceMatcher: 用于计算两个序列(字符串)的相似度。
2.2 匹配逻辑函数 find_marking
这个函数负责执行单个标记与JSON数据项的匹配逻辑。它保持不变,因为它是一个纯计算函数,不涉及并发问题。
立即学习“Python免费学习笔记(深入)”;
from difflib import SequenceMatcherdef find_marking(x: str, y: dict) -> dict | None: """ 比较标记字符串x与JSON数据项y中的'code'字段的相似度。 如果相似度为1或在0.98到0.99之间,则返回y,否则返回None。 """ text_match = SequenceMatcher(None, x, y.get('code', '')).ratio() if text_match == 1 or (0.98 <= text_match < 0.99): return y return None
注意: 确保y字典中包含’code’键,否则y.get(‘code’, ”)可以提供一个默认值,避免KeyError。
2.3 多进程筛选主函数 eliminate_marking
这个函数是整个解决方案的核心,它协调多个进程来并行处理匹配任务。
import mathfrom multiprocessing import Process, Managerdef eliminate_marking(marking_list: list[str], json_list: list[dict]) -> tuple[list[str], list[dict]]: """ 使用多进程并行地从json_list中匹配和筛选marking_list中的标记。 Args: marking_list: 待匹配的标记字符串列表。 json_list: 包含'code'字段的JSON字典列表。 Returns: 一个元组,包含两个列表: - result_mark: 成功匹配的标记列表。 - result: 成功匹配的JSON数据项列表。 """ # 1. 初始化Manager和共享数据结构 # Manager用于创建可在进程间共享的列表,以收集结果。 manager = Manager() result_mark = manager.list() # 共享列表,用于存储成功匹配的标记 result = manager.list() # 共享列表,用于存储成功匹配的JSON数据项 def __process_eliminate(sub_marking_list: list[str], data_scrap: list[dict], shared_result_mark: Manager.list, shared_result: Manager.list): """ 每个进程执行的任务函数。 它遍历分配给它的标记子列表,并尝试在data_scrap中找到匹配项。 """ # data_scrap是json_list的一个副本,每个进程独立操作。 # 注意:这里的data_scrap是json_list的浅拷贝,对其内部字典的修改会影响原始字典 # 但对其列表结构(如remove操作)的修改仅影响当前进程的副本。 # 鉴于我们的目标是收集匹配项,这种拷贝方式是安全的。 for marking_item in sub_marking_list: # 遍历当前进程负责的标记子列表 for data in data_scrap: # 遍历json_list的副本 result_data = find_marking(marking_item, data) if result_data: # 找到匹配项后,将其添加到共享列表中 shared_result_mark.append(marking_item) shared_result.append(result_data) # 这里的remove操作只影响当前进程的data_scrap副本, # 并不影响其他进程的副本或原始json_list。 # 如果目标是真正从原始json_list中移除,需要更复杂的同步机制。 # 在当前场景下,我们主要关注收集匹配结果。 # data_scrap.remove(data) # 如果一个标记只需要匹配一次,可以在找到后跳出内层循环 break # 一个marking_item找到一个匹配后就跳出,避免重复匹配 # 2. 任务分块与进程创建 processes = [] chunk_size = 100 # 每个进程处理的marking_list块的大小 # 计算需要创建的进程数量 # 这里将marking_list分成块,每个进程处理一个或多个块。 # 另一种常见策略是基于CPU核心数创建进程。 num_chunks = math.ceil(len(marking_list) / chunk_size) for i in range(num_chunks): start_idx = i * chunk_size end_idx = min((i + 1) * chunk_size, len(marking_list)) sub_marking_list = marking_list[start_idx:end_idx] if not sub_marking_list: continue # 避免创建空任务的进程 p = Process( target=__process_eliminate, # args参数传递给目标函数。 # json_list[:] 创建了一个json_list的浅拷贝,确保每个进程有独立的副本。 args=(sub_marking_list, json_list[:], result_mark, result) ) processes.append(p) p.start() # 启动进程 # 3. 等待所有进程完成 for p in processes: p.join() # 阻塞主进程,直到当前进程执行完毕 # 4. 关闭Manager并返回结果 manager.shutdown() # 在所有进程完成后关闭Manager return list(result_mark), list(result) # 将Manager.list转换为普通list返回
2.4 完整示例代码
为了方便测试,我们创建一些模拟数据:
import mathimport timeimport randomimport stringfrom difflib import SequenceMatcherfrom multiprocessing import Process, Manager# 模拟数据def generate_fake_data(num_json, num_marking): json_list = [] for i in range(num_json): code_val = ''.join(random.choices(string.digits, k=6)) json_list.append({ "code": code_val, "phone_number": f"1{random.randint(1000000000, 9999999999)}", "email": f"user{i}@example.com", "address": f"address_fake_{i}", "note": f"note dummy {i}" }) marking = [] # 确保有一些匹配项 for i in range(num_marking // 2): # 从json_list中随机取一个code作为marking marking.append(random.choice(json_list)['code']) # 添加一些不匹配的marking for i in range(num_marking // 2, num_marking): marking.append(''.join(random.choices(string.ascii_letters + string.digits, k=random.randint(5, 8)))) random.shuffle(marking) # 打乱顺序 return json_list, marking# 假设的 find_marking 函数def find_marking(x: str, y: dict) -> dict | None: text_match = SequenceMatcher(None, x, y.get('code', '')).ratio() if text_match == 1 or (0.98 <= text_match tuple[list[str], list[dict]]: manager = Manager() result_mark = manager.list() result = manager.list() def __process_eliminate(sub_marking_list: list[str], data_scrap: list[dict], shared_result_mark: Manager.list, shared_result: Manager.list): for marking_item in sub_marking_list: for data in data_scrap: result_data = find_marking(marking_item, data) if result_data: shared_result_mark.append(marking_item) shared_result.append(result_data) break # 一个marking_item找到一个匹配后就跳出 processes = [] # 这里的chunk_size可以根据实际CPU核心数和任务复杂度进行调整 # 较小的chunk_size可能导致更多的进程创建和管理开销 # 较大的chunk_size可能导致部分核心利用率不足 chunk_size = 50 # 调整为50,以创建更多进程进行测试,更细粒度的任务分配 # 优化:根据CPU核心数来决定进程数量,而不是简单地按chunk_size分块 # 理想情况下,进程数不应超过CPU核心数 # num_processes = os.cpu_count() or 1 # marking_per_process = math.ceil(len(marking_list) / num_processes) # # for i in range(num_processes): # start_idx = i * marking_per_process # end_idx = min((i + 1) * marking_per_process, len(marking_list)) # sub_marking_list = marking_list[start_idx:end_idx] # ... # 当前实现是按chunk_size分块 num_chunks = math.ceil(len(marking_list) / chunk_size) for i in range(num_chunks): start_idx = i * chunk_size end_idx = min((i + 1) * chunk_size, len(marking_list)) sub_marking_list = marking_list[start_idx:end_idx] if not sub_marking_list: continue p = Process( target=__process_eliminate, args=(sub_marking_list, json_list[:], result_mark, result) ) processes.append(p) p.start() for p in processes: p.join() manager.shutdown() return list(result_mark), list(result)if __name__ == "__main__": # 生成模拟数据 NUM_JSON = 23000 NUM_MARKING = 3000 print(f"生成 {NUM_JSON} 条JSON数据和 {NUM_MARKING} 条标记数据...") test_json_list, test_marking_list = generate_fake_data(NUM_JSON, NUM_MARKING) print("数据生成完毕。") start_time = time.time() eliminated_markings, eliminated_data = eliminate_marking(test_marking_list, test_json_list) end_time = time.time() print(f"n多进程处理完成。") print(f"总耗时: {end_time - start_time:.2f} 秒") print(f"找到 {len(eliminated_markings)} 个匹配标记。") print(f"找到 {len(eliminated_data)} 条匹配数据。") # 验证部分结果 if eliminated_markings: print(f"部分匹配标记示例: {eliminated_markings[:5]}") if eliminated_data: print(f"部分匹配数据示例: {eliminated_data[:2]}")
3. 注意事项与最佳实践
GIL与多进程: 理解Python GIL是关键。对于CPU密集型任务,multiprocessing是比threading更好的选择,因为它绕过了GIL的限制,实现了真正的并行计算。数据共享: 进程间数据共享比线程间复杂。普通Python对象在进程间默认不共享,需要使用multiprocessing.Manager创建共享数据结构,或者通过管道/队列进行通信。本例中Manager.list用于收集结果,避免了复杂的同步机制。数据拷贝: 在eliminate_marking函数中,我们通过json_list[:]将json_list的浅拷贝传递给每个进程。这意味着每个进程都在自己的json_list副本上进行查找。这样做的好处是避免了对同一共享json_list进行并发读写和删除操作的复杂同步问题。缺点是,如果目标是修改原始的json_list(例如,从中删除匹配项),这种方法不会直接实现。但对于“获取数据”的需求,收集匹配结果是更安全和常见的模式。任务分块: 合理地划分任务(marking_list的chunk_size)对性能至关重要。过小的块可能导致过多的进程创建和管理开销;过大的块可能导致部分进程负载不均。通常,进程数量不应超过CPU的核心数。可以根据实际情况调整chunk_size或直接根据os.cpu_count()来分配任务。进程开销: 进程的创建和销毁比线程更耗资源。对于非常短小的任务,多进程的开销可能抵消并行带来的收益。但在本例这种大数据量、CPU密集型任务中,多进程的优势非常明显。错误处理: 在生产环境中,应考虑在进程中加入错误处理机制,例如使用try-except块,并记录异常,以提高程序的健壮性。Manager的生命周期: 确保在所有子进程完成后调用manager.shutdown()来清理Manager创建的资源。
4. 总结
通过将任务分解为独立的子任务并在多个进程中并行执行,结合multiprocessing.Manager实现结果的有效收集,我们成功地将大数据量列表匹配和筛选的性能提升了一个数量级。这种方法特别适用于那些受限于Python GIL的CPU密集型计算任务。理解multiprocessing的工作原理和最佳实践,能够帮助开发者在处理大规模数据时构建出更高效、更健壮的Python应用程序。
以上就是使用Python多进程优化大数据量匹配与筛选性能的详细内容,更多请关注创想鸟其它相关文章!
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/1375116.html
微信扫一扫
支付宝扫一扫