PySpark中高效移除重复数据的两种策略

PySpark中高效移除重复数据的两种策略

本文详细阐述了在PySpark环境中处理重复数据的两种主要方法:针对原生PySpark SQL DataFrame的dropDuplicates()和针对PySpark Pandas DataFrame的drop_duplicates()。文章深入分析了这两种函数的用法、适用场景及关键区别,并通过代码示例和注意事项,指导用户根据其DataFrame类型选择最合适的去重策略,确保数据处理的准确性和效率。

PySpark中重复数据处理概述

在数据处理和分析中,移除重复记录是数据清洗的关键步骤之一,尤其是在处理大规模数据集时。pyspark作为大数据处理的强大框架,提供了高效的机制来识别和消除dataframe中的重复行。然而,由于pyspark生态系统的发展,目前存在两种主要的dataframe类型,它们各自拥有不同的去重api:原生的pyspark.sql.dataframe和基于pandas api的pyspark.pandas.dataframe。理解这两种类型的差异及其对应的去重方法,对于编写健壮且高效的pyspark代码至关重要。

使用 pyspark.sql.DataFrame.dropDuplicates() 进行去重

pyspark.sql.DataFrame是PySpark的核心数据结构,它提供了类似于关系型数据库表的操作接口。对于这种类型的DataFrame,去重操作通过dropDuplicates()方法实现。

函数签名与用法

dropDuplicates()函数可以接受一个可选的列名列表作为参数,用于指定在哪些列上进行重复检查。如果不指定任何列,则默认会检查所有列。

DataFrame.dropDuplicates(subset=None)

subset: 可选参数,一个字符串列表,指定用于识别重复行的列。如果为None,则所有列都将用于去重。

示例代码

假设我们有一个包含客户ID的PySpark SQL DataFrame,我们希望移除重复的客户ID。

from pyspark.sql import SparkSessionfrom pyspark.sql.functions import col# 初始化SparkSessionspark = SparkSession.builder.appName("DropDuplicatesSQL").getOrCreate()# 创建一个示例PySpark SQL DataFramedata = [("C001", "Alice"), ("C002", "Bob"), ("C001", "Alice"), ("C003", "Charlie"), ("C002", "Bob")]columns = ["CUSTOMER_ID", "NAME"]df_sql = spark.createDataFrame(data, columns)print("原始 PySpark SQL DataFrame:")df_sql.show()# 1. 对所有列进行去重df_distinct_all = df_sql.dropDuplicates()print("所有列去重后的 DataFrame:")df_distinct_all.show()# 2. 仅根据 'CUSTOMER_ID' 列进行去重# 注意:当仅根据子集去重时,对于重复的子集行,Spark会保留其中任意一行,其非子集列的值可能不确定。# 在此示例中,由于(C001, Alice)是完全重复的,所以行为一致。# 但如果数据是 (C001, Alice) 和 (C001, David),则去重后会保留其中一个。df_distinct_id = df_sql.dropDuplicates(subset=["CUSTOMER_ID"])print("根据 'CUSTOMER_ID' 列去重后的 DataFrame:")df_distinct_id.show()# 停止SparkSessionspark.stop()

输出示例:

原始 PySpark SQL DataFrame:+-----------+-------+|CUSTOMER_ID|   NAME|+-----------+-------+|       C001|  Alice||       C002|    Bob||       C001|  Alice||       C003|Charlie||       C002|    Bob|+-----------+-------+所有列去重后的 DataFrame:+-----------+-------+|CUSTOMER_ID|   NAME|+-----------+-------+|       C001|  Alice||       C002|    Bob||       C003|Charlie|+-----------+-------+根据 'CUSTOMER_ID' 列去重后的 DataFrame:+-----------+-------+|CUSTOMER_ID|   NAME|+-----------+-------+|       C001|  Alice||       C002|    Bob||       C003|Charlie|+-----------+-------+

使用 pyspark.pandas.DataFrame.drop_duplicates() 进行去重

PySpark Pandas API(pyspark.pandas)旨在为熟悉Pandas库的用户提供一个在Spark上运行的相似接口。对于通过pyspark.pandas创建或转换而来的DataFrame,其去重方法与Pandas中的drop_duplicates()保持一致。

函数签名与用法

drop_duplicates()函数提供了更丰富的参数,以控制去重行为,例如保留哪个重复项(第一个、最后一个或不保留)。

DataFrame.drop_duplicates(subset=None, keep='first', inplace=False, ignore_index=False)

subset: 可选参数,一个字符串列表,指定用于识别重复行的列。如果为None,则所有列都将用于去重。keep: 字符串,可选值有’first’、’last’或False。’first’: 保留第一个出现的重复行。’last’: 保留最后一个出现的重复行。False: 删除所有重复行(即,如果某行有重复,则该行及其所有重复项都会被删除)。inplace: 布尔值,如果为True,则在原始DataFrame上进行操作并返回None;如果为False,则返回一个新DataFrame。ignore_index: 布尔值,如果为True,则重置结果DataFrame的索引。

