
本文旨在深入探讨如何利用Python的PyADS库高效地从倍福PLC获取实时数据,特别是针对高频、大规模数据采集场景。我们将重点介绍如何通过面向对象的方法(类)来管理回调函数的内部状态和累积数据,从而避免使用全局变量,并提供性能优化策略,尤其是在处理同类型大量数据时,通过自定义字节解析结合NumPy实现数据转换的显著加速。
PyADS通知机制与状态管理挑战
PyADS库通过其通知(Notification)机制,允许开发者订阅PLC变量的变化,并在变量值更新时触发回调函数。这对于需要高速、实时获取数据的应用场景(例如,每200毫秒或更快)至关重要。然而,回调函数的特性决定了它无法直接返回数据,这给需要累积大量数据或管理内部状态的应用带来了挑战。传统的解决方案可能倾向于使用全局变量,但这通常会导致代码难以维护、可读性差,并可能引发并发问题。
对于需要采集大量数据(例如,每周期1000个值,累积15个信号共100,000个值)的应用,直接在回调函数中处理所有数据并累积,同时避免全局变量,是核心问题。
基于类的PyADS通知管理与数据累积
解决回调函数状态管理问题的“Pythonic”方法是采用面向对象编程,将PyADS连接、通知设置以及数据处理逻辑封装在一个类中。这样,回调函数就可以作为类的方法,自然地访问和修改类的实例变量,从而实现数据的累积和状态的维护,而无需依赖全局变量。
类结构示例
以下是一个将PyADS通知封装到类中的基本结构示例:
import pyadsimport ctypesimport numpy as npimport time# 假设的PLC连接参数和变量定义PLC_IP = '192.168.1.100'PLC_AMS_NET_ID = '192.168.1.100.1.1'LOCAL_AMS_NET_ID = '192.168.1.50.1.1' # 根据实际情况配置本地AMS Net ID# 示例结构体定义,用于从PLC读取多个DINT数组structure_def = ( ('nVar1', pyads.PLCTYPE_DINT, 1000), ('nVar2', pyads.PLCTYPE_DINT, 1000), ('nVar3', pyads.PLCTYPE_DINT, 1000), # ... 可以根据需要添加更多变量)SIZE_OF_STRUCTURE = pyads.size_of_structure(structure_def)class PlcDataManager: def __init__(self, plc_ip, plc_ams_net_id, local_ams_net_id): self.plc = pyads.Connection(plc_ip, plc_ams_net_id, local_ams_net_id) self.data_buffer = [] # 用于累积接收到的数据 self.notification_handles = {} # 存储通知句柄,便于管理 self.plc_state_run = False # PLC运行状态标记 try: self.plc.open() print(f"成功连接到PLC: {plc_ip}") self._setup_notifications() except pyads.ADSError as e: print(f"连接PLC失败: {e}") self.plc = None # 标记连接失败 def _setup_notifications(self): """设置所有ADS通知。""" if not self.plc: return # 1. 订阅数据变量通知 data_var_name = 'global.sample_structure' # 假设PLC中存在此结构体变量 attr_data = pyads.NotificationAttrib(SIZE_OF_STRUCTURE) try: handle_data = self.plc.add_device_notification( data_var_name, attr_data, self._on_data_received, pyads.ADSTransMode.OnChange, 200 # 每200ms检查一次变化 ) self.notification_handles[data_var_name] = handle_data print(f"已为 '{data_var_name}' 设置数据通知,句柄: {handle_data}") except pyads.ADSError as e: print(f"设置数据通知失败: {e}") # 2. 订阅PLC状态变化通知 (ADSIGRP_DEVICE_DATA, ADSIOFFS_DEVDATA_ADSSTATE) # 这对于监控PLC是否处于运行模式非常有用 plc_state_addr = (int("0xF100", 16), int("0x0000", 16)) # ADSIGRP_DEVICE_DATA, ADSIOFFS_DEVDATA_ADSSTATE attr_state = pyads.NotificationAttrib(ctypes.sizeof(pyads.PLCTYPE_INT)) try: handle_state = self.plc.add_device_notification( plc_state_addr, attr_state, self._on_plc_status_change, pyads.ADSTransMode.OnChange, 500 # 每500ms检查一次状态变化 ) self.notification_handles['plc_status'] = handle_state print(f"已为PLC状态设置通知,句柄: {handle_state}") except pyads.ADSError as e: print(f"设置PLC状态通知失败: {e}") def _on_data_received(self, handle, name, timestamp, value): """数据变量通知回调函数。""" # 使用pyads.dict_from_bytes进行初始转换,适用于混合类型或小批量数据 # values = pyads.dict_from_bytes(value, structure_def) # self.data_buffer.append(values) # 对于大规模同类型数据,推荐手动解析以优化性能 # 假设所有变量都是PLCTYPE_DINT,且长度相同 dint_size = ctypes.sizeof(pyads.PLCTYPE_DINT) num_elements_per_var = structure_def[0][2] # 假设所有变量长度相同 total_elements = len(structure_def) * num_elements_per_var # 将字节数据直接转换为NumPy数组 # 注意:这里假设字节序与系统匹配,通常ADS是小端序 # 如果需要,可以使用 np.frombuffer(value, dtype='<i4') 指定小端序32位整数 raw_data = np.frombuffer(value, dtype=np.int32) # 将一维NumPy数组重新塑形为字典,方便访问 current_data = {} for i, (var_name, _, var_len) in enumerate(structure_def): start_idx = i * var_len end_idx = start_idx + var_len current_data[var_name] = raw_data[start_idx:end_idx] self.data_buffer.append(current_data) # print(f"接收到数据,当前缓冲区大小: {len(self.data_buffer)}") def _on_plc_status_change(self, notification, _): """PLC状态变化通知回调函数。""" # pyads.parse_notification用于解析原始通知字节数据 *_, value = self.plc.parse_notification(notification, pyads.PLCTYPE_INT) if value == pyads.ADSSTATE_RUN: self.plc_state_run = True print("PLC进入运行模式 (ADSSTATE_RUN)") else: self.plc_state_run = False print(f"PLC退出运行模式,当前状态: {pyads.ADSSTATE_NAMES.get(value, '未知状态')}") # 可以触发错误处理或重连逻辑 def get_accumulated_data(self): """获取并清空累积的数据缓冲区。""" data = self.data_buffer[:] self.data_buffer.clear() return data def close(self): """关闭PLC连接并移除所有通知。""" if self.plc: for name, handle in self.notification_handles.items(): try: self.plc.del_device_notification(handle) print(f"已移除通知: {name} (句柄: {handle})") except pyads.ADSError as e: print(f"移除通知 '{name}' 失败: {e}") self.plc.close() print("PLC连接已关闭。")# 示例使用if __name__ == "__main__": manager = PlcDataManager(PLC_IP, PLC_AMS_NET_ID, LOCAL_AMS_NET_ID) if manager.plc: # 确保PLC连接成功 print("等待数据...") try: # 模拟主程序循环,等待数据并进行处理 for i in range(20): # 运行约4秒 (20 * 200ms) time.sleep(0.2) # 模拟主程序等待 if manager.data_buffer: collected_data = manager.get_accumulated_data() print(f"循环 {i+1}: 收集到 {len(collected_data)} 批数据。") # 这里可以对 collected_data 进行进一步处理,例如存储到文件或数据库 # print(f"最新一批数据示例: {collected_data[-1]['nVar1'][0:5]}") # 打印第一批数据的nVar1前5个元素 else: print(f"循环 {i+1}: 暂无新数据。") except KeyboardInterrupt: print("n程序被用户中断。") finally: manager.close() else: print("无法运行,PLC连接失败。")
在上述示例中:
PlcDataManager 类封装了与PLC的连接、通知的设置以及数据缓冲区 self.data_buffer。_on_data_received 方法作为回调函数,它是一个实例方法,可以直接访问 self.data_buffer 并将接收到的数据追加进去。_on_plc_status_change 方法展示了如何使用 pyads.parse_notification 来解析PLC状态变化的通知,这对于监控PLC运行状态非常有用。get_accumulated_data 方法允许主程序在需要时获取并清空累积的数据,实现数据消费与生产的分离。
高性能数据解析与优化
对于大规模、同类型数据的采集,pyads.dict_from_bytes 或默认的变量读取方式可能会因为Python层面的频繁类型转换而成为性能瓶颈。当PLC发送的数据是连续的字节流,且其中包含大量相同类型的元素(如1000个DINT),更高效的方法是:
直接获取原始字节数据: 在 add_device_notification 中,回调函数接收到的 value 参数即为原始字节数据。利用NumPy进行批量转换: Python的NumPy库在处理数值数组方面具有极高的效率。可以将原始字节数据直接转换为NumPy数组,然后根据结构体定义进行切片和重塑。
# 在 _on_data_received 方法中进行优化# ... def _on_data_received(self, handle, name, timestamp, value): """数据变量通知回调函数,优化大规模数据解析。""" # 假设所有变量都是PLCTYPE_DINT,且长度相同 # 例如:structure_def = (('nVar1', pyads.PLCTYPE_DINT, 1000), ...) # 将字节数据直接转换为NumPy数组 # dtype='<i4' 表示小端序32位带符号整数 (DINT) # 根据实际PLC的字节序和数据类型调整 dtype raw_data_np = np.frombuffer(value, dtype='<i4') current_data = {} offset = 0 for var_name, var_type, var_len in structure_def: # 获取每个变量的字节大小 type_size = ctypes.sizeof(var_type) # 计算当前变量在NumPy数组中的元素范围 start_idx = offset end_idx = offset + var_len current_data[var_name] = raw_data_np[start_idx:end_idx] offset += var_len # 更新偏移量 self.data_buffer.append(current_data) # print(f"接收到数据,当前缓冲区大小: {len(self.data_buffer)}")# ...
这种方法通过NumPy的底层C实现进行批量数据转换,可以带来数量级的性能提升,尤其适用于数据记录和大数据量处理场景。
注意事项与总结
装饰器限制: pyads.notification 装饰器不能直接用于类方法。必须通过 plc.add_device_notification 方法显式地将类方法注册为回调函数。字节序与数据类型: 在手动解析字节数据时,务必注意PLC的数据字节序(通常为小端序)以及正确的数据类型映射(例如,PyADS的 PLCTYPE_DINT 对应NumPy的 np.int32 或 np.intc,且需考虑字节序)。错误处理: 监控PLC的状态变化(如 _on_plc_status_change 所示)对于构建健壮的通信应用至关重要,可以在PLC退出运行模式时触发告警或重连机制。资源管理: 确保在程序结束时正确关闭PLC连接并移除所有通知,释放资源。
通过采用类封装和高性能的数据解析策略,开发者可以构建出更加健壮、高效且易于维护的PyADS应用程序,从而有效地处理来自PLC的高速、大规模数据流。
以上就是高效管理PyADS通知与大规模数据采集的详细内容,更多请关注创想鸟其它相关文章!
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/1366956.html
微信扫一扫
支付宝扫一扫