高效管理PyADS通知与大规模数据采集

高效管理pyads通知与大规模数据采集

本文旨在深入探讨如何利用Python的PyADS库高效地从倍福PLC获取实时数据,特别是针对高频、大规模数据采集场景。我们将重点介绍如何通过面向对象的方法(类)来管理回调函数的内部状态和累积数据,从而避免使用全局变量,并提供性能优化策略,尤其是在处理同类型大量数据时,通过自定义字节解析结合NumPy实现数据转换的显著加速。

PyADS通知机制与状态管理挑战

PyADS库通过其通知(Notification)机制,允许开发者订阅PLC变量的变化,并在变量值更新时触发回调函数。这对于需要高速、实时获取数据的应用场景(例如,每200毫秒或更快)至关重要。然而,回调函数的特性决定了它无法直接返回数据,这给需要累积大量数据或管理内部状态的应用带来了挑战。传统的解决方案可能倾向于使用全局变量,但这通常会导致代码难以维护、可读性差,并可能引发并发问题。

对于需要采集大量数据(例如,每周期1000个值,累积15个信号共100,000个值)的应用,直接在回调函数中处理所有数据并累积,同时避免全局变量,是核心问题。

基于类的PyADS通知管理与数据累积

解决回调函数状态管理问题的“Pythonic”方法是采用面向对象编程,将PyADS连接、通知设置以及数据处理逻辑封装在一个类中。这样,回调函数就可以作为类的方法,自然地访问和修改类的实例变量,从而实现数据的累积和状态的维护,而无需依赖全局变量。

类结构示例

以下是一个将PyADS通知封装到类中的基本结构示例:

