refactor(WebSocket): Enhance authentication logic and improve message handling with transaction support

This commit is contained in:
2026-01-21 13:49:16 +08:00
parent 31456469a3
commit e000b61508

View File

@@ -6,9 +6,11 @@ use addon\aikefu\model\Config as KefuConfigModel;
use addon\aikefu\model\Conversation as KefuConversationModel;
use addon\aikefu\model\Message as KefuMessageModel;
use app\api\controller\WebSocketBase;
use app\model\system\Api;
use Ratchet\ConnectionInterface;
use think\facade\Db as Db;
use think\facade\Config;
use think\facade\Cache;
use React\EventLoop\Loop;
@@ -147,32 +149,69 @@ class WebSocket extends WebSocketBase
}
/**
* 处理客户端认证
* @param ConnectionInterface $conn
* @param array $data
* 实际鉴权逻辑(复用 HTTP BaseApi::checkToken 同款规则)
*
* 说明:
* - WebSocket 连接场景没有 request()/input(),所以这里直接根据客户端传入的 site_id + token 解密校验
* - 校验 token 解密成功、未过期、且 token 内 member_id 与传入 member_id 一致
*/
protected function handleAuth(ConnectionInterface $conn, $data)
protected function doAuth(ConnectionInterface $conn, $site_id, $member_id, $token)
{
try {
$site_id = $data['site_id'] ?? null;
$member_id = $data['member_id'] ?? null;
$token = $data['token'] ?? null;
$site_id = (int)$site_id;
$member_id = (int)$member_id;
$token = (string)$token;
if (empty($site_id) || empty($member_id) || empty($token)) {
throw new \Exception('Missing authentication parameters');
if ($site_id <= 0 || $member_id <= 0 || $token === '') {
throw new \Exception('Missing authentication parameters');
}
// 生成与 BaseApi::checkToken 一致的解密 keyprivate_key + 'site' . site_id如启用 API 私钥)
$key = 'site' . $site_id;
$api_model = new Api();
$api_config = $api_model->getApiConfig()['data'] ?? null;
if (
!empty($api_config) &&
!empty($api_config['is_use']) &&
isset($api_config['value']['private_key']) &&
!empty($api_config['value']['private_key'])
) {
$key = $api_config['value']['private_key'] . $key;
}
$decrypt = decrypt($token, $key);
if (empty($decrypt)) {
throw new \Exception('TOKEN_ERROR');
}
$data = json_decode($decrypt, true);
if (!is_array($data) || empty($data['member_id'])) {
throw new \Exception('TOKEN_ERROR');
}
// member_id 必须一致,避免冒用
if ((int)$data['member_id'] !== $member_id) {
throw new \Exception('TOKEN_ERROR');
}
// 过期校验expire_time=0 为永久,其余必须未过期
$expire_time = (int)($data['expire_time'] ?? 0);
if ($expire_time !== 0 && $expire_time < time()) {
throw new \Exception('TOKEN_EXPIRE');
}
// 与 BaseApi 行为一致:临近过期时生成 refresh_token 放入缓存(可选,不强制给客户端)
if ($expire_time !== 0 && ($expire_time - time()) < 300 && !Cache::get('member_token' . $member_id)) {
try {
// WebSocket 场景不强制下发 refresh_token但仍按原逻辑缓存便于其他接口复用
$refresh_token = encrypt(json_encode([
'member_id' => $member_id,
'create_time' => time(),
'expire_time' => $expire_time,
]), $key);
Cache::set('member_token' . $member_id, $refresh_token, 360);
} catch (\Throwable $e) {
// 刷新失败不影响当前鉴权通过
}
// 这里可以添加更严格的认证逻辑例如验证token的有效性
// 为了简单起见,我们暂时只检查参数是否存在
$this->clientData[$conn->resourceId]['site_id'] = $site_id;
$this->clientData[$conn->resourceId]['member_id'] = $member_id;
$this->clientData[$conn->resourceId]['token'] = $token;
$this->clientData[$conn->resourceId]['is_authenticated'] = true;
$conn->send(json_encode(['type' => 'auth_success', 'message' => 'Authenticated successfully']));
} catch (\Exception $e) {
$conn->send(json_encode(['type' => 'auth_error', 'message' => $e->getMessage(), 'line' => $e->getLine(), 'file' => $e->getFile(), 'trace' => $e->getTraceAsString()]));
}
}
@@ -1086,35 +1125,69 @@ class WebSocket extends WebSocketBase
*/
private function handleBlockingResponse($url, $requestData, $headers, $message, $user_id, $conversation_id)
{
// 发送请求
$response = $this->curlRequest($url, 'POST', $requestData, $headers);
$response_data = json_decode($response, true);
// 初始化模型
$kefu_conversation_model = new KefuConversationModel();
$kefu_message_model = new KefuMessageModel();
$site_id = $this->site_id;
$current_user_id = $this->member_id;
// 保存用户消息
$this->saveUserMessage($kefu_message_model, $site_id, $current_user_id, $conversation_id, '', $message);
// 开启事务,确保数据一致性(对齐 Kefu.php 的非流式存储行为)
Db::startTrans();
try {
// 发送请求
$response = $this->curlRequest($url, 'POST', $requestData, $headers);
$response_data = json_decode($response, true);
// 更新或创建会话
$real_conversation_id = $response_data['conversation_id'] ?? $conversation_id;
$this->updateOrCreateConversation($kefu_conversation_model, $site_id, $current_user_id, $real_conversation_id);
if (json_last_error() !== JSON_ERROR_NONE) {
throw new \Exception('解析响应失败');
}
if (empty($response_data) || !isset($response_data['conversation_id'])) {
throw new \Exception('API返回数据格式错误或缺少必要字段');
}
// 保存助手消息
$assistant_content = $response_data['answer'] ?? '';
$real_assistant_message_id = $response_data['message_id'] ?? '';
$this->saveAssistantMessage($kefu_message_model, $site_id, $current_user_id, $real_conversation_id, $real_assistant_message_id, $assistant_content);
$real_conversation_id = $response_data['conversation_id'] ?? $conversation_id;
$real_assistant_message_id = $response_data['message_id'] ?? ($response_data['id'] ?? '');
$assistant_content = $response_data['answer'] ?? '';
// 返回响应
return [
'conversation_id' => $real_conversation_id,
'message_id' => $real_assistant_message_id,
'content' => $assistant_content,
'answer' => $assistant_content,
'status' => 'completed'
];
// 去重:用户消息(避免客户端重试导致重复写入)
$existing_user_message = $kefu_message_model->getMessageInfo([
['site_id', '=', $site_id],
['user_id', '=', $current_user_id],
['conversation_id', '=', $real_conversation_id],
['role', '=', 'user'],
['content', '=', $message ?? '']
]);
if (empty($existing_user_message['data'])) {
$this->saveUserMessage($kefu_message_model, $site_id, $current_user_id, $real_conversation_id, $real_assistant_message_id, $message ?? '');
}
// 去重:助手消息
$existing_assistant_message = $kefu_message_model->getMessageInfo([
['site_id', '=', $site_id],
['user_id', '=', $current_user_id],
['conversation_id', '=', $real_conversation_id],
['role', '=', 'assistant'],
['message_id', '=', $real_assistant_message_id]
]);
if (empty($existing_assistant_message['data'])) {
$this->saveAssistantMessage($kefu_message_model, $site_id, $current_user_id, $real_conversation_id, $real_assistant_message_id, $assistant_content);
}
// 更新或创建会话
$this->updateOrCreateConversation($kefu_conversation_model, $site_id, $current_user_id, $real_conversation_id);
Db::commit();
return [
'conversation_id' => $real_conversation_id,
'message_id' => $real_assistant_message_id,
'content' => $assistant_content,
'answer' => $assistant_content,
'status' => 'completed'
];
} catch (\Exception $e) {
Db::rollback();
throw $e;
}
}
}