使用 Polars 高效加载多文件并进行自定义处理

使用 polars 高效加载多文件并进行自定义处理

本文将详细介绍如何利用 Polars 的惰性计算(LazyFrame)和并行处理能力,高效地加载多个具有相同结构的 CSV 文件,并在合并之前为每个文件添加一个基于文件名的自定义列(例如产品代码)。通过结合 scan_csv 和 concat 方法,可以在处理大量文件时保持高性能和灵活性。

引言:多文件加载与自定义需求

在数据分析工作中,我们经常需要处理存储在多个文件中的数据,例如按产品、日期或区域划分的 CSV 文件。一个常见的需求是,在将这些文件合并成一个统一的 DataFrame 时,能够为每条记录添加一个标识其来源的列,例如文件名称或从文件名中提取的特定信息(如产品ID)。

考虑以下场景:您有一系列 CSV 文件,命名模式为 data_product_1.csv, data_product_2.csv 等,它们结构相同。您希望将所有数据合并到一个 Polars DataFrame 中,并额外添加一列 product_code,其值应从文件名中提取,例如 product_1、product_2。

直接使用 polars.read_csv(“data_*.csv”) 可以将所有文件合并,但这种方法不提供在加载过程中添加自定义列的机制。虽然可以逐个文件加载、添加列再合并,但这可能无法充分利用 Polars 的并行处理优势,尤其是在文件数量众多时。

Polars 解决方案:结合惰性计算与并行处理

为了高效地解决上述问题,Polars 提供了 scan_csv(或 scan_parquet 等)结合 LazyFrame 的方式,允许我们对每个文件进行预处理,然后并行地收集结果。

1. 准备示例数据

首先,我们创建几个示例 CSV 文件,以便后续代码能够运行。

import polars as plfrom pathlib import Path# 创建一个临时目录来存放CSV文件temp_dir = Path("temp_data")temp_dir.mkdir(exist_ok=True)# 创建示例CSV文件data_product_1 = pl.DataFrame({    "data": ["2000-01-01", "2000-01-02"],    "value": [1, 2]})data_product_1.write_csv(temp_dir / "data_product_1.csv")data_product_2 = pl.DataFrame({    "data": ["2000-01-01", "2000-01-02"],    "value": [3, 4]})data_product_2.write_csv(temp_dir / "data_product_2.csv")data_product_3 = pl.DataFrame({    "data": ["2000-01-01", "2000-01-02"],    "value": [5, 6]})data_product_3.write_csv(temp_dir / "data_product_3.csv")print("示例CSV文件已创建在 'temp_data' 目录下。")

2. 核心实现:使用 scan_csv 和 concat

该方法的核心思想是:

惰性扫描: 使用 pl.scan_csv() 而不是 pl.read_csv()。scan_csv 不会立即读取文件内容,而是返回一个 LazyFrame 对象,它代表了未来要执行的计算计划。逐文件转换: 对每个 LazyFrame 应用 with_columns() 方法,添加基于文件名的自定义列。并行合并与收集: 使用 pl.concat() 将所有 LazyFrame 合并,然后调用 .collect() 触发实际的数据读取和计算。Polars 可以在 collect() 阶段并行处理这些独立的 LazyFrame。

import polars as plfrom pathlib import Path# 假设文件位于当前目录或指定目录# 如果文件在 'temp_data' 目录下,则路径应为 Path("temp_data")data_directory = Path("temp_data") # 获取所有匹配的文件路径csv_files = list(data_directory.glob("data_*.csv"))# 创建 LazyFrame 列表,并为每个 LazyFrame 添加 product_code 列lazy_frames = []for f_path in csv_files:    # 提取文件名作为 product_code    # f_path.stem 获取不带扩展名的文件名 (e.g., "data_product_1")    # .replace("data_", "") 进一步提取 "product_1"    product_code = f_path.stem.replace("data_", "")    # 使用 scan_csv 创建 LazyFrame    # 使用 with_columns 添加 product_code 列    lf = pl.scan_csv(f_path).with_columns(        pl.lit(product_code).alias("product_code")    )    lazy_frames.append(lf)# 使用 pl.concat 合并所有 LazyFrame,然后使用 .collect() 触发计算# 默认情况下,pl.concat 会并行处理 LazyFrameif lazy_frames:    final_df = pl.concat(lazy_frames).collect()    print(final_df)else:    print("未找到匹配的CSV文件。")# 清理示例数据import shutilif temp_dir.exists():    shutil.rmtree(temp_dir)    print("n示例数据目录 'temp_data' 已删除。")

