chore(addon/aikefu): 流式对话信息存储到数据库中

This commit is contained in:
2025-12-10 16:01:14 +08:00
parent 357a479571
commit 9ba444dbe7
3 changed files with 275 additions and 63 deletions

View File

@@ -83,6 +83,8 @@ class Kefu extends BaseApi
if (curl_errno($ch)) { if (curl_errno($ch)) {
$error = curl_error($ch); $error = curl_error($ch);
$this->log('Curl请求错误' . $error, 'error');
if (is_callable($on_error)) { if (is_callable($on_error)) {
$on_error($error); $on_error($error);
} else { } else {
@@ -346,12 +348,89 @@ class Kefu extends BaseApi
// 更新会话状态 // 更新会话状态
$conversation_model->updateConversation([ $conversation_model->updateConversation([
'status' => 1, 'status' => 1,
'update_time' => date('Y-m-d H:i:s')
], [ ], [
['id', '=', $conversation_info['data']['id']], ['id', '=', $conversation_info['data']['id']],
]); ]);
} }
} }
/**
* 实时保存助手消息内容(流式过程中)
* @param KefuMessageModel $message_model 消息模型
* @param string $site_id 站点ID
* @param string $user_id 用户ID
* @param string $conversation_id 会话ID
* @param string $message_id 消息ID
* @param string $content 消息内容
* @param string $status 消息状态streaming流式中、completed已完成
* @return void
*/
private function saveStreamingAssistantMessage($message_model, $site_id, $user_id, $conversation_id, $message_id, $content, $status = 'streaming')
{
// 检查是否已存在该消息(用于更新)
$existing_message = $message_model->getMessageInfo([
['site_id', '=', $site_id],
['user_id', '=', $user_id],
['conversation_id', '=', $conversation_id],
['message_id', '=', $message_id],
['role', '=', 'assistant']
]);
$message_data = [
'site_id' => $site_id,
'user_id' => $user_id,
'conversation_id' => $conversation_id,
'message_id' => $message_id,
'role' => 'assistant',
'content' => $content,
'status' => $status, // 新增状态字段
'update_time' => date('Y-m-d H:i:s')
];
if (empty($existing_message['data'])) {
// 新增消息
$message_model->addMessage($message_data);
} else {
// 更新消息内容
$message_model->updateMessage($message_data, [
['site_id', '=', $site_id],
['user_id', '=', $user_id],
['conversation_id', '=', $conversation_id],
['message_id', '=', $message_id],
['role', '=', 'assistant']
]);
}
}
/**
* 清理临时消息和会话
* @param KefuMessageModel $message_model 消息模型
* @param KefuConversationModel $conversation_model 会话模型
* @param string $site_id 站点ID
* @param string $user_id 用户ID
* @param string $temp_conversation_id 临时会话ID
* @return void
*/
private function cleanupTempData($message_model, $conversation_model, $site_id, $user_id, $temp_conversation_id)
{
// 删除临时消息
$message_model->deleteMessage([
['site_id', '=', $site_id],
['user_id', '=', $user_id],
['conversation_id', '=', $temp_conversation_id]
]);
// 删除临时会话
$conversation_model->deleteConversation([
['site_id', '=', $site_id],
['user_id', '=', $user_id],
['conversation_id', '=', $temp_conversation_id]
]);
$this->log('临时数据已清理会话ID' . $temp_conversation_id, 'info');
}
/** /**
* 日志记录封装方法 * 日志记录封装方法
* @param string $message 日志内容 * @param string $message 日志内容
@@ -385,10 +464,19 @@ class Kefu extends BaseApi
$assistant_content = ''; $assistant_content = '';
$user_message_saved = false; $user_message_saved = false;
$user_message_content = $message; $user_message_content = $message;
$temp_conversation_id = 'temp_' . uniqid() . '_' . time(); // 临时会话ID用于失败回滚
try { try {
// 立即保存用户消息使用临时会话ID
$this->saveUserMessage($kefu_message_model, $site_id, $current_user_id, $temp_conversation_id, '', $user_message_content);
$this->log('用户消息已立即保存临时会话ID' . $temp_conversation_id, 'info');
// 创建或更新临时会话
$this->updateOrCreateConversation($kefu_conversation_model, $site_id, $current_user_id, $temp_conversation_id);
$this->log('临时会话已创建ID' . $temp_conversation_id, 'info');
// 数据处理回调函数 // 数据处理回调函数
$on_data = function ($data) use (&$real_conversation_id, &$real_assistant_message_id, &$real_user_message_id, &$assistant_content, &$user_message_saved, $user_message_content, $kefu_message_model, $kefu_conversation_model, $site_id, $current_user_id) { $on_data = function ($data) use (&$real_conversation_id, &$real_assistant_message_id, &$real_user_message_id, &$assistant_content, &$user_message_saved, $user_message_content, $kefu_message_model, $kefu_conversation_model, $site_id, $current_user_id, $temp_conversation_id) {
try { try {
// 记录接收到的数据块 // 记录接收到的数据块
@@ -421,62 +509,97 @@ class Kefu extends BaseApi
switch ($event_data['event']) { switch ($event_data['event']) {
case 'message': case 'message':
// LLM返回文本块事件 // LLM返回文本块事件
if (isset($event_data['data']['conversation_id'])) { if (isset($event_data['conversation_id'])) {
$real_conversation_id = $event_data['data']['conversation_id']; $real_conversation_id = $event_data['conversation_id'];
$this->log('获取到会话ID' . $real_conversation_id, 'info'); $this->log('获取到会话ID' . $real_conversation_id, 'info');
} }
if (isset($event_data['data']['message_id'])) { if (isset($event_data['message_id'])) {
$real_assistant_message_id = $event_data['data']['message_id']; $real_assistant_message_id = $event_data['message_id'];
$this->log('获取到助手消息ID' . $real_assistant_message_id, 'info'); $this->log('获取到助手消息ID' . $real_assistant_message_id, 'info');
} }
// 积累助手回复内容 // 积累助手回复内容
if (isset($event_data['data']['answer'])) { if (isset($event_data['answer'])) {
$assistant_content .= $event_data['data']['answer']; $assistant_content .= $event_data['answer'];
$this->log('积累助手回复内容:' . $event_data['data']['answer'], 'debug'); $this->log('积累助手回复内容:' . $event_data['answer'], 'debug');
// 实时保存助手回复内容(流式过程中)
if (!empty($real_conversation_id) && !empty($real_assistant_message_id)) {
$this->saveStreamingAssistantMessage($kefu_message_model, $site_id, $current_user_id, $real_conversation_id, $real_assistant_message_id, $assistant_content, 'streaming');
$this->log('助手回复内容已实时保存,字数:' . strlen($assistant_content), 'debug');
}
} }
// 保存用户消息仅在第一次获取到会话ID时保存 // 更新用户消息的会话ID(仅在第一次获取到真实会话ID时
if (!$user_message_saved && !empty($real_conversation_id)) { if (!$user_message_saved && !empty($real_conversation_id)) {
// 使用Dify返回的真实会话ID保存用户消息 // 将临时会话ID更新为真实会话ID
$this->saveUserMessage($kefu_message_model, $site_id, $current_user_id, $real_conversation_id, '', $user_message_content); $kefu_message_model->updateMessage(['conversation_id' => $real_conversation_id], [
$this->updateOrCreateConversation($kefu_conversation_model, $site_id, $current_user_id, $real_conversation_id); ['site_id', '=', $site_id],
['user_id', '=', $current_user_id],
['conversation_id', '=', $temp_conversation_id],
['role', '=', 'user']
]);
$kefu_conversation_model->updateConversation(['conversation_id' => $real_conversation_id], [
['site_id', '=', $site_id],
['user_id', '=', $current_user_id],
['conversation_id', '=', $temp_conversation_id]
]);
$user_message_saved = true; $user_message_saved = true;
$this->log('用户消息已保存,会话ID' . $real_conversation_id, 'info'); $this->log('用户消息会话ID已更新为真实ID' . $real_conversation_id, 'info');
} }
break; break;
case 'agent_message': case 'agent_message':
// Agent模式下返回文本块事件 // Agent模式下返回文本块事件
if (isset($event_data['data']['conversation_id'])) { if (isset($event_data['conversation_id'])) {
$real_conversation_id = $event_data['data']['conversation_id']; $real_conversation_id = $event_data['conversation_id'];
$this->log('获取到Agent模式会话ID' . $real_conversation_id, 'info'); $this->log('获取到Agent模式会话ID' . $real_conversation_id, 'info');
} }
if (isset($event_data['data']['message_id'])) { if (isset($event_data['message_id'])) {
$real_assistant_message_id = $event_data['data']['message_id']; $real_assistant_message_id = $event_data['message_id'];
$this->log('获取到Agent模式助手消息ID' . $real_assistant_message_id, 'info'); $this->log('获取到Agent模式助手消息ID' . $real_assistant_message_id, 'info');
} }
// 积累助手回复内容 // 积累助手回复内容
if (isset($event_data['data']['answer'])) { if (isset($event_data['answer'])) {
$assistant_content .= $event_data['data']['answer']; $assistant_content .= $event_data['answer'];
$this->log('积累Agent回复内容' . $event_data['data']['answer'], 'debug'); $this->log('积累Agent回复内容' . $event_data['answer'], 'debug');
// 实时保存助手回复内容Agent模式流式过程中
if (!empty($real_conversation_id) && !empty($real_assistant_message_id)) {
$this->saveStreamingAssistantMessage($kefu_message_model, $site_id, $current_user_id, $real_conversation_id, $real_assistant_message_id, $assistant_content, 'streaming');
$this->log('Agent模式助手回复内容已实时保存字数' . strlen($assistant_content), 'debug');
}
} }
// 保存用户消息仅在第一次获取到会话ID时保存 // 更新用户消息的会话ID(仅在第一次获取到真实会话ID时
if (!$user_message_saved && !empty($real_conversation_id)) { if (!$user_message_saved && !empty($real_conversation_id)) {
$this->saveUserMessage($kefu_message_model, $site_id, $current_user_id, $real_conversation_id, '', $user_message_content); // 将临时会话ID更新为真实会话ID
$this->updateOrCreateConversation($kefu_conversation_model, $site_id, $current_user_id, $real_conversation_id); $kefu_message_model->updateMessage(['conversation_id' => $real_conversation_id], [
['site_id', '=', $site_id],
['user_id', '=', $current_user_id],
['conversation_id', '=', $temp_conversation_id],
['role', '=', 'user']
]);
$kefu_conversation_model->updateConversation(['conversation_id' => $real_conversation_id], [
['site_id', '=', $site_id],
['user_id', '=', $current_user_id],
['conversation_id', '=', $temp_conversation_id]
]);
$user_message_saved = true; $user_message_saved = true;
$this->log('Agent模式下用户消息已保存,会话ID' . $real_conversation_id, 'info'); $this->log('Agent模式下用户消息会话ID已更新为真实ID' . $real_conversation_id, 'info');
} }
break; break;
case 'agent_thought': case 'agent_thought':
if (isset($event_data['data']['thought'])) { if (isset($event_data['thought'])) {
// 格式化思考过程 // 格式化思考过程
$thought_content = "\n[思考过程]: " . $event_data['data']['thought']; $thought_content = "\n[思考过程]: " . $event_data['thought'];
if (isset($event_data['data']['tool'])) { if (isset($event_data['tool'])) {
$thought_content .= "\n[使用工具]: " . $event_data['data']['tool']; $thought_content .= "\n[使用工具]: " . $event_data['tool'];
} }
if (isset($event_data['data']['observation'])) { if (isset($event_data['observation'])) {
$thought_content .= "\n[观察结果]: " . $event_data['data']['observation']; $thought_content .= "\n[观察结果]: " . $event_data['observation'];
} }
$assistant_content .= $thought_content; $assistant_content .= $thought_content;
$this->log('Agent思考过程' . $thought_content, 'debug'); $this->log('Agent思考过程' . $thought_content, 'debug');
@@ -484,10 +607,10 @@ class Kefu extends BaseApi
break; break;
case 'file': case 'file':
if (isset($event_data['data']['id']) && isset($event_data['data']['type']) && isset($event_data['data']['url'])) { if (isset($event_data['id']) && isset($event_data['type']) && isset($event_data['url'])) {
$file_id = $event_data['data']['id']; $file_id = $event_data['id'];
$file_type = $event_data['data']['type']; $file_type = $event_data['type'];
$file_url = $event_data['data']['url']; $file_url = $event_data['url'];
// 可以将文件信息作为特殊内容添加到回复中 // 可以将文件信息作为特殊内容添加到回复中
$file_content = "\n[文件]: " . $file_type . " - " . $file_url; $file_content = "\n[文件]: " . $file_type . " - " . $file_url;
$assistant_content .= $file_content; $assistant_content .= $file_content;
@@ -496,16 +619,22 @@ class Kefu extends BaseApi
break; break;
case 'message_start': case 'message_start':
if (isset($event_data['data']['conversation_id'])) { if (isset($event_data['conversation_id'])) {
$real_conversation_id = $event_data['data']['conversation_id']; $real_conversation_id = $event_data['conversation_id'];
$this->log('消息开始事件会话ID' . $real_conversation_id, 'info'); $this->log('消息开始事件会话ID' . $real_conversation_id, 'info');
} }
break; break;
case 'message_delta': case 'message_delta':
if (isset($event_data['data']['delta']['content'])) { if (isset($event_data['delta']['content'])) {
$assistant_content .= $event_data['data']['delta']['content']; $assistant_content .= $event_data['delta']['content'];
$this->log('积累增量内容:' . $event_data['data']['delta']['content'], 'debug'); $this->log('积累增量内容:' . $event_data['delta']['content'], 'debug');
// 实时保存助手回复内容(增量流式过程中)
if (!empty($real_conversation_id) && !empty($real_assistant_message_id)) {
$this->saveStreamingAssistantMessage($kefu_message_model, $site_id, $current_user_id, $real_conversation_id, $real_assistant_message_id, $assistant_content, 'streaming');
$this->log('助手增量回复内容已实时保存,字数:' . strlen($assistant_content), 'debug');
}
} }
break; break;
@@ -527,15 +656,15 @@ class Kefu extends BaseApi
break; break;
case 'message_replace': case 'message_replace':
if (isset($event_data['data']['answer'])) { if (isset($event_data['answer'])) {
// 直接替换所有回复文本 // 直接替换所有回复文本
$assistant_content = $event_data['data']['answer']; $assistant_content = $event_data['answer'];
$this->log('消息内容替换为:' . $event_data['data']['answer'], 'debug'); $this->log('消息内容替换为:' . $event_data['answer'], 'debug');
} }
break; break;
case 'error': case 'error':
$error_message = isset($event_data['data']['message']) ? $event_data['data']['message'] : '流式输出异常'; $error_message = isset($event_data['message']) ? $event_data['message'] : '流式输出异常';
$assistant_content .= "\n[错误]: " . $error_message; $assistant_content .= "\n[错误]: " . $error_message;
$this->log('AI客服错误事件' . $error_message, 'error'); $this->log('AI客服错误事件' . $error_message, 'error');
break; break;
@@ -572,14 +701,14 @@ class Kefu extends BaseApi
break; break;
case 'workflow_node_start': case 'workflow_node_start':
if (isset($event_data['data']['node_id']) && isset($event_data['data']['node_name'])) { if (isset($event_data['node_id']) && isset($event_data['node_name'])) {
$this->log('工作流节点开始:' . $event_data['data']['node_name'], 'debug'); $this->log('工作流节点开始:' . $event_data['node_name'], 'debug');
} }
break; break;
case 'workflow_node_end': case 'workflow_node_end':
if (isset($event_data['data']['node_id']) && isset($event_data['data']['outputs'])) { if (isset($event_data['node_id']) && isset($event_data['outputs'])) {
$this->log('工作流节点结束:' . $event_data['data']['node_id'], 'debug'); $this->log('工作流节点结束:' . $event_data['node_id'], 'debug');
} }
break; break;
@@ -630,18 +759,25 @@ class Kefu extends BaseApi
flush(); flush();
$this->log('发送done和close事件会话ID' . $real_conversation_id, 'info'); $this->log('发送done和close事件会话ID' . $real_conversation_id, 'info');
// 保存助手的完整回复 // 流式正常完成,标记助手消息为已完成状态
if (!empty($real_conversation_id) && !empty($real_assistant_message_id) && !empty($assistant_content)) { if (!empty($real_conversation_id) && !empty($real_assistant_message_id) && !empty($assistant_content)) {
$result = $this->saveAssistantMessage($kefu_message_model, $site_id, $current_user_id, $real_conversation_id, $real_assistant_message_id, $assistant_content); $this->saveStreamingAssistantMessage($kefu_message_model, $site_id, $current_user_id, $real_conversation_id, $real_assistant_message_id, $assistant_content, 'completed');
if ($result) { $this->log('AI客服回复已标记为完成状态会话ID' . $real_conversation_id . ',总字数:' . strlen($assistant_content), 'info');
$this->log('AI客服回复保存成功会话ID' . $real_conversation_id, 'info');
} else {
$this->log('AI客服回复保存失败会话ID' . $real_conversation_id, 'error');
}
} }
// 清理临时数据
$this->cleanupTempData($kefu_message_model, $kefu_conversation_model, $site_id, $current_user_id, $temp_conversation_id);
} catch (Exception $e) { } catch (Exception $e) {
$error_msg = 'AI客服请求异常' . $e->getMessage() . ',错误行:' . $e->getLine() . ',错误文件:' . $e->getFile(); $error_msg = 'AI客服请求异常' . $e->getMessage() . ',错误行:' . $e->getLine() . ',错误文件:' . $e->getFile();
$this->log($error_msg, 'error'); $this->log($error_msg, 'error');
// 异常时清理临时数据
try {
$this->cleanupTempData($kefu_message_model, $kefu_conversation_model, $site_id, $current_user_id, $temp_conversation_id);
} catch (Exception $cleanupException) {
$this->log('清理临时数据时也发生异常:' . $cleanupException->getMessage(), 'error');
}
} }
// 流式响应处理完成后必须退出脚本避免EventSource协议错误 // 流式响应处理完成后必须退出脚本避免EventSource协议错误
@@ -703,6 +839,66 @@ class Kefu extends BaseApi
]; ];
} }
/**
* 获取流式消息状态
* @return \think\response\Json
*/
public function getMessageStatus()
{
// 获取请求参数
$conversation_id = $this->params['conversation_id'] ?? '';
$user_id = $this->params['user_id'] ?? $this->member_id;
// 验证参数
if (empty($conversation_id)) {
return $this->response($this->error('会话ID不能为空'));
}
try {
$kefu_message_model = new KefuMessageModel();
// 获取所有状态的消息统计
$status_stats = [];
$statuses = ['streaming', 'completed', 'failed'];
foreach ($statuses as $status) {
$count = $this->db->name('aikefu_message')
->where([
['site_id', '=', $this->site_id],
['user_id', '=', $user_id],
['conversation_id', '=', $conversation_id],
['role', '=', 'assistant'],
['status', '=', $status]
])
->count();
$status_stats[$status] = $count;
}
// 获取最近的失败消息
$failed_messages = $kefu_message_model->getMessageList([
['site_id', '=', $this->site_id],
['user_id', '=', $user_id],
['conversation_id', '=', $conversation_id],
['status', '=', 'failed']
], 'id, content, create_time', 'create_time desc', 5);
return $this->response($this->success([
'conversation_id' => $conversation_id,
'status_stats' => $status_stats,
'failed_messages' => $failed_messages['data'] ?? [],
'summary' => [
'total_assistant_messages' => array_sum($status_stats),
'completed_messages' => $status_stats['completed'] ?? 0,
'streaming_messages' => $status_stats['streaming'] ?? 0,
'failed_messages' => $status_stats['failed'] ?? 0
]
]));
} catch (\Exception $e) {
return $this->response($this->error('请求失败:' . $e->getMessage()));
}
}
/** /**
* 获取会话历史 * 获取会话历史
* @return \think\response\Json * @return \think\response\Json
@@ -721,13 +917,15 @@ class Kefu extends BaseApi
} }
try { try {
// 获取会话历史记录 // 获取会话历史记录(只返回已完成的消息,过滤掉失败或进行中的消息)
$kefu_message_model = new KefuMessageModel(); $kefu_message_model = new KefuMessageModel();
$message_list = $kefu_message_model->getMessageList([ $message_list = $kefu_message_model->getConversationMessages(
['site_id', '=', $this->site_id], $this->site_id,
['user_id', '=', $user_id], $conversation_id,
['conversation_id', '=', $conversation_id], $limit,
], 'id, role, content, create_time', 'create_time asc', $limit, $offset); $offset,
'completed' // 只返回已完成的消息
);
// 返回成功响应保持与Dify API风格一致 // 返回成功响应保持与Dify API风格一致
return $this->response($this->success([ return $this->response($this->success([
@@ -736,7 +934,8 @@ class Kefu extends BaseApi
'page_info' => [ 'page_info' => [
'limit' => $limit, 'limit' => $limit,
'offset' => $offset 'offset' => $offset
] ],
'status_filter' => 'completed' // 说明当前过滤状态
])); ]));
} catch (\Exception $e) { } catch (\Exception $e) {
return $this->response($this->error('请求失败:' . $e->getMessage())); return $this->response($this->error('请求失败:' . $e->getMessage()));

View File

@@ -17,6 +17,9 @@ CREATE TABLE IF NOT EXISTS `lucky_aikefu_conversation` (
KEY `user_id` (`user_id`), KEY `user_id` (`user_id`),
KEY `conversation_id` (`conversation_id`) KEY `conversation_id` (`conversation_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='智能客服会话表'; ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='智能客服会话表';
-- 索引
ALTER TABLE `lucky_aikefu_conversation`
ADD INDEX `idx_status` (`status`);
-- 创建智能客服消息表 -- 创建智能客服消息表
CREATE TABLE IF NOT EXISTS `lucky_aikefu_message` ( CREATE TABLE IF NOT EXISTS `lucky_aikefu_message` (
@@ -27,10 +30,14 @@ CREATE TABLE IF NOT EXISTS `lucky_aikefu_message` (
`message_id` varchar(100) NOT NULL COMMENT '消息ID', `message_id` varchar(100) NOT NULL COMMENT '消息ID',
`role` varchar(20) NOT NULL COMMENT '角色user用户assistant助手', `role` varchar(20) NOT NULL COMMENT '角色user用户assistant助手',
`content` text NOT NULL COMMENT '消息内容', `content` text NOT NULL COMMENT '消息内容',
`status` varchar(20) NOT NULL DEFAULT 'completed' COMMENT '消息状态streaming(流式中), completed(已完成), failed(失败)',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
PRIMARY KEY (`id`), PRIMARY KEY (`id`),
KEY `site_id` (`site_id`), KEY `site_id` (`site_id`),
KEY `user_id` (`user_id`), KEY `user_id` (`user_id`),
KEY `conversation_id` (`conversation_id`), KEY `conversation_id` (`conversation_id`),
KEY `message_id` (`message_id`) KEY `message_id` (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='智能客服消息表'; ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='智能客服消息表';
-- 索引
ALTER TABLE `lucky_aikefu_message`
ADD INDEX `idx_status` (`status`);

View File

@@ -178,15 +178,21 @@ class Message extends Model
* @param string $conversation_id * @param string $conversation_id
* @param int $limit * @param int $limit
* @param int $offset * @param int $offset
* @param string $status 状态过滤
* @return array * @return array
*/ */
public function getConversationMessages($site_id, $conversation_id, $limit = 20, $offset = 0) public function getConversationMessages($site_id, $conversation_id, $limit = 20, $offset = 0, $status = '')
{ {
$condition = [ $condition = [
['site_id', '=', $site_id], ['site_id', '=', $site_id],
['conversation_id', '=', $conversation_id] ['conversation_id', '=', $conversation_id]
]; ];
// 添加状态过滤
if (!empty($status)) {
$condition[] = ['status', '=', $status];
}
return $this->getMessageList($condition, '*', 'create_time asc', ($offset / $limit) + 1, $limit); return $this->getMessageList($condition, '*', 'create_time asc', ($offset / $limit) + 1, $limit);
} }