import pyadsimport ctypesimport numpy as npimport time# 假设的PLC连接参数和变量定义PLC_IP = '192.168.1.100'PLC_AMS_NET_ID = '192.168.1.100.1.1'LOCAL_AMS_NET_ID = '192.168.1.50.1.1' # 根据实际情况配置本地AMS Net ID# 示例结构体定义,用于从PLC读取多个DINT数组structure_def = (    ('nVar1', pyads.PLCTYPE_DINT, 1000),    ('nVar2', pyads.PLCTYPE_DINT, 1000),    ('nVar3', pyads.PLCTYPE_DINT, 1000),    # ... 可以根据需要添加更多变量)SIZE_OF_STRUCTURE = pyads.size_of_structure(structure_def)class PlcDataManager:    def __init__(self, plc_ip, plc_ams_net_id, local_ams_net_id):        self.plc = pyads.Connection(plc_ip, plc_ams_net_id, local_ams_net_id)        self.data_buffer = [] # 用于累积接收到的数据        self.notification_handles = {} # 存储通知句柄,便于管理        self.plc_state_run = False # PLC运行状态标记        try:            self.plc.open()            print(f"成功连接到PLC: {plc_ip}")            self._setup_notifications()        except pyads.ADSError as e:            print(f"连接PLC失败: {e}")            self.plc = None # 标记连接失败    def _setup_notifications(self):        """设置所有ADS通知。"""        if not self.plc:            return        # 1. 订阅数据变量通知        data_var_name = 'global.sample_structure' # 假设PLC中存在此结构体变量        attr_data = pyads.NotificationAttrib(SIZE_OF_STRUCTURE)        try:            handle_data = self.plc.add_device_notification(                data_var_name, attr_data, self._on_data_received,                pyads.ADSTransMode.OnChange, 200 # 每200ms检查一次变化            )            self.notification_handles[data_var_name] = handle_data            print(f"已为 '{data_var_name}' 设置数据通知,句柄: {handle_data}")        except pyads.ADSError as e:            print(f"设置数据通知失败: {e}")        # 2. 订阅PLC状态变化通知 (ADSIGRP_DEVICE_DATA, ADSIOFFS_DEVDATA_ADSSTATE)        # 这对于监控PLC是否处于运行模式非常有用        plc_state_addr = (int("0xF100", 16), int("0x0000", 16)) # ADSIGRP_DEVICE_DATA, ADSIOFFS_DEVDATA_ADSSTATE        attr_state = pyads.NotificationAttrib(ctypes.sizeof(pyads.PLCTYPE_INT))        try:            handle_state = self.plc.add_device_notification(                plc_state_addr, attr_state, self._on_plc_status_change,                pyads.ADSTransMode.OnChange, 500 # 每500ms检查一次状态变化            )            self.notification_handles['plc_status'] = handle_state            print(f"已为PLC状态设置通知,句柄: {handle_state}")        except pyads.ADSError as e:            print(f"设置PLC状态通知失败: {e}")    def _on_data_received(self, handle, name, timestamp, value):        """数据变量通知回调函数。"""        # 使用pyads.dict_from_bytes进行初始转换,适用于混合类型或小批量数据        # values = pyads.dict_from_bytes(value, structure_def)        # self.data_buffer.append(values)        # 对于大规模同类型数据,推荐手动解析以优化性能        # 假设所有变量都是PLCTYPE_DINT,且长度相同        dint_size = ctypes.sizeof(pyads.PLCTYPE_DINT)        num_elements_per_var = structure_def[0][2] # 假设所有变量长度相同        total_elements = len(structure_def) * num_elements_per_var        # 将字节数据直接转换为NumPy数组        # 注意:这里假设字节序与系统匹配,通常ADS是小端序        # 如果需要,可以使用 np.frombuffer(value, dtype='<i4') 指定小端序32位整数        raw_data = np.frombuffer(value, dtype=np.int32)         # 将一维NumPy数组重新塑形为字典,方便访问        current_data = {}        for i, (var_name, _, var_len) in enumerate(structure_def):            start_idx = i * var_len            end_idx = start_idx + var_len            current_data[var_name] = raw_data[start_idx:end_idx]        self.data_buffer.append(current_data)        # print(f"接收到数据,当前缓冲区大小: {len(self.data_buffer)}")    def _on_plc_status_change(self, notification, _):        """PLC状态变化通知回调函数。"""        # pyads.parse_notification用于解析原始通知字节数据        *_, value = self.plc.parse_notification(notification, pyads.PLCTYPE_INT)        if value == pyads.ADSSTATE_RUN:            self.plc_state_run = True            print("PLC进入运行模式 (ADSSTATE_RUN)")        else:            self.plc_state_run = False            print(f"PLC退出运行模式,当前状态: {pyads.ADSSTATE_NAMES.get(value, '未知状态')}")            # 可以触发错误处理或重连逻辑    def get_accumulated_data(self):        """获取并清空累积的数据缓冲区。"""        data = self.data_buffer[:]        self.data_buffer.clear()        return data    def close(self):        """关闭PLC连接并移除所有通知。"""        if self.plc:            for name, handle in self.notification_handles.items():                try:                    self.plc.del_device_notification(handle)                    print(f"已移除通知: {name} (句柄: {handle})")                except pyads.ADSError as e:                    print(f"移除通知 '{name}' 失败: {e}")            self.plc.close()            print("PLC连接已关闭。")# 示例使用if __name__ == "__main__":    manager = PlcDataManager(PLC_IP, PLC_AMS_NET_ID, LOCAL_AMS_NET_ID)    if manager.plc: # 确保PLC连接成功        print("等待数据...")        try:            # 模拟主程序循环,等待数据并进行处理            for i in range(20): # 运行约4秒 (20 * 200ms)                time.sleep(0.2) # 模拟主程序等待                if manager.data_buffer:                    collected_data = manager.get_accumulated_data()                    print(f"循环 {i+1}: 收集到 {len(collected_data)} 批数据。")                    # 这里可以对 collected_data 进行进一步处理,例如存储到文件或数据库                    # print(f"最新一批数据示例: {collected_data[-1]['nVar1'][0:5]}") # 打印第一批数据的nVar1前5个元素                else:                    print(f"循环 {i+1}: 暂无新数据。")        except KeyboardInterrupt:            print("n程序被用户中断。")        finally:            manager.close()    else:        print("无法运行,PLC连接失败。")

在上述示例中:

PlcDataManager 类封装了与PLC的连接、通知的设置以及数据缓冲区 self.data_buffer。_on_data_received 方法作为回调函数,它是一个实例方法,可以直接访问 self.data_buffer 并将接收到的数据追加进去。_on_plc_status_change 方法展示了如何使用 pyads.parse_notification 来解析PLC状态变化的通知,这对于监控PLC运行状态非常有用。get_accumulated_data 方法允许主程序在需要时获取并清空累积的数据,实现数据消费与生产的分离。

