Pattern: Multi-Agent Coordination
随着 AI 任务复杂度的增长,单个 agent 往往不够用。Claude Code 实现了三种不同的 multi-agent 协调模式,每种模式适用于不同场景。了解何时使用哪种模式,对于构建高效的 agent 系统至关重要。
graph TB
subgraph "Pattern 1: Fork"
FP["Parent Agent"]
FC1["Child 1"]
FC2["Child 2"]
FC3["Child 3"]
FP --> FC1
FP --> FC2
FP --> FC3
FC1 -.->|result| FP
FC2 -.->|result| FP
FC3 -.->|result| FP
end
subgraph "Pattern 2: Coordinator"
CO["Coordinator"]
CW1["Worker 1"]
CW2["Worker 2"]
CW3["Worker 3"]
CO -->|task| CW1
CO -->|task| CW2
CO -->|task| CW3
CW1 -.->|report| CO
CW2 -.->|report| CO
CW3 -.->|report| CO
CO -->|more tasks| CW1
end
subgraph "Pattern 3: Team"
TA["Agent A"]
TB["Agent B"]
TC["Agent C"]
MB["Shared Mailbox"]
TA <-->|message| MB
TB <-->|message| MB
TC <-->|message| MB
end
| 维度 | Fork | Coordinator | Team |
|---|---|---|---|
| 拓扑 | 星形(父 → 子) | 轮毂辐射型(coordinator → worker) | 网状(点对点) |
| 通信 | 单向(返回结果) | 双向(任务/报告) | 多向(mailbox) |
| 子节点自主性 | 发后即忘 | 受管生命周期 | 完全自主 |
| 父节点开销 | 低 | 高(协调逻辑) | 无(无中央控制) |
| 复杂度 | ⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
| 最适合 | 独立子任务 | 复杂工作流 | 协作式问题解决 |
| 最大 agent 数 | 5-10 | 3-7 | 2-4 |
| Cache 复用 | ✅ 极好 | ✅ 好 | ⚠️ 有限 |
| 错误隔离 | ✅ 每个子节点独立 | ⚠️ coordinator 是单点故障 | ❌ 错误可能级联 |
模式一:Fork
Section titled “模式一:Fork”最简单的 multi-agent 模式。父 agent 派生出独立的子 agent,每个子 agent 处理一个独立的子任务。子 agent 并行运行,返回结果,父 agent 进行综合。
- 任务是令人尴尬的并行(子任务之间没有依赖)
- 每个子任务是自包含的,有清晰的输入/输出
- 希望最大化 cache 复用(所有 fork 共享父级的前缀)
interface ForkResult { taskId: string; result: string; tokenUsage: number;}
async function forkPattern( parentContext: AgentContext, tasks: string[],): Promise<ForkResult[]> { // 所有 fork 共享父级的 system prompt 和 context const sharedPrefix = { system: parentContext.systemPrompt, messages: parentContext.messages, // Cache 友好的共享前缀 };
// 并行启动所有 fork const forkPromises = tasks.map((task, i) => runSubAgent({ ...sharedPrefix, messages: [ ...sharedPrefix.messages, { role: 'user', content: `Sub-task ${i + 1}: ${task}\n\nComplete this specific sub-task and return your findings.`, }, ], maxTurns: 10, }).then(result => ({ taskId: `fork-${i}`, result: result.finalMessage, tokenUsage: result.tokensUsed, })) );
return Promise.all(forkPromises);}示例:多文件代码审查
Section titled “示例:多文件代码审查”// 父 agent 识别变更文件,然后 fork 审查者const changedFiles = ['src/auth.ts', 'src/api.ts', 'src/utils.ts', 'tests/auth.test.ts'];
const reviews = await forkPattern(parentContext, changedFiles.map(file => `Review ${file} for: 1. Security vulnerabilities 2. Performance issues 3. Code style violations Return a structured review with severity ratings.`));
// 父 agent 将所有审查综合为统一报告const synthesis = await callLLM({ messages: [{ role: 'user', content: `Synthesize these code reviews into a unified PR report:\n${ reviews.map(r => `### ${r.taskId}\n${r.result}`).join('\n\n') }`, }],});模式二:Coordinator
Section titled “模式二:Coordinator”中央 coordinator agent 管理工作流,将任务分配给 worker agent,监控进度,并根据需要重新分配工作。与 Fork 不同,coordinator 可以根据中间结果做出动态决策。
- 任务之间有依赖关系(任务 B 需要任务 A 的输出)
- 工作流需要自适应规划(下一个任务取决于结果)
- 需要集中式错误处理和重试
interface WorkerTask { id: string; description: string; dependencies: string[]; // 必须先完成的任务 ID assignedTo?: string; status: 'pending' | 'running' | 'complete' | 'failed'; result?: string;}
class CoordinatorPattern { private tasks: WorkerTask[] = []; private workers = new Map<string, SubAgent>(); private results = new Map<string, string>();
constructor( private readonly coordinatorLLM: LLMClient, private readonly maxWorkers: number = 3, ) {}
async execute(goal: string): Promise<string> { // 第一阶段:规划任务 this.tasks = await this.planTasks(goal);
// 第二阶段:动态协调执行 while (this.hasPendingTasks()) { // 找出依赖已满足的任务 const ready = this.tasks.filter(t => t.status === 'pending' && t.dependencies.every(d => this.results.has(d)) );
// 为就绪任务启动 worker(不超过并发限制) const toRun = ready.slice(0, this.maxWorkers - this.workers.size);
for (const task of toRun) { task.status = 'running'; const worker = this.launchWorker(task); this.workers.set(task.id, worker); }
// 等待任意一个 worker 完成 const completed = await Promise.race( [...this.workers.entries()].map(async ([id, worker]) => { const result = await worker.waitForCompletion(); return { id, result }; }) );
// 处理完成 this.results.set(completed.id, completed.result); this.workers.delete(completed.id); this.tasks.find(t => t.id === completed.id)!.status = 'complete';
// 第三阶段:如有必要则重新规划 const shouldReplan = await this.evaluateProgress(); if (shouldReplan) { const newTasks = await this.replanRemainingTasks(); this.tasks.push(...newTasks); } }
// 第四阶段:综合最终结果 return this.synthesizeResults(); }
private async planTasks(goal: string): Promise<WorkerTask[]> { const plan = await this.coordinatorLLM.complete({ messages: [{ role: 'user', content: `Break this goal into concrete tasks with dependencies: Goal: ${goal} Return JSON: [{id, description, dependencies: []}]`, }], }); return JSON.parse(plan.content); }
private launchWorker(task: WorkerTask): SubAgent { // 将依赖结果注入 worker 的 context const dependencyContext = task.dependencies .map(d => `Result of "${d}": ${this.results.get(d)}`) .join('\n');
return createSubAgent({ task: `${task.description}\n\nContext from prior tasks:\n${dependencyContext}`, maxTurns: 15, }); }}Coordinator 工作流可视化
Section titled “Coordinator 工作流可视化”sequenceDiagram
participant C as Coordinator
participant W1 as Worker 1
participant W2 as Worker 2
participant W3 as Worker 3
C->>C: Plan tasks (A, B, C, D)
Note over C: A, B have no deps; C depends on A; D depends on B, C
par Independent tasks
C->>W1: Task A (analyze codebase)
C->>W2: Task B (read requirements)
end
W1-->>C: Result A
C->>W3: Task C (design solution) [depends on A]
W2-->>C: Result B
Note over C: D waiting for B and C
W3-->>C: Result C
C->>W1: Task D (implement) [depends on B, C]
W1-->>C: Result D
C->>C: Synthesize final result
模式三:Team(Swarm)
Section titled “模式三:Team(Swarm)”多个 agent 以对等方式运作,没有中央权威。它们通过共享 mailbox(消息总线)进行通信,每个 agent 根据来自其他 agent 的消息独立决定要处理什么。
- 问题需要多元专业知识(每个 agent 专精某个领域)
- Agent 需要迭代协作(来回讨论)
- 没有单个 agent 能看到全局
interface MailboxMessage { from: string; to: string | 'all'; type: 'request' | 'response' | 'update' | 'done'; content: string; timestamp: number;}
class TeamPattern { private mailbox: MailboxMessage[] = []; private agents = new Map<string, TeamAgent>();
addAgent(id: string, role: string, specialization: string) { this.agents.set(id, new TeamAgent(id, role, specialization, this.mailbox)); }
async execute(goal: string, maxRounds: number = 10): Promise<string> { // 向所有 agent 广播目标 this.broadcast({ from: 'system', to: 'all', type: 'request', content: goal, timestamp: Date.now() });
for (let round = 0; round < maxRounds; round++) { // 每个 agent 处理未读消息并响应 const roundPromises = [...this.agents.values()].map(agent => agent.processMailbox() );
await Promise.all(roundPromises);
// 检查是否有 agent 发出完成信号 const doneMessages = this.mailbox.filter(m => m.type === 'done'); if (doneMessages.length >= this.agents.size * 0.5) { break; // 多数共识:完成 } }
return this.synthesizeMailbox(); }
private broadcast(message: MailboxMessage) { this.mailbox.push(message); }
private synthesizeMailbox(): string { return this.mailbox .filter(m => m.type === 'response' || m.type === 'done') .map(m => `[${m.from}]: ${m.content}`) .join('\n\n'); }}
class TeamAgent { private lastReadIndex = 0;
constructor( private id: string, private role: string, private specialization: string, private mailbox: MailboxMessage[], ) {}
async processMailbox(): Promise<void> { // 读取未读消息 const unread = this.mailbox.slice(this.lastReadIndex); this.lastReadIndex = this.mailbox.length;
if (unread.length === 0) return;
// 使用角色专属 system prompt 的 LLM 生成响应 const response = await callLLM({ system: `You are ${this.role}, specializing in ${this.specialization}. Read the team messages and contribute your expertise. Reply to relevant messages or add new insights.`, messages: [{ role: 'user', content: unread.map(m => `[${m.from}] (${m.type}): ${m.content}`).join('\n'), }], });
// 将响应发布到 mailbox this.mailbox.push({ from: this.id, to: 'all', type: 'response', content: response.content, timestamp: Date.now(), }); }}这些模式与流行的 multi-agent 框架相比如何?
| 特性 | Claude Code | AutoGen | CrewAI | LangGraph |
|---|---|---|---|---|
| Fork(并行) | ✅ 原生 | ⚠️ 通过 GroupChat | ❌ 仅顺序 | ✅ 通过分支 |
| Coordinator | ✅ 原生 | ✅ 通过 AssistantAgent | ✅ 通过 Manager | ✅ 通过状态图 |
| Team/Swarm | ✅ 原生 | ✅ GroupChat | ✅ Crew roles | ⚠️ 设置复杂 |
| 通信 | Mailbox + 直接 | GroupChat | 委托 | 状态传递 |
| Cache 复用 | ✅ Prompt Cache | ❌ 无 | ❌ 无 | ❌ 无 |
| 隔离 | ✅ Git worktree | ❌ 共享内存 | ❌ 共享状态 | ⚠️ 每节点状态 |
| 动态重规划 | ✅ Coordinator 重规划 | ✅ 嵌套对话 | ⚠️ 手动 | ✅ 条件边 |
| 实用最大 agent 数 | 5-10 | 10+ | 5-7 | 无硬限制 |
| 学习曲线 | 中等 | 高 | 低 | 高 |
| Token 效率 | ✅ 最优(cache 复用) | ⚠️ 一般 | ⚠️ 一般 | ⚠️ 一般 |
决策树:选择哪种模式
Section titled “决策树:选择哪种模式”graph TD
START["任务结构是什么?"] --> Q1{"子任务是否<br/>独立?"}
Q1 -->|"是,完全独立"| FORK["✅ 使用 Fork<br/>最大并行度<br/>最佳 cache 复用"]
Q1 -->|"有一些依赖"| Q2{"需要动态<br/>重规划?"}
Q2 -->|"不需要,固定计划"| Q3{"有几个<br/>依赖层级?"}
Q3 -->|"1 层"| FORK2["✅ 使用 Fork<br/>分组顺序执行"]
Q3 -->|"2 层以上"| COORD["✅ 使用 Coordinator<br/>管理依赖 DAG"]
Q2 -->|"需要"| COORD
Q1 -->|"深度交织"| Q4{"需要多元<br/>专业知识?"}
Q4 -->|"需要"| TEAM["✅ 使用 Team<br/>对等协作"]
Q4 -->|"不需要"| COORD2["✅ 使用 Coordinator<br/>带迭代"]
style FORK fill:#4ade80
style FORK2 fill:#4ade80
style COORD fill:#60a5fa
style COORD2 fill:#60a5fa
style TEAM fill:#c084fc
| 场景 | 推荐模式 |
|---|---|
| 独立审查 5 个文件 | Fork |
| 跨多个文件实现一个功能 | Coordinator |
| 调试复杂的跨系统问题 | Team |
| 并行运行测试 | Fork |
| 带依赖分析的重构 | Coordinator |
| 前端 + 后端 + 安全视角的设计评审 | Team |
| 为 10 种语言生成翻译 | Fork |
| 多步骤数据流水线 | Coordinator |
可复用 Agent 编排模板
Section titled “可复用 Agent 编排模板”// ============================================// 通用 Agent 编排器// ============================================
type OrchestrationType = 'fork' | 'coordinator' | 'team';
interface OrchestrationConfig { type: OrchestrationType; maxAgents: number; maxRounds: number; timeout: number; cacheReuse: boolean;}
interface OrchestratorResult { type: OrchestrationType; results: AgentResult[]; totalTokens: number; wallTime: number;}
async function orchestrate( goal: string, context: AgentContext, config: OrchestrationConfig,): Promise<OrchestratorResult> { const start = performance.now();
switch (config.type) { case 'fork': { // 第一步:分解为独立任务 const tasks = await decomposeTasks(goal, context);
// 第二步:全部并行运行 const results = await Promise.all( tasks.slice(0, config.maxAgents).map(task => runSubAgent({ system: context.systemPrompt, messages: [...context.messages, { role: 'user', content: task }], maxTurns: config.maxRounds, }) ) );
return { type: 'fork', results, totalTokens: results.reduce((sum, r) => sum + r.tokensUsed, 0), wallTime: performance.now() - start, }; }
case 'coordinator': { const coordinator = new CoordinatorPattern(context.llm, config.maxAgents); const result = await coordinator.execute(goal); return { type: 'coordinator', results: [{ finalMessage: result, tokensUsed: 0 }], totalTokens: 0, // 内部跟踪 wallTime: performance.now() - start, }; }
case 'team': { const team = new TeamPattern(); // 根据目标添加专业化 agent const roles = await identifyNeededRoles(goal, context); roles.forEach(role => team.addAgent(role.id, role.name, role.specialization));
const result = await team.execute(goal, config.maxRounds); return { type: 'team', results: [{ finalMessage: result, tokensUsed: 0 }], totalTokens: 0, wallTime: performance.now() - start, }; } }}演进路径:从简单开始,逐步扩展
Section titled “演进路径:从简单开始,逐步扩展”graph LR
S1["单 Agent<br/>1 个,顺序"] -->|"任务过大"| S2["Fork<br/>N 个,并行"]
S2 -->|"需要协调"| S3["Coordinator<br/>1 个管理者 + N 个 worker"]
S3 -->|"需要协作"| S4["Team<br/>N 个对等 + mailbox"]
style S1 fill:#94a3b8
style S2 fill:#4ade80
style S3 fill:#60a5fa
style S4 fill:#c084fc
| 反模式 | 问题 | 更好的做法 |
|---|---|---|
| 对独立任务使用 Team | 不必要的通信开销 | 使用 Fork |
| 对有依赖的任务使用 Fork | 竞态条件,输入缺失 | 使用 Coordinator |
| Agent 数量过多(>10 个) | 指数级通信成本 | 拆分为子组 |
| 没有每个 agent 的超时 | 一个卡住的 agent 阻塞所有人 | 每个 agent 的超时 + 回退 |
| 共享可变状态 | 竞态条件 | 不可变消息或隔离的 worktree |
| Coordinator 做所有实际工作 | 违背初衷,形成瓶颈 | 将实际工作委托给 worker |