chore(websocket): 已经初步实现服务器端按照流式请求反馈信息的功能

This commit is contained in:
2026-01-20 15:03:53 +08:00
parent e6929aa1f5
commit 05b80040f6
9 changed files with 752 additions and 315 deletions

View File

@@ -12,10 +12,8 @@ COPY ./sites-enabled/ /etc/nginx/sites-enabled/
# 暴露端口
EXPOSE 80 443
# 添加在Dockerfile末尾CMD命令之前
COPY ./entrypoint.sh /usr/local/bin/
RUN chmod +x /usr/local/bin/entrypoint.sh
ENTRYPOINT ["/usr/local/bin/entrypoint.sh"]
# 直接在Dockerfile中执行权限设置不使用entrypoint.sh
RUN mkdir -p /var/log/nginx && chmod -R 0444 /etc/nginx/conf.c && chmod 0444 /etc/nginx/conf.d/default.conf && chmod -R 0755 /etc/nginx/sites-enabled
# 启动nginx
CMD ["nginx", "-g", "daemon off;"]

View File

@@ -8,6 +8,13 @@
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# 禁用缓冲确保WebSocket数据实时传输
proxy_buffering off;
proxy_buffer_size 4k;
proxy_buffers 4 4k;
proxy_busy_buffers_size 4k;
proxy_max_temp_file_size 0;
# 可选设置超时WebSocket 是长连接)
proxy_read_timeout 86400s;
proxy_send_timeout 86400s;

View File

