
本文旨在解决在pandas中循环合并大量csv文件时遇到的性能瓶颈。通过分析循环中使用`pd.concat`的低效性,文章提出两种优化策略:一是将所有数据收集到字典中,最后进行一次性`pd.concat`;二是利用`concurrent.futures.threadpoolexecutor`实现文件读取的并行化。这些方法显著提升了处理效率,避免了随着文件数量增加而导致的性能急剧下降。
1. 问题背景与循环concat的性能瓶颈
在数据处理过程中,我们经常需要合并来自多个文件的信息。当面对大量(例如上千个)中等大小(例如每个15MB,10,000行)的CSV文件时,一种直观的做法是使用循环迭代读取每个文件,进行必要的转换,然后通过pd.concat将其追加到一个主DataFrame中。然而,这种方法存在严重的性能问题。
考虑以下原始代码示例:
import pandas as pdimport os# 假设 df 是一个包含文件路径信息的DataFrame# root_path 是根目录# df 示例:# File ID File Name# 0 folderA file001.txt# 1 folderB file002.txtmerged_data = pd.DataFrame()count = 0for index, row in df.iterrows(): folder_name = row['File ID'].strip() file_name = row['File Name'].strip() file_path = os.path.join(root_path, folder_name, file_name) # 读取、转置并插入新列 file_data = pd.read_csv(file_path, names=['Case', f'{folder_name}_{file_name}'], sep='t') file_data_transposed = file_data.set_index('Case').T.reset_index(drop=True) file_data_transposed.insert(loc=0, column='folder_file_id', value=str(folder_name+'_'+file_name)) # 在循环中进行拼接 merged_data = pd.concat([merged_data, file_data_transposed], axis=0, ignore_index=True) count = count + 1 print(count)
上述代码的性能问题在于,每次循环调用pd.concat时,Pandas都需要创建一个新的DataFrame来容纳旧数据和新数据。这意味着大量的内存重新分配和数据复制操作,随着merged_data的增大,这些操作的开销会呈指数级增长,导致处理速度越来越慢。对于上千个文件,这种方法是不可持续的。
2. 优化策略一:收集数据并进行单次pd.concat
解决循环中pd.concat低效问题的核心思想是:避免在每次迭代中都进行拼接操作。取而代之的是,将每个文件处理后的数据收集到一个数据结构(如列表或字典)中,然后在循环结束后,一次性地调用pd.concat来合并所有数据。
以下是优化后的代码示例:
import pathlibimport pandas as pd# 假设 df 是一个包含文件路径信息的DataFrame# root_path 是根目录,使用 pathlib 替换 os.pathroot_path = pathlib.Path('root') data_parts = {} # 使用字典收集每个文件的处理结果# 使用 enumerate 简化计数器,并迭代 dffor count, (_, row) in enumerate(df.iterrows(), 1): folder_name = row['File ID'].strip() file_name = row['File Name'].strip() file_path = root_path / folder_name / file_name # pathlib 的路径拼接 folder_file_id = f'{folder_name}_{file_name}' # 读取CSV文件,并进行初步处理 # header=None 因为文件没有表头 # memory_map=True 可以提高大文件读取效率,low_memory=False 避免混合类型警告 file_data = pd.read_csv(file_path, header=None, sep='t', names=['Case', folder_file_id], memory_map=True, low_memory=False) # 将 'Case' 列设置为索引,并使用 squeeze() 将 DataFrame 转换为 Series # 这样可以方便后续的 unstack 操作 data_parts[folder_file_id] = file_data.set_index('Case').squeeze() print(count)# 循环结束后,一次性合并所有数据# pd.concat 传入字典,字典的键将作为新的层级索引 (names=['folder_file_id'])# unstack('Case') 将 'Case' 索引转换为列# reset_index() 将多级索引扁平化,并重置索引merged_data = (pd.concat(data_parts, names=['folder_file_id']) .unstack('Case').reset_index())
代码改进点说明:
pathlib替代os.path: pathlib模块提供了面向对象的路径操作,代码更简洁、可读性更强。字典收集数据: data_parts = {}用于存储每个文件处理后的Series对象。enumerate: 代替手动维护count变量,使代码更简洁。pd.read_csv参数优化:header=None: 明确指定CSV文件没有表头。memory_map=True: 对于大文件,这可以提高读取效率,因为它将文件映射到内存,而不是一次性加载所有数据。low_memory=False: 禁用Pandas的低内存解析器,这在处理混合数据类型的列时可以避免警告,并可能提高解析大型文件的速度。数据转换:file_data.set_index(‘Case’).squeeze(): 将Case列设置为索引,然后使用squeeze()将单列DataFrame转换为Series。这样做是为了在最终pd.concat时,能够利用Pandas的特性,通过字典键自动创建folder_file_id的索引层级,并方便后续的unstack操作。单次pd.concat: 循环结束后,通过pd.concat(data_parts, names=[‘folder_file_id’])一次性合并所有Series。unstack和reset_index: unstack(‘Case’)将原始的Case索引(现在是Series的索引)转换为列,reset_index()将生成的MultiIndex扁平化,得到最终所需的DataFrame结构。
示例输入数据:
>>> df File ID File Name0 folderA file001.txt1 folderB file002.txt>>> cat root/folderA/file001.txt0 12341 56782 90123 34564 7890>>> cat root/folderB/file002.txt0 45671 89012 23453 6789
示例输出结果:
>>> merged_dataCase folder_file_id 0 1 2 3 40 folderA_file001.txt 1234.0 5678.0 9012.0 3456.0 7890.01 folderB_file002.txt 4567.0 8901.0 2345.0 6789.0 NaN
3. 优化策略二:利用多线程并行处理文件读取
对于I/O密集型任务(如读取大量文件),即使避免了循环内concat,文件读取本身也可能成为瓶颈。在这种情况下,可以考虑使用多线程来并行化文件读取过程。Python的concurrent.futures模块提供了一个ThreadPoolExecutor,非常适合处理这类场景。
from concurrent.futures import ThreadPoolExecutorimport pathlibimport pandas as pdroot_path = pathlib.Path('root')# 定义一个函数,用于处理单个文件的读取和转换逻辑def read_csv_and_process(args): count, row_dict = args # 展开传入的参数 folder_name = row_dict['File ID'].strip() file_name = row_dict['File Name'].strip() file_path = root_path / folder_name / file_name folder_file_id = f'{folder_name}_{file_name}' file_data = pd.read_csv(file_path, header=None, sep='t', names=['Case', folder_file_id], memory_map=True, low_memory=False) print(f"Processing {count}: {folder_file_id}") return folder_file_id, file_data.set_index('Case').squeeze()# 使用 ThreadPoolExecutor# max_workers 参数控制并行执行的线程数量,通常根据CPU核心数或I/O特性调整with ThreadPoolExecutor(max_workers=4) as executor: # 可以根据系统资源调整 max_workers # 将 df 转换为字典列表,以便每个字典作为参数传递给 read_csv_and_process # enumerate 用于在并行处理时也能追踪进度 batch_args = enumerate(df[['File ID', 'File Name']].to_dict('records'), 1) # executor.map 会将 batch_args 中的每个元素应用到 read_csv_and_process 函数 # 并以提交的顺序返回结果 processed_results_iterator = executor.map(read_csv_and_process, batch_args) # 将迭代器转换为字典,以便进行最终的 concat data_parts_threaded = dict(processed_results_iterator)# 最终合并步骤与单线程版本相同merged_data_threaded = (pd.concat(data_parts_threaded, names=['folder_file_id']) .unstack('Case').reset_index())
多线程代码改进点说明:
read_csv_and_process函数: 将单个文件的处理逻辑封装成一个独立的函数,方便多线程调用。这个函数接收一个元组args,其中包含计数和行数据字典。ThreadPoolExecutor: 创建一个线程池。max_workers参数决定了同时运行的线程数量。对于I/O密集型任务,可以适当增加max_workers,但过多的线程也可能导致上下文切换开销增加。df.to_dict(‘records’): 将DataFrame的每一行转换为一个字典,这样可以方便地将行数据作为参数传递给并行函数。executor.map: 这是ThreadPoolExecutor的核心方法之一,它将一个函数和一个可迭代对象作为输入,并行地对可迭代对象中的每个元素应用该函数,并返回一个迭代器,按提交顺序提供结果。结果收集: dict(processed_results_iterator)将并行处理的结果(键值对元组)收集成一个字典。
注意事项:
多线程主要适用于I/O密集型任务(如文件读写、网络请求)。对于CPU密集型任务,由于Python的全局解释器锁(GIL),多线程并不能真正实现并行计算,此时应考虑使用多进程(ProcessPoolExecutor)。在实际应用中,max_workers的设置需要根据系统资源和任务特性进行调优。
4. 总结与最佳实践
处理大量文件合并的场景时,避免在循环中频繁调用pd.concat是提升性能的关键。
核心原则:将数据收集起来,然后进行一次性的大规模合并操作。这减少了内存重新分配和数据复制的开销。优化方案一(收集数据后单次concat):适用于大多数情况,是处理大量文件合并的首选优化。它简单有效,并且对内存管理更友好。优化方案二(多线程并行处理):当文件读取本身成为瓶颈时,可以进一步引入多线程来加速文件I/O。这在文件分散在不同物理磁盘或网络存储上时尤其有效。
在实际开发中,应首先采用第一种优化方案,如果性能仍不满足要求,再考虑引入多线程或多进程等并行化技术。同时,合理利用pd.read_csv等函数的参数(如memory_map、low_memory、chunksize等)也能进一步提升数据加载效率。
以上就是高效处理Pandas中大量CSV文件合并:避免循环内concat的性能陷阱的详细内容,更多请关注创想鸟其它相关文章!
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/1381198.html
微信扫一扫
支付宝扫一扫