From e000b61508070553f0cb88c148435881aa561792 Mon Sep 17 00:00:00 2001 From: ZF sun <34314687@qq.com> Date: Wed, 21 Jan 2026 13:49:16 +0800 Subject: [PATCH] refactor(WebSocket): Enhance authentication logic and improve message handling with transaction support --- src/addon/aikefu/api/controller/WebSocket.php | 159 +++++++++++++----- 1 file changed, 116 insertions(+), 43 deletions(-) diff --git a/src/addon/aikefu/api/controller/WebSocket.php b/src/addon/aikefu/api/controller/WebSocket.php index 841be52a5..c7c1d5ec2 100644 --- a/src/addon/aikefu/api/controller/WebSocket.php +++ b/src/addon/aikefu/api/controller/WebSocket.php @@ -6,9 +6,11 @@ use addon\aikefu\model\Config as KefuConfigModel; use addon\aikefu\model\Conversation as KefuConversationModel; use addon\aikefu\model\Message as KefuMessageModel; use app\api\controller\WebSocketBase; +use app\model\system\Api; use Ratchet\ConnectionInterface; use think\facade\Db as Db; use think\facade\Config; +use think\facade\Cache; use React\EventLoop\Loop; @@ -147,32 +149,69 @@ class WebSocket extends WebSocketBase } /** - * 处理客户端认证 - * @param ConnectionInterface $conn - * @param array $data + * 实际鉴权逻辑(复用 HTTP BaseApi::checkToken 同款规则) + * + * 说明: + * - WebSocket 连接场景没有 request()/input(),所以这里直接根据客户端传入的 site_id + token 解密校验 + * - 校验 token 解密成功、未过期、且 token 内 member_id 与传入 member_id 一致 */ - protected function handleAuth(ConnectionInterface $conn, $data) + protected function doAuth(ConnectionInterface $conn, $site_id, $member_id, $token) { - try { - $site_id = $data['site_id'] ?? null; - $member_id = $data['member_id'] ?? null; - $token = $data['token'] ?? null; + $site_id = (int)$site_id; + $member_id = (int)$member_id; + $token = (string)$token; - if (empty($site_id) || empty($member_id) || empty($token)) { - throw new \Exception('Missing authentication parameters'); + if ($site_id <= 0 || $member_id <= 0 || $token === '') { + throw new \Exception('Missing authentication parameters'); + } + + // 生成与 BaseApi::checkToken 一致的解密 key:private_key + 'site' . site_id(如启用 API 私钥) + $key = 'site' . $site_id; + $api_model = new Api(); + $api_config = $api_model->getApiConfig()['data'] ?? null; + if ( + !empty($api_config) && + !empty($api_config['is_use']) && + isset($api_config['value']['private_key']) && + !empty($api_config['value']['private_key']) + ) { + $key = $api_config['value']['private_key'] . $key; + } + + $decrypt = decrypt($token, $key); + if (empty($decrypt)) { + throw new \Exception('TOKEN_ERROR'); + } + + $data = json_decode($decrypt, true); + if (!is_array($data) || empty($data['member_id'])) { + throw new \Exception('TOKEN_ERROR'); + } + + // member_id 必须一致,避免冒用 + if ((int)$data['member_id'] !== $member_id) { + throw new \Exception('TOKEN_ERROR'); + } + + // 过期校验:expire_time=0 为永久,其余必须未过期 + $expire_time = (int)($data['expire_time'] ?? 0); + if ($expire_time !== 0 && $expire_time < time()) { + throw new \Exception('TOKEN_EXPIRE'); + } + + // 与 BaseApi 行为一致:临近过期时生成 refresh_token 放入缓存(可选,不强制给客户端) + if ($expire_time !== 0 && ($expire_time - time()) < 300 && !Cache::get('member_token' . $member_id)) { + try { + // WebSocket 场景不强制下发 refresh_token,但仍按原逻辑缓存,便于其他接口复用 + $refresh_token = encrypt(json_encode([ + 'member_id' => $member_id, + 'create_time' => time(), + 'expire_time' => $expire_time, + ]), $key); + Cache::set('member_token' . $member_id, $refresh_token, 360); + } catch (\Throwable $e) { + // 刷新失败不影响当前鉴权通过 } - - // 这里可以添加更严格的认证逻辑,例如验证token的有效性 - // 为了简单起见,我们暂时只检查参数是否存在 - - $this->clientData[$conn->resourceId]['site_id'] = $site_id; - $this->clientData[$conn->resourceId]['member_id'] = $member_id; - $this->clientData[$conn->resourceId]['token'] = $token; - $this->clientData[$conn->resourceId]['is_authenticated'] = true; - - $conn->send(json_encode(['type' => 'auth_success', 'message' => 'Authenticated successfully'])); - } catch (\Exception $e) { - $conn->send(json_encode(['type' => 'auth_error', 'message' => $e->getMessage(), 'line' => $e->getLine(), 'file' => $e->getFile(), 'trace' => $e->getTraceAsString()])); } } @@ -1086,35 +1125,69 @@ class WebSocket extends WebSocketBase */ private function handleBlockingResponse($url, $requestData, $headers, $message, $user_id, $conversation_id) { - // 发送请求 - $response = $this->curlRequest($url, 'POST', $requestData, $headers); - $response_data = json_decode($response, true); - // 初始化模型 $kefu_conversation_model = new KefuConversationModel(); $kefu_message_model = new KefuMessageModel(); $site_id = $this->site_id; $current_user_id = $this->member_id; - // 保存用户消息 - $this->saveUserMessage($kefu_message_model, $site_id, $current_user_id, $conversation_id, '', $message); + // 开启事务,确保数据一致性(对齐 Kefu.php 的非流式存储行为) + Db::startTrans(); + try { + // 发送请求 + $response = $this->curlRequest($url, 'POST', $requestData, $headers); + $response_data = json_decode($response, true); - // 更新或创建会话 - $real_conversation_id = $response_data['conversation_id'] ?? $conversation_id; - $this->updateOrCreateConversation($kefu_conversation_model, $site_id, $current_user_id, $real_conversation_id); + if (json_last_error() !== JSON_ERROR_NONE) { + throw new \Exception('解析响应失败'); + } + if (empty($response_data) || !isset($response_data['conversation_id'])) { + throw new \Exception('API返回数据格式错误或缺少必要字段'); + } - // 保存助手消息 - $assistant_content = $response_data['answer'] ?? ''; - $real_assistant_message_id = $response_data['message_id'] ?? ''; - $this->saveAssistantMessage($kefu_message_model, $site_id, $current_user_id, $real_conversation_id, $real_assistant_message_id, $assistant_content); + $real_conversation_id = $response_data['conversation_id'] ?? $conversation_id; + $real_assistant_message_id = $response_data['message_id'] ?? ($response_data['id'] ?? ''); + $assistant_content = $response_data['answer'] ?? ''; - // 返回响应 - return [ - 'conversation_id' => $real_conversation_id, - 'message_id' => $real_assistant_message_id, - 'content' => $assistant_content, - 'answer' => $assistant_content, - 'status' => 'completed' - ]; + // 去重:用户消息(避免客户端重试导致重复写入) + $existing_user_message = $kefu_message_model->getMessageInfo([ + ['site_id', '=', $site_id], + ['user_id', '=', $current_user_id], + ['conversation_id', '=', $real_conversation_id], + ['role', '=', 'user'], + ['content', '=', $message ?? ''] + ]); + if (empty($existing_user_message['data'])) { + $this->saveUserMessage($kefu_message_model, $site_id, $current_user_id, $real_conversation_id, $real_assistant_message_id, $message ?? ''); + } + + // 去重:助手消息 + $existing_assistant_message = $kefu_message_model->getMessageInfo([ + ['site_id', '=', $site_id], + ['user_id', '=', $current_user_id], + ['conversation_id', '=', $real_conversation_id], + ['role', '=', 'assistant'], + ['message_id', '=', $real_assistant_message_id] + ]); + if (empty($existing_assistant_message['data'])) { + $this->saveAssistantMessage($kefu_message_model, $site_id, $current_user_id, $real_conversation_id, $real_assistant_message_id, $assistant_content); + } + + // 更新或创建会话 + $this->updateOrCreateConversation($kefu_conversation_model, $site_id, $current_user_id, $real_conversation_id); + + Db::commit(); + + return [ + 'conversation_id' => $real_conversation_id, + 'message_id' => $real_assistant_message_id, + 'content' => $assistant_content, + 'answer' => $assistant_content, + 'status' => 'completed' + ]; + } catch (\Exception $e) { + Db::rollback(); + throw $e; + } } } \ No newline at end of file