
本教程将指导您如何利用python的`pathlib`模块递归遍历复杂目录结构,并结合`pandas`库高效地将多个子文件夹中的csv文件合并成一个统一的csv文件。我们将通过一个实际示例,展示如何定位、读取并整合分散的数据,最终生成一个便于分析的汇总数据集。
理解需求:多层目录下的CSV文件合并挑战
在数据处理的日常工作中,我们经常会遇到数据文件分散存储在多层子目录中的情况。例如,日志文件、传感器数据或实验结果可能按照日期、类型或批次等维度,被组织成类似 Sessions/day1/weather/weather1.csv、Sessions/day2/weather/weather2.csv 这样的结构。当需要对所有这些分散的数据进行统一分析时,首要任务就是将它们合并成一个完整的数据集。
传统的文件操作方法,如使用 os.walk 遍历目录树,或者 glob 模块进行模式匹配,虽然能够实现文件查找,但在处理复杂路径和递归查找时,代码可能显得冗长且不够直观。结合 pandas 进行数据合并,则需要一个高效且简洁的机制来获取所有目标文件的路径。
核心工具:pathlib与pandas
Python标准库中的 pathlib 模块提供了一种面向对象的方式来处理文件系统路径,极大地简化了路径操作。其 Path.rglob() 方法尤其适用于递归地查找符合特定模式的文件。而 pandas 库作为数据分析的核心工具,提供了 read_csv() 用于读取CSV文件,以及 concat() 函数用于将多个DataFrame对象高效地合并在一起。
本教程将展示如何巧妙地结合这两个库,以简洁高效的方式完成多层子目录下的CSV文件合并任务。
立即学习“Python免费学习笔记(深入)”;
实现步骤与代码示例
我们将以一个典型的场景为例:假设有一个名为 Sessions 的父目录,其中包含多个 dayX 子目录,每个 dayX 子目录又包含一个 weather 子目录,最终在 weather 目录中存放着 weatherX.csv 文件。我们的目标是将所有 weatherX.csv 文件合并到 Sessions 目录下的一个 weather_all.csv 文件中。
步骤1:导入必要库
首先,我们需要导入 pathlib 用于路径操作,以及 pandas 用于数据处理。
梅子Ai论文
无限免费生成千字论文大纲-在线快速生成论文初稿-查重率10%左右
66 查看详情
from pathlib import Pathimport pandas as pd
步骤2:定义目标父目录
指定包含所有子目录和CSV文件的父目录路径。在本例中,我们假设当前脚本与 Sessions 目录处于同一级别,或者提供 Sessions 的绝对路径。
parent_directory = Path('Sessions') # 相对路径,如果Sessions在当前工作目录下# 或者 parent_directory = Path('/path/to/your/Sessions') # 绝对路径,请替换为实际路径
步骤3:递归查找并收集CSV文件路径
使用 Path.rglob(‘*.csv’) 方法,我们可以递归地在 parent_directory 及其所有子目录中查找所有扩展名为 .csv 的文件。
csv_files = list(parent_directory.rglob('*.csv'))print(f"找到 {len(csv_files)} 个CSV文件。")if csv_files: for file in csv_files[:5]: # 打印前5个文件路径作为示例 print(file)
步骤4:逐一读取CSV文件并存储到列表中
遍历找到的所有CSV文件路径,使用 pd.read_csv() 将每个文件读取为一个 pandas.DataFrame,并将这些DataFrame对象收集到一个列表中。
dfs_to_combine = []for file_path in csv_files: try: df = pd.read_csv(file_path) dfs_to_combine.append(df) except pd.errors.EmptyDataError: print(f"警告: 文件 '{file_path}' 为空,跳过。") except Exception as e: print(f"错误: 读取文件 '{file_path}' 失败: {e},跳过。")
步骤5:合并所有数据框并保存
当所有DataFrame都读取完毕并存储在 dfs_to_combine 列表中后,使用 pd.concat() 函数将它们一次性合并成一个大的DataFrame。最后,将合并后的DataFrame保存为一个新的CSV文件。
if dfs_to_combine: # 确保列表不为空 combined_df = pd.concat(dfs_to_combine, ignore_index=True) destination_path = parent_directory / 'weather_all.csv' # 合并后的文件路径 # 确保输出目录存在,如果destination_path包含子目录 destination_path.parent.mkdir(parents=True, exist_ok=True) combined_df.to_csv(destination_path, index=False, encoding='utf-8-sig') print(f"所有CSV文件已成功合并并保存到: {destination_path}")else: print("没有可合并的数据。")
ignore_index=True 参数在合并时会重置索引,这对于新的合并数据集通常是期望的行为。encoding=’utf-8-sig’ 确保了在不同系统上的兼容性,尤其是在处理包含特殊字符的数据时。
完整代码示例
将上述步骤整合在一起,形成一个完整的脚本:
from pathlib import Pathimport pandas as pddef combine_nested_csvs(parent_dir_path, output_filename='weather_all.csv'): """ 递归查找指定父目录及其子目录中的所有CSV文件, 并将它们合并成一个单一的CSV文件。 Args: parent_dir_path (str or Path): 包含CSV文件的父目录路径。 output_filename (str): 合并后CSV文件的名称。 """ parent_directory = Path(parent_dir_path) if not parent_directory.is_dir(): print(f"错误: 目录 '{parent_dir_path}' 不存在。") return csv_files = list(parent_directory.rglob('*.csv')) if not csv_files: print(f"在 '{parent_dir_path}' 中没有找到任何CSV文件。") return print(f"找到 {len(csv_files)} 个CSV文件,开始读取和合并...") dfs_to_combine = [] for file_path in csv_files: try: # 尝试读取文件,可以根据需要添加更多pd.read_csv的参数 df = pd.read_csv(file_path) dfs_to_combine.append(df) except pd.errors.EmptyDataError: print(f"警告: 文件 '{file_path}' 为空,跳过。") except Exception as e: print(f"错误: 读取文件 '{file_path}' 失败: {e},跳过。") if dfs_to_combine: combined_df = pd.concat(dfs_to_combine, ignore_index=True) destination_path = parent_directory / output_filename # 确保输出目录存在 destination_path.parent.mkdir(parents=True, exist_ok=True) combined_df.to_csv(destination_path, index=False, encoding='utf-8-sig') print(f"所有CSV文件已成功合并并保存到: {destination_path}") else: print("没有可合并的数据。")# 示例调用# 假设你的 'Sessions' 目录与你的脚本在同一目录下# combine_nested_csvs('Sessions', 'weather_all.csv')# 如果 'Sessions' 在其他位置,请提供完整路径# combine_nested_csvs('/Users/YourUser/Documents/Sessions', 'weather_all.csv')
注意事项与最佳实践
列名一致性: pd.concat() 在合并时会尝试对齐列。如果所有CSV文件的列名和顺序完全一致,合并将非常顺利。如果列名不一致,pd.concat 会创建所有列的并集,并在缺失值处填充 NaN。在合并前,建议检查并标准化所有CSV文件的列结构。内存管理: 对于数量巨大或单个文件很大的CSV数据集,将所有DataFrame一次性加载到内存中可能会导致内存溢出。在这种情况下,可以考虑以下策略:分块读取: 使用 pd.read_csv(…, chunksize=…) 分块读取并处理数据。逐步写入: 读取一个文件,写入输出文件(追加模式),然后读取下一个。使用 dask.dataframe: 对于超大数据集,dask 提供了类似于 pandas 的API,但能够处理超出内存限制的数据。错误处理: 在读取CSV文件时,应考虑文件可能为空 (pd.errors.EmptyDataError)、文件损坏或编码问题等异常情况。在示例代码中已加入了基本的 try-except 块来处理这些情况。文件编码: 确保 pd.read_csv() 和 df.to_csv() 使用正确的编码(如 ‘utf-8’ 或 ‘utf-8-sig’),以避免乱码问题。文件复制: 如果除了合并之外,你确实需要将每个单独的CSV文件复制到一个目标目录,可以在读取文件的循环中添加 `shutil
以上就是Python教程:递归查找并合并多个子文件夹中的CSV文件的详细内容,更多请关注创想鸟其它相关文章!
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/577746.html
微信扫一扫
支付宝扫一扫