SSE 流式响应格式详解
本文档详细说明 CountBot 的 Server-Sent Events (SSE) 流式响应格式。
1. SSE 基础
1.1 什么是 SSE
Server-Sent Events (SSE) 是一种服务器向客户端推送实时数据的技术:
- 单向通信: 服务器 → 客户端(客户端不能通过 SSE 连接发送数据)
- 基于 HTTP: 使用标准 HTTP 协议,无需 WebSocket
- 自动重连: 浏览器会自动重连断开的连接
- 事件类型: 支持自定义事件类型
1.2 SSE 消息格式
event: <事件类型>
data: <数据内容>
id: <消息ID>(可选)
retry: <重连间隔>(可选)
注意: 每条消息以两个换行符 \n\n 结束。
2. CountBot 的 SSE 事件类型
2.1 事件类型列表
| 事件类型 | 说明 | 触发时机 |
|---|---|---|
start | 消息开始 | 用户消息保存到数据库后 |
message | 内容片段 | LLM 返回每个 token 时 |
done | 消息完成 | AI 响应保存到数据库后 |
error | 错误信息 | 处理过程中发生错误 |
2.2 事件数据格式
start 事件
{
"message_id": "1"
}
message 事件
{
"content": "你好"
}
done 事件
{
"message_id": "2"
}
error 事件
{
"error": "Tool execution failed: read_file - File not found",
"type": "ToolExecutionError"
}
3. 完整的 SSE 流示例
3.1 简单对话(无工具调用)
用户输入: "你好"
SSE 流:
event: start
data: {"message_id": "1"}
event: message
data: {"content": "你好"}
event: message
data: {"content": "!"}
event: message
data: {"content": "很高兴"}
event: message
data: {"content": "见到"}
event: message
data: {"content": "你"}
event: message
data: {"content": "!"}
event: message
data: {"content": "有什么"}
event: message
data: {"content": "我可以"}
event: message
data: {"content": "帮助"}
event: message
data: {"content": "你的"}
event: message
data: {"content": "吗"}
event: message
data: {"content": "?"}
event: done
data: {"message_id": "2"}
3.2 工具调用对话
用户输入: "读取 README.md 文件"
SSE 流:
event: start
data: {"message_id": "3"}
event: message
data: {"content": "我已经"}
event: message
data: {"content": "读取了"}
event: message
data: {"content": " README.md "}
event: message
data: {"content": "文件"}
event: message
data: {"content": "。"}
event: message
data: {"content": "这是"}
event: message
data: {"content": " CountBot "}
event: message
data: {"content": "项目的"}
event: message
data: {"content": "说明文档"}
event: message
data: {"content": ","}
event: message
data: {"content": "主要"}
event: message
data: {"content": "介绍了"}
event: message
data: {"content": "以下"}
event: message
data: {"content": "内容"}
event: message
data: {"content": ":\n\n"}
event: message
data: {"content": "1. "}
event: message
data: {"content": "**功能特性**"}
event: message
data: {"content": ":\n"}
event: message
data: {"content": " - 多模型支持"}
event: message
data: {"content": "(OpenAI、Claude、"}
event: message
data: {"content": "国产大模型)\n"}
event: message
data: {"content": " - 工具调用系统\n"}
event: message
data: {"content": " - 记忆管理\n"}
event: message
data: {"content": " - 技能系统\n"}
event: message
data: {"content": " - 多渠道接入"}
event: done
data: {"message_id": "4"}
注意: 工具调用过程对用户是透明的,用户只看到最终结果。工具执行通知通过 WebSocket 发送。
3.3 错误处理
用户输入: "读取不存在的文件.txt"
SSE 流:
event: start
data: {"message_id": "5"}
event: error
data: {"error": "Tool execution failed: read_file - Error: File not found: 不存在的文件.txt", "type": "ToolExecutionError"}
4. 后端实现
4.1 SSE 生成器
# backend/api/chat.py - send_message()
async def event_stream() -> AsyncIterator[str]:
"""SSE 事件流生成器"""
assistant_content = ""
try:
# 发送开始事件
yield f"event: start\ndata: {json.dumps({'message_id': str(user_message.id)})}\n\n"
# 处理消息并流式输出
async for chunk in agent_loop.process_message(
message=request.message,
session_id=request.session_id,
context=context,
media=request.attachments,
):
assistant_content += chunk
# 发送内容块
yield f"event: message\ndata: {json.dumps({'content': chunk})}\n\n"
# 确保立即发送
await asyncio.sleep(0)
# 保存助手响应到数据库
if assistant_content:
assistant_message = await session_manager.add_message(
session_id=request.session_id,
role="assistant",
content=assistant_content,
)
# 发送完成事件
yield f"event: done\ndata: {json.dumps({'message_id': str(assistant_message.id)})}\n\n"
else:
# 没有内容,发送空完成事件
yield f"event: done\ndata: {json.dumps({'message_id': None})}\n\n"
except Exception as e:
logger.exception(f"Error in event stream: {e}")
# 发送错误事件
error_data = {
"error": str(e),
"type": type(e).__name__,
}
yield f"event: error\ndata: {json.dumps(error_data)}\n\n"
4.2 返回 StreamingResponse
# backend/api/chat.py - send_message()
return StreamingResponse(
event_stream(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # 禁用 Nginx 缓冲
},
)
4.3 关键 HTTP 头
| 头部 | 值 | 说明 |
|---|---|---|
Content-Type | text/event-stream | 标识 SSE 流 |
Cache-Control | no-cache | 禁用缓存 |
Connection | keep-alive | 保持连接 |
X-Accel-Buffering | no | 禁用 Nginx 缓冲(重要!) |
5. 前端实现
5.1 使用原生 EventSource
// 创建 SSE 连接
const eventSource = new EventSource('/api/chat/send', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
session_id: '550e8400-e29b-41d4-a716-446655440000',
message: '你好',
attachments: null,
}),
});
// 监听 start 事件
eventSource.addEventListener('start', (e) => {
const data = JSON.parse(e.data);
console.log('Message started:', data.message_id);
// 显示加载状态
showLoadingIndicator();
});
// 监听 message 事件
eventSource.addEventListener('message', (e) => {
const data = JSON.parse(e.data);
// 追加内容到 UI
appendToChat(data.content);
});
// 监听 done 事件
eventSource.addEventListener('done', (e) => {
const data = JSON.parse(e.data);
console.log('Message completed:', data.message_id);
// 隐藏加载状态
hideLoadingIndicator();
// 关闭连接
eventSource.close();
});
// 监听 error 事件
eventSource.addEventListener('error', (e) => {
const data = JSON.parse(e.data);
console.error('Error:', data.error);
// 显示错误消息
showError(data.error);
// 关闭连接
eventSource.close();
});
// 监听连接错误
eventSource.onerror = (e) => {
console.error('Connection error:', e);
eventSource.close();
};
5.2 使用 fetch API(更灵活)
async function sendMessage(sessionId, message) {
const response = await fetch('/api/chat/send', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
session_id: sessionId,
message: message,
attachments: null,
}),
});
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) {
break;
}
// 解码数据
buffer += decoder.decode(value, { stream: true });
// 按行分割
const lines = buffer.split('\n');
buffer = lines.pop(); // 保留不完整的行
for (const line of lines) {
if (line.startsWith('event:')) {
const eventType = line.substring(6).trim();
continue;
}
if (line.startsWith('data:')) {
const data = JSON.parse(line.substring(5).trim());
// 处理不同事件类型
if (eventType === 'start') {
console.log('Message started:', data.message_id);
showLoadingIndicator();
} else if (eventType === 'message') {
appendToChat(data.content);
} else if (eventType === 'done') {
console.log('Message completed:', data.message_id);
hideLoadingIndicator();
} else if (eventType === 'error') {
console.error('Error:', data.error);
showError(data.error);
}
}
}
}
}
5.3 React 示例
import { useEffect, useState } from 'react';
interface Message {
id: string;
role: 'user' | 'assistant';
content: string;
}
function ChatComponent() {
const [messages, setMessages] = useState<Message[]>([]);
const [currentMessage, setCurrentMessage] = useState('');
const [isLoading, setIsLoading] = useState(false);
const sendMessage = async (text: string) => {
// 添加用户消息
const userMessage: Message = {
id: Date.now().toString(),
role: 'user',
content: text,
};
setMessages(prev => [...prev, userMessage]);
setIsLoading(true);
// 创建 AI 消息占位符
const aiMessage: Message = {
id: (Date.now() + 1).toString(),
role: 'assistant',
content: '',
};
setMessages(prev => [...prev, aiMessage]);
try {
const response = await fetch('/api/chat/send', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
session_id: 'your-session-id',
message: text,
attachments: null,
}),
});
const reader = response.body?.getReader();
const decoder = new TextDecoder();
let buffer = '';
let eventType = '';
while (true) {
const { done, value } = await reader!.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop() || '';
for (const line of lines) {
if (line.startsWith('event:')) {
eventType = line.substring(6).trim();
} else if (line.startsWith('data:')) {
const data = JSON.parse(line.substring(5).trim());
if (eventType === 'message') {
// 追加内容到 AI 消息
setMessages(prev => {
const newMessages = [...prev];
const lastMessage = newMessages[newMessages.length - 1];
if (lastMessage.role === 'assistant') {
lastMessage.content += data.content;
}
return newMessages;
});
} else if (eventType === 'done') {
setIsLoading(false);
} else if (eventType === 'error') {
console.error('Error:', data.error);
setIsLoading(false);
}
}
}
}
} catch (error) {
console.error('Failed to send message:', error);
setIsLoading(false);
}
};
return (
<div>
<div className="messages">
{messages.map(msg => (
<div key={msg.id} className={`message ${msg.role}`}>
{msg.content}
</div>
))}
</div>
{isLoading && <div className="loading">AI 正在思考...</div>}
<input
type="text"
value={currentMessage}
onChange={e => setCurrentMessage(e.target.value)}
onKeyPress={e => {
if (e.key === 'Enter' && currentMessage.trim()) {
sendMessage(currentMessage);
setCurrentMessage('');
}
}}
placeholder="输入消息..."
/>
</div>
);
}
6. 调试技巧
6.1 使用 curl 测试
curl -N -X POST http://localhost:8000/api/chat/send \
-H "Content-Type: application/json" \
-d '{
"session_id": "550e8400-e29b-41d4-a716-446655440000",
"message": "你好",
"attachments": null
}'
注意: -N 参数禁用缓冲,确保实时输出。
6.2 浏览器开发者工具
- 打开 Network 面板
- 筛选
EventStream类型 - 查看
Messages标签页 - 实时查看 SSE 事件
6.3 常见问题
问题 1: 消息延迟或批量到达
原因: Nginx 或其他代理服务器启用了缓冲
解决方案: 添加 X-Accel-Buffering: no 头部
headers={
"X-Accel-Buffering": "no",
}
Nginx 配置:
location /api/chat/send {
proxy_pass http://backend;
proxy_buffering off;
proxy_cache off;
proxy_set_header Connection '';
proxy_http_version 1.1;
chunked_transfer_encoding off;
}
问题 2: 连接自动断开
原因: 超时设置过短
解决方案: 增加超时时间
location /api/chat/send {
proxy_pass http://backend;
proxy_read_timeout 300s;
proxy_connect_timeout 300s;
proxy_send_timeout 300s;
}
问题 3: CORS 错误
原因: 跨域请求未配置
解决方案: 添加 CORS 头部
# backend/app.py
from fastapi.middleware.cors import CORSMiddleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
7. 性能优化
7.1 批量发送
不要每个 token 都发送一次,可以累积一定数量后再发送:
async def event_stream():
buffer = ""
buffer_size = 5 # 累积 5 个 token 后发送
async for chunk in agent_loop.process_message(...):
buffer += chunk
if len(buffer) >= buffer_size:
yield f"event: message\ndata: {json.dumps({'content': buffer})}\n\n"
buffer = ""
# 发送剩余内容
if buffer:
yield f"event: message\ndata: {json.dumps({'content': buffer})}\n\n"
7.2 压缩
启用 gzip 压缩可以减少传输数据量:
from fastapi.responses import StreamingResponse
import gzip
async def compressed_event_stream():
async for chunk in event_stream():
yield gzip.compress(chunk.encode())
return StreamingResponse(
compressed_event_stream(),
media_type="text/event-stream",
headers={
"Content-Encoding": "gzip",
},
)
8. 安全考虑
8.1 认证
SSE 连接应该包含认证信息:
const eventSource = new EventSource('/api/chat/send', {
headers: {
'Authorization': 'Bearer your-token-here',
},
});
8.2 速率限制
防止滥用:
from fastapi import Request
from slowapi import Limiter
from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)
@router.post("/send")
@limiter.limit("10/minute")
async def send_message(request: Request, ...):
...
8.3 输入验证
验证所有输入参数:
class SendMessageRequest(BaseModel):
session_id: str = Field(..., min_length=36, max_length=36)
message: str = Field(..., min_length=1, max_length=10000)
attachments: list[str] | None = Field(None, max_items=10)
9. 总结
SSE 流式响应的关键点:
- 事件类型: start, message, done, error
- 数据格式: JSON 字符串
- HTTP 头部: Content-Type, Cache-Control, X-Accel-Buffering
- 错误处理: 捕获异常并发送 error 事件
- 性能优化: 批量发送、压缩
- 安全: 认证、速率限制、输入验证