PySpark中高效移除重复数据的两种策略

PySpark中高效移除重复数据的两种策略

本文详细阐述了在PySpark环境中处理重复数据的两种主要方法:针对原生PySpark SQL DataFrame的dropDuplicates()和针对PySpark Pandas DataFrame的drop_duplicates()。文章深入分析了这两种函数的用法、适用场景及关键区别,并通过代码示例和注意事项,指导用户根据其DataFrame类型选择最合适的去重策略,确保数据处理的准确性和效率。

PySpark中重复数据处理概述

在数据处理和分析中,移除重复记录是数据清洗的关键步骤之一,尤其是在处理大规模数据集时。pyspark作为大数据处理的强大框架,提供了高效的机制来识别和消除dataframe中的重复行。然而,由于pyspark生态系统的发展,目前存在两种主要的dataframe类型,它们各自拥有不同的去重api:原生的pyspark.sql.dataframe和基于pandas api的pyspark.pandas.dataframe。理解这两种类型的差异及其对应的去重方法,对于编写健壮且高效的pyspark代码至关重要。

使用 pyspark.sql.DataFrame.dropDuplicates() 进行去重

pyspark.sql.DataFrame是PySpark的核心数据结构,它提供了类似于关系型数据库表的操作接口。对于这种类型的DataFrame,去重操作通过dropDuplicates()方法实现。

函数签名与用法

dropDuplicates()函数可以接受一个可选的列名列表作为参数,用于指定在哪些列上进行重复检查。如果不指定任何列,则默认会检查所有列。

DataFrame.dropDuplicates(subset=None)

subset: 可选参数,一个字符串列表,指定用于识别重复行的列。如果为None,则所有列都将用于去重。

示例代码

假设我们有一个包含客户ID的PySpark SQL DataFrame,我们希望移除重复的客户ID。

from pyspark.sql import SparkSessionfrom pyspark.sql.functions import col# 初始化SparkSessionspark = SparkSession.builder.appName("DropDuplicatesSQL").getOrCreate()# 创建一个示例PySpark SQL DataFramedata = [("C001", "Alice"), ("C002", "Bob"), ("C001", "Alice"), ("C003", "Charlie"), ("C002", "Bob")]columns = ["CUSTOMER_ID", "NAME"]df_sql = spark.createDataFrame(data, columns)print("原始 PySpark SQL DataFrame:")df_sql.show()# 1. 对所有列进行去重df_distinct_all = df_sql.dropDuplicates()print("所有列去重后的 DataFrame:")df_distinct_all.show()# 2. 仅根据 'CUSTOMER_ID' 列进行去重# 注意:当仅根据子集去重时,对于重复的子集行,Spark会保留其中任意一行,其非子集列的值可能不确定。# 在此示例中,由于(C001, Alice)是完全重复的,所以行为一致。# 但如果数据是 (C001, Alice) 和 (C001, David),则去重后会保留其中一个。df_distinct_id = df_sql.dropDuplicates(subset=["CUSTOMER_ID"])print("根据 'CUSTOMER_ID' 列去重后的 DataFrame:")df_distinct_id.show()# 停止SparkSessionspark.stop()

输出示例:

原始 PySpark SQL DataFrame:+-----------+-------+|CUSTOMER_ID|   NAME|+-----------+-------+|       C001|  Alice||       C002|    Bob||       C001|  Alice||       C003|Charlie||       C002|    Bob|+-----------+-------+所有列去重后的 DataFrame:+-----------+-------+|CUSTOMER_ID|   NAME|+-----------+-------+|       C001|  Alice||       C002|    Bob||       C003|Charlie|+-----------+-------+根据 'CUSTOMER_ID' 列去重后的 DataFrame:+-----------+-------+|CUSTOMER_ID|   NAME|+-----------+-------+|       C001|  Alice||       C002|    Bob||       C003|Charlie|+-----------+-------+