输出示例:

shape: (6, 3)┌────────────┬───────┬──────────────┐│ data       ┆ value ┆ product_code ││ ---        ┆ ---   ┆ ---          ││ str        ┆ i64   ┆ str          │╞════════════╪═══════╪══════════════╡│ 2000-01-01 ┆ 1     ┆ product_1    ││ 2000-01-02 ┆ 2     ┆ product_1    ││ 2000-01-01 ┆ 3     ┆ product_2    ││ 2000-01-02 ┆ 4     ┆ product_2    ││ 2000-01-01 ┆ 5     ┆ product_3    ││ 2000-01-02 ┆ 6     ┆ product_3    │└────────────┴───────┴──────────────┘

3. 简化版本(列表推导式)

上述 for 循环可以通过列表推导式进一步简化,代码更加紧凑:

import polars as plfrom pathlib import Pathdata_directory = Path("temp_data") # 重新创建示例数据以确保代码可运行temp_dir = Path("temp_data")temp_dir.mkdir(exist_ok=True)data_product_1 = pl.DataFrame({"data": ["2000-01-01", "2000-01-02"], "value": [1, 2]})data_product_1.write_csv(temp_dir / "data_product_1.csv")data_product_2 = pl.DataFrame({"data": ["2000-01-01", "2000-01-02"], "value": [3, 4]})data_product_2.write_csv(temp_dir / "data_product_2.csv")data_product_3 = pl.DataFrame({"data": ["2000-01-01", "2000-01-02"], "value": [5, 6]})data_product_3.write_csv(temp_dir / "data_product_3.csv")lazy_frames = [    pl.scan_csv(f_path).with_columns(        pl.lit(f_path.stem.replace("data_", "")).alias("product_code")    )    for f_path in data_directory.glob("data_*.csv")]if lazy_frames:    final_df = pl.concat(lazy_frames).collect()    print(final_df)else:    print("未找到匹配的CSV文件。")# 清理示例数据import shutilif temp_dir.exists():    shutil.rmtree(temp_dir)

关键概念与优势

惰性计算 (LazyFrame): pl.scan_csv() 返回的是 LazyFrame。这意味着 Polars 只是构建了一个计算计划,而没有立即执行数据读取和转换。所有操作都被“记录”下来,直到调用 .collect() 时才一次性执行。优化与并行化: 由于 Polars 知道整个计算图,它可以在 .collect() 阶段对操作进行优化,并利用多核处理器并行读取和处理多个文件。这对于处理大量文件或大型文件时,能显著提高性能。灵活性: 这种方法允许在每个文件的 LazyFrame 上应用任意的 Polars 表达式 (with_columns, filter, select 等),从而实现高度定制化的预处理逻辑,而无需在内存中加载整个文件。内存效率: 对于非常大的文件,逐个文件加载到 LazyFrame 并进行转换,可以避免一次性将所有数据加载到内存中,从而减少内存压力。

注意事项

文件路径: 确保 Path().glob(“data_*.csv”) 或 data_directory.glob(“data_*.csv”) 能够正确找到您的文件。文件名解析: f_path.stem.replace(“data_”, “”) 是一种简单的文件名解析方式。如果您的文件名模式更复杂,可能需要使用正则表达式 (re 模块) 来提取所需信息。错误处理: 在生产环境中,您可能需要添加错误处理机制,例如使用 try-except 块来处理文件不存在或格式错误的情况。数据类型: pl.lit() 创建的字面量列的数据类型将根据输入自动推断。如果需要特定类型,可以使用 pl.lit(value).cast(pl.String) 等进行强制转换。替代方案(DuckDB): 值得一提的是,其他数据处理工具如 DuckDB 提供了直接在 read_csv_auto 函数中通过 filename=true 参数添加文件名列的功能。Polars 目前尚未在 read_csv 或 scan_csv 中内置此功能,但通过上述 LazyFrame 的组合使用,可以灵活地实现相同的效果。

总结

通过巧妙地结合 Polars 的 scan_csv、LazyFrame 和 concat 方法,我们能够高效且灵活地处理多文件数据加载场景。这种方法不仅允许在合并前对每个文件进行自定义转换,还充分利用了 Polars 的并行处理能力,从而在处理大规模数据集时提供了卓越的性能和内存效率。掌握这一模式,将极大地提升您在 Polars 中处理复杂数据管道的能力。

以上就是使用 Polars 高效加载多文件并进行自定义处理的详细内容,更多请关注创想鸟其它相关文章!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/1374037.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2025年12月14日 13:47:52
下一篇 2025年12月14日 13:48:01

