feat(websocket): 实现文件分片上传功能并重构认证逻辑

重构WebSocketBase类,移除冗余属性,简化认证参数传递方式。新增文件分片上传功能,包括分片上传、合并、状态检查等完整流程。前端页面添加文件上传UI组件,支持断点续传和进度显示。优化认证逻辑,统一使用data参数传递认证信息,提高代码可维护性。
This commit is contained in:
2026-01-24 15:04:22 +08:00
parent aa64c475e9
commit d975abb3de
3 changed files with 834 additions and 91 deletions

View File

@@ -16,15 +16,6 @@ use React\EventLoop\Loop;
class WebSocket extends WebSocketBase
{
// 控制器属性用于替代BaseApi中的属性
public $params;
public $token;
protected $user_id;
protected $site_id;
protected $uniacid;
protected $site_ids = [];
public $app_type;
// 存储正在进行的流式请求信息
protected $streamingRequests = [];
@@ -32,14 +23,6 @@ class WebSocket extends WebSocketBase
{
// 调用父类构造函数传入当前addon名称
parent::__construct('aikefu');
// 初始化控制器属性
$this->params = [];
$this->token = '';
$this->user_id = 0;
$this->site_id = 0;
$this->uniacid = 0;
$this->app_type = 'weapp'; // 默认微信小程序
}
/**
@@ -101,7 +84,7 @@ class WebSocket extends WebSocketBase
if (isset($data['action']) && $data['action'] === 'ping') {
$conn->send(json_encode(['type' => 'pong']));
return;
}
}
// 处理聊天消息
if (isset($data['action']) && $data['action'] === 'chat') {
@@ -109,7 +92,28 @@ class WebSocket extends WebSocketBase
return;
}
// 处理分片上传
if (isset($data['action']) && $data['action'] === 'upload_chunk') {
$this->handleUploadChunk($conn, $data);
return;
}
// 处理分片合并
if (isset($data['action']) && $data['action'] === 'upload_merge') {
$this->handleMergeChunks($conn, $data);
return;
}
// 处理分片状态检查
if (isset($data['action']) && $data['action'] === 'upload_check') {
$this->handleCheckChunks($conn, $data);
return;
}
$conn->send(json_encode(['type' => 'error', 'message' => 'Unknown action']));
} catch (\Exception $e) {
$conn->send(json_encode(['type' => 'error', 'message' => $e->getMessage(), 'line' => $e->getLine(), 'file' => $e->getFile(), 'trace' => $e->getTraceAsString()]));
}
@@ -155,12 +159,12 @@ class WebSocket extends WebSocketBase
* - WebSocket 连接场景没有 request()/input(),所以这里直接根据客户端传入的 site_id + token 解密校验
* - 校验 token 解密成功、未过期、且 token 内 member_id 与传入 member_id 一致
*/
protected function doAuth(ConnectionInterface $conn, $site_id, $user_id, $token)
protected function doAuth(ConnectionInterface $conn, $data)
{
// 与 Kefu.php 保持一致,支持使用 uniacid 作为站点ID
$site_id = (int)$site_id;
$user_id = (int)$user_id;
$token = (string)$token;
$site_id = (int) ($data['uniacid'] ?? $data['site_id'] ?? 0);
$user_id = (int) ($data['user_id'] ?? 0);
$token = (string) ($data['token'] ?? '');
if ($site_id <= 0 || $user_id <= 0 || $token === '') {
throw new \Exception('Missing authentication parameters');
@@ -189,18 +193,18 @@ class WebSocket extends WebSocketBase
$this->log('decrypt:' . $decrypt, 'info');
$data = json_decode($decrypt, true);
if (!is_array($data) || empty($data['member_id'])) {
$decrypted_data = json_decode($decrypt, true);
if (!is_array($decrypted_data) || empty($decrypted_data['member_id'])) {
throw new \Exception('TOKEN_ERROR');
}
// member_id 必须一致,避免冒用
if ((int)$data['member_id'] !== $user_id) {
if ((int) $decrypted_data['member_id'] !== $user_id) {
throw new \Exception('TOKEN_ERROR');
}
// 过期校验expire_time=0 为永久,其余必须未过期
$expire_time = (int)($data['expire_time'] ?? 0);
$expire_time = (int) ($decrypted_data['expire_time'] ?? 0);
if ($expire_time !== 0 && $expire_time < time()) {
throw new \Exception('TOKEN_EXPIRE');
}
@@ -240,25 +244,24 @@ class WebSocket extends WebSocketBase
$stream = $data['stream'] ?? false;
$response_mode = $data['response_mode'] ?? 'streaming'; // 与 Kefu.php 保持一致
// 设置当前控制器的属性,与 Kefu.php 保持一致的参数优先级
$this->site_id = $data['uniacid'] ?? $clientInfo['site_id'];
$this->user_id = $data['user_id'] ?? $clientInfo['user_id'];
$this->token = $data['token'] ?? $clientInfo['token'];
$this->params = [
'query' => $query,
'user_id' => $user_id,
'conversation_id' => $conversation_id,
'stream' => $stream,
'response_mode' => $response_mode,
'uniacid' => $this->site_id,
'user_id' => $this->user_id,
'token' => $this->token,
];
// 获取当前连接的客户端信息
$site_id = $data['uniacid'] ?? $clientInfo['site_id'];
$user_id = $data['user_id'] ?? $clientInfo['user_id'];
$token = $data['token'] ?? $clientInfo['token'];
// 验证参数并获取配置,与 Kefu.php 保持一致
$config = $this->validateAndGetConfig([
'query' => ['required' => true, 'message' => '参数错误,请检查 `query` 参数是否设置正确', 'description' => '消息内容'],
'user_id' => ['required' => true, 'message' => '请求参数 `user_id` 不能为空', 'description' => '用户ID']
], [
'query' => $query,
'user_id' => $user_id,
'conversation_id' => $conversation_id,
'stream' => $stream,
'response_mode' => $response_mode,
'uniacid' => $site_id,
'user_id' => $user_id,
'token' => $token,
]);
// 是否启用流式响应,与 Kefu.php 保持一致
@@ -273,10 +276,10 @@ class WebSocket extends WebSocketBase
if ($enable_stream) {
// 处理流式响应
$this->handleStreamingResponse($conn, $url, $requestData, $headers, $query, $user_id);
$this->handleStreamingResponse($conn, $url, $requestData, $headers, $query, $user_id, $site_id);
} else {
// 处理非流式响应
$response = $this->handleBlockingResponse($url, $requestData, $headers, $query, $user_id, $conversation_id);
$response = $this->handleBlockingResponse($url, $requestData, $headers, $query, $user_id, $conversation_id, $site_id);
$conn->send(json_encode(['type' => 'message', 'data' => $response]));
}
} catch (\Exception $e) {
@@ -284,6 +287,401 @@ class WebSocket extends WebSocketBase
}
}
/**
* 处理分片上传
* @param ConnectionInterface $conn
* @param array $data
*/
private function handleUploadChunk(ConnectionInterface $conn, $data)
{
try {
// 获取客户端信息
$clientInfo = $this->clientData[$conn->resourceId];
// 获取分片上传相关参数
$file_id = $data['file_id'] ?? '';
$file_name = $data['file_name'] ?? '';
$file_type = $data['file_type'] ?? '';
$chunk_index = $data['chunk_index'] ?? 0;
$total_chunks = $data['total_chunks'] ?? 0;
$chunk_size = $data['chunk_size'] ?? 0;
$file_size = $data['file_size'] ?? 0;
$file_content = $data['file_content'] ?? '';
$user_id = $data['user_id'] ?? $clientInfo['user_id'];
$site_id = $data['uniacid'] ?? $clientInfo['site_id'];
// 验证参数
if (empty($file_id) || empty($file_name) || empty($file_content)) {
throw new \Exception('分片上传参数不完整');
}
// 创建临时目录存储分片
$temp_dir = sys_get_temp_dir() . '/dify_uploads/' . $file_id;
if (!is_dir($temp_dir)) {
mkdir($temp_dir, 0777, true);
}
// 保存分片文件
$chunk_file = $temp_dir . '/' . $chunk_index;
$file_data = base64_decode($file_content);
if ($file_data === false) {
throw new \Exception('分片内容base64解码失败');
}
// 写入分片文件
file_put_contents($chunk_file, $file_data);
// 发送分片上传成功响应
$conn->send(json_encode([
'type' => 'chunk_uploaded',
'file_id' => $file_id,
'chunk_index' => $chunk_index,
'message' => '分片上传成功'
]));
$this->log('分片上传成功文件ID' . $file_id . ',分片索引:' . $chunk_index . '/' . $total_chunks, 'info');
} catch (\Exception $e) {
$conn->send(json_encode(['type' => 'error', 'message' => '分片上传失败:' . $e->getMessage(), 'file_id' => $data['file_id'] ?? '']));
}
}
/**
* 处理分片合并
* @param ConnectionInterface $conn
* @param array $data
*/
private function handleMergeChunks(ConnectionInterface $conn, $data)
{
try {
// 获取客户端信息
$clientInfo = $this->clientData[$conn->resourceId];
// 获取合并相关参数
$file_id = $data['file_id'] ?? '';
$file_name = $data['file_name'] ?? '';
$file_type = $data['file_type'] ?? '';
$total_chunks = $data['total_chunks'] ?? 0;
$file_size = $data['file_size'] ?? 0;
$user_id = $data['user_id'] ?? $clientInfo['user_id'];
$site_id = $data['uniacid'] ?? $clientInfo['site_id'];
$token = $data['token'] ?? $clientInfo['token'];
// 验证参数
if (empty($file_id) || empty($file_name)) {
throw new \Exception('合并参数不完整');
}
// 获取临时目录
$temp_dir = sys_get_temp_dir() . '/dify_uploads/' . $file_id;
if (!is_dir($temp_dir)) {
throw new \Exception('分片存储目录不存在');
}
// 验证所有分片是否存在
for ($i = 0; $i < $total_chunks; $i++) {
$chunk_file = $temp_dir . '/' . $i;
if (!file_exists($chunk_file)) {
throw new \Exception('分片文件缺失:' . $i);
}
}
// 合并分片
$merged_file = $temp_dir . '/merged_' . $file_name;
$merged_handle = fopen($merged_file, 'wb');
if (!$merged_handle) {
throw new \Exception('创建合并文件失败');
}
// 按顺序读取并合并分片
for ($i = 0; $i < $total_chunks; $i++) {
$chunk_file = $temp_dir . '/' . $i;
$chunk_handle = fopen($chunk_file, 'rb');
if (!$chunk_handle) {
fclose($merged_handle);
throw new \Exception('打开分片文件失败:' . $i);
}
// 读取分片内容并写入合并文件
while (!feof($chunk_handle)) {
$buffer = fread($chunk_handle, 8192);
fwrite($merged_handle, $buffer);
}
fclose($chunk_handle);
}
fclose($merged_handle);
// 验证合并后的文件大小
if (filesize($merged_file) !== $file_size) {
throw new \Exception('合并后的文件大小与预期不符');
}
// 验证参数并获取配置,与 Kefu.php 保持一致
$config = $this->validateAndGetConfig([
'file_name' => ['required' => true, 'message' => '文件名不能为空', 'description' => '文件名'],
'file_type' => ['required' => true, 'message' => '文件类型不能为空', 'description' => '文件类型'],
'user_id' => ['required' => true, 'message' => '请求参数 `user_id` 不能为空', 'description' => '用户ID']
], [
'file_name' => $file_name,
'file_type' => $file_type,
'user_id' => $user_id,
'uniacid' => $site_id,
'token' => $token
]);
// 发送请求到Dify API
$url = $config['base_url'] . '/files/upload';
// 构建请求头
$headers = [
'Authorization: Bearer ' . $config['api_key'],
];
// 读取合并后的文件内容
$file_content = base64_encode(file_get_contents($merged_file));
// 发送文件上传请求(使用 multipart/form-data 格式)
$response = $this->curlFileUpload($url, $file_name, $file_type, $file_content, $user_id, $headers);
// 解析响应
$result = json_decode($response, true);
if (json_last_error() !== JSON_ERROR_NONE) {
throw new \Exception('解析响应失败');
}
// 验证响应数据
if (empty($result) || !isset($result['id'])) {
throw new \Exception('API返回数据格式错误或缺少必要字段');
}
// 清理临时文件
$this->cleanupTempFiles($temp_dir);
// 发送合并成功响应
$conn->send(json_encode([
'type' => 'upload_success',
'file_id' => $result['id'] ?? '',
'file_name' => $result['name'] ?? $file_name,
'file_size' => $result['size'] ?? $file_size,
'file_extension' => $result['extension'] ?? '',
'file_mime_type' => $result['mime_type'] ?? $file_type,
'file_created_by' => $result['created_by'] ?? '',
'file_created_at' => $result['created_at'] ?? '',
'file_url' => $result['url'] ?? ''
]));
$this->log('文件上传成功用户ID' . $user_id . ',文件名:' . $file_name . '文件ID' . $result['id'], 'info');
} catch (\Exception $e) {
// 清理临时文件
if (!empty($data['file_id'])) {
$temp_dir = sys_get_temp_dir() . '/dify_uploads/' . $data['file_id'];
if (is_dir($temp_dir)) {
$this->cleanupTempFiles($temp_dir);
}
}
// 解析错误信息
$errorMessage = $e->getMessage();
$errorCode = 500;
$errorType = 'upload_failed';
// 提取HTTP错误码和Dify错误信息
if (preg_match('/HTTP请求失败状态码(\d+),响应:(.*)/', $errorMessage, $matches)) {
$errorCode = (int)$matches[1];
$errorResponse = $matches[2];
try {
$errorData = json_decode($errorResponse, true);
if (isset($errorData['code'])) {
$errorType = $errorData['code'];
}
if (isset($errorData['message'])) {
$errorMessage = $errorData['message'];
}
} catch (\Exception $decodeEx) {
// 解析失败,使用原始错误信息
}
}
$conn->send(json_encode([
'type' => 'error',
'code' => $errorCode,
'error_type' => $errorType,
'message' => '文件上传失败:' . $errorMessage
]));
}
}
/**
* 清理临时文件
* @param string $dir 临时目录
*/
private function cleanupTempFiles($dir)
{
if (!is_dir($dir)) {
return;
}
$files = scandir($dir);
foreach ($files as $file) {
if ($file === '.' || $file === '..') {
continue;
}
$file_path = $dir . '/' . $file;
if (is_file($file_path)) {
unlink($file_path);
} elseif (is_dir($file_path)) {
$this->cleanupTempFiles($file_path);
}
}
rmdir($dir);
}
/**
* 处理分片状态检查
* @param ConnectionInterface $conn
* @param array $data
*/
private function handleCheckChunks(ConnectionInterface $conn, $data)
{
try {
// 获取客户端信息
$clientInfo = $this->clientData[$conn->resourceId];
// 获取检查相关参数
$file_id = $data['file_id'] ?? '';
$total_chunks = $data['total_chunks'] ?? 0;
$user_id = $data['user_id'] ?? $clientInfo['user_id'];
// 验证参数
if (empty($file_id)) {
throw new \Exception('文件ID不能为空');
}
// 获取临时目录
$temp_dir = sys_get_temp_dir() . '/dify_uploads/' . $file_id;
$uploaded_chunks = [];
// 检查临时目录是否存在
if (is_dir($temp_dir)) {
// 扫描目录中的分片文件
$files = scandir($temp_dir);
foreach ($files as $file) {
if ($file === '.' || $file === '..' || strpos($file, 'merged_') === 0) {
continue;
}
// 检查是否是数字文件名(分片索引)
if (is_numeric($file)) {
$chunk_index = (int) $file;
// 检查分片文件是否存在且不为空
if (file_exists($temp_dir . '/' . $file) && filesize($temp_dir . '/' . $file) > 0) {
$uploaded_chunks[] = $chunk_index;
}
}
}
}
// 发送分片状态响应
$conn->send(json_encode([
'type' => 'chunks_status',
'file_id' => $file_id,
'uploaded_chunks' => $uploaded_chunks,
'total_chunks' => $total_chunks,
'message' => '分片状态检查成功'
]));
$this->log('分片状态检查成功文件ID' . $file_id . ',已上传分片数:' . count($uploaded_chunks) . '/' . $total_chunks, 'info');
} catch (\Exception $e) {
$conn->send(json_encode(['type' => 'error', 'message' => '分片状态检查失败:' . $e->getMessage(), 'file_id' => $data['file_id'] ?? '']));
}
}
/**
* 封装文件上传的curl请求方法适用于 Dify 1.9.0 版本)
* @param string $url 请求URL
* @param string $file_name 文件名
* @param string $file_type 文件类型
* @param string $file_content base64编码的文件内容
* @param string $user_id 用户ID
* @param array $headers 请求头
* @return string 响应内容
*/
private function curlFileUpload($url, $file_name, $file_type, $file_content, $user_id, $headers = [])
{
// 解码base64文件内容
$file_data = base64_decode($file_content);
if ($file_data === false) {
throw new \Exception('文件内容base64解码失败');
}
// 创建临时文件
$temp_file = tempnam(sys_get_temp_dir(), 'dify_upload_');
file_put_contents($temp_file, $file_data);
try {
$ch = curl_init();
// 设置URL
curl_setopt($ch, CURLOPT_URL, $url);
// 设置请求方法
curl_setopt($ch, CURLOPT_POST, true);
// 设置文件上传
$cfile = curl_file_create($temp_file, $file_type, $file_name);
$post_data = [
'file' => $cfile,
'user' => $user_id
];
// 设置POST数据
curl_setopt($ch, CURLOPT_POSTFIELDS, $post_data);
// 设置请求头 - 不设置 Content-Type让 curl 自动设置为 multipart/form-data
if (!empty($headers)) {
curl_setopt($ch, CURLOPT_HTTPHEADER, $headers);
}
// 设置返回值
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false);
curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, false);
curl_setopt($ch, CURLOPT_TIMEOUT, 60);
// 执行请求
$response = curl_exec($ch);
$httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
// 关闭连接
curl_close($ch);
if ($response === false) {
throw new \Exception('Curl请求失败');
}
if ($httpCode >= 400) {
throw new \Exception('HTTP请求失败状态码' . $httpCode . ',响应:' . $response);
}
return $response;
} finally {
// 清理临时文件
if (file_exists($temp_file)) {
unlink($temp_file);
}
}
}
/**
* 处理流式响应
* @param ConnectionInterface $conn
@@ -292,8 +690,9 @@ class WebSocket extends WebSocketBase
* @param array $headers
* @param string $query
* @param string $user_id
* @param string $site_id
*/
private function handleStreamingResponse(ConnectionInterface $conn, $url, $requestData, $headers, $query, $user_id)
private function handleStreamingResponse(ConnectionInterface $conn, $url, $requestData, $headers, $query, $user_id, $site_id)
{
try {
// 记录开始处理流式请求
@@ -302,8 +701,7 @@ class WebSocket extends WebSocketBase
// 初始化模型
$kefu_conversation_model = new KefuConversationModel();
$kefu_message_model = new KefuMessageModel();
$site_id = $this->site_id;
$current_user_id = $this->user_id;
$current_user_id = $user_id;
// 定义变量
$real_conversation_id = '';
@@ -613,7 +1011,7 @@ class WebSocket extends WebSocketBase
$this->log('开始流式请求请求ID' . $requestId, 'info');
// 检查客户端连接状态的回调
$on_check = function() use ($conn, $requestId) {
$on_check = function () use ($conn, $requestId) {
// 检查连接是否仍然在客户端列表中通过检查clientData
if (!isset($this->clientData[$requestId])) {
$this->log('客户端连接已关闭,停止流式请求:' . $requestId, 'info');
@@ -628,19 +1026,7 @@ class WebSocket extends WebSocketBase
};
// 流式完成回调:仅在上游流真正结束后才触发(避免立刻发送 done
$on_complete = function (bool $aborted = false, int $errno = 0, ?string $err = null) use (
$conn,
$requestId,
$user_id,
&$real_conversation_id,
&$real_assistant_message_id,
&$assistant_content,
$kefu_message_model,
$kefu_conversation_model,
$site_id,
$current_user_id,
$temp_conversation_id
) {
$on_complete = function (bool $aborted = false, int $errno = 0, ?string $err = null) use ($conn, $requestId, $user_id, &$real_conversation_id, &$real_assistant_message_id, &$assistant_content, $kefu_message_model, $kefu_conversation_model, $site_id, $current_user_id, $temp_conversation_id) {
// 从流式请求列表中移除
if (isset($this->streamingRequests[$requestId])) {
unset($this->streamingRequests[$requestId]);
@@ -698,9 +1084,7 @@ class WebSocket extends WebSocketBase
try {
$kefu_conversation_model = new KefuConversationModel();
$kefu_message_model = new KefuMessageModel();
$site_id = $this->site_id;
$current_user_id = $this->member_id;
$this->cleanupTempData($kefu_message_model, $kefu_conversation_model, $site_id, $current_user_id, $temp_conversation_id);
$this->cleanupTempData($kefu_message_model, $kefu_conversation_model, $site_id, $user_id, $temp_conversation_id);
} catch (\Exception $cleanupException) {
$this->log('清理临时数据时也发生异常:' . $cleanupException->getMessage(), 'error');
}
@@ -894,7 +1278,7 @@ class WebSocket extends WebSocketBase
* @return array
* @throws \Exception
*/
private function validateAndGetConfig($params_rules = [])
private function validateAndGetConfig($params_rules = [], $params = [])
{
// 参数验证规则
$rules = [];
@@ -904,14 +1288,20 @@ class WebSocket extends WebSocketBase
// 验证参数
foreach ($rules as $field => $rule) {
if (isset($rule['required']) && $rule['required'] && empty($this->params[$field])) {
if (isset($rule['required']) && $rule['required'] && empty($params[$field])) {
throw new \Exception($rule['message']);
}
}
// 获取站点ID
$site_id = $params['uniacid'] ?? 0;
if (empty($site_id)) {
throw new \Exception('站点ID不能为空');
}
// 获取智能客服配置
$kefu_config_model = new KefuConfigModel();
$config_info = $kefu_config_model->getConfig($this->site_id)['data']['value'] ?? [];
$config_info = $kefu_config_model->getConfig($site_id)['data']['value'] ?? [];
if (empty($config_info) || $config_info['status'] != 1) {
throw new \Exception('智能客服暂未启用');
@@ -1136,16 +1526,16 @@ class WebSocket extends WebSocketBase
* @param string $message 用户消息
* @param string $user_id 用户ID
* @param string $conversation_id 会话ID
* @param string $site_id 站点ID
* @return array
* @throws \Exception
*/
private function handleBlockingResponse($url, $requestData, $headers, $message, $user_id, $conversation_id)
private function handleBlockingResponse($url, $requestData, $headers, $message, $user_id, $conversation_id, $site_id)
{
// 初始化模型
$kefu_conversation_model = new KefuConversationModel();
$kefu_message_model = new KefuMessageModel();
$site_id = $this->site_id;
$current_user_id = $this->member_id;
$current_user_id = $user_id;
// 开启事务,确保数据一致性(对齐 Kefu.php 的非流式存储行为)
Db::startTrans();