使用 pyspark.pandas.DataFrame.drop_duplicates() 进行去重

PySpark Pandas API(pyspark.pandas)旨在为熟悉Pandas库的用户提供一个在Spark上运行的相似接口。对于通过pyspark.pandas创建或转换而来的DataFrame,其去重方法与Pandas中的drop_duplicates()保持一致。

函数签名与用法

drop_duplicates()函数提供了更丰富的参数,以控制去重行为,例如保留哪个重复项(第一个、最后一个或不保留)。

DataFrame.drop_duplicates(subset=None, keep='first', inplace=False, ignore_index=False)

subset: 可选参数,一个字符串列表,指定用于识别重复行的列。如果为None,则所有列都将用于去重。keep: 字符串,可选值有’first’、’last’或False。’first’: 保留第一个出现的重复行。’last’: 保留最后一个出现的重复行。False: 删除所有重复行(即,如果某行有重复,则该行及其所有重复项都会被删除)。inplace: 布尔值,如果为True,则在原始DataFrame上进行操作并返回None;如果为False,则返回一个新DataFrame。ignore_index: 布尔值,如果为True,则重置结果DataFrame的索引。

示例代码

import pyspark.pandas as psfrom pyspark.sql import SparkSession# 初始化SparkSession (pyspark.pandas 会自动使用现有的SparkSession)spark = SparkSession.builder.appName("DropDuplicatesPandas").getOrCreate()# 创建一个示例PySpark Pandas DataFramedata = {"CUSTOMER_ID": ["C001", "C002", "C001", "C003", "C002"],        "NAME": ["Alice", "Bob", "Alice", "Charlie", "Bob"]}psdf = ps.DataFrame(data)print("原始 PySpark Pandas DataFrame:")print(psdf)# 1. 对所有列进行去重 (默认 keep='first')psdf_distinct_all = psdf.drop_duplicates()print("所有列去重后的 DataFrame:")print(psdf_distinct_all)# 2. 仅根据 'CUSTOMER_ID' 列进行去重,保留第一个psdf_distinct_id_first = psdf.drop_duplicates(subset=["CUSTOMER_ID"], keep='first')print("根据 'CUSTOMER_ID' 列去重 (保留第一个) 后的 DataFrame:")print(psdf_distinct_id_first)# 3. 仅根据 'CUSTOMER_ID' 列进行去重,保留最后一个psdf_distinct_id_last = psdf.drop_duplicates(subset=["CUSTOMER_ID"], keep='last')print("根据 'CUSTOMER_ID' 列去重 (保留最后一个) 后的 DataFrame:")print(psdf_distinct_id_last)# 4. 仅根据 'CUSTOMER_ID' 列进行去重,删除所有重复项psdf_distinct_id_false = psdf.drop_duplicates(subset=["CUSTOMER_ID"], keep=False)print("根据 'CUSTOMER_ID' 列去重 (删除所有重复项) 后的 DataFrame:")print(psdf_distinct_id_false)# 停止SparkSession (如果需要,但通常在脚本结束时自动停止)spark.stop()

输出示例:

原始 PySpark Pandas DataFrame:  CUSTOMER_ID     NAME0        C001    Alice1        C002      Bob2        C001    Alice3        C003  Charlie4        C002      Bob所有列去重后的 DataFrame:  CUSTOMER_ID     NAME0        C001    Alice1        C002      Bob3        C003  Charlie根据 'CUSTOMER_ID' 列去重 (保留第一个) 后的 DataFrame:  CUSTOMER_ID     NAME0        C001    Alice1        C002      Bob3        C003  Charlie根据 'CUSTOMER_ID' 列去重 (保留最后一个) 后的 DataFrame:  CUSTOMER_ID     NAME2        C001    Alice4        C002      Bob3        C003  Charlie根据 'CUSTOMER_ID' 列去重 (删除所有重复项) 后的 DataFrame:  CUSTOMER_ID     NAME3        C003  Charlie

