WebSocket 实时通信
CountBot 的 WebSocket 子系统,提供实时流式响应、工具执行通知和任务状态推送。
目录
设计理念
- 会话绑定 — 每个 WebSocket 连接绑定到一个会话,只接收该会话的消息
- 多连接支持 — 同一会话可有多个连接(多标签页),消息广播到所有连接
- 流式优先 — LLM 响应逐 chunk 推送,前端实时渲染
- 结构化消息 — 所有消息使用 JSON 格式,带
type字段区分类型 - 优雅降级 — 连接断开时自动清理,不影响后台任务
架构概览
┌─────────────────────────────────────────────────────────┐
│ 前端 (Vue 3) │
│ │
│ WebSocket("ws://localhost:8000/ws/chat") │
│ │ │
│ │ subscribe / message / cancel │
│ ▼ │
│ ┌──────────────────────────────────────────────────┐ │
│ │ ConnectionManager │ │
│ │ │ │
│ │ connections: dict[conn_id, WebSocket] │ │
│ │ session_connections: dict[session_id, set[id]] │ │
│ └──────────────────────┬───────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────┐ │
│ │ route_event() │ │
│ │ │ │
│ │ subscribe → bind_session() │ │
│ │ message → handle_message_event() │ │
│ │ cancel → cancel_session() │ │
│ │ ping → handle_ping_event() │ │
│ └──────────────────────┬───────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────┐ │
│ │ AgentLoop │ │
│ │ │ │
│ │ process_message() → yield chunks │ │
│ │ │ │ │
│ │ ├─ send_message_chunk() → 流式内容 │ │
│ │ ├─ send_tool_call() → 工具开始 │ │
│ │ └─ send_tool_result() → 工具结果 │ │
│ └──────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
连接管理
文件: backend/ws/connection.py
ConnectionManager
管理所有 WebSocket 连接的生命周期。
class ConnectionManager:
connections: dict[str, WebSocket] # conn_id → WebSocket
session_connections: dict[str, set[str]] # session_id → {conn_ids}
连接流程
客户端连接 ws://localhost:8000/ws/chat
│
├─ ConnectionManager.connect(websocket)
│ └─ 分配 connection_id (UUID)
│
├─ 客户端发送 subscribe 事件
│ └─ ConnectionManager.bind_session(conn_id, session_id)
│
├─ 正常通信...
│
└─ 断开连接
└─ ConnectionManager.disconnect(conn_id)
└─ 清理 session_connections 中的连接映射
消息发送
# 发送到指定连接
await manager.send_message(conn_id, message)
# 发送到会话的所有连接
count = await manager.send_to_session(session_id, message)
# 广播到所有连接
count = await manager.broadcast(message)
消息协议
所有消息使用 JSON 格式。
客户端消息
subscribe — 订阅会话
{
"type": "subscribe",
"sessionId": "uuid"
}
message — 发送消息
{
"type": "message",
"sessionId": "uuid",
"content": "你好"
}
cancel — 取消执 行
{
"type": "cancel",
"sessionId": "uuid"
}
unsubscribe — 取消订阅
{
"type": "unsubscribe",
"sessionId": "uuid"
}
ping — 心跳
{
"type": "ping"
}
服务端消息
message_chunk — 流式内容
{
"type": "message_chunk",
"content": "你好!"
}
tool_call — 工具调用开始
{
"type": "tool_call",
"tool": "exec",
"arguments": {"command": "ls -la"},
"messageId": 2
}
tool_result — 工具执行结果
{
"type": "tool_result",
"tool": "exec",
"result": "total 48\ndrwxr-xr-x ...",
"duration": 0.15,
"messageId": 3
}
message_complete — 消息完成
{
"type": "message_complete",
"messageId": "uuid"
}
error — 错误
{
"type": "error",
"message": "处理消息时出错",
"code": "PROCESSING_ERROR"
}