chore(addon/aikefu): 去除chatStream,统一使用chat接口
This commit is contained in:
@@ -1,235 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
namespace addon\aikefu\event;
|
|
||||||
|
|
||||||
use addon\aikefu\model\Config as KefuConfigModel;
|
|
||||||
use addon\aikefu\model\Conversation as KefuConversationModel;
|
|
||||||
use addon\aikefu\model\Message as KefuMessageModel;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 智能客服流式聊天
|
|
||||||
*/
|
|
||||||
class KefuChatStream
|
|
||||||
{
|
|
||||||
/**
|
|
||||||
* 处理智能客服流式聊天事件
|
|
||||||
* @param array $data 事件数据
|
|
||||||
* @return array
|
|
||||||
*/
|
|
||||||
public function handle($data)
|
|
||||||
{
|
|
||||||
$message = $data['message'] ?? '';
|
|
||||||
$user_id = $data['user_id'] ?? '';
|
|
||||||
$conversation_id = $data['conversation_id'] ?? '';
|
|
||||||
$site_id = $data['site_id'] ?? 0;
|
|
||||||
$request_id = $data['request_id'] ?? '';
|
|
||||||
|
|
||||||
try {
|
|
||||||
// 验证参数
|
|
||||||
if (empty($message)) {
|
|
||||||
return [
|
|
||||||
[
|
|
||||||
'type' => 'error',
|
|
||||||
'message' => '消息内容不能为空'
|
|
||||||
]
|
|
||||||
];
|
|
||||||
}
|
|
||||||
|
|
||||||
// 获取智能客服配置
|
|
||||||
$kefu_config_model = new KefuConfigModel();
|
|
||||||
$config_info = $kefu_config_model->getConfig($site_id)['data']['value'] ?? [];
|
|
||||||
|
|
||||||
if (empty($config_info) || $config_info['status'] != 1) {
|
|
||||||
return [
|
|
||||||
[
|
|
||||||
'type' => 'error',
|
|
||||||
'message' => '智能客服暂未启用'
|
|
||||||
]
|
|
||||||
];
|
|
||||||
}
|
|
||||||
|
|
||||||
$config = $config_info;
|
|
||||||
$apiKey = $config['api_key'];
|
|
||||||
$baseUrl = $config['base_url'];
|
|
||||||
$chatEndpoint = $config['chat_endpoint'];
|
|
||||||
|
|
||||||
// 构建请求数据
|
|
||||||
$requestData = [
|
|
||||||
'inputs' => [],
|
|
||||||
'query' => $message,
|
|
||||||
'response_mode' => 'streaming',
|
|
||||||
'user' => $user_id,
|
|
||||||
];
|
|
||||||
|
|
||||||
if (!empty($conversation_id)) {
|
|
||||||
$requestData['conversation_id'] = $conversation_id;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 构建请求头
|
|
||||||
$headers = [
|
|
||||||
'Authorization: Bearer ' . $apiKey,
|
|
||||||
'Content-Type: application/json',
|
|
||||||
'Accept: text/event-stream',
|
|
||||||
];
|
|
||||||
|
|
||||||
// 发送流式请求到Dify API
|
|
||||||
$url = $baseUrl . $chatEndpoint;
|
|
||||||
$result = $this->executeStreamRequest($url, $requestData, $headers);
|
|
||||||
|
|
||||||
return $result;
|
|
||||||
|
|
||||||
} catch (\Exception $e) {
|
|
||||||
return [
|
|
||||||
[
|
|
||||||
'type' => 'error',
|
|
||||||
'message' => '请求失败:' . $e->getMessage()
|
|
||||||
]
|
|
||||||
];
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 执行流式请求
|
|
||||||
*/
|
|
||||||
private function executeStreamRequest($url, $requestData, $headers)
|
|
||||||
{
|
|
||||||
$ch = curl_init();
|
|
||||||
|
|
||||||
// 设置curl选项
|
|
||||||
curl_setopt($ch, CURLOPT_URL, $url);
|
|
||||||
curl_setopt($ch, CURLOPT_POST, true);
|
|
||||||
curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($requestData));
|
|
||||||
curl_setopt($ch, CURLOPT_HTTPHEADER, $headers);
|
|
||||||
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
|
|
||||||
curl_setopt($ch, CURLOPT_WRITEFUNCTION, [$this, 'streamCallback']);
|
|
||||||
curl_setopt($ch, CURLOPT_TIMEOUT, 120); // 设置较长的超时时间
|
|
||||||
curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false);
|
|
||||||
curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, false);
|
|
||||||
|
|
||||||
// 执行请求
|
|
||||||
$result = [];
|
|
||||||
$this->stream_buffer = '';
|
|
||||||
$this->conversation_id = '';
|
|
||||||
$this->current_message_id = '';
|
|
||||||
|
|
||||||
$response = curl_exec($ch);
|
|
||||||
$http_code = curl_getinfo($ch, CURLINFO_HTTP_CODE);
|
|
||||||
$error = curl_error($ch);
|
|
||||||
curl_close($ch);
|
|
||||||
|
|
||||||
if ($response === false || !empty($error)) {
|
|
||||||
return [
|
|
||||||
[
|
|
||||||
'type' => 'error',
|
|
||||||
'message' => '请求失败:' . $error
|
|
||||||
]
|
|
||||||
];
|
|
||||||
}
|
|
||||||
|
|
||||||
if ($http_code >= 400) {
|
|
||||||
return [
|
|
||||||
[
|
|
||||||
'type' => 'error',
|
|
||||||
'message' => '服务端错误,HTTP状态码:' . $http_code
|
|
||||||
]
|
|
||||||
];
|
|
||||||
}
|
|
||||||
|
|
||||||
return $this->parseStreamResponse($this->stream_buffer);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 流式回调函数
|
|
||||||
*/
|
|
||||||
private function streamCallback($ch, $data)
|
|
||||||
{
|
|
||||||
$this->stream_buffer .= $data;
|
|
||||||
return strlen($data);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 解析流式响应
|
|
||||||
*/
|
|
||||||
private function parseStreamResponse($response)
|
|
||||||
{
|
|
||||||
$result = [];
|
|
||||||
$lines = explode("\n", $response);
|
|
||||||
$conversation_id = '';
|
|
||||||
$message_id = '';
|
|
||||||
$buffer = '';
|
|
||||||
|
|
||||||
foreach ($lines as $line) {
|
|
||||||
$line = trim($line);
|
|
||||||
|
|
||||||
if (empty($line)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (strpos($line, 'data: ') === 0) {
|
|
||||||
$data_str = substr($line, 6);
|
|
||||||
|
|
||||||
if ($data_str === '[DONE]') {
|
|
||||||
// 流结束,发送完成事件
|
|
||||||
$result[] = [
|
|
||||||
'type' => 'complete',
|
|
||||||
'conversation_id' => $conversation_id,
|
|
||||||
'message_id' => $message_id,
|
|
||||||
'content' => $buffer,
|
|
||||||
'finished' => true
|
|
||||||
];
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
$data = json_decode($data_str, true);
|
|
||||||
if (json_last_error() === JSON_ERROR_NONE && isset($data)) {
|
|
||||||
// 提取会话ID和消息ID
|
|
||||||
if (isset($data['conversation_id'])) {
|
|
||||||
$conversation_id = $data['conversation_id'];
|
|
||||||
}
|
|
||||||
if (isset($data['message_id'])) {
|
|
||||||
$message_id = $data['message_id'];
|
|
||||||
}
|
|
||||||
|
|
||||||
// 处理内容块
|
|
||||||
if (isset($data['answer'])) {
|
|
||||||
$buffer .= $data['answer'];
|
|
||||||
$result[] = [
|
|
||||||
'type' => 'chunk',
|
|
||||||
'content' => $data['answer'],
|
|
||||||
'conversation_id' => $conversation_id,
|
|
||||||
'message_id' => $message_id,
|
|
||||||
'finished' => false
|
|
||||||
];
|
|
||||||
}
|
|
||||||
|
|
||||||
// 检查是否结束
|
|
||||||
if (isset($data['finish_reason']) && $data['finish_reason'] !== 'null') {
|
|
||||||
$result[] = [
|
|
||||||
'type' => 'complete',
|
|
||||||
'conversation_id' => $conversation_id,
|
|
||||||
'message_id' => $message_id,
|
|
||||||
'content' => $buffer,
|
|
||||||
'finish_reason' => $data['finish_reason'],
|
|
||||||
'usage' => $data['usage'] ?? [],
|
|
||||||
'finished' => true
|
|
||||||
];
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 如果没有收到[DONE]信号,确保发送完成事件
|
|
||||||
if (empty($result) || end($result)['type'] !== 'complete') {
|
|
||||||
$result[] = [
|
|
||||||
'type' => 'complete',
|
|
||||||
'conversation_id' => $conversation_id,
|
|
||||||
'message_id' => $message_id,
|
|
||||||
'content' => $buffer,
|
|
||||||
'finished' => true
|
|
||||||
];
|
|
||||||
}
|
|
||||||
|
|
||||||
return $result;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -43,7 +43,6 @@ class KefuGetInfo
|
|||||||
],
|
],
|
||||||
'endpoints' => [
|
'endpoints' => [
|
||||||
'chat' => '/api/kefu/chat',
|
'chat' => '/api/kefu/chat',
|
||||||
'chat_stream' => '/api/kefu/chatStream',
|
|
||||||
'get_history' => '/api/kefu/getHistory',
|
'get_history' => '/api/kefu/getHistory',
|
||||||
'clear_conversation' => '/api/kefu/clearConversation',
|
'clear_conversation' => '/api/kefu/clearConversation',
|
||||||
'health' => '/api/kefu/health',
|
'health' => '/api/kefu/health',
|
||||||
|
|||||||
@@ -26,6 +26,11 @@ class Kefu extends BaseApi
|
|||||||
return $this->response($this->error('缺少关键参数站点ID'));
|
return $this->response($this->error('缺少关键参数站点ID'));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 验证事件是否存在
|
||||||
|
if (!hasEvent('KefuHealthCheck')) {
|
||||||
|
return $this->response($this->error('智能客服插件未成功注册事件'));
|
||||||
|
}
|
||||||
|
|
||||||
// 准备事件数据
|
// 准备事件数据
|
||||||
$event_data = [
|
$event_data = [
|
||||||
'check_id' => uniqid('health_', true),
|
'check_id' => uniqid('health_', true),
|
||||||
@@ -143,6 +148,11 @@ class Kefu extends BaseApi
|
|||||||
$member_id = $this->params['member_id'] ?? $this->member_id;
|
$member_id = $this->params['member_id'] ?? $this->member_id;
|
||||||
$token = $this->params['token'] ?? $this->token;
|
$token = $this->params['token'] ?? $this->token;
|
||||||
|
|
||||||
|
// 验证事件是否存在
|
||||||
|
if (!hasEvent('KefuGetInfo')) {
|
||||||
|
return $this->response($this->error('智能客服插件未成功注册事件'));
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// 准备事件数据
|
// 准备事件数据
|
||||||
$event_data = [
|
$event_data = [
|
||||||
@@ -227,6 +237,11 @@ class Kefu extends BaseApi
|
|||||||
return $this->response($this->error('会话ID或用户ID不能为空'));
|
return $this->response($this->error('会话ID或用户ID不能为空'));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 验证事件是否存在
|
||||||
|
if (!hasEvent('KefuClearConversation')) {
|
||||||
|
return $this->response($this->error('智能客服插件未成功注册事件'));
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// 准备事件数据
|
// 准备事件数据
|
||||||
$event_data = [
|
$event_data = [
|
||||||
@@ -267,7 +282,7 @@ class Kefu extends BaseApi
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* 智能客服聊天接口
|
* 智能客服聊天接口
|
||||||
* @return \think\response\Json
|
* @return \think\response\Json|array|bool|string|void
|
||||||
*/
|
*/
|
||||||
public function chat()
|
public function chat()
|
||||||
{
|
{
|
||||||
@@ -287,6 +302,11 @@ class Kefu extends BaseApi
|
|||||||
return $this->response($this->error('请输入消息内容'));
|
return $this->response($this->error('请输入消息内容'));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 验证事件是否存在
|
||||||
|
if (!hasEvent('KefuChat')) {
|
||||||
|
return $this->response($this->error('智能客服插件未成功注册事件'));
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// 准备事件数据
|
// 准备事件数据
|
||||||
$event_data = [
|
$event_data = [
|
||||||
@@ -299,7 +319,12 @@ class Kefu extends BaseApi
|
|||||||
'token' => $token,
|
'token' => $token,
|
||||||
];
|
];
|
||||||
|
|
||||||
// 触发智能客服聊天事件
|
if ($stream) {
|
||||||
|
event('KefuChat', $event_data);
|
||||||
|
exit; // 流式请求直接输出并结束
|
||||||
|
}
|
||||||
|
|
||||||
|
// 触发智能客服聊天事件(非流式)
|
||||||
$result = event('KefuChat', $event_data);
|
$result = event('KefuChat', $event_data);
|
||||||
|
|
||||||
// 处理事件结果
|
// 处理事件结果
|
||||||
@@ -326,175 +351,6 @@ class Kefu extends BaseApi
|
|||||||
return $this->response($this->error('请求失败:' . $e->getMessage()));
|
return $this->response($this->error('请求失败:' . $e->getMessage()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
/**
|
|
||||||
* 智能客服聊天流式接口
|
|
||||||
*/
|
|
||||||
public function chatStream()
|
|
||||||
{
|
|
||||||
// 获取请求参数
|
|
||||||
$message = $this->params['message'] ?? '';
|
|
||||||
$user_id = $this->params['user_id'] ?? $this->member_id;
|
|
||||||
$conversation_id = $this->params['conversation_id'] ?? '';
|
|
||||||
$stream = true;
|
|
||||||
|
|
||||||
// (可选)获取站点ID和会员ID,可以通过事件数据传递
|
|
||||||
$site_id = $this->params['uniacid'] ?? $this->site_id; // 使用 uniacid, 方便以后迁移,而且uniacid 是唯一的, site_id 不是,同时被params给过滤了
|
|
||||||
$member_id = $this->params['member_id'] ?? $this->member_id;
|
|
||||||
$token = $this->params['token'] ?? $this->token;
|
|
||||||
|
|
||||||
// 验证参数
|
|
||||||
if (empty($message)) {
|
|
||||||
$this->sendStreamError('请输入消息内容');
|
|
||||||
exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 准备事件数据
|
|
||||||
$event_data = [
|
|
||||||
'message' => $message,
|
|
||||||
'user_id' => $user_id,
|
|
||||||
'conversation_id' => $conversation_id,
|
|
||||||
'stream' => $stream,
|
|
||||||
'site_id' =>$site_id,
|
|
||||||
'member_id' => $member_id,
|
|
||||||
'token' => $token,
|
|
||||||
'request_id' => uniqid('stream_', true),
|
|
||||||
'timestamp' => time(),
|
|
||||||
];
|
|
||||||
|
|
||||||
try {
|
|
||||||
// 设置流式响应头
|
|
||||||
$this->setStreamHeaders();
|
|
||||||
|
|
||||||
// 发送开始事件
|
|
||||||
$this->sendStreamEvent('start', [
|
|
||||||
'request_id' => $event_data['request_id'],
|
|
||||||
'timestamp' => $event_data['timestamp'],
|
|
||||||
'message' => '开始处理请求'
|
|
||||||
]);
|
|
||||||
|
|
||||||
// 触发流式聊天事件
|
|
||||||
$result = event('KefuChatStream', $event_data);
|
|
||||||
|
|
||||||
// 处理事件结果并流式输出
|
|
||||||
$this->processStreamResults($result, $event_data);
|
|
||||||
|
|
||||||
} catch (\Exception $e) {
|
|
||||||
$this->sendStreamError('请求失败:' . $e->getMessage());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 设置流式响应头
|
|
||||||
*/
|
|
||||||
private function setStreamHeaders()
|
|
||||||
{
|
|
||||||
header('Content-Type: text/event-stream');
|
|
||||||
header('Cache-Control: no-cache');
|
|
||||||
header('Connection: keep-alive');
|
|
||||||
header('Access-Control-Allow-Origin: *');
|
|
||||||
header('Access-Control-Allow-Headers: Cache-Control, Content-Type');
|
|
||||||
header('X-Accel-Buffering: no'); // 禁用nginx缓冲
|
|
||||||
if (function_exists('apache_setenv')) {
|
|
||||||
apache_setenv('no-gzip', '1');
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 发送流式事件
|
|
||||||
*/
|
|
||||||
private function sendStreamEvent($event_type, $data)
|
|
||||||
{
|
|
||||||
$payload = [
|
|
||||||
'id' => uniqid(),
|
|
||||||
'event' => $event_type,
|
|
||||||
'timestamp' => time(),
|
|
||||||
'data' => $data
|
|
||||||
];
|
|
||||||
|
|
||||||
echo "event: {$event_type}\n";
|
|
||||||
echo "data: " . json_encode($payload, JSON_UNESCAPED_UNICODE) . "\n\n";
|
|
||||||
|
|
||||||
if (ob_get_level()) {
|
|
||||||
ob_flush();
|
|
||||||
}
|
|
||||||
flush();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 发送流式错误
|
|
||||||
*/
|
|
||||||
private function sendStreamError($message)
|
|
||||||
{
|
|
||||||
if (!headers_sent()) {
|
|
||||||
$this->setStreamHeaders();
|
|
||||||
}
|
|
||||||
|
|
||||||
$this->sendStreamEvent('error', [
|
|
||||||
'error' => $message,
|
|
||||||
'finished' => true
|
|
||||||
]);
|
|
||||||
|
|
||||||
exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 处理流式结果
|
|
||||||
*/
|
|
||||||
private function processStreamResults($results, $event_data)
|
|
||||||
{
|
|
||||||
try {
|
|
||||||
if (is_array($results) && !empty($results)) {
|
|
||||||
foreach ($results as $result) {
|
|
||||||
if (isset($result['type'])) {
|
|
||||||
switch ($result['type']) {
|
|
||||||
case 'chunk':
|
|
||||||
// 发送内容块
|
|
||||||
$this->sendStreamEvent('message', [
|
|
||||||
'content' => $result['content'] ?? '',
|
|
||||||
'conversation_id' => $result['conversation_id'] ?? $event_data['conversation_id'],
|
|
||||||
'message_id' => $result['message_id'] ?? '',
|
|
||||||
'finished' => $result['finished'] ?? false
|
|
||||||
]);
|
|
||||||
break;
|
|
||||||
|
|
||||||
case 'error':
|
|
||||||
// 发送错误
|
|
||||||
$this->sendStreamEvent('error', [
|
|
||||||
'error' => $result['message'] ?? '未知错误',
|
|
||||||
'conversation_id' => $result['conversation_id'] ?? $event_data['conversation_id']
|
|
||||||
]);
|
|
||||||
break;
|
|
||||||
|
|
||||||
case 'complete':
|
|
||||||
// 发送完成事件
|
|
||||||
$this->sendStreamEvent('complete', [
|
|
||||||
'conversation_id' => $result['conversation_id'] ?? $event_data['conversation_id'],
|
|
||||||
'message_id' => $result['message_id'] ?? '',
|
|
||||||
'usage' => $result['usage'] ?? [],
|
|
||||||
'finish_reason' => $result['finish_reason'] ?? ''
|
|
||||||
]);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// 如果没有事件处理器或结果为空,发送完成事件
|
|
||||||
$this->sendStreamEvent('complete', [
|
|
||||||
'conversation_id' => $event_data['conversation_id'],
|
|
||||||
'message' => '暂无可用响应'
|
|
||||||
]);
|
|
||||||
}
|
|
||||||
|
|
||||||
// 发送结束事件
|
|
||||||
$this->sendStreamEvent('end', [
|
|
||||||
'request_id' => $event_data['request_id'],
|
|
||||||
'status' => 'completed'
|
|
||||||
]);
|
|
||||||
|
|
||||||
} catch (\Exception $e) {
|
|
||||||
$this->sendStreamError('处理流式结果失败:' . $e->getMessage());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 获取会话历史
|
* 获取会话历史
|
||||||
|
|||||||
Reference in New Issue
Block a user