跳到主要内容

WebSocket 实时通信

CountBot 的 WebSocket 子系统,提供实时流式响应、工具执行通知和任务状态推送。

目录

设计理念

  1. 会话绑定 — 每个 WebSocket 连接绑定到一个会话,只接收该会话的消息
  2. 多连接支持 — 同一会话可有多个连接(多标签页),消息广播到所有连接
  3. 流式优先 — LLM 响应逐 chunk 推送,前端实时渲染
  4. 结构化消息 — 所有消息使用 JSON 格式,带 type 字段区分类型
  5. 优雅降级 — 连接断开时自动清理,不影响后台任务

架构概览

┌─────────────────────────────────────────────────────────┐
│ 前端 (Vue 3) │
│ │
│ WebSocket("ws://localhost:8000/ws/chat") │
│ │ │
│ │ subscribe / message / cancel │
│ ▼ │
│ ┌──────────────────────────────────────────────────┐ │
│ │ ConnectionManager │ │
│ │ │ │
│ │ connections: dict[conn_id, WebSocket] │ │
│ │ session_connections: dict[session_id, set[id]] │ │
│ └──────────────────────┬───────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────┐ │
│ │ route_event() │ │
│ │ │ │
│ │ subscribe → bind_session() │ │
│ │ message → handle_message_event() │ │
│ │ cancel → cancel_session() │ │
│ │ ping → handle_ping_event() │ │
│ └──────────────────────┬───────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────┐ │
│ │ AgentLoop │ │
│ │ │ │
│ │ process_message() → yield chunks │ │
│ │ │ │ │
│ │ ├─ send_message_chunk() → 流式内容 │ │
│ │ ├─ send_tool_call() → 工具开始 │ │
│ │ └─ send_tool_result() → 工具结果 │ │
│ └──────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘

连接管理

文件: backend/ws/connection.py

ConnectionManager

管理所有 WebSocket 连接的生命周期。

class ConnectionManager:
connections: dict[str, WebSocket] # conn_id → WebSocket
session_connections: dict[str, set[str]] # session_id → {conn_ids}

连接流程

客户端连接 ws://localhost:8000/ws/chat

├─ ConnectionManager.connect(websocket)
│ └─ 分配 connection_id (UUID)

├─ 客户端发送 subscribe 事件
│ └─ ConnectionManager.bind_session(conn_id, session_id)

├─ 正常通信...

└─ 断开连接
└─ ConnectionManager.disconnect(conn_id)
└─ 清理 session_connections 中的连接映射

消息发送

# 发送到指定连接
await manager.send_message(conn_id, message)

# 发送到会话的所有连接
count = await manager.send_to_session(session_id, message)

# 广播到所有连接
count = await manager.broadcast(message)

消息协议

所有消息使用 JSON 格式。

客户端消息

subscribe — 订阅会话

{
"type": "subscribe",
"sessionId": "uuid"
}

message — 发送消息

{
"type": "message",
"sessionId": "uuid",
"content": "你好"
}

cancel — 取消执行

{
"type": "cancel",
"sessionId": "uuid"
}

unsubscribe — 取消订阅

{
"type": "unsubscribe",
"sessionId": "uuid"
}

ping — 心跳

{
"type": "ping"
}

服务端消息

message_chunk — 流式内容

{
"type": "message_chunk",
"content": "你好!"
}

tool_call — 工具调用开始

{
"type": "tool_call",
"tool": "exec",
"arguments": {"command": "ls -la"},
"messageId": 2
}

tool_result — 工具执行结果

{
"type": "tool_result",
"tool": "exec",
"result": "total 48\ndrwxr-xr-x ...",
"duration": 0.15,
"messageId": 3
}

message_complete — 消息完成

{
"type": "message_complete",
"messageId": "uuid"
}

error — 错误

{
"type": "error",
"message": "处理消息时出错",
"code": "PROCESSING_ERROR"
}

task_status — 任务状态更新

{
"type": "task_status",
"taskId": "uuid",
"status": "completed",
"progress": 100,
"message": "任务已完成"
}

事件路由

文件: backend/ws/events.py

route_event() 根据消息 type 分发到对应处理函数:

type处理函数说明
subscribehandle_subscribe_event()绑定会话
unsubscribehandle_unsubscribe_event()当前仅记录日志,未真正解除绑定
messagehandle_message_event()处理用户消息
cancelcancel_session()取消执行
pinghandle_ping_event()心跳响应

handle_message_event 流程

handle_message_event(data, connection_id)

├─ 获取 session_id, content
├─ 保存用户消息到数据库
├─ 获取会话历史
├─ 创建 CancelToken

├─ AgentLoop.process_message(...)
│ │
│ ├─ async for chunk:
│ │ └─ send_message_chunk(session_id, chunk)
│ │
│ └─ (工具调用通知由 AgentLoop 内部发送)