高性能数据解析与优化

对于大规模、同类型数据的采集,pyads.dict_from_bytes 或默认的变量读取方式可能会因为Python层面的频繁类型转换而成为性能瓶颈。当PLC发送的数据是连续的字节流,且其中包含大量相同类型的元素(如1000个DINT),更高效的方法是:

直接获取原始字节数据: 在 add_device_notification 中,回调函数接收到的 value 参数即为原始字节数据。利用NumPy进行批量转换: Python的NumPy库在处理数值数组方面具有极高的效率。可以将原始字节数据直接转换为NumPy数组,然后根据结构体定义进行切片和重塑。

# 在 _on_data_received 方法中进行优化# ...    def _on_data_received(self, handle, name, timestamp, value):        """数据变量通知回调函数,优化大规模数据解析。"""        # 假设所有变量都是PLCTYPE_DINT,且长度相同        # 例如:structure_def = (('nVar1', pyads.PLCTYPE_DINT, 1000), ...)        # 将字节数据直接转换为NumPy数组        # dtype='<i4' 表示小端序32位带符号整数 (DINT)        # 根据实际PLC的字节序和数据类型调整 dtype        raw_data_np = np.frombuffer(value, dtype='<i4')         current_data = {}        offset = 0        for var_name, var_type, var_len in structure_def:            # 获取每个变量的字节大小            type_size = ctypes.sizeof(var_type)            # 计算当前变量在NumPy数组中的元素范围            start_idx = offset            end_idx = offset + var_len            current_data[var_name] = raw_data_np[start_idx:end_idx]            offset += var_len # 更新偏移量        self.data_buffer.append(current_data)        # print(f"接收到数据,当前缓冲区大小: {len(self.data_buffer)}")# ...

这种方法通过NumPy的底层C实现进行批量数据转换,可以带来数量级的性能提升,尤其适用于数据记录和大数据量处理场景。

注意事项与总结

装饰器限制: pyads.notification 装饰器不能直接用于类方法。必须通过 plc.add_device_notification 方法显式地将类方法注册为回调函数。字节序与数据类型: 在手动解析字节数据时,务必注意PLC的数据字节序(通常为小端序)以及正确的数据类型映射(例如,PyADS的 PLCTYPE_DINT 对应NumPy的 np.int32 或 np.intc,且需考虑字节序)。错误处理: 监控PLC的状态变化(如 _on_plc_status_change 所示)对于构建健壮的通信应用至关重要,可以在PLC退出运行模式时触发告警或重连机制。资源管理: 确保在程序结束时正确关闭PLC连接并移除所有通知,释放资源。

通过采用类封装和高性能的数据解析策略,开发者可以构建出更加健壮、高效且易于维护的PyADS应用程序,从而有效地处理来自PLC的高速、大规模数据流。

以上就是高效管理PyADS通知与大规模数据采集的详细内容,更多请关注创想鸟其它相关文章!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/1366956.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
PyADS通知回调函数中高效处理PLC数据:基于类的解决方案
上一篇 2025年12月14日 07:00:18
Keras模型训练与评估精度不一致问题解析与解决方案
下一篇 2025年12月14日 07:00:32

