跳到主要内容

SSE 流式响应格式详解

本文档详细说明 CountBot 的 Server-Sent Events (SSE) 流式响应格式。

1. SSE 基础

1.1 什么是 SSE

Server-Sent Events (SSE) 是一种服务器向客户端推送实时数据的技术:

  • 单向通信: 服务器 → 客户端(客户端不能通过 SSE 连接发送数据)
  • 基于 HTTP: 使用标准 HTTP 协议,无需 WebSocket
  • 自动重连: 浏览器会自动重连断开的连接
  • 事件类型: 支持自定义事件类型

1.2 SSE 消息格式

event: <事件类型>
data: <数据内容>
id: <消息ID>(可选)
retry: <重连间隔>(可选)

注意: 每条消息以两个换行符 \n\n 结束。

2. CountBot 的 SSE 事件类型

2.1 事件类型列表

事件类型说明触发时机
start消息开始用户消息保存到数据库后
message内容片段LLM 返回每个 token 时
done消息完成AI 响应保存到数据库后
error错误信息处理过程中发生错误

2.2 事件数据格式

start 事件

{
"message_id": "1"
}

message 事件

{
"content": "你好"
}

done 事件

{
"message_id": "2"
}

error 事件

{
"error": "Tool execution failed: read_file - File not found",
"type": "ToolExecutionError"
}

3. 完整的 SSE 流示例

3.1 简单对话(无工具调用)

用户输入: "你好"

SSE 流:

event: start
data: {"message_id": "1"}

event: message
data: {"content": "你好"}

event: message
data: {"content": "!"}

event: message
data: {"content": "很高兴"}

event: message
data: {"content": "见到"}

event: message
data: {"content": "你"}

event: message
data: {"content": "!"}

event: message
data: {"content": "有什么"}

event: message
data: {"content": "我可以"}

event: message
data: {"content": "帮助"}

event: message
data: {"content": "你的"}

event: message
data: {"content": "吗"}

event: message
data: {"content": "?"}

event: done
data: {"message_id": "2"}

3.2 工具调用对话

用户输入: "读取 README.md 文件"

SSE 流:

event: start
data: {"message_id": "3"}

event: message
data: {"content": "我已经"}

event: message
data: {"content": "读取了"}

event: message
data: {"content": " README.md "}

event: message
data: {"content": "文件"}

event: message
data: {"content": "。"}

event: message
data: {"content": "这是"}

event: message
data: {"content": " CountBot "}

event: message
data: {"content": "项目的"}

event: message
data: {"content": "说明文档"}

event: message
data: {"content": ","}

event: message
data: {"content": "主要"}

event: message
data: {"content": "介绍了"}

event: message
data: {"content": "以下"}

event: message
data: {"content": "内容"}

event: message
data: {"content": ":\n\n"}

event: message
data: {"content": "1. "}

event: message
data: {"content": "**功能特性**"}

event: message
data: {"content": ":\n"}

event: message
data: {"content": " - 多模型支持"}

event: message
data: {"content": "(OpenAI、Claude、"}

event: message
data: {"content": "国产大模型)\n"}

event: message
data: {"content": " - 工具调用系统\n"}

event: message
data: {"content": " - 记忆管理\n"}

event: message
data: {"content": " - 技能系统\n"}

event: message
data: {"content": " - 多渠道接入"}

event: done
data: {"message_id": "4"}

注意: 工具调用过程对用户是透明的,用户只看到最终结果。工具执行通知通过 WebSocket 发送。

3.3 错误处理

用户输入: "读取不存在的文件.txt"

SSE 流:

event: start
data: {"message_id": "5"}

event: error
data: {"error": "Tool execution failed: read_file - Error: File not found: 不存在的文件.txt", "type": "ToolExecutionError"}

4. 后端实现

4.1 SSE 生成器

# backend/api/chat.py - send_message()
async def event_stream() -> AsyncIterator[str]:
"""SSE 事件流生成器"""
assistant_content = ""

try:
# 发送开始事件
yield f"event: start\ndata: {json.dumps({'message_id': str(user_message.id)})}\n\n"

# 处理消息并流式输出
async for chunk in agent_loop.process_message(
message=request.message,
session_id=request.session_id,
context=context,
media=request.attachments,
):
assistant_content += chunk

# 发送内容块
yield f"event: message\ndata: {json.dumps({'content': chunk})}\n\n"

# 确保立即发送
await asyncio.sleep(0)

# 保存助手响应到数据库
if assistant_content:
assistant_message = await session_manager.add_message(
session_id=request.session_id,
role="assistant",
content=assistant_content,
)

