Files
shop-platform/src/addon/aikefu/event/KefuChatStream.php

235 lines
7.5 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
<?php
namespace addon\aikefu\event;
use addon\aikefu\model\Config as KefuConfigModel;
use addon\aikefu\model\Conversation as KefuConversationModel;
use addon\aikefu\model\Message as KefuMessageModel;
/**
 * Streaming chat handler for the AI customer-service addon.
 *
 * Forwards a user message to the configured chat endpoint (the request uses
 * the Dify "streaming" response mode), collects the Server-Sent-Events reply
 * and converts it into a list of `chunk` / `complete` / `error` event arrays.
 *
 * Note that the upstream response is buffered in full before it is parsed;
 * events are not relayed to the caller while the transfer is in progress.
 */
class KefuChatStream
{
    /**
     * Raw response body accumulated by the cURL write callback.
     *
     * Declared explicitly because creating properties dynamically is
     * deprecated as of PHP 8.2.
     *
     * @var string
     */
    private $stream_buffer = '';

    /**
     * Handle a streaming chat event.
     *
     * Keys read from $data: `message` (required), `user_id`,
     * `conversation_id` and `site_id` (all optional).
     *
     * @param array $data Event payload
     * @return array List of event arrays. Every event has a `type` key:
     *               `error` (with `message`), `chunk` or `complete`.
     */
    public function handle($data)
    {
        $message = $data['message'] ?? '';
        $user_id = $data['user_id'] ?? '';
        $conversation_id = $data['conversation_id'] ?? '';
        $site_id = $data['site_id'] ?? 0;

        try {
            // Validate input. empty() would also reject the legitimate
            // message "0", so test the trimmed text instead.
            if (!is_scalar($message) || trim((string) $message) === '') {
                return $this->errorResult('消息内容不能为空');
            }

            // Load the addon configuration for this site.
            $kefu_config_model = new KefuConfigModel();
            $config = $kefu_config_model->getConfig($site_id)['data']['value'] ?? [];
            // Loose comparison on purpose: the stored status may be 1 or "1".
            if (!is_array($config) || empty($config) || ($config['status'] ?? 0) != 1) {
                return $this->errorResult('智能客服暂未启用');
            }

            $apiKey = (string) ($config['api_key'] ?? '');
            $baseUrl = (string) ($config['base_url'] ?? '');
            $chatEndpoint = (string) ($config['chat_endpoint'] ?? '');
            if ($apiKey === '' || $baseUrl === '') {
                return $this->errorResult('智能客服配置不完整');
            }

            // Build the request body.
            $requestData = [
                // An empty PHP array would be encoded as the JSON list "[]";
                // stdClass makes json_encode() emit the object "{}".
                'inputs' => new \stdClass(),
                'query' => $message,
                'response_mode' => 'streaming',
                'user' => (string) $user_id,
            ];
            if (!empty($conversation_id)) {
                $requestData['conversation_id'] = $conversation_id;
            }

            // Build the request headers.
            $headers = [
                'Authorization: Bearer ' . $apiKey,
                'Content-Type: application/json',
                'Accept: text/event-stream',
            ];

            // Join base URL and endpoint with exactly one slash, whichever
            // way the two values were entered in the configuration.
            $url = $chatEndpoint === ''
                ? $baseUrl
                : rtrim($baseUrl, '/') . '/' . ltrim($chatEndpoint, '/');

            // `ssl_verify` is an optional configuration key introduced here;
            // when it is absent the previous behaviour (no verification) is kept.
            return $this->executeStreamRequest($url, $requestData, $headers, !empty($config['ssl_verify']));
        } catch (\Exception $e) {
            return $this->errorResult('请求失败:' . $e->getMessage());
        }
    }

    /**
     * Send the chat request and return the parsed event list.
     *
     * @param string $url         Full URL of the chat endpoint
     * @param array  $requestData Request body, JSON-encoded before sending
     * @param array  $headers     HTTP header lines
     * @param bool   $verifySsl   Whether to verify the peer certificate and host name.
     *                            NOTE(review): defaults to false only to preserve the
     *                            existing behaviour. With verification off the bearer
     *                            token can be intercepted; enable it wherever possible.
     * @return array List of event arrays (see handle())
     * @throws \RuntimeException When the request body cannot be encoded or cURL cannot start
     */
    private function executeStreamRequest($url, $requestData, $headers, $verifySsl = false)
    {
        $payload = json_encode($requestData);
        if ($payload === false) {
            // Without this check `false` would be sent as an empty POST body.
            throw new \RuntimeException(json_last_error_msg());
        }

        $ch = curl_init();
        if ($ch === false) {
            throw new \RuntimeException('curl_init failed');
        }

        curl_setopt_array($ch, [
            CURLOPT_URL => $url,
            CURLOPT_POST => true,
            CURLOPT_POSTFIELDS => $payload,
            CURLOPT_HTTPHEADER => $headers,
            // The write callback receives the body, so curl_exec() only
            // reports success or failure.
            CURLOPT_WRITEFUNCTION => [$this, 'streamCallback'],
            // Generous overall timeout: the model may answer slowly.
            CURLOPT_TIMEOUT => 120,
            CURLOPT_SSL_VERIFYPEER => (bool) $verifySsl,
            CURLOPT_SSL_VERIFYHOST => $verifySsl ? 2 : 0,
        ]);

        // Start from an empty buffer so a reused instance never sees stale data.
        $this->stream_buffer = '';

        $succeeded = curl_exec($ch);
        $http_code = (int) curl_getinfo($ch, CURLINFO_HTTP_CODE);
        $error = curl_error($ch);
        curl_close($ch);

        $body = $this->stream_buffer;
        $this->stream_buffer = '';

        if ($succeeded === false || !empty($error)) {
            return $this->errorResult('请求失败:' . $error);
        }
        if ($http_code >= 400) {
            return $this->errorResult('服务端错误HTTP状态码' . $http_code);
        }

        return $this->parseStreamResponse($body);
    }