示例代码

import pyspark.pandas as psfrom pyspark.sql import SparkSession# 初始化SparkSession (pyspark.pandas 会自动使用现有的SparkSession)spark = SparkSession.builder.appName("DropDuplicatesPandas").getOrCreate()# 创建一个示例PySpark Pandas DataFramedata = {"CUSTOMER_ID": ["C001", "C002", "C001", "C003", "C002"],        "NAME": ["Alice", "Bob", "Alice", "Charlie", "Bob"]}psdf = ps.DataFrame(data)print("原始 PySpark Pandas DataFrame:")print(psdf)# 1. 对所有列进行去重 (默认 keep='first')psdf_distinct_all = psdf.drop_duplicates()print("所有列去重后的 DataFrame:")print(psdf_distinct_all)# 2. 仅根据 'CUSTOMER_ID' 列进行去重,保留第一个psdf_distinct_id_first = psdf.drop_duplicates(subset=["CUSTOMER_ID"], keep='first')print("根据 'CUSTOMER_ID' 列去重 (保留第一个) 后的 DataFrame:")print(psdf_distinct_id_first)# 3. 仅根据 'CUSTOMER_ID' 列进行去重,保留最后一个psdf_distinct_id_last = psdf.drop_duplicates(subset=["CUSTOMER_ID"], keep='last')print("根据 'CUSTOMER_ID' 列去重 (保留最后一个) 后的 DataFrame:")print(psdf_distinct_id_last)# 4. 仅根据 'CUSTOMER_ID' 列进行去重,删除所有重复项psdf_distinct_id_false = psdf.drop_duplicates(subset=["CUSTOMER_ID"], keep=False)print("根据 'CUSTOMER_ID' 列去重 (删除所有重复项) 后的 DataFrame:")print(psdf_distinct_id_false)# 停止SparkSession (如果需要,但通常在脚本结束时自动停止)spark.stop()

输出示例:

原始 PySpark Pandas DataFrame:  CUSTOMER_ID     NAME0        C001    Alice1        C002      Bob2        C001    Alice3        C003  Charlie4        C002      Bob所有列去重后的 DataFrame:  CUSTOMER_ID     NAME0        C001    Alice1        C002      Bob3        C003  Charlie根据 'CUSTOMER_ID' 列去重 (保留第一个) 后的 DataFrame:  CUSTOMER_ID     NAME0        C001    Alice1        C002      Bob3        C003  Charlie根据 'CUSTOMER_ID' 列去重 (保留最后一个) 后的 DataFrame:  CUSTOMER_ID     NAME2        C001    Alice4        C002      Bob3        C003  Charlie根据 'CUSTOMER_ID' 列去重 (删除所有重复项) 后的 DataFrame:  CUSTOMER_ID     NAME3        C003  Charlie

选择正确的去重方法:关键区别与注意事项

选择dropDuplicates()还是drop_duplicates()的核心在于你正在操作的DataFrame类型。

DataFrame类型识别:

如果你通过spark.createDataFrame()或读取Spark数据源(如Parquet、CSV)创建DataFrame,你得到的是pyspark.sql.DataFrame。此时应使用dropDuplicates()。如果你通过pyspark.pandas.DataFrame()构造函数创建DataFrame,或者将pyspark.sql.DataFrame通过df.to_pandas_on_spark()(或旧版df.to_pandas())转换为pyspark.pandas.DataFrame,那么你应该使用drop_duplicates()。

你可以通过type(df)或df.__class__.__name__来检查DataFrame的类型。

API一致性:

dropDuplicates()是Spark原生的API,其行为和性能优化是基于Spark分布式计算模型设计的。drop_duplicates()则遵循Pandas的API规范,对于熟悉Pandas的用户来说更直观。它在底层会转换为Spark操作,但其接口与Pandas保持高度一致。

功能差异:

dropDuplicates()相对简洁,主要关注去重本身。当基于子集去重时,它保留哪个重复项是不确定的(通常是Spark内部优化决定的任意一个)。drop_duplicates()提供了keep参数,允许你精确控制保留第一个、最后一个还是删除所有重复项,这在某些业务场景下非常有用。

性能考量:两种方法在底层都会触发Spark的distinct或groupBy操作,这通常涉及到数据的shuffle(混洗),对于大规模数据集而言,shuffle是计算密集型操作。因此,无论使用哪种方法,都应注意其对性能的影响。

总结

