fix(websocket): 统一WebSocket与Kefu的请求参数处理

调整WebSocket.php和前端测试页面的参数命名和处理逻辑,使其与Kefu.php保持一致:
1. 将message参数改为query
2. 增加response_mode参数
3. 统一参数优先级处理
4. 优化流式响应判断逻辑
This commit is contained in:
2026-01-21 15:49:31 +08:00
parent 96b61ba533
commit 673678a0e4
2 changed files with 38 additions and 28 deletions

View File

@@ -97,18 +97,18 @@ class WebSocket extends WebSocketBase
return; return;
} }
// 处理聊天消息
if (isset($data['action']) && $data['action'] === 'chat') {
$this->handleChat($conn, $data);
return;
}
// 处理心跳 // 处理心跳
if (isset($data['action']) && $data['action'] === 'ping') { if (isset($data['action']) && $data['action'] === 'ping') {
$conn->send(json_encode(['type' => 'pong'])); $conn->send(json_encode(['type' => 'pong']));
return; return;
} }
// 处理聊天消息
if (isset($data['action']) && $data['action'] === 'chat') {
$this->handleChat($conn, $data);
return;
}
$conn->send(json_encode(['type' => 'error', 'message' => 'Unknown action'])); $conn->send(json_encode(['type' => 'error', 'message' => 'Unknown action']));
} catch (\Exception $e) { } catch (\Exception $e) {
$conn->send(json_encode(['type' => 'error', 'message' => $e->getMessage(), 'line' => $e->getLine(), 'file' => $e->getFile(), 'trace' => $e->getTraceAsString()])); $conn->send(json_encode(['type' => 'error', 'message' => $e->getMessage(), 'line' => $e->getLine(), 'file' => $e->getFile(), 'trace' => $e->getTraceAsString()]));
@@ -157,6 +157,7 @@ class WebSocket extends WebSocketBase
*/ */
protected function doAuth(ConnectionInterface $conn, $site_id, $member_id, $token) protected function doAuth(ConnectionInterface $conn, $site_id, $member_id, $token)
{ {
// 与 Kefu.php 保持一致,支持使用 uniacid 作为站点ID
$site_id = (int)$site_id; $site_id = (int)$site_id;
$member_id = (int)$member_id; $member_id = (int)$member_id;
$token = (string)$token; $token = (string)$token;
@@ -232,42 +233,50 @@ class WebSocket extends WebSocketBase
try { try {
$clientInfo = $this->clientData[$conn->resourceId]; $clientInfo = $this->clientData[$conn->resourceId];
// 获取请求参数 // 获取请求参数,与 Kefu.php 保持一致
$message = $data['message'] ?? ''; $query = $data['query'] ?? $data['message'] ?? '';
$user_id = $data['user_id'] ?? $clientInfo['member_id']; $user_id = $data['user_id'] ?? $clientInfo['member_id'];
$conversation_id = $data['conversation_id'] ?? ''; $conversation_id = $data['conversation_id'] ?? '';
$stream = $data['stream'] ?? true; // WebSocket默认使用流式响应 $stream = $data['stream'] ?? false;
$response_mode = $data['response_mode'] ?? 'streaming'; // 与 Kefu.php 保持一致
// 设置当前控制器的属性 // 设置当前控制器的属性,与 Kefu.php 保持一致的参数优先级
$this->site_id = $clientInfo['site_id']; $this->site_id = $data['uniacid'] ?? $clientInfo['site_id'];
$this->member_id = $clientInfo['member_id']; $this->member_id = $data['member_id'] ?? $clientInfo['member_id'];
$this->token = $clientInfo['token']; $this->token = $data['token'] ?? $clientInfo['token'];
$this->params = [ $this->params = [
'message' => $message, 'query' => $query,
'user_id' => $user_id, 'user_id' => $user_id,
'conversation_id' => $conversation_id, 'conversation_id' => $conversation_id,
'stream' => $stream, 'stream' => $stream,
'response_mode' => $response_mode,
'uniacid' => $this->site_id,
'member_id' => $this->member_id,
'token' => $this->token,
]; ];
// 验证参数并获取配置 // 验证参数并获取配置,与 Kefu.php 保持一致
$config = $this->validateAndGetConfig([ $config = $this->validateAndGetConfig([
'message' => ['required' => true, 'message' => '请求参数 `message` 不能为空. 为消息内容', 'description' => '消息内容'], 'query' => ['required' => true, 'message' => '参数错误,请检查 `query` 参数是否设置正确', 'description' => '消息内容'],
'user_id' => ['required' => true, 'message' => '请求参数 `user_id` 不能为空', 'description' => '用户ID'] 'user_id' => ['required' => true, 'message' => '请求参数 `user_id` 不能为空', 'description' => '用户ID']
]); ]);
// 是否启用流式响应,与 Kefu.php 保持一致
$enable_stream = $stream || $response_mode == 'streaming';
// 构建请求数据和请求头 // 构建请求数据和请求头
$requestData = $this->buildRequestData($message, $user_id, $conversation_id, $stream); $requestData = $this->buildRequestData($query, $user_id, $conversation_id, $enable_stream);
$headers = $this->buildRequestHeaders($config['api_key']); $headers = $this->buildRequestHeaders($config['api_key']);
// 发送请求到Dify API // 发送请求到Dify API
$url = $config['base_url'] . $config['chat_endpoint']; $url = $config['base_url'] . $config['chat_endpoint'];
if ($stream) { if ($enable_stream) {
// 处理流式响应 // 处理流式响应
$this->handleStreamingResponse($conn, $url, $requestData, $headers, $message, $user_id); $this->handleStreamingResponse($conn, $url, $requestData, $headers, $query, $user_id);
} else { } else {
// 处理非流式响应 // 处理非流式响应
$response = $this->handleBlockingResponse($url, $requestData, $headers, $message, $user_id, $conversation_id); $response = $this->handleBlockingResponse($url, $requestData, $headers, $query, $user_id, $conversation_id);
$conn->send(json_encode(['type' => 'message', 'data' => $response])); $conn->send(json_encode(['type' => 'message', 'data' => $response]));
} }
} catch (\Exception $e) { } catch (\Exception $e) {
@@ -281,14 +290,14 @@ class WebSocket extends WebSocketBase
* @param string $url * @param string $url
* @param array $requestData * @param array $requestData
* @param array $headers * @param array $headers
* @param string $message * @param string $query
* @param string $user_id * @param string $user_id
*/ */
private function handleStreamingResponse(ConnectionInterface $conn, $url, $requestData, $headers, $message, $user_id) private function handleStreamingResponse(ConnectionInterface $conn, $url, $requestData, $headers, $query, $user_id)
{ {
try { try {
// 记录开始处理流式请求 // 记录开始处理流式请求
$this->log('AI客服WebSocket流式请求开始处理用户ID' . $user_id . ',请求消息:' . $message, 'info'); $this->log('AI客服WebSocket流式请求开始处理用户ID' . $user_id . ',请求消息:' . $query, 'info');
// 初始化模型 // 初始化模型
$kefu_conversation_model = new KefuConversationModel(); $kefu_conversation_model = new KefuConversationModel();
@@ -302,11 +311,11 @@ class WebSocket extends WebSocketBase
$real_user_message_id = ''; $real_user_message_id = '';
$assistant_content = ''; $assistant_content = '';
$user_message_saved = false; $user_message_saved = false;
$user_message_content = $message; $user_message_content = $query;
$temp_conversation_id = 'temp_' . uniqid() . '_' . time(); // 临时会话ID用于失败回滚 $temp_conversation_id = 'temp_' . uniqid() . '_' . time(); // 临时会话ID用于失败回滚
// 立即保存用户消息使用临时会话ID // 立即保存用户消息使用临时会话ID
$this->saveUserMessage($kefu_message_model, $site_id, $current_user_id, $temp_conversation_id, '', $user_message_content); $this->saveUserMessage($kefu_message_model, $site_id, $current_user_id, $temp_conversation_id, '', $query);
$this->log('用户消息已立即保存临时会话ID' . $temp_conversation_id, 'info'); $this->log('用户消息已立即保存临时会话ID' . $temp_conversation_id, 'info');
// 创建或更新临时会话 // 创建或更新临时会话

View File

@@ -436,12 +436,13 @@
const message = addon.inputMessage.trim(); const message = addon.inputMessage.trim();
addMessage(name, '用户', message); addMessage(name, '用户', message);
// 发送聊天消息 // 发送聊天消息,与 WebSocket.php 保持一致使用 query 参数
const chatMsg = JSON.stringify({ const chatMsg = JSON.stringify({
action: 'chat', action: 'chat',
message: message, query: message,
user_id: 1, user_id: 1,
stream: true stream: true,
response_mode: 'streaming'
}); });
if (wsConnections[name] && wsConnections[name].readyState === WebSocket.OPEN) { if (wsConnections[name] && wsConnections[name].readyState === WebSocket.OPEN) {