    /**
     * cURL write callback: append the received bytes to the buffer.
     *
     * @param mixed  $ch   cURL handle (unused)
     * @param string $data Bytes received
     * @return int Number of bytes consumed; cURL aborts the transfer if this
     *             differs from the number of bytes it handed over
     */
    private function streamCallback($ch, $data)
    {
        $this->stream_buffer .= $data;
        return strlen($data);
    }

    /**
     * Convert a buffered Server-Sent-Events body into a list of events.
     *
     * Each `data:` line carrying a JSON object with an `answer` field yields a
     * `chunk` event. The list ends with exactly one `complete` event holding
     * the concatenated answer, unless the stream reports an error, in which
     * case it ends with an `error` event instead.
     *
     * @param string $response Raw response body
     * @return array List of event arrays
     */
    private function parseStreamResponse($response)
    {
        $result = [];
        $conversation_id = '';
        $message_id = '';
        $buffer = '';
        $completed = false;

        foreach (explode("\n", (string) $response) as $line) {
            // trim() also removes the "\r" of CRLF line endings.
            $line = trim($line);
            if (strncmp($line, 'data:', 5) !== 0) {
                continue;
            }

            // The space after "data:" is optional in the SSE format.
            $data_str = ltrim((string) substr($line, 5));
            if ($data_str === '') {
                continue;
            }

            if ($data_str === '[DONE]') {
                // Explicit end-of-stream marker.
                $result[] = $this->completeEvent($conversation_id, $message_id, $buffer);
                $completed = true;
                break;
            }

            $data = json_decode($data_str, true);
            if (!is_array($data)) {
                // Malformed JSON or a bare scalar: nothing to extract.
                continue;
            }

            // Remember the most recent conversation and message ids.
            if (isset($data['conversation_id'])) {
                $conversation_id = $data['conversation_id'];
            }
            if (isset($data['message_id'])) {
                $message_id = $data['message_id'];
            }

            // An error reported inside the stream ends it; surface it rather
            // than returning an empty "complete" reply.
            if (isset($data['event']) && $data['event'] === 'error') {
                $detail = isset($data['message']) && is_string($data['message']) ? $data['message'] : '';
                $result[] = [
                    'type' => 'error',
                    'message' => '请求失败:' . $detail,
                ];
                return $result;
            }

            // Content chunk.
            if (isset($data['answer']) && is_scalar($data['answer'])) {
                $buffer .= $data['answer'];
                $result[] = [
                    'type' => 'chunk',
                    'content' => $data['answer'],
                    'conversation_id' => $conversation_id,
                    'message_id' => $message_id,
                    'finished' => false,
                ];
            }

            // isset() already excludes a JSON null; the string "null" is
            // treated as "not finished" as well.
            if (isset($data['finish_reason']) && $data['finish_reason'] !== 'null') {
                $result[] = $this->completeEvent($conversation_id, $message_id, $buffer, [
                    'finish_reason' => $data['finish_reason'],
                    'usage' => $data['usage'] ?? [],
                ]);
                $completed = true;
                break;
            }
        }

        // No terminator seen: still close the reply with a complete event.
        if (!$completed) {
            $result[] = $this->completeEvent($conversation_id, $message_id, $buffer);
        }

        return $result;
    }

    /**
     * Build a single-element result list holding one error event.
     *
     * @param string $text Error message shown to the caller
     * @return array
     */
    private function errorResult($text)
    {
        return [
            [
                'type' => 'error',
                'message' => $text,
            ],
        ];
    }

    /**
     * Build a `complete` event; $extra keys are placed before `finished`.
     *
     * @param string $conversation_id Conversation id seen in the stream ('' if none)
     * @param string $message_id      Message id seen in the stream ('' if none)
     * @param string $content         Concatenated answer text
     * @param array  $extra           Additional keys such as `finish_reason` and `usage`
     * @return array
     */
    private function completeEvent($conversation_id, $message_id, $content, array $extra = [])
    {
        return [
            'type' => 'complete',
            'conversation_id' => $conversation_id,
            'message_id' => $message_id,
            'content' => $content,
        ] + $extra + ['finished' => true];
    }
}