选择正确的去重方法:关键区别与注意事项

选择dropDuplicates()还是drop_duplicates()的核心在于你正在操作的DataFrame类型。

DataFrame类型识别:

如果你通过spark.createDataFrame()或读取Spark数据源(如Parquet、CSV)创建DataFrame,你得到的是pyspark.sql.DataFrame。此时应使用dropDuplicates()。如果你通过pyspark.pandas.DataFrame()构造函数创建DataFrame,或者将pyspark.sql.DataFrame通过df.to_pandas_on_spark()(或旧版df.to_pandas())转换为pyspark.pandas.DataFrame,那么你应该使用drop_duplicates()。

你可以通过type(df)或df.__class__.__name__来检查DataFrame的类型。

API一致性:

dropDuplicates()是Spark原生的API,其行为和性能优化是基于Spark分布式计算模型设计的。drop_duplicates()则遵循Pandas的API规范,对于熟悉Pandas的用户来说更直观。它在底层会转换为Spark操作,但其接口与Pandas保持高度一致。

功能差异:

dropDuplicates()相对简洁,主要关注去重本身。当基于子集去重时,它保留哪个重复项是不确定的(通常是Spark内部优化决定的任意一个)。drop_duplicates()提供了keep参数,允许你精确控制保留第一个、最后一个还是删除所有重复项,这在某些业务场景下非常有用。

性能考量:两种方法在底层都会触发Spark的distinct或groupBy操作,这通常涉及到数据的shuffle(混洗),对于大规模数据集而言,shuffle是计算密集型操作。因此,无论使用哪种方法,都应注意其对性能的影响。

总结

PySpark提供了两种强大且高效的方法来处理DataFrame中的重复数据:pyspark.sql.DataFrame的dropDuplicates()和pyspark.pandas.DataFrame的drop_duplicates()。理解它们各自的适用场景和功能特性是编写高效PySpark代码的关键。在实践中,务必根据你当前操作的DataFrame类型来选择正确的去重函数。当需要更精细地控制重复项的保留策略时,pyspark.pandas.DataFrame.drop_duplicates()的keep参数提供了更大的灵活性。始终牢记,去重操作可能涉及数据混洗,因此在处理超大规模数据集时,应评估其性能影响。

以上就是PySpark中高效移除重复数据的两种策略的详细内容,更多请关注创想鸟其它相关文章!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/1369278.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
Beautiful Soup 中定位字符串及其父标签
上一篇 2025年12月14日 09:31:23
利用BeautifulSoup定位字符串并获取其上下文标签
下一篇 2025年12月14日 09:31:43

