
本教程将指导您如何在laravel websockets中定制连接的生命周期事件,包括连接的打开与关闭。通过扩展默认的websocket处理器,我们将演示如何捕获并关联客户端的业务上下文(如用户id、订单id),从而在连接断开时执行特定的业务逻辑,例如自动解锁正在处理的订单,实现对应用资源的精确状态管理。
引言:定制WebSocket连接的必要性
在实时应用开发中,WebSocket连接不仅仅是数据传输的通道,它更代表了客户端与服务器之间的一种持续性会话。很多业务场景需要我们能够感知并响应这些会话的生命周期事件,例如:
资源锁定与解锁:当用户打开一个订单页面进行编辑时,锁定该订单以防止其他用户同时修改;当用户关闭页面或断开连接时,自动解锁订单。用户在线状态:实时显示用户的在线或离线状态。协作编辑:跟踪文档编辑者的连接状态,实现实时协作。
Laravel WebSockets 包(Beyondcode 的 Pusher 替代方案)提供了强大的功能,但其默认处理器可能无法满足所有复杂的业务需求。为了实现上述场景,我们需要扩展其核心处理器,介入连接的打开、关闭及消息处理过程,并注入自定义的业务逻辑。
理解WebSocket处理器
Laravel WebSockets 的核心是 BeyondCodeLaravelWebSocketsWebSocketsWebSocketHandler 接口,它定义了处理WebSocket连接生命周期的方法:
onOpen(ConnectionInterface $connection, RequestInterface $request, $appId): 当新的WebSocket连接建立时调用。onClose(ConnectionInterface $connection): 当WebSocket连接关闭时调用。onMessage(ConnectionInterface $connection, MessageInterface $msg): 当收到来自客户端的消息时调用。onError(ConnectionInterface $connection, Exception $e): 当连接发生错误时调用。onPong(ConnectionInterface $connection, MessageInterface $msg): 当收到客户端的 Pong 消息时调用。
通常,我们不是直接实现 WebSocketHandler 接口,而是继承 BeyondCodeLaravelWebSocketsWebSocketsPusherHandler。PusherHandler 已经实现了 Pusher 协议的诸多细节,我们可以在此基础上重写或增强特定方法,以集成我们的业务逻辑。
创建自定义WebSocket处理器
为了定制连接行为,我们首先需要创建一个自定义的处理器类。我们将使用 SplObjectStorage 来存储与每个连接关联的业务上下文数据,因为 ConnectionInterface 对象是唯一的且可以作为 SplObjectStorage 的键。
首先,在 app/WebSockets 目录下创建 CustomWebSocketHandler.php 文件:
// app/WebSockets/CustomWebSocketHandler.phpconnections = new SplObjectStorage(); } /** * 当新的WebSocket连接建立时调用。 * * @param ConnectionInterface $connection * @param PsrHttpMessageRequestInterface $request * @param string $appId * @return void */ public function onOpen(ConnectionInterface $connection, PsrHttpMessageRequestInterface $request, $appId) { // 调用父类的onOpen方法,确保Pusher协议的正常初始化 parent::onOpen($connection, $request, $appId); Log::info("Connection opened: {$connection->resourceId}"); // 尝试从请求中获取业务上下文,例如用户ID或订单ID // 客户端可以通过WebSocket URL的查询参数传递这些信息 $queryParams = $request->getQueryParams(); $userId = $queryParams['user_id'] ?? null; $orderId = $queryParams['order_id'] ?? null; // 存储连接与业务上下文 $this->connections->attach($connection, [ 'resource_id' => $connection->resourceId, 'user_id' => $userId, 'order_id' => $orderId, 'connected_at' => now(), 'channels' => [], // 用于存储该连接订阅的频道 ]); if ($orderId) { Log::info("Order {$orderId} is now being processed by user {$userId} via connection {$connection->resourceId}"); // 触发事件以锁定订单 event(new AppEventsOrderLocked($orderId, $userId, $connection->resourceId)); } } /** * 当收到客户端消息时调用。 * * @param ConnectionInterface $connection * @param RatchetMessageComponentMessageInterface $msg * @return void */ public function onMessage(ConnectionInterface $connection, RatchetMessageComponentMessageInterface $msg) { parent::onMessage($connection, $msg); $payload = json_decode($msg->getPayload()); // 进一步处理消息,例如当客户端订阅特定频道时更新上下文 if (isset($payload->event) && $payload->event === 'pusher:subscribe' && isset($payload->data->channel)) { $channelName = $payload->data->channel; $context = $this->connections->offsetGet($connection); $context['channels'][] = $channelName; $this->connections->offsetSet($connection, $context); // 更新存储的上下文 Log::info("Connection {$connection->resourceId} subscribed to channel: {$channelName}"); // 如果频道名包含订单ID,可以进一步提取并更新 if (preg_match('/^private-order.(d+)$/', $channelName, $matches)) { $orderId = $matches[1]; if ($context['order_id'] !== $orderId) { Log::warning("Connection {$connection->resourceId} subscribed to order {$orderId}, but initial order was {$context['order_id']}"); // 可以在这里更新或处理冲突 } } } } /** * 当WebSocket连接关闭时调用。 * * @param ConnectionInterface $connection * @return void */ public function onClose(ConnectionInterface $connection) { Log::info("Connection closed: {$connection->resourceId}"); // 确保该连接存在于我们的存储中 if ($this->connections->contains($connection)) { $context = $this->connections->offsetGet($connection); $userId = $context['user_id']; $orderId = $context['order_id']; if ($orderId) { Log::info("Order {$orderId} is no longer processed by user {$userId} via connection {$connection->resourceId}"); // 触发事件以解锁订单 event(new AppEventsOrderUnlocked($orderId, $userId, $connection->resourceId)); } // 清理连接上下文 $this->connections->detach($connection); } // 调用父类的onClose方法 parent::onClose($connection); } /** * 当连接发生错误时调用。 * * @param ConnectionInterface $connection * @param Exception $e * @return void */ public function onError(ConnectionInterface $connection, Exception $e) { Log::error("Connection error for {$connection->resourceId}: " . $e->getMessage()); parent::onError($connection, $e); }}
代码说明:
SplObjectStorage $connections: 这是关键,用于存储每个 ConnectionInterface 对象及其关联的业务数据。onOpen 方法:在调用 parent::onOpen 之后,我们从 RequestInterface $request 的查询参数中尝试提取 user_id 和 order_id。将这些信息与 connection 对象一起存储到 $this->connections 中。如果成功获取到 orderId,则触发一个 OrderLocked 事件,通知应用层锁定该订单。onMessage 方法 (可选但推荐):此方法用于处理客户端发送的所有消息。在这里,我们特别关注 pusher:subscribe 事件。当客户端订阅一个频道时,我们可以解析频道名称(例如 private-order.123),从中提取更具体的业务ID,并更新 SplObjectStorage 中该连接的上下文信息。这在初始 onOpen 无法获得所有上下文时非常有用。onClose 方法:在连接关闭时,我们通过 ConnectionInterface $connection 从 $this->connections 中检索之前存储的业务上下文。根据上下文中的 order_id,触发一个 OrderUnlocked 事件,通知应用层解锁订单。最后,从 $this->connections 中移除该连接的上下文,防止内存泄漏。onError 方法: 记录错误信息,以便调试。
定义业务事件
为了解耦 WebSocket 处理器与具体的业务逻辑,我们推荐使用 Laravel 事件。
OrderLocked 事件:
// app/Events/OrderLocked.phporderId = $orderId; $this->userId = $userId; $this->connectionId = $connectionId; }}
OrderUnlocked 事件:
// app/Events/OrderUnlocked.phporderId = $orderId; $this->userId = $userId; $this->connectionId = $connectionId; }}
然后,您可以在 app/Listeners 中创建相应的监听器来处理这些事件,例如更新数据库中的订单状态。
注册自定义处理器
最后一步是告诉 Laravel WebSockets 使用您的自定义处理器。修改 config/websockets.php 文件:
// config/websockets.phpreturn [ // ... 其他配置 'handler' => AppWebSocketsCustomWebSocketHandler::class, // ... 其他配置];
客户端实现
为了让 onOpen 方法能够获取到 user_id 和 order_id,客户端在建立 WebSocket 连接时需要将这些信息作为查询参数传递。
使用 Laravel Echo 和 JavaScript:
import Echo from 'laravel-echo';window.Pusher = require('pusher-js');// 假设您在后端视图中将这些ID传递给前端const currentUserId = @json(auth()->id());const currentOrderId = @json($order->id ?? null); // 如果在订单页面window.Echo = new Echo({ broadcaster: 'pusher', key: process.env.MIX_PUSHER_APP_KEY, wsHost: window.location.hostname, wsPort: 6001
以上就是在Laravel WebSockets中实现连接生命周期管理与业务逻辑绑定的详细内容,更多请关注php中文网其它相关文章!
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/1339205.html
微信扫一扫
支付宝扫一扫