diff --git a/src/addon/aikefu/api/controller/Kefu.php b/src/addon/aikefu/api/controller/Kefu.php index 4124abb65..5e9ab121f 100644 --- a/src/addon/aikefu/api/controller/Kefu.php +++ b/src/addon/aikefu/api/controller/Kefu.php @@ -6,10 +6,91 @@ use addon\aikefu\model\Config as KefuConfigModel; use addon\aikefu\model\Conversation as KefuConversationModel; use addon\aikefu\model\Message as KefuMessageModel; use app\api\controller\BaseApi; +use think\Log; class Kefu extends BaseApi { + /** + * 通用的curl流式请求函数 + * @param string $url 请求URL + * @param string $method 请求方法 + * @param array $data 请求数据 + * @param array $headers 请求头 + * @param callable|null $on_data 数据回调函数,接收原始数据 + * @param callable|null $on_error 错误回调函数,接收错误信息 + * @return bool 请求是否成功 + */ + private function curlRequestStreaming($url, $method = 'GET', $data = [], $headers = [], $on_data = null, $on_error = null) + { + $ch = curl_init(); + + // 设置URL + curl_setopt($ch, CURLOPT_URL, $url); + + // 设置请求方法 + curl_setopt($ch, CURLOPT_CUSTOMREQUEST, $method); + + // 设置POST数据 + if ($method === 'POST' && !empty($data)) { + curl_setopt($ch, CURLOPT_POSTFIELDS, is_array($data) ? json_encode($data) : $data); + } + + // 设置请求头 + if (!empty($headers)) { + curl_setopt($ch, CURLOPT_HTTPHEADER, $headers); + } else { + // 默认请求头 + curl_setopt($ch, CURLOPT_HTTPHEADER, [ + 'Content-Type: application/json', + ]); + } + + // 设置cURL选项以支持流式输出 + curl_setopt($ch, CURLOPT_RETURNTRANSFER, false); // 不返回结果,直接输出 + curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false); + curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, false); + curl_setopt($ch, CURLOPT_TIMEOUT, 0); // 无超时限制,适用于长时间流式响应 + curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, 30); + curl_setopt($ch, CURLOPT_WRITEFUNCTION, function ($curl, $data) use ($on_data) { + // 调用自定义数据处理回调 + if (is_callable($on_data)) { + $on_data($data); + } else { + // 默认直接输出 + echo $data; + ob_flush(); + flush(); + } + + return strlen($data); + }); + + // 设置SSE响应头 + header('Content-Type: text/event-stream'); + header('Cache-Control: no-cache'); + header('Connection: keep-alive'); + ob_end_clean(); // 清除输出缓冲区 + ob_implicit_flush(true); // 自动刷新输出 + + // 执行请求并流式输出响应 + curl_exec($ch); + + if (curl_errno($ch)) { + $error = curl_error($ch); + if (is_callable($on_error)) { + $on_error($error); + } else { + echo "data: " . json_encode(["event" => "error", "data" => $error]) . "\n\n"; + } + curl_close($ch); + return false; + } + + curl_close($ch); + return true; + } + /** * 封装curl请求方法 * @param string $url 请求URL @@ -21,46 +102,48 @@ class Kefu extends BaseApi private function curlRequest($url, $method = 'GET', $data = [], $headers = []) { $ch = curl_init(); - + // 设置URL curl_setopt($ch, CURLOPT_URL, $url); - + // 设置请求方法 curl_setopt($ch, CURLOPT_CUSTOMREQUEST, $method); - + // 设置POST数据 if ($method === 'POST' && !empty($data)) { curl_setopt($ch, CURLOPT_POSTFIELDS, is_array($data) ? json_encode($data) : $data); } - + // 设置请求头 if (!empty($headers)) { curl_setopt($ch, CURLOPT_HTTPHEADER, $headers); } - + // 设置返回值 curl_setopt($ch, CURLOPT_RETURNTRANSFER, true); curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false); curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, false); curl_setopt($ch, CURLOPT_TIMEOUT, 30); - + // 执行请求 $response = curl_exec($ch); $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE); - + // 关闭连接 curl_close($ch); - + if ($response === false) { throw new \Exception('Curl请求失败'); } - + if ($httpCode >= 400) { throw new \Exception('HTTP请求失败,状态码:' . $httpCode); } - + return $response; } + + /** * 为事件调用初始化属性 * @param array $data 事件数据 @@ -89,117 +172,499 @@ class Kefu extends BaseApi /** * 智能客服聊天接口 - * @return \think\response\Json + * @return \think\response\Json | void */ public function chat() { - // 获取请求参数 - $message = $this->params['message'] ?? ''; - $user_id = $this->params['user_id'] ?? $this->member_id; - $conversation_id = $this->params['conversation_id'] ?? ''; - $stream = $this->params['stream'] ?? false; + try { + // 获取请求参数 + $message = $this->params['message'] ?? ''; + $user_id = $this->params['user_id'] ?? $this->member_id; + $conversation_id = $this->params['conversation_id'] ?? ''; + $stream = $this->params['stream'] ?? false; + // 验证参数并获取配置 + $config = $this->validateAndGetConfig($message); + if ($config instanceof \think\response\Json) { + return $config; + } + + // 构建请求数据和请求头 + $requestData = $this->buildRequestData($message, $user_id, $conversation_id, $stream); + $headers = $this->buildRequestHeaders($config['api_key']); + + // 发送请求到Dify API + $url = $config['base_url'] . $config['chat_endpoint']; + + if ($stream) { + // 处理流式响应 + $this->handleStreamingResponse($url, $requestData, $headers, $message, $user_id); + } else { + // 处理非流式响应 + return $this->handleBlockingResponse($url, $requestData, $headers, $message, $user_id, $conversation_id); + } + } catch (\Exception $e) { + return $this->response($this->error('请求失败:' . $e->getMessage())); + } + } + + /** + * 验证参数并获取配置 + * @param string $message 用户消息 + * @return array|\think\response\Json + */ + private function validateAndGetConfig($message) + { // 验证参数 if (empty($message)) { return $this->response($this->error('请输入消息内容')); } - try { - // 获取智能客服配置 - $kefu_config_model = new KefuConfigModel(); - $config_info = $kefu_config_model->getConfig($this->site_id)['data']['value'] ?? []; + // 获取智能客服配置 + $kefu_config_model = new KefuConfigModel(); + $config_info = $kefu_config_model->getConfig($this->site_id)['data']['value'] ?? []; - if (empty($config_info) || $config_info['status'] != 1) { - return $this->response($this->error('智能客服暂未启用')); - } - - $config = $config_info; - $apiKey = $config['api_key']; - $baseUrl = $config['base_url']; - $chatEndpoint = $config['chat_endpoint']; - - // 构建请求数据 - $requestData = [ - 'inputs' => [], - 'query' => $message, - 'response_mode' => $stream ? 'streaming' : 'blocking', - 'user' => $user_id, - ]; - - // 如果有会话ID,添加到请求中 - if (!empty($conversation_id)) { - $requestData['conversation_id'] = $conversation_id; - } - - // 构建请求头 - $headers = [ - 'Authorization: Bearer ' . $apiKey, - 'Content-Type: application/json', - ]; - - // 发送请求到Dify API - $url = $baseUrl . $chatEndpoint; - $response = $this->curlRequest($url, 'POST', $requestData, $headers); - - // 解析响应 - $result = json_decode($response, true); - - if (json_last_error() !== JSON_ERROR_NONE) { - return $this->response($this->error('解析响应失败')); - } - - // 保存消息记录 - $kefu_message_model = new KefuMessageModel(); - $kefu_conversation_model = new KefuConversationModel(); - - // 保存用户消息 - $kefu_message_model->addMessage([ - 'site_id' => $this->site_id, - 'user_id' => $user_id, - 'conversation_id' => $result['conversation_id'] ?? $conversation_id, - 'message_id' => $result['message_id'] ?? '', - 'role' => 'user', - 'content' => $message ?? '', - ]); - - // 保存机器人回复 - $kefu_message_model->addMessage([ - 'site_id' => $this->site_id, - 'user_id' => $user_id, - 'conversation_id' => $result['conversation_id'] ?? $conversation_id, - 'message_id' => $result['id'] ?? '', - 'role' => 'assistant', - 'content' => $result['answer'] ?? '', - ]); - - // 更新会话状态或创建新会话 - $conversation_info = $kefu_conversation_model->getConversationInfo([ - ['site_id', '=', $this->site_id], - ['conversation_id', '=', $result['conversation_id'] ?? $conversation_id], - ]); - - if (empty($conversation_info['data'])) { - // 创建新会话 - $kefu_conversation_model->addConversation([ - 'site_id' => $this->site_id, - 'user_id' => $user_id, - 'conversation_id' => $result['conversation_id'] ?? '', - 'name' => '智能客服会话', - ]); - } else { - // 更新会话状态 - $kefu_conversation_model->updateConversation([ - 'status' => 1, - ], [ - ['id', '=', $conversation_info['data']['id']], - ]); - } - - // 返回成功响应,保持与Dify API一致的响应结构 - return $this->response($this->success($result)); - } catch (\Exception $e) { - return $this->response($this->error('请求失败:' . $e->getMessage())); + if (empty($config_info) || $config_info['status'] != 1) { + return $this->response($this->error('智能客服暂未启用')); } + + return $config_info; + } + + /** + * 构建请求数据 + * @param string $message 用户消息 + * @param string $user_id 用户ID + * @param string $conversation_id 会话ID + * @param bool $stream 是否使用流式响应 + * @return array + */ + private function buildRequestData($message, $user_id, $conversation_id, $stream) + { + $requestData = [ + 'inputs' => [], + 'query' => $message, + 'response_mode' => $stream ? 'streaming' : 'blocking', + 'user' => $user_id, + ]; + + // 如果有会话ID,添加到请求中 + if (!empty($conversation_id)) { + $requestData['conversation_id'] = $conversation_id; + } + + return $requestData; + } + + + + /** + * 保存用户消息 + * @param KefuMessageModel $message_model 消息模型 + * @param string $site_id 站点ID + * @param string $user_id 用户ID + * @param string $conversation_id 会话ID + * @param string $message_id 消息ID + * @param string $content 消息内容 + * @return void + */ + private function saveUserMessage($message_model, $site_id, $user_id, $conversation_id, $message_id, $content) + { + $message_model->addMessage([ + 'site_id' => $site_id, + 'user_id' => $user_id, + 'conversation_id' => $conversation_id, + 'message_id' => $message_id, + 'role' => 'user', + 'content' => $content, + ]); + } + + /** + * 保存助手消息 + * @param KefuMessageModel $message_model 消息模型 + * @param string $site_id 站点ID + * @param string $user_id 用户ID + * @param string $conversation_id 会话ID + * @param string $message_id 消息ID + * @param string $content 消息内容 + * @return void + */ + private function saveAssistantMessage($message_model, $site_id, $user_id, $conversation_id, $message_id, $content) + { + $message_model->addMessage([ + 'site_id' => $site_id, + 'user_id' => $user_id, + 'conversation_id' => $conversation_id, + 'message_id' => $message_id, + 'role' => 'assistant', + 'content' => $content, + ]); + } + + /** + * 更新或创建会话 + * @param KefuConversationModel $conversation_model 会话模型 + * @param string $site_id 站点ID + * @param string $user_id 用户ID + * @param string $conversation_id 会话ID + * @return void + */ + private function updateOrCreateConversation($conversation_model, $site_id, $user_id, $conversation_id) + { + $conversation_info = $conversation_model->getConversationInfo([ + ['site_id', '=', $site_id], + ['user_id', '=', $user_id], + ['conversation_id', '=', $conversation_id], + ]); + + if (empty($conversation_info['data'])) { + // 创建新会话 + $conversation_model->addConversation([ + 'site_id' => $site_id, + 'user_id' => $user_id, + 'conversation_id' => $conversation_id, + 'name' => '智能客服会话', + ]); + } else { + // 更新会话状态 + $conversation_model->updateConversation([ + 'status' => 1, + ], [ + ['id', '=', $conversation_info['data']['id']], + ]); + } + } + + /** + * 日志记录封装方法 + * @param string $message 日志内容 + * @param string $level 日志级别,默认为info + * @return void + */ + private function log($message, $level = 'info') + { + Log::write($message, $level); + } + + private function handleStreamingResponse($url, $requestData, $headers, $message, $user_id) + { + // 记录开始处理流式请求 + $this->log('AI客服流式请求开始处理,用户ID:' . $user_id . ',请求消息:' . $message, 'info'); + + // 初始化模型 + $kefu_conversation_model = new KefuConversationModel(); + $kefu_message_model = new KefuMessageModel(); + $site_id = $this->site_id; + $current_user_id = $this->member_id; + + // 定义变量 + $real_conversation_id = ''; + $real_assistant_message_id = ''; + $real_user_message_id = ''; + $assistant_content = ''; + $user_message_saved = false; + $user_message_content = $message; + + // 数据处理回调函数 + $on_data = function ($data) use (&$real_conversation_id, &$real_assistant_message_id, &$real_user_message_id, &$assistant_content, &$user_message_saved, $user_message_content, $kefu_message_model, $kefu_conversation_model, $site_id, $current_user_id) { + // 记录接收到的数据块 + $this->log('接收到AI客服数据块:' . $data, 'debug'); + + // 输出原始数据 + echo $data; + ob_flush(); + flush(); + + // 解析Dify的流式响应 + $lines = explode("\n", $data); + foreach ($lines as $line) { + $line = trim($line); + if (empty($line)) + continue; + + // 查找以"data: "开头的行 + if (strpos($line, 'data: ') === 0) { + $json_data = substr($line, 6); + $event_data = json_decode($json_data, true); + + if (json_last_error() === JSON_ERROR_NONE && isset($event_data['event'])) { + $event = $event_data['event']; + $this->log('处理AI客服事件:' . $event, 'info'); + + switch ($event_data['event']) { + case 'message': + // LLM返回文本块事件 + if (isset($event_data['data']['conversation_id'])) { + $real_conversation_id = $event_data['data']['conversation_id']; + $this->log('获取到会话ID:' . $real_conversation_id, 'info'); + } + if (isset($event_data['data']['message_id'])) { + $real_assistant_message_id = $event_data['data']['message_id']; + $this->log('获取到助手消息ID:' . $real_assistant_message_id, 'info'); + } + // 积累助手回复内容 + if (isset($event_data['data']['answer'])) { + $assistant_content .= $event_data['data']['answer']; + $this->log('积累助手回复内容:' . $event_data['data']['answer'], 'debug'); + } + // 保存用户消息(仅在第一次获取到会话ID时保存) + if (!$user_message_saved && !empty($real_conversation_id)) { + // 使用Dify返回的真实会话ID保存用户消息 + $this->saveUserMessage($kefu_message_model, $site_id, $current_user_id, $real_conversation_id, '', $user_message_content); + $this->updateOrCreateConversation($kefu_conversation_model, $site_id, $current_user_id, $real_conversation_id); + $user_message_saved = true; + $this->log('用户消息已保存,会话ID:' . $real_conversation_id, 'info'); + } + break; + + case 'agent_message': + // Agent模式下返回文本块事件 + if (isset($event_data['data']['conversation_id'])) { + $real_conversation_id = $event_data['data']['conversation_id']; + $this->log('获取到Agent模式会话ID:' . $real_conversation_id, 'info'); + } + if (isset($event_data['data']['message_id'])) { + $real_assistant_message_id = $event_data['data']['message_id']; + $this->log('获取到Agent模式助手消息ID:' . $real_assistant_message_id, 'info'); + } + // 积累助手回复内容 + if (isset($event_data['data']['answer'])) { + $assistant_content .= $event_data['data']['answer']; + $this->log('积累Agent回复内容:' . $event_data['data']['answer'], 'debug'); + } + // 保存用户消息(仅在第一次获取到会话ID时保存) + if (!$user_message_saved && !empty($real_conversation_id)) { + $this->saveUserMessage($kefu_message_model, $site_id, $current_user_id, $real_conversation_id, '', $user_message_content); + $this->updateOrCreateConversation($kefu_conversation_model, $site_id, $current_user_id, $real_conversation_id); + $user_message_saved = true; + $this->log('Agent模式下用户消息已保存,会话ID:' . $real_conversation_id, 'info'); + } + break; + + case 'agent_thought': + if (isset($event_data['data']['thought'])) { + // 格式化思考过程 + $thought_content = "\n[思考过程]: " . $event_data['data']['thought']; + if (isset($event_data['data']['tool'])) { + $thought_content .= "\n[使用工具]: " . $event_data['data']['tool']; + } + if (isset($event_data['data']['observation'])) { + $thought_content .= "\n[观察结果]: " . $event_data['data']['observation']; + } + $assistant_content .= $thought_content; + $this->log('Agent思考过程:' . $thought_content, 'debug'); + } + break; + + case 'file': + if (isset($event_data['data']['id']) && isset($event_data['data']['type']) && isset($event_data['data']['url'])) { + $file_id = $event_data['data']['id']; + $file_type = $event_data['data']['type']; + $file_url = $event_data['data']['url']; + // 可以将文件信息作为特殊内容添加到回复中 + $file_content = "\n[文件]: " . $file_type . " - " . $file_url; + $assistant_content .= $file_content; + $this->log('收到文件事件:' . $file_type . ' - ' . $file_url, 'info'); + } + break; + + case 'message_start': + if (isset($event_data['data']['conversation_id'])) { + $real_conversation_id = $event_data['data']['conversation_id']; + $this->log('消息开始事件,会话ID:' . $real_conversation_id, 'info'); + } + break; + + case 'message_delta': + if (isset($event_data['data']['delta']['content'])) { + $assistant_content .= $event_data['data']['delta']['content']; + $this->log('积累增量内容:' . $event_data['data']['delta']['content'], 'debug'); + } + break; + + case 'message_end': + // 最终内容已通过message或message_delta事件积累 + $this->log('消息结束事件,会话ID:' . $real_conversation_id . ',消息ID:' . $real_assistant_message_id, 'info'); + break; + + case 'tts_message': + // TTS音频流事件 + // 这里可以记录音频流信息,但由于是文本消息处理,暂时不做特殊处理 + $this->log('收到TTS消息事件', 'debug'); + break; + + case 'tts_message_end': + // TTS音频流结束事件 + // 同样不做特殊处理 + $this->log('收到TTS消息结束事件', 'debug'); + break; + + case 'message_replace': + if (isset($event_data['data']['answer'])) { + // 直接替换所有回复文本 + $assistant_content = $event_data['data']['answer']; + $this->log('消息内容替换为:' . $event_data['data']['answer'], 'debug'); + } + break; + + case 'error': + $error_message = isset($event_data['data']['message']) ? $event_data['data']['message'] : '流式输出异常'; + $assistant_content .= "\n[错误]: " . $error_message; + $this->log('AI客服错误事件:' . $error_message, 'error'); + break; + + case 'ping': + // 保持连接存活的ping事件 + // 无需特殊处理,继续保持连接 + $this->log('收到ping事件', 'debug'); + break; + + case 'tool_call_start': + // 工具调用开始事件 + $this->log('工具调用开始事件', 'debug'); + break; + + case 'tool_call_delta': + // 工具调用增量事件 + $this->log('工具调用增量事件', 'debug'); + break; + + case 'tool_call_end': + // 工具调用结束事件 + $this->log('工具调用结束事件', 'debug'); + break; + + case 'done': + // 完成事件 + $this->log('完成事件', 'debug'); + break; + + case 'workflow_start': + // 工作流开始事件 + $this->log('工作流开始事件', 'debug'); + break; + + case 'workflow_node_start': + if (isset($event_data['data']['node_id']) && isset($event_data['data']['node_name'])) { + $this->log('工作流节点开始:' . $event_data['data']['node_name'], 'debug'); + } + break; + + case 'workflow_node_end': + if (isset($event_data['data']['node_id']) && isset($event_data['data']['outputs'])) { + $this->log('工作流节点结束:' . $event_data['data']['node_id'], 'debug'); + } + break; + + case 'workflow_end': + // 工作流结束事件 + $this->log('工作流结束事件', 'debug'); + break; + + default: + // 处理未知事件类型 + $this->log('未知事件类型:' . $event_data['event'], 'warning'); + break; + } + } else { + $this->log('AI客服事件解析失败:' . $json_data, 'error'); + } + } + } + }; + + // 错误处理回调函数 + $on_error = function ($error) use ($user_id) { + $this->log('AI客服请求错误,用户ID:' . $user_id . ',错误信息:' . json_encode($error), 'error'); + }; + + try { + // 调用curl流式请求 + $this->curlRequestStreaming($url, $requestData, $headers, $on_data, $on_error); + + // 数据流结束时发送明确的"done"事件 + $done_event = [ + 'event' => 'done', + 'data' => [ + 'conversation_id' => $real_conversation_id, + 'message_id' => $real_assistant_message_id, + 'content' => $assistant_content + ] + ]; + echo "data: " . json_encode($done_event) . "\n\n"; + ob_flush(); + flush(); + $this->log('发送done事件,会话ID:' . $real_conversation_id, 'info'); + + // 保存助手的完整回复 + if (!empty($real_conversation_id) && !empty($real_assistant_message_id) && !empty($assistant_content)) { + $result = $this->saveAssistantMessage($kefu_message_model, $site_id, $current_user_id, $real_conversation_id, $real_assistant_message_id, $assistant_content); + if ($result) { + $this->log('AI客服回复保存成功,会话ID:' . $real_conversation_id, 'info'); + } else { + $this->log('AI客服回复保存失败,会话ID:' . $real_conversation_id, 'error'); + } + } + } catch (Exception $e) { + $error_msg = 'AI客服请求异常:' . $e->getMessage() . ',错误行:' . $e->getLine() . ',错误文件:' . $e->getFile(); + $this->log($error_msg, 'error'); + } + } + + /** + * 处理非流式响应 + * @param string $url 请求URL + * @param array $requestData 请求数据 + * @param array $headers 请求头 + * @param string $message 用户消息 + * @param string $user_id 用户ID + * @param string $conversation_id 会话ID + * @return \think\response\Json | void + */ + private function handleBlockingResponse($url, $requestData, $headers, $message, $user_id, $conversation_id) + { + // 记录开始处理阻塞请求 + $this->log('AI客服阻塞请求开始处理,用户ID:' . $user_id . ',会话ID:' . $conversation_id, 'info'); + + // 初始化模型 + $kefu_message_model = new KefuMessageModel(); + $kefu_conversation_model = new KefuConversationModel(); + + // 发送请求 + $response = $this->curlRequest($url, 'POST', $requestData, $headers); + + // 解析响应 + $result = json_decode($response, true); + + if (json_last_error() !== JSON_ERROR_NONE) { + return $this->response($this->error('解析响应失败')); + } + + // 保存用户消息 + $this->saveUserMessage($kefu_message_model, $this->site_id, $user_id, $result['conversation_id'] ?? $conversation_id, $result['message_id'] ?? '', $message ?? ''); + + // 保存机器人回复 + $this->saveAssistantMessage($kefu_message_model, $this->site_id, $user_id, $result['conversation_id'] ?? $conversation_id, $result['id'] ?? '', $result['answer'] ?? ''); + + // 更新会话状态或创建新会话 + $this->updateOrCreateConversation($kefu_conversation_model, $this->site_id, $user_id, $result['conversation_id'] ?? $conversation_id); + + // 返回成功响应,保持与Dify API一致的响应结构 + return $this->response($this->success($result)); + } + + /** + * 构建请求头 + * @param string $apiKey API密钥 + * @return array + */ + private function buildRequestHeaders($apiKey) + { + return [ + 'Authorization: Bearer ' . $apiKey, + 'Content-Type: application/json', + ]; } /** @@ -243,4 +708,4 @@ class Kefu extends BaseApi } -} +} \ No newline at end of file