流式输出

内容为 AI 基于可视化知识库的项目源码生成。

1. 项目背景与技术概述

流式输出的价值与应用场景

我们要做一个问答工具,将可视化相关的信息构建为 RAG 知识库,便于辅助业务方人员解答可视化相关的问题。
一开始用的是传统的请求-响应模式,在处理 AI 对话时,需要让用户等待很久才能看到结果,非常不友好。因此准备改为流式输出方案。
流式输出技术让我们能够在数据准备就绪的同时就开始向用户展示,极大提升了用户体验。

核心应用场景

  • AI 对话生成:实时展示 LLM 生成的文本内容,如 ChatGPT 的打字机效果
  • 知识库检索问答:RAG(检索增强生成)流式响应
  • 数据处理进度:长时间计算任务的实时反馈
  • 实时数据分析:大量数据的逐步处理和展示

技术优势

  1. 提升用户体验:用户可以立即看到响应开始,减少等待焦虑
  2. 减少感知延迟:内容逐步展现,比一次性加载感觉更快
  3. 资源利用优化:可以边生成边传输,减少内存占用
  4. 更好的反馈:用户知道系统正在工作,而不是卡住了

技术栈选择

后端技术栈

  • 框架:Egg.js (Node.js)
  • AI 集成:OpenAI SDK + DeepSeek API
  • 协议:Server-Sent Events (SSE)
  • 数据库:MySQL (用于结果持久化)

前端技术栈

  • 框架:Vue 3 + TypeScript
  • 通信方案:XMLHttpRequest (兼容性最佳)
  • UI 组件:Element Plus
  • 状态管理:Vue 3 Composition API

整体架构设计

graph TB
    subgraph "前端层"
        A[Vue组件] --> B[XMLHttpRequest API]
        B --> C[流式数据处理]
        C --> D[UI实时更新]
    end

    subgraph "网络层"
        E[HTTP POST请求] --> F[SSE长连接]
        F --> G[JSON数据流]
    end

    subgraph "后端层"
        H[Egg.js Controller] --> I[Knowledge Service]
        I --> J[AI API Adapter]
        J --> K[DeepSeek API]
    end

    subgraph "AI服务层"
        L[文档检索] --> M[RAG处理]
        M --> N[流式生成]
        N --> O[内容推送]
    end

    B --> E
    F --> H
    J --> L
    O --> G

    style A fill:#e1f5fe
    style D fill:#e8f5e8
    style K fill:#fff3e0
    style O fill:#f3e5f5

2. 通信协议与数据流设计

SSE 协议原理

Server-Sent Events (SSE) 是 HTML5 标准的一部分,基于 HTTP 协议实现服务器向客户端的单向数据推送。

协议特点

  • 单向通信:服务器主动推送,客户端被动接收
  • 自动重连:具备内置的断线重连机制
  • HTTP 兼容:无需协议升级,便于代理和负载均衡
  • 事件流格式:标准化的数据传输格式

SSE 数据格式

1
2
3
4
5
6
7
8
9
data: {"type":"start","code":0,"message":"stream started"}

data: {"type":"chunk","content":"人工智能","fullContent":"人工智能"}

data: {"type":"chunk","content":"技术","fullContent":"人工智能技术"}

event: end
data: {"type":"complete","code":0,"fullAnswer":"人工智能技术发展迅速..."}