PySpark提供了两种强大且高效的方法来处理DataFrame中的重复数据:pyspark.sql.DataFrame的dropDuplicates()和pyspark.pandas.DataFrame的drop_duplicates()。理解它们各自的适用场景和功能特性是编写高效PySpark代码的关键。在实践中,务必根据你当前操作的DataFrame类型来选择正确的去重函数。当需要更精细地控制重复项的保留策略时,pyspark.pandas.DataFrame.drop_duplicates()的keep参数提供了更大的灵活性。始终牢记,去重操作可能涉及数据混洗,因此在处理超大规模数据集时,应评估其性能影响。

以上就是PySpark中高效移除重复数据的两种策略的详细内容,更多请关注创想鸟其它相关文章!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/1369278.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2025年12月14日 09:31:23
下一篇 2025年12月14日 09:31:43

相关推荐

  • Pandas DataFrame高效提取Top N值及其行列坐标

    本文详细介绍了如何利用Pandas的stack()和nlargest()方法,高效地从DataFrame中提取指定数量的最大值,并获取这些值对应的行和列坐标。通过专业示例代码,读者将学会如何快速定位数据中的关键点,优化数据分析流程。 在数据分析中,我们经常需要从大型pandas dataframe中…

    2025年12月14日
    000
  • 编程实践:结果正确,过程更需严谨——如何精确遵循代码实现指令

    本文探讨了在编程实践中,即使代码输出了正确的结果,也必须严格遵循指定的实现步骤的重要性。通过一个计算总分的具体案例,文章对比了直接求和与按指令逐步累加两种方法,强调了过程的正确性对于代码可读性、可维护性和未来扩展性的深远影响,并提供了专业的指导和示例代码。 在软件开发中,我们常常会遇到这样的情况:一…

    2025年12月14日
    000
  • 编程实践:如何正确实现变量累加与遵循代码指令

    本文探讨在编程中实现变量累加的正确方法,强调即使程序输出结果正确,也必须严格遵循代码指令和逻辑规范。通过对比直接求和赋值与逐步累加两种方式,详细阐述了变量累加的最佳实践,并强调了遵循指令对于代码可读性、可维护性及团队协作的重要性。 理解变量累加的正确姿势 在软件开发过程中,我们经常会遇到需要对一系列…

    2025年12月14日
    000
  • 从多个局部排名列表重构全局排名列表的算法实践

    本文探讨了如何将多个评委提供的部分排名列表(可能存在分歧和缺失)有效地聚合成一个统一的全局排名列表。通过为每个评委的排名赋予位置分数,然后对这些分数进行累加和排序,可以生成一个综合性的、考虑了各项物品在不同评委眼中相对重要性的全局排名,从而有效解决在评审过程中遇到的复杂排名聚合问题。 问题背景:多源…

    2025年12月14日
    000
  • 使用Pandas高效计算时间序列数据的年度平均值

    本文将详细介绍如何利用Pandas库高效地将月度时间序列数据聚合为年度平均值。通过groupby()结合dt.year提取年份,并使用agg(‘mean’)对指定列进行平均值计算,最终生成一个简洁的年度统计数据框。文章将提供示例代码和方法解析,帮助读者掌握Pandas在时间序…

    2025年12月14日
    000
  • 掌握USDA食品数据API分页获取完整营养信息教程

    本教程详细介绍了如何通过Python有效地从USDA食品数据API获取完整的营养事实数据。针对API默认返回结果受限(如50条)的问题,文章深入探讨了API分页机制,并提供了利用pageSize和pageNumber参数迭代获取所有数据项的解决方案。教程包含示例代码、错误处理和最佳实践,旨在帮助开发…

    2025年12月14日
    000
  • 如何在GeoDataFrame中高效选择单个值:理解索引与位置

    本教程深入探讨GeoDataFrame中选择单个值的常见误区,尤其是在数据过滤后。我们将解释为什么直接通过索引访问可能失败,并介绍如何使用.iloc进行基于位置的精确选择。通过实例代码,读者将掌握在GeoDataFrame中安全、有效地提取单个几何对象或其他列值的方法,避免因索引非连续性导致的错误。…

    2025年12月14日
    000
  • Python如何使用装饰器_Python装饰器原理与实践指南

    Python装饰器是接收函数并返回增强函数的特殊函数,用于添加日志、权限检查等功能而不修改原函数代码。通过@语法糖应用,结合functools.wraps保留元数据,利用闭包和函数一等公民特性实现功能增强,支持带参装饰和类装饰器,适用于横切关注点,提升代码复用性与可维护性。 Python装饰器,说白…

    2025年12月14日
    000
  • Python中生成器函数用法详解 Python中yield关键字教程

    生成器函数与普通函数的本质区别在于:普通函数执行后返回值并销毁状态,而生成器函数通过yield暂停并保持状态,返回生成器对象实现惰性求值和内存高效迭代。 Python中的生成器函数和 yield 关键字,是处理大量数据或构建高效迭代器时非常强大的工具。它们的核心思想在于“按需生成”数据,而不是一次性…

    2025年12月14日
    000
  • Python怎么使用Pandas库_Pandas数据处理入门指南

    Pandas数据清洗常用技巧包括处理缺失值、重复值、异常值、文本数据、日期时间及数据标准化。具体为:用dropna()或fillna()处理缺失值;drop_duplicates()去除重复数据;通过IQR或标准差识别异常值并合理处理;利用str方法清洗文本,如去空格、大小写转换;用to_datet…

    2025年12月14日
    000
  • Python中数组如何操作 Python中数组操作教程

    Python中的“数组”主要指list和numpy.ndarray。list是内置的异构序列,支持多种数据类型和动态操作,适合小规模或非数值数据处理;而numpy.ndarray是同质多维数组,基于C实现,内存连续,支持高效数值运算和广播操作,适用于大规模科学计算。两者可通过np.array()和t…

    2025年12月14日
    000
  • Python如何操作集合_Python集合使用方法归纳

    Python集合是无序、不重复元素的容器,适用于去重、快速成员检测及数学集合运算。 Python集合,在我看来,是处理数据去重和执行数学集合运算时,一个极其高效且优雅的工具。它本质上是一个无序且不包含重复元素的容器。你可以通过字面量 {} (但注意, {} 创建的是空字典,空集合需要用 set() …

    2025年12月14日
    000
  • Python中排序算法如何实现 Python中排序算法详解

    选择合适的排序算法需根据数据规模、特性、内存限制和稳定性需求综合判断,Python内置sort()和sorted()方法高效且支持自定义key函数实现灵活排序,实际应用中推荐使用内置方法而非手动实现。 Python中排序算法的实现,本质上是将一系列无序的数据,通过特定的步骤,最终变成有序排列的过程。…

    2025年12月14日
    000
  • python怎么连接mysql数据库_python数据库操作指南

    Python连接MySQL需使用PyMySQL等库作为“桥梁”,通过API发送SQL指令。首先安装库并建立连接,注意配置host、user、password等参数,推荐使用环境变量避免硬编码。常见认证问题包括用户名密码错误、权限不足(如’@localhost’与’…

    2025年12月14日
    000
  • Python如何实现排序_Python排序算法与应用实例

    Python内置排序基于Timsort算法,结合归并排序与插入排序,兼具高效性与稳定性,适用于绝大多数场景;日常开发应优先使用list.sort()或sorted(),仅在学习、特定数据分布或极端优化需求下才考虑手写排序算法。 Python实现排序主要依赖其内置的 list.sort() 方法和 s…

    2025年12月14日
    000
  • Python怎么使用NumPy库_NumPy数组操作教程一览

    NumPy是Python科学计算的核心库,提供高性能多维数组ndarray及向量化操作工具。通过import numpy as np导入后,可使用np.array()、np.zeros()、np.ones()、np.linspace()等函数创建数组,相比Python列表,ndarray存储同类型数…

    2025年12月14日
    000
  • Python中列表如何添加元素 Python中列表添加元素方法

    Python中向列表添加元素有append()、insert()、extend()和+运算符四种主要方式。append()用于在末尾添加单个元素;insert()可在指定位置插入元素,但频繁使用尤其在列表开头插入时性能较差,时间复杂度为O(n);extend()适用于将可迭代对象的元素逐个添加到列表…

    2025年12月14日
    000
  • python怎么创建列表_python列表操作完全指南

    Python创建列表最常用方式是用方括号[]直接定义,如my_list = [1, 2, 3];也可用list()构造函数转换可迭代对象,或使用列表推导式[expr for item in iterable if cond]实现简洁高效的列表生成;列表支持通过索引和切片访问及修改元素,结合appen…

    2025年12月14日
    000
  • Python中生成器如何使用 Python中生成器教程

    生成器是一种特殊函数,通过yield实现惰性求值,按需返回值并暂停执行。调用生成器函数返回迭代器对象,每次next()或for循环触发时从上次暂停处继续,直到下一个yield。如示例所示,生成器分步输出1、2、3,每次执行到yield暂停,有效节省内存,适合处理大数据或无限序列。 Python中的生…

    2025年12月14日
    000
  • Python如何操作列表_Python列表常用方法汇总

    Python列表是可变有序序列,支持增删改查、切片和排序等操作,适用于需动态修改且顺序重要的数据场景,其灵活性高于元组和集合,但需注意迭代修改、浅拷贝陷阱及性能优化,如用列表推导式和deque提升效率。 Python列表是Python编程中最基础也最强大的数据结构之一,它本质上是一个动态数组,允许存…

    2025年12月14日
    000

发表回复

登录后才能评论
关注微信