
本教程详细讲解如何在pyspark dataframe中对多个列应用多个聚合函数(如min和max),并将聚合结果以行式(而非默认的列式)结构进行展示。我们将通过分步操作,利用select、alias、f.lit和unionbyname等函数,将每个列的最小值和最大值分别作为独立行呈现,从而满足特定的数据分析和报告需求。
在PySpark中,对DataFrame的多个列执行聚合操作是常见的需求。通常,我们可以使用df.agg()配合F.min()、F.max()等函数来实现。然而,当期望的输出格式是将不同聚合函数的结果以行而非列的形式展示时,标准的df.agg()方法会生成一个单行多列的DataFrame,这与将“所有列的最小值”作为一行,“所有列的最大值”作为另一行的需求不符。本教程将介绍一种实现这种特定行式聚合结果的方法。
1. 准备示例数据
首先,我们创建一个示例PySpark DataFrame,以便演示后续的操作。
import operatorfrom pyspark.sql import functions as Ffrom pyspark.sql import SparkSession# 初始化SparkSessionspark = SparkSession.builder.appName("PySparkMultiAggRowWise").getOrCreate()_data = [ (4, 123, 18, 29), (8, 5, 26, 187), (2, 97, 18, 29),]_schema = ['col_1', 'col2', 'col3', 'col_4']df = spark.createDataFrame(_data, _schema)df.show()
输出的DataFrame df 如下:
+-----+----+----+-----+|col_1|col2|col3|col_4|+-----+----+----+-----+| 4| 123| 18| 29|| 8| 5| 26| 187|| 2| 97| 18| 29|+-----+----+----+-----+
2. 执行列式聚合并合并结果
为了得到行式的聚合结果,我们首先分别计算每个列的最小值和最大值,并将它们收集到一个新的DataFrame中。
# 生成所有列的最小值表达式min_vals = [F.min(c).alias(f'min_{c}') for c in df.columns]# 生成所有列的最大值表达式max_vals = [F.max(c).alias(f'max_{c}') for c in df.columns]# 使用select执行聚合,结果将是一个单行DataFrame,包含所有min_和max_列df_aggregated = df.select(min_vals + max_vals)df_aggregated.cache() # 缓存结果,因为后续会多次使用df_aggregated.show()
df_aggregated 的输出如下:
+-------+------+-------+-------+-------+------+-------+-------+|min_col_1|min_col2|min_col3|min_col_4|max_col_1|max_col2|max_col3|max_col_4|+-------+------+-------+-------+-------+------+-------+-------+| 2| 5| 18| 29| 8| 123| 26| 187|+-------+------+-------+-------+-------+------+-------+-------+
此时,我们得到了一个包含所有聚合结果的单行DataFrame,但其结构仍是列式的。
3. 重构为行式输出
为了将上述列式结果转换为行式,我们需要创建两个独立的DataFrame:一个用于最小值,一个用于最大值,然后将它们通过unionByName合并。
3.1 构造最小值DataFrame
我们从 df_aggregated 中选择所有 min_ 开头的列,并将它们重命名回原始列名。同时,添加一个名为 agg_type 的字面量列来标识这些行代表的是最小值。
芦笋演示
一键出成片的录屏演示软件,专为制作产品演示、教学课程和使用教程而设计。
34 查看详情
min_cols = operator.add( [F.lit('min').alias('agg_type')], # 添加聚合类型标识列 [F.col(f'min_{c}').alias(c) for c in df.columns] # 选择并重命名最小值列)min_df = df_aggregated.select(min_cols)min_df.show()
min_df 的输出如下:
+--------+-----+----+----+-----+|agg_type|col_1|col2|col3|col_4|+--------+-----+----+----+-----+| min| 2| 5| 18| 29|+--------+-----+----+----+-----+
3.2 构造最大值DataFrame
类似地,我们为最大值创建另一个DataFrame。
max_cols = operator.add( [F.lit('max').alias('agg_type')], # 添加聚合类型标识列 [F.col(f'max_{c}').alias(c) for c in df.columns] # 选择并重命名最大值列)max_df = df_aggregated.select(max_cols)max_df.show()
max_df 的输出如下:
+--------+-----+----+----+-----+|agg_type|col_1|col2|col3|col_4|+--------+-----+----+----+-----+| max| 8| 123| 26| 187|+--------+-----+----+----+-----+
4. 合并最终结果
最后,使用 unionByName 将 min_df 和 max_df 合并。unionByName 会根据列名匹配来合并DataFrame,这确保了即使列顺序不同也能正确合并。
result = min_df.unionByName(max_df)result.show()
最终 result DataFrame的输出如下,它以行式展示了每个列的最小值和最大值:
+--------+-----+----+----+-----+|agg_type|col_1|col2|col3|col_4|+--------+-----+----+----+-----+| min| 2| 5| 18| 29|| max| 8| 123| 26| 187|+--------+-----+----+----+-----+
完整代码示例
import operatorfrom pyspark.sql import functions as Ffrom pyspark.sql import SparkSession# 初始化SparkSessionspark = SparkSession.builder.appName("PySparkMultiAggRowWise").getOrCreate()# 示例数据_data = [ (4, 123, 18, 29), (8, 5, 26, 187), (2, 97, 18, 29),]_schema = ['col_1', 'col2', 'col3', 'col_4']df = spark.createDataFrame(_data, _schema)print("原始DataFrame:")df.show()# 1. 生成所有列的最小值和最大值表达式min_vals = [F.min(c).alias(f'min_{c}') for c in df.columns]max_vals = [F.max(c).alias(f'max_{c}') for c in df.columns]# 2. 执行列式聚合并缓存结果df_aggregated = df.select(min_vals + max_vals)df_aggregated.cache()print("聚合后的单行DataFrame:")df_aggregated.show()# 3. 构造最小值DataFramemin_cols = operator.add( [F.lit('min').alias('agg_type')], [F.col(f'min_{c}').alias(c) for c in df.columns])min_df = df_aggregated.select(min_cols)print("最小值DataFrame:")min_df.show()# 4. 构造最大值DataFramemax_cols = operator.add( [F.lit('max').alias('agg_type')], [F.col(f'max_{c}').alias(c) for c in df.columns])max_df = df_aggregated.select(max_cols)print("最大值DataFrame:")max_df.show()# 5. 合并最终结果result = min_df.unionByName(max_df)print("最终行式聚合结果:")result.show()# 停止SparkSessionspark.stop()
注意事项与总结
df.agg() 与 df.select() 的选择: 如果你只需要一个包含所有聚合结果的单行DataFrame(例如,col1_min, col1_max, col2_min, col2_max…),那么直接使用df.agg()会更简洁。本教程的方法是针对需要将不同聚合类型作为独立行展示的特定场景。cache() 的使用: 在 df_aggregated 上使用 cache() 是一个性能优化措施。由于 df_aggregated 会被 min_df 和 max_df 两次引用,缓存可以避免重复计算,提高效率。列重命名: 在构建 min_df 和 max_df 时,将 min_col_name 和 max_col_name 重命名回 col_name 是为了保持最终输出的列名一致性,方便后续处理。F.lit() 的作用: F.lit() 函数用于创建一个字面量列,这对于添加如 agg_type 这样的标识符非常有用。operator.add 的替代方案: 在生成 min_cols 和 max_cols 列表时,使用 operator.add 是为了将字面量列的表达式与聚合列的表达式列表连接起来。你也可以直接使用 [F.lit(‘min’).alias(‘agg_type’)] + [F.col(f’min_{c}’).alias(c) for c in df.columns] 这样的列表拼接方式。
通过上述步骤,我们成功地将PySpark DataFrame的多个列聚合结果以所需的行式结构呈现,这对于需要按聚合类型进行行级别分析或报告的场景非常实用。
以上就是PySpark DataFrame多列聚合与结果行式展示教程的详细内容,更多请关注创想鸟其它相关文章!
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/587914.html
微信扫一扫
支付宝扫一扫