Files
shop-platform/src/addon/aikefu/api/controller/WebSocket.php
ZF sun ef708e6b40 feat(WebSocket): 添加数据库连接检查和文件预览功能
- 在DefaultWebSocketController中添加数据库连接检查功能
- 实现文件预览和下载功能及相关API接口
- 更新测试页面支持文件预览和下载操作
- 移除旧的数据库维护子进程机制,改为函数检查
- 在构建请求数据时添加文件字段支持
2026-01-26 08:40:07 +08:00

1751 lines
74 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
<?php
namespace addon\aikefu\api\controller;
use addon\aikefu\model\Config as KefuConfigModel;
use addon\aikefu\model\Conversation as KefuConversationModel;
use addon\aikefu\model\Message as KefuMessageModel;
use app\api\controller\WebSocketBase;
use app\model\system\Api;
use Ratchet\ConnectionInterface;
use think\facade\Db as Db;
use think\facade\Config;
use think\facade\Cache;
use React\EventLoop\Loop;
class WebSocket extends WebSocketBase
{
// 存储正在进行的流式请求信息
protected $streamingRequests = [];
public function __construct()
{
// 调用父类构造函数传入当前addon名称
parent::__construct('aikefu');
}
/**
* 当有新客户端连接时调用
* @param ConnectionInterface $conn
*/
public function onOpen(ConnectionInterface $conn)
{
// 存储新连接的客户端
$this->clients->attach($conn);
$this->clientData[$conn->resourceId] = [
'connection' => $conn,
'site_id' => null,
'user_id' => null,
'token' => null,
'is_authenticated' => false,
'conversation_id' => null,
];
echo "New connection! ({$conn->resourceId})\n";
}
/**
* 当从客户端收到消息时调用
* @param ConnectionInterface $conn
* @param string $message
*/
public function onMessage(ConnectionInterface $conn, $message)
{
$numRecv = count($this->clients) - 1;
echo sprintf(
'Connection %d sending message "%s" to %d other connection%s' . "\n",
$conn->resourceId,
$message,
$numRecv,
$numRecv == 1 ? '' : 's'
);
// 解析消息
try {
$data = json_decode($message, true);
if (json_last_error() !== JSON_ERROR_NONE) {
throw new \Exception('Invalid JSON format');
}
// 处理认证
if (isset($data['action']) && $data['action'] === 'auth') {
$this->handleAuth($conn, $data);
return;
}
// 检查是否已认证
if (!$this->clientData[$conn->resourceId]['is_authenticated']) {
$conn->send(json_encode(['type' => 'error', 'message' => 'Not authenticated']));
return;
}
// 处理心跳
if (isset($data['action']) && $data['action'] === 'ping') {
$conn->send(json_encode(['type' => 'pong']));
return;
}
// 处理聊天消息
if (isset($data['action']) && $data['action'] === 'chat') {
$this->handleChat($conn, $data);
return;
}
// 处理分片上传
if (isset($data['action']) && $data['action'] === 'upload_chunk') {
$this->handleUploadChunk($conn, $data);
return;
}
// 处理分片合并
if (isset($data['action']) && $data['action'] === 'upload_merge') {
$this->handleMergeChunks($conn, $data);
return;
}
// 处理分片状态检查
if (isset($data['action']) && $data['action'] === 'upload_check') {
$this->handleCheckChunks($conn, $data);
return;
}
// 处理文件预览
if (isset($data['action']) && $data['action'] === 'file_preview') {
$this->handleFilePreview($conn, $data);
return;
}
$conn->send(json_encode(['type' => 'error', 'message' => 'Unknown action']));
} catch (\Exception $e) {
$conn->send(json_encode(['type' => 'error', 'message' => $e->getMessage(), 'line' => $e->getLine(), 'file' => $e->getFile(), 'trace' => $e->getTraceAsString()]));
}
}
/**
* 当客户端连接关闭时调用
* @param ConnectionInterface $conn
*/
public function onClose(ConnectionInterface $conn)
{
$resourceId = $conn->resourceId;
// 移除连接
$this->clients->detach($conn);
unset($this->clientData[$resourceId]);
// 停止与该连接相关的所有流式请求
if (isset($this->streamingRequests[$resourceId])) {
$this->streamingRequests[$resourceId]['is_active'] = false;
$this->log('客户端连接已关闭,标记流式请求为停止:' . $resourceId, 'info');
}
echo "Connection {$resourceId} has disconnected\n";
}
/**
* 当连接发生错误时调用
* @param ConnectionInterface $conn
* @param \Exception $e
*/
public function onError(ConnectionInterface $conn, \Exception $e)
{
echo "An error has occurred: {$e->getMessage()}\n";
$conn->close();
}
/**
* 实际鉴权逻辑(复用 HTTP BaseApi::checkToken 同款规则)
*
* 说明:
* - WebSocket 连接场景没有 request()/input(),所以这里直接根据客户端传入的 site_id + token 解密校验
* - 校验 token 解密成功、未过期、且 token 内 member_id 与传入 member_id 一致
*/
protected function doAuth(ConnectionInterface $conn, $data)
{
// 与 Kefu.php 保持一致,支持使用 uniacid 作为站点ID
$site_id = (int) ($data['uniacid'] ?? $data['site_id'] ?? 0);
$user_id = (int) ($data['user_id'] ?? 0);
$token = (string) ($data['token'] ?? '');
if ($site_id <= 0 || $user_id <= 0 || $token === '') {
throw new \Exception('Missing authentication parameters');
}
$this->log('doAuth: ' . json_encode(['site_id' => $site_id, 'user_id' => $user_id, 'token' => $token]), 'info');
// 生成与 BaseApi::checkToken 一致的解密 keyprivate_key + 'site' . site_id如启用 API 私钥)
$key = 'site' . $site_id;
$api_model = new Api();
$api_config = $api_model->getApiConfig()['data'] ?? null;
if (
!empty($api_config) &&
!empty($api_config['is_use']) &&
isset($api_config['value']['private_key']) &&
!empty($api_config['value']['private_key'])
) {
$key = $api_config['value']['private_key'] . $key;
}
$this->log('key:' . $key, 'info');
$decrypt = decrypt($token, $key);
if (empty($decrypt)) {
throw new \Exception('TOKEN_ERROR');
}
$this->log('decrypt:' . $decrypt, 'info');
$decrypted_data = json_decode($decrypt, true);
if (!is_array($decrypted_data) || empty($decrypted_data['member_id'])) {
throw new \Exception('TOKEN_ERROR');
}
// member_id 必须一致,避免冒用
if ((int) $decrypted_data['member_id'] !== $user_id) {
throw new \Exception('TOKEN_ERROR');
}
// 过期校验expire_time=0 为永久,其余必须未过期
$expire_time = (int) ($decrypted_data['expire_time'] ?? 0);
if ($expire_time !== 0 && $expire_time < time()) {
throw new \Exception('TOKEN_EXPIRE');
}
$this->log('expire_time:' . $expire_time, 'info');
// 与 BaseApi 行为一致:临近过期时生成 refresh_token 放入缓存(可选,不强制给客户端)
if ($expire_time !== 0 && ($expire_time - time()) < 300 && !Cache::get('member_token' . $user_id)) {
try {
// WebSocket 场景不强制下发 refresh_token但仍按原逻辑缓存便于其他接口复用
$refresh_token = encrypt(json_encode([
'member_id' => $user_id,
'create_time' => time(),
'expire_time' => $expire_time,
]), $key);
Cache::set('member_token' . $user_id, $refresh_token, 360);
} catch (\Throwable $e) {
// 刷新失败不影响当前鉴权通过
}
}
}
/**
* 处理聊天消息
* @param ConnectionInterface $conn
* @param array $data
*/
protected function handleChat(ConnectionInterface $conn, $data)
{
try {
$clientInfo = $this->clientData[$conn->resourceId];
// 获取请求参数,与 Kefu.php 保持一致
$query = $data['query'] ?? $data['message'] ?? '';
$user_id = $data['user_id'] ?? $clientInfo['user_id'];
$conversation_id = $data['conversation_id'] ?? '';
$stream = $data['stream'] ?? false;
$response_mode = $data['response_mode'] ?? 'streaming'; // 与 Kefu.php 保持一致
// 获取当前连接的客户端信息
$site_id = $data['uniacid'] ?? $clientInfo['site_id'];
$user_id = $data['user_id'] ?? $clientInfo['user_id'];
$token = $data['token'] ?? $clientInfo['token'];
// 验证参数并获取配置,与 Kefu.php 保持一致
$config = $this->validateAndGetConfig([
'query' => ['required' => true, 'message' => '参数错误,请检查 `query` 参数是否设置正确', 'description' => '消息内容'],
'user_id' => ['required' => true, 'message' => '请求参数 `user_id` 不能为空', 'description' => '用户ID']
], [
'query' => $query,
'user_id' => $user_id,
'conversation_id' => $conversation_id,
'stream' => $stream,
'response_mode' => $response_mode,
'uniacid' => $site_id,
'user_id' => $user_id,
'token' => $token,
]);
// 是否启用流式响应,与 Kefu.php 保持一致
$enable_stream = $stream || $response_mode == 'streaming';
// 构建请求数据和请求头
$requestData = $this->buildRequestData($query, $user_id, $conversation_id, $enable_stream, $data);
$headers = $this->buildRequestHeaders($config['api_key']);
// 发送请求到Dify API
$url = $config['base_url'] . $config['chat_endpoint'];
if ($enable_stream) {
// 处理流式响应
$this->handleStreamingResponse($conn, $url, $requestData, $headers, $query, $user_id, $site_id);
} else {
// 处理非流式响应
$response = $this->handleBlockingResponse($url, $requestData, $headers, $query, $user_id, $conversation_id, $site_id);
$conn->send(json_encode(['type' => 'message', 'data' => $response]));
}
} catch (\Exception $e) {
$conn->send(json_encode(['type' => 'error', 'message' => '请求失败:' . $e->getMessage(), 'line' => $e->getLine(), 'file' => $e->getFile(), 'trace' => $e->getTraceAsString()]));
}
}
/**
* 处理分片上传
* @param ConnectionInterface $conn
* @param array $data
*/
private function handleUploadChunk(ConnectionInterface $conn, $data)
{
try {
// 获取客户端信息
$clientInfo = $this->clientData[$conn->resourceId];
// 获取分片上传相关参数
$file_id = $data['file_id'] ?? '';
$file_name = $data['file_name'] ?? '';
$file_type = $data['file_type'] ?? '';
$chunk_index = $data['chunk_index'] ?? 0;
$total_chunks = $data['total_chunks'] ?? 0;
$chunk_size = $data['chunk_size'] ?? 0;
$file_size = $data['file_size'] ?? 0;
$file_content = $data['file_content'] ?? '';
$user_id = $data['user_id'] ?? $clientInfo['user_id'];
$site_id = $data['uniacid'] ?? $clientInfo['site_id'];
// 验证参数
if (empty($file_id) || empty($file_name) || empty($file_content)) {
throw new \Exception('分片上传参数不完整');
}
// 创建临时目录存储分片
$temp_dir = sys_get_temp_dir() . '/dify_uploads/' . $file_id;
if (!is_dir($temp_dir)) {
mkdir($temp_dir, 0777, true);
}
// 保存分片文件
$chunk_file = $temp_dir . '/' . $chunk_index;
$file_data = base64_decode($file_content);
if ($file_data === false) {
throw new \Exception('分片内容base64解码失败');
}
// 写入分片文件
file_put_contents($chunk_file, $file_data);
// 发送分片上传成功响应
$conn->send(json_encode([
'type' => 'chunk_uploaded',
'file_id' => $file_id,
'chunk_index' => $chunk_index,
'message' => '分片上传成功'
]));
$this->log('分片上传成功文件ID' . $file_id . ',分片索引:' . $chunk_index . '/' . $total_chunks, 'info');
} catch (\Exception $e) {
$conn->send(json_encode(['type' => 'error', 'message' => '分片上传失败:' . $e->getMessage(), 'file_id' => $data['file_id'] ?? '']));
}
}
/**
* 处理分片合并
* @param ConnectionInterface $conn
* @param array $data
*/
private function handleMergeChunks(ConnectionInterface $conn, $data)
{
try {
// 获取客户端信息
$clientInfo = $this->clientData[$conn->resourceId];
// 获取合并相关参数
$file_id = $data['file_id'] ?? '';
$file_name = $data['file_name'] ?? '';
$file_type = $data['file_type'] ?? '';
$total_chunks = $data['total_chunks'] ?? 0;
$file_size = $data['file_size'] ?? 0;
$user_id = $data['user_id'] ?? $clientInfo['user_id'];
$site_id = $data['uniacid'] ?? $clientInfo['site_id'];
$token = $data['token'] ?? $clientInfo['token'];
// 验证参数
if (empty($file_id) || empty($file_name)) {
throw new \Exception('合并参数不完整');
}
// 获取临时目录
$temp_dir = sys_get_temp_dir() . '/dify_uploads/' . $file_id;
if (!is_dir($temp_dir)) {
throw new \Exception('分片存储目录不存在');
}
// 验证所有分片是否存在
for ($i = 0; $i < $total_chunks; $i++) {
$chunk_file = $temp_dir . '/' . $i;
if (!file_exists($chunk_file)) {
throw new \Exception('分片文件缺失:' . $i);
}
}
// 合并分片
$merged_file = $temp_dir . '/merged_' . $file_name;
$merged_handle = fopen($merged_file, 'wb');
if (!$merged_handle) {
throw new \Exception('创建合并文件失败');
}
// 按顺序读取并合并分片
for ($i = 0; $i < $total_chunks; $i++) {
$chunk_file = $temp_dir . '/' . $i;
$chunk_handle = fopen($chunk_file, 'rb');
if (!$chunk_handle) {
fclose($merged_handle);
throw new \Exception('打开分片文件失败:' . $i);
}
// 读取分片内容并写入合并文件
while (!feof($chunk_handle)) {
$buffer = fread($chunk_handle, 8192);
fwrite($merged_handle, $buffer);
}
fclose($chunk_handle);
}
fclose($merged_handle);
// 验证合并后的文件大小
if (filesize($merged_file) !== $file_size) {
throw new \Exception('合并后的文件大小与预期不符');
}
// 验证参数并获取配置,与 Kefu.php 保持一致
$config = $this->validateAndGetConfig([
'file_name' => ['required' => true, 'message' => '文件名不能为空', 'description' => '文件名'],
'file_type' => ['required' => true, 'message' => '文件类型不能为空', 'description' => '文件类型'],
'user_id' => ['required' => true, 'message' => '请求参数 `user_id` 不能为空', 'description' => '用户ID']
], [
'file_name' => $file_name,
'file_type' => $file_type,
'user_id' => $user_id,
'uniacid' => $site_id,
'token' => $token
]);
// 发送请求到Dify API
$url = $config['base_url'] . '/files/upload';
// 构建请求头
$headers = [
'Authorization: Bearer ' . $config['api_key'],
];
// 读取合并后的文件内容
$file_content = base64_encode(file_get_contents($merged_file));
// 发送文件上传请求(使用 multipart/form-data 格式)
$response = $this->curlFileUpload($url, $file_name, $file_type, $file_content, $user_id, $headers);
// 解析响应
$result = json_decode($response, true);
if (json_last_error() !== JSON_ERROR_NONE) {
throw new \Exception('解析响应失败');
}
// 验证响应数据
if (empty($result) || !isset($result['id'])) {
throw new \Exception('API返回数据格式错误或缺少必要字段');
}
// 清理临时文件
$this->cleanupTempFiles($temp_dir);
// 发送合并成功响应
$conn->send(json_encode([
'type' => 'upload_success',
'file_id' => $result['id'] ?? '',
'file_name' => $result['name'] ?? $file_name,
'file_size' => $result['size'] ?? $file_size,
'file_extension' => $result['extension'] ?? '',
'file_mime_type' => $result['mime_type'] ?? $file_type,
'file_created_by' => $result['created_by'] ?? '',
'file_created_at' => $result['created_at'] ?? '',
'file_url' => $result['url'] ?? ''
]));
$this->log('文件上传成功用户ID' . $user_id . ',文件名:' . $file_name . '文件ID' . $result['id'], 'info');
} catch (\Exception $e) {
// 清理临时文件
if (!empty($data['file_id'])) {
$temp_dir = sys_get_temp_dir() . '/dify_uploads/' . $data['file_id'];
if (is_dir($temp_dir)) {
$this->cleanupTempFiles($temp_dir);
}
}
// 解析错误信息
$errorMessage = $e->getMessage();
$errorCode = 500;
$errorType = 'upload_failed';
// 提取HTTP错误码和Dify错误信息
if (preg_match('/HTTP请求失败状态码(\d+),响应:(.*)/', $errorMessage, $matches)) {
$errorCode = (int) $matches[1];
$errorResponse = $matches[2];
try {
$errorData = json_decode($errorResponse, true);
if (isset($errorData['code'])) {
$errorType = $errorData['code'];
}
if (isset($errorData['message'])) {
$errorMessage = $errorData['message'];
}
} catch (\Exception $decodeEx) {
// 解析失败,使用原始错误信息
}
}
$conn->send(json_encode([
'type' => 'error',
'code' => $errorCode,
'error_type' => $errorType,
'message' => '文件上传失败:' . $errorMessage
]));
}
}
/**
* 清理临时文件
* @param string $dir 临时目录
*/
private function cleanupTempFiles($dir)
{
if (!is_dir($dir)) {
return;
}
$files = scandir($dir);
foreach ($files as $file) {
if ($file === '.' || $file === '..') {
continue;
}
$file_path = $dir . '/' . $file;
if (is_file($file_path)) {
unlink($file_path);
} elseif (is_dir($file_path)) {
$this->cleanupTempFiles($file_path);
}
}
rmdir($dir);
}
/**
* 处理分片状态检查
* @param ConnectionInterface $conn
* @param array $data
*/
private function handleCheckChunks(ConnectionInterface $conn, $data)
{
try {
// 获取客户端信息
$clientInfo = $this->clientData[$conn->resourceId];
// 获取检查相关参数
$file_id = $data['file_id'] ?? '';
$total_chunks = $data['total_chunks'] ?? 0;
$user_id = $data['user_id'] ?? $clientInfo['user_id'];
// 验证参数
if (empty($file_id)) {
throw new \Exception('文件ID不能为空');
}
// 获取临时目录
$temp_dir = sys_get_temp_dir() . '/dify_uploads/' . $file_id;
$uploaded_chunks = [];
// 检查临时目录是否存在
if (is_dir($temp_dir)) {
// 扫描目录中的分片文件
$files = scandir($temp_dir);
foreach ($files as $file) {
if ($file === '.' || $file === '..' || strpos($file, 'merged_') === 0) {
continue;
}
// 检查是否是数字文件名(分片索引)
if (is_numeric($file)) {
$chunk_index = (int) $file;
// 检查分片文件是否存在且不为空
if (file_exists($temp_dir . '/' . $file) && filesize($temp_dir . '/' . $file) > 0) {
$uploaded_chunks[] = $chunk_index;
}
}
}
}
// 发送分片状态响应
$conn->send(json_encode([
'type' => 'chunks_status',
'file_id' => $file_id,
'uploaded_chunks' => $uploaded_chunks,
'total_chunks' => $total_chunks,
'message' => '分片状态检查成功'
]));
$this->log('分片状态检查成功文件ID' . $file_id . ',已上传分片数:' . count($uploaded_chunks) . '/' . $total_chunks, 'info');
} catch (\Exception $e) {
$conn->send(json_encode(['type' => 'error', 'message' => '分片状态检查失败:' . $e->getMessage(), 'file_id' => $data['file_id'] ?? '']));
}
}
/**
* 处理文件预览
* @param ConnectionInterface $conn
* @param array $data
*/
private function handleFilePreview(ConnectionInterface $conn, $data)
{
try {
// 获取客户端信息
$clientInfo = $this->clientData[$conn->resourceId];
// 获取预览相关参数
$file_id = $data['file_id'] ?? '';
$as_attachment = $data['as_attachment'] ?? false;
$user_id = $data['user_id'] ?? $clientInfo['user_id'];
$site_id = $data['uniacid'] ?? $clientInfo['site_id'];
$token = $data['token'] ?? $clientInfo['token'];
// 验证参数
if (empty($file_id)) {
throw new \Exception('文件ID不能为空');
}
// 验证参数并获取配置,与 Kefu.php 保持一致
$config = $this->validateAndGetConfig([
'file_id' => ['required' => true, 'message' => '文件ID不能为空', 'description' => '文件ID'],
'user_id' => ['required' => true, 'message' => '请求参数 `user_id` 不能为空', 'description' => '用户ID']
], [
'file_id' => $file_id,
'user_id' => $user_id,
'uniacid' => $site_id,
'token' => $token
]);
// 构建请求URL
$url = $config['base_url'] . '/files/' . $file_id . '/preview';
if ($as_attachment) {
$url .= '?as_attachment=true';
}
// 构建请求头
$headers = [
'Authorization: Bearer ' . $config['api_key'],
'Accept: */*'
];
// 发送请求到Dify API
$response = $this->curlGetFile($url, $headers);
// 发送预览成功响应
$conn->send(json_encode([
'type' => 'file_preview_success',
'file_id' => $file_id,
'file_url' => $url,
'message' => '文件预览请求成功'
]));
$this->log('文件预览请求成功文件ID' . $file_id, 'info');
} catch (\Exception $e) {
// 解析错误信息
$errorMessage = $e->getMessage();
$errorCode = 500;
$errorType = 'preview_failed';
// 提取HTTP错误码和Dify错误信息
if (preg_match('/HTTP请求失败状态码(\d+),响应:(.*)/', $errorMessage, $matches)) {
$errorCode = (int) $matches[1];
$errorResponse = $matches[2];
try {
$errorData = json_decode($errorResponse, true);
if (isset($errorData['code'])) {
$errorType = $errorData['code'];
}
if (isset($errorData['message'])) {
$errorMessage = $errorData['message'];
}
} catch (\Exception $decodeEx) {
// 解析失败,使用原始错误信息
}
}
$conn->send(json_encode([
'type' => 'error',
'code' => $errorCode,
'error_type' => $errorType,
'message' => '文件预览失败:' . $errorMessage
]));
}
}
/**
* 封装文件预览的curl请求方法
* @param string $url 请求URL
* @param array $headers 请求头
* @return string 响应内容
*/
private function curlGetFile($url, $headers = [])
{
$ch = curl_init();
// 设置URL
curl_setopt($ch, CURLOPT_URL, $url);
// 设置请求方法
curl_setopt($ch, CURLOPT_HTTPGET, true);
// 设置请求头
if (!empty($headers)) {
curl_setopt($ch, CURLOPT_HTTPHEADER, $headers);
}
// 设置返回值
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false);
curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, false);
curl_setopt($ch, CURLOPT_TIMEOUT, 60);
// 执行请求
$response = curl_exec($ch);
$httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
// 关闭连接
curl_close($ch);
if ($response === false) {
throw new \Exception('Curl请求失败');
}
if ($httpCode >= 400) {
throw new \Exception('HTTP请求失败状态码' . $httpCode . ',响应:' . $response);
}
return $response;
}
/**
* 封装文件上传的curl请求方法适用于 Dify 1.9.0 版本)
* @param string $url 请求URL
* @param string $file_name 文件名
* @param string $file_type 文件类型
* @param string $file_content base64编码的文件内容
* @param string $user_id 用户ID
* @param array $headers 请求头
* @return string 响应内容
*/
private function curlFileUpload($url, $file_name, $file_type, $file_content, $user_id, $headers = [])
{
// 解码base64文件内容
$file_data = base64_decode($file_content);
if ($file_data === false) {
throw new \Exception('文件内容base64解码失败');
}
// 创建临时文件
$temp_file = tempnam(sys_get_temp_dir(), 'dify_upload_');
file_put_contents($temp_file, $file_data);
try {
$ch = curl_init();
// 设置URL
curl_setopt($ch, CURLOPT_URL, $url);
// 设置请求方法
curl_setopt($ch, CURLOPT_POST, true);
// 设置文件上传
$cfile = curl_file_create($temp_file, $file_type, $file_name);
$post_data = [
'file' => $cfile,
'user' => $user_id
];
// 设置POST数据
curl_setopt($ch, CURLOPT_POSTFIELDS, $post_data);
// 设置请求头 - 不设置 Content-Type让 curl 自动设置为 multipart/form-data
if (!empty($headers)) {
curl_setopt($ch, CURLOPT_HTTPHEADER, $headers);
}
// 设置返回值
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false);
curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, false);
curl_setopt($ch, CURLOPT_TIMEOUT, 60);
// 执行请求
$response = curl_exec($ch);
$httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
// 关闭连接
curl_close($ch);
if ($response === false) {
throw new \Exception('Curl请求失败');
}
if ($httpCode >= 400) {
throw new \Exception('HTTP请求失败状态码' . $httpCode . ',响应:' . $response);
}
return $response;
} finally {
// 清理临时文件
if (file_exists($temp_file)) {
unlink($temp_file);
}
}
}
/**
* 处理流式响应
* @param ConnectionInterface $conn
* @param string $url
* @param array $requestData
* @param array $headers
* @param string $query
* @param string $user_id
* @param string $site_id
*/
private function handleStreamingResponse(ConnectionInterface $conn, $url, $requestData, $headers, $query, $user_id, $site_id)
{
try {
// 记录开始处理流式请求
$this->log('AI客服WebSocket流式请求开始处理用户ID' . $user_id . ',请求内容:' . json_encode($requestData), 'info');
// 初始化模型
$kefu_conversation_model = new KefuConversationModel();
$kefu_message_model = new KefuMessageModel();
$current_user_id = $user_id;
// 定义变量
$real_conversation_id = '';
$real_assistant_message_id = '';
$real_user_message_id = '';
$assistant_content = '';
$user_message_saved = false;
$user_message_content = $query;
$temp_conversation_id = 'temp_' . uniqid() . '_' . time(); // 临时会话ID用于失败回滚
// 立即保存用户消息使用临时会话ID
$this->saveUserMessage($kefu_message_model, $site_id, $current_user_id, $temp_conversation_id, '', $query);
$this->log('用户消息已立即保存临时会话ID' . $temp_conversation_id, 'info');
// 创建或更新临时会话
$this->updateOrCreateConversation($kefu_conversation_model, $site_id, $current_user_id, $temp_conversation_id);
$this->log('临时会话已创建ID' . $temp_conversation_id, 'info');
// WebSocket消息回调
$on_data = function ($data) use ($conn, &$real_conversation_id, &$real_assistant_message_id, &$real_user_message_id, &$assistant_content, &$user_message_saved, $user_message_content, $kefu_message_model, $kefu_conversation_model, $site_id, $current_user_id, $temp_conversation_id) {
try {
// 解析Dify的流式响应
$lines = explode("\n", $data);
foreach ($lines as $line) {
$line = trim($line);
if (empty($line))
continue;
// 查找以"data: "开头的行
if (strpos($line, 'data: ') === 0) {
$json_data = substr($line, 6);
$event_data = json_decode($json_data, true);
$this->log('-->获得的数据:' . $json_data, 'debug');
if (json_last_error() === JSON_ERROR_NONE && isset($event_data['event'])) {
$event = $event_data['event'];
$this->log('处理AI客服事件' . $event, 'info');
switch ($event_data['event']) {
case 'message':
// LLM返回文本块事件
if (isset($event_data['conversation_id'])) {
$real_conversation_id = $event_data['conversation_id'];
$this->log('获取到会话ID' . $real_conversation_id, 'info');
}
if (isset($event_data['message_id'])) {
$real_assistant_message_id = $event_data['message_id'];
$this->log('获取到助手消息ID' . $real_assistant_message_id, 'info');
}
// 积累助手回复内容
if (isset($event_data['answer'])) {
$current_chunk = $event_data['answer'];
$assistant_content .= $current_chunk;
$this->log('积累助手回复内容:' . $current_chunk, 'debug');
// 添加时间戳,确保消息顺序
$timestamp = microtime(true);
// 通过WebSocket发送消息
$conn->send(json_encode([
'stream' => 1,
'type' => 'message',
'event' => 'message',
'conversation_id' => $real_conversation_id,
'message_id' => $real_assistant_message_id,
'answer' => $current_chunk,
'timestamp' => $timestamp,
'chunk_index' => uniqid()
]));
$this->log('向客户端发送消息: ' . $current_chunk, 'debug');
// 实时保存助手回复内容(流式过程中)
if (!empty($real_conversation_id) && !empty($real_assistant_message_id)) {
$this->saveStreamingAssistantMessage($kefu_message_model, $site_id, $current_user_id, $real_conversation_id, $real_assistant_message_id, $assistant_content, 'streaming');
$this->log('助手回复内容已实时保存,字数:' . strlen($assistant_content), 'debug');
}
}
// 更新用户消息的会话ID仅在第一次获取到真实会话ID时
if (!$user_message_saved && !empty($real_conversation_id)) {
// 将临时会话ID更新为真实会话ID
$kefu_message_model->updateMessage(['conversation_id' => $real_conversation_id], [
['site_id', '=', $site_id],
['user_id', '=', $current_user_id],
['conversation_id', '=', $temp_conversation_id],
['role', '=', 'user']
]);
$kefu_conversation_model->updateConversation(['conversation_id' => $real_conversation_id], [
['site_id', '=', $site_id],
['user_id', '=', $current_user_id],
['conversation_id', '=', $temp_conversation_id]
]);
$user_message_saved = true;
$this->log('用户消息会话ID已更新为真实ID' . $real_conversation_id, 'info');
}
break;
case 'agent_message':
// Agent模式下返回文本块事件
if (isset($event_data['conversation_id'])) {
$real_conversation_id = $event_data['conversation_id'];
$this->log('获取到Agent模式会话ID' . $real_conversation_id, 'info');
}
if (isset($event_data['message_id'])) {
$real_assistant_message_id = $event_data['message_id'];
$this->log('获取到Agent模式助手消息ID' . $real_assistant_message_id, 'info');
}
// 积累助手回复内容
if (isset($event_data['answer'])) {
$current_chunk = $event_data['answer'];
$assistant_content .= $current_chunk;
$this->log('积累Agent回复内容' . $current_chunk, 'debug');
// 添加时间戳,确保消息顺序
$timestamp = microtime(true);
// 通过WebSocket发送消息
$conn->send(json_encode([
'stream' => 1,
'type' => 'message',
'event' => 'agent_message',
'conversation_id' => $real_conversation_id,
'message_id' => $real_assistant_message_id,
'answer' => $current_chunk,
'timestamp' => $timestamp,
'chunk_index' => uniqid()
]));
// 实时保存助手回复内容Agent模式流式过程中
if (!empty($real_conversation_id) && !empty($real_assistant_message_id)) {
$this->saveStreamingAssistantMessage($kefu_message_model, $site_id, $current_user_id, $real_conversation_id, $real_assistant_message_id, $assistant_content, 'streaming');
$this->log('Agent模式助手回复内容已实时保存字数' . strlen($assistant_content), 'debug');
}
}
// 更新用户消息的会话ID仅在第一次获取到真实会话ID时
if (!$user_message_saved && !empty($real_conversation_id)) {
// 将临时会话ID更新为真实会话ID
$kefu_message_model->updateMessage(['conversation_id' => $real_conversation_id], [
['site_id', '=', $site_id],
['user_id', '=', $current_user_id],
['conversation_id', '=', $temp_conversation_id],
['role', '=', 'user']
]);
$kefu_conversation_model->updateConversation(['conversation_id' => $real_conversation_id], [
['site_id', '=', $site_id],
['user_id', '=', $current_user_id],
['conversation_id', '=', $temp_conversation_id]
]);
$user_message_saved = true;
$this->log('Agent模式下用户消息会话ID已更新为真实ID' . $real_conversation_id, 'info');
}
break;
case 'agent_thought':
if (isset($event_data['thought'])) {
// 格式化思考过程
$thought_content = "\n[思考过程]: " . $event_data['thought'];
if (isset($event_data['tool'])) {
$thought_content .= "\n[使用工具]: " . $event_data['tool'];
}
if (isset($event_data['observation'])) {
$thought_content .= "\n[观察结果]: " . $event_data['observation'];
}
$assistant_content .= $thought_content;
$this->log('Agent思考过程' . $thought_content, 'debug');
// 通过WebSocket发送思考过程
$conn->send(json_encode([
'stream' => 1,
'type' => 'message',
'event' => 'agent_thought',
'thought' => $event_data['thought'],
'tool' => $event_data['tool'] ?? null,
'observation' => $event_data['observation'] ?? null
]));
}
break;
case 'file':
if (isset($event_data['id']) && isset($event_data['type']) && isset($event_data['url'])) {
$file_id = $event_data['id'];
$file_type = $event_data['type'];
$file_url = $event_data['url'];
// 可以将文件信息作为特殊内容添加到回复中
$file_content = "\n[文件]: " . $file_type . " - " . $file_url;
$assistant_content .= $file_content;
$this->log('收到文件事件:' . $file_type . ' - ' . $file_url, 'info');
// 通过WebSocket发送文件信息
$conn->send(json_encode([
'stream' => 1,
'type' => 'message',
'event' => 'file',
'id' => $file_id,
'type' => $file_type,
'url' => $file_url
]));
}
break;
case 'message_start':
if (isset($event_data['conversation_id'])) {
$real_conversation_id = $event_data['conversation_id'];
$this->log('消息开始事件会话ID' . $real_conversation_id, 'info');
// 通过WebSocket发送消息开始事件
$conn->send(json_encode([
'stream' => 1,
'type' => 'message',
'event' => 'message_start',
'conversation_id' => $real_conversation_id
]));
}
break;
case 'message_delta':
if (isset($event_data['delta']['content'])) {
$assistant_content .= $event_data['delta']['content'];
$this->log('积累增量内容:' . $event_data['delta']['content'], 'debug');
// 通过WebSocket发送增量内容
$conn->send(json_encode([
'stream' => 1,
'type' => 'message',
'event' => 'message_delta',
'delta' => $event_data['delta'],
// 'full_content' => $assistant_content
]));
// 实时保存助手回复内容(增量流式过程中)
if (!empty($real_conversation_id) && !empty($real_assistant_message_id)) {
$this->saveStreamingAssistantMessage($kefu_message_model, $site_id, $current_user_id, $real_conversation_id, $real_assistant_message_id, $assistant_content, 'streaming');
$this->log('助手增量回复内容已实时保存,字数:' . strlen($assistant_content), 'debug');
}
}
break;
case 'message_end':
// 最终内容已通过message或message_delta事件积累
$this->log('消息结束事件会话ID' . $real_conversation_id . '消息ID' . $real_assistant_message_id, 'info');
// 通过WebSocket发送消息结束事件
$conn->send(json_encode([
'stream' => 1,
'type' => 'message',
'event' => 'message_end',
'conversation_id' => $real_conversation_id,
'message_id' => $real_assistant_message_id,
'content' => $assistant_content
]));
break;
case 'error':
$error_message = isset($event_data['message']) ? $event_data['message'] : '流式输出异常';
$assistant_content .= "\n[错误]: " . $error_message;
$this->log('AI客服错误事件' . $error_message, 'error');
// 通过WebSocket发送错误事件
$conn->send(json_encode([
'stream' => 1,
'type' => 'message',
'event' => 'error',
'message' => $error_message
]));
break;
case 'ping':
// 保持连接存活的ping事件
// 无需特殊处理,继续保持连接
$this->log('收到ping事件', 'debug');
break;
}
} else {
$this->log('AI客服事件解析失败' . $json_data, 'error');
}
}
}
} catch (\Exception $e) {
$this->log('AI客服事件处理异常' . $e->getMessage(), 'error');
$conn->send(json_encode([
'stream' => 1,
'type' => 'error',
'message' => $e->getMessage()
]));
}
};
// 错误处理回调函数
$on_error = function ($error) use ($user_id, $conn) {
$this->log('AI客服请求错误用户ID' . $user_id . ',错误信息:' . json_encode($error), 'error');
$conn->send(json_encode(['type' => 'error', 'message' => $error]));
};
// 存储流式请求信息
$requestId = $conn->resourceId;
$this->streamingRequests[$requestId] = [
'conn' => $conn,
'user_id' => $user_id,
'conversation_id' => $real_conversation_id,
'started_at' => time(),
'is_active' => true
];
$this->log('开始流式请求请求ID' . $requestId, 'info');
// 检查客户端连接状态的回调
$on_check = function () use ($conn, $requestId) {
// 检查连接是否仍然在客户端列表中通过检查clientData
if (!isset($this->clientData[$requestId])) {
$this->log('客户端连接已关闭,停止流式请求:' . $requestId, 'info');
return false;
}
// 检查请求是否被标记为已停止
if (isset($this->streamingRequests[$requestId]) && !$this->streamingRequests[$requestId]['is_active']) {
$this->log('流式请求已被标记为停止:' . $requestId, 'info');
return false;
}
return true;
};
// 流式完成回调:仅在上游流真正结束后才触发(避免立刻发送 done
$on_complete = function (bool $aborted = false, int $errno = 0, ?string $err = null) use ($conn, $requestId, $user_id, &$real_conversation_id, &$real_assistant_message_id, &$assistant_content, $kefu_message_model, $kefu_conversation_model, $site_id, $current_user_id, $temp_conversation_id) {
// 从流式请求列表中移除
if (isset($this->streamingRequests[$requestId])) {
unset($this->streamingRequests[$requestId]);
$this->log('移除流式请求请求ID' . $requestId, 'info');
}
if ($errno !== 0 && $err) {
$this->log('AI客服请求结束但存在错误用户ID' . $user_id . ',错误:' . $err, 'error');
}
// 被中断(例如客户端断开)不发送 done
if (!$aborted && isset($this->clientData[$requestId])) {
$done_data = [
'conversation_id' => $real_conversation_id,
'message_id' => $real_assistant_message_id,
'content' => $assistant_content,
];
$conn->send(json_encode(['type' => 'message', 'event' => 'done', 'data' => $done_data]));
}
// 只有非中断且有内容时,标记 completed
if (
!$aborted &&
!empty($real_conversation_id) &&
!empty($real_assistant_message_id) &&
!empty($assistant_content)
) {
$this->saveStreamingAssistantMessage(
$kefu_message_model,
$site_id,
$current_user_id,
$real_conversation_id,
$real_assistant_message_id,
$assistant_content,
'completed'
);
$this->log('AI客服回复已标记为完成状态会话ID' . $real_conversation_id . ',总字数:' . strlen($assistant_content), 'info');
}
// 清理临时数据(无论是否中断,都需要清理临时会话)
$this->cleanupTempData($kefu_message_model, $kefu_conversation_model, $site_id, $current_user_id, $temp_conversation_id);
$this->log('AI客服请求处理完成用户ID' . $user_id . '会话ID' . $real_conversation_id, 'info');
};
// 调用curl流式请求异步
$this->curlRequestStreaming($url, 'POST', $requestData, $headers, $on_data, $on_error, $on_check, $on_complete);
} catch (\Exception $e) {
$error_msg = 'AI客服请求异常' . $e->getMessage() . ',错误行:' . $e->getLine() . ',错误文件:' . $e->getFile();
$this->log($error_msg, 'error');
$conn->send(json_encode(['type' => 'error', 'message' => $error_msg]));
// 异常时清理临时数据
try {
$kefu_conversation_model = new KefuConversationModel();
$kefu_message_model = new KefuMessageModel();
$this->cleanupTempData($kefu_message_model, $kefu_conversation_model, $site_id, $user_id, $temp_conversation_id);
} catch (\Exception $cleanupException) {
$this->log('清理临时数据时也发生异常:' . $cleanupException->getMessage(), 'error');
}
}
}
/**
* 通用的curl流式请求函数
* @param string $url 请求URL
* @param string $method 请求方法
* @param array $data 请求数据
* @param array $headers 请求头
* @param callable|null $on_data 数据回调函数,接收原始数据
* @param callable|null $on_error 错误回调函数,接收错误信息
* @param callable|null $on_check 检查是否应该继续请求的回调函数
* @param callable|null $on_complete 完成回调函数(请求结束/中断时触发)
* @return bool 请求是否成功
*/
private function curlRequestStreaming($url, $method = 'GET', $data = [], $headers = [], $on_data = null, $on_error = null, $on_check = null, $on_complete = null)
{
try {
$ch = curl_init();
$aborted = false;
// 基础设置
curl_setopt($ch, CURLOPT_URL, $url);
curl_setopt($ch, CURLOPT_CUSTOMREQUEST, $method);
if ($method === 'POST' && !empty($data)) {
curl_setopt($ch, CURLOPT_POSTFIELDS, is_array($data) ? json_encode($data) : $data);
}
if (!empty($headers)) {
curl_setopt($ch, CURLOPT_HTTPHEADER, $headers);
} else {
curl_setopt($ch, CURLOPT_HTTPHEADER, ['Content-Type: application/json']);
}
// 流式/非阻塞相关
curl_setopt($ch, CURLOPT_RETURNTRANSFER, false);
curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false);
curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, false);
curl_setopt($ch, CURLOPT_TIMEOUT, 0);
curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, 30);
curl_setopt($ch, CURLOPT_BUFFERSIZE, 1);
curl_setopt($ch, CURLOPT_TCP_NODELAY, true);
curl_setopt($ch, CURLOPT_FRESH_CONNECT, true);
curl_setopt($ch, CURLOPT_FORBID_REUSE, true);
curl_setopt($ch, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1);
// 到一块就立即触发
curl_setopt($ch, CURLOPT_WRITEFUNCTION, function ($curl, $chunk) use ($on_data, $on_check, &$aborted) {
if (is_callable($on_check) && !$on_check()) {
$this->log('请求被中断,停止处理数据', 'info');
$aborted = true;
return -1; // 中断
}
$this->log('收到数据块,大小:' . strlen($chunk), 'debug');
if (is_callable($on_data)) {
$on_data($chunk);
}
return strlen($chunk);
});
$mh = curl_multi_init();
curl_multi_add_handle($mh, $ch);
$loop = Loop::get();
$timer = null;
$cleanup = function () use (&$mh, &$ch, &$timer, $loop) {
if (isset($timer)) {
$loop->cancelTimer($timer);
}
if ($mh && $ch) {
curl_multi_remove_handle($mh, $ch);
curl_close($ch);
curl_multi_close($mh);
}
};
// 定时推进 multi 状态机,避免阻塞事件循环
$timer = $loop->addPeriodicTimer(0.01, function () use (&$timer, $mh, $ch, $on_error, $cleanup, $on_complete, &$aborted) {
do {
$status = curl_multi_exec($mh, $active);
} while ($status === CURLM_CALL_MULTI_PERFORM);
// 非正常状态直接报错并清理
if ($status !== CURLM_OK && $status !== CURLM_CALL_MULTI_PERFORM) {
$msg = 'Curl multi 执行异常,状态:' . $status;
$this->log($msg, 'error');
if (is_callable($on_error)) {
$on_error($msg);
}
if (is_callable($on_complete)) {
$on_complete(true, 1, $msg);
}
$cleanup();
return;
}
// 读取完成/错误事件
while ($info = curl_multi_info_read($mh)) {
if ($info['msg'] === CURLMSG_DONE) {
$errno = curl_errno($ch);
$err = null;
if ($errno !== 0) {
$err = curl_error($ch);
// 忽略回调中断错误
if (strpos($err, 'callback') === false) {
$this->log('Curl请求错误' . $err, 'error');
if (is_callable($on_error)) {
$on_error($err);
}
} else {
$this->log('请求被回调中断', 'info');
$aborted = true;
}
}
if (is_callable($on_complete)) {
$on_complete($aborted, $errno, $err);
}
$cleanup();
return;
}
}
});
return true;
} catch (\Exception $e) {
$this->log(json_encode(["event" => "error", "data" => $e->getMessage(), "line" => $e->getLine(), "file" => $e->getFile()]), 'error');
if (is_callable($on_error)) {
$on_error($e->getMessage());
}
return false;
}
}
/**
* 封装curl请求方法
* @param string $url 请求URL
* @param string $method 请求方法
* @param array $data 请求数据
* @param array $headers 请求头
* @return string 响应内容
*/
private function curlRequest($url, $method = 'GET', $data = [], $headers = [])
{
$ch = curl_init();
// 设置URL
curl_setopt($ch, CURLOPT_URL, $url);
// 设置请求方法
curl_setopt($ch, CURLOPT_CUSTOMREQUEST, $method);
// 设置POST数据
if ($method === 'POST' && !empty($data)) {
curl_setopt($ch, CURLOPT_POSTFIELDS, is_array($data) ? json_encode($data) : $data);
}
// 设置请求头
if (!empty($headers)) {
curl_setopt($ch, CURLOPT_HTTPHEADER, $headers);
}
// 设置返回值
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false);
curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, false);
curl_setopt($ch, CURLOPT_TIMEOUT, 30);
// 执行请求
$response = curl_exec($ch);
$httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
// 关闭连接
curl_close($ch);
if ($response === false) {
throw new \Exception('Curl请求失败');
}
if ($httpCode >= 400) {
throw new \Exception('HTTP请求失败状态码' . $httpCode);
}
return $response;
}
/**
* 验证参数并获取配置
* @param array $params_rules 参数验证规则
* @return array
* @throws \Exception
*/
private function validateAndGetConfig($params_rules = [], $params = [])
{
// 参数验证规则
$rules = [];
// 合并参数验证规则
$rules = array_merge($rules, $params_rules);
// 验证参数
foreach ($rules as $field => $rule) {
if (isset($rule['required']) && $rule['required'] && empty($params[$field])) {
throw new \Exception($rule['message']);
}
}
// 获取站点ID
$site_id = $params['uniacid'] ?? 0;
if (empty($site_id)) {
throw new \Exception('站点ID不能为空');
}
// 获取智能客服配置
$kefu_config_model = new KefuConfigModel();
$config_info = $kefu_config_model->getConfig($site_id)['data']['value'] ?? [];
if (empty($config_info) || $config_info['status'] != 1) {
throw new \Exception('智能客服暂未启用');
}
return $config_info;
}
/**
* 构建请求数据
* @param string $message 用户消息
* @param string $user_id 用户ID
* @param string $conversation_id 会话ID
* @param bool $stream 是否使用流式响应
* @param array $origin_data 原始数据
* @return array
*/
private function buildRequestData($message, $user_id, $conversation_id, $stream, $origin_data)
{
$requestData = [
'inputs' => [],
'query' => $message,
'response_mode' => $stream ? 'streaming' : 'blocking',
'user' => $user_id,
];
// 如果有会话ID添加到请求中
if (!empty($conversation_id)) {
$requestData['conversation_id'] = $conversation_id;
// ----- 只有会话ID的情况下下列情况才添加相关的数据
// 如果有files字段添加到请求中
if (!empty($origin_data['files']) && count($origin_data['files']) > 0) {
$requestData['files'] = $origin_data['files'];
}
}
return $requestData;
}
/**
* 构建请求头
* @param string $api_key API密钥
* @return array
*/
private function buildRequestHeaders($api_key)
{
return [
'Content-Type: application/json',
'Authorization: Bearer ' . $api_key,
];
}
/**
* 保存用户消息
* @param KefuMessageModel $message_model 消息模型
* @param string $site_id 站点ID
* @param string $user_id 用户ID
* @param string $conversation_id 会话ID
* @param string $message_id 消息ID
* @param string $content 消息内容
* @return void
*/
private function saveUserMessage($message_model, $site_id, $user_id, $conversation_id, $message_id, $content)
{
$message_model->addMessage([
'site_id' => $site_id,
'user_id' => $user_id,
'conversation_id' => $conversation_id,
'message_id' => $message_id,
'role' => 'user',
'content' => $content,
]);
}
/**
* 保存助手消息
* @param KefuMessageModel $message_model 消息模型
* @param string $site_id 站点ID
* @param string $user_id 用户ID
* @param string $conversation_id 会话ID
* @param string $message_id 消息ID
* @param string $content 消息内容
* @return void
*/
private function saveAssistantMessage($message_model, $site_id, $user_id, $conversation_id, $message_id, $content)
{
$message_model->addMessage([
'site_id' => $site_id,
'user_id' => $user_id,
'conversation_id' => $conversation_id,
'message_id' => $message_id,
'role' => 'assistant',
'content' => $content,
]);
}
/**
* 更新或创建会话
* @param KefuConversationModel $conversation_model 会话模型
* @param string $site_id 站点ID
* @param string $user_id 用户ID
* @param string $conversation_id 会话ID
* @return void
*/
private function updateOrCreateConversation($conversation_model, $site_id, $user_id, $conversation_id)
{
$conversation_info = $conversation_model->getConversationInfo([
['site_id', '=', $site_id],
['user_id', '=', $user_id],
['conversation_id', '=', $conversation_id],
]);
if (empty($conversation_info['data'])) {
// 创建新会话
$conversation_model->addConversation([
'site_id' => $site_id,
'user_id' => $user_id,
'conversation_id' => $conversation_id,
'name' => '智能客服会话',
]);
} else {
// 更新会话状态
$conversation_model->updateConversation([
'status' => 1,
'update_time' => date('Y-m-d H:i:s')
], [
['id', '=', $conversation_info['data']['id']],
]);
}
}
/**
* 实时保存助手消息内容(流式过程中)
* @param KefuMessageModel $message_model 消息模型
* @param string $site_id 站点ID
* @param string $user_id 用户ID
* @param string $conversation_id 会话ID
* @param string $message_id 消息ID
* @param string $content 消息内容
* @param string $status 消息状态streaming流式中、completed已完成
* @return void
*/
private function saveStreamingAssistantMessage($message_model, $site_id, $user_id, $conversation_id, $message_id, $content, $status = 'streaming')
{
// 检查是否已存在该消息(用于更新)
$existing_message = $message_model->getMessageInfo([
['site_id', '=', $site_id],
['user_id', '=', $user_id],
['conversation_id', '=', $conversation_id],
['message_id', '=', $message_id],
['role', '=', 'assistant']
]);
$message_data = [
'site_id' => $site_id,
'user_id' => $user_id,
'conversation_id' => $conversation_id,
'message_id' => $message_id,
'role' => 'assistant',
'content' => $content,
'status' => $status, // 新增状态字段
'update_time' => date('Y-m-d H:i:s')
];
if (empty($existing_message['data'])) {
// 新增消息
$message_model->addMessage($message_data);
} else {
// 更新消息内容
$message_model->updateMessage($message_data, [
['site_id', '=', $site_id],
['user_id', '=', $user_id],
['conversation_id', '=', $conversation_id],
['message_id', '=', $message_id],
['role', '=', 'assistant']
]);
}
}
/**
* 清理临时消息和会话
* @param KefuMessageModel $message_model 消息模型
* @param KefuConversationModel $conversation_model 会话模型
* @param string $site_id 站点ID
* @param string $user_id 用户ID
* @param string $temp_conversation_id 临时会话ID
* @return void
*/
private function cleanupTempData($message_model, $conversation_model, $site_id, $user_id, $temp_conversation_id)
{
// 删除临时消息
$message_model->deleteMessage([
['site_id', '=', $site_id],
['user_id', '=', $user_id],
['conversation_id', '=', $temp_conversation_id]
]);
// 删除临时会话
$conversation_model->deleteConversation([
['site_id', '=', $site_id],
['user_id', '=', $user_id],
['conversation_id', '=', $temp_conversation_id]
]);
$this->log('临时数据已清理会话ID' . $temp_conversation_id, 'info');
}
/**
* 日志记录封装方法
* @param string $message 日志内容
* @param string $level 日志级别默认为info
* @return void
*/
private function log($message, $level = 'info')
{
// 只允许info、error级别
if (!in_array($level, ['info', 'error', 'debug'])) {
return;
}
log_write($message, $level, 'ws.log', 2);
}
/**
* 处理非流式响应
* @param string $url 请求URL
* @param array $requestData 请求数据
* @param array $headers 请求头
* @param string $message 用户消息
* @param string $user_id 用户ID
* @param string $conversation_id 会话ID
* @param string $site_id 站点ID
* @return array
* @throws \Exception
*/
private function handleBlockingResponse($url, $requestData, $headers, $message, $user_id, $conversation_id, $site_id)
{
// 初始化模型
$kefu_conversation_model = new KefuConversationModel();
$kefu_message_model = new KefuMessageModel();
$current_user_id = $user_id;
// 开启事务,确保数据一致性(对齐 Kefu.php 的非流式存储行为)
Db::startTrans();
try {
// 发送请求
$response = $this->curlRequest($url, 'POST', $requestData, $headers);
$response_data = json_decode($response, true);
if (json_last_error() !== JSON_ERROR_NONE) {
throw new \Exception('解析响应失败');
}
if (empty($response_data) || !isset($response_data['conversation_id'])) {
throw new \Exception('API返回数据格式错误或缺少必要字段');
}
$real_conversation_id = $response_data['conversation_id'] ?? $conversation_id;
$real_assistant_message_id = $response_data['message_id'] ?? ($response_data['id'] ?? '');
$assistant_content = $response_data['answer'] ?? '';
// 去重:用户消息(避免客户端重试导致重复写入)
$existing_user_message = $kefu_message_model->getMessageInfo([
['site_id', '=', $site_id],
['user_id', '=', $current_user_id],
['conversation_id', '=', $real_conversation_id],
['role', '=', 'user'],
['content', '=', $message ?? '']
]);
if (empty($existing_user_message['data'])) {
$this->saveUserMessage($kefu_message_model, $site_id, $current_user_id, $real_conversation_id, $real_assistant_message_id, $message ?? '');
}
// 去重:助手消息
$existing_assistant_message = $kefu_message_model->getMessageInfo([
['site_id', '=', $site_id],
['user_id', '=', $current_user_id],
['conversation_id', '=', $real_conversation_id],
['role', '=', 'assistant'],
['message_id', '=', $real_assistant_message_id]
]);
if (empty($existing_assistant_message['data'])) {
$this->saveAssistantMessage($kefu_message_model, $site_id, $current_user_id, $real_conversation_id, $real_assistant_message_id, $assistant_content);
}
// 更新或创建会话
$this->updateOrCreateConversation($kefu_conversation_model, $site_id, $current_user_id, $real_conversation_id);
Db::commit();
return [
'conversation_id' => $real_conversation_id,
'message_id' => $real_assistant_message_id,
'content' => $assistant_content,
'answer' => $assistant_content,
'status' => 'completed'
];
} catch (\Exception $e) {
Db::rollback();
throw $e;
}
}
}