# 发送完成事件
yield f"event: done\ndata: {json.dumps({'message_id': str(assistant_message.id)})}\n\n"
else:
# 没有内容,发送空完成事件
yield f"event: done\ndata: {json.dumps({'message_id': None})}\n\n"

except Exception as e:
logger.exception(f"Error in event stream: {e}")

# 发送错误事件
error_data = {
"error": str(e),
"type": type(e).__name__,
}
yield f"event: error\ndata: {json.dumps(error_data)}\n\n"

4.2 返回 StreamingResponse

# backend/api/chat.py - send_message()
return StreamingResponse(
event_stream(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # 禁用 Nginx 缓冲
},
)

4.3 关键 HTTP 头

头部说明
Content-Typetext/event-stream标识 SSE 流
Cache-Controlno-cache禁用缓存
Connectionkeep-alive保持连接
X-Accel-Bufferingno禁用 Nginx 缓冲(重要!)

5. 前端实现

5.1 使用原生 EventSource

// 创建 SSE 连接
const eventSource = new EventSource('/api/chat/send', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
session_id: '550e8400-e29b-41d4-a716-446655440000',
message: '你好',
attachments: null,
}),
});

// 监听 start 事件
eventSource.addEventListener('start', (e) => {
const data = JSON.parse(e.data);
console.log('Message started:', data.message_id);

// 显示加载状态
showLoadingIndicator();
});

// 监听 message 事件
eventSource.addEventListener('message', (e) => {
const data = JSON.parse(e.data);

// 追加内容到 UI
appendToChat(data.content);
});

// 监听 done 事件
eventSource.addEventListener('done', (e) => {
const data = JSON.parse(e.data);
console.log('Message completed:', data.message_id);

// 隐藏加载状态
hideLoadingIndicator();

// 关闭连接
eventSource.close();
});

// 监听 error 事件
eventSource.addEventListener('error', (e) => {
const data = JSON.parse(e.data);
console.error('Error:', data.error);

// 显示错误消息
showError(data.error);

// 关闭连接
eventSource.close();
});

// 监听连接错误
eventSource.onerror = (e) => {
console.error('Connection error:', e);
eventSource.close();
};

5.2 使用 fetch API(更灵活)

async function sendMessage(sessionId, message) {
const response = await fetch('/api/chat/send', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
session_id: sessionId,
message: message,
attachments: null,
}),
});

if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}

const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';

while (true) {
const { done, value } = await reader.read();

if (done) {
break;
}

// 解码数据
buffer += decoder.decode(value, { stream: true });

// 按行分割
const lines = buffer.split('\n');
buffer = lines.pop(); // 保留不完整的行

for (const line of lines) {
if (line.startsWith('event:')) {
const eventType = line.substring(6).trim();
continue;
}

if (line.startsWith('data:')) {
const data = JSON.parse(line.substring(5).trim());

// 处理不同事件类型
if (eventType === 'start') {
console.log('Message started:', data.message_id);
showLoadingIndicator();
} else if (eventType === 'message') {
appendToChat(data.content);
} else if (eventType === 'done') {
console.log('Message completed:', data.message_id);
hideLoadingIndicator();
} else if (eventType === 'error') {
console.error('Error:', data.error);
showError(data.error);
}
}
}
}
}

5.3 React 示例

import { useEffect, useState } from 'react';

interface Message {
id: string;
role: 'user' | 'assistant';
content: string;
}

function ChatComponent() {
const [messages, setMessages] = useState<Message[]>([]);
const [currentMessage, setCurrentMessage] = useState('');
const [isLoading, setIsLoading] = useState(false);

const sendMessage = async (text: string) => {
// 添加用户消息
const userMessage: Message = {
id: Date.now().toString(),
role: 'user',
content: text,
};
setMessages(prev => [...prev, userMessage]);
setIsLoading(true);

// 创建 AI 消息占位符
const aiMessage: Message = {
id: (Date.now() + 1).toString(),
role: 'assistant',
content: '',
};
setMessages(prev => [...prev, aiMessage]);

try {
const response = await fetch('/api/chat/send', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
session_id: 'your-session-id',
message: text,
attachments: null,
}),
});

const reader = response.body?.getReader();
const decoder = new TextDecoder();
let buffer = '';
let eventType = '';

