PySpark DataFrame多列多函数聚合与结果重塑教程

PySpark DataFrame多列多函数聚合与结果重塑教程

本教程详细介绍了如何在pyspark中对dataframe的所有列同时应用多个聚合函数(如`min`和`max`),并以行式结构(每行代表一个聚合结果)展示。通过结合使用`select`进行初步聚合、`cache`优化性能以及`unionbyname`进行结果重塑,实现了灵活且高效的数据分析,避免了直接`agg`函数无法满足特定输出格式的问题。

在PySpark进行数据分析时,一个常见的需求是对DataFrame中的所有或指定列应用多个聚合函数,例如同时计算每列的最小值和最大值。虽然DataFrame.agg()方法能够轻松实现多列多函数的聚合,但其默认输出是将所有聚合结果展平为单行,这往往无法满足将不同聚合类型(如最小值和最大值)作为独立行呈现的需求。

例如,直接使用df.agg(*exprs)的方式,其中exprs = [min(c).alias(c), max(c).alias(c) for c in df.columns],会生成一个包含所有列的最小值和最大值,但这些值都将并列在同一行中,而不是我们期望的“一行是所有列的最小值,另一行是所有列的最大值”的结构。

为了实现这种行式输出的聚合结果,我们需要一种更为精细的策略,结合PySpark的select、cache和unionByName等操作。

解决方案:多阶段聚合与结果重塑

以下步骤将详细演示如何通过分阶段处理来达到目标输出格式:

初步聚合所有最小值和最大值: 首先,对DataFrame的所有列分别计算其最小值和最大值。这些聚合结果将暂时存储在一个新的DataFrame的单行中,其中每一列对应一个聚合值(例如,min_col1, max_col1, min_col2, max_col2等)。缓存中间结果: 为了避免重复计算,对包含所有聚合值的中间DataFrame进行缓存。这在处理大型数据集时尤为重要。重塑结果为行式结构: 将缓存的单行聚合结果拆分为多个DataFrame,每个DataFrame代表一种聚合类型(例如,一个DataFrame只包含所有列的最小值,另一个只包含所有列的最大值)。在拆分过程中,为每个DataFrame添加一个标识列(如agg_type),并统一列名,以便后续合并。合并结果: 使用unionByName()方法将重塑后的DataFrame合并,最终得到我们期望的行式输出。

示例代码与详细解释

让我们通过一个具体的PySpark代码示例来演示上述过程:

import operatorfrom pyspark.sql import SparkSessionfrom pyspark.sql import functions as F# 初始化Spark会话spark = SparkSession.builder.appName("MultiFunctionAggregate").getOrCreate()# 示例数据_data = [    (4, 123, 18, 29),    (8, 5, 26, 187),    (2, 97, 18, 29),]_schema = ['col_1', 'col2', 'col3', 'col_4']df = spark.createDataFrame(_data, _schema)print("原始DataFrame:")df.show()# +-----+----+----+-----+# |col_1|col2|col3|col_4|# +-----+----+----+-----+# |    4| 123|  18|   29|# |    8|   5|  26|  187|# |    2|  97|  18|   29|# +-----+----+----+-----+# 1. 初步聚合所有最小值和最大值# 构建min聚合表达式列表,并为结果列添加'min_'前缀min_vals = [F.min(c).alias(f'min_{c}') for c in df.columns]# 构建max聚合表达式列表,并为结果列添加'max_'前缀max_vals = [F.max(c).alias(f'max_{c}') for c in df.columns]# 使用select执行所有聚合,结果是一个单行DataFramedf_agg_raw = df.select(min_vals + max_vals)print("初步聚合结果 (单行):")df_agg_raw.show()# +-------+-------+-------+--------+-------+-------+-------+--------+# |min_col_1|min_col2|min_col3|min_col_4|max_col_1|max_col2|max_col3|max_col_4|# +-------+-------+-------+--------+-------+-------+-------+--------+# |      2|       5|      18|       29|      8|     123|     26|     187|# +-------+-------+-------+--------+-------+-------+-------+--------+# 2. 缓存中间结果# 缓存df_agg_raw以提高后续操作的性能df_agg_raw.cache()# 3. 重塑结果为行式结构# 为最小值行构建选择表达式:添加'agg_type'列,并将min_前缀的列重命名回原始列名min_cols = operator.add(    [F.lit('min').alias('agg_type')], # 添加一个字面量列,标识聚合类型为'min'    [F.col(f'min_{c}').alias(c) for c in df.columns] # 选取带有'min_'前缀的列,并将其别名改回原始列名)# 为最大值行构建选择表达式,原理同上max_cols = operator.add(    [F.lit('max').alias('agg_type')], # 添加一个字面量列,标识聚合类型为'max'    [F.col(f'max_{c}').alias(c) for c in df.columns] # 选取带有'max_'前缀的列,并将其别名改回原始列名)# 从缓存的df_agg_raw中选择并重命名列,创建最小值DataFramemin_df = df_agg_raw.select(min_cols)# 从缓存的df_agg_raw中选择并重命名列,创建最大值DataFramemax_df = df_agg_raw.select(max_cols)print("重塑后的最小值DataFrame:")min_df.show()# +--------+-----+----+----+-----+# |agg_type|col_1|col2|col3|col_4|# +--------+-----+----+----+-----+# |     min|    2|   5|  18|   29|# +--------+-----+----+----+-----+print("重塑后的最大值DataFrame:")max_df.show()# +--------+-----+----+----+-----+# |agg_type|col_1|col2|col3|col_4|# +--------+-----+----+----+-----+# |     max|    8| 123|  26|  187|# +--------+-----+----+----+-----+# 4. 合并结果# 使用unionByName合并两个DataFrame,确保按列名匹配result = min_df.unionByName(max_df)print("最终结果DataFrame:")result.show()# +--------+-----+----+----+-----+# |agg_type|col_1|col2|col3|col_4|# +--------+-----+----+----+-----+# |     min|    2|   5|  18|   29|# |     max|    8| 123|  26|  187|# +--------+-----+----+----+-----+# 停止Spark会话spark.stop()

注意事项与总结

列名管理: 在聚合阶段,通过alias()为聚合结果列添加前缀(如min_,max_)是关键,这有助于在后续重塑阶段清晰地识别和操作这些列。operator.add 的使用: 示例中operator.add用于连接两个列表,它等同于简单的列表拼接操作(list1 + list2)。F.lit()的作用: F.lit(‘min’)或F.lit(‘max’)用于创建一个字面量列,其值在所有行中都相同。这对于标识不同聚合类型至关重要。F.col()与alias(): 在重塑阶段,F.col(f’min_{c}’).alias(c)的作用是选取带有特定前缀的列,并将其重命名回原始的列名,以保持最终结果的列名一致性。cache()的重要性: df_agg_raw.cache()在执行min_df和max_df的select操作之前,将中间聚合结果持久化到内存中。这避免了每次创建min_df和max_df时都重新计算原始DataFrame的聚合,显著提升了性能。unionByName(): unionByName()是合并具有相同列名但可能顺序不同的DataFrame的理想选择。它会根据列名进行匹配,而不是列的物理位置,从而增加了代码的健壮性。扩展性: 这种方法不仅限于min和max,您可以轻松扩展到其他聚合函数(如avg, sum, count等),只需相应地修改聚合表达式和重塑逻辑即可。

通过上述方法,我们能够灵活地控制PySpark聚合结果的输出格式,满足将不同聚合类型以行式结构呈现的特定分析需求,同时兼顾了性能优化。

以上就是PySpark DataFrame多列多函数聚合与结果重塑教程的详细内容,更多请关注创想鸟其它相关文章!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/1378104.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2025年12月14日 18:34:28
下一篇 2025年12月14日 18:34:44

