chore(addon/aikefu): 新增

This commit is contained in:
2025-12-09 13:46:33 +08:00
parent 2f9e1fabd6
commit 37b3d62c74

View File

@@ -6,10 +6,91 @@ use addon\aikefu\model\Config as KefuConfigModel;
use addon\aikefu\model\Conversation as KefuConversationModel; use addon\aikefu\model\Conversation as KefuConversationModel;
use addon\aikefu\model\Message as KefuMessageModel; use addon\aikefu\model\Message as KefuMessageModel;
use app\api\controller\BaseApi; use app\api\controller\BaseApi;
use think\Log;
class Kefu extends BaseApi class Kefu extends BaseApi
{ {
/**
* 通用的curl流式请求函数
* @param string $url 请求URL
* @param string $method 请求方法
* @param array $data 请求数据
* @param array $headers 请求头
* @param callable|null $on_data 数据回调函数,接收原始数据
* @param callable|null $on_error 错误回调函数,接收错误信息
* @return bool 请求是否成功
*/
private function curlRequestStreaming($url, $method = 'GET', $data = [], $headers = [], $on_data = null, $on_error = null)
{
$ch = curl_init();
// 设置URL
curl_setopt($ch, CURLOPT_URL, $url);
// 设置请求方法
curl_setopt($ch, CURLOPT_CUSTOMREQUEST, $method);
// 设置POST数据
if ($method === 'POST' && !empty($data)) {
curl_setopt($ch, CURLOPT_POSTFIELDS, is_array($data) ? json_encode($data) : $data);
}
// 设置请求头
if (!empty($headers)) {
curl_setopt($ch, CURLOPT_HTTPHEADER, $headers);
} else {
// 默认请求头
curl_setopt($ch, CURLOPT_HTTPHEADER, [
'Content-Type: application/json',
]);
}
// 设置cURL选项以支持流式输出
curl_setopt($ch, CURLOPT_RETURNTRANSFER, false); // 不返回结果,直接输出
curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false);
curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, false);
curl_setopt($ch, CURLOPT_TIMEOUT, 0); // 无超时限制,适用于长时间流式响应
curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, 30);
curl_setopt($ch, CURLOPT_WRITEFUNCTION, function ($curl, $data) use ($on_data) {
// 调用自定义数据处理回调
if (is_callable($on_data)) {
$on_data($data);
} else {
// 默认直接输出
echo $data;
ob_flush();
flush();
}
return strlen($data);
});
// 设置SSE响应头
header('Content-Type: text/event-stream');
header('Cache-Control: no-cache');
header('Connection: keep-alive');
ob_end_clean(); // 清除输出缓冲区
ob_implicit_flush(true); // 自动刷新输出
// 执行请求并流式输出响应
curl_exec($ch);
if (curl_errno($ch)) {
$error = curl_error($ch);
if (is_callable($on_error)) {
$on_error($error);
} else {
echo "data: " . json_encode(["event" => "error", "data" => $error]) . "\n\n";
}
curl_close($ch);
return false;
}
curl_close($ch);
return true;
}
/** /**
* 封装curl请求方法 * 封装curl请求方法
* @param string $url 请求URL * @param string $url 请求URL
@@ -21,46 +102,48 @@ class Kefu extends BaseApi
private function curlRequest($url, $method = 'GET', $data = [], $headers = []) private function curlRequest($url, $method = 'GET', $data = [], $headers = [])
{ {
$ch = curl_init(); $ch = curl_init();
// 设置URL // 设置URL
curl_setopt($ch, CURLOPT_URL, $url); curl_setopt($ch, CURLOPT_URL, $url);
// 设置请求方法 // 设置请求方法
curl_setopt($ch, CURLOPT_CUSTOMREQUEST, $method); curl_setopt($ch, CURLOPT_CUSTOMREQUEST, $method);
// 设置POST数据 // 设置POST数据
if ($method === 'POST' && !empty($data)) { if ($method === 'POST' && !empty($data)) {
curl_setopt($ch, CURLOPT_POSTFIELDS, is_array($data) ? json_encode($data) : $data); curl_setopt($ch, CURLOPT_POSTFIELDS, is_array($data) ? json_encode($data) : $data);
} }
// 设置请求头 // 设置请求头
if (!empty($headers)) { if (!empty($headers)) {
curl_setopt($ch, CURLOPT_HTTPHEADER, $headers); curl_setopt($ch, CURLOPT_HTTPHEADER, $headers);
} }
// 设置返回值 // 设置返回值
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true); curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false); curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false);
curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, false); curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, false);
curl_setopt($ch, CURLOPT_TIMEOUT, 30); curl_setopt($ch, CURLOPT_TIMEOUT, 30);
// 执行请求 // 执行请求
$response = curl_exec($ch); $response = curl_exec($ch);
$httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE); $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
// 关闭连接 // 关闭连接
curl_close($ch); curl_close($ch);
if ($response === false) { if ($response === false) {
throw new \Exception('Curl请求失败'); throw new \Exception('Curl请求失败');
} }
if ($httpCode >= 400) { if ($httpCode >= 400) {
throw new \Exception('HTTP请求失败状态码' . $httpCode); throw new \Exception('HTTP请求失败状态码' . $httpCode);
} }
return $response; return $response;
} }
/** /**
* 为事件调用初始化属性 * 为事件调用初始化属性
* @param array $data 事件数据 * @param array $data 事件数据
@@ -89,117 +172,499 @@ class Kefu extends BaseApi
/** /**
* 智能客服聊天接口 * 智能客服聊天接口
* @return \think\response\Json * @return \think\response\Json | void
*/ */
public function chat() public function chat()
{ {
// 获取请求参数 try {
$message = $this->params['message'] ?? ''; // 获取请求参数
$user_id = $this->params['user_id'] ?? $this->member_id; $message = $this->params['message'] ?? '';
$conversation_id = $this->params['conversation_id'] ?? ''; $user_id = $this->params['user_id'] ?? $this->member_id;
$stream = $this->params['stream'] ?? false; $conversation_id = $this->params['conversation_id'] ?? '';
$stream = $this->params['stream'] ?? false;
// 验证参数并获取配置
$config = $this->validateAndGetConfig($message);
if ($config instanceof \think\response\Json) {
return $config;
}
// 构建请求数据和请求头
$requestData = $this->buildRequestData($message, $user_id, $conversation_id, $stream);
$headers = $this->buildRequestHeaders($config['api_key']);
// 发送请求到Dify API
$url = $config['base_url'] . $config['chat_endpoint'];
if ($stream) {
// 处理流式响应
$this->handleStreamingResponse($url, $requestData, $headers, $message, $user_id);
} else {
// 处理非流式响应
return $this->handleBlockingResponse($url, $requestData, $headers, $message, $user_id, $conversation_id);
}
} catch (\Exception $e) {
return $this->response($this->error('请求失败:' . $e->getMessage()));
}
}
/**
* 验证参数并获取配置
* @param string $message 用户消息
* @return array|\think\response\Json
*/
private function validateAndGetConfig($message)
{
// 验证参数 // 验证参数
if (empty($message)) { if (empty($message)) {
return $this->response($this->error('请输入消息内容')); return $this->response($this->error('请输入消息内容'));
} }
try { // 获取智能客服配置
// 获取智能客服配置 $kefu_config_model = new KefuConfigModel();
$kefu_config_model = new KefuConfigModel(); $config_info = $kefu_config_model->getConfig($this->site_id)['data']['value'] ?? [];
$config_info = $kefu_config_model->getConfig($this->site_id)['data']['value'] ?? [];
if (empty($config_info) || $config_info['status'] != 1) { if (empty($config_info) || $config_info['status'] != 1) {
return $this->response($this->error('智能客服暂未启用')); return $this->response($this->error('智能客服暂未启用'));
}
$config = $config_info;
$apiKey = $config['api_key'];
$baseUrl = $config['base_url'];
$chatEndpoint = $config['chat_endpoint'];
// 构建请求数据
$requestData = [
'inputs' => [],
'query' => $message,
'response_mode' => $stream ? 'streaming' : 'blocking',
'user' => $user_id,
];
// 如果有会话ID添加到请求中
if (!empty($conversation_id)) {
$requestData['conversation_id'] = $conversation_id;
}
// 构建请求头
$headers = [
'Authorization: Bearer ' . $apiKey,
'Content-Type: application/json',
];
// 发送请求到Dify API
$url = $baseUrl . $chatEndpoint;
$response = $this->curlRequest($url, 'POST', $requestData, $headers);
// 解析响应
$result = json_decode($response, true);
if (json_last_error() !== JSON_ERROR_NONE) {
return $this->response($this->error('解析响应失败'));
}
// 保存消息记录
$kefu_message_model = new KefuMessageModel();
$kefu_conversation_model = new KefuConversationModel();
// 保存用户消息
$kefu_message_model->addMessage([
'site_id' => $this->site_id,
'user_id' => $user_id,
'conversation_id' => $result['conversation_id'] ?? $conversation_id,
'message_id' => $result['message_id'] ?? '',
'role' => 'user',
'content' => $message ?? '',
]);
// 保存机器人回复
$kefu_message_model->addMessage([
'site_id' => $this->site_id,
'user_id' => $user_id,
'conversation_id' => $result['conversation_id'] ?? $conversation_id,
'message_id' => $result['id'] ?? '',
'role' => 'assistant',
'content' => $result['answer'] ?? '',
]);
// 更新会话状态或创建新会话
$conversation_info = $kefu_conversation_model->getConversationInfo([
['site_id', '=', $this->site_id],
['conversation_id', '=', $result['conversation_id'] ?? $conversation_id],
]);
if (empty($conversation_info['data'])) {
// 创建新会话
$kefu_conversation_model->addConversation([
'site_id' => $this->site_id,
'user_id' => $user_id,
'conversation_id' => $result['conversation_id'] ?? '',
'name' => '智能客服会话',
]);
} else {
// 更新会话状态
$kefu_conversation_model->updateConversation([
'status' => 1,
], [
['id', '=', $conversation_info['data']['id']],
]);
}
// 返回成功响应保持与Dify API一致的响应结构
return $this->response($this->success($result));
} catch (\Exception $e) {
return $this->response($this->error('请求失败:' . $e->getMessage()));
} }
return $config_info;
}
/**
* 构建请求数据
* @param string $message 用户消息
* @param string $user_id 用户ID
* @param string $conversation_id 会话ID
* @param bool $stream 是否使用流式响应
* @return array
*/
private function buildRequestData($message, $user_id, $conversation_id, $stream)
{
$requestData = [
'inputs' => [],
'query' => $message,
'response_mode' => $stream ? 'streaming' : 'blocking',
'user' => $user_id,
];
// 如果有会话ID添加到请求中
if (!empty($conversation_id)) {
$requestData['conversation_id'] = $conversation_id;
}
return $requestData;
}
/**
* 保存用户消息
* @param KefuMessageModel $message_model 消息模型
* @param string $site_id 站点ID
* @param string $user_id 用户ID
* @param string $conversation_id 会话ID
* @param string $message_id 消息ID
* @param string $content 消息内容
* @return void
*/
private function saveUserMessage($message_model, $site_id, $user_id, $conversation_id, $message_id, $content)
{
$message_model->addMessage([
'site_id' => $site_id,
'user_id' => $user_id,
'conversation_id' => $conversation_id,
'message_id' => $message_id,
'role' => 'user',
'content' => $content,
]);
}
/**
* 保存助手消息
* @param KefuMessageModel $message_model 消息模型
* @param string $site_id 站点ID
* @param string $user_id 用户ID
* @param string $conversation_id 会话ID
* @param string $message_id 消息ID
* @param string $content 消息内容
* @return void
*/
private function saveAssistantMessage($message_model, $site_id, $user_id, $conversation_id, $message_id, $content)
{
$message_model->addMessage([
'site_id' => $site_id,
'user_id' => $user_id,
'conversation_id' => $conversation_id,
'message_id' => $message_id,
'role' => 'assistant',
'content' => $content,
]);
}
/**
* 更新或创建会话
* @param KefuConversationModel $conversation_model 会话模型
* @param string $site_id 站点ID
* @param string $user_id 用户ID
* @param string $conversation_id 会话ID
* @return void
*/
private function updateOrCreateConversation($conversation_model, $site_id, $user_id, $conversation_id)
{
$conversation_info = $conversation_model->getConversationInfo([
['site_id', '=', $site_id],
['user_id', '=', $user_id],
['conversation_id', '=', $conversation_id],
]);
if (empty($conversation_info['data'])) {
// 创建新会话
$conversation_model->addConversation([
'site_id' => $site_id,
'user_id' => $user_id,
'conversation_id' => $conversation_id,
'name' => '智能客服会话',
]);
} else {
// 更新会话状态
$conversation_model->updateConversation([
'status' => 1,
], [
['id', '=', $conversation_info['data']['id']],
]);
}
}
/**
* 日志记录封装方法
* @param string $message 日志内容
* @param string $level 日志级别默认为info
* @return void
*/
private function log($message, $level = 'info')
{
Log::write($message, $level);
}
private function handleStreamingResponse($url, $requestData, $headers, $message, $user_id)
{
// 记录开始处理流式请求
$this->log('AI客服流式请求开始处理用户ID' . $user_id . ',请求消息:' . $message, 'info');
// 初始化模型
$kefu_conversation_model = new KefuConversationModel();
$kefu_message_model = new KefuMessageModel();
$site_id = $this->site_id;
$current_user_id = $this->member_id;
// 定义变量
$real_conversation_id = '';
$real_assistant_message_id = '';
$real_user_message_id = '';
$assistant_content = '';
$user_message_saved = false;
$user_message_content = $message;
// 数据处理回调函数
$on_data = function ($data) use (&$real_conversation_id, &$real_assistant_message_id, &$real_user_message_id, &$assistant_content, &$user_message_saved, $user_message_content, $kefu_message_model, $kefu_conversation_model, $site_id, $current_user_id) {
// 记录接收到的数据块
$this->log('接收到AI客服数据块' . $data, 'debug');
// 输出原始数据
echo $data;
ob_flush();
flush();
// 解析Dify的流式响应
$lines = explode("\n", $data);
foreach ($lines as $line) {
$line = trim($line);
if (empty($line))
continue;
// 查找以"data: "开头的行
if (strpos($line, 'data: ') === 0) {
$json_data = substr($line, 6);
$event_data = json_decode($json_data, true);
if (json_last_error() === JSON_ERROR_NONE && isset($event_data['event'])) {
$event = $event_data['event'];
$this->log('处理AI客服事件' . $event, 'info');
switch ($event_data['event']) {
case 'message':
// LLM返回文本块事件
if (isset($event_data['data']['conversation_id'])) {
$real_conversation_id = $event_data['data']['conversation_id'];
$this->log('获取到会话ID' . $real_conversation_id, 'info');
}
if (isset($event_data['data']['message_id'])) {
$real_assistant_message_id = $event_data['data']['message_id'];
$this->log('获取到助手消息ID' . $real_assistant_message_id, 'info');
}
// 积累助手回复内容
if (isset($event_data['data']['answer'])) {
$assistant_content .= $event_data['data']['answer'];
$this->log('积累助手回复内容:' . $event_data['data']['answer'], 'debug');
}
// 保存用户消息仅在第一次获取到会话ID时保存
if (!$user_message_saved && !empty($real_conversation_id)) {
// 使用Dify返回的真实会话ID保存用户消息
$this->saveUserMessage($kefu_message_model, $site_id, $current_user_id, $real_conversation_id, '', $user_message_content);
$this->updateOrCreateConversation($kefu_conversation_model, $site_id, $current_user_id, $real_conversation_id);
$user_message_saved = true;
$this->log('用户消息已保存会话ID' . $real_conversation_id, 'info');
}
break;
case 'agent_message':
// Agent模式下返回文本块事件
if (isset($event_data['data']['conversation_id'])) {
$real_conversation_id = $event_data['data']['conversation_id'];
$this->log('获取到Agent模式会话ID' . $real_conversation_id, 'info');
}
if (isset($event_data['data']['message_id'])) {
$real_assistant_message_id = $event_data['data']['message_id'];
$this->log('获取到Agent模式助手消息ID' . $real_assistant_message_id, 'info');
}
// 积累助手回复内容
if (isset($event_data['data']['answer'])) {
$assistant_content .= $event_data['data']['answer'];
$this->log('积累Agent回复内容' . $event_data['data']['answer'], 'debug');
}
// 保存用户消息仅在第一次获取到会话ID时保存
if (!$user_message_saved && !empty($real_conversation_id)) {
$this->saveUserMessage($kefu_message_model, $site_id, $current_user_id, $real_conversation_id, '', $user_message_content);
$this->updateOrCreateConversation($kefu_conversation_model, $site_id, $current_user_id, $real_conversation_id);
$user_message_saved = true;
$this->log('Agent模式下用户消息已保存会话ID' . $real_conversation_id, 'info');
}
break;
case 'agent_thought':
if (isset($event_data['data']['thought'])) {
// 格式化思考过程
$thought_content = "\n[思考过程]: " . $event_data['data']['thought'];
if (isset($event_data['data']['tool'])) {
$thought_content .= "\n[使用工具]: " . $event_data['data']['tool'];
}
if (isset($event_data['data']['observation'])) {
$thought_content .= "\n[观察结果]: " . $event_data['data']['observation'];
}
$assistant_content .= $thought_content;
$this->log('Agent思考过程' . $thought_content, 'debug');
}
break;
case 'file':
if (isset($event_data['data']['id']) && isset($event_data['data']['type']) && isset($event_data['data']['url'])) {
$file_id = $event_data['data']['id'];
$file_type = $event_data['data']['type'];
$file_url = $event_data['data']['url'];
// 可以将文件信息作为特殊内容添加到回复中
$file_content = "\n[文件]: " . $file_type . " - " . $file_url;
$assistant_content .= $file_content;
$this->log('收到文件事件:' . $file_type . ' - ' . $file_url, 'info');
}
break;
case 'message_start':
if (isset($event_data['data']['conversation_id'])) {
$real_conversation_id = $event_data['data']['conversation_id'];
$this->log('消息开始事件会话ID' . $real_conversation_id, 'info');
}
break;
case 'message_delta':
if (isset($event_data['data']['delta']['content'])) {
$assistant_content .= $event_data['data']['delta']['content'];
$this->log('积累增量内容:' . $event_data['data']['delta']['content'], 'debug');
}
break;
case 'message_end':
// 最终内容已通过message或message_delta事件积累
$this->log('消息结束事件会话ID' . $real_conversation_id . '消息ID' . $real_assistant_message_id, 'info');
break;
case 'tts_message':
// TTS音频流事件
// 这里可以记录音频流信息,但由于是文本消息处理,暂时不做特殊处理
$this->log('收到TTS消息事件', 'debug');
break;
case 'tts_message_end':
// TTS音频流结束事件
// 同样不做特殊处理
$this->log('收到TTS消息结束事件', 'debug');
break;
case 'message_replace':
if (isset($event_data['data']['answer'])) {
// 直接替换所有回复文本
$assistant_content = $event_data['data']['answer'];
$this->log('消息内容替换为:' . $event_data['data']['answer'], 'debug');
}
break;
case 'error':
$error_message = isset($event_data['data']['message']) ? $event_data['data']['message'] : '流式输出异常';
$assistant_content .= "\n[错误]: " . $error_message;
$this->log('AI客服错误事件' . $error_message, 'error');
break;
case 'ping':
// 保持连接存活的ping事件
// 无需特殊处理,继续保持连接
$this->log('收到ping事件', 'debug');
break;
case 'tool_call_start':
// 工具调用开始事件
$this->log('工具调用开始事件', 'debug');
break;
case 'tool_call_delta':
// 工具调用增量事件
$this->log('工具调用增量事件', 'debug');
break;
case 'tool_call_end':
// 工具调用结束事件
$this->log('工具调用结束事件', 'debug');
break;
case 'done':
// 完成事件
$this->log('完成事件', 'debug');
break;
case 'workflow_start':
// 工作流开始事件
$this->log('工作流开始事件', 'debug');
break;
case 'workflow_node_start':
if (isset($event_data['data']['node_id']) && isset($event_data['data']['node_name'])) {
$this->log('工作流节点开始:' . $event_data['data']['node_name'], 'debug');
}
break;
case 'workflow_node_end':
if (isset($event_data['data']['node_id']) && isset($event_data['data']['outputs'])) {
$this->log('工作流节点结束:' . $event_data['data']['node_id'], 'debug');
}
break;
case 'workflow_end':
// 工作流结束事件
$this->log('工作流结束事件', 'debug');
break;
default:
// 处理未知事件类型
$this->log('未知事件类型:' . $event_data['event'], 'warning');
break;
}
} else {
$this->log('AI客服事件解析失败' . $json_data, 'error');
}
}
}
};
// 错误处理回调函数
$on_error = function ($error) use ($user_id) {
$this->log('AI客服请求错误用户ID' . $user_id . ',错误信息:' . json_encode($error), 'error');
};
try {
// 调用curl流式请求
$this->curlRequestStreaming($url, $requestData, $headers, $on_data, $on_error);
// 数据流结束时发送明确的"done"事件
$done_event = [
'event' => 'done',
'data' => [
'conversation_id' => $real_conversation_id,
'message_id' => $real_assistant_message_id,
'content' => $assistant_content
]
];
echo "data: " . json_encode($done_event) . "\n\n";
ob_flush();
flush();
$this->log('发送done事件会话ID' . $real_conversation_id, 'info');
// 保存助手的完整回复
if (!empty($real_conversation_id) && !empty($real_assistant_message_id) && !empty($assistant_content)) {
$result = $this->saveAssistantMessage($kefu_message_model, $site_id, $current_user_id, $real_conversation_id, $real_assistant_message_id, $assistant_content);
if ($result) {
$this->log('AI客服回复保存成功会话ID' . $real_conversation_id, 'info');
} else {
$this->log('AI客服回复保存失败会话ID' . $real_conversation_id, 'error');
}
}
} catch (Exception $e) {
$error_msg = 'AI客服请求异常' . $e->getMessage() . ',错误行:' . $e->getLine() . ',错误文件:' . $e->getFile();
$this->log($error_msg, 'error');
}
}
/**
* 处理非流式响应
* @param string $url 请求URL
* @param array $requestData 请求数据
* @param array $headers 请求头
* @param string $message 用户消息
* @param string $user_id 用户ID
* @param string $conversation_id 会话ID
* @return \think\response\Json | void
*/
private function handleBlockingResponse($url, $requestData, $headers, $message, $user_id, $conversation_id)
{
// 记录开始处理阻塞请求
$this->log('AI客服阻塞请求开始处理用户ID' . $user_id . '会话ID' . $conversation_id, 'info');
// 初始化模型
$kefu_message_model = new KefuMessageModel();
$kefu_conversation_model = new KefuConversationModel();
// 发送请求
$response = $this->curlRequest($url, 'POST', $requestData, $headers);
// 解析响应
$result = json_decode($response, true);
if (json_last_error() !== JSON_ERROR_NONE) {
return $this->response($this->error('解析响应失败'));
}
// 保存用户消息
$this->saveUserMessage($kefu_message_model, $this->site_id, $user_id, $result['conversation_id'] ?? $conversation_id, $result['message_id'] ?? '', $message ?? '');
// 保存机器人回复
$this->saveAssistantMessage($kefu_message_model, $this->site_id, $user_id, $result['conversation_id'] ?? $conversation_id, $result['id'] ?? '', $result['answer'] ?? '');
// 更新会话状态或创建新会话
$this->updateOrCreateConversation($kefu_conversation_model, $this->site_id, $user_id, $result['conversation_id'] ?? $conversation_id);
// 返回成功响应保持与Dify API一致的响应结构
return $this->response($this->success($result));
}
/**
* 构建请求头
* @param string $apiKey API密钥
* @return array
*/
private function buildRequestHeaders($apiKey)
{
return [
'Authorization: Bearer ' . $apiKey,
'Content-Type: application/json',
];
} }
/** /**
@@ -243,4 +708,4 @@ class Kefu extends BaseApi
} }
} }