子代理
CountBot 的后台子代理系统和多智能体协作系统,支持创建独立的后台 Agent 处理耗时任务,以及多个智能体协同完成复杂工作流。
目录
子代理系统
设计理念
- 异步执行 — 子代理在后台独立运行,不阻塞主对话
- 独立上下文 — 每个子代理有独立的消息列表和工具注册表
- 进度追踪 — 实时更新任务进度(0-100%)
- 结果通知 — 任务完成后通过 WebSocket 通知前端
- 资源隔离 — 子代理有独立的迭代限制和工具集
架构概览
┌─────────────────────────────────────────────────────────┐
│ 主 Agent Loop │
│ │
│ 用户: "分析项目代码结构并生成报告" │
│ Agent: spawn(task="分析项目代码结构并生成报告") │
│ │ │
│ ▼ │
│ SpawnTool.execute() │
│ │ │
│ ▼ │
│ SubagentManager.create_task() → task_id │
│ SubagentManager.execute_task(task_id) │
│ │ │
│ │ asyncio.create_task (后台运行) │
│ ▼ │
│ ┌──────────────────────────────────────────────────┐ │
│ │ SubagentTask │ │
│ │ │ │
│ │ 独立的 Agent Loop: │ │
│ │ ├─ 系统提示词(子代理专用) │ │
│ │ ├─ 独立工具注册表 │ │
│ │ │ (read_file, write_file, edit_file, │ │
│ │ │ list_dir, exec, web_fetch) │ │
│ │ ├─ max_iterations = 15 │ │
│ │ └─ 进度追踪 (0% → 100%) │ │
│ │ │ │
│ │ 完成后: status=COMPLETED, result="..." │ │
│ └──────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ WebSocket 通知前端: 任务完成 │
└─────────────────────────────────────────────────────────┘
核心组件
SubagentManager
文件: backend/modules/agent/subagent.py
子代理管理器,负责任务的创建、执行、查询和清理。
from backend.modules.agent.subagent import SubagentManager
manager = SubagentManager(
provider=llm_provider,
workspace=Path("/workspace"),
model="glm-5",
)
方法列表
| 方法 | 说明 |
|---|---|
create_task(label, message, session_id) | 创建任务,返回 task_id |
execute_task(task_id) | 异步执行任务 |
cancel_task(task_id) | 取消运行中的任务 |
get_task(task_id) | 获取任务信息 |
list_tasks(status, session_id) | 列出任务 |
get_running_tasks() | 获取运行中的任务 |
delete_task(task_id) | 删除任务 |
get_running_count() | 运行中任务数量 |
get_stats() | 统计信息 |
cleanup_old_tasks(max_age_hours) | 清理过期任务 |
register_notification_callback(cb) | 注册通知回调 |
SubagentTask
文件: backend/modules/agent/subagent.py
任务数据类,封装单个子代理任务的状态。
@dataclass
class SubagentTask:
task_id: str # UUID
label: str # 显示标签
message: str # 任务描述
session_id: str # 关联会话
status: TaskStatus # 状态
progress: int # 进度 (0-100)
result: str # 执行结果
error: str # 错误信息
created_at: datetime
completed_at: datetime
TaskStatus
class TaskStatus(Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
SpawnTool
文件: backend/modules/tools/spawn.py
面向 Agent 的子代理创建工具。
{
"name": "spawn",
"parameters": {
"task": "分析项目代码结构并生成报告",
"label": "代码分析"
}
}
| 参数 | 类型 | 必填 | 说明 |
|---|---|---|---|
task | string | 是 | 任务描述 |
label | string | 否 | 显示标签(默认取 task 前 30 字符) |
返回示例:"子 Agent [代码分析] 已启动 (ID: uuid)。完成后我会通知你。"
任务生命周期
create_task() execute_task() _run_task()
│ │ │
▼ ▼ ▼
PENDING ──────────► RUNNING ──────────► COMPLETED
│ │
│ ├─► FAILED (异常)
│ └─► CANCELLED (取消)
│
└─ cancel_task() ──► CANCELLED
子代理执行流程
1. 构建独立环境
子代理有独立的系统提示词和工具注册表:
# 子代理专用系统提示词
system_prompt = _build_subagent_prompt(task)
# 独立工具注册表(比主 Agent 少)
tools = ToolRegistry()
tools.register(ReadFileTool(workspace))
tools.register(WriteFileTool(workspace))
tools.register(EditFileTool(workspace))
tools.register(ListDirTool(workspace))
tools.register(ExecTool(workspace, timeout=300, allow_dangerous=False))
tools.register(WebSearchTool()) # 可选
tools.register(WebFetchTool()) # 可选
2. 执行 Agent Loop
子代理运行独立的 ReAct 循环:
while iteration < 15:
│
├─ provider.chat_stream(messages, tools)
│ ├─ 收集 content
│ └─ 收集 tool_calls
│
├─ if tool_calls:
│ ├─ 执行工具
│ ├─ 添加 tool result
│ └─ progress += 5 (最大 90%)
│
└─ else: break
3. 完成处理
task.result = "".join(response_chunks)
task.status = TaskStatus.COMPLETED
task.progress = 100
task.completed_at = datetime.now()
工具权限
子代理可用的工具是主 Agent 的子集:
| 工具 | 主 Agent | 子代理 | 说明 |
|---|---|---|---|
| read_file | 可用 | 可用 | |
| write_file | 可用 | 可用 | |
| edit_file | 可用 | 可用 | |
| list_dir | 可用 | 可用 | |
| exec | 可用 | 可用 | 超时 300s(主 Agent 30s) |
| web_fetch | 可用 | 可用 | 可选 |
| spawn | 可用 | 不可用 | 防止递归创建 |
| send_media | 可用 | 不可用 | |
| screenshot | 可用 | 不可用 | |
| memory | 可用 | 不可用 | 运行时默认是统一记忆工具 |
子代理的 Shell 工具超时设为 300 秒(主 Agent 默认 30 秒),适合执行耗时命令。
通知机制
WebSocket 通知
子代理状态变化时通过 WebSocket 通知前端:
manager.register_notification_callback(notify_callback)
# 通知事件类型
await _notify(task_id, "started") # 任务开始
await _notify(task_id, "completed") # 任务完成
await _notify(task_id, "failed") # 任务失败
await _notify(task_id, "cancelled") # 任务取消
前端展示
前端通过 WebSocket 接收任务状态更新,在 UI 中展示:
- 任务列表(运行中/已完成/失败)
- 进度条
- 任务结果
API 接口
文件: backend/api/tasks.py
| 端点 | 方法 | 说明 |
|---|---|---|
/api/tasks | GET | 列出任务 |
/api/tasks/{id} | GET | 获取任务详情 |
/api/tasks/{id}/cancel | POST | 取消任务 |
/api/tasks/{id} | DELETE | 删除任务 |
/api/tasks/stats | GET | 任务统 计 |
任务列表
GET /api/tasks?session_id=xxx&status=running
响应:
[
{
"task_id": "uuid",
"label": "代码分析",
"message": "分析项目代码结构并生成报告",
"status": "running",
"progress": 45,
"created_at": "2026-02-15T10:00:00",
"completed_at": null
}
]
任务统计
GET /api/tasks/stats
响应:
{
"total": 10,
"running": 1,
"completed": 8,
"failed": 1,
"cancelled": 0
}
使用场景
| 场景 | 示例 |
|---|---|
| 代码分析 | "分析整个项目的代码结构并生成文档" |
| 批量处理 | "将 data/ 目录下所有 CSV 转换为 JSON" |
| 长时间编译 | "编译项目并运行所有测试" |
| 数据采集 | "搜索并整理 Python 最佳实践资料" |
| 报告生成 | "分析日志文件并生成错误报告" |
配置参数
| 参数 | 默认值 | 说明 |
|---|---|---|
max_iterations | 15 | 子代理最大迭代次数 |
| Shell 超时 | 300s | 子代理 Shell 命令超时 |
max_age_hours | 24h | 过期任务清理阈值 |
allow_dangerous | false | 子代理不允许危险命令 |
restrict_to_workspace | true | 限制在工作空间内 |
多智能体协作系统
CountBot 的多智能体工作流系统支持三种协作模式:Pipeline(流水线)、Graph(有向无环图)、Council(议会)。
系统架构
多智能体协作系统采用分层架构:
┌─────────────────────────────────────────────────────────┐
│ 用户层 │
│ Web UI / 飞书 / 钉钉 / QQ / Telegram │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ API 层 │
│ /api/agent-teams (CRUD) │
│ workflow_run 工具 / /api/agent-teams/{id}/config │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ 工作流引擎层 │
│ WorkflowEngine (backend/modules/agent/workflow.py) │
│ ├─ Pipeline 编排器 │
│ ├─ Graph 调度器(DAG + 并行) │
│ └─ Council 协调器(多视角 + 交叉评审) │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ 子代理管理层 │
│ SubagentManager (backend/modules/agent/subagent.py) │
│ ├─ 任务生命周期管理 │
│ ├─ 异步任务调度 │
│ ├─ 工具调用追踪 │
│ └─ 实时进度推送 │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ Agent Loop 层 │
│ 独立的 ReAct 循环 + 工具系统 + LLM 调用 │
└─────────────────────────────────────────────────────────┘
核心技术 特性
1. 异步并发执行
使用 asyncio.gather() 实现多代理并行执行:
# Graph 模式:并发执行无依赖节点
ready_agents = [a for a in agents if not a.depends_on]
results = await asyncio.gather(*[
execute_agent(agent) for agent in ready_agents
])
# Council 模式:并发执行所有成员
results = await asyncio.gather(*[
execute_agent(agent) for agent in council_members
])
2. 流式内容传输
支持实时推送子代理的输出内容:
async for chunk in provider.chat_stream(messages, tools):
if chunk.content:
# 实时推送到 WebSocket
await event_callback("workflow_agent_chunk", {
"agent_id": agent.id,
"content": chunk.content
})
3. 工具调用追踪
记录每个子代理的工具调用详情:
tool_call_record = {
"tool": tool_name,
"arguments": arguments,
"result": result,
"success": True,
"duration": elapsed_ms,
"timestamp": datetime.now().isoformat()
}
task.tool_call_records.append(tool_call_record)