WebSocket 通知格式详解
本文档详细说明 CountBot 的 WebSocket 实时通知系统。
1. WebSocket 概述
1.1 为什么需要 WebSocket
CountBot 使用双通道通信:
- SSE (Server-Sent Events): 传输 AI 响应内容(单向:服务器 → 客户端)
- WebSocket: 传输实时通知(双向:服务器 ↔ 客户端)
WebSocket 用于:
- 工具调用通知(开始、完成、错误)
- 子代理任务进度更新
- 系统状态通知
- 错误警告
1.2 连接地址
ws://localhost:8000/ws/{session_id}
或使用 SSL:
wss://your-domain.com/ws/{session_id}
2. 消息格式
2.1 基础消息结构
所有 WebSocket 消息都是 JSON 格式:
{
"type": "消息类型",
"timestamp": 1705315800.123,
...其他字段
}
2.2 消息类型列表
| 类型 | 说明 | 方向 |
|---|---|---|
tool_call | 工具调用开始 | 服务器 → 客户端 |
tool_result | 工具执行结果 | 服务器 → 客户端 |
tool_start | 工具开始执行(详细) | 服务器 → 客户端 |
tool_complete | 工具执行完成(详细) | 服务器 → 客户端 |
tool_error | 工具执行错误(详细) | 服务器 → 客户端 |
tool_progress | 工具执行进度 | 服务器 → 客户端 |
subagent_start | 子代理任务开始 | 服务器 → 客户端 |
subagent_progress | 子代理任务进度 | 服务器 → 客户端 |
subagent_complete | 子代理任务完成 | 服务器 → 客户端 |
subagent_error | 子代理任务错误 | 服务器 → 客户端 |
error | 通用错误 | 服务器 → 客户端 |
ping | 心跳检测 | 服务器 → 客户端 |
pong | 心跳响应 | 客户端 → 服务器 |
3. 工具调用通知
3.1 tool_call(工具调用开始)
当 AI 决定调用工具时发送:
{
"type": "tool_call",
"tool": "read_file",
"arguments": {
"path": "README.md",
"show_line_numbers": true
},
"timestamp": 1705315800.123
}
3.2 tool_result(工具执行结果)
工具执行完成后发送:
{
"type": "tool_result",
"tool": "read_file",
"result": "[File: README.md | Lines: 150]\n 1| # CountBot\n 2| \n 3| CountBot 是一个基于 LLM 的智能助手框架...",
"timestamp": 1705315800.456
}
3.3 tool_start(工具开始执行 - 详细版)
{
"type": "tool_start",
"tool": "read_file",
"arguments": {
"path": "README.md",
"show_line_numbers": true
},
"timestamp": 1705315800.123
}
3.4 tool_complete(工具执行完成 - 详细版)
{
"type": "tool_complete",
"tool": "read_file",
"result": "[File: README.md | Lines: 150]\n 1| # CountBot...",
"duration_ms": 45.67
}
3.5 tool_error(工具执行错误)
{
"type": "tool_error",
"tool": "read_file",
"error": "Error: File not found: non-existent.txt",
"duration_ms": 12.34
}
3.6 tool_progress(工具执行进度)
用于长时间运行的工具(如文件上传、批量处理):
{
"type": "tool_progress",
"tool": "batch_process",
"progress": 45,
"message": "Processing file 45 of 100"
}
4. 子代理任务通知
4.1 subagent_start(子代理任务开始)
{
"type": "subagent_start",
"task_id": "task_abc123xyz",
"label": "数据分析",
"message": "分析用户行为数据并生成报告",
"timestamp": 1705315900.123
}
4.2 subagent_progress(子代理任务进度)
{
"type": "subagent_progress",
"task_id": "task_abc123xyz",
"progress": "正在读取数据文件...",
"timestamp": 1705315905.456
}
4.3 subagent_complete(子代理任务完成)
{
"type": "subagent_complete",
"task_id": "task_abc123xyz",
"result": "数据分析完成。共处理 10,000 条记录,发现 3 个关键趋势...",
"timestamp": 1705315920.789
}
4.4 subagent_error(子代理任务错误)
{
"type": "subagent_error",
"task_id": "task_abc123xyz",
"error": "数据文 件格式错误:缺少必需的 'timestamp' 列",
"timestamp": 1705315910.123
}
5. 通用通知
5.1 error(通用错误)
{
"type": "error",
"error": "Session not found: invalid-session-id",
"code": "SESSION_NOT_FOUND",
"timestamp": 1705315800.123
}
5.2 ping(心跳检测)
服务器定期发送心跳:
{
"type": "ping",
"timestamp": 1705315800.123
}
客户端应响应 pong:
{
"type": "pong",
"timestamp": 1705315800.456
}
6. 完整示例场景
6.1 场景:读取文件
用户输入: "读取 README.md 文件"
WebSocket 消息序列:
// 1. 工具调用开始
{
"type": "tool_call",
"tool": "read_file",
"arguments": {
"path": "README.md",
"show_line_numbers": true
},
"timestamp": 1705315800.123
}
// 2. 工具执行结果
{
"type": "tool_result",
"tool": "read_file",
"result": "[File: README.md | Lines: 150]\n 1| # CountBot\n 2| \n 3| CountBot 是一个基于 LLM 的智能助手框架...",
"timestamp": 1705315800.456
}
6.2 场景:调用子代理
用户输入: "帮我分析这个数据文件"
WebSocket 消息序列:
// 1. 工具调用开始(spawn 工具)
{
"type": "tool_call",
"tool": "spawn",
"arguments": {
"label": "数据分析",
"message": "分析 data.csv 文件并生成报告"
},
"timestamp": 1705315900.123
}
// 2. 子代理任务开始
{
"type": "subagent_start",
"task_id": "task_abc123xyz",
"label": "数据分析",
"message": "分析 data.csv 文件并生成报告",
"timestamp": 1705315900.456
}
// 3. 子代理进度更新
{
"type": "subagent_progress",
"task_id": "task_abc123xyz",
"progress": "正在读取数据文件...",
"timestamp": 1705315905.789
}
// 4. 子代理进度更新
{
"type": "subagent_progress",
"task_id": "task_abc123xyz",
"progress": "正在分析数据...",
"timestamp": 1705315910.123
}
// 5. 子代理进度更新
{
"type": "subagent_progress",
"task_id": "task_abc123xyz",
"progress": "正在生成报告...",
"timestamp": 1705315915.456
}
// 6. 子代理任务完成
{
"type": "subagent_complete",
"task_id": "task_abc123xyz",
"result": "数据分析完成。共处理 10,000 条记录,发现 3 个关键趋势:\n1. 用户活跃度在周末显著提升\n2. 移动端访问占比达到 65%\n3. 平均会话时长为 8.5 分钟",
"timestamp": 1705315920.789
}
// 7. 工具执行结果(spawn 工具)
{
"type": "tool_result",
"tool": "spawn",
"result": "子 Agent [数据分析] 已完成 (ID: task_abc123xyz)。结果:数据分析完成。共 处理 10,000 条记录...",
"timestamp": 1705315920.890
}
6.3 场景:工具执行错误
用户输入: "读取不存在的文件.txt"
WebSocket 消息序列:
// 1. 工具调用开始
{
"type": "tool_call",
"tool": "read_file",
"arguments": {
"path": "不存在的文件.txt"
},
"timestamp": 1705316000.123
}
// 2. 工具执行错误
{
"type": "tool_error",
"tool": "read_file",
"error": "Error: File not found: 不存在的文件.txt",
"duration_ms": 12.34
}
7. 后端实现
7.1 连接管理
# backend/ws/connection.py
from fastapi import WebSocket
from typing import Dict, Set
class ConnectionManager:
"""WebSocket 连接管理器"""
def __init__(self):
# session_id -> Set[WebSocket]
self.active_connections: Dict[str, Set[WebSocket]] = {}
async def connect(self, websocket: WebSocket, session_id: str):
"""接受新连接"""
await websocket.accept()
if session_id not in self.active_connections:
self.active_connections[session_id] = set()
self.active_connections[session_id].add(websocket)
logger.info(f"WebSocket connected: {session_id}")
def disconnect(self, websocket: WebSocket, session_id: str):
"""断开连接"""
if session_id in self.active_connections:
self.active_connections[session_id].discard(websocket)
if not self.active_connections[session_id]:
del self.active_connections[session_id]
logger.info(f"WebSocket disconnected: {session_id}")
async def send_to_session(self, session_id: str, message: dict):
"""发送消息到指定会话的所有连接"""
if session_id not in self.active_connections:
return
disconnected = set()
for connection in self.active_connections[session_id]:
try:
await connection.send_json(message)
except Exception as e:
logger.error(f"Failed to send message: {e}")
disconnected.add(connection)
# 清理断开的连接
for connection in disconnected:
self.disconnect(connection, session_id)
# 全局连接管理器
connection_manager = ConnectionManager()
7.2 WebSocket 端点
# backend/app.py
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
router = APIRouter()
@router.websocket("/ws/{session_id}")
async def websocket_endpoint(websocket: WebSocket, session_id: str):
"""WebSocket 连接端点"""
await connection_manager.connect(websocket, session_id)
try:
while True:
# 接收客户端消息
data = await websocket.receive_json()
# 处理 pong 响应
if data.get("type") == "pong":
logger.debug(f"Received pong from {session_id}")
continue
# 处理其他消息类型...
except WebSocketDisconnect:
connection_manager.disconnect(websocket, session_id)
except Exception as e:
logger.error(f"WebSocket error: {e}")
connection_manager.disconnect(websocket, session_id)
7.3 发送通知
# backend/ws/tool_notifications.py
async def notify_tool_execution(
session_id: str,
tool_name: str,
arguments: dict,
result: str | None = None,
error: str | None = None,
):
"""发送工具执行通知"""
if error:
# 错误通知
await connection_manager.send_to_session(
session_id,
{
"type": "tool_error",
"tool": tool_name,
"error": error,
}
)
elif result:
# 结果通知
await connection_manager.send_to_session(
session_id,
{
"type": "tool_result",
"tool": tool_name,
"result": result,
}
)
else:
# 开始通知
await connection_manager.send_to_session(
session_id,
{
"type": "tool_call",
"tool": tool_name,
"arguments": arguments,
}
)
8. 前端实现
8.1 原生 WebSocket
// 创建 WebSocket 连接
const ws = new WebSocket(`ws://localhost:8000/ws/${sessionId}`);
// 连接打开
ws.onopen = () => {
console.log('WebSocket connected');
};
// 接收消息
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
switch (data.type) {
case 'tool_call':
console.log(`Tool called: ${data.tool}`, data.arguments);
showToolNotification(data.tool, 'started');
break;
case 'tool_result':
console.log(`Tool result: ${data.tool}`, data.result);
showToolNotification(data.tool, 'completed');
break;
case 'tool_error':
console.error(`Tool error: ${data.tool}`, data.error);
showToolNotification(data.tool, 'error', data.error);
break;
case 'subagent_start':
console.log(`Subagent started: ${data.task_id}`, data.label);
showSubagentNotification(data.task_id, 'started', data.label);
break;
case 'subagent_progress':
console.log(`Subagent progress: ${data.task_id}`, data.progress);
updateSubagentProgress(data.task_id, data.progress);
break;
case 'subagent_complete':
console.log(`Subagent completed: ${data.task_id}`, data.result);
showSubagentNotification(data.task_id, 'completed', data.result);
break;
case 'subagent_error':
console.error(`Subagent error: ${data.task_id}`, data.error);
showSubagentNotification(data.task_id, 'error', data.error);
break;
case 'ping':
// 响应心跳
ws.send(JSON.stringify({ type: 'pong', timestamp: Date.now() / 1000 }));
break;
case 'error':
console.error('WebSocket error:', data.error);
showError(data.error);
break;
default:
console.warn('Unknown message type:', data.type);
}
};
// 连接关闭
ws.onclose = () => {
console.log('WebSocket disconnected');
// 尝试重连
setTimeout(() => {
reconnect();
}, 3000);
};
// 连接错误
ws.onerror = (error) => {
console.error('WebSocket error:', error);
};
8.2 React Hook
import { useEffect, useRef, useState } from 'react';
interface ToolNotification {
tool: string;
status: 'started' | 'completed' | 'error';
message?: string;
}
interface SubagentNotification {
taskId: string;
status: 'started' | 'progress' | 'completed' | 'error';
message: string;
}
function useWebSocket(sessionId: string) {
const ws = useRef<WebSocket | null>(null);
const [isConnected, setIsConnected] = useState(false);
const [toolNotifications, setToolNotifications] = useState<ToolNotification[]>([]);
const [subagentNotifications, setSubagentNotifications] = useState<SubagentNotification[]>([]);
useEffect(() => {
// 创建连接
ws.current = new WebSocket(`ws://localhost:8000/ws/${sessionId}`);
ws.current.onopen = () => {
console.log('WebSocket connected');
setIsConnected(true);
};
ws.current.onmessage = (event) => {
const data = JSON.parse(event.data);
switch (data.type) {
case 'tool_call':
setToolNotifications(prev => [
...prev,
{ tool: data.tool, status: 'started' }
]);
break;
case 'tool_result':
setToolNotifications(prev => [
...prev,
{ tool: data.tool, status: 'completed', message: data.result }
]);
break;
case 'tool_error':
setToolNotifications(prev => [
...prev,
{ tool: data.tool, status: 'error', message: data.error }
]);
break;
case 'subagent_start':
setSubagentNotifications(prev => [
...prev,
{ taskId: data.task_id, status: 'started', message: data.label }
]);
break;
case 'subagent_progress':
setSubagentNotifications(prev => [
...prev,
{ taskId: data.task_id, status: 'progress', message: data.progress }
]);
break;
case 'subagent_complete':
setSubagentNotifications(prev => [
...prev,
{ taskId: data.task_id, status: 'completed', message: data.result }
]);
break;
case 'subagent_error':
setSubagentNotifications(prev => [
...prev,
{ taskId: data.task_id, status: 'error', message: data.error }
]);
break;
case 'ping':
ws.current?.send(JSON.stringify({ type: 'pong', timestamp: Date.now() / 1000 }));
break;
}
};
ws.current.onclose = () => {
console.log('WebSocket disconnected');
setIsConnected(false);
};
ws.current.onerror = (error) => {
console.error('WebSocket error:', error);
};
// 清理
return () => {
ws.current?.close();
};
}, [sessionId]);
return {
isConnected,
toolNotifications,
subagentNotifications,
};
}
// 使用示例
function ChatComponent() {
const { isConnected, toolNotifications, subagentNotifications } = useWebSocket('your-session-id');
return (
<div>
<div>连接状态: {isConnected ? '已连接' : '未连接'}</div>
<div>
<h3>工具调用</h3>
{toolNotifications.map((notif, i) => (
<div key={i}>
{notif.tool}: {notif.status}
{notif.message && ` - ${notif.message}`}
</div>
))}
</div>
<div>
<h3>子代理任务</h3>
{subagentNotifications.map((notif, i) => (
<div key={i}>
{notif.taskId}: {notif.status} - {notif.message}
</div>
))}
</div>
</div>
);
}
9. 调试技巧
9.1 使用 wscat 测试
# 安装 wscat
npm install -g wscat
# 连接到 WebSocket
wscat -c ws://localhost:8000/ws/550e8400-e29b-41d4-a716-446655440000
# 发送 pong 响应
> {"type": "pong", "timestamp": 1705315800.123}
9.2 浏览器开发者工具
- 打开 Network 面板
- 筛选
WS类型 - 查看
Messages标签页 - 实时查看 WebSocket 消息
10. 安全考虑
10.1 认证
WebSocket 连接应该验证 session_id:
@router.websocket("/ws/{session_id}")
async def websocket_endpoint(websocket: WebSocket, session_id: str):
# 验证 session_id
session = await session_manager.get_session(session_id)
if not session:
await websocket.close(code=1008, reason="Invalid session")
return
await connection_manager.connect(websocket, session_id)
...
10.2 速率限制
防止消息洪水攻击:
from collections import defaultdict
import time
class RateLimiter:
def __init__(self, max_messages: int = 100, window: int = 60):
self.max_messages = max_messages
self.window = window
self.messages = defaultdict(list)
def is_allowed(self, session_id: str) -> bool:
now = time.time()
# 清理过期消息
self.messages[session_id] = [
ts for ts in self.messages[session_id]
if now - ts < self.window
]
if len(self.messages[session_id]) >= self.max_messages:
return False
self.messages[session_id].append(now)
return True
11. 总结
WebSocket 通知系统的关键点:
- 双向通信: 支持服务器推送和客户端响应
- 实时性: 工具调用和子代理任务的实时进度
- 连接管理: 支持多个客户端连接到同一会话
- 心跳机制: 保持连接活跃,及时发现断线
- 错误处理: 优雅处理连接错误和消息发送失败
- 安全: 认证、速率限制、输入验证