
本教程旨在指导用户如何高效地从数据集中筛选重复记录,并为每个重复组保留指定数量(例如最后N条)的数据。我们将重点介绍Pandas中简洁高效的groupby().tail()方法,并与PySpark中基于窗口函数的方法进行对比,通过详细代码示例和最佳实践,帮助读者优化数据清洗流程。
问题场景描述
在数据清洗和预处理过程中,我们经常会遇到包含重复记录的数据集。这些重复记录可能基于一个或多个列的组合,但我们往往需要为每个重复组保留特定数量的记录,例如,只保留每个重复组中最新的n条记录。例如,在一个包含用户活动记录的dataframe中,我们可能希望针对每个用户(由first_name, last_name, sex等列定义),只保留其最新的3条活动记录。
假设我们有如下一个DataFrame:
01JohnDoeMaleUSA02JohnDoeMaleCanada03JohnDoeMaleMexico04MarkKayMaleItaly05JohnDoeMaleSpain06MarkKayMaleFrance07JohnDoeMalePeru08MarkKayMaleIndia09MarkKayMaleLaos10JohnDoeMaleBenin
目标是基于first_name、last_name和sex列的组合识别重复项,并为每个组合保留最新的3条记录(根据id列的降序)。
基于Pandas的解决方案:使用groupby().tail()
对于在内存中操作的Pandas DataFrame,groupby().tail()方法提供了一种非常简洁且高效的解决方案。
核心思路
排序数据: 首先,确保DataFrame按照定义“最新”的标准进行排序。在我们的例子中,id越大表示越新,因此需要按id升序排序。分组: 按照用于识别重复项的列(例如first_name, last_name, sex)进行分组。选取尾部记录: 对每个分组应用tail(n)方法,这将返回该分组中最后N条记录。由于我们已经提前排序,这些“最后N条”就是我们想要的“最新N条”。
示例代码
import pandas as pd# 原始DataFrame数据data = { 'id': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 'first_name': ['John', 'John', 'John', 'Mark', 'John', 'Mark', 'John', 'Mark', 'Mark', 'John'], 'last_name': ['Doe', 'Doe', 'Doe', 'Kay', 'Doe', 'Kay', 'Doe', 'Kay', 'Kay', 'Doe'], 'sex': ['Male', 'Male', 'Male', 'Male', 'Male', 'Male', 'Male', 'Male', 'Male', 'Male'], 'country': ['USA', 'Canada', 'Mexico', 'Italy', 'Spain', 'France', 'Peru', 'India', 'Laos', 'Benin']}df = pd.DataFrame(data)# 1. 根据'id'列对DataFrame进行排序,确保'tail(3)'能获取到最新的3条记录# 如果'id'本身就是递增的,此步骤可确保正确性。df_sorted = df.sort_values(by='id').copy()# 2. 按照指定列进行分组,并为每个组保留最后3条记录result_df = df_sorted.groupby(['first_name', 'last_name', 'sex']).tail(3)# 3. (可选)重置索引,使索引连续result_df = result_df.reset_index(drop=True)# 显示结果DataFrameprint("处理后的DataFrame:")print(result_df)
输出结果:
处理后的DataFrame: id first_name last_name sex country0 5 John Doe Male Spain1 6 Mark Kay Male France2 7 John Doe Male Peru3 8 Mark Kay Male India4 9 Mark Kay Male Laos5 10 John Doe Male Benin
基于PySpark的解决方案:使用窗口函数
对于大规模分布式数据集,例如在Apache Spark环境中使用PySpark,groupby().tail()方法不再适用。此时,窗口函数(Window Functions)是实现此功能的标准且高效的方式。
核心思路
定义窗口规范: 使用Window.partitionBy()定义分组的列,并使用orderBy()定义组内排序的列。为了获取“最新”的记录,排序通常是降序(例如,id降序)。生成行号: 在每个分区(即每个重复组)内,根据排序规则为每条记录生成一个行号(row_number())。过滤: 筛选出row_number小于或等于N的记录。
示例代码(PySpark风格)
from pyspark.sql import SparkSessionfrom pyspark.sql import functions as Ffrom pyspark.sql.window import Window# 假设df是一个Spark DataFrame# 这里为了示例,我们创建一个模拟的SparkSession和DataFramespark = SparkSession.builder.appName("FilterDuplicates").getOrCreate()data = [ (1, 'John', 'Doe', 'Male', 'USA'), (2, 'John', 'Doe', 'Male', 'Canada'), (3, 'John', 'Doe', 'Male', 'Mexico'), (4, 'Mark', 'Kay', 'Male', 'Italy'), (5, 'John', 'Doe', 'Male', 'Spain'), (6, 'Mark', 'Kay', 'Male', 'France'), (7, 'John', 'Doe', 'Male', 'Peru'), (8, 'Mark', 'Kay', 'Male', 'India'), (9, 'Mark', 'Kay', 'Male', 'Laos'), (10, 'John', 'Doe', 'Male', 'Benin')]columns = ['id', 'first_name', 'last_name', 'sex', 'country']df_spark = spark.createDataFrame(data, columns)# 定义窗口规范:按first_name, last_name, sex分组,按id降序排序window_spec = Window.partitionBy('first_name', 'last_name', 'sex').orderBy(F.desc('id'))# 为每个分区内的记录生成行号df_with_row_number = df_spark.withColumn('row_number', F.row_number().over(window_spec))# 过滤,只保留行号小于等于3的记录filtered_df_spark = df_with_row_number.filter('row_number <= 3')# 移除辅助列row_numberresult_df_spark = filtered_df_spark.drop('row_number')# 显示结果print("处理后的Spark DataFrame:")result_df_spark.show()spark.stop()
输出结果:
处理后的Spark DataFrame:+---+----------+---------+----+-------+| id|first_name|last_name| sex|country|+---+----------+---------+----+-------+| 5| John| Doe|Male| Spain|| 7| John| Doe|Male| Peru|| 10| John| Doe|Male| Benin|| 6| Mark| Kay|Male| France|| 8| Kay | Kay|Male| India|| 9| Mark| Kay|Male| Laos|+---+----------+---------+----+-------+
效率与选择考量
Pandas groupby().tail(): 对于数据集能够完全载入内存的情况,groupby().tail()方法通常非常高效且代码简洁。它是Pandas中处理此类问题的首选方法。其内部实现经过高度优化,能够有效处理分组和选择操作。PySpark 窗口函数: 对于大规模分布式数据集,当数据量超出单机内存限制时,PySpark的窗口函数是唯一的选择。虽然代码可能比Pandas版本稍长,但它能在分布式集群上高效执行,避免了数据收集到单个节点的瓶颈。Spark的优化器能够智能地处理窗口操作,确保性能。
注意事项
排序的重要性: 无论是Pandas还是PySpark,定义“最新”或“最旧”的关键在于正确的排序。如果“最新”是基于时间戳,则应按时间戳列排序。如果“最新”是基于某个ID,则按ID排序。性能优化:在Pandas中,如果DataFrame非常大,sort_values()可能会消耗较多内存和时间。确保你的系统有足够的资源。在PySpark中,窗口操作会涉及数据重分区(shuffle),这可能是一个耗时操作。合理选择partitionBy的列,避免创建过多或过少的分区,有助于优化性能。reset_index(): 在Pandas中,groupby().tail()操作会保留原始索引。如果需要一个从0开始的连续索引,记得调用reset_index(drop=True)。
总结
本文详细介绍了在数据处理中,如何根据特定分组筛选重复记录并保留指定数量(N)的最新数据。对于内存中的数据集,Pandas的df.sort_values().groupby().tail(N)组合方法提供了一个简洁高效的解决方案。而对于分布式大数据集,PySpark的窗口函数(Window.partitionBy().orderBy().row_number())则是实现相同逻辑的标准且高性能途径。理解这两种方法的适用场景和实现原理,能帮助开发者根据实际需求选择最合适的工具和策略,从而高效地完成数据清洗任务。
以上就是Pandas数据处理:高效筛选重复记录并保留指定数量的最新数据的详细内容,更多请关注创想鸟其它相关文章!
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/1370212.html
微信扫一扫
支付宝扫一扫