内容为 AI 基于可视化知识库的项目源码生成。
1. 项目背景与技术概述 流式输出的价值与应用场景 我们要做一个问答工具,将可视化相关的信息构建为 RAG 知识库,便于辅助业务方人员解答可视化相关的问题。 一开始用的是传统的请求-响应模式,在处理 AI 对话时,需要让用户等待很久才能看到结果,非常不友好。因此准备改为流式输出方案。 流式输出技术让我们能够在数据准备就绪的同时就开始向用户展示,极大提升了用户体验。
核心应用场景 :
AI 对话生成 :实时展示 LLM 生成的文本内容,如 ChatGPT 的打字机效果
知识库检索问答 :RAG(检索增强生成)流式响应
数据处理进度 :长时间计算任务的实时反馈
实时数据分析 :大量数据的逐步处理和展示
技术优势 :
提升用户体验 :用户可以立即看到响应开始,减少等待焦虑
减少感知延迟 :内容逐步展现,比一次性加载感觉更快
资源利用优化 :可以边生成边传输,减少内存占用
更好的反馈 :用户知道系统正在工作,而不是卡住了
技术栈选择 后端技术栈 :
框架 :Egg.js (Node.js)
AI 集成 :OpenAI SDK + DeepSeek API
协议 :Server-Sent Events (SSE)
数据库 :MySQL (用于结果持久化)
前端技术栈 :
框架 :Vue 3 + TypeScript
通信方案 :XMLHttpRequest (兼容性最佳)
UI 组件 :Element Plus
状态管理 :Vue 3 Composition API
整体架构设计 graph TB
subgraph "前端层"
A[Vue组件] --> B[XMLHttpRequest API]
B --> C[流式数据处理]
C --> D[UI实时更新]
end
subgraph "网络层"
E[HTTP POST请求] --> F[SSE长连接]
F --> G[JSON数据流]
end
subgraph "后端层"
H[Egg.js Controller] --> I[Knowledge Service]
I --> J[AI API Adapter]
J --> K[DeepSeek API]
end
subgraph "AI服务层"
L[文档检索] --> M[RAG处理]
M --> N[流式生成]
N --> O[内容推送]
end
B --> E
F --> H
J --> L
O --> G
style A fill:#e1f5fe
style D fill:#e8f5e8
style K fill:#fff3e0
style O fill:#f3e5f5
2. 通信协议与数据流设计 SSE 协议原理 Server-Sent Events (SSE) 是 HTML5 标准的一部分,基于 HTTP 协议实现服务器向客户端的单向数据推送。
协议特点 :
单向通信 :服务器主动推送,客户端被动接收
自动重连 :具备内置的断线重连机制
HTTP 兼容 :无需协议升级,便于代理和负载均衡
事件流格式 :标准化的数据传输格式
SSE 数据格式 :
1 2 3 4 5 6 7 8 9 data: {"type" :"start" ,"code" :0 ,"message" :"stream started" }data: {"type" :"chunk" ,"content" :"人工智能" ,"fullContent" :"人工智能" }data: {"type" :"chunk" ,"content" :"技术" ,"fullContent" :"人工智能技术" }event: enddata: {"type" :"complete" ,"code" :0 ,"fullAnswer" :"人工智能技术发展迅速..." }
格式说明 :
每个数据块以data:开头
数据块之间用双换行符分隔
支持自定义事件类型(如event: end)
连接保持打开状态,支持持续推送
数据结构标准化 消息参数接口 :
1 2 3 4 5 6 7 interface MessageParams { role : string ; content : string ; temperature : number ; max_tokens?: number ; response_format?: { type : 'json_object' } | { type : 'text' }; }
流式响应数据结构 :
1 2 3 4 5 6 7 8 9 interface StreamResponse { type : 'start' | 'chunk' | 'complete' | 'error' ; content?: string ; fullContent?: string ; code : number ; message?: string ; error?: string ; costTime?: number ; }
3. 后端实现方案 AI API 抽象层设计 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 export abstract class LLMAIApi { public model : string ; protected openai?: OpenAI ; protected __client : ReturnType <typeof axios.create >; constructor ({ baseUrl, model, timeout = 60e3 , apiKey }: Params ) { this .model = model; this .__client = axios.create ({ baseURL : baseUrl, timeout }); if (apiKey) { this .openai = new OpenAI ({ baseURL : baseUrl, apiKey }); } } abstract chatCompletions ( message : MessageParams , traceId : string ): Promise <Error | Response <any >>; chatCompletionsStream?( message : MessageParams , traceId : string ): Promise <Error | Response <any >>; }
架构优势 :
多态支持 :不同 AI 提供商统一接口
可选实现 :流式功能按需实现
错误处理 :统一的错误返回格式
DeepSeek 流式 API 实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 export class DeepSeekAIApi extends LLMAIApi { private buildRequestParams (message : MessageParams , stream = false ): any { const requestParams : any = { messages : [ { role : 'system' , content : 'You are a helpful assistant.' }, { role : 'user' , content : message.content }, ], model : this .model , }; if (message.temperature !== undefined ) { requestParams.temperature = message.temperature ; } if (message.max_tokens !== undefined ) { requestParams.max_tokens = message.max_tokens ; } if (message.response_format !== undefined ) { requestParams.response_format = message.response_format ; } if (stream) { requestParams.stream = true ; } return requestParams; } async chatCompletionsStream (message: MessageParams, traceId: string ) { try { const requestParams = this .buildRequestParams (message, true ); const stream = await this .openai .chat .completions .create (requestParams); return { status_code : 0 , status_msg : 'Success' , data : stream, traceId, }; } catch (error : any ) { this .handleApiError (error, 'chatCompletionsStream' ); } } private handleApiError (error : any , operation : string ): never { console .error (`${operation} Error:` , error); throw new Error (`${operation} Error:` + error.message ); } }
控制器层 SSE 处理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 public async queryStream ( ) { const { ctx } = this ; const { service } = ctx; const params = ctx.request .body ; ctx.status = 200 ; ctx.set ({ 'Content-Type' : 'text/event-stream' , 'Cache-Control' : 'no-cache' , 'Connection' : 'keep-alive' , 'Access-Control-Allow-Origin' : '*' , 'Access-Control-Allow-Headers' : 'Cache-Control' , }); ctx.respond = false ; const email = ctx.session ?.unitePassport ?.email || '' ; params.email = email; if (!params.app_id ) { ctx.res .write (`data: ${JSON .stringify({ code: -1 , data: null , message: '参数错误,缺少app_id' , })} \n\n` ); ctx.res .end (); return ; } const traceId = ctx.helper .uuid (); params.traceId = traceId; try { const result = await service.knowledge .query .retrieveAndGenerateStream ( params.app_id , params.query , params.traceId ); if (result.error ) { ctx.res .write (`data: ${JSON .stringify({ code: -1 , data: null , error: result.error.message || 'unknown error' , })} \n\n` ); ctx.res .end (); return ; } ctx.res .write (`data: ${JSON .stringify({ type : 'start' , code: 0 , message: 'stream started' , })} \n\n` ); let fullAnswer = '' ; for await (const chunk of result.stream ) { const content = chunk.choices ?.[0 ]?.delta ?.content ; if (content) { fullAnswer += content; ctx.res .write (`data: ${JSON .stringify({ type : 'chunk' , content, // 当前块 fullContent: fullAnswer, // 累积内容 })} \n\n` ); } } const { app } = ctx; const { mysql } = app; await mysql.insert ('knowledge_qa' , { app_id : params.app_id , email : params.email , query : params.query , answer : fullAnswer, vector_data : JSON .stringify ({ docs : result.docs }), prompt : result.prompt , cost_time : performance.now () - Date .now (), }); ctx.res .write (`data: ${JSON .stringify({ type : 'complete' , code: 0 , message: 'stream completed' , fullAnswer, costTime: performance.now() - Date .now(), })} \n\n` ); ctx.res .write ('data: [DONE]\n\n' ); ctx.res .end (); } catch (error : any ) { ctx.res .write (`data: ${JSON .stringify({ code: error.status_code || -1 , data: null , error: error.message || 'unknown error' , })} \n\n` ); ctx.res .end (); } }
实现要点 :
异步迭代 :for await...of处理 AsyncIterable 流
增量累积 :实时计算完整内容
状态通知 :start -> chunk* -> complete 的标准流程
错误恢复 :每个阶段的异常处理
RAG 流式架构 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 async retrieveAndGenerateStream (app_id, query, traceId ) { try { const { docs, prompt } = await this .prepareRAGData (app_id, query); const { params, message, AIApiInstance } = this .getAIApiConfig (prompt); if (params.model .includes ('deepseek' )) { const streamResult = await (AIApiInstance as any ).chatCompletionsStream ( message, traceId, ); if (streamResult instanceof Error ) { return { docs, prompt, error : streamResult, }; } return { docs, prompt, stream : streamResult.data , traceId : streamResult.traceId , }; } throw new Error ('当前模型不支持流式输出,请使用deepseek-chat模型' ); } catch (error : any ) { console .error ('RAG Stream Error:' , error); throw new Error (error.message ); } }
RAG 流式设计 :
预处理分离 :文档检索和流式生成解耦
模型适配 :按模型能力选择不同处理路径
上下文保持 :prompt 和 docs 信息随流传递
4. 前端实现方案 技术选型:为什么选择 XMLHttpRequest? 在实现前端流式输出时,主要有三种技术方案:
1. EventSource (标准 SSE)
1 2 3 4 const eventSource = new EventSource ('/api/stream' ); eventSource.onmessage = (event ) => { console .log (event.data ); };
优点 :专为 SSE 设计,使用简单缺点 :只支持 GET 请求,无法发送复杂数据
2. Fetch API + ReadableStream
1 2 const response = await fetch ('/api/stream' , { method : 'POST' });const reader = response.body .getReader ();
优点 :现代 API,支持 POST 请求缺点 :浏览器兼容性相对较差,错误处理复杂
3. XMLHttpRequest (我们的选择)
1 2 3 4 5 const xhr = new XMLHttpRequest (); xhr.open ('POST' , '/api/stream' , true ); xhr.onreadystatechange = function ( ) { };
优点 :兼容性最好,支持 POST,状态控制精确缺点 :API 相对老旧,代码量稍多
在我们的项目中,需要发送 POST 请求并传递复杂参数,最终选择了 XMLHttpRequest 方案。
核心实现代码 1. API 层实现 实现流程 :
创建 XMLHttpRequest 对象并配置 SSE 相关请求头
监听readystatechange事件,在 LOADING 状态就开始处理数据
使用增量读取策略,只处理新接收的数据
按行解析 SSE 格式数据,提取data:和event:字段
通过回调函数将解析后的数据传递给上层组件
智能处理各种异常情况,确保连接稳定
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 export function getQueryAnswerStreamXHR (data, onMessage, onError, onComplete ) { const xhr = new XMLHttpRequest (); let buffer = '' ; let hasReceivedData = false ; xhr.open ('POST' , 'http://127.0.0.1:7001/knowledge/query/stream' , true ); xhr.setRequestHeader ('Content-Type' , 'application/json' ); xhr.setRequestHeader ('Accept' , 'text/event-stream' ); xhr.setRequestHeader ('Cache-Control' , 'no-cache' ); xhr.responseType = 'text' ; let lastResponseLength = 0 ; xhr.onreadystatechange = function ( ) { if ( xhr.readyState === XMLHttpRequest .LOADING || xhr.readyState === XMLHttpRequest .DONE ) { const newData = xhr.responseText .substring (lastResponseLength); lastResponseLength = xhr.responseText .length ; if (newData) { hasReceivedData = true ; buffer += newData; const lines = buffer.split ('\n' ); buffer = lines.pop () || '' ; for (const line of lines) { const trimmed = line.trim (); if (trimmed === '' ) continue ; if (trimmed.startsWith ('event:' )) { const eventType = trimmed.substring (6 ).trim (); if (eventType === 'end' ) { onComplete (); return ; } } else if (trimmed.startsWith ('data:' )) { const dataStr = trimmed.substring (5 ).trim (); if (dataStr === '' ) continue ; try { const parsedData = JSON .parse (dataStr); onMessage (parsedData); } catch (error) { console .error ('解析数据失败:' , error); } } } } if (xhr.readyState === XMLHttpRequest .DONE ) { if (!hasReceivedData && xhr.status !== 200 ) { onError (new Error (`HTTP error! status: ${xhr.status} ` )); } else { onComplete (); } } } }; xhr.onerror = () => onError (new Error ('网络请求失败' )); xhr.ontimeout = () => onError (new Error ('请求超时' )); xhr.send (JSON .stringify (data)); return { close : () => xhr.abort (), abort : () => xhr.abort (), }; }
2. Vue 组件实现 实现流程 :
定义消息数据结构,包含流式状态标记和缓冲区
创建新消息对象并立即添加到列表中
调用 API 开始流式传输,通过三个回调函数处理不同阶段
在onMessage回调中累积内容并实时更新 UI
使用 Vue 响应式系统自动触发界面更新
提供状态管理和资源清理机制
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 <template> <div class="chat-container"> <!-- 消息列表 --> <div class="message-list"> <div v-for="(message, index) in messages" :key="`msg-${index}-${message.query}`" class="message-item" > <div class="query">{{ message.query }}</div> <div class="answer" v-html="renderMarkdown(message.answer)"></div> <!-- 流式状态指示器:显示闪烁光标表示正在接收数据 --> <div v-if="message.isStreaming" class="streaming-cursor"> <span class="blinking-cursor">|</span> </div> <div class="cost-time">{{ message.cost_time }}</div> </div> </div> <!-- 输入区域 --> <div class="input-area"> <el-input v-model="input" placeholder="请输入问题" :disabled="isStreamActive" @keydown.enter="submit" /> <el-button @click="submit" type="primary" :disabled="isStreamActive" :loading="isStreamActive" > {{ isStreamActive ? '思考中...' : '发送' }} </el-button> </div> </div> </template> <script setup lang="ts"> import { ref, onUnmounted } from 'vue'; import * as api from '@/api/datav'; // 消息数据结构定义 interface Message { query: string; // 用户问题 answer: string; // 最终答案 cost_time: string; // 耗时信息 isStreaming?: boolean; // 是否正在流式接收 streamBuffer?: string; // 流式数据缓冲区 } const messages = ref<Message[]>([]); // 消息列表 const input = ref(''); // 输入框内容 const isStreamActive = ref(false); // 全局流式状态 const currentController = ref<{ close: () => void } | null>(null); // 当前连接控制器 async function submit() { const query = input.value.trim(); // 防护:空输入或正在流式传输时不处理 if (!query || isStreamActive.value) return; // 立即创建新消息对象并添加到列表 const newMessage: Message = { query, answer: '', // 初始为空,将通过流式更新 cost_time: '', isStreaming: true, // 标记为流式状态 streamBuffer: '', // 初始化缓冲区 }; messages.value.push(newMessage); // Vue响应式更新UI input.value = ''; // 清空输入框 isStreamActive.value = true; // 设置全局流式状态 const startTime = Date.now(); // 记录开始时间 try { // 调用流式API,传入三个关键回调函数 currentController.value = api.getQueryAnswerStreamXHR( { query, app_id: 1, email: 'user@example.com' }, // onMessage: 接收到流式数据时的处理 (data) => { const currentMessage = messages.value[messages.value.length - 1]; if (data.content && currentMessage.isStreaming) { // 累积内容到缓冲区 currentMessage.streamBuffer = (currentMessage.streamBuffer || '') + data.content; // 实时更新显示内容(Vue响应式自动更新UI) currentMessage.answer = currentMessage.streamBuffer; } }, // onError: 发生错误时的处理 (error) => { console.error('Stream error:', error); const currentMessage = messages.value[messages.value.length - 1]; if (currentMessage.isStreaming) { currentMessage.isStreaming = false; // 停止流式状态 // 显示已接收的内容或错误信息 currentMessage.answer = currentMessage.streamBuffer || '获取答案时出现错误'; currentMessage.cost_time = '出错'; } cleanup(); // 清理资源 }, // onComplete: 流式传输完成时的处理 () => { const endTime = Date.now(); const currentMessage = messages.value[messages.value.length - 1]; if (currentMessage.isStreaming) { currentMessage.isStreaming = false; // 结束流式状态 // 计算并显示耗时 currentMessage.cost_time = `回答用时${( (endTime - startTime) / 1000 ).toFixed(2)}s`; delete currentMessage.streamBuffer; // 清理缓冲区 } cleanup(); // 清理资源 } ); } catch (error) { console.error('Failed to start stream:', error); cleanup(); } } // 资源清理函数 function cleanup() { isStreamActive.value = false; // 重置全局状态 currentController.value = null; // 清空控制器引用 } // 组件卸载时的资源清理 onUnmounted(() => { if (currentController.value) { currentController.value.close(); // 关闭可能存在的连接 } }); </script> <style scoped> /* 流式状态指示器样式 */ .streaming-cursor { display: inline-block; margin-left: 5px; } /* 闪烁光标动画 */ .blinking-cursor { font-size: 16px; color: #409eff; animation: blink 1s infinite; } @keyframes blink { 0%, 50% { opacity: 1; } /* 前半秒显示 */ 51%, 100% { opacity: 0; } /* 后半秒隐藏 */ } </style>
3. 流式处理流程图 下图展示了完整的流式数据处理流程:
flowchart TD
A[用户输入问题] --> B{检查输入有效性}
B -->|无效| A
B -->|有效| C[创建新消息对象]
C --> D[设置流式状态]
D --> E[调用XMLHttpRequest API]
E --> F[配置请求头]
F --> G[发送POST请求]
G --> H[监听readystatechange]
H --> I{接收到数据?}
I -->|是| J[增量读取新数据]
I -->|否| K[等待数据]
K --> I
J --> L[解析SSE格式]
L --> M{数据类型?}
M -->|data:| N[解析JSON内容]
M -->|event: end| O[流式结束]
M -->|其他| P[跳过处理]
N --> Q[累积到缓冲区]
Q --> R[更新UI显示]
R --> S{还有数据?}
S -->|是| I
S -->|否| T[等待更多数据]
T --> I
O --> U[计算耗时]
U --> V[清理资源]
V --> W[结束流式状态]
P --> S
style A fill:#e1f5fe
style C fill:#f3e5f5
style R fill:#e8f5e8
style W fill:#fff3e0
关键技术点解析 1. 增量数据读取 1 2 3 const newData = xhr.responseText .substring (lastResponseLength); lastResponseLength = xhr.responseText .length ;
这是流式处理的核心技巧。XMLHttpRequest 的 responseText 会包含所有已接收的数据,我们需要记录上次读取的位置,只处理新增部分。
2. 智能状态码处理 1 2 3 4 5 6 if (!hasReceivedData && xhr.status !== 200 ) { onError (new Error (`HTTP error! status: ${xhr.status} ` )); } else { onComplete (); }
很多情况下,服务器可能返回 404 状态码但仍然传输有效数据。我们采用”有数据就是成功”的策略,只在确实没有接收到任何数据时才报错。
3. 缓冲区管理 1 2 3 buffer += newData;const lines = buffer.split ('\n' ); buffer = lines.pop () || '' ;
SSE 数据可能在任何位置被截断,我们需要维护一个缓冲区来保存不完整的行,等待下次数据到达时合并处理。
4. Vue 响应式更新 1 2 3 4 currentMessage.streamBuffer = (currentMessage.streamBuffer || '' ) + data.content ; currentMessage.answer = currentMessage.streamBuffer ;
利用 Vue3 的响应式系统,直接修改数据对象就能触发 UI 更新,实现实时显示效果。
5. 端到端完整流程 时序图:前后端完整交互 sequenceDiagram
participant User as 用户
participant Vue as Vue组件
participant XHR as XMLHttpRequest
participant Controller as Egg.js控制器
participant Service as 服务层
participant API as DeepSeek API
User->>Vue: 输入问题并提交
Vue->>Vue: 创建消息对象,设置流式状态
Vue->>XHR: 调用getQueryAnswerStreamXHR
XHR->>Controller: POST /knowledge/query/stream
Controller->>Controller: 设置SSE响应头
Controller->>Service: retrieveAndGenerateStream()
Service->>Service: 文档检索和RAG处理
Service->>API: chatCompletionsStream()
API-->>Service: AsyncIterable流对象
Service-->>Controller: {docs, prompt, stream}
Controller->>XHR: data: {"type":"start"}
XHR->>Vue: onMessage回调
Vue->>User: 显示开始状态
loop 流式数据传输
API-->>Service: 数据块chunk
Service-->>Controller: 流式chunk
Controller->>XHR: data: {"type":"chunk","content":"..."}
XHR->>Vue: onMessage(data)
Vue->>Vue: 累积内容到streamBuffer
Vue->>User: 实时更新UI显示
end
API-->>Service: 流式结束
Controller->>Controller: 保存到数据库
Controller->>XHR: data: {"type":"complete"}
Controller->>XHR: data: [DONE]
XHR->>Vue: onComplete回调
Vue->>Vue: 清理流式状态
Vue->>User: 显示最终结果
数据流转全过程 flowchart TD
A[用户输入问题] --> B[Vue组件处理]
B --> C[XMLHttpRequest发送请求]
C --> D[Egg.js控制器接收]
D --> E[设置SSE响应头]
E --> F[调用服务层]
F --> G[RAG文档检索]
G --> H[构建AI请求]
H --> I[DeepSeek API流式响应]
I --> J[AsyncIterable数据流]
J --> K[控制器处理chunk]
K --> L[SSE格式封装]
L --> M[HTTP推送到前端]
M --> N[XMLHttpRequest增量读取]
N --> O[解析SSE数据]
O --> P[Vue组件更新]
P --> Q[实时UI渲染]
Q --> R{还有数据?}
R -->|是| J
R -->|否| S[流式完成]
S --> T[数据持久化]
T --> U[资源清理]
style A fill:#e1f5fe
style Q fill:#e8f5e8
style I fill:#fff3e0
style T fill:#f3e5f5
6. 性能优化与最佳实践 后端优化策略 1. 连接管理
1 2 3 4 5 6 7 8 9 10 11 12 const STREAM_TIMEOUT = 30000 ; ctx.req .setTimeout (STREAM_TIMEOUT , () => { ctx.res .write ( `data: ${JSON .stringify({ code: -1 , error: 'Stream timeout' , })} \n\n` ); ctx.res .end (); });
2. 内存管理
流式处理 :避免大量数据在内存中累积
及时释放 :处理完成立即关闭连接
缓冲控制 :合理设置 HTTP 响应缓冲区
3. 数据库优化
1 2 3 4 5 6 7 8 setImmediate (async () => { await mysql.insert ('knowledge_qa' , { app_id : params.app_id , answer : fullAnswer, }); });
前端优化建议 1. 防抖处理 对于高频数据更新,可以考虑防抖:
1 2 3 4 5 6 7 8 let updateTimer;function updateUI (data ) { clearTimeout (updateTimer); updateTimer = setTimeout (() => { currentMessage.answer = currentMessage.streamBuffer ; }, 16 ); }
2. 虚拟滚动 对于大量消息的场景,考虑实现虚拟滚动:
1 2 3 4 5 6 7 8 9 10 11 <script setup> import { VirtualList } from '@tanstack/vue-virtual'; </script> <template> <VirtualList :items="messages" :item-size="100"> <template #default="{ item }"> <MessageItem :message="item" /> </template> </VirtualList> </template>
3. 数据压缩 服务端可以考虑 gzip 压缩:
1 xhr.setRequestHeader ('Accept-Encoding' , 'gzip, deflate' );
测试策略 1. 单元测试 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import { describe, it, expect, vi } from 'vitest' ;describe ('StreamAPI' , () => { it ('should handle streaming data correctly' , () => { const mockXHR = { open : vi.fn (), setRequestHeader : vi.fn (), send : vi.fn (), responseText : 'data: {"content": "test"}\n\n' , readyState : XMLHttpRequest .LOADING , status : 200 , }; }); });
2. 集成测试 1 2 3 4 5 6 7 8 const mockStream = [ 'data: {"content": "Hello"}\n\n' , 'data: {"content": " World"}\n\n' , 'event: end\ndata: {}\n\n' , ];
7. 开发实战经验 踩坑经验与注意事项 1. 避免多重重试 错误做法 :
1 2 3 4 5 xhr.onerror = () => { retryWithFetch (); };
正确做法 :
1 2 3 4 xhr.onerror = () => { onError (new Error ('网络请求失败' )); };
2. 内存泄漏防护 1 2 3 4 5 6 onUnmounted (() => { if (currentController.value ) { currentController.value .close (); } });
3. 状态管理 1 2 3 4 5 if (isStreamActive.value ) return ; isStreamActive.value = true ;
4. 错误边界处理 1 2 3 4 5 6 7 try { const parsedData = JSON .parse (dataStr); onMessage (parsedData); } catch (error) { console .error ('解析数据失败:' , error); }
8. 待优化内容 错误追踪 1 2 3 4 5 6 7 8 9 10 11 12 function reportError (error, context ) { console .error ('Stream error:' , error); analytics.track ('stream_error' , { error : error.message , context, timestamp : Date .now (), userAgent : navigator.userAgent , }); }
参考资料 :