
本文将详细介绍如何利用 Polars 的惰性计算(LazyFrame)和并行处理能力,高效地加载多个具有相同结构的 CSV 文件,并在合并之前为每个文件添加一个基于文件名的自定义列(例如产品代码)。通过结合 scan_csv 和 concat 方法,可以在处理大量文件时保持高性能和灵活性。
引言:多文件加载与自定义需求
在数据分析工作中,我们经常需要处理存储在多个文件中的数据,例如按产品、日期或区域划分的 CSV 文件。一个常见的需求是,在将这些文件合并成一个统一的 DataFrame 时,能够为每条记录添加一个标识其来源的列,例如文件名称或从文件名中提取的特定信息(如产品ID)。
考虑以下场景:您有一系列 CSV 文件,命名模式为 data_product_1.csv, data_product_2.csv 等,它们结构相同。您希望将所有数据合并到一个 Polars DataFrame 中,并额外添加一列 product_code,其值应从文件名中提取,例如 product_1、product_2。
直接使用 polars.read_csv(“data_*.csv”) 可以将所有文件合并,但这种方法不提供在加载过程中添加自定义列的机制。虽然可以逐个文件加载、添加列再合并,但这可能无法充分利用 Polars 的并行处理优势,尤其是在文件数量众多时。
Polars 解决方案:结合惰性计算与并行处理
为了高效地解决上述问题,Polars 提供了 scan_csv(或 scan_parquet 等)结合 LazyFrame 的方式,允许我们对每个文件进行预处理,然后并行地收集结果。
1. 准备示例数据
首先,我们创建几个示例 CSV 文件,以便后续代码能够运行。
import polars as plfrom pathlib import Path# 创建一个临时目录来存放CSV文件temp_dir = Path("temp_data")temp_dir.mkdir(exist_ok=True)# 创建示例CSV文件data_product_1 = pl.DataFrame({ "data": ["2000-01-01", "2000-01-02"], "value": [1, 2]})data_product_1.write_csv(temp_dir / "data_product_1.csv")data_product_2 = pl.DataFrame({ "data": ["2000-01-01", "2000-01-02"], "value": [3, 4]})data_product_2.write_csv(temp_dir / "data_product_2.csv")data_product_3 = pl.DataFrame({ "data": ["2000-01-01", "2000-01-02"], "value": [5, 6]})data_product_3.write_csv(temp_dir / "data_product_3.csv")print("示例CSV文件已创建在 'temp_data' 目录下。")
2. 核心实现:使用 scan_csv 和 concat
该方法的核心思想是:
惰性扫描: 使用 pl.scan_csv() 而不是 pl.read_csv()。scan_csv 不会立即读取文件内容,而是返回一个 LazyFrame 对象,它代表了未来要执行的计算计划。逐文件转换: 对每个 LazyFrame 应用 with_columns() 方法,添加基于文件名的自定义列。并行合并与收集: 使用 pl.concat() 将所有 LazyFrame 合并,然后调用 .collect() 触发实际的数据读取和计算。Polars 可以在 collect() 阶段并行处理这些独立的 LazyFrame。
import polars as plfrom pathlib import Path# 假设文件位于当前目录或指定目录# 如果文件在 'temp_data' 目录下,则路径应为 Path("temp_data")data_directory = Path("temp_data") # 获取所有匹配的文件路径csv_files = list(data_directory.glob("data_*.csv"))# 创建 LazyFrame 列表,并为每个 LazyFrame 添加 product_code 列lazy_frames = []for f_path in csv_files: # 提取文件名作为 product_code # f_path.stem 获取不带扩展名的文件名 (e.g., "data_product_1") # .replace("data_", "") 进一步提取 "product_1" product_code = f_path.stem.replace("data_", "") # 使用 scan_csv 创建 LazyFrame # 使用 with_columns 添加 product_code 列 lf = pl.scan_csv(f_path).with_columns( pl.lit(product_code).alias("product_code") ) lazy_frames.append(lf)# 使用 pl.concat 合并所有 LazyFrame,然后使用 .collect() 触发计算# 默认情况下,pl.concat 会并行处理 LazyFrameif lazy_frames: final_df = pl.concat(lazy_frames).collect() print(final_df)else: print("未找到匹配的CSV文件。")# 清理示例数据import shutilif temp_dir.exists(): shutil.rmtree(temp_dir) print("n示例数据目录 'temp_data' 已删除。")
输出示例:
shape: (6, 3)┌────────────┬───────┬──────────────┐│ data ┆ value ┆ product_code ││ --- ┆ --- ┆ --- ││ str ┆ i64 ┆ str │╞════════════╪═══════╪══════════════╡│ 2000-01-01 ┆ 1 ┆ product_1 ││ 2000-01-02 ┆ 2 ┆ product_1 ││ 2000-01-01 ┆ 3 ┆ product_2 ││ 2000-01-02 ┆ 4 ┆ product_2 ││ 2000-01-01 ┆ 5 ┆ product_3 ││ 2000-01-02 ┆ 6 ┆ product_3 │└────────────┴───────┴──────────────┘
3. 简化版本(列表推导式)
上述 for 循环可以通过列表推导式进一步简化,代码更加紧凑:
import polars as plfrom pathlib import Pathdata_directory = Path("temp_data") # 重新创建示例数据以确保代码可运行temp_dir = Path("temp_data")temp_dir.mkdir(exist_ok=True)data_product_1 = pl.DataFrame({"data": ["2000-01-01", "2000-01-02"], "value": [1, 2]})data_product_1.write_csv(temp_dir / "data_product_1.csv")data_product_2 = pl.DataFrame({"data": ["2000-01-01", "2000-01-02"], "value": [3, 4]})data_product_2.write_csv(temp_dir / "data_product_2.csv")data_product_3 = pl.DataFrame({"data": ["2000-01-01", "2000-01-02"], "value": [5, 6]})data_product_3.write_csv(temp_dir / "data_product_3.csv")lazy_frames = [ pl.scan_csv(f_path).with_columns( pl.lit(f_path.stem.replace("data_", "")).alias("product_code") ) for f_path in data_directory.glob("data_*.csv")]if lazy_frames: final_df = pl.concat(lazy_frames).collect() print(final_df)else: print("未找到匹配的CSV文件。")# 清理示例数据import shutilif temp_dir.exists(): shutil.rmtree(temp_dir)
关键概念与优势
惰性计算 (LazyFrame): pl.scan_csv() 返回的是 LazyFrame。这意味着 Polars 只是构建了一个计算计划,而没有立即执行数据读取和转换。所有操作都被“记录”下来,直到调用 .collect() 时才一次性执行。优化与并行化: 由于 Polars 知道整个计算图,它可以在 .collect() 阶段对操作进行优化,并利用多核处理器并行读取和处理多个文件。这对于处理大量文件或大型文件时,能显著提高性能。灵活性: 这种方法允许在每个文件的 LazyFrame 上应用任意的 Polars 表达式 (with_columns, filter, select 等),从而实现高度定制化的预处理逻辑,而无需在内存中加载整个文件。内存效率: 对于非常大的文件,逐个文件加载到 LazyFrame 并进行转换,可以避免一次性将所有数据加载到内存中,从而减少内存压力。
注意事项
文件路径: 确保 Path().glob(“data_*.csv”) 或 data_directory.glob(“data_*.csv”) 能够正确找到您的文件。文件名解析: f_path.stem.replace(“data_”, “”) 是一种简单的文件名解析方式。如果您的文件名模式更复杂,可能需要使用正则表达式 (re 模块) 来提取所需信息。错误处理: 在生产环境中,您可能需要添加错误处理机制,例如使用 try-except 块来处理文件不存在或格式错误的情况。数据类型: pl.lit() 创建的字面量列的数据类型将根据输入自动推断。如果需要特定类型,可以使用 pl.lit(value).cast(pl.String) 等进行强制转换。替代方案(DuckDB): 值得一提的是,其他数据处理工具如 DuckDB 提供了直接在 read_csv_auto 函数中通过 filename=true 参数添加文件名列的功能。Polars 目前尚未在 read_csv 或 scan_csv 中内置此功能,但通过上述 LazyFrame 的组合使用,可以灵活地实现相同的效果。
总结
通过巧妙地结合 Polars 的 scan_csv、LazyFrame 和 concat 方法,我们能够高效且灵活地处理多文件数据加载场景。这种方法不仅允许在合并前对每个文件进行自定义转换,还充分利用了 Polars 的并行处理能力,从而在处理大规模数据集时提供了卓越的性能和内存效率。掌握这一模式,将极大地提升您在 Polars 中处理复杂数据管道的能力。
以上就是使用 Polars 高效加载多文件并进行自定义处理的详细内容,更多请关注创想鸟其它相关文章!
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/1374037.html
微信扫一扫
支付宝扫一扫