WebSocket Notification Format in Detail

This document describes CountBot's WebSocket real-time notification system in detail.

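1. WebSocket Overview

1.1 Why WebSocket Is Needed

CountBot communicates over two channels:

  • SSE (Server-Sent Events): carries the AI response content (one-way: server → client)
  • WebSocket: carries real-time notifications (two-way: server ↔ client)

WebSocket is used for:

  • Tool call notifications (start, completion, error)
  • Subagent task progress updates
  • System status notifications
  • Error alerts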

1.2 Connection URL

ws://localhost:8000/ws/{session_id}

Or, over SSL/TLS:

wss://your-domain.com/ws/{session_id}
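
As an illustration of the URL pattern above, a client could assemble the endpoint address from a host and a session ID. This is a minimal sketch; `build_ws_url` is a hypothetical helper, not part of CountBot, and percent-encoding the session ID is a defensive assumption rather than a documented requirement.

```python
from urllib.parse import quote


def build_ws_url(host: str, session_id: str, secure: bool = False) -> str:
    """Return the notification endpoint URL for one session."""
    scheme = "wss" if secure else "ws"
    # Percent-encode the session ID so it stays a single path segment.
    return f"{scheme}://{host}/ws/{quote(session_id, safe='')}"


print(build_ws_url("localhost:8000", "550e8400-e29b-41d4-a716-446655440000"))
print(build_ws_url("your-domain.com", "550e8400-e29b-41d4-a716-446655440000", secure=True))
```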

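2. Message Format

2.1 Base Message Structure

All WebSocket messages are JSON objects. Each carries a type field; the timestamp, where present, is Unix time in seconds:

{
  "type": "message type",
  "timestamp": 1705315800.123,
  ...other fields
}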
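
A receiver may want to check this envelope before acting on a message. The sketch below shows one way to do that; `parse_message` is a hypothetical helper, and treating `timestamp` as optional is an assumption based on the examples in this document, some of which omit it.

```python
import json
from typing import Any


def parse_message(raw: str) -> dict[str, Any]:
    """Decode one WebSocket text frame and validate the basic envelope."""
    msg = json.loads(raw)  # raises json.JSONDecodeError (a ValueError) on bad JSON
    if not isinstance(msg, dict):
        raise ValueError("message must be a JSON object")
    if not isinstance(msg.get("type"), str):
        raise ValueError("message must have a string 'type' field")
    ts = msg.get("timestamp")
    # bool is a subclass of int in Python, so reject it explicitly.
    if ts is not None and (isinstance(ts, bool) or not isinstance(ts, (int, float))):
        raise ValueError("'timestamp' must be a number when present")
    return msg


msg = parse_message('{"type": "ping", "timestamp": 1705315800.123}')
print(msg["type"])
```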

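2.2 Message Types

| Type | Description | Direction |
|---|---|---|
| tool_call | Tool call started | Server → client |
| tool_result | Tool execution result | Server → client |
| tool_start | Tool execution started (detailed) | Server → client |
| tool_complete | Tool execution completed (detailed) | Server → client |
| tool_error | Tool execution error (detailed) | Server → client |
| tool_progress | Tool execution progress | Server → client |
| subagent_start | Subagent task started | Server → client |
| subagent_progress | Subagent task progress | Server → client |
| subagent_complete | Subagent task completed | Server → client |
| subagent_error | Subagent task error | Server → client |
| error | General error | Server → client |
| ping | Heartbeat check | Server → client |
| pong | Heartbeat response | Client → server |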
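
One way a client might route incoming messages by the `type` values listed above is a small handler registry. The sketch below is illustrative only: `Dispatcher`, `on`, and `dispatch` are hypothetical names, not part of CountBot.

```python
from typing import Any, Callable

Handler = Callable[[dict[str, Any]], None]


class Dispatcher:
    """Route decoded messages to handlers keyed by their 'type' field."""

    def __init__(self) -> None:
        self._handlers: dict[str, Handler] = {}
        self.unknown: list[str] = []  # types seen without a registered handler

    def on(self, msg_type: str, handler: Handler) -> None:
        self._handlers[msg_type] = handler

    def dispatch(self, msg: dict[str, Any]) -> bool:
        """Call the matching handler; return False if none is registered."""
        msg_type = msg.get("type")
        handler = self._handlers.get(msg_type) if isinstance(msg_type, str) else None
        if handler is None:
            self.unknown.append(str(msg_type))
            return False
        handler(msg)
        return True


dispatcher = Dispatcher()
dispatcher.on("tool_call", lambda m: print("tool started:", m["tool"]))
dispatcher.dispatch({"type": "tool_call", "tool": "read_file", "arguments": {}})
```

Recording unknown types instead of raising keeps the client tolerant of message types added later.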

3. Tool Call Notifications

3.1 tool_call (tool call started)

Sent when the AI decides to call a tool:

{
  "type": "tool_call",
  "tool": "read_file",
  "arguments": {
    "path": "README.md",
    "show_line_numbers": true
  },
  "timestamp": 1705315800.123
}

3.2 tool_result (tool execution result)

Sent after the tool finishes executing:

{
  "type": "tool_result",
  "tool": "read_file",
  "result": "[File: README.md | Lines: 150]\n 1| # CountBot\n 2| \n 3| CountBot is an LLM-based intelligent assistant framework...",
  "timestamp": 1705315800.456
}

3.3 tool_start (tool execution started, detailed)

{
  "type": "tool_start",
  "tool": "read_file",
  "arguments": {
    "path": "README.md",
    "show_line_numbers": true
  },
  "timestamp": 1705315800.123
}

3.4 tool_complete (tool execution completed, detailed)

{
  "type": "tool_complete",
  "tool": "read_file",
  "result": "[File: README.md | Lines: 150]\n 1| # CountBot...",
  "duration_ms": 45.67
}

3.5 tool_error (tool execution error)

{
  "type": "tool_error",
  "tool": "read_file",
  "error": "Error: File not found: non-existent.txt",
  "duration_ms": 12.34
}
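
The `duration_ms` field in `tool_complete` and `tool_error` could be produced by timing the tool call on the server. The following is a sketch under assumptions: `run_tool_timed` is a hypothetical helper, the "Error: " prefix and two-decimal rounding merely mirror the examples above, and CountBot's actual measurement may differ.

```python
import time
from typing import Any, Callable


def run_tool_timed(tool_name: str, func: Callable[[], str]) -> dict[str, Any]:
    """Run a tool and build a tool_complete or tool_error message with its duration."""
    start = time.perf_counter()  # monotonic clock, suited to measuring intervals
    try:
        result = func()
    except Exception as exc:
        duration_ms = (time.perf_counter() - start) * 1000
        return {
            "type": "tool_error",
            "tool": tool_name,
            "error": f"Error: {exc}",
            "duration_ms": round(duration_ms, 2),
        }
    duration_ms = (time.perf_counter() - start) * 1000
    return {
        "type": "tool_complete",
        "tool": tool_name,
        "result": result,
        "duration_ms": round(duration_ms, 2),
    }


print(run_tool_timed("read_file", lambda: "[File: README.md | Lines: 150]")["type"])
```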

3.6 tool_progress (tool execution progress)

Used for long-running tools (such as file uploads or batch processing):

{
  "type": "tool_progress",
  "tool": "batch_process",
  "progress": 45,
  "message": "Processing file 45 of 100"
}

4. Subagent Task Notifications

4.1 subagent_start (subagent task started)

{
  "type": "subagent_start",
  "task_id": "task_abc123xyz",
  "label": "Data analysis",
  "message": "Analyze user behavior data and generate a report",
  "timestamp": 1705315900.123
}

4.2 subagent_progress (subagent task progress)

{
  "type": "subagent_progress",
  "task_id": "task_abc123xyz",
  "progress": "Reading data file...",
  "timestamp": 1705315905.456
}

4.3 subagent_complete (subagent task completed)

{
  "type": "subagent_complete",
  "task_id": "task_abc123xyz",
  "result": "Data analysis complete. Processed 10,000 records and found 3 key trends...",
  "timestamp": 1705315920.789
}

4.4 subagent_error (subagent task error)

{
  "type": "subagent_error",
  "task_id": "task_abc123xyz",
  "error": "Malformed data file: missing required 'timestamp' column",
  "timestamp": 1705315910.123
}
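
Because every subagent message carries a `task_id`, a client can fold the stream into per-task state. The sketch below shows one possible reducer; `SubagentTask` and `apply_message` are hypothetical names, and the status strings are an assumption chosen for illustration.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class SubagentTask:
    task_id: str
    label: str = ""
    status: str = "running"  # "running", "completed", or "error"
    progress: list[str] = field(default_factory=list)
    outcome: Optional[str] = None  # final result or error text


def apply_message(tasks: dict[str, SubagentTask], msg: dict[str, Any]) -> None:
    """Update the task table in place from one subagent_* message."""
    msg_type = msg.get("type")
    if not isinstance(msg_type, str) or not msg_type.startswith("subagent_"):
        return  # not a subagent notification
    task = tasks.setdefault(msg["task_id"], SubagentTask(msg["task_id"]))
    if msg_type == "subagent_start":
        task.label = msg.get("label", "")
    elif msg_type == "subagent_progress":
        task.progress.append(msg["progress"])
    elif msg_type == "subagent_complete":
        task.status, task.outcome = "completed", msg["result"]
    elif msg_type == "subagent_error":
        task.status, task.outcome = "error", msg["error"]


tasks: dict[str, SubagentTask] = {}
apply_message(tasks, {"type": "subagent_start", "task_id": "task_abc123xyz", "label": "Data analysis"})
apply_message(tasks, {"type": "subagent_progress", "task_id": "task_abc123xyz", "progress": "Reading data file..."})
print(tasks["task_abc123xyz"].status)
```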

5. General Notifications

5.1 error (general error)

{
  "type": "error",
  "error": "Session not found: invalid-session-id",
  "code": "SESSION_NOT_FOUND",
  "timestamp": 1705315800.123
}

5.2 ping (heartbeat check)

The server sends a heartbeat periodically:

{
  "type": "ping",
  "timestamp": 1705315800.123
}

The client should respond with a pong:

{
  "type": "pong",
  "timestamp": 1705315800.456
}
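
The ping/pong exchange above can be sketched as a pure function that maps an incoming frame to the reply a client should send, if any. `reply_to_ping` is a hypothetical helper; the `now` parameter exists only so the timestamp can be fixed in tests.

```python
import json
import time
from typing import Optional


def reply_to_ping(raw: str, now: Optional[float] = None) -> Optional[str]:
    """Return the pong frame for a ping frame, or None for any other message."""
    msg = json.loads(raw)
    if not isinstance(msg, dict) or msg.get("type") != "ping":
        return None
    # Timestamps in this protocol are Unix time in seconds.
    timestamp = time.time() if now is None else now
    return json.dumps({"type": "pong", "timestamp": timestamp})


print(reply_to_ping('{"type": "ping", "timestamp": 1705315800.123}'))
```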

6. Complete Example Scenarios

6.1 Scenario: reading a file

User input: "Read the README.md file"

WebSocket message sequence:

// 1. Tool call started
{
  "type": "tool_call",
  "tool": "read_file",
  "arguments": {
    "path": "README.md",
    "show_line_numbers": true
  },
  "timestamp": 1705315800.123
}

// 2. Tool execution result
{
  "type": "tool_result",
  "tool": "read_file",
  "result": "[File: README.md | Lines: 150]\n 1| # CountBot\n 2| \n 3| CountBot is an LLM-based intelligent assistant framework...",
  "timestamp": 1705315800.456
}

6.2 Scenario: calling a subagent

User input: "Help me analyze this data file"

WebSocket message sequence:

// 1. Tool call started (spawn tool)
{
  "type": "tool_call",
  "tool": "spawn",
  "arguments": {
    "label": "Data analysis",
    "message": "Analyze data.csv and generate a report"
  },
  "timestamp": 1705315900.123
}

// 2. Subagent task started
{
  "type": "subagent_start",
  "task_id": "task_abc123xyz",
  "label": "Data analysis",
  "message": "Analyze data.csv and generate a report",
  "timestamp": 1705315900.456
}

// 3. Subagent progress update
{
  "type": "subagent_progress",
  "task_id": "task_abc123xyz",
  "progress": "Reading data file...",
  "timestamp": 1705315905.789
}

// 4. Subagent progress update
{
  "type": "subagent_progress",
  "task_id": "task_abc123xyz",
  "progress": "Analyzing data...",
  "timestamp": 1705315910.123
}

// 5. Subagent progress update
{
  "type": "subagent_progress",
  "task_id": "task_abc123xyz",
  "progress": "Generating report...",
  "timestamp": 1705315915.456
}

// 6. Subagent task completed
{
  "type": "subagent_complete",
  "task_id": "task_abc123xyz",
  "result": "Data analysis complete. Processed 10,000 records and found 3 key trends:\n1. User activity rises significantly on weekends\n2. Mobile accounts for 65% of visits\n3. Average session length is 8.5 minutes",
  "timestamp": 1705315920.789
}

// 7. Tool execution result (spawn tool)
{
  "type": "tool_result",
  "tool": "spawn",
  "result": "Subagent [Data analysis] finished (ID: task_abc123xyz). Result: Data analysis complete. Processed 10,000 records...",
  "timestamp": 1705315920.890
}

6.3 Scenario: tool execution error

User input: "Read nonexistent-file.txt"

WebSocket message sequence:

// 1. Tool call started
{
  "type": "tool_call",
  "tool": "read_file",
  "arguments": {
    "path": "nonexistent-file.txt"
  },
  "timestamp": 1705316000.123
}

// 2. Tool execution error
{
  "type": "tool_error",
  "tool": "read_file",
  "error": "Error: File not found: nonexistent-file.txt",
  "duration_ms": 12.34
}
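
In the scenarios above, each `tool_call` is eventually followed by a `tool_result` or a `tool_error` for the same tool. A client could use that pattern to work out which tools are still running. The sketch below is a heuristic under an explicit assumption: the documented messages carry no per-call ID, so calls are matched by tool name, oldest first. `pending_tools` is a hypothetical helper.

```python
from typing import Any


def pending_tools(messages: list[dict[str, Any]]) -> list[str]:
    """Return names of tools that were called but have no result or error yet."""
    pending: list[str] = []
    for msg in messages:
        kind, tool = msg.get("type"), msg.get("tool")
        if kind == "tool_call":
            pending.append(tool)
        elif kind in ("tool_result", "tool_error") and tool in pending:
            pending.remove(tool)  # drops the oldest matching call
    return pending


# Partway through the subagent scenario: spawn has been called but has not returned.
stream = [
    {"type": "tool_call", "tool": "spawn"},
    {"type": "subagent_start", "task_id": "task_abc123xyz"},
    {"type": "subagent_progress", "task_id": "task_abc123xyz", "progress": "Reading data file..."},
]
print(pending_tools(stream))
```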

7. Backend Implementation

7.1 Connection Management

# backend/ws/connection.py
import logging
from typing import Dict, Set

from fastapi import WebSocket

logger = logging.getLogger(__name__)


class ConnectionManager:
    """WebSocket connection manager."""

    def __init__(self):
        # session_id -> Set[WebSocket]
        self.active_connections: Dict[str, Set[WebSocket]] = {}

    async def connect(self, websocket: WebSocket, session_id: str):
        """Accept a new connection and register it under its session."""
        await websocket.accept()

        if session_id not in self.active_connections:
            self.active_connections[session_id] = set()

        self.active_connections[session_id].add(websocket)
        logger.info(f"WebSocket connected: {session_id}")

    def disconnect(self, websocket: WebSocket, session_id: str):
        """Remove a connection; drop the session entry once it is empty."""
        if session_id in self.active_connections:
            self.active_connections[session_id].discard(websocket)

            if not self.active_connections[session_id]:
                del self.active_connections[session_id]

        logger.info(f"WebSocket disconnected: {session_id}")

    async def send_to_session(self, session_id: str, message: dict):
        """Send a message to every connection of the given session."""
        if session_id not in self.active_connections:
            return

        disconnected = set()

        # Iterate over a copy: the set may change while a send is awaited.
        for connection in list(self.active_connections[session_id]):
            try:
                await connection.send_json(message)
            except Exception as e:
                logger.error(f"Failed to send message: {e}")
                disconnected.add(connection)

        # Clean up connections that failed to send
        for connection in disconnected:
            self.disconnect(connection, session_id)


# Global connection manager
connection_manager = ConnectionManager()

7.2 WebSocket Endpoint

# backend/app.py
import logging

from fastapi import APIRouter, WebSocket, WebSocketDisconnect

# Assumed import path, based on the file comment in section 7.1
from backend.ws.connection import connection_manager

logger = logging.getLogger(__name__)

router = APIRouter()


@router.websocket("/ws/{session_id}")
async def websocket_endpoint(websocket: WebSocket, session_id: str):
    """WebSocket connection endpoint."""
    await connection_manager.connect(websocket, session_id)

    try:
        while True:
            # Receive a message from the client
            data = await websocket.receive_json()

            # Handle pong responses
            if data.get("type") == "pong":
                logger.debug(f"Received pong from {session_id}")
                continue

            # Handle other message types...

    except WebSocketDisconnect:
        connection_manager.disconnect(websocket, session_id)
    except Exception as e:
        logger.error(f"WebSocket error: {e}")
        connection_manager.disconnect(websocket, session_id)

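7.3 Sending Notifications

# backend/ws/tool_notifications.py
# Assumed import path, based on the file comment in section 7.1
from backend.ws.connection import connection_manager


async def notify_tool_execution(
    session_id: str,
    tool_name: str,
    arguments: dict,
    result: str | None = None,
    error: str | None = None,
):
    """Send a tool execution notification.

    Sends tool_error if `error` is given, tool_result if `result` is given,
    and tool_call otherwise.
    """
    # Compare against None so that an empty-string result or error
    # is not mistaken for "no value" and reported as a new tool_call.
    if error is not None:
        # Error notification
        await connection_manager.send_to_session(
            session_id,
            {
                "type": "tool_error",
                "tool": tool_name,
                "error": error,
            }
        )
    elif result is not None:
        # Result notification
        await connection_manager.send_to_session(
            session_id,
            {
                "type": "tool_result",
                "tool": tool_name,
                "result": result,
            }
        )
    else:
        # Start notification
        await connection_manager.send_to_session(
            session_id,
            {
                "type": "tool_call",
                "tool": tool_name,
                "arguments": arguments,
            }
        )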

8. Frontend Implementation

8.1 Native WebSocket

// sessionId, showToolNotification, showSubagentNotification,
// updateSubagentProgress, showError, and reconnect are application-defined
// and not shown here.

// Create the WebSocket connection
const ws = new WebSocket(`ws://localhost:8000/ws/${sessionId}`);

// Connection opened
ws.onopen = () => {
  console.log('WebSocket connected');
};

// Message received
ws.onmessage = (event) => {
  const data = JSON.parse(event.data);

  switch (data.type) {
    case 'tool_call':
      console.log(`Tool called: ${data.tool}`, data.arguments);
      showToolNotification(data.tool, 'started');
      break;

    case 'tool_result':
      console.log(`Tool result: ${data.tool}`, data.result);
      showToolNotification(data.tool, 'completed');
      break;

    case 'tool_error':
      console.error(`Tool error: ${data.tool}`, data.error);
      showToolNotification(data.tool, 'error', data.error);
      break;

    case 'subagent_start':
      console.log(`Subagent started: ${data.task_id}`, data.label);
      showSubagentNotification(data.task_id, 'started', data.label);
      break;

    case 'subagent_progress':
      console.log(`Subagent progress: ${data.task_id}`, data.progress);
      updateSubagentProgress(data.task_id, data.progress);
      break;

    case 'subagent_complete':
      console.log(`Subagent completed: ${data.task_id}`, data.result);
      showSubagentNotification(data.task_id, 'completed', data.result);
      break;

    case 'subagent_error':
      console.error(`Subagent error: ${data.task_id}`, data.error);
      showSubagentNotification(data.task_id, 'error', data.error);
      break;

    case 'ping':
      // Answer the heartbeat; timestamps are Unix time in seconds
      ws.send(JSON.stringify({ type: 'pong', timestamp: Date.now() / 1000 }));
      break;

    case 'error':
      console.error('WebSocket error:', data.error);
      showError(data.error);
      break;

    default:
      console.warn('Unknown message type:', data.type);
  }
};

// Connection closed
ws.onclose = () => {
  console.log('WebSocket disconnected');
  // Try to reconnect after 3 seconds
  setTimeout(() => {
    reconnect();
  }, 3000);
};

// Connection error
ws.onerror = (error) => {
  console.error('WebSocket error:', error);
};
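
The client above retries after a fixed 3-second delay. A common alternative is exponential backoff, where the wait doubles after each failed attempt up to a cap. The sketch below computes such a schedule in Python; it is not CountBot's behavior, and `backoff_delays` with its default values is a hypothetical illustration.

```python
import random
from typing import Optional


def backoff_delays(
    base: float = 1.0,
    cap: float = 30.0,
    attempts: int = 5,
    jitter: float = 0.0,
    rng: Optional[random.Random] = None,
) -> list[float]:
    """Return reconnect delays in seconds: base * 2**n, limited to `cap`.

    `jitter` adds up to that fraction of each delay at random, on top of the
    capped value, so many clients do not all reconnect at the same instant.
    """
    rng = rng or random.Random()
    delays = []
    for attempt in range(attempts):
        delay = min(cap, base * 2 ** attempt)
        delay += rng.uniform(0, jitter * delay)
        delays.append(delay)
    return delays


print(backoff_delays(base=1.0, cap=10.0, attempts=5))
```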

8.2 React Hook

import { useEffect, useRef, useState } from 'react';

interface ToolNotification {
  tool: string;
  status: 'started' | 'completed' | 'error';
  message?: string;
}

interface SubagentNotification {
  taskId: string;
  status: 'started' | 'progress' | 'completed' | 'error';
  message: string;
}

function useWebSocket(sessionId: string) {
  const ws = useRef<WebSocket | null>(null);
  const [isConnected, setIsConnected] = useState(false);
  const [toolNotifications, setToolNotifications] = useState<ToolNotification[]>([]);
  const [subagentNotifications, setSubagentNotifications] = useState<SubagentNotification[]>([]);

  useEffect(() => {
    // Create the connection
    ws.current = new WebSocket(`ws://localhost:8000/ws/${sessionId}`);

    ws.current.onopen = () => {
      console.log('WebSocket connected');
      setIsConnected(true);
    };

    ws.current.onmessage = (event) => {
      const data = JSON.parse(event.data);

      switch (data.type) {
        case 'tool_call':
          setToolNotifications(prev => [
            ...prev,
            { tool: data.tool, status: 'started' }
          ]);
          break;

        case 'tool_result':
          setToolNotifications(prev => [
            ...prev,
            { tool: data.tool, status: 'completed', message: data.result }
          ]);
          break;

        case 'tool_error':
          setToolNotifications(prev => [
            ...prev,
            { tool: data.tool, status: 'error', message: data.error }
          ]);
          break;

        case 'subagent_start':
          setSubagentNotifications(prev => [
            ...prev,
            { taskId: data.task_id, status: 'started', message: data.label }
          ]);
          break;

        case 'subagent_progress':
          setSubagentNotifications(prev => [
            ...prev,
            { taskId: data.task_id, status: 'progress', message: data.progress }
          ]);
          break;

        case 'subagent_complete':
          setSubagentNotifications(prev => [
            ...prev,
            { taskId: data.task_id, status: 'completed', message: data.result }
          ]);
          break;

        case 'subagent_error':
          setSubagentNotifications(prev => [
            ...prev,
            { taskId: data.task_id, status: 'error', message: data.error }
          ]);
          break;

        case 'ping':
          // Answer the heartbeat; timestamps are Unix time in seconds
          ws.current?.send(JSON.stringify({ type: 'pong', timestamp: Date.now() / 1000 }));
          break;
      }
    };

    ws.current.onclose = () => {
      console.log('WebSocket disconnected');
      setIsConnected(false);
    };

    ws.current.onerror = (error) => {
      console.error('WebSocket error:', error);
    };

    // Cleanup: close the socket on unmount or when sessionId changes
    return () => {
      ws.current?.close();
    };
  }, [sessionId]);

  return {
    isConnected,
    toolNotifications,
    subagentNotifications,
  };
}

// Usage example
function ChatComponent() {
  const { isConnected, toolNotifications, subagentNotifications } = useWebSocket('your-session-id');

  return (
    <div>
      <div>Connection status: {isConnected ? 'Connected' : 'Disconnected'}</div>

      <div>
        <h3>Tool Calls</h3>
        {toolNotifications.map((notif, i) => (
          <div key={i}>
            {notif.tool}: {notif.status}
            {notif.message && ` - ${notif.message}`}
          </div>
        ))}
      </div>

      <div>
        <h3>Subagent Tasks</h3>
        {subagentNotifications.map((notif, i) => (
          <div key={i}>
            {notif.taskId}: {notif.status} - {notif.message}
          </div>
        ))}
      </div>
    </div>
  );
}

9. Debugging Tips

9.1 Testing with wscat

# Install wscat
npm install -g wscat

# Connect to the WebSocket
wscat -c ws://localhost:8000/ws/550e8400-e29b-41d4-a716-446655440000

# Send a pong response
> {"type": "pong", "timestamp": 1705315800.123}
9.2 Browser Developer Tools

  1. Open the Network panel
  2. Filter by the WS type
  3. Open the Messages tab
  4. Watch WebSocket messages in real time

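10. Security Considerations

10.1 Authentication

The WebSocket endpoint should validate session_id before accepting the connection:

@router.websocket("/ws/{session_id}")
async def websocket_endpoint(websocket: WebSocket, session_id: str):
    # Validate session_id (session_manager is application-defined, not shown here)
    session = await session_manager.get_session(session_id)
    if not session:
        # close() before accept() rejects the handshake; per the ASGI spec the
        # client then sees HTTP 403 rather than close code 1008.
        await websocket.close(code=1008, reason="Invalid session")
        return

    await connection_manager.connect(websocket, session_id)
    ...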
10.2 Rate Limiting

Guard against message flooding:

from collections import defaultdict
import time


class RateLimiter:
    """Allow at most max_messages per session within a sliding window (seconds)."""

    def __init__(self, max_messages: int = 100, window: int = 60):
        self.max_messages = max_messages
        self.window = window
        # session_id -> timestamps of recently accepted messages
        self.messages = defaultdict(list)

    def is_allowed(self, session_id: str) -> bool:
        now = time.time()
        # Drop timestamps that have fallen out of the window
        self.messages[session_id] = [
            ts for ts in self.messages[session_id]
            if now - ts < self.window
        ]

        if len(self.messages[session_id]) >= self.max_messages:
            return False

        self.messages[session_id].append(now)
        return True
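
The limiter above rebuilds the timestamp list on every call and reads the wall clock directly. A variant of the same sliding-window idea, sketched below, keeps timestamps in a deque and takes the clock as a parameter so the behavior can be tested deterministically. `SlidingWindowLimiter` and its `clock` parameter are hypothetical, not part of CountBot.

```python
import time
from collections import defaultdict, deque
from typing import Callable


class SlidingWindowLimiter:
    """Allow at most max_messages per session within `window` seconds."""

    def __init__(
        self,
        max_messages: int = 100,
        window: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.max_messages = max_messages
        self.window = window
        self._clock = clock
        self._events: dict[str, deque[float]] = defaultdict(deque)

    def is_allowed(self, session_id: str) -> bool:
        now = self._clock()
        events = self._events[session_id]
        # Timestamps are appended in order, so expired ones are at the left.
        while events and now - events[0] >= self.window:
            events.popleft()
        if len(events) >= self.max_messages:
            return False
        events.append(now)
        return True


# Usage with a fake clock: two messages allowed per 10-second window.
fake_now = [0.0]
limiter = SlidingWindowLimiter(max_messages=2, window=10.0, clock=lambda: fake_now[0])
print([limiter.is_allowed("s1") for _ in range(3)])
```

In an endpoint, `is_allowed(session_id)` would be checked after each received message, with the server dropping the message or closing the connection when it returns False.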

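11. Summary

Key points of the WebSocket notification system:

  1. Two-way communication: supports server push and client responses
  2. Real-time updates: live progress for tool calls and subagent tasks
  3. Connection management: multiple clients can connect to the same session
  4. Heartbeat: keeps the connection alive and detects disconnects promptly
  5. Error handling: connection errors and failed sends are handled gracefully
  6. Security: authentication, rate limiting, input validation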