跳到主要内容

子代理

CountBot 的后台子代理系统和多智能体协作系统,支持创建独立的后台 Agent 处理耗时任务,以及多个智能体协同完成复杂工作流。

目录


子代理系统

设计理念

  1. 异步执行 — 子代理在后台独立运行,不阻塞主对话
  2. 独立上下文 — 每个子代理有独立的消息列表和工具注册表
  3. 进度追踪 — 实时更新任务进度(0-100%)
  4. 结果通知 — 任务完成后通过 WebSocket 通知前端
  5. 资源隔离 — 子代理有独立的迭代限制和工具集

架构概览

┌─────────────────────────────────────────────────────────┐
│ 主 Agent Loop │
│ │
│ 用户: "分析项目代码结构并生成报告" │
│ Agent: spawn(task="分析项目代码结构并生成报告") │
│ │ │
│ ▼ │
│ SpawnTool.execute() │
│ │ │
│ ▼ │
│ SubagentManager.create_task() → task_id │
│ SubagentManager.execute_task(task_id) │
│ │ │
│ │ asyncio.create_task (后台运行) │
│ ▼ │
│ ┌──────────────────────────────────────────────────┐ │
│ │ SubagentTask │ │
│ │ │ │
│ │ 独立的 Agent Loop: │ │
│ │ ├─ 系统提示词(子代理专用) │ │
│ │ ├─ 独立工具注册表 │ │
│ │ │ (read_file, write_file, edit_file, │ │
│ │ │ list_dir, exec, web_fetch) │ │
│ │ ├─ max_iterations = 15 │ │
│ │ └─ 进度追踪 (0% → 100%) │ │
│ │ │ │
│ │ 完成后: status=COMPLETED, result="..." │ │
│ └──────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ WebSocket 通知前端: 任务完成 │
└─────────────────────────────────────────────────────────┘

核心组件

SubagentManager

文件: backend/modules/agent/subagent.py

子代理管理器,负责任务的创建、执行、查询和清理。

from backend.modules.agent.subagent import SubagentManager

manager = SubagentManager(
provider=llm_provider,
workspace=Path("/workspace"),
model="glm-5",
)

方法列表

方法说明
create_task(label, message, session_id)创建任务,返回 task_id
execute_task(task_id)异步执行任务
cancel_task(task_id)取消运行中的任务
get_task(task_id)获取任务信息
list_tasks(status, session_id)列出任务
get_running_tasks()获取运行中的任务
delete_task(task_id)删除任务
get_running_count()运行中任务数量
get_stats()统计信息
cleanup_old_tasks(max_age_hours)清理过期任务
register_notification_callback(cb)注册通知回调

SubagentTask

文件: backend/modules/agent/subagent.py

任务数据类,封装单个子代理任务的状态。

@dataclass
class SubagentTask:
task_id: str # UUID
label: str # 显示标签
message: str # 任务描述
session_id: str # 关联会话
status: TaskStatus # 状态
progress: int # 进度 (0-100)
result: str # 执行结果
error: str # 错误信息
created_at: datetime
completed_at: datetime

TaskStatus

