Redshift大数据量DataFrame高速插入策略

redshift大数据量dataframe高速插入策略

本文旨在解决从Python DataFrame向Amazon Redshift数据库插入大量数据时效率低下的问题。我们将探讨并对比两种主要的高速插入策略:优化的SQL批量插入(通过psycopg2.extras.execute_values)和Redshift官方推荐的COPY命令(结合S3作为中间存储),提供详细的实现代码和最佳实践,帮助用户显著提升数据加载性能,避免长时间等待和超时错误。

Redshift大数据插入的挑战与优化

在处理大规模数据时,将Python DataFrame中的数据高效地导入到Amazon Redshift等列式存储数据库是一个常见的挑战。传统的逐行插入或使用executemany的批量插入方法,对于Redshift这类针对批量加载优化的数据库而言,效率往往低下,容易导致长时间运行甚至超时错误。Redshift的设计哲学是利用并行处理能力,一次性处理大量数据,而非频繁的小事务。

用户尝试的两种方法,无论是将DataFrame转换为字典列表后使用executemany,还是转换为元组列表后循环execute,都未能达到理想的速度。这主要是因为这些方法在底层可能仍然导致数据库执行了大量独立的INSERT语句,或者未能充分利用Redshift的并行加载优势。对于数十万甚至数百万行的数据,我们需要更专业的策略。

Redshift官方文档明确指出:“如果COPY命令不是一个选项,并且您需要SQL插入,请尽可能使用多行插入。当您一次只添加一行或几行数据时,数据压缩效率低下。” 这强调了两种核心优化方向:多行SQL插入和更高效的COPY命令。

方法一:优化SQL插入(批量插入)

虽然Redshift推荐使用COPY命令进行大规模数据加载,但在某些场景下,如果数据量不是极端巨大(例如数十万到数百万行),或者不希望引入S3作为中间存储的复杂性,优化的SQL批量插入仍然是一个可行的选择。这里的“优化”指的是使用数据库驱动程序提供的、能够将多行数据打包成单个SQL语句的机制,而不是发送多个独立的INSERT语句。

psycopg2库提供了psycopg2.extras.execute_values函数,它能够高效地构建一个包含多组VALUES的多行INSERT语句,并一次性发送给数据库。这比循环执行单行插入或简单的executemany(在某些情况下可能仍然分解为多个语句)效率更高。

实现示例:使用 psycopg2.extras.execute_values

import pandas as pdimport psycopg2from psycopg2.extras import execute_valuesfrom datetime import date# 假设这是你的DataFrame数据data = [    (69370, 'subject', 'working', 1, date(2023, 12, 15)),    (69370, 'subject', 'scenes', 1, date(2023, 12, 15)),    (69370, 'subject', 'intended', 1, date(2023, 12, 15)),    (69371, 'subject', 'redirected', 1, date(2023, 12, 15)),    (69371, 'subject', 'ge', 2, date(2023, 12, 15)),    (69371, 'subject', 'sensor', 1, date(2023, 12, 15)),    (69371, 'subject', 'flush', 1, date(2023, 12, 15)),    (69371, 'subject', 'motion', 1, date(2023, 12, 15)),    (69371, 'subject', 'led', 1, date(2023, 12, 15)),    (69371, 'subject', 'fixture', 1, date(2023, 12, 15)),    (69371, 'subject', 'contact', 1, date(2023, 12, 15)),    # ... 更多数据,假设有60万条记录]# 为了演示,我们生成更多数据for i in range(100000): # 模拟大量数据    data.append((70000 + i, 'subject_new', f'text_{i}', i % 5 + 1, date(2023, 12, 15)))df = pd.DataFrame(data, columns=['case_id', 'column_name', 'split_text', 'split_text_cnt', 'load_ts'])# Redshift连接参数REDSHIFT_HOST = 'redshift-####-dev.00000.us-east-1.redshift.amazonaws.com'REDSHIFT_DB = '*****'REDSHIFT_USER = '****'REDSHIFT_PASSWORD = '*****'REDSHIFT_PORT = '5439'conn = Nonecur = Nonetry:    conn = psycopg2.connect(        host=REDSHIFT_HOST,        database=REDSHIFT_DB,        user=REDSHIFT_USER,        password=REDSHIFT_PASSWORD,        port=REDSHIFT_PORT    )    conn.autocommit = False # 确保在事务中操作    print("成功连接到 RedShift")    cur = conn.cursor()    table_name = "odey.sfc_ca_sit_di"    columns = "(case_id, column_name, split_text, split_text_cnt, load_ts)"    # 将DataFrame转换为元组列表    # 注意:日期对象需要被psycopg2正确处理,通常直接传递date对象即可    rows_to_insert = [tuple(row) for row in df.itertuples(index=False)]    # 定义批量大小,可以根据网络、数据库性能调整    batch_size = 10000     total_inserted_rows = 0    print(f"开始批量插入 {len(rows_to_insert)} 条记录...")    for i in range(0, len(rows_to_insert), batch_size):        batch = rows_to_insert[i:i + batch_size]        sql = f"INSERT INTO {table_name} {columns} VALUES %s"        execute_values(cur, sql, batch)        total_inserted_rows += len(batch)        print(f"已插入 {total_inserted_rows} / {len(rows_to_insert)} 条记录")    conn.commit()    print(f"所有 {total_inserted_rows} 条记录成功插入 (批量插入方式)")except psycopg2.Error as e:    if conn:        conn.rollback()    print(f"批量插入失败: {e}")except Exception as e:    print(f"发生未知错误: {e}")finally:    if cur:        cur.close()    if conn:        conn.close()    print("数据库连接已关闭。")

