
本教程详细探讨如何在数据帧中高效处理重复记录,并仅保留每组重复项中的指定数量(例如,最新的N条)。文章将介绍两种主流的数据处理工具:Pandas的groupby().tail()方法和PySpark的窗口函数。通过具体的代码示例和解释,帮助读者理解并应用这些技术,以优化数据清洗和预处理流程,特别是在处理大规模数据集时。
在数据分析和处理过程中,我们经常会遇到包含重复记录的数据集。虽然有时需要完全删除重复项,但在某些场景下,我们可能希望保留每组重复项中的特定数量,例如最新的n条记录。本文将深入探讨如何使用python的pandas库和pyspark框架,高效地实现这一目标。
1. 问题场景描述
假设我们有一个包含用户活动的数据帧,其中first_name、last_name和sex组合可能存在重复,但id和country是唯一的。我们的目标是针对每个重复的用户组合(由first_name、last_name和sex定义),只保留其最新的3条记录。这里的“最新”通常根据某个时间戳或递增的ID列来定义。
原始数据帧示例:
01JohnDoeMaleUSA02JohnDoeMaleCanada03JohnDoeMaleMexico04MarkKayMaleItaly05JohnDoeMaleSpain06MarkKayMaleFrance07JohnDoeMalePeru08MarkKayMaleIndia09MarkKayMaleLaos10JohnDoeMaleBenin
期望结果(保留每组重复项的最后3条,基于id排序):
05JohnDoeMaleSpain06MarkKayMaleFrance07JohnDoeMalePeru08MarkKayMaleIndia09MarkKayMaleLaos10JohnDoeMaleBenin
2. 使用 Pandas 实现:groupby().tail()
对于中小型数据集,Pandas提供了一个非常简洁且高效的方法来解决这个问题,即结合groupby()和tail()。
2.1 核心思想
定义重复组: 使用groupby()方法根据定义重复的列(例如first_name, last_name, sex)对数据帧进行分组。确定“最新”顺序: 在分组之前,确保数据帧已根据表示时间或顺序的列(例如id)进行排序。选择最后N条: 对每个分组应用tail(n)方法,它将返回该分组的最后n行。
2.2 示例代码
import pandas as pd# 示例数据帧data = { 'id': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 'first_name': ['John', 'John', 'John', 'Mark', 'John', 'Mark', 'John', 'Mark', 'Mark', 'John'], 'last_name': ['Doe', 'Doe', 'Doe', 'Kay', 'Doe', 'Kay', 'Doe', 'Kay', 'Kay', 'Doe'], 'sex': ['Male', 'Male', 'Male', 'Male', 'Male', 'Male', 'Male', 'Male', 'Male', 'Male'], 'country': ['USA', 'Canada', 'Mexico', 'Italy', 'Spain', 'France', 'Peru', 'India', 'Laos', 'Benin']}df = pd.DataFrame(data)print("原始数据帧:")print(df)# 步骤1: 根据 'id' 列对数据帧进行排序,确保“最新”的定义是正确的# 默认升序,即较大的ID代表更新的记录df_sorted = df.sort_values(by='id')# 步骤2: 根据重复键进行分组,并对每个组保留最后3条记录result_df = df_sorted.groupby(['first_name', 'last_name', 'sex']).tail(3)# 步骤3: 重置索引(可选,但通常推荐,使索引连续)result_df = result_df.reset_index(drop=True)print("n处理后的数据帧:")print(result_df)
2.3 代码解析
df.sort_values(by=’id’): 这一步至关重要,它确保了在每个分组内部,tail(3)能够正确地选择出“最新”的3条记录。如果id是递增的,那么降序排列后取head(3)也可以达到相同的效果。df_sorted.groupby([‘first_name’, ‘last_name’, ‘sex’]): 根据指定的列组合创建分组对象。.tail(3): 对每个分组应用tail(3)操作,返回每个分组的最后3行。result_df.reset_index(drop=True): 清除旧的索引,并生成一个新的从0开始的连续索引。drop=True表示不将旧索引作为新列保留。
3. 使用 PySpark 实现:窗口函数
对于大规模数据集,PySpark提供了分布式处理能力,其窗口函数是处理此类问题的强大工具。
3.1 核心思想
定义窗口: 使用Window.partitionBy()定义分组的列,并使用orderBy()定义窗口内的排序规则。分配行号: 使用row_number()或rank()等窗口函数为每个分组内的记录分配一个序号。筛选: 根据分配的行号筛选出我们需要的N条记录。
3.2 示例代码
from pyspark.sql import SparkSessionfrom pyspark.sql import functions as Ffrom pyspark.sql.window import Window# 初始化SparkSessionspark = SparkSession.builder.appName("FilterDuplicatesSpark").getOrCreate()# 示例数据data = [ (1, 'John', 'Doe', 'Male', 'USA'), (2, 'John', 'Doe', 'Male', 'Canada'), (3, 'John', 'Doe', 'Male', 'Mexico'), (4, 'Mark', 'Kay', 'Male', 'Italy'), (5, 'John', 'Doe', 'Male', 'Spain'), (6, 'Mark', 'Kay', 'Male', 'France'), (7, 'John', 'Doe', 'Male', 'Peru'), (8, 'Mark', 'Kay', 'Male', 'India'), (9, 'Mark', 'Kay', 'Male', 'Laos'), (10, 'John', 'Doe', 'Male', 'Benin')]columns = ['id', 'first_name', 'last_name', 'sex', 'country']df_spark = spark.createDataFrame(data, columns)print("原始Spark数据帧:")df_spark.show()# 步骤1: 定义窗口规范# partitionBy: 根据哪些列来分组# orderBy: 在每个分组内,根据哪些列进行排序。F.desc('id') 表示按id降序,以便row_number为1的是最新的记录。window_spec = Window.partitionBy('first_name', 'last_name', 'sex').orderBy(F.desc('id'))# 步骤2: 使用row_number()为每个分组内的记录分配行号df_with_row_number = df_spark.withColumn('row_number', F.row_number().over(window_spec))print("n添加行号后的Spark数据帧:")df_with_row_number.show()# 步骤3: 筛选出row_number小于等于3的记录,即每个分组的最新3条filtered_df = df_with_row_number.filter('row_number <= 3')# 步骤4: 移除辅助列row_numberresult_df_spark = filtered_df.drop('row_number')print("n处理后的Spark数据帧:")result_df_spark.show()# 停止SparkSessionspark.stop()
3.3 代码解析
Window.partitionBy(‘first_name’, ‘last_name’, ‘sex’): 定义了窗口的分组依据,与Pandas的groupby()类似。orderBy(F.desc(‘id’)): 在每个分组内部,根据id列进行降序排序。这意味着id值最大的记录(即最新的记录)将获得最小的行号。F.row_number().over(window_spec): 这是一个窗口函数,它为window_spec定义的每个分组中的每一行分配一个从1开始的连续整数行号。由于我们是按id降序排列,row_number=1对应于该组中id最大的记录。df_with_row_number.filter(‘row_number filtered_df.drop(‘row_number’): 移除在处理过程中添加的临时row_number列。
4. 性能与选择考量
Pandas groupby().tail():优点: 代码简洁,易于理解和实现,对于内存中的中小型数据集(通常几十万到几百万行)性能良好。缺点: 不适用于超出单机内存容量的超大数据集。sort_values和groupby操作在非常大的数据帧上可能会消耗大量内存和CPU。PySpark 窗口函数:优点: 专为分布式计算设计,能够处理TB级别甚至PB级别的超大规模数据集。通过将计算分布到集群中的多个节点上,避免了单机内存限制。缺点: 配置和运行Spark环境相对复杂,代码可能比Pandas版本稍长,对小数据集而言,启动SparkSession和分布式开销可能导致性能劣势。
选择建议:
如果数据量较小,能够轻松载入单机内存,且对开发效率有较高要求,优先选择Pandas。如果数据量巨大,需要分布式处理能力,或者已经在使用Spark生态系统,则PySpark窗口函数是更合适的选择。
5. 注意事项与最佳实践
排序的重要性: 无论是Pandas还是PySpark,确保用于排序的列(如id或时间戳)能够准确反映记录的“新旧”关系是至关重要的。错误的排序会导致筛选出错误的“最新”记录。重复键的定义: 仔细确定哪些列的组合构成了“重复项”。本例中是first_name, last_name, sex,但实际场景可能有所不同。内存管理(Pandas): 对于接近内存限制的数据集,可以考虑分块处理或使用Dask等工具。资源配置(PySpark): Spark作业的性能高度依赖于集群的资源配置(如Executor内存、核心数)。合理配置这些参数可以显著提高效率。替代窗口函数: 除了row_number(),PySpark还提供了rank()和dense_rank()。rank()在遇到相同排序值的记录时会跳过序号(例如1, 2, 2, 4),而dense_rank()则不会跳过(例如1, 2, 2, 3)。根据具体需求选择合适的函数。在本场景中,row_number()是最直接的选择,因为它为每条记录分配唯一的行号。
6. 总结
本文详细介绍了在Python数据生态中处理数据帧重复记录,并保留指定数量最新记录的两种主要方法:Pandas的groupby().tail()和PySpark的窗口函数。Pandas方案适用于中小型数据集,以其简洁性著称;而PySpark方案则为大规模分布式数据处理提供了高效且可扩展的解决方案。理解这两种方法的原理、适用场景及注意事项,将有助于您在实际数据处理工作中做出明智的技术选择,从而更有效地管理和清洗数据。
以上就是数据帧重复记录筛选:高效保留指定数量的最新数据的详细内容,更多请关注创想鸟其它相关文章!
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/1370202.html
微信扫一扫
支付宝扫一扫