chore(addon/aikefu): 流式对话接口实现

This commit is contained in:
2025-12-10 10:19:36 +08:00
parent fd7593c72b
commit ed81982239
4 changed files with 828 additions and 314 deletions

View File

@@ -22,6 +22,7 @@ class Kefu extends BaseApi
*/
private function curlRequestStreaming($url, $method = 'GET', $data = [], $headers = [], $on_data = null, $on_error = null)
{
try {
$ch = curl_init();
// 设置URL
@@ -58,7 +59,10 @@ class Kefu extends BaseApi
} else {
// 默认直接输出
echo $data;
// 只在有输出缓冲时才刷新
if (ob_get_level() > 0) {
ob_flush();
}
flush();
}
@@ -69,6 +73,8 @@ class Kefu extends BaseApi
header('Content-Type: text/event-stream');
header('Cache-Control: no-cache');
header('Connection: keep-alive');
header('X-Accel-Buffering: no'); // 禁用Nginx缓冲
ob_end_clean(); // 清除输出缓冲区
ob_implicit_flush(true); // 自动刷新输出
@@ -88,6 +94,15 @@ class Kefu extends BaseApi
curl_close($ch);
return true;
} catch (\Exception $e) {
$this->log(json_encode(["event" => "error", "data" => $e->getMessage(), "line" => $e->getLine(), "file" => $e->getFile()]), 'error');
if (is_callable($on_error)) {
$on_error($e->getMessage());
} else {
echo "data: " . json_encode(["event" => "error", "data" => $e->getMessage()]) . "\n\n";
}
return false;
}
}
/**
@@ -343,7 +358,7 @@ class Kefu extends BaseApi
*/
private function log($message, $level = 'info')
{
// Log::write($message, $level);
log_write($message, $level, '', 2);
}
private function handleStreamingResponse($url, $requestData, $headers, $message, $user_id)
@@ -365,14 +380,20 @@ class Kefu extends BaseApi
$user_message_saved = false;
$user_message_content = $message;
try {
// 数据处理回调函数
$on_data = function ($data) use (&$real_conversation_id, &$real_assistant_message_id, &$real_user_message_id, &$assistant_content, &$user_message_saved, $user_message_content, $kefu_message_model, $kefu_conversation_model, $site_id, $current_user_id) {
try {
// 记录接收到的数据块
$this->log('接收到AI客服数据块' . $data, 'debug');
// 输出原始数据
// 输出原始数据(为了兼容旧版本或调试)
echo $data;
// 只在有输出缓冲时才刷新
if (ob_get_level() > 0) {
ob_flush();
}
flush();
// 解析Dify的流式响应
@@ -571,6 +592,9 @@ class Kefu extends BaseApi
}
}
}
} catch (Exception $e) {
$this->log('AI客服事件处理异常' . $e->getMessage(), 'error');
}
};
// 错误处理回调函数
@@ -578,21 +602,21 @@ class Kefu extends BaseApi
$this->log('AI客服请求错误用户ID' . $user_id . ',错误信息:' . json_encode($error), 'error');
};
try {
// 调用curl流式请求
$this->curlRequestStreaming($url, $requestData, $headers, $on_data, $on_error);
$this->curlRequestStreaming($url, 'POST', $requestData, $headers, $on_data, $on_error);
$this->log('AI客服请求成功用户ID' . $user_id . '会话ID' . $real_conversation_id, 'info');
// 数据流结束时发送明确的"done"事件
$done_event = [
'event' => 'done',
'data' => [
$done_data = [
'conversation_id' => $real_conversation_id,
'message_id' => $real_assistant_message_id,
'content' => $assistant_content
]
];
echo "data: " . json_encode($done_event) . "\n\n";
echo "event: done\ndata: " . json_encode($done_data) . "\n\n";
// 只在有输出缓冲时才刷新
if (ob_get_level() > 0) {
ob_flush();
}
flush();
$this->log('发送done事件会话ID' . $real_conversation_id, 'info');

View File

@@ -0,0 +1,437 @@
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>流式聊天测试 Demo</title>
<style>
body {
font-family: Arial, sans-serif;
max-width: 800px;
margin: 0 auto;
padding: 20px;
background-color: #f5f5f5;
}
.container {
background-color: white;
border-radius: 8px;
box-shadow: 0 2px 10px rgba(0,0,0,0.1);
padding: 20px;
}
h1 {
text-align: center;
color: #333;
}
.method-selector {
margin-bottom: 20px;
text-align: center;
}
.method-btn {
padding: 10px 20px;
margin: 0 10px;
border: none;
border-radius: 4px;
background-color: #007bff;
color: white;
cursor: pointer;
font-size: 14px;
}
.method-btn.active {
background-color: #0056b3;
}
#chat-container {
border: 1px solid #ddd;
border-radius: 4px;
height: 400px;
overflow-y: auto;
margin-bottom: 20px;
padding: 10px;
background-color: #fafafa;
}
.message {
margin-bottom: 15px;
padding: 10px;
border-radius: 8px;
max-width: 80%;
}
.user-message {
background-color: #007bff;
color: white;
margin-left: auto;
}
.ai-message {
background-color: #e9ecef;
color: #333;
margin-right: auto;
white-space: pre-wrap;
}
.input-area {
display: flex;
gap: 10px;
}
#message-input {
flex: 1;
padding: 12px;
border: 1px solid #ddd;
border-radius: 4px;
font-size: 14px;
}
#send-btn {
padding: 12px 20px;
background-color: #28a745;
color: white;
border: none;
border-radius: 4px;
cursor: pointer;
font-size: 14px;
}
#send-btn:disabled {
background-color: #6c757d;
cursor: not-allowed;
}
.status {
margin-top: 10px;
font-size: 12px;
color: #666;
text-align: center;
}
</style>
</head>
<body>
<div class="container">
<h1>流式聊天测试 Demo</h1>
<div class="method-selector">
<h3>选择请求方式:</h3>
<button class="method-btn active" data-method="eventsource">EventSource</button>
<button class="method-btn" data-method="fetch">Fetch API</button>
</div>
<div id="chat-container"></div>
<div class="input-area">
<input type="text" id="message-input" placeholder="输入消息...">
<button id="send-btn">发送</button>
</div>
<div class="status">
<span id="status-text">就绪</span>
</div>
</div>
<script>
// 配置
const API_URL = 'http://localhost:8050/api/kefu/chat';
const UNIACID = '1';
let conversationId = '';
let currentMethod = 'eventsource';
let es = null;
let controller = null;
// DOM 元素
const chatContainer = document.getElementById('chat-container');
const messageInput = document.getElementById('message-input');
const sendBtn = document.getElementById('send-btn');
const statusText = document.getElementById('status-text');
const methodBtns = document.querySelectorAll('.method-btn');
// 事件监听
messageInput.addEventListener('keypress', (e) => {
if (e.key === 'Enter') {
sendMessage();
}
});
sendBtn.addEventListener('click', sendMessage);
methodBtns.forEach(btn => {
btn.addEventListener('click', () => {
methodBtns.forEach(b => b.classList.remove('active'));
btn.classList.add('active');
currentMethod = btn.dataset.method;
statusText.textContent = `已切换到 ${btn.textContent} 方式`;
});
});
// 发送消息
function sendMessage() {
const message = messageInput.value.trim();
if (!message) return;
// 清空输入框
messageInput.value = '';
// 添加用户消息到聊天记录
addMessageToChat(message, 'user');
// 禁用发送按钮
sendBtn.disabled = true;
statusText.textContent = '正在发送请求...';
// 根据选择的方式发送请求
if (currentMethod === 'eventsource') {
sendEventSourceRequest(message);
} else {
sendFetchStreamRequest(message);
}
}
// EventSource 方式
function sendEventSourceRequest(message) {
// 关闭之前的连接
if (es) {
es.close();
es = null;
}
// 构建请求 URL
const params = new URLSearchParams({
uniacid: UNIACID,
user_id: '123456',
message: message,
conversation_id: conversationId || '',
stream: 'true'
});
const url = `${API_URL}?${params.toString()}`;
try {
statusText.textContent = 'EventSource 连接中...';
es = new EventSource(url);
let aiMessage = '';
// 监听消息事件
es.addEventListener('message', (event) => {
console.log('收到消息:', event);
try {
const data = JSON.parse(event.data);
if (data.event === 'message') {
// 更新 AI 消息
aiMessage += data.answer || '';
updateAIMessage(aiMessage);
}
if (data.event === 'message_end') {
statusText.textContent = '对话完成';
sendBtn.disabled = false;
}
if (data.conversation_id) {
conversationId = data.conversation_id;
}
} catch (error) {
console.error('解析消息失败:', error);
}
});
// 监听完成事件
es.addEventListener('done', (event) => {
console.log('收到完成事件:', event);
try {
const data = JSON.parse(event.data);
if (data.conversation_id) {
conversationId = data.conversation_id;
}
statusText.textContent = '对话完成';
sendBtn.disabled = false;
} catch (error) {
console.error('解析完成事件失败:', error);
}
});
// 监听错误事件
es.addEventListener('error', (error) => {
console.error('EventSource 错误:', error);
statusText.textContent = '连接错误';
sendBtn.disabled = false;
es.close();
es = null;
});
// 监听关闭事件
es.addEventListener('close', () => {
console.log('EventSource 连接关闭');
});
} catch (error) {
console.error('创建 EventSource 失败:', error);
statusText.textContent = '请求失败';
sendBtn.disabled = false;
}
}
// Fetch API 流式请求方式
async function sendFetchStreamRequest(message) {
// 取消之前的请求
if (controller) {
controller.abort();
controller = null;
}
// 创建 AbortController
controller = new AbortController();
const signal = controller.signal;
// 构建请求体
const body = new URLSearchParams({
uniacid: UNIACID,
user_id: '123456',
message: message,
conversation_id: conversationId || '',
stream: 'true'
});
try {
statusText.textContent = 'Fetch 连接中...';
const response = await fetch(API_URL, {
method: 'POST',
body: body,
headers: {
'Content-Type': 'application/x-www-form-urlencoded',
'Accept': 'text/event-stream'
},
signal: signal
});
if (!response.ok) {
throw new Error(`HTTP 错误! 状态: ${response.status}`);
}
if (!response.body) {
throw new Error('响应体不可用');
}
const reader = response.body.getReader();
const decoder = new TextDecoder('utf-8');
let buffer = '';
let aiMessage = '';
statusText.textContent = '接收流式响应...';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
// 处理接收到的数据
processStreamData(buffer, (newData) => {
buffer = newData;
if (newData) {
// 更新 AI 消息
aiMessage += newData;
updateAIMessage(aiMessage);
}
});
}
// 处理剩余数据
processStreamData(buffer, (newData) => {
if (newData) {
aiMessage += newData;
updateAIMessage(aiMessage);
}
});
statusText.textContent = '对话完成';
sendBtn.disabled = false;
} catch (error) {
if (error.name === 'AbortError') {
statusText.textContent = '请求已取消';
} else {
console.error('Fetch 请求失败:', error);
statusText.textContent = `请求失败: ${error.message}`;
}
sendBtn.disabled = false;
}
}
// 处理流式数据
function processStreamData(buffer, callback) {
const lines = buffer.split('\n');
buffer = lines.pop() || ''; // 最后一行可能不完整
lines.forEach(line => {
line = line.trim();
if (!line) return;
// 解析 SSE 格式
if (line.startsWith('data:')) {
const dataPart = line.slice(5).trim();
if (dataPart) {
try {
const data = JSON.parse(dataPart);
if (data.event === 'message') {
callback(data.answer || '');
}
if (data.conversation_id) {
conversationId = data.conversation_id;
}
if (data.event === 'message_end') {
// 对话完成
console.log('对话完成');
}
} catch (error) {
console.error('解析流式数据失败:', error);
}
}
}
});
return buffer;
}
// 添加消息到聊天记录
function addMessageToChat(message, type) {
const messageDiv = document.createElement('div');
messageDiv.className = `message ${type}-message`;
messageDiv.textContent = message;
chatContainer.appendChild(messageDiv);
chatContainer.scrollTop = chatContainer.scrollHeight;
// 如果是用户消息,添加一个临时的 AI 消息容器
if (type === 'user') {
const aiMessageDiv = document.createElement('div');
aiMessageDiv.id = 'temp-ai-message';
aiMessageDiv.className = 'message ai-message';
chatContainer.appendChild(aiMessageDiv);
chatContainer.scrollTop = chatContainer.scrollHeight;
}
}
// 更新 AI 消息
function updateAIMessage(message) {
let aiMessageDiv = document.getElementById('temp-ai-message');
if (!aiMessageDiv) {
// 如果临时容器不存在,创建一个新的
aiMessageDiv = document.createElement('div');
aiMessageDiv.id = 'temp-ai-message';
aiMessageDiv.className = 'message ai-message';
chatContainer.appendChild(aiMessageDiv);
}
aiMessageDiv.textContent = message;
chatContainer.scrollTop = chatContainer.scrollHeight;
}
// 页面卸载时清理资源
window.addEventListener('beforeunload', () => {
if (es) {
es.close();
}
if (controller) {
controller.abort();
}
});
// 初始化
statusText.textContent = '就绪,使用 EventSource 方式';
</script>
</body>
</html>

