From 9ba444dbe7eb42fdeeb9ac8bc97bec0b4e2c84c6 Mon Sep 17 00:00:00 2001 From: ZF sun <34314687@qq.com> Date: Wed, 10 Dec 2025 16:01:14 +0800 Subject: [PATCH] =?UTF-8?q?chore(addon/aikefu):=20=E6=B5=81=E5=BC=8F?= =?UTF-8?q?=E5=AF=B9=E8=AF=9D=E4=BF=A1=E6=81=AF=E5=AD=98=E5=82=A8=E5=88=B0?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=BA=93=E4=B8=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/addon/aikefu/api/controller/Kefu.php | 321 ++++++++++++++++++----- src/addon/aikefu/data/install.sql | 9 +- src/addon/aikefu/model/Message.php | 8 +- 3 files changed, 275 insertions(+), 63 deletions(-) diff --git a/src/addon/aikefu/api/controller/Kefu.php b/src/addon/aikefu/api/controller/Kefu.php index 8e23b44dd..0fc1aea56 100644 --- a/src/addon/aikefu/api/controller/Kefu.php +++ b/src/addon/aikefu/api/controller/Kefu.php @@ -83,6 +83,8 @@ class Kefu extends BaseApi if (curl_errno($ch)) { $error = curl_error($ch); + $this->log('Curl请求错误:' . $error, 'error'); + if (is_callable($on_error)) { $on_error($error); } else { @@ -346,12 +348,89 @@ class Kefu extends BaseApi // 更新会话状态 $conversation_model->updateConversation([ 'status' => 1, + 'update_time' => date('Y-m-d H:i:s') ], [ ['id', '=', $conversation_info['data']['id']], ]); } } + /** + * 实时保存助手消息内容(流式过程中) + * @param KefuMessageModel $message_model 消息模型 + * @param string $site_id 站点ID + * @param string $user_id 用户ID + * @param string $conversation_id 会话ID + * @param string $message_id 消息ID + * @param string $content 消息内容 + * @param string $status 消息状态:streaming(流式中)、completed(已完成) + * @return void + */ + private function saveStreamingAssistantMessage($message_model, $site_id, $user_id, $conversation_id, $message_id, $content, $status = 'streaming') + { + // 检查是否已存在该消息(用于更新) + $existing_message = $message_model->getMessageInfo([ + ['site_id', '=', $site_id], + ['user_id', '=', $user_id], + ['conversation_id', '=', $conversation_id], + ['message_id', '=', $message_id], + ['role', '=', 'assistant'] + ]); + + $message_data = [ + 'site_id' => $site_id, + 'user_id' => $user_id, + 'conversation_id' => $conversation_id, + 'message_id' => $message_id, + 'role' => 'assistant', + 'content' => $content, + 'status' => $status, // 新增状态字段 + 'update_time' => date('Y-m-d H:i:s') + ]; + + if (empty($existing_message['data'])) { + // 新增消息 + $message_model->addMessage($message_data); + } else { + // 更新消息内容 + $message_model->updateMessage($message_data, [ + ['site_id', '=', $site_id], + ['user_id', '=', $user_id], + ['conversation_id', '=', $conversation_id], + ['message_id', '=', $message_id], + ['role', '=', 'assistant'] + ]); + } + } + + /** + * 清理临时消息和会话 + * @param KefuMessageModel $message_model 消息模型 + * @param KefuConversationModel $conversation_model 会话模型 + * @param string $site_id 站点ID + * @param string $user_id 用户ID + * @param string $temp_conversation_id 临时会话ID + * @return void + */ + private function cleanupTempData($message_model, $conversation_model, $site_id, $user_id, $temp_conversation_id) + { + // 删除临时消息 + $message_model->deleteMessage([ + ['site_id', '=', $site_id], + ['user_id', '=', $user_id], + ['conversation_id', '=', $temp_conversation_id] + ]); + + // 删除临时会话 + $conversation_model->deleteConversation([ + ['site_id', '=', $site_id], + ['user_id', '=', $user_id], + ['conversation_id', '=', $temp_conversation_id] + ]); + + $this->log('临时数据已清理,会话ID:' . $temp_conversation_id, 'info'); + } + /** * 日志记录封装方法 * @param string $message 日志内容 @@ -385,10 +464,19 @@ class Kefu extends BaseApi $assistant_content = ''; $user_message_saved = false; $user_message_content = $message; + $temp_conversation_id = 'temp_' . uniqid() . '_' . time(); // 临时会话ID,用于失败回滚 try { + // 立即保存用户消息,使用临时会话ID + $this->saveUserMessage($kefu_message_model, $site_id, $current_user_id, $temp_conversation_id, '', $user_message_content); + $this->log('用户消息已立即保存,临时会话ID:' . $temp_conversation_id, 'info'); + + // 创建或更新临时会话 + $this->updateOrCreateConversation($kefu_conversation_model, $site_id, $current_user_id, $temp_conversation_id); + $this->log('临时会话已创建,ID:' . $temp_conversation_id, 'info'); + // 数据处理回调函数 - $on_data = function ($data) use (&$real_conversation_id, &$real_assistant_message_id, &$real_user_message_id, &$assistant_content, &$user_message_saved, $user_message_content, $kefu_message_model, $kefu_conversation_model, $site_id, $current_user_id) { + $on_data = function ($data) use (&$real_conversation_id, &$real_assistant_message_id, &$real_user_message_id, &$assistant_content, &$user_message_saved, $user_message_content, $kefu_message_model, $kefu_conversation_model, $site_id, $current_user_id, $temp_conversation_id) { try { // 记录接收到的数据块 @@ -421,62 +509,97 @@ class Kefu extends BaseApi switch ($event_data['event']) { case 'message': // LLM返回文本块事件 - if (isset($event_data['data']['conversation_id'])) { - $real_conversation_id = $event_data['data']['conversation_id']; + if (isset($event_data['conversation_id'])) { + $real_conversation_id = $event_data['conversation_id']; $this->log('获取到会话ID:' . $real_conversation_id, 'info'); } - if (isset($event_data['data']['message_id'])) { - $real_assistant_message_id = $event_data['data']['message_id']; + if (isset($event_data['message_id'])) { + $real_assistant_message_id = $event_data['message_id']; $this->log('获取到助手消息ID:' . $real_assistant_message_id, 'info'); } // 积累助手回复内容 - if (isset($event_data['data']['answer'])) { - $assistant_content .= $event_data['data']['answer']; - $this->log('积累助手回复内容:' . $event_data['data']['answer'], 'debug'); + if (isset($event_data['answer'])) { + $assistant_content .= $event_data['answer']; + $this->log('积累助手回复内容:' . $event_data['answer'], 'debug'); + + // 实时保存助手回复内容(流式过程中) + if (!empty($real_conversation_id) && !empty($real_assistant_message_id)) { + $this->saveStreamingAssistantMessage($kefu_message_model, $site_id, $current_user_id, $real_conversation_id, $real_assistant_message_id, $assistant_content, 'streaming'); + $this->log('助手回复内容已实时保存,字数:' . strlen($assistant_content), 'debug'); + } } - // 保存用户消息(仅在第一次获取到会话ID时保存) + // 更新用户消息的会话ID(仅在第一次获取到真实会话ID时) if (!$user_message_saved && !empty($real_conversation_id)) { - // 使用Dify返回的真实会话ID保存用户消息 - $this->saveUserMessage($kefu_message_model, $site_id, $current_user_id, $real_conversation_id, '', $user_message_content); - $this->updateOrCreateConversation($kefu_conversation_model, $site_id, $current_user_id, $real_conversation_id); + // 将临时会话ID更新为真实会话ID + $kefu_message_model->updateMessage(['conversation_id' => $real_conversation_id], [ + ['site_id', '=', $site_id], + ['user_id', '=', $current_user_id], + ['conversation_id', '=', $temp_conversation_id], + ['role', '=', 'user'] + ]); + + $kefu_conversation_model->updateConversation(['conversation_id' => $real_conversation_id], [ + ['site_id', '=', $site_id], + ['user_id', '=', $current_user_id], + ['conversation_id', '=', $temp_conversation_id] + ]); + $user_message_saved = true; - $this->log('用户消息已保存,会话ID:' . $real_conversation_id, 'info'); + $this->log('用户消息会话ID已更新为真实ID:' . $real_conversation_id, 'info'); } break; case 'agent_message': // Agent模式下返回文本块事件 - if (isset($event_data['data']['conversation_id'])) { - $real_conversation_id = $event_data['data']['conversation_id']; + if (isset($event_data['conversation_id'])) { + $real_conversation_id = $event_data['conversation_id']; $this->log('获取到Agent模式会话ID:' . $real_conversation_id, 'info'); } - if (isset($event_data['data']['message_id'])) { - $real_assistant_message_id = $event_data['data']['message_id']; + if (isset($event_data['message_id'])) { + $real_assistant_message_id = $event_data['message_id']; $this->log('获取到Agent模式助手消息ID:' . $real_assistant_message_id, 'info'); } // 积累助手回复内容 - if (isset($event_data['data']['answer'])) { - $assistant_content .= $event_data['data']['answer']; - $this->log('积累Agent回复内容:' . $event_data['data']['answer'], 'debug'); + if (isset($event_data['answer'])) { + $assistant_content .= $event_data['answer']; + $this->log('积累Agent回复内容:' . $event_data['answer'], 'debug'); + + // 实时保存助手回复内容(Agent模式流式过程中) + if (!empty($real_conversation_id) && !empty($real_assistant_message_id)) { + $this->saveStreamingAssistantMessage($kefu_message_model, $site_id, $current_user_id, $real_conversation_id, $real_assistant_message_id, $assistant_content, 'streaming'); + $this->log('Agent模式助手回复内容已实时保存,字数:' . strlen($assistant_content), 'debug'); + } } - // 保存用户消息(仅在第一次获取到会话ID时保存) + // 更新用户消息的会话ID(仅在第一次获取到真实会话ID时) if (!$user_message_saved && !empty($real_conversation_id)) { - $this->saveUserMessage($kefu_message_model, $site_id, $current_user_id, $real_conversation_id, '', $user_message_content); - $this->updateOrCreateConversation($kefu_conversation_model, $site_id, $current_user_id, $real_conversation_id); + // 将临时会话ID更新为真实会话ID + $kefu_message_model->updateMessage(['conversation_id' => $real_conversation_id], [ + ['site_id', '=', $site_id], + ['user_id', '=', $current_user_id], + ['conversation_id', '=', $temp_conversation_id], + ['role', '=', 'user'] + ]); + + $kefu_conversation_model->updateConversation(['conversation_id' => $real_conversation_id], [ + ['site_id', '=', $site_id], + ['user_id', '=', $current_user_id], + ['conversation_id', '=', $temp_conversation_id] + ]); + $user_message_saved = true; - $this->log('Agent模式下用户消息已保存,会话ID:' . $real_conversation_id, 'info'); + $this->log('Agent模式下用户消息会话ID已更新为真实ID:' . $real_conversation_id, 'info'); } break; case 'agent_thought': - if (isset($event_data['data']['thought'])) { + if (isset($event_data['thought'])) { // 格式化思考过程 - $thought_content = "\n[思考过程]: " . $event_data['data']['thought']; - if (isset($event_data['data']['tool'])) { - $thought_content .= "\n[使用工具]: " . $event_data['data']['tool']; + $thought_content = "\n[思考过程]: " . $event_data['thought']; + if (isset($event_data['tool'])) { + $thought_content .= "\n[使用工具]: " . $event_data['tool']; } - if (isset($event_data['data']['observation'])) { - $thought_content .= "\n[观察结果]: " . $event_data['data']['observation']; + if (isset($event_data['observation'])) { + $thought_content .= "\n[观察结果]: " . $event_data['observation']; } $assistant_content .= $thought_content; $this->log('Agent思考过程:' . $thought_content, 'debug'); @@ -484,10 +607,10 @@ class Kefu extends BaseApi break; case 'file': - if (isset($event_data['data']['id']) && isset($event_data['data']['type']) && isset($event_data['data']['url'])) { - $file_id = $event_data['data']['id']; - $file_type = $event_data['data']['type']; - $file_url = $event_data['data']['url']; + if (isset($event_data['id']) && isset($event_data['type']) && isset($event_data['url'])) { + $file_id = $event_data['id']; + $file_type = $event_data['type']; + $file_url = $event_data['url']; // 可以将文件信息作为特殊内容添加到回复中 $file_content = "\n[文件]: " . $file_type . " - " . $file_url; $assistant_content .= $file_content; @@ -496,16 +619,22 @@ class Kefu extends BaseApi break; case 'message_start': - if (isset($event_data['data']['conversation_id'])) { - $real_conversation_id = $event_data['data']['conversation_id']; + if (isset($event_data['conversation_id'])) { + $real_conversation_id = $event_data['conversation_id']; $this->log('消息开始事件,会话ID:' . $real_conversation_id, 'info'); } break; case 'message_delta': - if (isset($event_data['data']['delta']['content'])) { - $assistant_content .= $event_data['data']['delta']['content']; - $this->log('积累增量内容:' . $event_data['data']['delta']['content'], 'debug'); + if (isset($event_data['delta']['content'])) { + $assistant_content .= $event_data['delta']['content']; + $this->log('积累增量内容:' . $event_data['delta']['content'], 'debug'); + + // 实时保存助手回复内容(增量流式过程中) + if (!empty($real_conversation_id) && !empty($real_assistant_message_id)) { + $this->saveStreamingAssistantMessage($kefu_message_model, $site_id, $current_user_id, $real_conversation_id, $real_assistant_message_id, $assistant_content, 'streaming'); + $this->log('助手增量回复内容已实时保存,字数:' . strlen($assistant_content), 'debug'); + } } break; @@ -527,15 +656,15 @@ class Kefu extends BaseApi break; case 'message_replace': - if (isset($event_data['data']['answer'])) { + if (isset($event_data['answer'])) { // 直接替换所有回复文本 - $assistant_content = $event_data['data']['answer']; - $this->log('消息内容替换为:' . $event_data['data']['answer'], 'debug'); + $assistant_content = $event_data['answer']; + $this->log('消息内容替换为:' . $event_data['answer'], 'debug'); } break; case 'error': - $error_message = isset($event_data['data']['message']) ? $event_data['data']['message'] : '流式输出异常'; + $error_message = isset($event_data['message']) ? $event_data['message'] : '流式输出异常'; $assistant_content .= "\n[错误]: " . $error_message; $this->log('AI客服错误事件:' . $error_message, 'error'); break; @@ -572,14 +701,14 @@ class Kefu extends BaseApi break; case 'workflow_node_start': - if (isset($event_data['data']['node_id']) && isset($event_data['data']['node_name'])) { - $this->log('工作流节点开始:' . $event_data['data']['node_name'], 'debug'); + if (isset($event_data['node_id']) && isset($event_data['node_name'])) { + $this->log('工作流节点开始:' . $event_data['node_name'], 'debug'); } break; case 'workflow_node_end': - if (isset($event_data['data']['node_id']) && isset($event_data['data']['outputs'])) { - $this->log('工作流节点结束:' . $event_data['data']['node_id'], 'debug'); + if (isset($event_data['node_id']) && isset($event_data['outputs'])) { + $this->log('工作流节点结束:' . $event_data['node_id'], 'debug'); } break; @@ -630,18 +759,25 @@ class Kefu extends BaseApi flush(); $this->log('发送done和close事件,会话ID:' . $real_conversation_id, 'info'); - // 保存助手的完整回复 + // 流式正常完成,标记助手消息为已完成状态 if (!empty($real_conversation_id) && !empty($real_assistant_message_id) && !empty($assistant_content)) { - $result = $this->saveAssistantMessage($kefu_message_model, $site_id, $current_user_id, $real_conversation_id, $real_assistant_message_id, $assistant_content); - if ($result) { - $this->log('AI客服回复保存成功,会话ID:' . $real_conversation_id, 'info'); - } else { - $this->log('AI客服回复保存失败,会话ID:' . $real_conversation_id, 'error'); - } + $this->saveStreamingAssistantMessage($kefu_message_model, $site_id, $current_user_id, $real_conversation_id, $real_assistant_message_id, $assistant_content, 'completed'); + $this->log('AI客服回复已标记为完成状态,会话ID:' . $real_conversation_id . ',总字数:' . strlen($assistant_content), 'info'); } + + // 清理临时数据 + $this->cleanupTempData($kefu_message_model, $kefu_conversation_model, $site_id, $current_user_id, $temp_conversation_id); + } catch (Exception $e) { $error_msg = 'AI客服请求异常:' . $e->getMessage() . ',错误行:' . $e->getLine() . ',错误文件:' . $e->getFile(); $this->log($error_msg, 'error'); + + // 异常时清理临时数据 + try { + $this->cleanupTempData($kefu_message_model, $kefu_conversation_model, $site_id, $current_user_id, $temp_conversation_id); + } catch (Exception $cleanupException) { + $this->log('清理临时数据时也发生异常:' . $cleanupException->getMessage(), 'error'); + } } // 流式响应处理完成后必须退出脚本,避免EventSource协议错误 @@ -703,6 +839,66 @@ class Kefu extends BaseApi ]; } + /** + * 获取流式消息状态 + * @return \think\response\Json + */ + public function getMessageStatus() + { + // 获取请求参数 + $conversation_id = $this->params['conversation_id'] ?? ''; + $user_id = $this->params['user_id'] ?? $this->member_id; + + // 验证参数 + if (empty($conversation_id)) { + return $this->response($this->error('会话ID不能为空')); + } + + try { + $kefu_message_model = new KefuMessageModel(); + + // 获取所有状态的消息统计 + $status_stats = []; + $statuses = ['streaming', 'completed', 'failed']; + + foreach ($statuses as $status) { + $count = $this->db->name('aikefu_message') + ->where([ + ['site_id', '=', $this->site_id], + ['user_id', '=', $user_id], + ['conversation_id', '=', $conversation_id], + ['role', '=', 'assistant'], + ['status', '=', $status] + ]) + ->count(); + + $status_stats[$status] = $count; + } + + // 获取最近的失败消息 + $failed_messages = $kefu_message_model->getMessageList([ + ['site_id', '=', $this->site_id], + ['user_id', '=', $user_id], + ['conversation_id', '=', $conversation_id], + ['status', '=', 'failed'] + ], 'id, content, create_time', 'create_time desc', 5); + + return $this->response($this->success([ + 'conversation_id' => $conversation_id, + 'status_stats' => $status_stats, + 'failed_messages' => $failed_messages['data'] ?? [], + 'summary' => [ + 'total_assistant_messages' => array_sum($status_stats), + 'completed_messages' => $status_stats['completed'] ?? 0, + 'streaming_messages' => $status_stats['streaming'] ?? 0, + 'failed_messages' => $status_stats['failed'] ?? 0 + ] + ])); + } catch (\Exception $e) { + return $this->response($this->error('请求失败:' . $e->getMessage())); + } + } + /** * 获取会话历史 * @return \think\response\Json @@ -721,13 +917,15 @@ class Kefu extends BaseApi } try { - // 获取会话历史记录 + // 获取会话历史记录(只返回已完成的消息,过滤掉失败或进行中的消息) $kefu_message_model = new KefuMessageModel(); - $message_list = $kefu_message_model->getMessageList([ - ['site_id', '=', $this->site_id], - ['user_id', '=', $user_id], - ['conversation_id', '=', $conversation_id], - ], 'id, role, content, create_time', 'create_time asc', $limit, $offset); + $message_list = $kefu_message_model->getConversationMessages( + $this->site_id, + $conversation_id, + $limit, + $offset, + 'completed' // 只返回已完成的消息 + ); // 返回成功响应,保持与Dify API风格一致 return $this->response($this->success([ @@ -736,7 +934,8 @@ class Kefu extends BaseApi 'page_info' => [ 'limit' => $limit, 'offset' => $offset - ] + ], + 'status_filter' => 'completed' // 说明当前过滤状态 ])); } catch (\Exception $e) { return $this->response($this->error('请求失败:' . $e->getMessage())); diff --git a/src/addon/aikefu/data/install.sql b/src/addon/aikefu/data/install.sql index 3167b610f..e7e454db2 100644 --- a/src/addon/aikefu/data/install.sql +++ b/src/addon/aikefu/data/install.sql @@ -17,6 +17,9 @@ CREATE TABLE IF NOT EXISTS `lucky_aikefu_conversation` ( KEY `user_id` (`user_id`), KEY `conversation_id` (`conversation_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='智能客服会话表'; +-- 索引 +ALTER TABLE `lucky_aikefu_conversation` +ADD INDEX `idx_status` (`status`); -- 创建智能客服消息表 CREATE TABLE IF NOT EXISTS `lucky_aikefu_message` ( @@ -27,10 +30,14 @@ CREATE TABLE IF NOT EXISTS `lucky_aikefu_message` ( `message_id` varchar(100) NOT NULL COMMENT '消息ID', `role` varchar(20) NOT NULL COMMENT '角色:user用户,assistant助手', `content` text NOT NULL COMMENT '消息内容', + `status` varchar(20) NOT NULL DEFAULT 'completed' COMMENT '消息状态:streaming(流式中), completed(已完成), failed(失败)', `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', PRIMARY KEY (`id`), KEY `site_id` (`site_id`), KEY `user_id` (`user_id`), KEY `conversation_id` (`conversation_id`), KEY `message_id` (`message_id`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='智能客服消息表'; \ No newline at end of file +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='智能客服消息表'; +-- 索引 +ALTER TABLE `lucky_aikefu_message` +ADD INDEX `idx_status` (`status`); \ No newline at end of file diff --git a/src/addon/aikefu/model/Message.php b/src/addon/aikefu/model/Message.php index d98e4650a..654851887 100644 --- a/src/addon/aikefu/model/Message.php +++ b/src/addon/aikefu/model/Message.php @@ -178,15 +178,21 @@ class Message extends Model * @param string $conversation_id * @param int $limit * @param int $offset + * @param string $status 状态过滤 * @return array */ - public function getConversationMessages($site_id, $conversation_id, $limit = 20, $offset = 0) + public function getConversationMessages($site_id, $conversation_id, $limit = 20, $offset = 0, $status = '') { $condition = [ ['site_id', '=', $site_id], ['conversation_id', '=', $conversation_id] ]; + // 添加状态过滤 + if (!empty($status)) { + $condition[] = ['status', '=', $status]; + } + return $this->getMessageList($condition, '*', 'create_time asc', ($offset / $limit) + 1, $limit); }