feat(websocket): 增强WebSocket连接的健壮性和重连机制

实现数据库连接检查接口并集成到WebSocket控制器
添加自动重连逻辑和指数退避策略
移除默认测试路径并优化插件列表获取流程
更新测试页面以支持重连状态显示
This commit is contained in:
2026-01-26 16:05:29 +08:00
parent 3a121e4db6
commit 2d21993720
4 changed files with 252 additions and 142 deletions

View File

@@ -63,6 +63,12 @@ class WebSocket extends WebSocketBase
// 解析消息 // 解析消息
try { try {
// 检查数据库连接
$connStatus = $this->checkDatabaseConnection();
if (!$connStatus) {
throw new \Exception('Database connection failed');
}
$data = json_decode($message, true); $data = json_decode($message, true);
if (json_last_error() !== JSON_ERROR_NONE) { if (json_last_error() !== JSON_ERROR_NONE) {
throw new \Exception('Invalid JSON format'); throw new \Exception('Invalid JSON format');
@@ -1478,10 +1484,11 @@ class WebSocket extends WebSocketBase
$requestData['conversation_id'] = $conversation_id; $requestData['conversation_id'] = $conversation_id;
// ----- 只有会话ID的情况下下列情况才添加相关的数据 // ----- 只有会话ID的情况下下列情况才添加相关的数据
// 如果有files字段添加到请求中
if (!empty($origin_data['files']) && count($origin_data['files']) > 0) {
$requestData['files'] = $origin_data['files'];
} }
// 如果有files字段添加到请求中
if (!empty($origin_data['files'])) {
$requestData['files'] = $origin_data['files'] ?? [];
} }
return $requestData; return $requestData;

View File

@@ -195,23 +195,16 @@
title: 'aikefu Addon', title: 'aikefu Addon',
path: '/ws/aikefu', path: '/ws/aikefu',
fullPath: '', fullPath: '',
status: 'disconnected', // disconnected, connecting, connected, error status: 'disconnected', // disconnected, connecting, connected, error, reconnecting
statusText: '未连接', statusText: '未连接',
statusClass: 'disconnected', statusClass: 'disconnected',
messages: [], messages: [],
inputMessage: '', inputMessage: '',
conversation_id: '' conversation_id: '',
}, reconnectAttempts: 0,
{ maxReconnectAttempts: 5,
name: 'default', reconnectDelay: 1000,
title: '默认路径', reconnectTimer: null
path: '/ws',
status: 'disconnected',
statusText: '未连接',
statusClass: 'disconnected',
messages: [],
inputMessage: '',
conversation_id: ''
} }
]); ]);
@@ -245,7 +238,8 @@
addon.status = status; addon.status = status;
addon.statusText = statusText; addon.statusText = statusText;
addon.statusClass = status === 'connected' ? 'connected' : addon.statusClass = status === 'connected' ? 'connected' :
status === 'error' ? 'error' : 'disconnected'; status === 'error' ? 'error' :
status === 'reconnecting' ? 'error' : 'disconnected';
} }
}; };
@@ -471,9 +465,12 @@
} }
}; };
wsConnections[name].onclose = () => { wsConnections[name].onclose = (event) => {
updateAddonStatus(name, 'disconnected', '已断开'); updateAddonStatus(name, 'disconnected', '已断开');
addMessage(name, '系统', 'WebSocket连接已断开'); addMessage(name, '系统', 'WebSocket连接已断开' + (event.code ? ` (代码: ${event.code})` : ''));
// 尝试重连
reconnect(name);
}; };
wsConnections[name].onerror = (error) => { wsConnections[name].onerror = (error) => {
@@ -488,6 +485,130 @@
} }
}; };
// 重连逻辑
const reconnect = (name) => {
const addon = addons.find(a => a.name === name);
if (!addon) return;
// 清除之前的重连计时器
if (addon.reconnectTimer) {
clearTimeout(addon.reconnectTimer);
addon.reconnectTimer = null;
}
// 检查重连次数
if (addon.reconnectAttempts >= addon.maxReconnectAttempts) {
updateAddonStatus(name, 'error', '重连失败,已达到最大尝试次数');
addMessage(name, '系统', '重连失败,已达到最大尝试次数');
addon.reconnectAttempts = 0;
return;
}
// 计算重连延迟(指数退避)
const delay = addon.reconnectDelay * Math.pow(2, addon.reconnectAttempts);
addon.reconnectAttempts++;
updateAddonStatus(name, 'reconnecting', `尝试重连中... (${addon.reconnectAttempts}/${addon.maxReconnectAttempts})`);
addMessage(name, '系统', `将在 ${delay/1000} 秒后尝试重连...`);
// 设置重连计时器
addon.reconnectTimer = setTimeout(() => {
try {
const url = `${websocketUrl.value}${addon.path}`;
wsConnections[name] = new WebSocket(url);
wsConnections[name].onopen = () => {
updateAddonStatus(name, 'connected', '已重新连接');
addMessage(name, '系统', 'WebSocket连接已重新建立');
addon.reconnectAttempts = 0;
addon.reconnectTimer = null;
// 重新发送认证信息
const authMsg = JSON.stringify({
action: 'auth',
uniacid: 1,
user_id: 1,
token: 'test_token'
});
wsConnections[name].send(authMsg);
};
wsConnections[name].onmessage = (event) => {
try {
// 尝试解析JSON消息
const message = JSON.parse(event.data);
// 处理文件上传成功响应
if (message.type === 'upload_success') {
addMessage(name, '服务器', event.data);
// 更新上传文件列表
uploadFiles.push({
file_id: message.file_id,
file_name: message.file_name,
file_size: message.file_size,
file_extension: message.file_extension,
file_mime_type: message.file_mime_type,
file_created_by: message.file_created_by,
file_created_at: message.file_created_at,
file_url: message.file_url
});
console.log('上传文件列表更新:', uploadFiles);
} else if (message.type === 'file_preview_success') {
addMessage(name, '服务器', `文件预览成功: ${message.file_id}\n文件URL: ${message.file_url}`);
console.log('文件预览成功:', message);
// 打开文件预览
window.open(message.file_url, '_blank');
} else if (message.type === 'error' && message.code === 403 && message.error_type === 'file_access_denied') {
addMessage(name, '服务器', `文件访问被拒绝: ${message.message}`);
console.log('文件访问被拒绝:', message);
} else if (message.type === 'error' && message.code === 404 && message.error_type === 'file_not_found') {
addMessage(name, '服务器', `文件未找到: ${message.message}`);
console.log('文件未找到:', message);
} else if (message.type === 'error' && message.code === 400 && message.error_type === 'invalid_params') {
addMessage(name, '服务器', `参数输入异常: ${message.message}`);
console.log('参数输入异常:', message);
} else if (message.type === 'error' && message.code === 400 && message.error_type === 'unsupported_preview') {
addMessage(name, '服务器', `该文件不支持预览: ${message.message}`);
console.log('该文件不支持预览:', message);
} else {
// 其他消息,直接添加到聊天区域
addMessage(name, '服务器', event.data);
}
} catch (e) {
// 不是JSON消息直接添加到聊天区域
addMessage(name, '服务器', event.data);
}
};
wsConnections[name].onclose = (event) => {
updateAddonStatus(name, 'disconnected', '已断开');
addMessage(name, '系统', 'WebSocket连接已断开' + (event.code ? ` (代码: ${event.code})` : ''));
// 尝试重连
reconnect(name);
};
wsConnections[name].onerror = (error) => {
updateAddonStatus(name, 'error', '连接错误');
console.log('WebSocket连接错误: ', error);
addMessage(name, '系统', 'WebSocket连接错误: ' + (error.message || 'NS_ERROR_WEBSOCKET_CONNECTION_REFUSED'));
// 尝试重连
reconnect(name);
};
} catch (e) {
console.log('WebSocket重连失败: ', e);
updateAddonStatus(name, 'error', '重连失败');
addMessage(name, '系统', '无法连接到WebSocket服务器: ' + e.message);
// 继续尝试重连
reconnect(name);
}
}, delay);
};
// 初始化所有连接 // 初始化所有连接
const initConnections = () => { const initConnections = () => {
if (connecting.value) return; if (connecting.value) return;
@@ -495,6 +616,13 @@
connecting.value = true; connecting.value = true;
addons.forEach(addon => { addons.forEach(addon => {
// 清除重连计时器
if (addon.reconnectTimer) {
clearTimeout(addon.reconnectTimer);
addon.reconnectTimer = null;
}
addon.reconnectAttempts = 0;
// 如果已连接,先关闭 // 如果已连接,先关闭
if (wsConnections[addon.name] && if (wsConnections[addon.name] &&
wsConnections[addon.name].readyState === WebSocket.OPEN) { wsConnections[addon.name].readyState === WebSocket.OPEN) {

View File

@@ -5,15 +5,33 @@ namespace app\api\controller;
use Ratchet\ConnectionInterface; use Ratchet\ConnectionInterface;
use Ratchet\MessageComponentInterface; use Ratchet\MessageComponentInterface;
/**
* 数据库连接检查器接口
*/
interface DatabaseCheckerAwareInterface {
/**
* 设置数据库连接检查器
* @param callable $checker 数据库连接检查函数
*/
public function setDatabaseChecker(callable $checker);
/**
* 检查数据库连接
* @return bool 连接是否有效
*/
public function checkDatabaseConnection();
}
/** /**
* WebSocket基类用于各个addon继承实现自己的WebSocket控制器 * WebSocket基类用于各个addon继承实现自己的WebSocket控制器
* 提供统一的接口和基本功能确保各个addon的WebSocket相互隔离 * 提供统一的接口和基本功能确保各个addon的WebSocket相互隔离
*/ */
abstract class WebSocketBase implements MessageComponentInterface abstract class WebSocketBase implements MessageComponentInterface, DatabaseCheckerAwareInterface
{ {
protected $clients; protected $clients;
protected $clientData; protected $clientData;
protected $addonName; protected $addonName;
protected $databaseChecker; // 数据库连接检查器
/** /**
* 构造函数 * 构造函数
@@ -23,9 +41,31 @@ abstract class WebSocketBase implements MessageComponentInterface
$this->clients = new \SplObjectStorage; $this->clients = new \SplObjectStorage;
$this->clientData = []; $this->clientData = [];
$this->addonName = $addonName; $this->addonName = $addonName;
$this->databaseChecker = null; // 初始化检查器为空
} }
/**
* 设置数据库连接检查器
* @param callable $checker 数据库连接检查函数
*/
public function setDatabaseChecker(callable $checker)
{
$this->databaseChecker = $checker;
}
/**
* 检查数据库连接
* @return bool 连接是否有效
*/
public function checkDatabaseConnection()
{
if ($this->databaseChecker) {
return call_user_func($this->databaseChecker);
}
return false;
}
/** /**
* 当有新客户端连接时调用 * 当有新客户端连接时调用
* @param ConnectionInterface $conn * @param ConnectionInterface $conn
@@ -55,11 +95,17 @@ abstract class WebSocketBase implements MessageComponentInterface
public function onMessage(ConnectionInterface $conn, $message) public function onMessage(ConnectionInterface $conn, $message)
{ {
$numRecv = count($this->clients) - 1; $numRecv = count($this->clients) - 1;
echo sprintf('[{$this->addonName}] Connection %d sending message "%s" to %d other connection%s' . "\n", echo sprintf('[%s] Connection %d sending message "%s" to %d other connection%s' . "\n",
$conn->resourceId, $message, $numRecv, $numRecv == 1 ? '' : 's'); $this->addonName, $conn->resourceId, $message, $numRecv, $numRecv == 1 ? '' : 's');
// 解析消息 // 解析消息
try { try {
// 检查数据库连接
$connStatus = $this->checkDatabaseConnection();
if (!$connStatus) {
throw new \Exception('Database connection failed');
}
$data = json_decode($message, true); $data = json_decode($message, true);
if (json_last_error() !== JSON_ERROR_NONE) { if (json_last_error() !== JSON_ERROR_NONE) {
throw new \Exception('Invalid JSON format'); throw new \Exception('Invalid JSON format');

View File

@@ -176,18 +176,8 @@ use app\model\system\Addon;
$addonDir = __DIR__ . '/addon'; $addonDir = __DIR__ . '/addon';
$addonNames = []; $addonNames = [];
// 从数据库获取addon列表使用Cache缓存避免频繁查询数据库 // 从数据库获取addon列表
$cacheKey = 'websocket_addon_list';
$cacheExpire = 300; // 缓存5分钟
try { try {
// 尝试从缓存获取addon列表
$cachedAddons = $cache->get($cacheKey);
if ($cachedAddons !== null && !empty($cachedAddons)) {
ws_echo("[WebSocket服务器] 从缓存获取插件列表");
$current_addons = $cachedAddons;
} else {
ws_echo("[WebSocket服务器] 从数据库获取插件列表"); ws_echo("[WebSocket服务器] 从数据库获取插件列表");
// 尝试获取数据库连接并确保连接有效 // 尝试获取数据库连接并确保连接有效
@@ -196,10 +186,6 @@ try {
$addon_model = new Addon(); $addon_model = new Addon();
$addon_data = $addon_model->getAddonList([], 'name,status'); $addon_data = $addon_model->getAddonList([], 'name,status');
$current_addons = $addon_data['data']; $current_addons = $addon_data['data'];
// 将结果存入缓存
$cache->set($cacheKey, $current_addons, $cacheExpire);
ws_echo("[WebSocket服务器] 插件列表已缓存(有效期: {$cacheExpire}秒)");
} catch (\Exception $dbEx) { } catch (\Exception $dbEx) {
ws_echo("[WebSocket服务器] 数据库操作失败: {$dbEx->getMessage()}", 'error'); ws_echo("[WebSocket服务器] 数据库操作失败: {$dbEx->getMessage()}", 'error');
ws_echo("[WebSocket服务器] 尝试重新初始化数据库连接..."); ws_echo("[WebSocket服务器] 尝试重新初始化数据库连接...");
@@ -215,9 +201,7 @@ try {
$addon_data = $addon_model->getAddonList([], 'name,status'); $addon_data = $addon_model->getAddonList([], 'name,status');
$current_addons = $addon_data['data']; $current_addons = $addon_data['data'];
// 将结果存入缓存 ws_echo("[WebSocket服务器] 重新连接数据库成功");
$cache->set($cacheKey, $current_addons, $cacheExpire);
ws_echo("[WebSocket服务器] 重新连接数据库成功,插件列表已缓存");
} catch (\Exception $retryEx) { } catch (\Exception $retryEx) {
ws_echo("[WebSocket服务器] 重新连接数据库失败: {$retryEx->getMessage()}", 'error'); ws_echo("[WebSocket服务器] 重新连接数据库失败: {$retryEx->getMessage()}", 'error');
ws_echo("[WebSocket服务器] 回退到直接扫描目录获取插件列表"); ws_echo("[WebSocket服务器] 回退到直接扫描目录获取插件列表");
@@ -242,7 +226,6 @@ try {
throw new \Exception('数据库连接失败,已回退到目录扫描模式'); throw new \Exception('数据库连接失败,已回退到目录扫描模式');
} }
} }
}
$db_addon_names = array_column($current_addons, 'name'); $db_addon_names = array_column($current_addons, 'name');
@@ -336,8 +319,17 @@ foreach ($current_addon_names as $addonName) {
if (class_exists($webSocketClass)) { if (class_exists($webSocketClass)) {
// 注册到/ws/{addonName}路径 // 注册到/ws/{addonName}路径
$path = '/ws/' . $addonName; $path = '/ws/' . $addonName;
// 创建实例并注入数据库检查器
$instance = new $webSocketClass($addonName);
// 检查是否实现了DatabaseCheckerAwareInterface
if ($instance instanceof \app\api\controller\DatabaseCheckerAwareInterface) {
$instance->setDatabaseChecker('checkDatabaseConnection');
ws_echo("[{$addonName}] 已注入数据库连接检查器");
}
// 允许任意 Origin并且不限制 Host支持通过任意 IP/域名访问) // 允许任意 Origin并且不限制 Host支持通过任意 IP/域名访问)
$ratchetApp->route($path, new $webSocketClass(), array('*'), ''); $ratchetApp->route($path, $instance, array('*'), '');
ws_echo("已注册WebSocket控制器{$webSocketClass} 到路径 {$path}"); ws_echo("已注册WebSocket控制器{$webSocketClass} 到路径 {$path}");
$registeredAddons[] = $addonName; $registeredAddons[] = $addonName;
} else { } else {
@@ -350,69 +342,6 @@ foreach ($current_addon_names as $addonName) {
} }
} }
// 实现默认的/ws路径的简单测试控制器
class DefaultWebSocketController implements MessageComponentInterface
{
protected $clients;
public function __construct()
{
$this->clients = new \SplObjectStorage;
}
public function onOpen(ConnectionInterface $conn)
{
$this->clients->attach($conn);
ws_echo("[默认路径] New connection! ({$conn->resourceId})");
$conn->send(json_encode([
'type' => 'welcome',
'message' => '欢迎连接到默认WebSocket测试路径',
'info' => '此路径仅用于测试,不提供实际功能。请使用/ws/{addonName}连接到具体的addon服务。'
]));
}
public function onMessage(ConnectionInterface $conn, $msg) {
ws_echo("[默认路径] Received message from {$conn->resourceId}: $msg");
try {
// 检查数据库连接状态
if (function_exists('checkDatabaseConnection')) {
checkDatabaseConnection();
}
$data = json_decode($msg, true);
if (isset($data['action']) && $data['action'] === 'ping') {
$conn->send(json_encode(['type' => 'pong']));
} else {
$conn->send(json_encode([
'type' => 'info',
'message' => '收到消息,但默认路径不提供实际功能',
'received' => $data
]));
}
} catch (\Exception $e) {
ws_echo("[默认路径] 解析消息失败: {$e->getMessage()}", 'error');
$conn->send(json_encode(['type' => 'error', 'message' => '解析消息失败: ' . $e->getMessage()]));
}
}
public function onClose(ConnectionInterface $conn)
{
$this->clients->detach($conn);
ws_echo("[默认路径] Connection {$conn->resourceId} has disconnected");
}
public function onError(ConnectionInterface $conn, \Exception $e)
{
ws_echo("[默认路径] An error has occurred: {$e->getMessage()}", 'error');
$conn->close();
}
}
// 注册默认的/ws路径测试控制器
// 默认测试路径同样不限制 Host
$ratchetApp->route('/ws', new DefaultWebSocketController(), array('*'), '');
ws_echo("已注册默认WebSocket测试控制器到路径 /ws");
/** /**
* 检查数据库连接状态并在需要时重新初始化 * 检查数据库连接状态并在需要时重新初始化
* @return bool 连接是否有效 * @return bool 连接是否有效
@@ -420,6 +349,8 @@ ws_echo("已注册默认WebSocket测试控制器到路径 /ws");
function checkDatabaseConnection() { function checkDatabaseConnection() {
global $app, $cache; global $app, $cache;
ws_echo("[WebSocket服务器] 检查数据库连接状态...");
try { try {
// 检查缓存中的连接状态 // 检查缓存中的连接状态
$connStatus = $cache->get('db_connection_status'); $connStatus = $cache->get('db_connection_status');
@@ -527,8 +458,6 @@ if (extension_loaded('pcntl')) {
// 运行服务器 // 运行服务器
ws_echo("[WebSocket服务器] 启动主服务器进程"); ws_echo("[WebSocket服务器] 启动主服务器进程");
ws_echo("\n默认测试路径:");
ws_echo(" - ws://{$httpHost}:{$port}/ws (默认路径,用于连接测试)");
ws_echo("按 Ctrl+C 停止服务器"); ws_echo("按 Ctrl+C 停止服务器");
ws_info("WebSocket服务器已启动监听地址: ws://{$httpHost}:{$port}"); ws_info("WebSocket服务器已启动监听地址: ws://{$httpHost}:{$port}");
$ratchetApp->run(); $ratchetApp->run();