格式说明

  • 每个数据块以data:开头
  • 数据块之间用双换行符分隔
  • 支持自定义事件类型(如event: end
  • 连接保持打开状态,支持持续推送

数据结构标准化

消息参数接口

1
2
3
4
5
6
7
interface MessageParams {
role: string; // 角色标识:system/user/assistant
content: string; // 消息内容
temperature: number; // 生成温度控制
max_tokens?: number; // 最大token限制
response_format?: { type: 'json_object' } | { type: 'text' };
}

流式响应数据结构

1
2
3
4
5
6
7
8
9
interface StreamResponse {
type: 'start' | 'chunk' | 'complete' | 'error'; // 事件类型
content?: string; // 当前块内容
fullContent?: string; // 累积完整内容
code: number; // 状态码
message?: string; // 状态信息
error?: string; // 错误信息
costTime?: number; // 处理耗时
}

3. 后端实现方案

AI API 抽象层设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 基础抽象类 - 统一API接口
export abstract class LLMAIApi {
public model: string;
protected openai?: OpenAI;
protected __client: ReturnType<typeof axios.create>;

constructor({ baseUrl, model, timeout = 60e3, apiKey }: Params) {
this.model = model;
this.__client = axios.create({ baseURL: baseUrl, timeout });
if (apiKey) {
this.openai = new OpenAI({ baseURL: baseUrl, apiKey });
}
}

// 抽象方法:普通聊天完成
abstract chatCompletions(
message: MessageParams,
traceId: string
): Promise<Error | Response<any>>;

// 可选方法:流式聊天完成
chatCompletionsStream?(
message: MessageParams,
traceId: string
): Promise<Error | Response<any>>;
}

架构优势

  • 多态支持:不同 AI 提供商统一接口
  • 可选实现:流式功能按需实现
  • 错误处理:统一的错误返回格式

DeepSeek 流式 API 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
export class DeepSeekAIApi extends LLMAIApi {
/**
* 构建请求参数 - 核心参数处理逻辑
* @param message 消息参数
* @param stream 是否启用流式模式
* @return 构建好的请求参数
*/
private buildRequestParams(message: MessageParams, stream = false): any {
const requestParams: any = {
messages: [
{ role: 'system', content: 'You are a helpful assistant.' },
{ role: 'user', content: message.content },
],
model: this.model,
};

// 条件参数添加:只有有效值才加入请求
if (message.temperature !== undefined) {
requestParams.temperature = message.temperature;
}
if (message.max_tokens !== undefined) {
requestParams.max_tokens = message.max_tokens;
}
if (message.response_format !== undefined) {
requestParams.response_format = message.response_format;
}

// 流式模式标记 - 关键配置
if (stream) {
requestParams.stream = true; // 启用OpenAI流式响应
}

return requestParams;
}

/**
* 流式聊天完成 - 核心流式API
*/
async chatCompletionsStream(message: MessageParams, traceId: string) {
try {
// 构建带流式标记的请求参数
const requestParams = this.buildRequestParams(message, true);

// 调用OpenAI SDK的流式接口
const stream = await this.openai.chat.completions.create(requestParams);

return {
status_code: 0,
status_msg: 'Success',
data: stream, // 返回AsyncIterable流对象
traceId,
};
} catch (error: any) {
this.handleApiError(error, 'chatCompletionsStream');
}
}

/**
* 统一错误处理
*/
private handleApiError(error: any, operation: string): never {
console.error(`${operation} Error:`, error);
throw new Error(`${operation} Error:` + error.message);
}
}

控制器层 SSE 处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
/**
* 流式查询接口 - SSE协议实现
*/
public async queryStream() {
const { ctx } = this;
const { service } = ctx;
const params = ctx.request.body;

// 1. 设置SSE协议头
ctx.status = 200;
ctx.set({
'Content-Type': 'text/event-stream', // SSE标准MIME类型
'Cache-Control': 'no-cache', // 禁止缓存确保实时性
'Connection': 'keep-alive', // 保持长连接
'Access-Control-Allow-Origin': '*', // CORS跨域支持
'Access-Control-Allow-Headers': 'Cache-Control',
});
ctx.respond = false; // 告诉Egg.js由我们自己处理响应体

// 2. 参数验证和预处理
const email = ctx.session?.unitePassport?.email || '';
params.email = email;

if (!params.app_id) {
ctx.res.write(`data: ${JSON.stringify({
code: -1,
data: null,
message: '参数错误,缺少app_id',
})}\n\n`);
ctx.res.end();
return;
}

const traceId = ctx.helper.uuid();
params.traceId = traceId;

try {
// 3. 调用服务层获取流式数据
const result = await service.knowledge.query.retrieveAndGenerateStream(
params.app_id,
params.query,
params.traceId
);

if (result.error) {
ctx.res.write(`data: ${JSON.stringify({
code: -1,
data: null,
error: result.error.message || 'unknown error',
})}\n\n`);
ctx.res.end();
return;
}

// 4. 发送开始事件
ctx.res.write(`data: ${JSON.stringify({
type: 'start',
code: 0,
message: 'stream started',
})}\n\n`);

// 5. 核心流式处理循环
let fullAnswer = '';
for await (const chunk of result.stream) {
const content = chunk.choices?.[0]?.delta?.content; // 提取增量内容
if (content) {
fullAnswer += content; // 累积完整内容

// 发送增量数据块
ctx.res.write(`data: ${JSON.stringify({
type: 'chunk',
content, // 当前块
fullContent: fullAnswer, // 累积内容
})}\n\n`);
}
}

// 6. 数据持久化
const { app } = ctx;
const { mysql } = app;
await mysql.insert('knowledge_qa', {
app_id: params.app_id,
email: params.email,
query: params.query,
answer: fullAnswer,
vector_data: JSON.stringify({ docs: result.docs }),
prompt: result.prompt,
cost_time: performance.now() - Date.now(),
});

// 7. 发送完成事件
ctx.res.write(`data: ${JSON.stringify({
type: 'complete',
code: 0,
message: 'stream completed',
fullAnswer,
costTime: performance.now() - Date.now(),
})}\n\n`);

// 8. 标准SSE结束标记
ctx.res.write('data: [DONE]\n\n');
ctx.res.end();

} catch (error: any) {
// 统一错误处理
ctx.res.write(`data: ${JSON.stringify({
code: error.status_code || -1,
data: null,
error: error.message || 'unknown error',
})}\n\n`);
ctx.res.end();
}
}

实现要点

  • 异步迭代for await...of处理 AsyncIterable 流
  • 增量累积:实时计算完整内容
  • 状态通知:start -> chunk* -> complete 的标准流程
  • 错误恢复:每个阶段的异常处理

RAG 流式架构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
* RAG:检索和生成答案 - 流式版本
* @param app_id 应用ID
* @param query 查询问题
* @param traceId 追踪ID
* @return 包含文档、prompt和流式数据的对象
*/
async retrieveAndGenerateStream(app_id, query, traceId) {
try {
// 1. RAG预处理:文档检索和提示词构建
const { docs, prompt } = await this.prepareRAGData(app_id, query);

// 2. 获取AI API配置
const { params, message, AIApiInstance } = this.getAIApiConfig(prompt);

// 3. 流式能力检查
if (params.model.includes('deepseek')) {
// 调用DeepSeek流式API
const streamResult = await (AIApiInstance as any).chatCompletionsStream(
message,
traceId,
);

if (streamResult instanceof Error) {
return {
docs,
prompt,
error: streamResult,
};
}

return {
docs, // 检索文档
prompt, // 构建的提示词
stream: streamResult.data, // AsyncIterable流对象
traceId: streamResult.traceId,
};
}

throw new Error('当前模型不支持流式输出,请使用deepseek-chat模型');
} catch (error: any) {
console.error('RAG Stream Error:', error);
throw new Error(error.message);
}
}

RAG 流式设计

  • 预处理分离:文档检索和流式生成解耦
  • 模型适配:按模型能力选择不同处理路径
  • 上下文保持:prompt 和 docs 信息随流传递

4. 前端实现方案

技术选型:为什么选择 XMLHttpRequest?

在实现前端流式输出时,主要有三种技术方案:

1. EventSource (标准 SSE)

1
2
3
4
const eventSource = new EventSource('/api/stream');
eventSource.onmessage = (event) => {
console.log(event.data);
};

优点:专为 SSE 设计,使用简单
缺点:只支持 GET 请求,无法发送复杂数据

2. Fetch API + ReadableStream

1
2
const response = await fetch('/api/stream', { method: 'POST' });
const reader = response.body.getReader();

优点:现代 API,支持 POST 请求
缺点:浏览器兼容性相对较差,错误处理复杂

3. XMLHttpRequest (我们的选择)

1
2
3
4
5
const xhr = new XMLHttpRequest();
xhr.open('POST', '/api/stream', true);
xhr.onreadystatechange = function () {
// 处理流式数据
};

优点:兼容性最好,支持 POST,状态控制精确
缺点:API 相对老旧,代码量稍多

在我们的项目中,需要发送 POST 请求并传递复杂参数,最终选择了 XMLHttpRequest 方案。

核心实现代码

1. API 层实现

实现流程

  1. 创建 XMLHttpRequest 对象并配置 SSE 相关请求头
  2. 监听readystatechange事件,在 LOADING 状态就开始处理数据
  3. 使用增量读取策略,只处理新接收的数据
  4. 按行解析 SSE 格式数据,提取data:event:字段
  5. 通过回调函数将解析后的数据传递给上层组件
  6. 智能处理各种异常情况,确保连接稳定
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
/**
* 使用XMLHttpRequest实现SSE流式输出
* @param {Object} data - 请求参数
* @param {Function} onMessage - 接收到数据时的回调
* @param {Function} onError - 发生错误时的回调
* @param {Function} onComplete - 流式传输完成时的回调
*/
export function getQueryAnswerStreamXHR(data, onMessage, onError, onComplete) {
const xhr = new XMLHttpRequest();
let buffer = ''; // 数据缓冲区,处理不完整的数据行
let hasReceivedData = false; // 标记是否接收到有效数据

// 配置POST请求,支持发送复杂参数
xhr.open('POST', 'http://127.0.0.1:7001/knowledge/query/stream', true);
xhr.setRequestHeader('Content-Type', 'application/json');
xhr.setRequestHeader('Accept', 'text/event-stream'); // 告诉服务器期望SSE格式
xhr.setRequestHeader('Cache-Control', 'no-cache'); // 禁用缓存
xhr.responseType = 'text'; // 确保以文本形式接收数据

let lastResponseLength = 0; // 记录上次读取的位置,实现增量读取

xhr.onreadystatechange = function () {
// 关键:在LOADING状态就开始处理数据,不等待完全加载完成
if (
xhr.readyState === XMLHttpRequest.LOADING ||
xhr.readyState === XMLHttpRequest.DONE
) {
// 增量读取:只处理新接收的数据,避免重复处理
const newData = xhr.responseText.substring(lastResponseLength);
lastResponseLength = xhr.responseText.length;

if (newData) {
hasReceivedData = true; // 标记已接收到数据
buffer += newData; // 添加到缓冲区

// 按行解析SSE数据格式
const lines = buffer.split('\n');
buffer = lines.pop() || ''; // 保留最后一行(可能不完整)

// 逐行处理完整的数据行
for (const line of lines) {
const trimmed = line.trim();
if (trimmed === '') continue; // 跳过空行

// 处理事件类型(如结束事件)
if (trimmed.startsWith('event:')) {
const eventType = trimmed.substring(6).trim();
if (eventType === 'end') {
onComplete(); // 流式传输结束
return;
}
}
// 处理数据内容
else if (trimmed.startsWith('data:')) {
const dataStr = trimmed.substring(5).trim();
if (dataStr === '') continue;

try {
const parsedData = JSON.parse(dataStr);
onMessage(parsedData); // 将解析后的数据传递给组件
} catch (error) {
console.error('解析数据失败:', error);
// 单条数据解析失败不中断整个流
}
}
}
}

// 请求完成时的最终处理
if (xhr.readyState === XMLHttpRequest.DONE) {
// 智能错误处理:有数据就认为成功,避免误判404等状态码
if (!hasReceivedData && xhr.status !== 200) {
onError(new Error(`HTTP error! status: ${xhr.status}`));
} else {
onComplete();
}
}
}
};

// 网络级别的错误处理
xhr.onerror = () => onError(new Error('网络请求失败'));
xhr.ontimeout = () => onError(new Error('请求超时'));

// 发送JSON格式的请求数据
xhr.send(JSON.stringify(data));

// 返回控制对象,允许外部中断连接
return {
close: () => xhr.abort(), // 关闭连接
abort: () => xhr.abort(), // 中止请求
};
}

2. Vue 组件实现

实现流程

  1. 定义消息数据结构,包含流式状态标记和缓冲区
  2. 创建新消息对象并立即添加到列表中
  3. 调用 API 开始流式传输,通过三个回调函数处理不同阶段
  4. onMessage回调中累积内容并实时更新 UI
  5. 使用 Vue 响应式系统自动触发界面更新
  6. 提供状态管理和资源清理机制
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
<template>
<div class="chat-container">
<!-- 消息列表 -->
<div class="message-list">
<div
v-for="(message, index) in messages"
:key="`msg-${index}-${message.query}`"
class="message-item"
>
<div class="query">{{ message.query }}</div>
<div class="answer" v-html="renderMarkdown(message.answer)"></div>
<!-- 流式状态指示器:显示闪烁光标表示正在接收数据 -->
<div v-if="message.isStreaming" class="streaming-cursor">
<span class="blinking-cursor">|</span>
</div>
<div class="cost-time">{{ message.cost_time }}</div>
</div>
</div>

<!-- 输入区域 -->
<div class="input-area">
<el-input
v-model="input"
placeholder="请输入问题"
:disabled="isStreamActive"
@keydown.enter="submit"
/>
<el-button
@click="submit"
type="primary"
:disabled="isStreamActive"
:loading="isStreamActive"
>
{{ isStreamActive ? '思考中...' : '发送' }}
</el-button>
</div>
</div>
</template>

<script setup lang="ts">
import { ref, onUnmounted } from 'vue';
import * as api from '@/api/datav';

// 消息数据结构定义
interface Message {
query: string; // 用户问题
answer: string; // 最终答案
cost_time: string; // 耗时信息
isStreaming?: boolean; // 是否正在流式接收
streamBuffer?: string; // 流式数据缓冲区
}

const messages = ref<Message[]>([]); // 消息列表
const input = ref(''); // 输入框内容
const isStreamActive = ref(false); // 全局流式状态
const currentController = ref<{ close: () => void } | null>(null); // 当前连接控制器

async function submit() {
const query = input.value.trim();
// 防护:空输入或正在流式传输时不处理
if (!query || isStreamActive.value) return;

// 立即创建新消息对象并添加到列表
const newMessage: Message = {
query,
answer: '', // 初始为空,将通过流式更新
cost_time: '',
isStreaming: true, // 标记为流式状态
streamBuffer: '', // 初始化缓冲区
};

messages.value.push(newMessage); // Vue响应式更新UI
input.value = ''; // 清空输入框
isStreamActive.value = true; // 设置全局流式状态

const startTime = Date.now(); // 记录开始时间

try {
// 调用流式API,传入三个关键回调函数
currentController.value = api.getQueryAnswerStreamXHR(
{ query, app_id: 1, email: 'user@example.com' },

// onMessage: 接收到流式数据时的处理
(data) => {
const currentMessage = messages.value[messages.value.length - 1];
if (data.content && currentMessage.isStreaming) {
// 累积内容到缓冲区
currentMessage.streamBuffer =
(currentMessage.streamBuffer || '') + data.content;
// 实时更新显示内容(Vue响应式自动更新UI)
currentMessage.answer = currentMessage.streamBuffer;
}
},

// onError: 发生错误时的处理
(error) => {
console.error('Stream error:', error);
const currentMessage = messages.value[messages.value.length - 1];
if (currentMessage.isStreaming) {
currentMessage.isStreaming = false; // 停止流式状态
// 显示已接收的内容或错误信息
currentMessage.answer =
currentMessage.streamBuffer || '获取答案时出现错误';
currentMessage.cost_time = '出错';
}
cleanup(); // 清理资源
},

// onComplete: 流式传输完成时的处理
() => {
const endTime = Date.now();
const currentMessage = messages.value[messages.value.length - 1];
if (currentMessage.isStreaming) {
currentMessage.isStreaming = false; // 结束流式状态
// 计算并显示耗时
currentMessage.cost_time = `回答用时${(
(endTime - startTime) /
1000
).toFixed(2)}s`;
delete currentMessage.streamBuffer; // 清理缓冲区
}
cleanup(); // 清理资源
}
);
} catch (error) {
console.error('Failed to start stream:', error);
cleanup();
}
}

// 资源清理函数
function cleanup() {
isStreamActive.value = false; // 重置全局状态
currentController.value = null; // 清空控制器引用
}

// 组件卸载时的资源清理
onUnmounted(() => {
if (currentController.value) {
currentController.value.close(); // 关闭可能存在的连接
}
});
</script>

<style scoped>
/* 流式状态指示器样式 */
.streaming-cursor {
display: inline-block;
margin-left: 5px;
}

/* 闪烁光标动画 */
.blinking-cursor {
font-size: 16px;
color: #409eff;
animation: blink 1s infinite;
}

@keyframes blink {
0%,
50% {
opacity: 1;
} /* 前半秒显示 */
51%,
100% {
opacity: 0;
} /* 后半秒隐藏 */
}
</style>

3. 流式处理流程图

下图展示了完整的流式数据处理流程:

flowchart TD
    A[用户输入问题] --> B{检查输入有效性}
    B -->|无效| A
    B -->|有效| C[创建新消息对象]
    C --> D[设置流式状态]
    D --> E[调用XMLHttpRequest API]

    E --> F[配置请求头]
    F --> G[发送POST请求]
    G --> H[监听readystatechange]

    H --> I{接收到数据?}
    I -->|是| J[增量读取新数据]
    I -->|否| K[等待数据]
    K --> I

    J --> L[解析SSE格式]
    L --> M{数据类型?}
    M -->|data:| N[解析JSON内容]
    M -->|event: end| O[流式结束]
    M -->|其他| P[跳过处理]

    N --> Q[累积到缓冲区]
    Q --> R[更新UI显示]
    R --> S{还有数据?}
    S -->|是| I
    S -->|否| T[等待更多数据]
    T --> I

    O --> U[计算耗时]
    U --> V[清理资源]
    V --> W[结束流式状态]

    P --> S

    style A fill:#e1f5fe
    style C fill:#f3e5f5
    style R fill:#e8f5e8
    style W fill:#fff3e0

关键技术点解析

1. 增量数据读取

1
2
3
// 只读取新增的数据,避免重复处理
const newData = xhr.responseText.substring(lastResponseLength);
lastResponseLength = xhr.responseText.length;

这是流式处理的核心技巧。XMLHttpRequest 的 responseText 会包含所有已接收的数据,我们需要记录上次读取的位置,只处理新增部分。

2. 智能状态码处理

1
2
3
4
5
6
// 不要过早判断状态码错误
if (!hasReceivedData && xhr.status !== 200) {
onError(new Error(`HTTP error! status: ${xhr.status}`));
} else {
onComplete();
}

很多情况下,服务器可能返回 404 状态码但仍然传输有效数据。我们采用”有数据就是成功”的策略,只在确实没有接收到任何数据时才报错。

3. 缓冲区管理

1
2
3
buffer += newData;
const lines = buffer.split('\n');
buffer = lines.pop() || ''; // 保留未完成的行

SSE 数据可能在任何位置被截断,我们需要维护一个缓冲区来保存不完整的行,等待下次数据到达时合并处理。

4. Vue 响应式更新

1
2
3
4
// 直接修改响应式对象,触发UI更新
currentMessage.streamBuffer =
(currentMessage.streamBuffer || '') + data.content;
currentMessage.answer = currentMessage.streamBuffer;

利用 Vue3 的响应式系统,直接修改数据对象就能触发 UI 更新,实现实时显示效果。

5. 端到端完整流程

时序图:前后端完整交互

sequenceDiagram
    participant User as 用户
    participant Vue as Vue组件
    participant XHR as XMLHttpRequest
    participant Controller as Egg.js控制器
    participant Service as 服务层
    participant API as DeepSeek API

    User->>Vue: 输入问题并提交
    Vue->>Vue: 创建消息对象,设置流式状态
    Vue->>XHR: 调用getQueryAnswerStreamXHR
    XHR->>Controller: POST /knowledge/query/stream

    Controller->>Controller: 设置SSE响应头
    Controller->>Service: retrieveAndGenerateStream()
    Service->>Service: 文档检索和RAG处理
    Service->>API: chatCompletionsStream()

    API-->>Service: AsyncIterable流对象
    Service-->>Controller: {docs, prompt, stream}
    Controller->>XHR: data: {"type":"start"}
    XHR->>Vue: onMessage回调
    Vue->>User: 显示开始状态

    loop 流式数据传输
        API-->>Service: 数据块chunk
        Service-->>Controller: 流式chunk
        Controller->>XHR: data: {"type":"chunk","content":"..."}
        XHR->>Vue: onMessage(data)
        Vue->>Vue: 累积内容到streamBuffer
        Vue->>User: 实时更新UI显示
    end

    API-->>Service: 流式结束
    Controller->>Controller: 保存到数据库
    Controller->>XHR: data: {"type":"complete"}
    Controller->>XHR: data: [DONE]
    XHR->>Vue: onComplete回调
    Vue->>Vue: 清理流式状态
    Vue->>User: 显示最终结果

数据流转全过程

flowchart TD
    A[用户输入问题] --> B[Vue组件处理]
    B --> C[XMLHttpRequest发送请求]
    C --> D[Egg.js控制器接收]
    D --> E[设置SSE响应头]
    E --> F[调用服务层]
    F --> G[RAG文档检索]
    G --> H[构建AI请求]
    H --> I[DeepSeek API流式响应]
    I --> J[AsyncIterable数据流]
    J --> K[控制器处理chunk]
    K --> L[SSE格式封装]
    L --> M[HTTP推送到前端]
    M --> N[XMLHttpRequest增量读取]
    N --> O[解析SSE数据]
    O --> P[Vue组件更新]
    P --> Q[实时UI渲染]
    Q --> R{还有数据?}
    R -->|是| J
    R -->|否| S[流式完成]
    S --> T[数据持久化]
    T --> U[资源清理]

    style A fill:#e1f5fe
    style Q fill:#e8f5e8
    style I fill:#fff3e0
    style T fill:#f3e5f5

6. 性能优化与最佳实践

后端优化策略

1. 连接管理

1
2
3
4
5
6
7
8
9
10
11
12
// 连接超时控制
const STREAM_TIMEOUT = 30000; // 30秒超时

ctx.req.setTimeout(STREAM_TIMEOUT, () => {
ctx.res.write(
`data: ${JSON.stringify({
code: -1,
error: 'Stream timeout',
})}\n\n`
);
ctx.res.end();
});

2. 内存管理

  • 流式处理:避免大量数据在内存中累积
  • 及时释放:处理完成立即关闭连接
  • 缓冲控制:合理设置 HTTP 响应缓冲区

3. 数据库优化

1
2
3
4
5
6
7
8
// 异步写入,不阻塞流式响应
setImmediate(async () => {
await mysql.insert('knowledge_qa', {
app_id: params.app_id,
answer: fullAnswer,
// ... 其他字段
});
});

前端优化建议

1. 防抖处理

对于高频数据更新,可以考虑防抖:

1
2
3
4
5
6
7
8
let updateTimer;
function updateUI(data) {
clearTimeout(updateTimer);
updateTimer = setTimeout(() => {
// 批量更新UI
currentMessage.answer = currentMessage.streamBuffer;
}, 16); // ~60fps
}

2. 虚拟滚动

对于大量消息的场景,考虑实现虚拟滚动:

1
2
3
4
5
6
7
8
9
10
11
<script setup>
import { VirtualList } from '@tanstack/vue-virtual';
</script>

<template>
<VirtualList :items="messages" :item-size="100">
<template #default="{ item }">
<MessageItem :message="item" />
</template>
</VirtualList>
</template>

3. 数据压缩

服务端可以考虑 gzip 压缩:

1
xhr.setRequestHeader('Accept-Encoding', 'gzip, deflate');

测试策略

1. 单元测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import { describe, it, expect, vi } from 'vitest';

describe('StreamAPI', () => {
it('should handle streaming data correctly', () => {
const mockXHR = {
open: vi.fn(),
setRequestHeader: vi.fn(),
send: vi.fn(),
responseText: 'data: {"content": "test"}\n\n',
readyState: XMLHttpRequest.LOADING,
status: 200,
};

// 测试流式数据处理逻辑
});
});

2. 集成测试

1
2
3
4
5
6
7
8
// 模拟流式数据
const mockStream = [
'data: {"content": "Hello"}\n\n',
'data: {"content": " World"}\n\n',
'event: end\ndata: {}\n\n',
];

// 测试完整流程

7. 开发实战经验

踩坑经验与注意事项

1. 避免多重重试

错误做法

1
2
3
4
5
// 不要这样做!会导致多次请求
xhr.onerror = () => {
// 在这里重试会导致多个并发请求
retryWithFetch();
};

正确做法

1
2
3
4
// 单一方案,明确的错误处理
xhr.onerror = () => {
onError(new Error('网络请求失败'));
};

2. 内存泄漏防护

1
2
3
4
5
6
// 组件卸载时必须清理资源
onUnmounted(() => {
if (currentController.value) {
currentController.value.close();
}
});

3. 状态管理

1
2
3
4
5
// 防止重复提交
if (isStreamActive.value) return;

// 及时更新状态
isStreamActive.value = true;

4. 错误边界处理

1
2
3
4
5
6
7
try {
const parsedData = JSON.parse(dataStr);
onMessage(parsedData);
} catch (error) {
// 单条数据解析失败不应该中断整个流
console.error('解析数据失败:', error);
}

8. 待优化内容

错误追踪

1
2
3
4
5
6
7
8
9
10
11
12
// 统一错误上报
function reportError(error, context) {
console.error('Stream error:', error);

// 发送到监控系统
analytics.track('stream_error', {
error: error.message,
context,
timestamp: Date.now(),
userAgent: navigator.userAgent,
});
}

参考资料