while (true) {
const { done, value } = await reader!.read();
if (done) break;

buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop() || '';

for (const line of lines) {
if (line.startsWith('event:')) {
eventType = line.substring(6).trim();
} else if (line.startsWith('data:')) {
const data = JSON.parse(line.substring(5).trim());

if (eventType === 'message') {
// 追加内容到 AI 消息
setMessages(prev => {
const newMessages = [...prev];
const lastMessage = newMessages[newMessages.length - 1];
if (lastMessage.role === 'assistant') {
lastMessage.content += data.content;
}
return newMessages;
});
} else if (eventType === 'done') {
setIsLoading(false);
} else if (eventType === 'error') {
console.error('Error:', data.error);
setIsLoading(false);
}
}
}
}
} catch (error) {
console.error('Failed to send message:', error);
setIsLoading(false);
}
};

return (
<div>
<div className="messages">
{messages.map(msg => (
<div key={msg.id} className={`message ${msg.role}`}>
{msg.content}
</div>
))}
</div>

{isLoading && <div className="loading">AI 正在思考...</div>}

<input
type="text"
value={currentMessage}
onChange={e => setCurrentMessage(e.target.value)}
onKeyPress={e => {
if (e.key === 'Enter' && currentMessage.trim()) {
sendMessage(currentMessage);
setCurrentMessage('');
}
}}
placeholder="输入消息..."
/>
</div>
);
}

6. 调试技巧

6.1 使用 curl 测试

curl -N -X POST http://localhost:8000/api/chat/send \
-H "Content-Type: application/json" \
-d '{
"session_id": "550e8400-e29b-41d4-a716-446655440000",
"message": "你好",
"attachments": null
}'

注意: -N 参数禁用缓冲,确保实时输出。

6.2 浏览器开发者工具

  1. 打开 Network 面板
  2. 筛选 EventStream 类型
  3. 查看 Messages 标签页
  4. 实时查看 SSE 事件

6.3 常见问题

问题 1: 消息延迟或批量到达

原因: Nginx 或其他代理服务器启用了缓冲

解决方案: 添加 X-Accel-Buffering: no 头部

headers={
"X-Accel-Buffering": "no",
}

Nginx 配置:

location /api/chat/send {
proxy_pass http://backend;
proxy_buffering off;
proxy_cache off;
proxy_set_header Connection '';
proxy_http_version 1.1;
chunked_transfer_encoding off;
}

问题 2: 连接自动断开

原因: 超时设置过短

解决方案: 增加超时时间

location /api/chat/send {
proxy_pass http://backend;
proxy_read_timeout 300s;
proxy_connect_timeout 300s;
proxy_send_timeout 300s;
}

问题 3: CORS 错误

原因: 跨域请求未配置

解决方案: 添加 CORS 头部

# backend/app.py
from fastapi.middleware.cors import CORSMiddleware

app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)

7. 性能优化

7.1 批量发送

不要每个 token 都发送一次,可以累积一定数量后再发送:

async def event_stream():
buffer = ""
buffer_size = 5 # 累积 5 个 token 后发送

async for chunk in agent_loop.process_message(...):
buffer += chunk

if len(buffer) >= buffer_size:
yield f"event: message\ndata: {json.dumps({'content': buffer})}\n\n"
buffer = ""

# 发送剩余内容
if buffer:
yield f"event: message\ndata: {json.dumps({'content': buffer})}\n\n"

7.2 压缩

启用 gzip 压缩可以减少传输数据量:

from fastapi.responses import StreamingResponse
import gzip

async def compressed_event_stream():
async for chunk in event_stream():
yield gzip.compress(chunk.encode())

return StreamingResponse(
compressed_event_stream(),
media_type="text/event-stream",
headers={
"Content-Encoding": "gzip",
},
)

8. 安全考虑

8.1 认证

SSE 连接应该包含认证信息:

const eventSource = new EventSource('/api/chat/send', {
headers: {
'Authorization': 'Bearer your-token-here',
},
});

8.2 速率限制

防止滥用:

from fastapi import Request
from slowapi import Limiter
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)

@router.post("/send")
@limiter.limit("10/minute")
async def send_message(request: Request, ...):
...

8.3 输入验证

验证所有输入参数:

class SendMessageRequest(BaseModel):
session_id: str = Field(..., min_length=36, max_length=36)
message: str = Field(..., min_length=1, max_length=10000)
attachments: list[str] | None = Field(None, max_items=10)

9. 总结

SSE 流式响应的关键点:

  1. 事件类型: start, message, done, error
  2. 数据格式: JSON 字符串
  3. HTTP 头部: Content-Type, Cache-Control, X-Accel-Buffering
  4. 错误处理: 捕获异常并发送 error 事件
  5. 性能优化: 批量发送、压缩
  6. 安全: 认证、速率限制、输入验证