注意事项

批量大小(batch_size):选择合适的批量大小至关重要。过小会增加数据库交互次数,过大可能导致单个SQL命令超过Redshift的16MB限制,或消耗过多内存。通常,几千到几万行是一个合理的起点,需要根据实际环境进行测试和调整。事务管理:务必在事务中执行批量插入,即在所有批次完成后统一commit(),如果任何批次失败则rollback()。这能保证数据的一致性。数据类型匹配:确保DataFrame中的数据类型与Redshift目标表的列类型严格匹配,否则可能导致插入失败。

方法二:Redshift COPY 命令(推荐的超高速方案)

对于真正大规模的数据加载(数百万行甚至TB级别),Redshift官方强烈推荐使用COPY命令。COPY命令是Redshift专门为高速数据加载设计的,它能够直接从Amazon S3、Amazon DynamoDB或Amazon EMR等数据源并行加载数据,效率远超任何基于SQL的INSERT方法。

其核心思想是:将DataFrame数据导出为文件(如CSV、Parquet),上传到Amazon S3,然后指示Redshift从S3读取这些文件并加载到表中。

工作流程

DataFrame导出为文件:将Python DataFrame中的数据导出为CSV或Parquet格式的文件。对于大型数据集,建议将数据分割成多个小文件(例如,每个文件1GB左右),以充分利用Redshift的并行加载能力。上传至Amazon S3:使用boto3库将这些文件上传到预配置的S3存储桶。执行Redshift COPY命令:通过psycopg2连接Redshift,并执行COPY SQL命令,指定S3文件的位置、IAM角色、文件格式等参数。

实现示例:使用 Pandas, Boto3, Psycopg2

import pandas as pdimport boto3import ioimport psycopg2from datetime import dateimport os# 假设这是你的DataFrame数据data = [    (69370, 'subject', 'working', 1, date(2023, 12, 15)),    (69370, 'subject', 'scenes', 1, date(2023, 12, 15)),    (69370, 'subject', 'intended', 1, date(2023, 12, 15)),    (69371, 'subject', 'redirected', 1, date(2023, 12, 15)),    (69371, 'subject', 'ge', 2, date(2023, 12, 15)),    (69371, 'subject', 'sensor', 1, date(2023, 12, 15)),    (69371, 'subject', 'flush', 1, date(2023, 12, 15)),    (69371, 'subject', 'motion', 1, date(2023, 12, 15)),    (69371, 'subject', 'led', 1, date(2023, 12, 15)),    (69371, 'subject', 'fixture', 1, date(2023, 12, 15)),    (69371, 'subject', 'contact', 1, date(2023, 12, 15)),    # ... 更多数据]# 为了演示,我们生成更多数据 (约60万条)for i in range(600000):     data.append((70000 + i, 'subject_new', f'text_{i}', i % 5 + 1, date(2023, 12, 15)))df = pd.DataFrame(data, columns=['case_id', 'column_name', 'split_text', 'split_text_cnt', 'load_ts'])# 将日期列转换为字符串,以匹配CSV格式df['load_ts'] = df['load_ts'].astype(str)# S3配置S3_BUCKET_NAME = 'your-s3-bucket-for-redshift-data' # 替换为你的S3桶名S3_KEY_PREFIX = 'redshift_temp_data/' # S3上的路径前缀IAM_ROLE_ARN = 'arn:aws:iam::YOUR_ACCOUNT_ID:role/YourRedshiftIAMRole' # 替换为具有S3读权限的IAM角色ARNAWS_REGION = 'us-east-1' # S3桶和Redshift集群所在的AWS区域# Redshift连接参数REDSHIFT_HOST = 'redshift-####-dev.00000.us-east-1.redshift.amazonaws.com'REDSHIFT_DB = '*****'REDSHIFT_USER = '****'REDSHIFT_PASSWORD = '*****'REDSHIFT_PORT = '5439'conn = Nonecur = Nones3_client = boto3.client('s3', region_name=AWS_REGION)try:    # 1. DataFrame导出为CSV并上传到S3    print("开始将DataFrame导出为CSV并上传到S3...")    file_name = f"data_{pd.Timestamp.now().strftime('%Y%m%d%H%M%S')}.csv"    s3_full_key = S3_KEY_PREFIX + file_name    csv_buffer = io.StringIO()    # 注意:header=False, index=False 是COPY命令的常见要求    df.to_csv(csv_buffer, index=False, header=False, sep=',', encoding='utf-8')     s3_client.put_object

