From d975abb3de0b7597d5fd6a60180fa3634ca8396d Mon Sep 17 00:00:00 2001 From: ZF sun <34314687@qq.com> Date: Sat, 24 Jan 2026 15:04:22 +0800 Subject: [PATCH] =?UTF-8?q?feat(websocket):=20=E5=AE=9E=E7=8E=B0=E6=96=87?= =?UTF-8?q?=E4=BB=B6=E5=88=86=E7=89=87=E4=B8=8A=E4=BC=A0=E5=8A=9F=E8=83=BD?= =?UTF-8?q?=E5=B9=B6=E9=87=8D=E6=9E=84=E8=AE=A4=E8=AF=81=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 重构WebSocketBase类,移除冗余属性,简化认证参数传递方式。新增文件分片上传功能,包括分片上传、合并、状态检查等完整流程。前端页面添加文件上传UI组件,支持断点续传和进度显示。优化认证逻辑,统一使用data参数传递认证信息,提高代码可维护性。 --- src/addon/aikefu/api/controller/WebSocket.php | 526 +++++++++++++++--- .../aikefu/docs/ws_multi_addon_test.html | 373 ++++++++++++- src/app/api/controller/WebSocketBase.php | 26 +- 3 files changed, 834 insertions(+), 91 deletions(-) diff --git a/src/addon/aikefu/api/controller/WebSocket.php b/src/addon/aikefu/api/controller/WebSocket.php index dcd9f2e95..32b08bfa1 100644 --- a/src/addon/aikefu/api/controller/WebSocket.php +++ b/src/addon/aikefu/api/controller/WebSocket.php @@ -16,15 +16,6 @@ use React\EventLoop\Loop; class WebSocket extends WebSocketBase { - // 控制器属性,用于替代BaseApi中的属性 - public $params; - public $token; - protected $user_id; - protected $site_id; - protected $uniacid; - protected $site_ids = []; - public $app_type; - // 存储正在进行的流式请求信息 protected $streamingRequests = []; @@ -32,14 +23,6 @@ class WebSocket extends WebSocketBase { // 调用父类构造函数,传入当前addon名称 parent::__construct('aikefu'); - - // 初始化控制器属性 - $this->params = []; - $this->token = ''; - $this->user_id = 0; - $this->site_id = 0; - $this->uniacid = 0; - $this->app_type = 'weapp'; // 默认微信小程序 } /** @@ -101,7 +84,7 @@ class WebSocket extends WebSocketBase if (isset($data['action']) && $data['action'] === 'ping') { $conn->send(json_encode(['type' => 'pong'])); return; - } + } // 处理聊天消息 if (isset($data['action']) && $data['action'] === 'chat') { @@ -109,7 +92,28 @@ class WebSocket extends WebSocketBase return; } + + + // 处理分片上传 + if (isset($data['action']) && $data['action'] === 'upload_chunk') { + $this->handleUploadChunk($conn, $data); + return; + } + + // 处理分片合并 + if (isset($data['action']) && $data['action'] === 'upload_merge') { + $this->handleMergeChunks($conn, $data); + return; + } + + // 处理分片状态检查 + if (isset($data['action']) && $data['action'] === 'upload_check') { + $this->handleCheckChunks($conn, $data); + return; + } + $conn->send(json_encode(['type' => 'error', 'message' => 'Unknown action'])); + } catch (\Exception $e) { $conn->send(json_encode(['type' => 'error', 'message' => $e->getMessage(), 'line' => $e->getLine(), 'file' => $e->getFile(), 'trace' => $e->getTraceAsString()])); } @@ -155,12 +159,12 @@ class WebSocket extends WebSocketBase * - WebSocket 连接场景没有 request()/input(),所以这里直接根据客户端传入的 site_id + token 解密校验 * - 校验 token 解密成功、未过期、且 token 内 member_id 与传入 member_id 一致 */ - protected function doAuth(ConnectionInterface $conn, $site_id, $user_id, $token) + protected function doAuth(ConnectionInterface $conn, $data) { // 与 Kefu.php 保持一致,支持使用 uniacid 作为站点ID - $site_id = (int)$site_id; - $user_id = (int)$user_id; - $token = (string)$token; + $site_id = (int) ($data['uniacid'] ?? $data['site_id'] ?? 0); + $user_id = (int) ($data['user_id'] ?? 0); + $token = (string) ($data['token'] ?? ''); if ($site_id <= 0 || $user_id <= 0 || $token === '') { throw new \Exception('Missing authentication parameters'); @@ -189,18 +193,18 @@ class WebSocket extends WebSocketBase $this->log('decrypt:' . $decrypt, 'info'); - $data = json_decode($decrypt, true); - if (!is_array($data) || empty($data['member_id'])) { + $decrypted_data = json_decode($decrypt, true); + if (!is_array($decrypted_data) || empty($decrypted_data['member_id'])) { throw new \Exception('TOKEN_ERROR'); } // member_id 必须一致,避免冒用 - if ((int)$data['member_id'] !== $user_id) { + if ((int) $decrypted_data['member_id'] !== $user_id) { throw new \Exception('TOKEN_ERROR'); } // 过期校验:expire_time=0 为永久,其余必须未过期 - $expire_time = (int)($data['expire_time'] ?? 0); + $expire_time = (int) ($decrypted_data['expire_time'] ?? 0); if ($expire_time !== 0 && $expire_time < time()) { throw new \Exception('TOKEN_EXPIRE'); } @@ -240,25 +244,24 @@ class WebSocket extends WebSocketBase $stream = $data['stream'] ?? false; $response_mode = $data['response_mode'] ?? 'streaming'; // 与 Kefu.php 保持一致 - // 设置当前控制器的属性,与 Kefu.php 保持一致的参数优先级 - $this->site_id = $data['uniacid'] ?? $clientInfo['site_id']; - $this->user_id = $data['user_id'] ?? $clientInfo['user_id']; - $this->token = $data['token'] ?? $clientInfo['token']; - $this->params = [ - 'query' => $query, - 'user_id' => $user_id, - 'conversation_id' => $conversation_id, - 'stream' => $stream, - 'response_mode' => $response_mode, - 'uniacid' => $this->site_id, - 'user_id' => $this->user_id, - 'token' => $this->token, - ]; + // 获取当前连接的客户端信息 + $site_id = $data['uniacid'] ?? $clientInfo['site_id']; + $user_id = $data['user_id'] ?? $clientInfo['user_id']; + $token = $data['token'] ?? $clientInfo['token']; // 验证参数并获取配置,与 Kefu.php 保持一致 $config = $this->validateAndGetConfig([ 'query' => ['required' => true, 'message' => '参数错误,请检查 `query` 参数是否设置正确', 'description' => '消息内容'], 'user_id' => ['required' => true, 'message' => '请求参数 `user_id` 不能为空', 'description' => '用户ID'] + ], [ + 'query' => $query, + 'user_id' => $user_id, + 'conversation_id' => $conversation_id, + 'stream' => $stream, + 'response_mode' => $response_mode, + 'uniacid' => $site_id, + 'user_id' => $user_id, + 'token' => $token, ]); // 是否启用流式响应,与 Kefu.php 保持一致 @@ -273,10 +276,10 @@ class WebSocket extends WebSocketBase if ($enable_stream) { // 处理流式响应 - $this->handleStreamingResponse($conn, $url, $requestData, $headers, $query, $user_id); + $this->handleStreamingResponse($conn, $url, $requestData, $headers, $query, $user_id, $site_id); } else { // 处理非流式响应 - $response = $this->handleBlockingResponse($url, $requestData, $headers, $query, $user_id, $conversation_id); + $response = $this->handleBlockingResponse($url, $requestData, $headers, $query, $user_id, $conversation_id, $site_id); $conn->send(json_encode(['type' => 'message', 'data' => $response])); } } catch (\Exception $e) { @@ -284,6 +287,401 @@ class WebSocket extends WebSocketBase } } + + + /** + * 处理分片上传 + * @param ConnectionInterface $conn + * @param array $data + */ + private function handleUploadChunk(ConnectionInterface $conn, $data) + { + try { + // 获取客户端信息 + $clientInfo = $this->clientData[$conn->resourceId]; + + // 获取分片上传相关参数 + $file_id = $data['file_id'] ?? ''; + $file_name = $data['file_name'] ?? ''; + $file_type = $data['file_type'] ?? ''; + $chunk_index = $data['chunk_index'] ?? 0; + $total_chunks = $data['total_chunks'] ?? 0; + $chunk_size = $data['chunk_size'] ?? 0; + $file_size = $data['file_size'] ?? 0; + $file_content = $data['file_content'] ?? ''; + $user_id = $data['user_id'] ?? $clientInfo['user_id']; + $site_id = $data['uniacid'] ?? $clientInfo['site_id']; + + // 验证参数 + if (empty($file_id) || empty($file_name) || empty($file_content)) { + throw new \Exception('分片上传参数不完整'); + } + + // 创建临时目录存储分片 + $temp_dir = sys_get_temp_dir() . '/dify_uploads/' . $file_id; + if (!is_dir($temp_dir)) { + mkdir($temp_dir, 0777, true); + } + + // 保存分片文件 + $chunk_file = $temp_dir . '/' . $chunk_index; + $file_data = base64_decode($file_content); + if ($file_data === false) { + throw new \Exception('分片内容base64解码失败'); + } + + // 写入分片文件 + file_put_contents($chunk_file, $file_data); + + // 发送分片上传成功响应 + $conn->send(json_encode([ + 'type' => 'chunk_uploaded', + 'file_id' => $file_id, + 'chunk_index' => $chunk_index, + 'message' => '分片上传成功' + ])); + + $this->log('分片上传成功,文件ID:' . $file_id . ',分片索引:' . $chunk_index . '/' . $total_chunks, 'info'); + + } catch (\Exception $e) { + $conn->send(json_encode(['type' => 'error', 'message' => '分片上传失败:' . $e->getMessage(), 'file_id' => $data['file_id'] ?? ''])); + } + } + + /** + * 处理分片合并 + * @param ConnectionInterface $conn + * @param array $data + */ + private function handleMergeChunks(ConnectionInterface $conn, $data) + { + try { + // 获取客户端信息 + $clientInfo = $this->clientData[$conn->resourceId]; + + // 获取合并相关参数 + $file_id = $data['file_id'] ?? ''; + $file_name = $data['file_name'] ?? ''; + $file_type = $data['file_type'] ?? ''; + $total_chunks = $data['total_chunks'] ?? 0; + $file_size = $data['file_size'] ?? 0; + $user_id = $data['user_id'] ?? $clientInfo['user_id']; + $site_id = $data['uniacid'] ?? $clientInfo['site_id']; + $token = $data['token'] ?? $clientInfo['token']; + + // 验证参数 + if (empty($file_id) || empty($file_name)) { + throw new \Exception('合并参数不完整'); + } + + // 获取临时目录 + $temp_dir = sys_get_temp_dir() . '/dify_uploads/' . $file_id; + if (!is_dir($temp_dir)) { + throw new \Exception('分片存储目录不存在'); + } + + // 验证所有分片是否存在 + for ($i = 0; $i < $total_chunks; $i++) { + $chunk_file = $temp_dir . '/' . $i; + if (!file_exists($chunk_file)) { + throw new \Exception('分片文件缺失:' . $i); + } + } + + // 合并分片 + $merged_file = $temp_dir . '/merged_' . $file_name; + $merged_handle = fopen($merged_file, 'wb'); + if (!$merged_handle) { + throw new \Exception('创建合并文件失败'); + } + + // 按顺序读取并合并分片 + for ($i = 0; $i < $total_chunks; $i++) { + $chunk_file = $temp_dir . '/' . $i; + $chunk_handle = fopen($chunk_file, 'rb'); + if (!$chunk_handle) { + fclose($merged_handle); + throw new \Exception('打开分片文件失败:' . $i); + } + + // 读取分片内容并写入合并文件 + while (!feof($chunk_handle)) { + $buffer = fread($chunk_handle, 8192); + fwrite($merged_handle, $buffer); + } + + fclose($chunk_handle); + } + + fclose($merged_handle); + + // 验证合并后的文件大小 + if (filesize($merged_file) !== $file_size) { + throw new \Exception('合并后的文件大小与预期不符'); + } + + // 验证参数并获取配置,与 Kefu.php 保持一致 + $config = $this->validateAndGetConfig([ + 'file_name' => ['required' => true, 'message' => '文件名不能为空', 'description' => '文件名'], + 'file_type' => ['required' => true, 'message' => '文件类型不能为空', 'description' => '文件类型'], + 'user_id' => ['required' => true, 'message' => '请求参数 `user_id` 不能为空', 'description' => '用户ID'] + ], [ + 'file_name' => $file_name, + 'file_type' => $file_type, + 'user_id' => $user_id, + 'uniacid' => $site_id, + 'token' => $token + ]); + + // 发送请求到Dify API + $url = $config['base_url'] . '/files/upload'; + + // 构建请求头 + $headers = [ + 'Authorization: Bearer ' . $config['api_key'], + ]; + + // 读取合并后的文件内容 + $file_content = base64_encode(file_get_contents($merged_file)); + + // 发送文件上传请求(使用 multipart/form-data 格式) + $response = $this->curlFileUpload($url, $file_name, $file_type, $file_content, $user_id, $headers); + + // 解析响应 + $result = json_decode($response, true); + + if (json_last_error() !== JSON_ERROR_NONE) { + throw new \Exception('解析响应失败'); + } + + // 验证响应数据 + if (empty($result) || !isset($result['id'])) { + throw new \Exception('API返回数据格式错误或缺少必要字段'); + } + + // 清理临时文件 + $this->cleanupTempFiles($temp_dir); + + // 发送合并成功响应 + $conn->send(json_encode([ + 'type' => 'upload_success', + 'file_id' => $result['id'] ?? '', + 'file_name' => $result['name'] ?? $file_name, + 'file_size' => $result['size'] ?? $file_size, + 'file_extension' => $result['extension'] ?? '', + 'file_mime_type' => $result['mime_type'] ?? $file_type, + 'file_created_by' => $result['created_by'] ?? '', + 'file_created_at' => $result['created_at'] ?? '', + 'file_url' => $result['url'] ?? '' + ])); + + $this->log('文件上传成功,用户ID:' . $user_id . ',文件名:' . $file_name . ',文件ID:' . $result['id'], 'info'); + + } catch (\Exception $e) { + // 清理临时文件 + if (!empty($data['file_id'])) { + $temp_dir = sys_get_temp_dir() . '/dify_uploads/' . $data['file_id']; + if (is_dir($temp_dir)) { + $this->cleanupTempFiles($temp_dir); + } + } + + // 解析错误信息 + $errorMessage = $e->getMessage(); + $errorCode = 500; + $errorType = 'upload_failed'; + + // 提取HTTP错误码和Dify错误信息 + if (preg_match('/HTTP请求失败,状态码:(\d+),响应:(.*)/', $errorMessage, $matches)) { + $errorCode = (int)$matches[1]; + $errorResponse = $matches[2]; + + try { + $errorData = json_decode($errorResponse, true); + if (isset($errorData['code'])) { + $errorType = $errorData['code']; + } + if (isset($errorData['message'])) { + $errorMessage = $errorData['message']; + } + } catch (\Exception $decodeEx) { + // 解析失败,使用原始错误信息 + } + } + + $conn->send(json_encode([ + 'type' => 'error', + 'code' => $errorCode, + 'error_type' => $errorType, + 'message' => '文件上传失败:' . $errorMessage + ])); + } + } + + /** + * 清理临时文件 + * @param string $dir 临时目录 + */ + private function cleanupTempFiles($dir) + { + if (!is_dir($dir)) { + return; + } + + $files = scandir($dir); + foreach ($files as $file) { + if ($file === '.' || $file === '..') { + continue; + } + + $file_path = $dir . '/' . $file; + if (is_file($file_path)) { + unlink($file_path); + } elseif (is_dir($file_path)) { + $this->cleanupTempFiles($file_path); + } + } + + rmdir($dir); + } + + /** + * 处理分片状态检查 + * @param ConnectionInterface $conn + * @param array $data + */ + private function handleCheckChunks(ConnectionInterface $conn, $data) + { + try { + // 获取客户端信息 + $clientInfo = $this->clientData[$conn->resourceId]; + + // 获取检查相关参数 + $file_id = $data['file_id'] ?? ''; + $total_chunks = $data['total_chunks'] ?? 0; + $user_id = $data['user_id'] ?? $clientInfo['user_id']; + + // 验证参数 + if (empty($file_id)) { + throw new \Exception('文件ID不能为空'); + } + + // 获取临时目录 + $temp_dir = sys_get_temp_dir() . '/dify_uploads/' . $file_id; + $uploaded_chunks = []; + + // 检查临时目录是否存在 + if (is_dir($temp_dir)) { + // 扫描目录中的分片文件 + $files = scandir($temp_dir); + foreach ($files as $file) { + if ($file === '.' || $file === '..' || strpos($file, 'merged_') === 0) { + continue; + } + + // 检查是否是数字文件名(分片索引) + if (is_numeric($file)) { + $chunk_index = (int) $file; + // 检查分片文件是否存在且不为空 + if (file_exists($temp_dir . '/' . $file) && filesize($temp_dir . '/' . $file) > 0) { + $uploaded_chunks[] = $chunk_index; + } + } + } + } + + // 发送分片状态响应 + $conn->send(json_encode([ + 'type' => 'chunks_status', + 'file_id' => $file_id, + 'uploaded_chunks' => $uploaded_chunks, + 'total_chunks' => $total_chunks, + 'message' => '分片状态检查成功' + ])); + + $this->log('分片状态检查成功,文件ID:' . $file_id . ',已上传分片数:' . count($uploaded_chunks) . '/' . $total_chunks, 'info'); + + } catch (\Exception $e) { + $conn->send(json_encode(['type' => 'error', 'message' => '分片状态检查失败:' . $e->getMessage(), 'file_id' => $data['file_id'] ?? ''])); + } + } + + /** + * 封装文件上传的curl请求方法(适用于 Dify 1.9.0 版本) + * @param string $url 请求URL + * @param string $file_name 文件名 + * @param string $file_type 文件类型 + * @param string $file_content base64编码的文件内容 + * @param string $user_id 用户ID + * @param array $headers 请求头 + * @return string 响应内容 + */ + private function curlFileUpload($url, $file_name, $file_type, $file_content, $user_id, $headers = []) + { + // 解码base64文件内容 + $file_data = base64_decode($file_content); + if ($file_data === false) { + throw new \Exception('文件内容base64解码失败'); + } + + // 创建临时文件 + $temp_file = tempnam(sys_get_temp_dir(), 'dify_upload_'); + file_put_contents($temp_file, $file_data); + + try { + $ch = curl_init(); + + // 设置URL + curl_setopt($ch, CURLOPT_URL, $url); + + // 设置请求方法 + curl_setopt($ch, CURLOPT_POST, true); + + // 设置文件上传 + $cfile = curl_file_create($temp_file, $file_type, $file_name); + $post_data = [ + 'file' => $cfile, + 'user' => $user_id + ]; + + // 设置POST数据 + curl_setopt($ch, CURLOPT_POSTFIELDS, $post_data); + + // 设置请求头 - 不设置 Content-Type,让 curl 自动设置为 multipart/form-data + if (!empty($headers)) { + curl_setopt($ch, CURLOPT_HTTPHEADER, $headers); + } + + // 设置返回值 + curl_setopt($ch, CURLOPT_RETURNTRANSFER, true); + curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false); + curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, false); + curl_setopt($ch, CURLOPT_TIMEOUT, 60); + + // 执行请求 + $response = curl_exec($ch); + $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE); + + // 关闭连接 + curl_close($ch); + + if ($response === false) { + throw new \Exception('Curl请求失败'); + } + + if ($httpCode >= 400) { + throw new \Exception('HTTP请求失败,状态码:' . $httpCode . ',响应:' . $response); + } + + return $response; + } finally { + // 清理临时文件 + if (file_exists($temp_file)) { + unlink($temp_file); + } + } + } + /** * 处理流式响应 * @param ConnectionInterface $conn @@ -292,8 +690,9 @@ class WebSocket extends WebSocketBase * @param array $headers * @param string $query * @param string $user_id + * @param string $site_id */ - private function handleStreamingResponse(ConnectionInterface $conn, $url, $requestData, $headers, $query, $user_id) + private function handleStreamingResponse(ConnectionInterface $conn, $url, $requestData, $headers, $query, $user_id, $site_id) { try { // 记录开始处理流式请求 @@ -302,8 +701,7 @@ class WebSocket extends WebSocketBase // 初始化模型 $kefu_conversation_model = new KefuConversationModel(); $kefu_message_model = new KefuMessageModel(); - $site_id = $this->site_id; - $current_user_id = $this->user_id; + $current_user_id = $user_id; // 定义变量 $real_conversation_id = ''; @@ -613,7 +1011,7 @@ class WebSocket extends WebSocketBase $this->log('开始流式请求,请求ID:' . $requestId, 'info'); // 检查客户端连接状态的回调 - $on_check = function() use ($conn, $requestId) { + $on_check = function () use ($conn, $requestId) { // 检查连接是否仍然在客户端列表中(通过检查clientData) if (!isset($this->clientData[$requestId])) { $this->log('客户端连接已关闭,停止流式请求:' . $requestId, 'info'); @@ -628,19 +1026,7 @@ class WebSocket extends WebSocketBase }; // 流式完成回调:仅在上游流真正结束后才触发(避免立刻发送 done) - $on_complete = function (bool $aborted = false, int $errno = 0, ?string $err = null) use ( - $conn, - $requestId, - $user_id, - &$real_conversation_id, - &$real_assistant_message_id, - &$assistant_content, - $kefu_message_model, - $kefu_conversation_model, - $site_id, - $current_user_id, - $temp_conversation_id - ) { + $on_complete = function (bool $aborted = false, int $errno = 0, ?string $err = null) use ($conn, $requestId, $user_id, &$real_conversation_id, &$real_assistant_message_id, &$assistant_content, $kefu_message_model, $kefu_conversation_model, $site_id, $current_user_id, $temp_conversation_id) { // 从流式请求列表中移除 if (isset($this->streamingRequests[$requestId])) { unset($this->streamingRequests[$requestId]); @@ -698,9 +1084,7 @@ class WebSocket extends WebSocketBase try { $kefu_conversation_model = new KefuConversationModel(); $kefu_message_model = new KefuMessageModel(); - $site_id = $this->site_id; - $current_user_id = $this->member_id; - $this->cleanupTempData($kefu_message_model, $kefu_conversation_model, $site_id, $current_user_id, $temp_conversation_id); + $this->cleanupTempData($kefu_message_model, $kefu_conversation_model, $site_id, $user_id, $temp_conversation_id); } catch (\Exception $cleanupException) { $this->log('清理临时数据时也发生异常:' . $cleanupException->getMessage(), 'error'); } @@ -894,7 +1278,7 @@ class WebSocket extends WebSocketBase * @return array * @throws \Exception */ - private function validateAndGetConfig($params_rules = []) + private function validateAndGetConfig($params_rules = [], $params = []) { // 参数验证规则 $rules = []; @@ -904,14 +1288,20 @@ class WebSocket extends WebSocketBase // 验证参数 foreach ($rules as $field => $rule) { - if (isset($rule['required']) && $rule['required'] && empty($this->params[$field])) { + if (isset($rule['required']) && $rule['required'] && empty($params[$field])) { throw new \Exception($rule['message']); } } + // 获取站点ID + $site_id = $params['uniacid'] ?? 0; + if (empty($site_id)) { + throw new \Exception('站点ID不能为空'); + } + // 获取智能客服配置 $kefu_config_model = new KefuConfigModel(); - $config_info = $kefu_config_model->getConfig($this->site_id)['data']['value'] ?? []; + $config_info = $kefu_config_model->getConfig($site_id)['data']['value'] ?? []; if (empty($config_info) || $config_info['status'] != 1) { throw new \Exception('智能客服暂未启用'); @@ -1136,16 +1526,16 @@ class WebSocket extends WebSocketBase * @param string $message 用户消息 * @param string $user_id 用户ID * @param string $conversation_id 会话ID + * @param string $site_id 站点ID * @return array * @throws \Exception */ - private function handleBlockingResponse($url, $requestData, $headers, $message, $user_id, $conversation_id) + private function handleBlockingResponse($url, $requestData, $headers, $message, $user_id, $conversation_id, $site_id) { // 初始化模型 $kefu_conversation_model = new KefuConversationModel(); $kefu_message_model = new KefuMessageModel(); - $site_id = $this->site_id; - $current_user_id = $this->member_id; + $current_user_id = $user_id; // 开启事务,确保数据一致性(对齐 Kefu.php 的非流式存储行为) Db::startTrans(); diff --git a/src/addon/aikefu/docs/ws_multi_addon_test.html b/src/addon/aikefu/docs/ws_multi_addon_test.html index 211923b0a..70c50b3ff 100644 --- a/src/addon/aikefu/docs/ws_multi_addon_test.html +++ b/src/addon/aikefu/docs/ws_multi_addon_test.html @@ -148,6 +148,19 @@ 发送 +
+ + +
+
上传进度: 0%
+
+
+
+
+
@@ -449,6 +462,7 @@ action: 'chat', query: message, user_id: 1, + uniacid: 1, stream: true, response_mode: 'streaming', conversation_id: addon.conversation_id @@ -462,6 +476,362 @@ } }; + // 分片上传相关配置 + const chunkSize = 5 * 1024 * 1024; // 5MB 分片大小 + + // 存储上传状态 + const uploadStates = new Map(); + + // 读取文件的指定部分 + const readFileChunk = (file, start, end) => { + return new Promise((resolve, reject) => { + const reader = new FileReader(); + const blob = file.slice(start, end); + + reader.onload = (e) => { + const base64Content = e.target.result.split(',')[1]; // 去除 base64 前缀 + resolve(base64Content); + }; + + reader.onerror = () => { + reject(new Error('文件分片读取失败')); + }; + + reader.readAsDataURL(blob); + }); + }; + + // 更新上传进度 + const updateUploadProgress = (name, progress) => { + const progressDiv = document.getElementById('upload-progress-' + name); + const progressText = document.getElementById('progress-text-' + name); + const progressBar = document.getElementById('progress-bar-' + name); + + if (progressDiv && progressText && progressBar) { + progressDiv.style.display = 'block'; + progressText.textContent = Math.round(progress) + '%'; + progressBar.style.width = progress + '%'; + } + }; + + // 生成文件唯一标识 + const generateFileId = (file) => { + return file.name + '_' + file.size + '_' + file.lastModified; + }; + + // 保存上传状态 + const saveUploadState = (name, fileId, state) => { + const key = name + '_' + fileId; + uploadStates.set(key, state); + // 也可以存储到 localStorage 中,以便刷新页面后仍能恢复 + try { + localStorage.setItem('upload_state_' + key, JSON.stringify(state)); + } catch (e) { + console.error('保存上传状态失败:', e); + } + }; + + // 获取上传状态 + const getUploadState = (name, fileId) => { + const key = name + '_' + fileId; + // 先从内存中获取 + if (uploadStates.has(key)) { + return uploadStates.get(key); + } + // 再从 localStorage 中获取 + try { + const stateStr = localStorage.getItem('upload_state_' + key); + if (stateStr) { + const state = JSON.parse(stateStr); + uploadStates.set(key, state); + return state; + } + } catch (e) { + console.error('获取上传状态失败:', e); + } + return null; + }; + + // 删除上传状态 + const deleteUploadState = (name, fileId) => { + const key = name + '_' + fileId; + uploadStates.delete(key); + try { + localStorage.removeItem('upload_state_' + key); + } catch (e) { + console.error('删除上传状态失败:', e); + } + }; + + // 检查分片状态 + const checkChunkStatus = async (name, fileId, totalChunks) => { + return new Promise((resolve, reject) => { + if (!wsConnections[name] || wsConnections[name].readyState !== WebSocket.OPEN) { + reject(new Error('WebSocket未连接')); + return; + } + + // 发送检查分片状态的请求 + const checkMsg = JSON.stringify({ + action: 'upload_check', + file_id: fileId, + total_chunks: totalChunks, + user_id: 1, + uniacid: 1, + }); + + // 存储原始的 onmessage 处理函数 + const originalOnMessage = wsConnections[name].onmessage; + + // 临时替换 onmessage 处理函数,等待检查响应 + wsConnections[name].onmessage = (event) => { + // 先调用原始的 onmessage 处理函数 + if (originalOnMessage) { + originalOnMessage(event); + } + + try { + const response = JSON.parse(event.data); + if (response.type === 'chunks_status' && response.file_id === fileId) { + // 恢复原始的 onmessage 处理函数 + wsConnections[name].onmessage = originalOnMessage; + resolve(response.uploaded_chunks || []); + } else if (response.type === 'error' && response.file_id === fileId) { + // 恢复原始的 onmessage 处理函数 + wsConnections[name].onmessage = originalOnMessage; + reject(new Error(response.message)); + } + } catch (e) { + // 非JSON响应,忽略 + } + }; + + // 发送检查请求 + wsConnections[name].send(checkMsg); + + // 设置超时 + setTimeout(() => { + // 恢复原始的 onmessage 处理函数 + wsConnections[name].onmessage = originalOnMessage; + reject(new Error('检查分片状态超时')); + }, 30000); + }); + }; + + // 上传文件 + const uploadFile = async (name) => { + const addon = addons.find(a => a.name === name); + if (!addon || addon.status !== 'connected') return; + + const fileInput = document.getElementById('file-input-' + name); + const file = fileInput.files[0]; + if (!file) { + addMessage(name, '系统', '请选择要上传的文件'); + return; + } + + // 添加文件上传消息 + addMessage(name, '用户', `正在上传文件: ${file.name}`); + + // 初始化上传参数 + const fileName = file.name; + const fileType = file.type; + const fileSize = file.size; + const totalChunks = Math.ceil(fileSize / chunkSize); + const fileId = generateFileId(file); // 生成基于文件信息的唯一ID + + // 显示上传进度 + updateUploadProgress(name, 0); + + try { + // 检查是否有未完成的上传 + let uploadedChunks = []; + let uploadedCount = 0; + + // 尝试获取上传状态 + const savedState = getUploadState(name, fileId); + if (savedState) { + uploadedChunks = savedState.uploadedChunks || []; + uploadedCount = uploadedChunks.length; + addMessage(name, '系统', `发现未完成的上传,已上传 ${uploadedCount}/${totalChunks} 分片`); + } else { + // 检查服务器上已上传的分片 + try { + uploadedChunks = await checkChunkStatus(name, fileId, totalChunks); + uploadedCount = uploadedChunks.length; + if (uploadedCount > 0) { + addMessage(name, '系统', `发现服务器上已上传 ${uploadedCount}/${totalChunks} 分片`); + } + } catch (e) { + console.error('检查分片状态失败:', e); + // 检查失败,默认所有分片都未上传 + uploadedChunks = []; + uploadedCount = 0; + } + } + + // 更新上传进度 + updateUploadProgress(name, (uploadedCount / totalChunks) * 100); + + // 保存初始上传状态 + saveUploadState(name, fileId, { + fileName, + fileType, + fileSize, + totalChunks, + uploadedChunks, + fileId + }); + + // 逐片上传未上传的分片 + for (let chunkIndex = 0; chunkIndex < totalChunks; chunkIndex++) { + // 跳过已上传的分片 + if (uploadedChunks.includes(chunkIndex)) { + console.log(`跳过已上传的分片: ${chunkIndex}`); + continue; + } + + const start = chunkIndex * chunkSize; + const end = Math.min(start + chunkSize, fileSize); + const chunkSizeActual = end - start; + + // 读取当前分片 + const chunkContent = await readFileChunk(file, start, end); + + // 构建分片上传消息 + const uploadMsg = JSON.stringify({ + action: 'upload_chunk', + file_id: fileId, + file_name: fileName, + file_type: fileType, + chunk_index: chunkIndex, + total_chunks: totalChunks, + chunk_size: chunkSizeActual, + file_size: fileSize, + file_content: chunkContent, + user_id: 1, + uniacid: 1, + }); + + // 发送分片 + if (wsConnections[name] && wsConnections[name].readyState === WebSocket.OPEN) { + await new Promise((resolve, reject) => { + // 存储原始的 onmessage 处理函数 + const originalOnMessage = wsConnections[name].onmessage; + + // 临时替换 onmessage 处理函数,等待分片上传响应 + wsConnections[name].onmessage = (event) => { + // 先调用原始的 onmessage 处理函数 + if (originalOnMessage) { + originalOnMessage(event); + } + + try { + const response = JSON.parse(event.data); + if (response.type === 'chunk_uploaded' && response.file_id === fileId && response.chunk_index === chunkIndex) { + // 分片上传成功,更新已上传分片列表 + uploadedChunks.push(chunkIndex); + uploadedCount++; + // 保存上传状态 + saveUploadState(name, fileId, { + fileName, + fileType, + fileSize, + totalChunks, + uploadedChunks, + fileId + }); + resolve(); + } else if (response.type === 'error' && response.file_id === fileId) { + // 分片上传失败 + reject(new Error(response.message)); + } + } catch (e) { + // 非JSON响应,忽略 + } + }; + + // 发送分片 + wsConnections[name].send(uploadMsg); + + // 设置超时 + setTimeout(() => { + reject(new Error('分片上传超时')); + }, 30000); + }); + + // 更新上传进度 + const progress = ((uploadedCount) / totalChunks) * 100; + updateUploadProgress(name, progress); + } else { + throw new Error('WebSocket未连接,无法上传文件'); + } + } + + // 所有分片上传完成,发送合并请求 + const mergeMsg = JSON.stringify({ + action: 'upload_merge', + file_id: fileId, + file_name: fileName, + file_type: fileType, + total_chunks: totalChunks, + file_size: fileSize, + user_id: 1, + uniacid: 1, + }); + + if (wsConnections[name] && wsConnections[name].readyState === WebSocket.OPEN) { + wsConnections[name].send(mergeMsg); + addMessage(name, '系统', '文件上传完成,正在处理...'); + // 清空文件输入 + fileInput.value = ''; + // 删除上传状态 + deleteUploadState(name, fileId); + } else { + throw new Error('WebSocket未连接,无法发送合并请求'); + } + } catch (error) { + addMessage(name, '系统', '文件上传失败: ' + error.message); + console.error('文件上传失败:', error); + // 上传失败,但保留上传状态,以便后续恢复 + } + }; + + // 恢复上传 + const resumeUpload = async (name, fileId) => { + const addon = addons.find(a => a.name === name); + if (!addon || addon.status !== 'connected') return; + + // 获取上传状态 + const state = getUploadState(name, fileId); + if (!state) { + addMessage(name, '系统', '未找到上传状态'); + return; + } + + // 显示上传进度 + updateUploadProgress(name, (state.uploadedChunks.length / state.totalChunks) * 100); + + try { + // 检查服务器上已上传的分片 + const uploadedChunks = await checkChunkStatus(name, fileId, state.totalChunks); + const uploadedCount = uploadedChunks.length; + addMessage(name, '系统', `恢复上传,已上传 ${uploadedCount}/${state.totalChunks} 分片`); + + // 保存上传状态 + saveUploadState(name, fileId, { + ...state, + uploadedChunks + }); + + // 这里需要用户重新选择文件,因为浏览器安全限制,无法直接访问之前的文件 + addMessage(name, '系统', '请重新选择文件以恢复上传'); + } catch (e) { + addMessage(name, '系统', '恢复上传失败: ' + e.message); + console.error('恢复上传失败:', e); + } + }; + // 关闭所有连接 const closeAllConnections = () => { Object.keys(wsConnections).forEach(name => { @@ -483,7 +853,8 @@ chatAreas, setWebsocketUrl, initConnections, - sendMessage + sendMessage, + uploadFile }; } }).mount('#app'); diff --git a/src/app/api/controller/WebSocketBase.php b/src/app/api/controller/WebSocketBase.php index a467d5213..1fdc96831 100644 --- a/src/app/api/controller/WebSocketBase.php +++ b/src/app/api/controller/WebSocketBase.php @@ -14,16 +14,7 @@ abstract class WebSocketBase implements MessageComponentInterface protected $clients; protected $clientData; protected $addonName; - - // 控制器属性 - public $params; - public $token; - protected $user_id; - protected $site_id; - protected $uniacid; - protected $site_ids = []; - public $app_type; - + /** * 构造函数 */ @@ -33,13 +24,6 @@ abstract class WebSocketBase implements MessageComponentInterface $this->clientData = []; $this->addonName = $addonName; - // 初始化控制器属性 - $this->params = []; - $this->token = ''; - $this->user_id = 0; - $this->site_id = 0; - $this->uniacid = 0; - $this->app_type = 'weapp'; // 默认微信小程序 } /** @@ -159,7 +143,7 @@ abstract class WebSocketBase implements MessageComponentInterface // 子类可以重写此方法来实现更严格的认证逻辑 $devMode = true; // 开发模式下,关闭严格认证 if (!$devMode) { - $this->doAuth($conn, $site_id, $user_id, $token); + $this->doAuth($conn, $data); } $this->clientData[$conn->resourceId]['site_id'] = $site_id; @@ -176,11 +160,9 @@ abstract class WebSocketBase implements MessageComponentInterface /** * 实际的认证逻辑,子类可以重写此方法 * @param ConnectionInterface $conn - * @param int $site_id - * @param int $user_id - * @param string $token + * @param array $data 认证数据 */ - protected function doAuth(ConnectionInterface $conn, $site_id, $user_id, $token) + protected function doAuth(ConnectionInterface $conn, $data) { // 默认实现,子类应该重写此方法 // 这里可以添加更严格的认证逻辑,例如验证token的有效性