class TaskStatus(Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"

SpawnTool

文件: backend/modules/tools/spawn.py

面向 Agent 的子代理创建工具。

{
"name": "spawn",
"parameters": {
"task": "分析项目代码结构并生成报告",
"label": "代码分析"
}
}
参数类型必填说明
taskstring任务描述
labelstring显示标签(默认取 task 前 30 字符)

返回示例:"子 Agent [代码分析] 已启动 (ID: uuid)。完成后我会通知你。"

任务生命周期

create_task()          execute_task()         _run_task()
│ │ │
▼ ▼ ▼
PENDING ──────────► RUNNING ──────────► COMPLETED
│ │
│ ├─► FAILED (异常)
│ └─► CANCELLED (取消)

└─ cancel_task() ──► CANCELLED

子代理执行流程

1. 构建独立环境

子代理有独立的系统提示词和工具注册表:

# 子代理专用系统提示词
system_prompt = _build_subagent_prompt(task)

# 独立工具注册表(比主 Agent 少)
tools = ToolRegistry()
tools.register(ReadFileTool(workspace))
tools.register(WriteFileTool(workspace))
tools.register(EditFileTool(workspace))
tools.register(ListDirTool(workspace))
tools.register(ExecTool(workspace, timeout=300, allow_dangerous=False))
tools.register(WebSearchTool()) # 可选
tools.register(WebFetchTool()) # 可选

2. 执行 Agent Loop

子代理运行独立的 ReAct 循环:

while iteration < 15:

├─ provider.chat_stream(messages, tools)
│ ├─ 收集 content
│ └─ 收集 tool_calls

├─ if tool_calls:
│ ├─ 执行工具
│ ├─ 添加 tool result
│ └─ progress += 5 (最大 90%)

└─ else: break

3. 完成处理

task.result = "".join(response_chunks)
task.status = TaskStatus.COMPLETED
task.progress = 100
task.completed_at = datetime.now()

工具权限

子代理可用的工具是主 Agent 的子集:

工具主 Agent子代理说明
read_file可用可用
write_file可用可用
edit_file可用可用
list_dir可用可用
exec可用可用超时 300s(主 Agent 30s)
web_fetch可用可用可选
spawn可用不可用防止递归创建
send_media可用不可用
screenshot可用不可用
memory可用不可用运行时默认是统一记忆工具

子代理的 Shell 工具超时设为 300 秒(主 Agent 默认 30 秒),适合执行耗时命令。

通知机制

WebSocket 通知

子代理状态变化时通过 WebSocket 通知前端:

manager.register_notification_callback(notify_callback)

# 通知事件类型
await _notify(task_id, "started") # 任务开始
await _notify(task_id, "completed") # 任务完成
await _notify(task_id, "failed") # 任务失败
await _notify(task_id, "cancelled") # 任务取消

前端展示

前端通过 WebSocket 接收任务状态更新,在 UI 中展示:

  • 任务列表(运行中/已完成/失败)
  • 进度条
  • 任务结果

API 接口

文件: backend/api/tasks.py

端点方法说明
/api/tasksGET列出任务
/api/tasks/{id}GET获取任务详情
/api/tasks/{id}/cancelPOST取消任务
/api/tasks/{id}DELETE删除任务
/api/tasks/statsGET任务统计

任务列表

GET /api/tasks?session_id=xxx&status=running

响应:

[
{
"task_id": "uuid",
"label": "代码分析",
"message": "分析项目代码结构并生成报告",
"status": "running",
"progress": 45,
"created_at": "2026-02-15T10:00:00",
"completed_at": null
}
]

任务统计

GET /api/tasks/stats

响应:

{
"total": 10,
"running": 1,
"completed": 8,
"failed": 1,
"cancelled": 0
}

使用场景

场景示例
代码分析"分析整个项目的代码结构并生成文档"
批量处理"将 data/ 目录下所有 CSV 转换为 JSON"
长时间编译"编译项目并运行所有测试"
数据采集"搜索并整理 Python 最佳实践资料"
报告生成"分析日志文件并生成错误报告"

配置参数

参数默认值说明
max_iterations15子代理最大迭代次数
Shell 超时300s子代理 Shell 命令超时
max_age_hours24h过期任务清理阈值
allow_dangerousfalse子代理不允许危险命令
restrict_to_workspacetrue限制在工作空间内

多智能体协作系统

CountBot 的多智能体工作流系统支持三种协作模式:Pipeline(流水线)、Graph(有向无环图)、Council(议会)。

系统架构

多智能体协作系统采用分层架构:

┌─────────────────────────────────────────────────────────┐
│ 用户层 │
│ Web UI / 飞书 / 钉钉 / QQ / Telegram │
└─────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────┐
│ API 层 │
│ /api/agent-teams (CRUD) │
│ workflow_run 工具 / /api/agent-teams/{id}/config │
└─────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────┐
│ 工作流引擎层 │
│ WorkflowEngine (backend/modules/agent/workflow.py) │
│ ├─ Pipeline 编排器 │
│ ├─ Graph 调度器(DAG + 并行) │
│ └─ Council 协调器(多视角 + 交叉评审) │
└─────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────┐
│ 子代理管理层 │
│ SubagentManager (backend/modules/agent/subagent.py) │
│ ├─ 任务生命周期管理 │
│ ├─ 异步任务调度 │
│ ├─ 工具调用追踪 │
│ └─ 实时进度推送 │
└─────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────┐
│ Agent Loop 层 │
│ 独立的 ReAct 循环 + 工具系统 + LLM 调用 │
└─────────────────────────────────────────────────────────┘

核心技术特性

1. 异步并发执行

使用 asyncio.gather() 实现多代理并行执行:

# Graph 模式:并发执行无依赖节点
ready_agents = [a for a in agents if not a.depends_on]
results = await asyncio.gather(*[
execute_agent(agent) for agent in ready_agents
])

# Council 模式:并发执行所有成员
results = await asyncio.gather(*[
execute_agent(agent) for agent in council_members
])

2. 流式内容传输

支持实时推送子代理的输出内容:

async for chunk in provider.chat_stream(messages, tools):
if chunk.content:
# 实时推送到 WebSocket
await event_callback("workflow_agent_chunk", {
"agent_id": agent.id,
"content": chunk.content
})

3. 工具调用追踪

记录每个子代理的工具调用详情:

tool_call_record = {
"tool": tool_name,
"arguments": arguments,
"result": result,
"success": True,
"duration": elapsed_ms,
"timestamp": datetime.now().isoformat()
}
task.tool_call_records.append(tool_call_record)

4. 执行元数据序列化

将工作流执行过程序列化为 HTML 注释,支持前端状态恢复:

metadata = f"""
<!-- WORKFLOW_EXECUTION_METADATA
mode: {mode}
agents: {len(agents)}
tool_calls: {total_tool_calls}
duration: {duration_ms}ms
-->
"""

5. 技能系统集成

支持为子代理动态注入技能文档:

if enable_skills:
skills_summary = build_skills_summary()
system_prompt += f"\n\n## 可用技能\n{skills_summary}"

协作模式

Pipeline 模式(流水线)

智能体按顺序执行任务,前一个的输出作为下一个的输入。

执行流程

async def execute_pipeline(agents, goal):
context = [] # 累积上下文

for agent in agents:
# 构建提示词:目标 + 前序输出
prompt = f"目标: {goal}\n\n前序输出:\n{context}"

# 执行当前阶段
result = await execute_agent(agent, prompt)

# 累积到上下文
context.append(f"## {agent.role}\n{result}")

return "\n\n".join(context)

使用场景

  • 文档处理流程(提取 → 分析 → 总结)
  • 数据处理管道(采集 → 清洗 → 分析)
  • 代码生成流程(设计 → 实现 → 测试)

示例配置

{
"name": "文档处理流水线",
"mode": "pipeline",
"agents": [
{
"id": "extractor",
"role": "信息提取专家",
"system_prompt": "你是信息提取专家,专注于从文档中提取关键信息",
"task": "提取文档中的关键信息和数据"
},
{
"id": "analyzer",
"role": "数据分析师",
"system_prompt": "你是数据分析师,专注于分析数据模式和趋势",
"task": "分析提取的信息,找出模式和趋势"
},
{
"id": "summarizer",
"role": "报告撰写专家",
"system_prompt": "你是报告撰写专家,专注于生成清晰的总结报告",
"task": "基于分析结果生成最终总结报告"
}
]
}

Graph 模式(有向无环图)

支持复杂的任务依赖关系,智能体可以并行执行。

执行流程

async def execute_graph(agents, goal):
# 1. 构建依赖映射
dep_map = {a.id: set(a.depends_on) for a in agents}

# 2. 检测循环依赖
if has_cycle(dep_map):
raise ValueError("检测到循环依赖")

# 3. 调度循环
completed = set()
results = {}

while len(completed) < len(agents):
# 找出所有无阻塞的节点
ready = [a for a in agents
if a.id not in completed
and dep_map[a.id].issubset(completed)]

# 并发执行
tasks = [execute_agent(a, goal, results) for a in ready]
agent_results = await asyncio.gather(*tasks)

# 更新结果
for agent, result in zip(ready, agent_results):
results[agent.id] = result
completed.add(agent.id)

return results

依赖解析

# 上游失败自动标记下游为失败
if any(dep in failed_agents for dep in agent.depends_on):
agent.status = "failed"
agent.error = "上游任务失败"
continue

使用场景

  • 复杂项目分析(多个模块并行分析)
  • 多源数据聚合(并行获取后汇总)
  • 并行代码审查(多个文件同时审查)

示例配置

{
"name": "项目分析图",
"mode": "graph",
"agents": [
{
"id": "frontend",
"role": "前端分析师",
"system_prompt": "你是前端分析师,专注于前端代码结构分析",
"task": "分析前端代码结构和技术栈",
"depends_on": []
},
{
"id": "backend",
"role": "后端分析师",
"system_prompt": "你是后端分析师,专注于后端代码结构分析",
"task": "分析后端代码结构和 API 设计",
"depends_on": []
},
{
"id": "database",
"role": "数据库专家",
"system_prompt": "你是数据库专家,专注于数据库设计分析",
"task": "分析数据库设计和数据模型",
"depends_on": []
},
{
"id": "integrator",
"role": "系统架构师",
"system_prompt": "你是系统架构师,专注于整体架构分析",
"task": "汇总前后端和数据库分析,生成完整的架构报告",
"depends_on": ["frontend", "backend", "database"]
}
]
}

Council 模式(议会)

多个智能体从不同视角分析同一问题,支持交叉审查。

执行流程

async def execute_council(agents, goal, cross_review=True):
# 第 1 轮:独立分析
round1_tasks = [
execute_agent(a, f"{goal}\n\n请从{a.perspective}角度分析")
for a in agents
]
round1_results = await asyncio.gather(*round1_tasks)

if not cross_review:
return format_council_results(round1_results)

# 第 2 轮:交叉评审
round2_tasks = []
for i, agent in enumerate(agents):
# 收集其他成员的立场
other_views = [
f"## {agents[j].role} ({agents[j].perspective})\n{round1_results[j]}"
for j in range(len(agents)) if j != i
]

prompt = f"""
目标: {goal}

你的初步分析:
{round1_results[i]}

其他成员的立场:
{chr(10).join(other_views)}

请评价其他成员的观点,并完善你的分析。
"""

round2_tasks.append(execute_agent(agent, prompt))

round2_results = await asyncio.gather(*round2_tasks)

return format_council_results(round2_results)

使用场景

  • 决策分析(多角度评估)
  • 代码审查(多人审查模式)
  • 方案评估(技术、成本、风险多维度)

示例配置

{
"name": "技术方案评审议会",
"mode": "council",
"cross_review": true,
"agents": [
{
"id": "tech_expert",
"role": "技术专家",
"system_prompt": "你是技术专家,专注于技术可行性和实现难度评估",
"perspective": "技术可行性"
},
{
"id": "business_analyst",
"role": "商业分析师",
"system_prompt": "你是商业分析师,专注于商业价值和市场需求评估",
"perspective": "商业价值"
},
{
"id": "risk_manager",
"role": "风险管理专家",
"system_prompt": "你是风险管理专家,专注于识别和评估潜在风险",
"perspective": "风险评估"
},
{
"id": "cost_controller",
"role": "成本控制专家",
"system_prompt": "你是成本控制专家,专注于成本效益分析",
"perspective": "成本效益"
}
]
}

团队管理 API

新增文件

  • backend/api/agent_teams.py - 团队管理 API
  • backend/models/agent_team.py - 团队数据模型
  • backend/modules/agent/workflow.py - 工作流引擎

API 端点

端点方法说明
/api/agent-teamsGET列出所有智能体团队
/api/agent-teamsPOST创建智能体团队
/api/agent-teams/{id}GET获取团队详情
/api/agent-teams/{id}PUT更新团队配置
/api/agent-teams/{id}DELETE删除团队
workflow_runTool在对话链路中执行团队工作流
/api/agent-teams/{id}/configGET/PUT/DELETE团队模型配置

创建团队

POST /api/agent-teams
{
"name": "文档处理团队",
"description": "专门处理文档的智能体团队",
"mode": "pipeline",
"agents": [
{
"name": "提取器",
"role": "从文档中提取关键信息",
"model": "glm-5",
"temperature": 0.3
},
{
"name": "分析器",
"role": "分析提取的信息",
"model": "glm-5",
"temperature": 0.5
}
],
"enabled": true
}

执行团队工作流

通过 `workflow_run(team_name=..., goal=...)` 触发团队执行
{
"task": "分析这份项目文档并生成总结",
"context": {
"file_path": "docs/project.md"
}
}

响应:

{
"execution_id": "uuid",
"status": "running",
"results": []
}

使用方式

1. 通过 API 创建团队

使用上述 API 创建智能体团队,配置协作模式和各个智能体的角色。

2. 在聊天中 @团队

在聊天窗口中直接 @团队名称触发协作执行:

用户: @文档处理团队 分析这份项目文档

系统会自动:

  1. 识别 @团队名称
  2. 加载团队配置
  3. 按照配置的模式执行工作流
  4. 返回协作结果

3. 在飞书等渠道中使用

在飞书、钉钉等渠道中,同样支持 @团队名称:

@CountBot @代码审查议会 审查这段代码

协作场景

场景 1:文档处理流水线

Pipeline: 提取器 → 分析器 → 总结器

输入: 一份 50 页的技术文档
输出: 结构化的总结报告

场景 2:项目分析图

Graph:
前端分析 ─┐
├─ 汇总报告
后端分析 ─┘

输入: 项目代码库
输出: 完整的项目分析报告

场景 3:代码审查议会

Council:
安全审查员 ─┐
性能审查员 ─┼─ 综合审查结果
可维护性审查员 ─┘

输入: 一段代码
输出: 多维度的审查意见

数据库变更

新增表:agent_teams

CREATE TABLE agent_teams (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
description TEXT,
mode TEXT NOT NULL, -- pipeline/graph/council
config JSON NOT NULL,
enabled BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP,
updated_at TIMESTAMP
);

相关文件

子代理系统

文件说明
backend/modules/agent/subagent.pySubagentManager + SubagentTask
backend/modules/tools/spawn.pySpawnTool
backend/modules/agent/task_manager.py任务管理辅助
backend/api/tasks.py任务 API
backend/ws/task_notifications.pyWebSocket 任务通知

多智能体协作系统

文件说明
backend/api/agent_teams.py团队管理 API
backend/models/agent_team.py团队数据模型
backend/modules/agent/workflow.py工作流引擎