diff --git a/src/addon/aikefu/api/controller/WebSocket.php b/src/addon/aikefu/api/controller/WebSocket.php index 8bf1e47d9..22f229a42 100644 --- a/src/addon/aikefu/api/controller/WebSocket.php +++ b/src/addon/aikefu/api/controller/WebSocket.php @@ -63,6 +63,12 @@ class WebSocket extends WebSocketBase // 解析消息 try { + // 检查数据库连接 + $connStatus = $this->checkDatabaseConnection(); + if (!$connStatus) { + throw new \Exception('Database connection failed'); + } + $data = json_decode($message, true); if (json_last_error() !== JSON_ERROR_NONE) { throw new \Exception('Invalid JSON format'); @@ -1478,12 +1484,13 @@ class WebSocket extends WebSocketBase $requestData['conversation_id'] = $conversation_id; // ----- 只有会话ID的情况下,下列情况才添加相关的数据 - // 如果有files字段,添加到请求中 - if (!empty($origin_data['files']) && count($origin_data['files']) > 0) { - $requestData['files'] = $origin_data['files']; - } } + // 如果有files字段,添加到请求中 + if (!empty($origin_data['files'])) { + $requestData['files'] = $origin_data['files'] ?? []; + } + return $requestData; } diff --git a/src/addon/aikefu/docs/ws_multi_addon_test.html b/src/addon/aikefu/docs/ws_multi_addon_test.html index 549850c7b..793cddcc8 100644 --- a/src/addon/aikefu/docs/ws_multi_addon_test.html +++ b/src/addon/aikefu/docs/ws_multi_addon_test.html @@ -195,23 +195,16 @@ title: 'aikefu Addon', path: '/ws/aikefu', fullPath: '', - status: 'disconnected', // disconnected, connecting, connected, error + status: 'disconnected', // disconnected, connecting, connected, error, reconnecting statusText: '未连接', statusClass: 'disconnected', messages: [], inputMessage: '', - conversation_id: '' - }, - { - name: 'default', - title: '默认路径', - path: '/ws', - status: 'disconnected', - statusText: '未连接', - statusClass: 'disconnected', - messages: [], - inputMessage: '', - conversation_id: '' + conversation_id: '', + reconnectAttempts: 0, + maxReconnectAttempts: 5, + reconnectDelay: 1000, + reconnectTimer: null } ]); @@ -245,7 +238,8 @@ addon.status = status; addon.statusText = statusText; addon.statusClass = status === 'connected' ? 'connected' : - status === 'error' ? 'error' : 'disconnected'; + status === 'error' ? 'error' : + status === 'reconnecting' ? 'error' : 'disconnected'; } }; @@ -471,9 +465,12 @@ } }; - wsConnections[name].onclose = () => { + wsConnections[name].onclose = (event) => { updateAddonStatus(name, 'disconnected', '已断开'); - addMessage(name, '系统', 'WebSocket连接已断开'); + addMessage(name, '系统', 'WebSocket连接已断开' + (event.code ? ` (代码: ${event.code})` : '')); + + // 尝试重连 + reconnect(name); }; wsConnections[name].onerror = (error) => { @@ -488,6 +485,130 @@ } }; + // 重连逻辑 + const reconnect = (name) => { + const addon = addons.find(a => a.name === name); + if (!addon) return; + + // 清除之前的重连计时器 + if (addon.reconnectTimer) { + clearTimeout(addon.reconnectTimer); + addon.reconnectTimer = null; + } + + // 检查重连次数 + if (addon.reconnectAttempts >= addon.maxReconnectAttempts) { + updateAddonStatus(name, 'error', '重连失败,已达到最大尝试次数'); + addMessage(name, '系统', '重连失败,已达到最大尝试次数'); + addon.reconnectAttempts = 0; + return; + } + + // 计算重连延迟(指数退避) + const delay = addon.reconnectDelay * Math.pow(2, addon.reconnectAttempts); + + addon.reconnectAttempts++; + updateAddonStatus(name, 'reconnecting', `尝试重连中... (${addon.reconnectAttempts}/${addon.maxReconnectAttempts})`); + addMessage(name, '系统', `将在 ${delay/1000} 秒后尝试重连...`); + + // 设置重连计时器 + addon.reconnectTimer = setTimeout(() => { + try { + const url = `${websocketUrl.value}${addon.path}`; + wsConnections[name] = new WebSocket(url); + + wsConnections[name].onopen = () => { + updateAddonStatus(name, 'connected', '已重新连接'); + addMessage(name, '系统', 'WebSocket连接已重新建立'); + addon.reconnectAttempts = 0; + addon.reconnectTimer = null; + + // 重新发送认证信息 + const authMsg = JSON.stringify({ + action: 'auth', + uniacid: 1, + user_id: 1, + token: 'test_token' + }); + wsConnections[name].send(authMsg); + }; + + wsConnections[name].onmessage = (event) => { + try { + // 尝试解析JSON消息 + const message = JSON.parse(event.data); + + // 处理文件上传成功响应 + if (message.type === 'upload_success') { + addMessage(name, '服务器', event.data); + + // 更新上传文件列表 + uploadFiles.push({ + file_id: message.file_id, + file_name: message.file_name, + file_size: message.file_size, + file_extension: message.file_extension, + file_mime_type: message.file_mime_type, + file_created_by: message.file_created_by, + file_created_at: message.file_created_at, + file_url: message.file_url + }); + console.log('上传文件列表更新:', uploadFiles); + } else if (message.type === 'file_preview_success') { + addMessage(name, '服务器', `文件预览成功: ${message.file_id}\n文件URL: ${message.file_url}`); + console.log('文件预览成功:', message); + + // 打开文件预览 + window.open(message.file_url, '_blank'); + } else if (message.type === 'error' && message.code === 403 && message.error_type === 'file_access_denied') { + addMessage(name, '服务器', `文件访问被拒绝: ${message.message}`); + console.log('文件访问被拒绝:', message); + } else if (message.type === 'error' && message.code === 404 && message.error_type === 'file_not_found') { + addMessage(name, '服务器', `文件未找到: ${message.message}`); + console.log('文件未找到:', message); + } else if (message.type === 'error' && message.code === 400 && message.error_type === 'invalid_params') { + addMessage(name, '服务器', `参数输入异常: ${message.message}`); + console.log('参数输入异常:', message); + } else if (message.type === 'error' && message.code === 400 && message.error_type === 'unsupported_preview') { + addMessage(name, '服务器', `该文件不支持预览: ${message.message}`); + console.log('该文件不支持预览:', message); + } else { + // 其他消息,直接添加到聊天区域 + addMessage(name, '服务器', event.data); + } + } catch (e) { + // 不是JSON消息,直接添加到聊天区域 + addMessage(name, '服务器', event.data); + } + }; + + wsConnections[name].onclose = (event) => { + updateAddonStatus(name, 'disconnected', '已断开'); + addMessage(name, '系统', 'WebSocket连接已断开' + (event.code ? ` (代码: ${event.code})` : '')); + + // 尝试重连 + reconnect(name); + }; + + wsConnections[name].onerror = (error) => { + updateAddonStatus(name, 'error', '连接错误'); + console.log('WebSocket连接错误: ', error); + addMessage(name, '系统', 'WebSocket连接错误: ' + (error.message || 'NS_ERROR_WEBSOCKET_CONNECTION_REFUSED')); + + // 尝试重连 + reconnect(name); + }; + } catch (e) { + console.log('WebSocket重连失败: ', e); + updateAddonStatus(name, 'error', '重连失败'); + addMessage(name, '系统', '无法连接到WebSocket服务器: ' + e.message); + + // 继续尝试重连 + reconnect(name); + } + }, delay); + }; + // 初始化所有连接 const initConnections = () => { if (connecting.value) return; @@ -495,6 +616,13 @@ connecting.value = true; addons.forEach(addon => { + // 清除重连计时器 + if (addon.reconnectTimer) { + clearTimeout(addon.reconnectTimer); + addon.reconnectTimer = null; + } + addon.reconnectAttempts = 0; + // 如果已连接,先关闭 if (wsConnections[addon.name] && wsConnections[addon.name].readyState === WebSocket.OPEN) { diff --git a/src/app/api/controller/WebSocketBase.php b/src/app/api/controller/WebSocketBase.php index 1fdc96831..47030c55a 100644 --- a/src/app/api/controller/WebSocketBase.php +++ b/src/app/api/controller/WebSocketBase.php @@ -5,15 +5,33 @@ namespace app\api\controller; use Ratchet\ConnectionInterface; use Ratchet\MessageComponentInterface; +/** + * 数据库连接检查器接口 + */ +interface DatabaseCheckerAwareInterface { + /** + * 设置数据库连接检查器 + * @param callable $checker 数据库连接检查函数 + */ + public function setDatabaseChecker(callable $checker); + + /** + * 检查数据库连接 + * @return bool 连接是否有效 + */ + public function checkDatabaseConnection(); +} + /** * WebSocket基类,用于各个addon继承实现自己的WebSocket控制器 * 提供统一的接口和基本功能,确保各个addon的WebSocket相互隔离 */ -abstract class WebSocketBase implements MessageComponentInterface +abstract class WebSocketBase implements MessageComponentInterface, DatabaseCheckerAwareInterface { protected $clients; protected $clientData; protected $addonName; + protected $databaseChecker; // 数据库连接检查器 /** * 构造函数 @@ -23,9 +41,31 @@ abstract class WebSocketBase implements MessageComponentInterface $this->clients = new \SplObjectStorage; $this->clientData = []; $this->addonName = $addonName; + $this->databaseChecker = null; // 初始化检查器为空 } + /** + * 设置数据库连接检查器 + * @param callable $checker 数据库连接检查函数 + */ + public function setDatabaseChecker(callable $checker) + { + $this->databaseChecker = $checker; + } + + /** + * 检查数据库连接 + * @return bool 连接是否有效 + */ + public function checkDatabaseConnection() + { + if ($this->databaseChecker) { + return call_user_func($this->databaseChecker); + } + return false; + } + /** * 当有新客户端连接时调用 * @param ConnectionInterface $conn @@ -55,11 +95,17 @@ abstract class WebSocketBase implements MessageComponentInterface public function onMessage(ConnectionInterface $conn, $message) { $numRecv = count($this->clients) - 1; - echo sprintf('[{$this->addonName}] Connection %d sending message "%s" to %d other connection%s' . "\n", - $conn->resourceId, $message, $numRecv, $numRecv == 1 ? '' : 's'); + echo sprintf('[%s] Connection %d sending message "%s" to %d other connection%s' . "\n", + $this->addonName, $conn->resourceId, $message, $numRecv, $numRecv == 1 ? '' : 's'); // 解析消息 try { + // 检查数据库连接 + $connStatus = $this->checkDatabaseConnection(); + if (!$connStatus) { + throw new \Exception('Database connection failed'); + } + $data = json_decode($message, true); if (json_last_error() !== JSON_ERROR_NONE) { throw new \Exception('Invalid JSON format'); diff --git a/src/ws_server.php b/src/ws_server.php index 7d18a2341..bcbf2765d 100644 --- a/src/ws_server.php +++ b/src/ws_server.php @@ -176,71 +176,54 @@ use app\model\system\Addon; $addonDir = __DIR__ . '/addon'; $addonNames = []; -// 从数据库获取addon列表(使用Cache缓存,避免频繁查询数据库) -$cacheKey = 'websocket_addon_list'; -$cacheExpire = 300; // 缓存5分钟 - +// 从数据库获取addon列表 try { - // 尝试从缓存获取addon列表 - $cachedAddons = $cache->get($cacheKey); + ws_echo("[WebSocket服务器] 从数据库获取插件列表"); - if ($cachedAddons !== null && !empty($cachedAddons)) { - ws_echo("[WebSocket服务器] 从缓存获取插件列表"); - $current_addons = $cachedAddons; - } else { - ws_echo("[WebSocket服务器] 从数据库获取插件列表"); + // 尝试获取数据库连接并确保连接有效 + try { + // 尝试初始化数据库连接 + $addon_model = new Addon(); + $addon_data = $addon_model->getAddonList([], 'name,status'); + $current_addons = $addon_data['data']; + } catch (\Exception $dbEx) { + ws_echo("[WebSocket服务器] 数据库操作失败: {$dbEx->getMessage()}", 'error'); + ws_echo("[WebSocket服务器] 尝试重新初始化数据库连接..."); - // 尝试获取数据库连接并确保连接有效 + // 尝试重新初始化应用和数据库连接 try { - // 尝试初始化数据库连接 + // 重新初始化应用 + $app->initialize(); + $cache = $app->cache; + + // 再次尝试获取插件列表 $addon_model = new Addon(); $addon_data = $addon_model->getAddonList([], 'name,status'); $current_addons = $addon_data['data']; - // 将结果存入缓存 - $cache->set($cacheKey, $current_addons, $cacheExpire); - ws_echo("[WebSocket服务器] 插件列表已缓存(有效期: {$cacheExpire}秒)"); - } catch (\Exception $dbEx) { - ws_echo("[WebSocket服务器] 数据库操作失败: {$dbEx->getMessage()}", 'error'); - ws_echo("[WebSocket服务器] 尝试重新初始化数据库连接..."); + ws_echo("[WebSocket服务器] 重新连接数据库成功"); + } catch (\Exception $retryEx) { + ws_echo("[WebSocket服务器] 重新连接数据库失败: {$retryEx->getMessage()}", 'error'); + ws_echo("[WebSocket服务器] 回退到直接扫描目录获取插件列表"); - // 尝试重新初始化应用和数据库连接 - try { - // 重新初始化应用 - $app->initialize(); - $cache = $app->cache; - - // 再次尝试获取插件列表 - $addon_model = new Addon(); - $addon_data = $addon_model->getAddonList([], 'name,status'); - $current_addons = $addon_data['data']; - - // 将结果存入缓存 - $cache->set($cacheKey, $current_addons, $cacheExpire); - ws_echo("[WebSocket服务器] 重新连接数据库成功,插件列表已缓存"); - } catch (\Exception $retryEx) { - ws_echo("[WebSocket服务器] 重新连接数据库失败: {$retryEx->getMessage()}", 'error'); - ws_echo("[WebSocket服务器] 回退到直接扫描目录获取插件列表"); - - // 回退到直接扫描目录 - $addonNames = []; - if (is_dir($addonDir)) { - $handle = opendir($addonDir); - while (($file = readdir($handle)) !== false) { - if ($file != '.' && $file != '..' && is_dir($addonDir . '/' . $file)) { - $addonNames[] = $file; - } + // 回退到直接扫描目录 + $addonNames = []; + if (is_dir($addonDir)) { + $handle = opendir($addonDir); + while (($file = readdir($handle)) !== false) { + if ($file != '.' && $file != '..' && is_dir($addonDir . '/' . $file)) { + $addonNames[] = $file; } - closedir($handle); - sort($addonNames); } - - $current_addon_names = $addonNames; - $enabled_addons = $addonNames; // 回退模式下默认所有插件都启用 - - // 跳过后续的数据库操作 - throw new \Exception('数据库连接失败,已回退到目录扫描模式'); + closedir($handle); + sort($addonNames); } + + $current_addon_names = $addonNames; + $enabled_addons = $addonNames; // 回退模式下默认所有插件都启用 + + // 跳过后续的数据库操作 + throw new \Exception('数据库连接失败,已回退到目录扫描模式'); } } @@ -336,8 +319,17 @@ foreach ($current_addon_names as $addonName) { if (class_exists($webSocketClass)) { // 注册到/ws/{addonName}路径 $path = '/ws/' . $addonName; + // 创建实例并注入数据库检查器 + $instance = new $webSocketClass($addonName); + + // 检查是否实现了DatabaseCheckerAwareInterface + if ($instance instanceof \app\api\controller\DatabaseCheckerAwareInterface) { + $instance->setDatabaseChecker('checkDatabaseConnection'); + ws_echo("[{$addonName}] 已注入数据库连接检查器"); + } + // 允许任意 Origin,并且不限制 Host(支持通过任意 IP/域名访问) - $ratchetApp->route($path, new $webSocketClass(), array('*'), ''); + $ratchetApp->route($path, $instance, array('*'), ''); ws_echo("已注册WebSocket控制器:{$webSocketClass} 到路径 {$path}"); $registeredAddons[] = $addonName; } else { @@ -350,75 +342,14 @@ foreach ($current_addon_names as $addonName) { } } -// 实现默认的/ws路径的简单测试控制器 -class DefaultWebSocketController implements MessageComponentInterface -{ - protected $clients; - - public function __construct() - { - $this->clients = new \SplObjectStorage; - } - - public function onOpen(ConnectionInterface $conn) - { - $this->clients->attach($conn); - ws_echo("[默认路径] New connection! ({$conn->resourceId})"); - $conn->send(json_encode([ - 'type' => 'welcome', - 'message' => '欢迎连接到默认WebSocket测试路径', - 'info' => '此路径仅用于测试,不提供实际功能。请使用/ws/{addonName}连接到具体的addon服务。' - ])); - } - - public function onMessage(ConnectionInterface $conn, $msg) { - ws_echo("[默认路径] Received message from {$conn->resourceId}: $msg"); - try { - // 检查数据库连接状态 - if (function_exists('checkDatabaseConnection')) { - checkDatabaseConnection(); - } - - $data = json_decode($msg, true); - if (isset($data['action']) && $data['action'] === 'ping') { - $conn->send(json_encode(['type' => 'pong'])); - } else { - $conn->send(json_encode([ - 'type' => 'info', - 'message' => '收到消息,但默认路径不提供实际功能', - 'received' => $data - ])); - } - } catch (\Exception $e) { - ws_echo("[默认路径] 解析消息失败: {$e->getMessage()}", 'error'); - $conn->send(json_encode(['type' => 'error', 'message' => '解析消息失败: ' . $e->getMessage()])); - } - } - - public function onClose(ConnectionInterface $conn) - { - $this->clients->detach($conn); - ws_echo("[默认路径] Connection {$conn->resourceId} has disconnected"); - } - - public function onError(ConnectionInterface $conn, \Exception $e) - { - ws_echo("[默认路径] An error has occurred: {$e->getMessage()}", 'error'); - $conn->close(); - } -} - -// 注册默认的/ws路径测试控制器 -// 默认测试路径同样不限制 Host -$ratchetApp->route('/ws', new DefaultWebSocketController(), array('*'), ''); -ws_echo("已注册默认WebSocket测试控制器到路径 /ws"); - /** * 检查数据库连接状态并在需要时重新初始化 * @return bool 连接是否有效 */ function checkDatabaseConnection() { global $app, $cache; + + ws_echo("[WebSocket服务器] 检查数据库连接状态..."); try { // 检查缓存中的连接状态 @@ -527,8 +458,6 @@ if (extension_loaded('pcntl')) { // 运行服务器 ws_echo("[WebSocket服务器] 启动主服务器进程"); -ws_echo("\n默认测试路径:"); -ws_echo(" - ws://{$httpHost}:{$port}/ws (默认路径,用于连接测试)"); ws_echo("按 Ctrl+C 停止服务器"); ws_info("WebSocket服务器已启动,监听地址: ws://{$httpHost}:{$port}"); $ratchetApp->run();