├─ 保存 AI 回复到数据库
├─ send_message_complete(session_id)
└─ 清理 CancelToken

流式推送

文件: backend/ws/streaming.py

提供两种流式推送策略:

StreamingResponseHandler

直接推送,可选分块和延迟:

handler = StreamingResponseHandler(
session_id="uuid",
chunk_size=50, # 每块 50 字符
delay_ms=0, # 无延迟
)
await handler.stream_iterator(agent_response)

BufferedStreamingHandler

缓冲推送,减少网络开销:

handler = BufferedStreamingHandler(
session_id="uuid",
buffer_size=100, # 缓冲 100 字符
flush_interval_ms=100, # 100ms 自动刷新
)
await handler.stream_iterator(agent_response)

缓冲策略:

  • 缓冲区满(≥ buffer_size)时自动刷新
  • 超过 flush_interval_ms 未刷新时自动刷新
  • 迭代结束时强制刷新

便捷函数

from backend.ws.streaming import stream_response, stream_text

# 流式推送迭代器
stats = await stream_response(session_id, iterator, use_buffer=True)

# 流式推送文本
stats = await stream_text(session_id, "Hello World", chunk_size=50)

工具通知

文件: backend/ws/tool_notifications.py

通用工具通知能力由 ToolNotificationHandler 提供;聊天主链路当前实际发送的是 tool_call / tool_result 两类消息:

await send_tool_call(
session_id=session_id,
tool="exec",
arguments={"command": "ls -la"},
)

await send_tool_result(
session_id=session_id,
tool="exec",
result="total 48\n...",
duration=15.2,
)

前端收到通知后在聊天界面展示工具调用卡片。

任务通知

文件: backend/ws/task_notifications.py

子代理任务状态变化时推送通知。当前实现提供的是一组细分事件,而不是单一 task_update

await notify_task_created(task_id="uuid", label="Long task", session_id=session_id)
await notify_task_status(task_id="uuid", status="running", progress=30)
await notify_task_progress(task_id="uuid", progress=60, message="正在处理")
await notify_task_complete(task_id="uuid", result="任务结果...")

取消机制

文件: backend/ws/connection.py

CancellationToken

token = get_cancel_token(session_id)
# → 创建或获取该会话的取消令牌

cancel_session(session_id)
# → 设置令牌为已取消状态

cleanup_cancel_token(session_id)
# → 清理令牌

Agent Loop 在每次迭代和工具执行前检查 cancel_token.is_cancelled,实现即时取消。

远程访问认证

/ws/chat 的认证逻辑位于 backend/app.py,当前实现不会因为本地访问而跳过鉴权。

认证方式

当前实现不接受 URL query 参数传递 token,只接受以下两种方式:

  • Cookie:CountBot_token
  • Authorization: Bearer <token> 请求头

例如:

Cookie: CountBot_token=YOUR_SESSION_TOKEN

或:

Authorization: Bearer YOUR_SESSION_TOKEN

认证失败处理

场景服务端行为关闭码
无效或缺失 token关闭连接4001
token 校验通过建立连接

前端处理

ws.onclose = (event) => {
if (event.code === 4001) {
// 认证失败,跳转登录页
window.location.href = '/login'
}
}

防代理绕过

HTTP 中间件中的“本地访问豁免”不适用于这个 WebSocket 端点。/ws/chat 会直接校验 Cookie 或 Bearer Token,不接受 query token。

详细认证机制参见 认证文档

前端集成

前端通过 WebSocket 连接实现实时通信:

// 建立连接
const ws = new WebSocket("ws://localhost:8000/ws/chat")

// 订阅会话
ws.send(JSON.stringify({
type: "subscribe",
sessionId: currentSessionId,
}))

// 发送消息
ws.send(JSON.stringify({
type: "message",
sessionId: currentSessionId,
content: userInput,
}))

// 接收消息
ws.onmessage = (event) => {
const data = JSON.parse(event.data)
switch (data.type) {
case "message_chunk":
appendToChat(data.content)
break
case "tool_call":
showToolCard(data.tool, data.arguments)
break
case "tool_result":
updateToolCard(data.tool, data.result)
break
case "message_complete":
finishMessage()
break
case "error":
showError(data.message)
break
}
}

// 取消执行
ws.send(JSON.stringify({
type: "cancel",
sessionId: currentSessionId,
}))

相关文件

文件说明
backend/ws/connection.pyConnectionManager + 消息类型
backend/ws/events.py事件路由和处理
backend/ws/streaming.py流式推送策略
backend/ws/tool_notifications.py工具执行通知
backend/ws/task_notifications.py任务状态通知
backend/app.pyWebSocket 端点注册