以上就是Redshift大数据量DataFrame高速插入策略的详细内容,更多请关注创想鸟其它相关文章!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/1382660.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2025年12月15日 00:25:45
下一篇 2025年12月11日 22:16:01

相关推荐

  • SQLAlchemy 2.0与Pydantic:实现类型安全的模型转换

    本文旨在解决sqlalchemy orm模型与pydantic数据模型在类型转换过程中常见的类型不匹配问题,特别是在使用mypy等类型检查工具时。我们将深入探讨如何利用sqlalchemy 2.0的声明式映射(declarative mapping)和`mapped`类型提示,结合pydantic的…

    2025年12月15日
    000
  • 实现Python可重用迭代器:构建自定义range类

    在python中,生成器函数创建的是一次性迭代器,一旦遍历完成便耗尽。与此不同,内置的`range`对象是一个可重用的可迭代对象,每次请求迭代时都能提供新的序列。本文将深入探讨python中迭代器和可迭代对象的机制,并通过构建一个自定义类来模拟内置`range`的行为,使其具备可重用性,从而解决生成…

    2025年12月15日
    000
  • 在GitLab CI/CD中运行Pyglet渲染测试的终极指南

    在无头ci/cd环境中运行需要图形渲染的pyglet测试常会遇到`nosuchconfigexception`错误。本文将详细指导您如何通过配置gitlab ci/cd管道,利用xvfb(x虚拟帧缓冲器)创建一个虚拟显示环境,从而成功执行pyglet渲染测试。我们将提供一个完整的`gitlab-ci…

    2025年12月15日
    000
  • 使用Python和正则表达式统计特定标记词后的单词数量

    本文详细介绍了如何利用python和正则表达式精确统计字符串中特定下划线标记词后的单词数量。教程提供了两种正则表达式模式及相应的python实现,分别用于在统计中包含或排除标记词本身。通过具体代码示例和解析,帮助读者掌握根据不同需求进行单词计数的技巧,确保结果的准确性和灵活性。 在文本处理中,我们经…

    2025年12月15日
    000
  • Python pynput 键盘监听器与外部循环控制:实现精确程序终止

    本文详细探讨了如何在使用 `pynput.keyboard.Listener` 监听键盘事件时,通过特定按键(如 `Esc`)精确控制外部程序循环的终止。文章分析了直接返回 `False` 无法停止外部循环的原因,并提供了一种基于共享布尔标志的解决方案,通过在回调函数中修改该标志,并由主循环检查其状…

    2025年12月15日
    000
  • Python FileNotFoundError 深度解析与文件路径处理教程

    本文深入探讨了python中常见的`filenotfounderror`(错误码2),详细解析了其发生原因,主要归结为文件路径不正确或对当前工作目录的误解。教程提供了识别、诊断和解决此类错误的实用方法,包括理解相对路径与绝对路径、使用`os`模块进行路径管理和调试,并通过具体代码示例指导读者正确处理…

    2025年12月15日
    000
  • Python面向对象设计:利用组合模式构建灵活的多层级数据结构

    本文探讨了在python中如何通过面向对象设计处理具有可变子属性的复杂数据结构。针对一个站点可能拥有多个校区(或无校区)的场景,我们提出并演示了使用独立类(如`campus`)与主类(如`site`)进行组合(composition)的模式,从而实现高度模块化、灵活且易于扩展的代码结构,避免了冗余和…

    2025年12月15日
    000
  • 使用 pddl Python 框架实现旅行商问题:解决动作效果定义中的递归错误

    本教程探讨了在使用 `pddl` python 框架为旅行商问题(tsp)建模时,定义 pddl 动作效果时可能遇到的 `recursionerror`。核心问题在于错误地使用字符串拼接来构建动作效果。文章将详细解释为何应使用 `pddl` 库提供的逻辑运算符来正确构建 pddl 表达式,并提供正确…

    2025年12月15日
    000
  • 从包含字典列表的DataFrame列创建新DataFrame

    本文详细介绍了如何将pandas dataframe中包含字典列表的复杂列展开为多个独立的列。通过两种主要方法,包括使用`.str[0]`结合`.apply(pd.series)`进行直接转换,以及通过模板字典和`.where()`方法更精细地处理空列表和缺失值,帮助读者高效地从嵌套数据结构中提取并…

    2025年12月15日
    000
  • python namedtuple中加入新字段

    无法直接修改namedtuple添加字段,但可通过重新定义新类型并继承原数据实现扩展,例如使用_fields结合*args创建新实例,或通过_asdict()转为字典后更新字段,也可封装函数复用逻辑;Python 3.6+推荐用typing.NamedTuple显式定义新类,支持默认值与类型注解,但…

    2025年12月15日
    000
  • 自动化CSV列传输:适配电商平台的产品数据集成指南

    本教程旨在指导用户如何将来自联盟网络的CSV产品数据适配到如ClipMyDeals等电商主题所需的特定CSV格式。文章将详细介绍通过手动操作和Python脚本自动化两种方法,高效地从源文件中提取、重命名并整合必要的列,同时强调查阅主题官方文档的重要性,以确保数据格式的准确性和导入的成功率。 1. 理…

    2025年12月15日
    000
  • python嵌套列表如何拷贝

    必须使用深拷贝避免引用共享,因赋值或切片仅创建浅拷贝,修改嵌套元素会影响原列表;使用copy.deepcopy()可递归复制所有层级,确保数据独立。 Python中嵌套列表的拷贝不能简单使用赋值操作,因为这只会复制引用,修改原列表或新列表会影响彼此。要真正拷贝嵌套列表,必须进行深拷贝。 使用 cop…

    2025年12月15日
    000
  • python中字典dict函数是如何使用的?

    Python中字典用于存储键值对,可通过花括号直接定义或dict()函数创建;dict()支持关键字参数、元组列表和复制字典三种方式;常见操作包括增删改查,如添加d[‘key’]=’value’、判断键是否存在等,使用灵活方便。 字典(dict)在Py…

    2025年12月15日
    000
  • python中如何删除dict元素?

    del 删除指定键,键不存在时抛出 KeyError;2. pop() 删除键并返回值,可设默认值避免错误;3. popitem() 删除并返回最后一个键值对;4. clear() 清空所有元素。 在 Python 中删除字典(dict)元素有几种常用方法,根据不同的使用场景可以选择合适的方式。 使…

    2025年12月15日
    000
  • python中exp函数如何实现指数计算?

    Python中exp函数用于计算e的x次方,主要通过math模块和numpy模块实现;math.exp()适用于单个数值,如math.exp(2)返回约7.389;而numpy.exp()可处理数组或列表,支持逐元素计算,适合批量数据处理;注意math.exp()仅接受实数,不支持列表或复数,传入非…

    2025年12月15日
    000
  • python引入模块的import语句

    import语句用于引入模块以提高代码复用性,基本语法为import模块名;可通过as设置别名如import numpy as np;使用from…import可导入特定内容如from datetime import datetime;避免使用from module import *以防…

    2025年12月15日
    000
  • Python NameError 的常见原因与解决方法

    NameError通常由未定义变量、拼写错误、作用域问题或未导入模块引起。1. 使用前需定义变量;2. 注意名称大小写和拼写;3. 局部变量不可在外部访问,可通过返回值传递;4. 调用函数前应导入相应模块,如from math import sqrt。 在使用 Python 编程时,NameErro…

    2025年12月15日
    000
  • 掌握Pandas中‘object’类型数据的数值分析与智能转换:以计算平均值为例

    本教程详细讲解了在Pandas中处理包含数值信息的’object’类型数据以进行描述性统计分析的方法。针对数据集中常见的数值与单位混合、小数分隔符不一致等问题,文章提供了一套智能转换策略,通过逐列遍历和条件解析,将非标准数值字符串转换为可计算的浮点数,最终实现对这些复杂&#8…

    2025年12月15日
    000
  • 使用Python和IMAPLIB在Gmail中创建HTML邮件草稿的教程

    本教程详细介绍了如何使用%ignore_a_1%的`imaplib`库在gmail中创建可正确渲染的html邮件草稿。核心在于通过设置邮件消息的`content-type`头部为`text/html;charset=utf-8`,确保html内容在gmail草稿中被解析而非显示为纯文本。文章将提供完…

    2025年12月15日
    000
  • 在SLURM中通过Python脚本调用srun的性能影响分析与实践

    本文探讨了在SLURM高性能计算环境中,通过Bash脚本提交一个Python脚本,该Python脚本进而使用`srun`启动大规模并行工作负载的性能考量。研究表明,Python脚本作为中间协调层在启动阶段引入的开销微乎其微,对后续大规模并行计算的运行时性能影响可忽略不计。 SLURM任务编排:Pyt…

    2025年12月15日
    000

发表回复

登录后才能评论
关注微信