
本文旨在解决在使用 WebSocket 实现视频帧预测结果广播时遇到的客户端无法接收数据或接收延迟的问题。通过分析问题代码,并对比 websockets.broadcast() 和 asyncio.wait() 的行为,提供了一种基于 asyncio.wait() 的解决方案,并解释了两种方法之间的差异,帮助开发者更有效地构建实时数据推送服务。
问题分析
原始代码中使用 websockets.broadcast(clients, result) 在 while True 循环中广播预测结果。然而,多个客户端连接时,发现只有第一个客户端能正常接收数据,后续客户端则无法接收,或者需要在服务端程序停止后才能接收到数据。这表明 websockets.broadcast() 可能存在阻塞问题,导致服务端无法及时处理新的客户端连接和数据发送。
解决方案:使用 asyncio.wait()
将 websockets.broadcast(clients, result) 替换为 await asyncio.wait([ws.send(result) for ws in clients]) 解决了该问题。
修改后的服务端代码:
import websocketsimport cv2import asyncioimport timedef predict(image): # 替换为你的预测模型 return "test"async def echo(websocket, path): global vidCap, i try: while True: ret, image = vidCap.read() if ret: start = time.time() result = predict(image) # 使用 asyncio.wait 进行广播 await asyncio.wait([ws.send(result) for ws in clients]) end = time.time() print("exec time:%f s" % (end - start)) else: # 视频读取结束或发生错误,退出循环 break await asyncio.sleep(0) # 释放事件循环控制权,避免CPU占用过高 except websockets.exceptions.ConnectionClosedError: print("Client disconnected unexpectedly.") except Exception as e: print(f"An error occurred: {e}") finally: clients.remove(websocket)async def handler(websocket, path): clients.add(websocket) try: await echo(websocket, path) finally: if websocket in clients: clients.remove(websocket)async def serve(): start_server = await websockets.serve(handler, "localhost", 8765) await start_server.wait_closed()if __name__ == '__main__': vidCap = cv2.VideoCapture('rtsp://xxx.xxx.xx') #rtsp or video clients = set() asyncio.run(serve())
客户端代码(保持不变):
import websocketsimport asyncioimport timeasync def get_result(uri): async with websockets.connect(uri) as websocket: while(True): try: start = time.time() recv_text = await websocket.recv() print(recv_text) end = time.time() print("exec:%f s" % (end - start)) except websockets.exceptions.ConnectionClosedError: print("Server disconnected.") break # 退出循环 except Exception as e: print(f"An error occurred: {e}") breakif __name__ == '__main__': asyncio.run(get_result("ws://127.0.0.1:8765/ws"))
关键修改说明:
await asyncio.wait([ws.send(result) for ws in clients]): 这行代码使用 asyncio.wait() 并发地向所有客户端发送消息。asyncio.wait() 接受一个 awaitable 对象的可迭代对象,并等待所有 awaitable 对象完成。 [ws.send(result) for ws in clients] 创建一个包含所有客户端发送消息任务的列表。
await asyncio.sleep(0): 在 while True 循环中添加 await asyncio.sleep(0) 可以将控制权交还给事件循环,避免 CPU 占用率过高。
异常处理: 在 echo 和 get_result 函数中添加了 try…except 块来处理 websockets.exceptions.ConnectionClosedError 和其他潜在的异常,使得程序更加健壮。
客户端断开处理: 在 handler 函数的 finally 块中,确保在客户端断开连接时,将其从 clients 集合中移除。
websockets.broadcast() vs asyncio.wait()
websockets.broadcast(): websockets.broadcast() 是一个方便的函数,用于将消息广播到所有连接的客户端。然而,它可能以阻塞的方式工作,这意味着它会逐个发送消息,并且在完成所有发送之前不会释放控制权。在快速循环中,这会导致服务端无法及时处理新的连接或响应其他事件。
asyncio.wait(): asyncio.wait() 允许并发地执行多个 awaitable 对象。通过将每个客户端的 ws.send(result) 操作作为一个独立的 awaitable 对象,asyncio.wait() 能够并行地发送消息,从而避免阻塞主事件循环。
简而言之,asyncio.wait() 提供了更细粒度的控制,允许异步地执行发送操作,从而提高了服务端的并发性能和响应能力。
注意事项
视频源: 确保视频源 ‘rtsp://xxx.xxx.xx’ 可用,并根据实际情况进行替换。预测模型: predict(image) 函数需要替换为实际的预测模型。性能优化: 对于高并发场景,需要进一步优化预测模型的性能,并考虑使用更高效的数据编码方式(例如,protobuf)来减少网络传输开销。错误处理: 完善错误处理机制,例如,在客户端断开连接时,服务端应该能够正确地处理异常,并清理资源。
总结
通过使用 asyncio.wait(),可以有效地解决 WebSocket 广播中的阻塞问题,实现高并发、低延迟的实时数据推送服务。理解 websockets.broadcast() 和 asyncio.wait() 的差异,有助于选择合适的广播策略,并优化 WebSocket 应用的性能。同时,良好的错误处理和资源管理也是构建健壮的 WebSocket 应用的关键。
以上就是使用 asyncio.wait 实现 WebSocket 广播:解决阻塞问题的详细内容,更多请关注创想鸟其它相关文章!
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/1368164.html
微信扫一扫
支付宝扫一扫