相关推荐

  • Matplotlib 地图中多类型图例的创建与优化

    Matplotlib 地图中多类型图例的创建与优化Matplotlib 地图中多类型图例的创建与优化Matplotlib 地图中多类型图例的创建与优化Matplotlib 地图中多类型图例的创建与优化

    本教程旨在解决matplotlib地图可视化中,如何在一个图例中同时展示颜色块(如区域分类)和自定义标记(如特定兴趣点)的问题。文章详细介绍了当传统`patch`对象无法正确显示标记时,如何利用`matplotlib.lines.line2d`创建标记图例句柄,并将其与颜色块图例句柄合并,从而生成一…

    2026年5月10日 用户投稿
    100
  • Golang JSON序列化:控制敏感字段暴露的最佳实践

    本教程探讨golang中如何高效控制结构体字段在json序列化时的可见性。当需要将包含敏感信息的结构体数组转换为json响应时,通过利用`encoding/json`包提供的结构体标签,特别是`json:”-“`,可以轻松实现对特定字段的忽略,从而避免敏感数据泄露,确保api…

    2026年5月10日
    000
  • 利用海象运算符简化条件赋值:Python教程与最佳实践

    本文旨在探讨Python中海象运算符(:=)在条件赋值场景下的应用。通过对比传统if/else语句与海象运算符,以及条件表达式,分析海象运算符在简化代码、提高可读性方面的优势与局限性。并通过具体示例,展示如何在列表推导式等场景下合理使用海象运算符,同时强调其潜在的复杂性及替代方案,帮助开发者更好地掌…

    2026年5月10日
    000
  • 比特币新手教程 比特币交易平台有哪些

    比特币是一种去中心化的数字货币,基于区块链技术实现点对点交易,具有匿名性、有限发行和不可篡改等特点;新手可通过交易所购买,P2P交易获得比特币,常用平台包括Binance、OKX和Huobi;交易流程包括注册账户、实名认证、绑定支付方式、充值法币并下单购买,可选择市价单或限价单;比特币存储方式有交易…

    2026年5月10日
    000
  • c++中的SFINAE技术是什么_c++模板编程中的SFINAE原理与应用

    SFINAE 是“替换失败不是错误”的原则,指模板实例化时若参数替换导致错误,只要存在其他合法候选,编译器不报错而是继续重载决议。它用于条件启用模板、类型检测等场景,如通过 decltype 或 enable_if 控制函数重载,实现类型特征判断。尽管 C++20 引入 Concepts 简化了部分…

    2026年5月10日
    000
  • Go语言mgo查询构建:深入理解bson.M与日期范围查询的正确实践

    本文旨在解决go语言mgo库中构建复杂查询时,特别是涉及嵌套`bson.m`和日期范围筛选的常见错误。我们将深入剖析`bson.m`的类型特性,解释为何直接索引`interface{}`会导致“invalid operation”错误,并提供一种推荐的、结构清晰的代码重构方案,以确保查询条件能够正确…

    2026年5月10日
    100
  • RichHandler与Rich Progress集成:解决显示冲突的教程

    在使用rich库的`richhandler`进行日志输出并同时使用`progress`组件时,可能会遇到显示错乱或溢出问题。这通常是由于为`richhandler`和`progress`分别创建了独立的`console`实例导致的。解决方案是确保日志处理器和进度条组件共享同一个`console`实例…

    2026年5月10日
    000
  • Golang goroutine与channel调试技巧

    使用go run -race检测数据竞争,结合runtime.NumGoroutine监控协程数量,通过pprof分析阻塞调用栈,利用select超时避免永久阻塞,有效排查goroutine泄漏、死锁和数据竞争问题。 Go语言的goroutine和channel是并发编程的核心,但它们也带来了调试上…

    2026年5月10日
    000
  • 《魔兽世界》将于6月11日开启国服回归技术测试

    《魔兽世界》将于6月11日开启国服回归技术测试《魔兽世界》将于6月11日开启国服回归技术测试《魔兽世界》将于6月11日开启国服回归技术测试《魔兽世界》将于6月11日开启国服回归技术测试

    《%ign%ignore_a_1%re_a_1%》官方宣布,将于6月11日开启国服回归技术测试,时间为7天,并称可以在6月内正式开服,玩家们可以访问官网下载战网客户端并预下载“巫妖王之怒”客户端,技术测试详情见下图。 WordAi WordAI是一个AI驱动的内容重写平台 53 查看详情 以上就是《…

    2026年5月10日 用户投稿
    200
  • 使用 Jupyter Notebook 进行探索性数据分析

    Jupyter Notebook通过单元格实现代码与Markdown结合,支持数据导入(pandas)、清洗(fillna)、探索(matplotlib/seaborn可视化)、统计分析(describe/corr)和特征工程,便于记录与分享分析过程。 Jupyter Notebook 是进行探索性…

    2026年5月10日
    000
  • 如何在HTML中插入表单元素_HTML表单控件与输入类型使用指南

    HTML表单通过标签构建,包含action和method属性定义数据提交目标与方式,常用input类型如text、password、email等适配不同输入需求,配合label、required、placeholder提升可用性,结合textarea、select、button等控件实现完整交互,是…

    2026年5月10日
    000
  • 创建指定大小并填充特定数据的Golang文件教程

    本文将介绍如何使用Golang创建一个指定大小的文件,并用特定数据填充它。我们将使用 `os` 包提供的函数来创建和截断文件,从而实现快速生成大文件的目的。示例代码展示了如何创建一个10MB的文件,并将其填充为全零数据。掌握这些方法,可以方便地在例如日志系统或磁盘队列等场景中,预先创建测试文件或初始…

    2026年5月10日
    000
  • Python命令怎样使用profile分析脚本性能 Python命令性能分析的基础教程

    使用Python的cProfile模块分析脚本性能最直接的方式是通过命令行执行python -m cProfile your_script.py,它会输出每个函数的调用次数、总耗时、累积耗时等关键指标,帮助定位性能瓶颈;为进一步分析,可将结果保存为文件python -m cProfile -o ou…

    2026年5月10日
    000
  • 如何插入查询结果数据_SQL插入Select查询结果方法

    如何插入查询结果数据_SQL插入Select查询结果方法如何插入查询结果数据_SQL插入Select查询结果方法如何插入查询结果数据_SQL插入Select查询结果方法如何插入查询结果数据_SQL插入Select查询结果方法

    使用INSERT INTO…SELECT语句可高效插入数据,通过NOT EXISTS、LEFT JOIN、MERGE语句或唯一约束避免重复;表结构不一致时可通过别名、类型转换、默认值或计算字段处理;结合存储过程可提升可维护性,支持参数化与动态SQL。 将查询结果数据插入到另一个表中,可以…

    2026年5月10日 用户投稿
    000
  • 使用 WebCodecs VideoDecoder 实现精确逐帧回退

    本文档旨在解决在使用 WebCodecs VideoDecoder 进行视频解码时,实现精确逐帧回退的问题。通过比较帧的时间戳与目标帧的时间戳,可以避免渲染中间帧,从而提高用户体验。本文将提供详细的解决方案和示例代码,帮助开发者实现精确的视频帧控制。 在使用 WebCodecs VideoDecod…

    2026年5月10日
    000
  • Debian Copilot的社区活跃度如何

    debian copilot是codeberg社区维护的ai助手,旨在为debian用户提供服务。尽管搜索结果中没有直接提供关于debian copilot社区支持活跃度的具体数据,但我们可以通过debian社区的整体活跃度和特点来推断其活跃性。 Debian社区的一般情况: Debian拥有详尽的…

    2026年5月10日
    000
  • Discord.py 交互按钮超时与持久化解决方案

    本教程旨在解决Discord.py中交互按钮在一段时间后出现“This Interaction Failed”错误的问题。我们将深入探讨视图(View)的超时机制,并提供通过正确设置timeout参数以及利用bot.add_view()方法实现按钮持久化的具体方案,确保您的机器人交互功能稳定可靠,即…

    2026年5月10日
    000
  • Python递归函数追踪与性能考量:以序列打印为例

    本文深入探讨了Python中一种递归打印序列元素的方法,并着重演示了如何通过引入缩进参数来有效追踪递归函数的执行流程和参数变化。通过实际代码示例,文章揭示了递归调用可能带来的潜在性能开销,特别是对调用栈空间的需求,以及Python默认递归深度限制可能导致的错误,为读者提供了理解和优化递归算法的实用见…

    2026年5月10日
    000
  • python中zip函数详解 python多序列压缩zip函数应用场景

    zip函数的应用场景包括:1) 同时遍历多个序列,2) 合并多个列表的数据,3) 数据分析和科学计算中的元素运算,4) 处理csv文件,5) 性能优化。zip函数是一个强大的工具,能够简化代码并提高处理多个序列时的效率。 在Python中,zip函数是一个非常有用的工具,它能够将多个可迭代对象打包成…

    2026年5月10日
    000
  • JavaScript 动态菜单点击高亮效果实现教程

    本教程详细介绍了如何使用 JavaScript 实现动态菜单的点击高亮功能。通过事件委托和状态管理,当用户点击菜单项时,被点击项会高亮显示(绿色),同时其他菜单项恢复默认样式(白色)。这种方法避免了不必要的DOM操作,提高了性能和代码可维护性,确保了无论点击方向如何,功能都能稳定运行。 动态菜单高亮…

    2026年5月10日
    200

发表回复

登录后才能评论
关注微信