@@ -20,13 +20,13 @@
# add_header Strict-Transport-Security "max-age=31536000; includeSubDomains" always;
# --- SSL configuration end ---
# 启用 WebSocket 支持
include conf.c/enable-websocket.conf;
#PHP-INFO-START PHP引用配置可以注释或修改
include conf.c/enable-php-74.conf;
#PHP-INFO-END
# 启用 WebSocket 支持
include conf.c/enable-websocket.conf;
# --- REWRITE-START --- URL重写规则引用,修改后将导致面板设置的伪静态规则失效
# include /www/server/panel/vhost/rewrite/xcx30.5g-quickapp.com.conf; # 等于下面的内容
location / {

View File

@@ -9,6 +9,7 @@ use app\api\controller\WebSocketBase;
use Ratchet\ConnectionInterface;
use think\facade\Db as Db;
use think\facade\Config;
use React\EventLoop\Loop;
class WebSocket extends WebSocketBase
@@ -21,12 +22,15 @@ class WebSocket extends WebSocketBase
protected $uniacid;
protected $site_ids = [];
public $app_type;
// 存储正在进行的流式请求信息
protected $streamingRequests = [];
public function __construct()
{
// 调用父类构造函数传入当前addon名称
parent::__construct('aikefu');
// 初始化控制器属性
$this->params = [];
$this->token = '';
@@ -35,7 +39,7 @@ class WebSocket extends WebSocketBase
$this->uniacid = 0;
$this->app_type = 'weapp'; // 默认微信小程序
}
/**
* 当有新客户端连接时调用
* @param ConnectionInterface $conn
@@ -52,11 +56,10 @@ class WebSocket extends WebSocketBase
'is_authenticated' => false,
'conversation_id' => null,
];
echo "New connection! ({$conn->resourceId})
";
echo "New connection! ({$conn->resourceId})\n";
}
/**
* 当从客户端收到消息时调用
* @param ConnectionInterface $conn
@@ -65,59 +68,72 @@ class WebSocket extends WebSocketBase
public function onMessage(ConnectionInterface $conn, $message)
{
$numRecv = count($this->clients) - 1;
echo sprintf('Connection %d sending message "%s" to %d other connection%s' . "\n",
$conn->resourceId, $message, $numRecv, $numRecv == 1 ? '' : 's');
echo sprintf(
'Connection %d sending message "%s" to %d other connection%s' . "\n",
$conn->resourceId,
$message,
$numRecv,
$numRecv == 1 ? '' : 's'
);
// 解析消息
try {
$data = json_decode($message, true);
if (json_last_error() !== JSON_ERROR_NONE) {
throw new \Exception('Invalid JSON format');
}
// 处理认证
if (isset($data['action']) && $data['action'] === 'auth') {
$this->handleAuth($conn, $data);
return;
}
// 检查是否已认证
if (!$this->clientData[$conn->resourceId]['is_authenticated']) {
$conn->send(json_encode(['type' => 'error', 'message' => 'Not authenticated']));
return;
}
// 处理聊天消息
if (isset($data['action']) && $data['action'] === 'chat') {
$this->handleChat($conn, $data);
return;
}
// 处理心跳
if (isset($data['action']) && $data['action'] === 'ping') {
$conn->send(json_encode(['type' => 'pong']));
return;
}
$conn->send(json_encode(['type' => 'error', 'message' => 'Unknown action']));
} catch (\Exception $e) {
$conn->send(json_encode(['type' => 'error', 'message' => $e->getMessage()]));
$conn->send(json_encode(['type' => 'error', 'message' => $e->getMessage(), 'line' => $e->getLine(), 'file' => $e->getFile(), 'trace' => $e->getTraceAsString()]));
}
}
/**
* 当客户端连接关闭时调用
* @param ConnectionInterface $conn
*/
public function onClose(ConnectionInterface $conn)
{
$resourceId = $conn->resourceId;
// 移除连接
$this->clients->detach($conn);
unset($this->clientData[$conn->resourceId]);
echo "Connection {$conn->resourceId} has disconnected\n";
unset($this->clientData[$resourceId]);
// 停止与该连接相关的所有流式请求
if (isset($this->streamingRequests[$resourceId])) {
$this->streamingRequests[$resourceId]['is_active'] = false;
$this->log('客户端连接已关闭,标记流式请求为停止:' . $resourceId, 'info');
}
echo "Connection {$resourceId} has disconnected\n";
}
/**
* 当连接发生错误时调用
* @param ConnectionInterface $conn
@@ -126,10 +142,10 @@ class WebSocket extends WebSocketBase
public function onError(ConnectionInterface $conn, \Exception $e)
{
echo "An error has occurred: {$e->getMessage()}\n";
$conn->close();
}
/**
* 处理客户端认证
* @param ConnectionInterface $conn
@@ -141,25 +157,25 @@ class WebSocket extends WebSocketBase
$site_id = $data['site_id'] ?? null;
$member_id = $data['member_id'] ?? null;
$token = $data['token'] ?? null;
if (empty($site_id) || empty($member_id) || empty($token)) {
throw new \Exception('Missing authentication parameters');
}
// 这里可以添加更严格的认证逻辑例如验证token的有效性
// 为了简单起见,我们暂时只检查参数是否存在
$this->clientData[$conn->resourceId]['site_id'] = $site_id;
$this->clientData[$conn->resourceId]['member_id'] = $member_id;
$this->clientData[$conn->resourceId]['token'] = $token;
$this->clientData[$conn->resourceId]['is_authenticated'] = true;
$conn->send(json_encode(['type' => 'auth_success', 'message' => 'Authenticated successfully']));
} catch (\Exception $e) {
$conn->send(json_encode(['type' => 'auth_error', 'message' => $e->getMessage()]));
$conn->send(json_encode(['type' => 'auth_error', 'message' => $e->getMessage(), 'line' => $e->getLine(), 'file' => $e->getFile(), 'trace' => $e->getTraceAsString()]));
}
}
/**
* 处理聊天消息
* @param ConnectionInterface $conn
@@ -169,13 +185,13 @@ class WebSocket extends WebSocketBase
{
try {
$clientInfo = $this->clientData[$conn->resourceId];
// 获取请求参数
$message = $data['message'] ?? '';
$user_id = $data['user_id'] ?? $clientInfo['member_id'];
$conversation_id = $data['conversation_id'] ?? '';
$stream = $data['stream'] ?? true; // WebSocket默认使用流式响应
// 设置当前控制器的属性
$this->site_id = $clientInfo['site_id'];
$this->member_id = $clientInfo['member_id'];
@@ -186,20 +202,20 @@ class WebSocket extends WebSocketBase
'conversation_id' => $conversation_id,
'stream' => $stream,
];
// 验证参数并获取配置
$config = $this->validateAndGetConfig([
'message' => ['required' => true, 'message' => '请求参数 `message` 不能为空. 为消息内容', 'description' => '消息内容'],
'user_id' => ['required' => true, 'message' => '请求参数 `user_id` 不能为空', 'description' => '用户ID']
]);
// 构建请求数据和请求头
$requestData = $this->buildRequestData($message, $user_id, $conversation_id, $stream);
$headers = $this->buildRequestHeaders($config['api_key']);
// 发送请求到Dify API
$url = $config['base_url'] . $config['chat_endpoint'];
if ($stream) {
// 处理流式响应
$this->handleStreamingResponse($conn, $url, $requestData, $headers, $message, $user_id);
@@ -209,10 +225,10 @@ class WebSocket extends WebSocketBase
$conn->send(json_encode(['type' => 'message', 'data' => $response]));
}
} catch (\Exception $e) {
$conn->send(json_encode(['type' => 'error', 'message' => '请求失败:' . $e->getMessage()]));
$conn->send(json_encode(['type' => 'error', 'message' => '请求失败:' . $e->getMessage(), 'line' => $e->getLine(), 'file' => $e->getFile(), 'trace' => $e->getTraceAsString()]));
}
}
/**
* 处理流式响应
* @param ConnectionInterface $conn
@@ -227,13 +243,13 @@ class WebSocket extends WebSocketBase
try {
// 记录开始处理流式请求
$this->log('AI客服WebSocket流式请求开始处理用户ID' . $user_id . ',请求消息:' . $message, 'info');
// 初始化模型
$kefu_conversation_model = new KefuConversationModel();
$kefu_message_model = new KefuMessageModel();
$site_id = $this->site_id;
$current_user_id = $this->member_id;
// 定义变量
$real_conversation_id = '';
$real_assistant_message_id = '';
@@ -242,30 +258,17 @@ class WebSocket extends WebSocketBase
$user_message_saved = false;
$user_message_content = $message;
$temp_conversation_id = 'temp_' . uniqid() . '_' . time(); // 临时会话ID用于失败回滚
// 立即保存用户消息使用临时会话ID
$this->saveUserMessage($kefu_message_model, $site_id, $current_user_id, $temp_conversation_id, '', $user_message_content);
$this->log('用户消息已立即保存临时会话ID' . $temp_conversation_id, 'info');
// 创建或更新临时会话
$this->updateOrCreateConversation($kefu_conversation_model, $site_id, $current_user_id, $temp_conversation_id);
$this->log('临时会话已创建ID' . $temp_conversation_id, 'info');
// WebSocket消息回调
$on_data = function ($data) use (
$conn,
&$real_conversation_id,
&$real_assistant_message_id,
&$real_user_message_id,
&$assistant_content,
&$user_message_saved,
$user_message_content,
$kefu_message_model,
$kefu_conversation_model,
$site_id,
$current_user_id,
$temp_conversation_id
) {
$on_data = function ($data) use ($conn, &$real_conversation_id, &$real_assistant_message_id, &$real_user_message_id, &$assistant_content, &$user_message_saved, $user_message_content, $kefu_message_model, $kefu_conversation_model, $site_id, $current_user_id, $temp_conversation_id) {
try {
// 解析Dify的流式响应
$lines = explode("\n", $data);
@@ -273,16 +276,17 @@ class WebSocket extends WebSocketBase
$line = trim($line);
if (empty($line))
continue;
// 查找以"data: "开头的行
if (strpos($line, 'data: ') === 0) {
$json_data = substr($line, 6);
$event_data = json_decode($json_data, true);
$this->log('-->获得的数据:' . $json_data, 'debug');
if (json_last_error() === JSON_ERROR_NONE && isset($event_data['event'])) {
$event = $event_data['event'];
$this->log('处理AI客服事件' . $event, 'info');
switch ($event_data['event']) {
case 'message':
// LLM返回文本块事件
@@ -296,19 +300,27 @@ class WebSocket extends WebSocketBase
}
// 积累助手回复内容
if (isset($event_data['answer'])) {
$assistant_content .= $event_data['answer'];
$this->log('积累助手回复内容:' . $event_data['answer'], 'debug');
$current_chunk = $event_data['answer'];
$assistant_content .= $current_chunk;
$this->log('积累助手回复内容:' . $current_chunk, 'debug');
// 添加时间戳,确保消息顺序
$timestamp = microtime(true);
// 通过WebSocket发送消息
$conn->send(json_encode([
'stream' => 1,
'type' => 'message',
'event' => 'message',
'conversation_id' => $real_conversation_id,
'message_id' => $real_assistant_message_id,
'answer' => $event_data['answer'],
'full_content' => $assistant_content
'answer' => $current_chunk,
'timestamp' => $timestamp,
'chunk_index' => uniqid()
]));
$this->log('向客户端发送消息: ' . $current_chunk, 'debug');
// 实时保存助手回复内容(流式过程中)
if (!empty($real_conversation_id) && !empty($real_assistant_message_id)) {
$this->saveStreamingAssistantMessage($kefu_message_model, $site_id, $current_user_id, $real_conversation_id, $real_assistant_message_id, $assistant_content, 'streaming');
@@ -324,18 +336,18 @@ class WebSocket extends WebSocketBase
['conversation_id', '=', $temp_conversation_id],
['role', '=', 'user']
]);
$kefu_conversation_model->updateConversation(['conversation_id' => $real_conversation_id], [
['site_id', '=', $site_id],
['user_id', '=', $current_user_id],
['conversation_id', '=', $temp_conversation_id]
]);
$user_message_saved = true;
$this->log('用户消息会话ID已更新为真实ID' . $real_conversation_id, 'info');
}
break;
case 'agent_message':
// Agent模式下返回文本块事件
if (isset($event_data['conversation_id'])) {
@@ -348,19 +360,25 @@ class WebSocket extends WebSocketBase
}
// 积累助手回复内容
if (isset($event_data['answer'])) {
$assistant_content .= $event_data['answer'];
$this->log('积累Agent回复内容' . $event_data['answer'], 'debug');
$current_chunk = $event_data['answer'];
$assistant_content .= $current_chunk;
$this->log('积累Agent回复内容' . $current_chunk, 'debug');
// 添加时间戳,确保消息顺序
$timestamp = microtime(true);
// 通过WebSocket发送消息
$conn->send(json_encode([
'stream' => 1,
'type' => 'message',
'event' => 'agent_message',
'conversation_id' => $real_conversation_id,
'message_id' => $real_assistant_message_id,
'answer' => $event_data['answer'],
'full_content' => $assistant_content
'answer' => $current_chunk,
'timestamp' => $timestamp,
'chunk_index' => uniqid()
]));
// 实时保存助手回复内容Agent模式流式过程中
if (!empty($real_conversation_id) && !empty($real_assistant_message_id)) {
$this->saveStreamingAssistantMessage($kefu_message_model, $site_id, $current_user_id, $real_conversation_id, $real_assistant_message_id, $assistant_content, 'streaming');
@@ -376,18 +394,18 @@ class WebSocket extends WebSocketBase
['conversation_id', '=', $temp_conversation_id],
['role', '=', 'user']
]);
$kefu_conversation_model->updateConversation(['conversation_id' => $real_conversation_id], [
['site_id', '=', $site_id],
['user_id', '=', $current_user_id],
['conversation_id', '=', $temp_conversation_id]
]);
$user_message_saved = true;
$this->log('Agent模式下用户消息会话ID已更新为真实ID' . $real_conversation_id, 'info');
}
break;
case 'agent_thought':
if (isset($event_data['thought'])) {
// 格式化思考过程
@@ -400,9 +418,10 @@ class WebSocket extends WebSocketBase
}
$assistant_content .= $thought_content;
$this->log('Agent思考过程' . $thought_content, 'debug');
// 通过WebSocket发送思考过程
$conn->send(json_encode([
'stream' => 1,
'type' => 'message',
'event' => 'agent_thought',
'thought' => $event_data['thought'],
@@ -411,7 +430,7 @@ class WebSocket extends WebSocketBase
]));
}
break;
case 'file':
if (isset($event_data['id']) && isset($event_data['type']) && isset($event_data['url'])) {
$file_id = $event_data['id'];
@@ -421,9 +440,10 @@ class WebSocket extends WebSocketBase
$file_content = "\n[文件]: " . $file_type . " - " . $file_url;
$assistant_content .= $file_content;
$this->log('收到文件事件:' . $file_type . ' - ' . $file_url, 'info');
// 通过WebSocket发送文件信息
$conn->send(json_encode([
'stream' => 1,
'type' => 'message',
'event' => 'file',
'id' => $file_id,
@@ -432,34 +452,36 @@ class WebSocket extends WebSocketBase
]));
}
break;
case 'message_start':
if (isset($event_data['conversation_id'])) {
$real_conversation_id = $event_data['conversation_id'];
$this->log('消息开始事件会话ID' . $real_conversation_id, 'info');
// 通过WebSocket发送消息开始事件
$conn->send(json_encode([
'stream' => 1,
'type' => 'message',
'event' => 'message_start',
'conversation_id' => $real_conversation_id
]));
}
break;
case 'message_delta':
if (isset($event_data['delta']['content'])) {
$assistant_content .= $event_data['delta']['content'];
$this->log('积累增量内容:' . $event_data['delta']['content'], 'debug');
// 通过WebSocket发送增量内容
$conn->send(json_encode([
'stream' => 1,
'type' => 'message',
'event' => 'message_delta',
'delta' => $event_data['delta'],
'full_content' => $assistant_content
// 'full_content' => $assistant_content
]));
// 实时保存助手回复内容(增量流式过程中)
if (!empty($real_conversation_id) && !empty($real_assistant_message_id)) {
$this->saveStreamingAssistantMessage($kefu_message_model, $site_id, $current_user_id, $real_conversation_id, $real_assistant_message_id, $assistant_content, 'streaming');
@@ -467,13 +489,14 @@ class WebSocket extends WebSocketBase
}
}
break;
case 'message_end':
// 最终内容已通过message或message_delta事件积累
$this->log('消息结束事件会话ID' . $real_conversation_id . '消息ID' . $real_assistant_message_id, 'info');
// 通过WebSocket发送消息结束事件
$conn->send(json_encode([
'stream' => 1,
'type' => 'message',
'event' => 'message_end',
'conversation_id' => $real_conversation_id,
@@ -481,20 +504,21 @@ class WebSocket extends WebSocketBase
'content' => $assistant_content
]));
break;
case 'error':
$error_message = isset($event_data['message']) ? $event_data['message'] : '流式输出异常';
$assistant_content .= "\n[错误]: " . $error_message;
$this->log('AI客服错误事件' . $error_message, 'error');
// 通过WebSocket发送错误事件
$conn->send(json_encode([
'stream' => 1,
'type' => 'message',
'event' => 'error',
'message' => $error_message
]));
break;
case 'ping':
// 保持连接存活的ping事件
// 无需特殊处理,继续保持连接
@@ -508,42 +532,113 @@ class WebSocket extends WebSocketBase
}
} catch (\Exception $e) {
$this->log('AI客服事件处理异常' . $e->getMessage(), 'error');
$conn->send(json_encode(['type' => 'error', 'message' => $e->getMessage()]));
$conn->send(json_encode([
'stream' => 1,
'type' => 'error',
'message' => $e->getMessage()
]));
}
};
// 错误处理回调函数
$on_error = function ($error) use ($user_id, $conn) {
$this->log('AI客服请求错误用户ID' . $user_id . ',错误信息:' . json_encode($error), 'error');
$conn->send(json_encode(['type' => 'error', 'message' => $error]));
};
// 调用curl流式请求
$this->curlRequestStreaming($url, 'POST', $requestData, $headers, $on_data, $on_error);
$this->log('AI客服请求成功用户ID' . $user_id . '会话ID' . $real_conversation_id, 'info');
// 数据流结束时发送明确的"done"事件
$done_data = [
// 存储流式请求信息
$requestId = $conn->resourceId;
$this->streamingRequests[$requestId] = [
'conn' => $conn,
'user_id' => $user_id,
'conversation_id' => $real_conversation_id,
'message_id' => $real_assistant_message_id,
'content' => $assistant_content,
'started_at' => time(),
'is_active' => true
];
$conn->send(json_encode(['type' => 'message', 'event' => 'done', 'data' => $done_data]));
// 流式正常完成,标记助手消息为已完成状态
if (!empty($real_conversation_id) && !empty($real_assistant_message_id) && !empty($assistant_content)) {
$this->saveStreamingAssistantMessage($kefu_message_model, $site_id, $current_user_id, $real_conversation_id, $real_assistant_message_id, $assistant_content, 'completed');
$this->log('AI客服回复已标记为完成状态会话ID' . $real_conversation_id . ',总字数:' . strlen($assistant_content), 'info');
}
// 清理临时数据
$this->cleanupTempData($kefu_message_model, $kefu_conversation_model, $site_id, $current_user_id, $temp_conversation_id);
$this->log('开始流式请求请求ID' . $requestId, 'info');
// 检查客户端连接状态的回调
$on_check = function() use ($conn, $requestId) {
// 检查连接是否仍然在客户端列表中通过检查clientData
if (!isset($this->clientData[$requestId])) {
$this->log('客户端连接已关闭,停止流式请求:' . $requestId, 'info');
return false;
}
// 检查请求是否被标记为已停止
if (isset($this->streamingRequests[$requestId]) && !$this->streamingRequests[$requestId]['is_active']) {
$this->log('流式请求已被标记为停止:' . $requestId, 'info');
return false;
}
return true;
};
// 流式完成回调:仅在上游流真正结束后才触发(避免立刻发送 done
$on_complete = function (bool $aborted = false, int $errno = 0, ?string $err = null) use (
$conn,
$requestId,
$user_id,
&$real_conversation_id,
&$real_assistant_message_id,
&$assistant_content,
$kefu_message_model,
$kefu_conversation_model,
$site_id,
$current_user_id,
$temp_conversation_id
) {
// 从流式请求列表中移除
if (isset($this->streamingRequests[$requestId])) {
unset($this->streamingRequests[$requestId]);
$this->log('移除流式请求请求ID' . $requestId, 'info');
}
if ($errno !== 0 && $err) {
$this->log('AI客服请求结束但存在错误用户ID' . $user_id . ',错误:' . $err, 'error');
}
// 被中断(例如客户端断开)不发送 done
if (!$aborted && isset($this->clientData[$requestId])) {
$done_data = [
'conversation_id' => $real_conversation_id,
'message_id' => $real_assistant_message_id,
'content' => $assistant_content,
];
$conn->send(json_encode(['type' => 'message', 'event' => 'done', 'data' => $done_data]));
}
// 只有非中断且有内容时,标记 completed
if (
!$aborted &&
!empty($real_conversation_id) &&
!empty($real_assistant_message_id) &&
!empty($assistant_content)
) {
$this->saveStreamingAssistantMessage(
$kefu_message_model,
$site_id,
$current_user_id,
$real_conversation_id,
$real_assistant_message_id,
$assistant_content,
'completed'
);
$this->log('AI客服回复已标记为完成状态会话ID' . $real_conversation_id . ',总字数:' . strlen($assistant_content), 'info');
}
// 清理临时数据(无论是否中断,都需要清理临时会话)
$this->cleanupTempData($kefu_message_model, $kefu_conversation_model, $site_id, $current_user_id, $temp_conversation_id);
$this->log('AI客服请求处理完成用户ID' . $user_id . '会话ID' . $real_conversation_id, 'info');
};
// 调用curl流式请求异步
$this->curlRequestStreaming($url, 'POST', $requestData, $headers, $on_data, $on_error, $on_check, $on_complete);
} catch (\Exception $e) {
$error_msg = 'AI客服请求异常' . $e->getMessage() . ',错误行:' . $e->getLine() . ',错误文件:' . $e->getFile();
$this->log($error_msg, 'error');
$conn->send(json_encode(['type' => 'error', 'message' => $error_msg]));
// 异常时清理临时数据
try {
$kefu_conversation_model = new KefuConversationModel();
@@ -556,7 +651,7 @@ class WebSocket extends WebSocketBase
}
}
}
/**
* 通用的curl流式请求函数
* @param string $url 请求URL
@@ -565,64 +660,117 @@ class WebSocket extends WebSocketBase
* @param array $headers 请求头
* @param callable|null $on_data 数据回调函数,接收原始数据
* @param callable|null $on_error 错误回调函数,接收错误信息
* @param callable|null $on_check 检查是否应该继续请求的回调函数
* @param callable|null $on_complete 完成回调函数(请求结束/中断时触发)
* @return bool 请求是否成功
*/
private function curlRequestStreaming($url, $method = 'GET', $data = [], $headers = [], $on_data = null, $on_error = null)
private function curlRequestStreaming($url, $method = 'GET', $data = [], $headers = [], $on_data = null, $on_error = null, $on_check = null, $on_complete = null)
{
try {
$ch = curl_init();
// 设置URL
$aborted = false;
// 基础设置
curl_setopt($ch, CURLOPT_URL, $url);
// 设置请求方法
curl_setopt($ch, CURLOPT_CUSTOMREQUEST, $method);
// 设置POST数据
if ($method === 'POST' && !empty($data)) {
curl_setopt($ch, CURLOPT_POSTFIELDS, is_array($data) ? json_encode($data) : $data);
}
// 设置请求头
if (!empty($headers)) {
curl_setopt($ch, CURLOPT_HTTPHEADER, $headers);
} else {
// 默认请求头
curl_setopt($ch, CURLOPT_HTTPHEADER, [
'Content-Type: application/json',
]);
curl_setopt($ch, CURLOPT_HTTPHEADER, ['Content-Type: application/json']);
}
// 设置cURL选项以支持流式输出
curl_setopt($ch, CURLOPT_RETURNTRANSFER, false); // 不返回结果,直接输出
// 流式/非阻塞相关
curl_setopt($ch, CURLOPT_RETURNTRANSFER, false);
curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false);
curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, false);
curl_setopt($ch, CURLOPT_TIMEOUT, 0); // 无超时限制,适用于长时间流式响应
curl_setopt($ch, CURLOPT_TIMEOUT, 0);
curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, 30);
curl_setopt($ch, CURLOPT_WRITEFUNCTION, function ($curl, $data) use ($on_data) {
// 调用自定义数据处理回调
curl_setopt($ch, CURLOPT_BUFFERSIZE, 1);
curl_setopt($ch, CURLOPT_TCP_NODELAY, true);
curl_setopt($ch, CURLOPT_FRESH_CONNECT, true);
curl_setopt($ch, CURLOPT_FORBID_REUSE, true);
curl_setopt($ch, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1);
// 到一块就立即触发
curl_setopt($ch, CURLOPT_WRITEFUNCTION, function ($curl, $chunk) use ($on_data, $on_check, &$aborted) {
if (is_callable($on_check) && !$on_check()) {
$this->log('请求被中断,停止处理数据', 'info');
$aborted = true;
return -1; // 中断
}
$this->log('收到数据块,大小:' . strlen($chunk), 'debug');
if (is_callable($on_data)) {
$on_data($data);
$on_data($chunk);
}
return strlen($data);
return strlen($chunk);
});
// 执行请求并流式输出响应
curl_exec($ch);
if (curl_errno($ch)) {
$error = curl_error($ch);
$this->log('Curl请求错误' . $error, 'error');
if (is_callable($on_error)) {
$on_error($error);
$mh = curl_multi_init();
curl_multi_add_handle($mh, $ch);
$loop = Loop::get();
$timer = null;
$cleanup = function () use (&$mh, &$ch, &$timer, $loop) {
if (isset($timer)) {
$loop->cancelTimer($timer);
}
curl_close($ch);
return false;
}
curl_close($ch);
if ($mh && $ch) {
curl_multi_remove_handle($mh, $ch);
curl_close($ch);
curl_multi_close($mh);
}
};
// 定时推进 multi 状态机,避免阻塞事件循环
$timer = $loop->addPeriodicTimer(0.01, function () use (&$timer, $mh, $ch, $on_error, $cleanup, $on_complete, &$aborted) {
do {
$status = curl_multi_exec($mh, $active);
} while ($status === CURLM_CALL_MULTI_PERFORM);
// 非正常状态直接报错并清理
if ($status !== CURLM_OK && $status !== CURLM_CALL_MULTI_PERFORM) {
$msg = 'Curl multi 执行异常,状态:' . $status;
$this->log($msg, 'error');
if (is_callable($on_error)) {
$on_error($msg);
}
if (is_callable($on_complete)) {
$on_complete(true, 1, $msg);
}
$cleanup();
return;
}
// 读取完成/错误事件
while ($info = curl_multi_info_read($mh)) {
if ($info['msg'] === CURLMSG_DONE) {
$errno = curl_errno($ch);
$err = null;
if ($errno !== 0) {
$err = curl_error($ch);
// 忽略回调中断错误
if (strpos($err, 'callback') === false) {
$this->log('Curl请求错误' . $err, 'error');
if (is_callable($on_error)) {
$on_error($err);
}
} else {
$this->log('请求被回调中断', 'info');
$aborted = true;
}
}
if (is_callable($on_complete)) {
$on_complete($aborted, $errno, $err);
}
$cleanup();
return;
}
}
});
return true;
} catch (\Exception $e) {
$this->log(json_encode(["event" => "error", "data" => $e->getMessage(), "line" => $e->getLine(), "file" => $e->getFile()]), 'error');
@@ -632,7 +780,7 @@ class WebSocket extends WebSocketBase
return false;
}
}
/**
* 封装curl请求方法
* @param string $url 请求URL
@@ -684,7 +832,7 @@ class WebSocket extends WebSocketBase
return $response;
}
/**
* 验证参数并获取配置
* @param array $params_rules 参数验证规则
@@ -716,7 +864,7 @@ class WebSocket extends WebSocketBase
return $config_info;
}
/**
* 构建请求数据
* @param string $message 用户消息
@@ -741,7 +889,7 @@ class WebSocket extends WebSocketBase
return $requestData;
}
/**
* 构建请求头
* @param string $api_key API密钥
@@ -754,7 +902,7 @@ class WebSocket extends WebSocketBase
'Authorization: Bearer ' . $api_key,
];
}
/**
* 保存用户消息
* @param KefuMessageModel $message_model 消息模型
@@ -776,7 +924,7 @@ class WebSocket extends WebSocketBase
'content' => $content,
]);
}
/**
* 保存助手消息
* @param KefuMessageModel $message_model 消息模型
@@ -798,7 +946,7 @@ class WebSocket extends WebSocketBase
'content' => $content,
]);
}
/**
* 更新或创建会话
* @param KefuConversationModel $conversation_model 会话模型
@@ -833,7 +981,7 @@ class WebSocket extends WebSocketBase
]);
}
}
/**
* 实时保存助手消息内容(流式过程中)
* @param KefuMessageModel $message_model 消息模型
@@ -881,7 +1029,7 @@ class WebSocket extends WebSocketBase
]);
}
}
/**
* 清理临时消息和会话
* @param KefuMessageModel $message_model 消息模型
@@ -909,7 +1057,7 @@ class WebSocket extends WebSocketBase
$this->log('临时数据已清理会话ID' . $temp_conversation_id, 'info');
}
/**
* 日志记录封装方法
* @param string $message 日志内容
@@ -919,12 +1067,12 @@ class WebSocket extends WebSocketBase
private function log($message, $level = 'info')
{
// 只允许info、error级别
if (!in_array($level, ['info', 'error'])) {
if (!in_array($level, ['info', 'error', 'debug'])) {
return;
}
log_write($message, $level, '', 2);
log_write($message, $level, 'ws.log', 2);
}
/**
* 处理非流式响应
* @param string $url 请求URL

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -1,28 +1,53 @@
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>WebSocket多addon测试</title>
<!-- 引入 Vue3 -->
<script src="./vue.3.6.0-beta.3.global.prod.js"></script>
<!-- 引入 Jquery -->
<script src="./jquery-4.0.0.min.js"></script>
<style>
body {
font-family: Arial, sans-serif;
margin: 20px;
}
.container {
max-width: 800px;
margin: 0 auto;
}
.settings {
margin-bottom: 20px;
padding: 15px;
background-color: #f9f9f9;
border-radius: 5px;
}
.settings input {
width: 60%;
padding: 8px;
margin-right: 10px;
}
.addon-test {
margin-bottom: 30px;
padding: 20px;
border: 1px solid #ddd;
border-radius: 5px;
}
.addon-test h2 {
margin-top: 0;
color: #333;
}
.chat-area {
height: 300px;
overflow-y: scroll;
@@ -31,23 +56,28 @@
margin-bottom: 10px;
background-color: #f5f5f5;
}
.message {
margin-bottom: 10px;
padding: 5px 10px;
border-radius: 3px;
}
.message.user {
background-color: #e3f2fd;
text-align: right;
}
.message.server {
background-color: #f0f0f0;
}
input[type="text"] {
width: 70%;
padding: 8px;
margin-right: 10px;
}
button {
padding: 8px 15px;
background-color: #4CAF50;
@@ -56,168 +86,405 @@
border-radius: 3px;
cursor: pointer;
}
button:hover {
background-color: #45a049;
}
button:disabled {
background-color: #ccc;
cursor: not-allowed;
}
.status {
color: #666;
font-size: 12px;
margin-bottom: 10px;
}
.status.connected {
color: green;
}
.status.disconnected {
color: red;
}
.status.error {
color: red;
}
</style>
</head>
<body>
<div class="container">
<div id="message_show" style="display: block; width: 100%; height: 200px;">Message</div>
<div id="app" class="container">
<h1>WebSocket多addon测试</h1>
<!-- aikefu addon 测试 -->
<div class="addon-test">
<h2>aikefu Addon (ws://localhost:8080/ws/aikefu)</h2>
<div class="status" id="status-aikefu">连接</div>
<div class="chat-area" id="chat-aikefu"></div>
<div>
<input type="text" id="message-aikefu" placeholder="输入消息...">
<button onclick="sendMessage('aikefu')">发送</button>
</div>
<div class="settings">
<input type="text" v-model="websocketUrl" placeholder="WebSocket服务器地址" @keyup.enter="setWebsocketUrl">
<button @click="setWebsocketUrl">设置WebSocket服务器地址</button>
<button @click="initConnections" :disabled="connecting">初始化连接</button>
</div>
<!-- 默认路径测试 -->
<div class="addon-test">
<h2>默认路径 (ws://localhost:8080/ws)</h2>
<div class="status" id="status-default">未连接</div>
<div class="chat-area" id="chat-default"></div>
<div v-for="addon in addons" :key="addon.name" class="addon-test">
<h2>{{ addon.title }} - {{ addon.fullPath }}</h2>
<div class="status" :class="addon.statusClass">
{{ addon.statusText }}
<pre>streamMsg: {{ streamMsg}}</pre>
</div>
<div ref="chatAreas" class="chat-area" :data-addon="addon.name">
<div v-for="(msg, index) in addon.messages" :key="index" class="message"
:class="{ user: msg.sender === '用户' }">
<strong>{{ msg.sender }}:</strong><br>
<pre v-if="msg.isJson">{{ msg.content }}</pre>
<span v-else>{{ msg.content }}</span>
</div>
</div>
<div>
<input type="text" id="message-default" placeholder="输入消息...">
<button onclick="sendMessage('default')">发送</button>
<input type="text" v-model="addon.inputMessage" placeholder="输入消息..."
@keyup.enter="sendMessage(addon.name)" :disabled="addon.status !== 'connected'">
<button @click="sendMessage(addon.name)"
:disabled="addon.status !== 'connected' || !addon.inputMessage.trim()">
发送
</button>
</div>
</div>
</div>
<script>
// WebSocket连接对象
let wsConnections = {};
// 连接配置
const configs = {
aikefu: {
url: 'ws://localhost:8080/ws/aikefu',
status: document.getElementById('status-aikefu'),
chat: document.getElementById('chat-aikefu'),
message: document.getElementById('message-aikefu')
},
default: {
url: 'ws://localhost:8080/ws',
status: document.getElementById('status-default'),
chat: document.getElementById('chat-default'),
message: document.getElementById('message-default')
}
};
// 初始化所有连接
function initConnections() {
for (const name in configs) {
connect(name);
}
}
// 连接到WebSocket服务器
function connect(name) {
const config = configs[name];
try {
wsConnections[name] = new WebSocket(config.url);
wsConnections[name].onopen = function() {
config.status.textContent = '已连接';
config.status.style.color = 'green';
// 发送认证信息
const authMsg = JSON.stringify({
action: 'auth',
site_id: 1,
member_id: 1,
token: 'test_token'
document.addEventListener('DOMContentLoaded', () => {
const { createApp, ref, reactive, onMounted, onBeforeUnmount, nextTick, triggerRef } = Vue;
createApp({
setup() {
// WebSocket服务器地址
const websocketUrl = ref('ws://localhost:8080');
const connecting = ref(false);
const streamMsg = ref('');
// WebSocket连接对象
const wsConnections = reactive({});
// Addon配置
const addons = reactive([
{
name: 'aikefu',
title: 'aikefu Addon',
path: '/ws/aikefu',
fullPath: '',
status: 'disconnected', // disconnected, connecting, connected, error
statusText: '未连接',
statusClass: 'disconnected',
messages: [],
inputMessage: ''
},
{
name: 'default',
title: '默认路径',
path: '/ws',
status: 'disconnected',
statusText: '未连接',
statusClass: 'disconnected',
messages: [],
inputMessage: ''
}
]);
// 聊天区域引用
const chatAreas = ref([]);
// 设置WebSocket服务器地址
const setWebsocketUrl = () => {
if (websocketUrl.value.trim() && websocketUrl.value.startsWith('ws://')) {
addons.forEach(addon => {
addon.fullPath = websocketUrl.value + addon.path;
addon.status = 'disconnected';
addon.statusText = '未连接';
addon.statusClass = 'disconnected';
addon.messages = [];
addon.inputMessage = '';
});
} else {
alert('请输入有效的WebSocket服务器地址格式为: ws://localhost:8080 ');
}
};
// 更新addon状态
const updateAddonStatus = (name, status, statusText) => {
const addon = addons.find(a => a.name === name);
if (addon) {
addon.status = status;
addon.statusText = statusText;
addon.statusClass = status === 'connected' ? 'connected' :
status === 'error' ? 'error' : 'disconnected';
}
};
// 强制触发响应式更新的辅助函数
const forceUpdate = (addon) => {
// 通过重新赋值数组来强制触发响应式更新
addon.messages = [...addon.messages];
};
// 添加消息
const addMessage = (name, sender, content) => {
const addon = addons.find(a => a.name === name);
if (addon) {
let isJson = false;
let displayContent = content;
let parsed = null;
let isStream = false;
// 尝试解析JSON
try {
parsed = JSON.parse(content);
displayContent = JSON.stringify(parsed, null, 2);
isJson = true;
// 检查是否是流式消息
if (parsed && (parsed.stream === 1 || parsed.stream === true || parsed.stream === '1')) {
isStream = true;
}
} catch (e) {
// 不是JSON使用原始内容
}
// 如果是流式消息,更新最后一条服务器消息
if (isStream && sender === '服务器' && parsed) {
// 检查流式消息是否结束
const isStreamEnd = parsed.stream === 0 || parsed.stream === false || parsed.done === true || parsed.finish_reason;
// 查找最后一条来自服务器的消息(且是流式消息)
let foundLastStreamMessage = false;
for (let i = addon.messages.length - 1; i >= 0; i--) {
if (addon.messages[i].sender === '服务器' && addon.messages[i].isStreaming) {
// 更新流式消息的内容 - 使用对象替换来确保响应式更新
if (parsed.answer) {
// 如果有新的内容块,追加到现有内容
const newContent = parsed.answer;
// 立即更新DOM不依赖Vue的响应式更新
const chatArea = chatAreas.value.find(el => el?.dataset?.addon === name);
if (chatArea) {
// 直接操作DOM确保实时显示
const lastMessageEl = chatArea.lastElementChild;
if (lastMessageEl) {
const contentEl = lastMessageEl.querySelector('span');
if (contentEl) {
contentEl.textContent += newContent;
} else {
// 如果没有span元素创建一个
const newSpan = document.createElement('span');
newSpan.textContent = newContent;
lastMessageEl.appendChild(newSpan);
}
// 立即滚动到底部
chatArea.scrollTop = chatArea.scrollHeight;
}
}
// 同时更新Vue的数据确保状态一致性
streamMsg.value = streamMsg.value + newContent;
addon.messages[i].content = addon.messages[i].content + newContent;
console.log(`--------->${name} 更新流式消息的内容: ${newContent}`);
$('#message_show').html(`<p>${newContent}</p>`);
} else {
// 更新整个JSON显示
addon.messages[i] = {
...addon.messages[i],
content: displayContent,
isJson: true,
isStreaming: !isStreamEnd
};
// 强制触发响应式更新
forceUpdate(addon);
// 立即触发DOM渲染和滚动
nextTick(() => {
const chatArea = chatAreas.value.find(el => el?.dataset?.addon === name);
if (chatArea) {
chatArea.scrollTop = chatArea.scrollHeight;
}
});
}
foundLastStreamMessage = true;
// 强制触发响应式更新即使我们直接操作了DOM
forceUpdate(addon);
return; // 立即返回,不执行后续的滚动逻辑
}
}
// 如果没有找到正在进行的流式消息,创建新消息
if (!foundLastStreamMessage) {
const streamContent = parsed.answer;
addon.messages.push({
sender,
content: streamContent,
isJson: false,
isStreaming: !isStreamEnd, // 如果已结束,则不是流式状态
timestamp: new Date().toLocaleTimeString()
});
// 强制触发响应式更新
forceUpdate(addon);
// 立即触发DOM渲染和滚动
nextTick(() => {
const chatArea = chatAreas.value.find(el => el?.dataset?.addon === name);
if (chatArea) {
chatArea.scrollTop = chatArea.scrollHeight;
}
});
return; // 立即返回,不执行后续的滚动逻辑
}
} else {
// 非流式消息,正常添加新消息
addon.messages.push({
sender,
content: displayContent,
isJson,
isStreaming: false,
timestamp: new Date().toLocaleTimeString()
});
}
// 滚动到底部(仅对非流式消息)
nextTick(() => {
const chatArea = chatAreas.value.find(el => el?.dataset?.addon === name);
if (chatArea) {
chatArea.scrollTop = chatArea.scrollHeight;
}
});
}
};
// 连接到WebSocket服务器
const connect = (name) => {
const addon = addons.find(a => a.name === name);
if (!addon) return;
updateAddonStatus(name, 'connecting', '连接中...');
try {
const url = `${websocketUrl.value}${addon.path}`;
wsConnections[name] = new WebSocket(url);
wsConnections[name].onopen = () => {
updateAddonStatus(name, 'connected', '已连接');
// 发送认证信息
const authMsg = JSON.stringify({
action: 'auth',
site_id: 1,
member_id: 1,
token: 'test_token'
});
wsConnections[name].send(authMsg);
addMessage(name, '系统', '已连接到WebSocket服务器');
};
wsConnections[name].onmessage = (event) => {
// console.log('WebSocket消息: ', event.data);
addMessage(name, '服务器', event.data);
};
wsConnections[name].onclose = () => {
updateAddonStatus(name, 'disconnected', '已断开');
addMessage(name, '系统', 'WebSocket连接已断开');
};
wsConnections[name].onerror = (error) => {
updateAddonStatus(name, 'error', '连接错误');
console.log('WebSocket连接错误: ', error);
addMessage(name, '系统', 'WebSocket连接错误: ' + (error.message || 'NS_ERROR_WEBSOCKET_CONNECTION_REFUSED'));
};
} catch (e) {
console.log('WebSocket连接失败: ', e);
updateAddonStatus(name, 'error', '连接失败');
addMessage(name, '系统', '无法连接到WebSocket服务器: ' + e.message);
}
};
// 初始化所有连接
const initConnections = () => {
if (connecting.value) return;
connecting.value = true;
addons.forEach(addon => {
// 如果已连接,先关闭
if (wsConnections[addon.name] &&
wsConnections[addon.name].readyState === WebSocket.OPEN) {
wsConnections[addon.name].close();
}
connect(addon.name);
});
setTimeout(() => {
connecting.value = false;
}, 1000);
};
// 发送消息
const sendMessage = (name) => {
const addon = addons.find(a => a.name === name);
if (!addon || !addon.inputMessage.trim()) return;
const message = addon.inputMessage.trim();
addMessage(name, '用户', message);
// 发送聊天消息
const chatMsg = JSON.stringify({
action: 'chat',
message: message,
user_id: 1,
stream: true
});
if (wsConnections[name] && wsConnections[name].readyState === WebSocket.OPEN) {
wsConnections[name].send(chatMsg);
addon.inputMessage = '';
} else {
addMessage(name, '系统', 'WebSocket未连接无法发送消息');
}
};
// 关闭所有连接
const closeAllConnections = () => {
Object.keys(wsConnections).forEach(name => {
if (wsConnections[name] && wsConnections[name].readyState === WebSocket.OPEN) {
wsConnections[name].close();
}
});
};
// 生命周期钩子
onBeforeUnmount(() => {
closeAllConnections();
});
wsConnections[name].send(authMsg);
addMessage(name, '系统', '已连接到WebSocket服务器');
};
wsConnections[name].onmessage = function(event) {
try {
const data = JSON.parse(event.data);
addMessage(name, '服务器', JSON.stringify(data, null, 2));
} catch (e) {
addMessage(name, '服务器', event.data);
}
};
wsConnections[name].onclose = function() {
config.status.textContent = '已断开';
config.status.style.color = 'red';
addMessage(name, '系统', 'WebSocket连接已断开');
};
wsConnections[name].onerror = function(error) {
config.status.textContent = '连接错误';
config.status.style.color = 'red';
addMessage(name, '系统', 'WebSocket连接错误: ' + error);
};
} catch (e) {
config.status.textContent = '连接失败';
config.status.style.color = 'red';
addMessage(name, '系统', '无法连接到WebSocket服务器: ' + e);
}
}
// 发送消息
function sendMessage(name) {
const config = configs[name];
const message = config.message.value.trim();
if (!message) return;
addMessage(name, '用户', message);
// 发送聊天消息
const chatMsg = JSON.stringify({
action: 'chat',
message: message,
user_id: 1,
stream: true
});
if (wsConnections[name] && wsConnections[name].readyState === WebSocket.OPEN) {
wsConnections[name].send(chatMsg);
config.message.value = '';
} else {
addMessage(name, '系统', 'WebSocket未连接无法发送消息');
}
}
// 添加消息到聊天区域
function addMessage(name, sender, message) {
const config = configs[name];
const div = document.createElement('div');
div.className = 'message ' + (sender === '用户' ? 'user' : 'server');
div.innerHTML = `<strong>${sender}:</strong><br>${message}`;
config.chat.appendChild(div);
config.chat.scrollTop = config.chat.scrollHeight;
}
// 页面加载完成后初始化连接
window.onload = function() {
initConnections();
};
// 页面关闭前关闭所有连接
window.onbeforeunload = function() {
for (const name in wsConnections) {
if (wsConnections[name] && wsConnections[name].readyState === WebSocket.OPEN) {
wsConnections[name].close();
return {
websocketUrl,
connecting,
addons,
chatAreas,
setWebsocketUrl,
initConnections,
sendMessage
};
}
}
};
}).mount('#app');
});
</script>
</body>
</html>

View File

@@ -2192,6 +2192,8 @@ function log_write(string $message, string $level = 'info', string $filename = '
// 可以根据需要记录异常信息
}
}
// echo '日志位置:' . $logFile . "\n";
// 写入文件
file_put_contents($logFile, $content, $flags);

View File

@@ -483,7 +483,7 @@ Vue.component("wechat_channel-edit", {
initSortable: function () {
// 检查Sortable库是否已加载
if (typeof Sortable !== 'undefined') {
const videoList = document.getElementById('videoListEdit');
const videoList = this.$el.querySelector('.video-list-edit');
if (videoList) {
// 销毁现有实例
if (this.sortableInstance) {