相关推荐

  • 在Xcelium与Specman集成中有效设置环境变量的指南

    针对在xcelium仿真环境中,通过specman的’e’语言调用python等外部脚本时,环境变量无法被正确识别的问题,本文提供了一系列设置环境变量的策略。内容涵盖了从仿真启动前、xcelium命令行参数到tcl集成等多种方法,旨在确保环境变量在仿真会话中被正确解析和使用,…

    2025年12月14日
    000
  • Matplotlib Y轴刻度标签字体大小调整教程

    本教程详细介绍了如何在Matplotlib中调整Y轴刻度标签的字体大小,以提高图表的可读性。文章提供了两种主要方法:使用`set_yticklabels()`函数直接设置标签字体,以及利用`tick_params()`函数进行更灵活的参数控制,并考虑了不同Matplotlib版本的兼容性。通过实际代…

    2025年12月14日
    000
  • 在Polars中高效计算指数移动平均线(EMA)及其初始化策略

    本教程详细介绍了如何在polars数据框架中实现指数移动平均线(ema)的计算,特别关注了将前n个周期初始化为简单移动平均线(sma)的常见需求。文章深入探讨了使用`ewm_mean`函数时的关键细节,包括正确处理空值(`none`而非`np.nan`)以及参数配置,旨在帮助用户避免常见陷阱并优化代…

    2025年12月14日
    000
  • 在 Polars 中高效计算指数移动平均线 (EMA) 并避免常见陷阱

    本教程详细介绍了如何在 Polars 中计算指数移动平均线 (EMA)。文章首先解释了 EMA 的基本概念和 Polars 中 `ewm_mean` 方法的使用。接着,重点阐述了在 Polars 中处理空值(`None` 与 `np.NaN`)的关键差异,并提供了一个经过优化的 `polars_em…

    2025年12月14日
    000
  • Python环境管理:解决Pip更新时的权限问题 (WinError 5)

    本教程旨在解决python pip更新时常见的`environmenterror: [winerror 5] access denied`权限问题。文章详细阐述了两种有效解决方案:以管理员身份运行命令提示符进行更新,或推荐将python重新安装到用户拥有完全权限的目录。通过这些方法,用户可以克服系统…

    2025年12月14日
    000
  • Odoo QWeb模板中浮点数到整数的正确转换与显示方法

    :显示拼接后的字符串。行为:它会计算表达式,转义结果,并将其插入到当前元素的开始标签和结束标签之间。 注意事项与最佳实践 选择正确的指令:当你的目标是显示数据或表达式的结果时,几乎总是应该使用t-esc。如果你需要赋值或设置属性,则考虑t-set或t-att-*系列指令。数据类型转换:在使用int(…

    2025年12月14日
    000
  • Python字典数据结构优化与值提取教程

    本文旨在指导python初学者如何优化字典数据结构,以避免不必要的嵌套,并实现高效的值提取与数据处理。通过分析常见的数据结构设计误区,我们将展示如何构建简洁且功能强大的字典,从而简化后续的数据操作,如排序,并提升代码的可读性和维护性。 在Python编程中,字典(Dictionary)是一种非常灵活…

    2025年12月14日
    000
  • Python Flask应用中在线图片URL生成Blurhash的关键指南

    本教程旨在指导您如何在python flask应用程序中,将在线图片url转换为blurhash键。针对官方文档主要聚焦于本地文件处理的痛点,本文将详细介绍如何利用`requests`库获取远程图片数据,并结合`blurhash-python`库进行编码,最终提供一个完整的flask集成示例,帮助开…

    2025年12月14日
    000
  • 使用pip管理和解决mysql-connector-python安装问题

    本教程详细介绍了如何使用pip安装python的mysql连接器mysql-connector-python。针对pip提示“requirement already satisfied”但仍需重新安装的情况,文章提供了手动清理现有包文件的方法,确保顺利完成安装过程,并避免常见的环境冲突问题,帮助开发…

    2025年12月14日
    000
  • Slack Webhook中自定义数据的高效处理:避免HTTP头误区

    在Slack应用开发中,直接通过HTTP请求头向Webhook发送自定义数据并期望在`slack_bolt`事件处理器中直接读取是不可行的。Slack的Webhook机制主要关注消息体(JSON payload)。本教程将详细指导如何将自定义数据作为元数据嵌入到Webhook的JSON payloa…

    2025年12月14日
    000
  • 深入理解Python数据访问:.attribute 与 [“key”] 的异同

    python中,访问数据主要通过两种机制:属性(attribute)和项(item)。属性通过点号(.)访问,通常用于对象的成员变量或方法;而项通过方括号([])访问,主要用于字典(通过键)或列表(通过索引)等集合类型的数据。理解这两种访问方式的区别对于编写清晰、健壮的python代码至关重要,尤其…

    2025年12月14日
    000
  • Python多线程在机器学习中的应用 Python多线程模型训练加速技巧

    多线程在机器学习中无法加速CPU密集型模型训练,主要受限于Python的GIL机制。然而,在数据预处理、I/O密集型任务及模型推理阶段,并发线程可显著提升效率。例如,使用ThreadPoolExecutor并行加载图像或解析小文件,能有效减少等待时间;在Web服务部署中,多线程可同时响应多个推理请求…

    2025年12月14日
    000
  • 在Python-pptx中为文本子串添加超链接的专业指南

    本教程详细阐述了如何在python-pptx中为一个文本字符串的特定子串添加超链接,同时保持文本的连续性。核心方法是利用`paragraph`对象可以包含多个`run`对象的特性,为不同的`run`设置独立的文本内容和超链接属性,从而实现精细化的文本控制。 理解Python-pptx中的文本结构 在…

    2025年12月14日
    000
  • Twilio WhatsApp API:从沙盒到生产环境的无缝消息发送指南

    本文详细阐述了在使用twilio whatsapp api时,为何无法向twilio沙盒外部号码发送消息的问题。核心原因在于沙盒环境仅用于开发测试,并限制消息发送至已加入沙盒的号码。要实现向任意whatsapp号码发送消息,开发者必须申请并配置whatsapp business api,从而将应用从…

    2025年12月14日
    000
  • Scipy优化中处理多重线性约束的正确姿势

    在使用`scipy.optimize.minimize`处理多重线性约束时,开发者常因python闭包的延迟绑定特性导致约束未能正确生效。本文将深入探讨这一常见陷阱,并提供两种有效的解决方案来确保约束的正确应用。此外,还将介绍如何利用`scipy.optimize.linearconstraint`…

    2025年12月14日
    000
  • 解决ReadTheDocs自定义PDF无法在下载菜单显示的问题

    本文详细介绍了在readthedocs平台配置自定义pdf生成并确保其在下载菜单中正确显示的方法。核心问题在于readthedocs对pdf文件的命名有特定要求。通过在`.readthedocs.yml`配置文件中,利用`mv`命令将生成的自定义pdf文件重命名为`$readthedocs_proj…

    2025年12月14日
    000
  • Python向Icecast服务器流式传输音频的正确方法

    向icecast服务器流式传输音频时,关键在于以音频的实际播放速度发送数据,而非尽可能快地传输文件块。直接将音频文件快速推送到服务器会导致缓冲区瞬间填满,但无法为客户端提供连续、实时的流。正确的做法是模拟实时播放,确保数据流的连续性和时间同步,对于复杂的实时音频处理,推荐使用专业的音频流媒体库。 理…

    2025年12月14日
    000
  • Scrapy CSS选择器失效:深入理解浏览器与爬虫获取HTML内容的差异

    在使用scrapy进行网页抓取时,开发者常常会遇到一个令人困惑的问题:精心调试的css选择器在浏览器开发者工具中能够准确匹配元素,但在scrapy爬取时却一无所获。这通常并非选择器本身有误,而是scrapy所见的网页内容与用户在浏览器中看到的内容存在本质差异。本文将深入探讨这一现象的原因,并提供实用…

    2025年12月14日
    000
  • NetBeans 20 Python插件安装失败:版本兼容性解决方案

    本文旨在解决netbeans 20中python插件安装失败的问题。核心原因在于尝试安装的插件版本与netbeans ide版本不兼容,通常是旧版本插件(如为netbeans 19设计)试图安装到新版本ide(netbeans 20)所致。教程将详细阐述问题现象、根本原因,并提供两种主要解决方案:寻…

    2025年12月14日
    000
  • Python字典中None值与内存优化:深入理解与实践

    python字典不会对值为none的键值对进行特殊优化,因为键的存在本身就是一种信息,与键的缺失不同。在大型字典中,即使移除none值对应的键,内存占用也可能保持一致,这主要受字典内部的键空间预分配机制和字符串驻留等因素影响。对于固定结构且需严格控制内存的数据,考虑使用带有`__slots__`的d…

    2025年12月14日
    000

发表回复

登录后才能评论
关注微信