相关推荐

  • 如何使用MongoDB聚合管道对子文档进行排序?

    MongoDB子文档排序:高效处理嵌套数据 在MongoDB中,对嵌套在文档中的子文档进行排序是数据处理中的常见需求。本文以一个客户敏感词数据为例,演示如何使用MongoDB聚合管道高效地完成子文档排序。假设我们的数据结构包含客户代码和一个名为list的数组,数组中的每个元素代表一个敏感词,包含wo…

    2025年12月15日
    000
  • Go语言MongoDB聚合:如何用Go语言实现$ne操作中undefined的等效表达?

    本文探讨如何在Go语言中模拟MongoDB聚合操作中$ne操作符与undefined的组合使用。在MongoDB Shell中,undefined表示字段缺失或值为null。然而,Go语言中没有直接的undefined等价物。 MongoDB Shell中的语句{$cond: [{$ne: [‘$a…

    2025年12月15日
    000
  • python列表推导式是什么意思?

    列表推导式是Python中创建列表的简洁方法,1. 通过[表达式 for 变量 in 可迭代对象 if 条件]语法实现;2. 可替代传统for循环生成如平方数列表;3. 支持条件筛选,如保留偶数平方;4. 适用于数据转换与过滤,提升代码可读性和效率。 列表推导式是 Python 中一种简洁、高效地创…

    2025年12月15日
    000
  • 优化SpaCy Matcher模式匹配:理解与应用greedy参数解决长度冲突

    本教程深入探讨了SpaCy `Matcher`在处理重叠模式时可能遇到的匹配长度冲突问题。当存在多个模式,其中一个模式是另一个模式的子集时,`Matcher`默认行为可能导致较短模式优先匹配,从而阻止更长、更具体的模式被识别。文章详细介绍了如何通过`Matcher.add()`方法中的`greedy…

    2025年12月15日
    000
  • 高效合并大量数据文件的策略:绕过解析实现快速连接

    处理大量数据文件时,直接使用数据帧库的合并功能(如polars的`read_ipc`配合`rechunk=true`)可能因数据解析和内存重分块而导致性能瓶颈。本文介绍了一种绕过完整数据解析、直接在文件系统层面进行内容拼接的策略,以显著加速文件合并过程,并探讨了针对apache arrow等特定格式…

    2025年12月15日
    000
  • 使用Python PDDL框架构建旅行商问题:Effect表达式的正确姿势

    本文旨在指导用户在使用`pddl` python框架构建旅行商问题(tsp)时,如何正确处理pddl动作的`effect`表达式。通过分析常见的`recursionerror`,揭示了将pddl逻辑表达式误用字符串拼接的错误,并提供了使用框架内置逻辑运算符(如`&`和`~`)来组合谓词的正确…

    2025年12月15日
    000
  • Python中利用自定义类实现分层字符串常量与点符号路径自动构建

    本文深入探讨如何在python中优雅地组织分层字符串常量,尤其适用于http端点路径等场景。通过自定义`endpoint`类,我们能够实现类似点符号的层级访问,并自动构建完整的路径字符串,显著提升代码的可读性、可维护性及开发效率。 在构建需要与分层API(如RESTful服务)交互的Python客户…

    2025年12月15日
    000
  • 精通Django角色与权限管理:构建灵活的访问控制系统

    django提供强大的用户、组和权限系统,可用于实现精细的角色访问控制。本文将深入探讨如何利用django的内置功能,结合自定义逻辑,为不同用户角色(如经理、普通用户)分配差异化的数据访问权限,特别是如何实现部门级数据隔离,确保系统安全与业务需求。我们将从模型设计、组与权限配置,到视图层的数据过滤,…

    2025年12月15日
    000
  • 从Google Drive下载并解压ZIP文件至Colab Notebook

    本教程详细介绍了如何在Google Colab环境中,无需挂载Google Drive,从公共Google Drive链接下载并解压ZIP文件。文章分析了常见的`BadZipFile`错误原因,提供了使用`requests`库构建正确下载URL的方法,并重点推荐了更便捷、鲁棒的`gdown`库,以确…

    2025年12月15日
    000
  • Python中Collections模块数据类型如何使用?

    Collections模块提供高效容器:Counter统计频次,defaultdict自动初始化,OrderedDict保持顺序,deque支持双端操作,提升代码简洁性与性能。 Python 的 Collections 模块提供了比内置数据类型更高级、更灵活的容器类型,能够简化特定场景下的代码逻辑。…

    2025年12月15日
    000
  • Mac M1 芯片安装 Python 的注意事项

    在Mac M1芯片上安装Python需确保使用原生ARM64架构以获得最佳性能,避免通过Rosetta 2运行的x86_64版本以防依赖冲突和性能损失;2. 推荐使用pyenv + Homebrew或Miniforge进行安装,前者适合通用开发并可灵活管理多版本Python,后者专为数据科学优化且支…

    2025年12月15日
    000
  • python集合和列表推导式哪种方法去重快

    集合去重更快因其哈希实现,时间复杂度O(1);列表推导式查重为O(n²)较慢;需保序时推荐dict.fromkeys(),兼具性能与顺序。 在 Python 中,用集合(set)和列表推导式去重,集合去重更快。原因在于数据结构和时间复杂度的差异。 集合去重:高效且简洁 集合是哈希实现的,插入和查找平…

    2025年12月15日
    000
  • python中如何在排序时使用str.lower?

    答案:使用 key=str.lower 可实现忽略大小写的排序。通过 sorted() 或 list.sort() 的 key 参数传入 str.lower,使字符串按小写形式比较,但保留原值,常用此法实现不区分大小写的排序。 在 Python 中,如果想在排序时忽略大小写,可以通过 str.low…

    2025年12月15日
    000
  • python日志记录器的配置

    日志配置需设置级别、格式和输出目标,推荐使用字典配置管理。1. 设置日志级别为DEBUG或INFO以控制输出;2. 自定义格式包含时间、级别、模块名等;3. 输出到文件和控制台;4. 创建独立logger实例避免全局调用;5. 使用dictConfig集中管理复杂配置,防止重复handler和错误传…

    2025年12月15日
    000
  • python缩减exe文件内存

    使用PyInstaller精简打包可减小exe体积,排除冗余模块并用UPX压缩,同时优化代码以降低内存占用。 Python生成的exe文件通常体积较大,主要是因为打包工具(如PyInstaller)会把整个Python解释器和所有依赖库打包进去。虽然完全“缩减内存”运行时占用较难,但可以有效减小ex…

    2025年12月15日
    000
  • python集合如何检测内部特定元素?

    使用in操作符可高效检测Python集合中是否包含某元素,平均时间复杂度O(1):my_set = {1, 2, 3, 4, 5},if 3 in my_set: print(“元素 3 存在于集合中”);用not in判断不存在,如if 6 not in my_set: p…

    2025年12月15日
    000
  • python对象容器和回收的详解

    Python通过引用计数、标记清除和分代回收机制自动管理内存,容器如列表、字典等持有对象引用,导致对象生命周期延长;引用计数为主,对象被引用时计数加1,引用删除或重置时减1,计数为0则立即回收;但循环引用会导致计数无法归零,因此引入标记清除机制,从根对象出发标记可达对象,清除不可达对象;为提升效率,…

    2025年12月15日
    000
  • python中的索引是什么?如何在列表中索引?

    索引从0开始,正向访问首元素为0,反向为-1;用my_list[0]得’apple’,my_list[-1]得’date’,越界则报错list index out of range。 索引是Python中用来访问序列类型(如列表、字符串、元组)中特定位…

    2025年12月15日
    000
  • Django框架中如何创建项目及应用?

    首先创建Django项目并启动服务器验证,再在项目中创建应用并注册。使用django-admin startproject mysite创建项目,运行python manage.py runserver可访问欢迎页;在项目目录下执行python manage.py startapp blog创建应用…

    2025年12月15日
    000
  • 使用Python中的Tablib库

    Tablib 是一个轻量级 Python 库,支持 XLSX、CSV、JSON、YAML 等格式的表格数据导入导出,无需依赖 Pandas。其核心为 Dataset 对象,可定义表头并添加行数据,如 dataset.headers = [‘Name’, ‘Age&…

    2025年12月15日
    000

发表回复

登录后才能评论
关注微信