
本教程将详细介绍如何使用Polars库高效地加载多个结构相同的CSV文件,并为每个文件动态添加一个包含其文件名信息的新列。通过利用Polars的惰性评估(LazyFrame)和并行处理能力,我们能够以高性能的方式整合数据,实现批量文件处理与自定义数据增强的需求,避免逐个文件加载和合并的性能瓶颈。
在数据分析和处理的场景中,我们经常会遇到需要处理大量结构相同但存储在不同文件中的数据。例如,一系列按产品或日期划分的csv文件,如 data_product_1.csv、data_product_2.csv 等。通常,我们希望将这些文件的数据合并到一个统一的dataframe中,并且在此过程中,能够为每条记录添加一个标识其来源文件(或从中提取的产品代码)的额外列。虽然polars提供了方便的通配符加载功能 pl.read_csv(“data_*.csv”) 来合并文件,但它不直接支持在加载时自动添加文件名作为列。本文将介绍如何利用polars的惰性api来实现这一高级功能。
批量加载与自定义列添加的Polars解决方案
Polars的惰性(Lazy)API是处理大规模数据的强大工具,它允许我们构建计算图,延迟实际的数据加载和计算,直到 collect() 方法被调用。这使得Polars能够进行查询优化,并在可能的情况下并行处理任务,从而显著提高性能。
为了实现批量加载CSV文件并添加文件名作为新列,我们将结合使用 polars.scan_csv、Python的 pathlib 模块和 polars.concat。
步骤详解
文件准备:首先,确保您的工作目录下有如下结构的CSV文件。例如,创建三个文件:data_product_1.csv, data_product_2.csv, data_product_3.csv。
data_product_1.csv:
data,value2000-01-01,12000-01-02,2
data_product_2.csv:
data,value2000-01-01,32000-01-02,4
data_product_3.csv:
data,value2000-01-01,42000-01-02,5
导入必要的库:我们需要 polars 进行数据操作,以及 pathlib 来方便地查找文件。
import polars as plfrom pathlib import Path
构建惰性DataFrame列表:遍历所有符合模式的CSV文件,对每个文件执行以下操作:
使用 pl.scan_csv(f) 创建一个惰性DataFrame。使用 with_columns(product_code=pl.lit(f.name)) 添加一个名为 product_code 的新列。pl.lit(f.name) 将当前文件的名称作为字面量值赋给新列的所有行。
# 获取当前目录下所有匹配 "data_*.csv" 模式的文件路径csv_files = Path().glob("data_*.csv")# 为每个文件创建一个LazyFrame,并添加文件名作为新列lazy_frames = [ pl.scan_csv(f).with_columns(product_code=pl.lit(f.name)) for f in csv_files]
合并惰性DataFrame并执行计算:使用 pl.concat() 将所有惰性DataFrame合并成一个单一的惰性DataFrame。默认情况下,pl.concat 会并行处理这些惰性DataFrame,从而提高效率。最后,调用 .collect() 来触发实际的数据加载和计算,将惰性DataFrame转换为一个急切(Eager)的DataFrame。
# 合并所有LazyFrame,并在collect()时并行读取和处理df = pl.concat(lazy_frames).collect()# 打印结果print(df)
完整代码示例
import polars as plfrom pathlib import Pathimport os# --- 准备测试文件 (如果您的环境没有这些文件,请运行此段代码) ---# 创建一个临时目录来存放CSV文件temp_dir = "temp_csv_data"os.makedirs(temp_dir, exist_ok=True)# 写入测试CSV文件file_contents = { "data_product_1.csv": "data,valuen2000-01-01,1n2000-01-02,2", "data_product_2.csv": "data,valuen2000-01-01,3n2000-01-02,4", "data_product_3.csv": "data,valuen2000-01-01,4n2000-01-02,5"}for filename, content in file_contents.items(): with open(Path(temp_dir) / filename, "w") as f: f.write(content)# --- 测试文件准备结束 ---# 切换到临时目录以查找文件original_cwd = Path.cwd()os.chdir(temp_dir)try: # 获取当前目录下所有匹配 "data_*.csv" 模式的文件路径 csv_files = Path().glob("data_*.csv") # 为每个文件创建一个LazyFrame,并添加文件名作为新列 lazy_frames = [ pl.scan_csv(f).with_columns(product_code=pl.lit(f.name)) for f in csv_files ] # 合并所有LazyFrame,并在collect()时并行读取和处理 # 如果没有文件,lazy_frames可能为空,需要处理 if lazy_frames: df = pl.concat(lazy_frames).collect() # 打印结果 print(df) else: print("未找到匹配的CSV文件。")finally: # 切换回原始工作目录并清理临时文件 os.chdir(original_cwd) import shutil shutil.rmtree(temp_dir)
输出结果
执行上述代码后,您将得到一个合并后的DataFrame,其中包含原始数据以及一个名为 product_code 的新列,该列存储了每条记录对应的源文件名:
shape: (6, 3)┌────────────┬───────┬────────────────────┐│ data ┆ value ┆ product_code ││ --- ┆ --- ┆ --- ││ str ┆ i64 ┆ str │╞════════════╪═══════╪════════════════════╡│ 2000-01-01 ┆ 1 ┆ data_product_1.csv ││ 2000-01-02 ┆ 2 ┆ data_product_1.csv ││ 2000-01-01 ┆ 3 ┆ data_product_2.csv ││ 2000-01-02 ┆ 4 ┆ data_product_2.csv ││ 2000-01-01 ┆ 4 ┆ data_product_3.csv ││ 2000-01-02 ┆ 5 ┆ data_product_3.csv │└────────────┴───────┴────────────────────┘
如果您需要从 product_code 列中提取更精简的产品名称(例如,将 data_product_1.csv 转换为 product_1),可以在 with_columns 之后或 collect() 之后进一步使用字符串操作,例如 df.with_columns(pl.col(“product_code”).str.extract(r”product_(d+).csv”).alias(“product_id”))。
核心概念解析
惰性评估 (LazyFrame):pl.scan_csv() 返回的是一个 LazyFrame 对象,而不是立即加载数据的 DataFrame。这意味着Polars不会立即读取文件内容或执行任何计算。它只是构建一个表示未来计算步骤的计划。这种惰性特性对于处理大型数据集至关重要,因为它允许Polars优化整个查询,例如只读取所需列,或在内存不足时进行批处理。
并行处理:当使用 pl.concat() 合并多个 LazyFrame 并最终调用 collect() 时,Polars会尝试并行地读取和处理这些文件。这充分利用了多核CPU的优势,显著加快了数据加载和初始转换的速度,特别是在文件数量众多或单个文件较大时。
pathlib 模块:pathlib 提供了面向对象的路径操作方式,使得文件路径的查找、遍历和处理变得更加简洁和Pythonic。Path().glob(“data_*.csv”) 能够方便地获取所有符合通配符模式的文件路径对象。
注意事项与最佳实践
文件路径:确保 Path().glob(“data_*.csv”) 能正确找到您的文件。如果文件在子目录中,您可能需要调整路径或使用更复杂的 glob 模式。内存管理:尽管惰性评估有助于优化,但最终的 collect() 操作仍会将所有数据加载到内存中。对于超大型数据集,如果单次 collect() 导致内存溢出,可能需要考虑分批处理或使用更高级的流式处理技术。错误处理:在实际应用中,您可能需要添加错误处理机制,例如检查 lazy_frames 列表是否为空,或者在文件读取失败时进行捕获。性能考量:对于极少量的小文件,逐个加载并合并可能性能差异不明显。但对于成百上千个文件或单个文件较大时,本文介绍的惰性并行处理方法将展现出显著的性能优势。文件名提取:pl.lit(f.name) 直接使用完整文件名。如果需要提取文件名中的特定部分(如 product_1),可以使用Polars的字符串方法(str.replace(), str.extract() 等)进行进一步处理。
总结
通过结合使用 polars.scan_csv、pathlib 和 polars.concat,我们能够优雅且高效地解决批量加载多个CSV文件并动态添加文件名信息的需求。这种方法充分利用了Polars的惰性评估和并行处理能力,不仅代码简洁,而且在处理大规模数据时表现出卓越的性能。掌握这一模式,将极大地提升您在Polars中进行数据预处理和特征工程的效率。
以上就是Polars教程:高效加载多文件并动态添加文件名信息列的详细内容,更多请关注创想鸟其它相关文章!
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/1373971.html
微信扫一扫
支付宝扫一扫