View File

@@ -12,7 +12,7 @@ class KefuChat
/**
* 处理智能客服聊天事件
* @param array $data 事件数据
* @return array
* @return array|null
*/
public function handle($data)
{
@@ -26,6 +26,11 @@ class KefuChat
// 调用addon的chat方法
$response = $kefu_api->chat();
// 对于流式请求,直接输出不返回数据
if (isset($data['stream']) && $data['stream']) {
return null;
}
// 返回响应数据
return json_decode($response->getContent(), true);
} catch (\Exception $e) {

View File

@@ -16,6 +16,11 @@ class Kefu extends BaseApi
*/
public function health()
{
// 检查请求方式是否为POST
if (!request()->isPost()) {
return $this->response($this->error('请求方式错误仅支持POST请求'));
}
$start_time = microtime(true);
// (必选) 获取站点ID和会员ID可以通过事件数据传递
@@ -49,7 +54,7 @@ class Kefu extends BaseApi
try {
// 触发健康检查事件
$result = Event::trigger('KefuHealthCheck', $event_data);
$result = event('KefuHealthCheck', $event_data, true);
// 汇总检查结果
$health_summary = [
@@ -143,6 +148,11 @@ class Kefu extends BaseApi
*/
public function info()
{
// 检查请求方式是否为POST
if (!request()->isPost()) {
return $this->response($this->error('请求方式错误仅支持POST请求'));
}
// (可选获取站点ID和会员ID可以通过事件数据传递
$site_id = $this->params['uniacid'] ?? $this->site_id;
$member_id = $this->params['member_id'] ?? $this->member_id;
@@ -167,7 +177,7 @@ class Kefu extends BaseApi
];
// 触发获取配置信息事件
$result = event('KefuGetInfo', $event_data);
$result = event('KefuGetInfo', $event_data, true);
// 处理事件结果
$response = [
@@ -223,6 +233,11 @@ class Kefu extends BaseApi
*/
public function clearConversation()
{
// 检查请求方式是否为POST
if (!request()->isPost()) {
return $this->response($this->error('请求方式错误仅支持POST请求'));
}
// 获取请求参数
$conversation_id = $this->params['conversation_id'] ?? '';
$user_id = $this->params['user_id'] ?? $this->member_id;
@@ -253,7 +268,7 @@ class Kefu extends BaseApi
];
// 触发清除会话事件
$result = event('KefuClearConversation', $event_data);
$result = event('KefuClearConversation', $event_data, true);
// 处理事件结果
$response = [
@@ -286,11 +301,33 @@ class Kefu extends BaseApi
*/
public function chat()
{
// 检查请求方式是否为POST或者是EventSource请求
$isPost = request()->isPost();
$isGet = request()->isGet();
$isEventSource = $isGet && request()->header('Accept') === 'text/event-stream';
if (!$isPost && !$isEventSource) {
return $this->response($this->error('请求方式错误仅支持POST或EventSource请求'));
}
// 获取请求参数
// 对于GET请求需要单独获取参数因为BaseApi的构造函数可能没有正确处理GET参数
if ($isGet) {
$message = request()->get('message', '');
$user_id = request()->get('user_id', $this->member_id);
$conversation_id = request()->get('conversation_id', '');
$stream = request()->get('stream', false);
} else {
$message = $this->params['message'] ?? '';
$user_id = $this->params['user_id'] ?? $this->member_id;
$conversation_id = $this->params['conversation_id'] ?? '';
$stream = $this->params['stream'] ?? false;
}
// 确保stream参数正确处理字符串'false'和'0'
if (is_string($stream)) {
$stream = filter_var($stream, FILTER_VALIDATE_BOOLEAN);
}
// (可选获取站点ID和会员ID可以通过事件数据传递
$site_id = $this->params['uniacid'] ?? $this->site_id; // 使用 uniacid, 方便以后迁移而且uniacid 是唯一的, site_id 不是同时被params给过滤了
@@ -320,10 +357,15 @@ class Kefu extends BaseApi
];
if ($stream) {
event('KefuChat', $event_data);
exit; // 流式请求直接输出并结束
}
// 设置SSE响应头
header('Content-Type: text/event-stream');
header('Cache-Control: no-cache');
header('Connection: keep-alive');
header('X-Accel-Buffering: no'); // 禁用Nginx缓冲
// 触发事件,让监听器处理流式响应
event('KefuChat', $event_data);
} else {
// 触发智能客服聊天事件(非流式)
$result = event('KefuChat', $event_data);
@@ -347,6 +389,7 @@ class Kefu extends BaseApi
}
return $this->response($response);
}
} catch (\Exception $e) {
return $this->response($this->error('请求失败:' . $e->getMessage()));
}
@@ -358,6 +401,11 @@ class Kefu extends BaseApi
*/
public function getHistory()
{
// 检查请求方式是否为POST
if (!request()->isPost()) {
return $this->response($this->error('请求方式错误仅支持POST请求'));
}
// 获取请求参数
$conversation_id = $this->params['conversation_id'] ?? '';
$user_id = $this->params['user_id'] ?? $this->member_id;
@@ -387,7 +435,7 @@ class Kefu extends BaseApi
];
// 触发获取历史消息事件
$result = event('KefuGetHistory', $event_data);
$result = event('KefuGetHistory', $event_data, true);
// 处理事件结果
$response = [