相关推荐

  • composer require-dev和require有什么不同_Composer Require与Require-Dev区别解析

    require用于声明项目运行必需的依赖,如框架、数据库组件和第三方SDK,这些包会随项目部署到生产环境;2. require-dev用于声明仅在开发和测试阶段需要的工具,如PHPUnit、PHPStan、Faker等,不会默认部署到生产环境;3. 安装时composer install根据环境决定…

    2026年5月10日
    1000
  • php常量怎么用_PHP常量(define/const)定义与使用方法

    PHP中可通过define函数和const关键字定义常量,用于存储不可变值。define适用于全局作用域,支持动态名称和条件定义,如define(‘SITE_NAME’, ‘MyWebsite’);const在编译时生效,语法简洁但限制多,只能在类或全…

    2026年5月10日
    000
  • Go语言接口与切片:如何识别和操作[]interface{}

    本文将深入探讨Go语言中如何识别和操作`[]interface{}`类型的切片。我们将介绍类型断言(Type Assertion)的关键作用,并通过`switch`语句演示如何安全地检测`[]interface{}`类型,并进而遍历其内部元素。文章旨在提供清晰的示例代码和专业指导,帮助开发者有效地处…

    2026年5月10日
    000
  • c++中头文件和源文件的区别_c++头文件与源文件作用对比

    头文件声明接口,源文件实现逻辑。头文件含类、函数声明及宏定义,通过#include被多文件共享,用include守卫防重;源文件实现具体功能,编译为目标文件后由链接器合并。声明与实现分离提升模块化与编译效率,模板和内联函数因需编译时可见故常置于头文件,命名空间避免符号冲突,整体结构使项目更清晰易维护…

    2026年5月10日
    000
  • Go语言中复制数组的几种方法详解

    本文介绍了在 Go 语言中复制数组和切片的几种方法,重点讲解了内置的 `copy` 函数的使用方式,以及在多维切片场景下深拷贝与浅拷贝的区别,并提供了相应的代码示例。通过本文,你将掌握在不同场景下选择合适的复制方法,避免潜在的陷阱。 在 Go 语言中,复制数组和切片是一个常见的操作。根据不同的需求,…

    2026年5月10日
    000
  • C++怎么使用C++17的并行算法库_C++ std::execution与多核性能优化

    c++kquote>C++17通过std::execution策略引入并行算法支持,需编译器(如GCC 8+)和线程库(如TBB)配合;提供seq、par、par_unseq三种策略控制执行模式;可用于sort、for_each等算法提升大数据性能,但需避免数据竞争,推荐使用reduce等安全…

    2026年5月10日
    000
  • 解决PHP foreach循环中变量“继承”问题:理解与避免意外数据泄露

    本文探讨PHP foreach循环中一个常见的陷阱:当循环内部的数组或变量未被显式初始化时,其值可能会“继承”自上一次循环迭代,导致意外的数据泄露和逻辑错误。文章将深入分析这一现象的根源,并通过示例代码展示如何通过在每次迭代开始时正确初始化变量来解决此问题,确保代码行为的预期一致性。 引言:fore…

    2026年5月10日
    100
  • Pandas:基于条件和 Groupby 替换列中的特定字符

    本文介绍了如何使用 Pandas 库,结合 groupby 函数和字符串操作,根据特定条件替换 DataFrame 列中的字符。通过累积计数和字典映射,能够灵活地修改列中的特定部分,并根据替换值调整相关文本,实现数据清洗和转换的目的。 在数据分析和处理中,经常需要根据特定条件修改 DataFrame…

    2026年5月10日
    000
  • PHP动态网页数据库备份恢复_PHP动态网页MySQL数据库备份教程

    答案:PHP动态网页的MySQL数据库备份与恢复需通过定期导出SQL文件并安全存储来保障数据安全,核心方法包括使用mysqldump命令行工具实现高效灵活的自动化备份,利用phpMyAdmin图形化工具进行手动导出导入以降低操作门槛,以及通过PHP脚本调用系统命令将备份过程集成到应用中;恢复时可采用…

    2026年5月10日
    000
  • Go语言中sync.WaitGroup的深度解析与实践

    sync.WaitGroup是Go语言中用于并发编程的重要同步原语,它允许主协程等待一组子协程执行完毕。本文将深入探讨WaitGroup的工作原理、典型使用模式及其与sync.Mutex等其他同步机制的区别,并通过实际代码示例,帮助读者掌握其在并发控制中的应用,避免常见的误区,确保并发程序的正确性和…

    2026年5月10日
    000
  • HTML文档脚本怎么加载_HTML加载JavaScript教程

    脚本应优先通过defer或async异步加载以避免阻塞渲染;将脚本放在body底部可防阻塞,但推荐使用defer确保DOM解析完成后再执行;async适用于独立脚本,defer用于依赖DOM或需顺序执行的脚本;优化方式包括代码分割、懒加载、CDN加速和浏览器缓存;加载失败时应重试、降级处理并监控错误…

    2026年5月10日
    000
  • Python怎么实现一个上下文管理器_Python上下文管理器协议实现

    自定义Python上下文管理器需实现__enter__和__exit__方法,前者在进入with块时获取资源并返回对象,后者在退出时释放资源并可处理异常;通过类或contextlib.contextmanager装饰生成器函数均可创建;文件操作中with open()自动关闭文件是典型应用;__ex…

    2026年5月10日
    000
  • JavaScript解释器_javascript代码执行

    JavaScript通过引擎解析执行,先语法分析生成AST,再编译为字节码或机器码,最后执行;执行时创建上下文并入栈,同步代码直接运行,异步任务由API处理后回调入队,事件循环在调用栈空时将回调推入执行;此机制解释了变量提升、暂时性死区及宏任务与微任务执行顺序差异。 JavaScript代码的执行依…

    2026年5月10日
    000
  • CSS的display属性有哪些值?inline和block有什么区别?

    CSS的display属性有哪些值?inline和block有什么区别?CSS的display属性有哪些值?inline和block有什么区别?CSS的display属性有哪些值?inline和block有什么区别?CSS的display属性有哪些值?inline和block有什么区别?

    css的display属性通过定义元素的显示方式来控制网页布局。1.block元素独占一行,可设置宽高,默认如div、p等;2.inline元素不独占行,宽高由内容决定,如span、a;3.inline-block兼具block和inline特性,可并排显示且能设尺寸;4.none隐藏元素且不占空间…

    2026年5月10日 用户投稿
    000
  • Python Pandas:高效合并多工作簿多工作表 Excel 数据

    本教程详细指导如何使用 Python Pandas 库高效合并来自多个 Excel 文件中指定工作表的数据。文章将解释如何遍历文件目录、正确加载 Excel 文件、识别并解析特定工作表,并将来自不同文件的同名工作表数据智能地整合到一个 Pandas DataFrame 字典中,同时提供完整的示例代码…

    2026年5月10日
    000
  • C++怎么使用静态库和动态库_C++链接静态库与动态库的方法与区别

    静态库在编译时链接,生成独立可执行文件;动态库运行时加载,节省内存。1. 静态库用ar打包.o文件为.a,编译时通过-L和-l链接;2. 动态库需-fPIC编译生成.so,运行前配置LD_LIBRARY_PATH或系统路径;3. 静态库体积大但部署方便,动态库共享内存利于更新。 在C++项目开发中,…

    2026年5月10日
    000
  • HTML Class属性详解:多类名与命名规范

    HTML中的class属性用于为元素应用样式和行为。理解不同类型的类名定义方式至关重要,特别是单类名(如class=”name”或class=”name-new”)和多类名(如class=”name new”)之间的区别。核心在…

    2026年5月10日
    100
  • c++中&的作用 引用与取地址运算符区别解析

    在c++++中,&符号既可以作为引用运算符,也可以作为取地址运算符。1) 作为引用运算符时,&用于创建变量的别名,常用于函数参数和返回值,提高效率。2) 作为取地址运算符时,&返回…

    2026年5月10日
    100
  • 如何优化JavaScript代码的性能以避免运行时瓶颈?

    优化JavaScript性能需减少DOM操作,通过缓存查询、使用DocumentFragment和合并样式修改来降低重排重绘;2. 采用事件委托减少内存占用并提升绑定效率;3. 拆分长任务,利用requestIdleCallback、Web Worker和requestAnimationFrame避…

    2026年5月10日
    000
  • 为什么 TypeScript 比 JavaScript 更好

    javascript 长期以来一直是 web 开发的基石,支持从小型脚本到大型应用程序的各种项目。然而,随着项目规模的扩大,javascript 的动态类型和缺乏结构性可能会成为开发的瓶颈。typescript 应运而生,它凭借静态类型检查和强大的工具集,迅速成为许多开发者构建可靠、可扩展应用程序的…

    2026年5月10日
    100

发表回复

登录后才能评论
关注微信