跳转到内容

Pattern: Multi-Agent Coordination

随着 AI 任务复杂度的增长,单个 agent 往往不够用。Claude Code 实现了三种不同的 multi-agent 协调模式,每种模式适用于不同场景。了解何时使用哪种模式,对于构建高效的 agent 系统至关重要。

graph TB
    subgraph "Pattern 1: Fork"
        FP["Parent Agent"]
        FC1["Child 1"]
        FC2["Child 2"]
        FC3["Child 3"]
        FP --> FC1
        FP --> FC2
        FP --> FC3
        FC1 -.->|result| FP
        FC2 -.->|result| FP
        FC3 -.->|result| FP
    end

    subgraph "Pattern 2: Coordinator"
        CO["Coordinator"]
        CW1["Worker 1"]
        CW2["Worker 2"]
        CW3["Worker 3"]
        CO -->|task| CW1
        CO -->|task| CW2
        CO -->|task| CW3
        CW1 -.->|report| CO
        CW2 -.->|report| CO
        CW3 -.->|report| CO
        CO -->|more tasks| CW1
    end

    subgraph "Pattern 3: Team"
        TA["Agent A"]
        TB["Agent B"]
        TC["Agent C"]
        MB["Shared Mailbox"]
        TA <-->|message| MB
        TB <-->|message| MB
        TC <-->|message| MB
    end
维度ForkCoordinatorTeam
拓扑星形(父 → 子)轮毂辐射型(coordinator → worker)网状(点对点)
通信单向(返回结果)双向(任务/报告)多向(mailbox)
子节点自主性发后即忘受管生命周期完全自主
父节点开销高(协调逻辑)无(无中央控制)
复杂度⭐⭐⭐⭐⭐⭐⭐⭐
最适合独立子任务复杂工作流协作式问题解决
最大 agent 数5-103-72-4
Cache 复用✅ 极好✅ 好⚠️ 有限
错误隔离✅ 每个子节点独立⚠️ coordinator 是单点故障❌ 错误可能级联

最简单的 multi-agent 模式。父 agent 派生出独立的子 agent,每个子 agent 处理一个独立的子任务。子 agent 并行运行,返回结果,父 agent 进行综合。

  • 任务是令人尴尬的并行(子任务之间没有依赖)
  • 每个子任务是自包含的,有清晰的输入/输出
  • 希望最大化 cache 复用(所有 fork 共享父级的前缀)
interface ForkResult {
taskId: string;
result: string;
tokenUsage: number;
}
async function forkPattern(
parentContext: AgentContext,
tasks: string[],
): Promise<ForkResult[]> {
// 所有 fork 共享父级的 system prompt 和 context
const sharedPrefix = {
system: parentContext.systemPrompt,
messages: parentContext.messages, // Cache 友好的共享前缀
};
// 并行启动所有 fork
const forkPromises = tasks.map((task, i) =>
runSubAgent({
...sharedPrefix,
messages: [
...sharedPrefix.messages,
{
role: 'user',
content: `Sub-task ${i + 1}: ${task}\n\nComplete this specific sub-task and return your findings.`,
},
],
maxTurns: 10,
}).then(result => ({
taskId: `fork-${i}`,
result: result.finalMessage,
tokenUsage: result.tokensUsed,
}))
);
return Promise.all(forkPromises);
}
// 父 agent 识别变更文件,然后 fork 审查者
const changedFiles = ['src/auth.ts', 'src/api.ts', 'src/utils.ts', 'tests/auth.test.ts'];
const reviews = await forkPattern(parentContext, changedFiles.map(file =>
`Review ${file} for:
1. Security vulnerabilities
2. Performance issues
3. Code style violations
Return a structured review with severity ratings.`
));
// 父 agent 将所有审查综合为统一报告
const synthesis = await callLLM({
messages: [{
role: 'user',
content: `Synthesize these code reviews into a unified PR report:\n${
reviews.map(r => `### ${r.taskId}\n${r.result}`).join('\n\n')
}`,
}],
});

中央 coordinator agent 管理工作流,将任务分配给 worker agent,监控进度,并根据需要重新分配工作。与 Fork 不同,coordinator 可以根据中间结果做出动态决策

  • 任务之间有依赖关系(任务 B 需要任务 A 的输出)
  • 工作流需要自适应规划(下一个任务取决于结果)
  • 需要集中式错误处理和重试
interface WorkerTask {
id: string;
description: string;
dependencies: string[]; // 必须先完成的任务 ID
assignedTo?: string;
status: 'pending' | 'running' | 'complete' | 'failed';
result?: string;
}
class CoordinatorPattern {
private tasks: WorkerTask[] = [];
private workers = new Map<string, SubAgent>();
private results = new Map<string, string>();
constructor(
private readonly coordinatorLLM: LLMClient,
private readonly maxWorkers: number = 3,
) {}
async execute(goal: string): Promise<string> {
// 第一阶段:规划任务
this.tasks = await this.planTasks(goal);
// 第二阶段:动态协调执行
while (this.hasPendingTasks()) {
// 找出依赖已满足的任务
const ready = this.tasks.filter(t =>
t.status === 'pending' &&
t.dependencies.every(d => this.results.has(d))
);
// 为就绪任务启动 worker(不超过并发限制)
const toRun = ready.slice(0, this.maxWorkers - this.workers.size);
for (const task of toRun) {
task.status = 'running';
const worker = this.launchWorker(task);
this.workers.set(task.id, worker);
}
// 等待任意一个 worker 完成
const completed = await Promise.race(
[...this.workers.entries()].map(async ([id, worker]) => {
const result = await worker.waitForCompletion();
return { id, result };
})
);
// 处理完成
this.results.set(completed.id, completed.result);
this.workers.delete(completed.id);
this.tasks.find(t => t.id === completed.id)!.status = 'complete';
// 第三阶段:如有必要则重新规划
const shouldReplan = await this.evaluateProgress();
if (shouldReplan) {
const newTasks = await this.replanRemainingTasks();
this.tasks.push(...newTasks);
}
}
// 第四阶段:综合最终结果
return this.synthesizeResults();
}
private async planTasks(goal: string): Promise<WorkerTask[]> {
const plan = await this.coordinatorLLM.complete({
messages: [{
role: 'user',
content: `Break this goal into concrete tasks with dependencies:
Goal: ${goal}
Return JSON: [{id, description, dependencies: []}]`,
}],
});
return JSON.parse(plan.content);
}
private launchWorker(task: WorkerTask): SubAgent {
// 将依赖结果注入 worker 的 context
const dependencyContext = task.dependencies
.map(d => `Result of "${d}": ${this.results.get(d)}`)
.join('\n');
return createSubAgent({
task: `${task.description}\n\nContext from prior tasks:\n${dependencyContext}`,
maxTurns: 15,
});
}
}
sequenceDiagram
    participant C as Coordinator
    participant W1 as Worker 1
    participant W2 as Worker 2
    participant W3 as Worker 3

    C->>C: Plan tasks (A, B, C, D)
    Note over C: A, B have no deps; C depends on A; D depends on B, C

    par Independent tasks
        C->>W1: Task A (analyze codebase)
        C->>W2: Task B (read requirements)
    end

    W1-->>C: Result A
    C->>W3: Task C (design solution) [depends on A]

    W2-->>C: Result B
    Note over C: D waiting for B and C

    W3-->>C: Result C
    C->>W1: Task D (implement) [depends on B, C]

    W1-->>C: Result D
    C->>C: Synthesize final result

多个 agent 以对等方式运作,没有中央权威。它们通过共享 mailbox(消息总线)进行通信,每个 agent 根据来自其他 agent 的消息独立决定要处理什么。

  • 问题需要多元专业知识(每个 agent 专精某个领域)
  • Agent 需要迭代协作(来回讨论)
  • 没有单个 agent 能看到全局
interface MailboxMessage {
from: string;
to: string | 'all';
type: 'request' | 'response' | 'update' | 'done';
content: string;
timestamp: number;
}
class TeamPattern {
private mailbox: MailboxMessage[] = [];
private agents = new Map<string, TeamAgent>();
addAgent(id: string, role: string, specialization: string) {
this.agents.set(id, new TeamAgent(id, role, specialization, this.mailbox));
}
async execute(goal: string, maxRounds: number = 10): Promise<string> {
// 向所有 agent 广播目标
this.broadcast({ from: 'system', to: 'all', type: 'request', content: goal, timestamp: Date.now() });
for (let round = 0; round < maxRounds; round++) {
// 每个 agent 处理未读消息并响应
const roundPromises = [...this.agents.values()].map(agent =>
agent.processMailbox()
);
await Promise.all(roundPromises);
// 检查是否有 agent 发出完成信号
const doneMessages = this.mailbox.filter(m => m.type === 'done');
if (doneMessages.length >= this.agents.size * 0.5) {
break; // 多数共识:完成
}
}
return this.synthesizeMailbox();
}
private broadcast(message: MailboxMessage) {
this.mailbox.push(message);
}
private synthesizeMailbox(): string {
return this.mailbox
.filter(m => m.type === 'response' || m.type === 'done')
.map(m => `[${m.from}]: ${m.content}`)
.join('\n\n');
}
}
class TeamAgent {
private lastReadIndex = 0;
constructor(
private id: string,
private role: string,
private specialization: string,
private mailbox: MailboxMessage[],
) {}
async processMailbox(): Promise<void> {
// 读取未读消息
const unread = this.mailbox.slice(this.lastReadIndex);
this.lastReadIndex = this.mailbox.length;
if (unread.length === 0) return;
// 使用角色专属 system prompt 的 LLM 生成响应
const response = await callLLM({
system: `You are ${this.role}, specializing in ${this.specialization}.
Read the team messages and contribute your expertise.
Reply to relevant messages or add new insights.`,
messages: [{
role: 'user',
content: unread.map(m => `[${m.from}] (${m.type}): ${m.content}`).join('\n'),
}],
});
// 将响应发布到 mailbox
this.mailbox.push({
from: this.id,
to: 'all',
type: 'response',
content: response.content,
timestamp: Date.now(),
});
}
}

这些模式与流行的 multi-agent 框架相比如何?

特性Claude CodeAutoGenCrewAILangGraph
Fork(并行)✅ 原生⚠️ 通过 GroupChat❌ 仅顺序✅ 通过分支
Coordinator✅ 原生✅ 通过 AssistantAgent✅ 通过 Manager✅ 通过状态图
Team/Swarm✅ 原生✅ GroupChat✅ Crew roles⚠️ 设置复杂
通信Mailbox + 直接GroupChat委托状态传递
Cache 复用✅ Prompt Cache❌ 无❌ 无❌ 无
隔离✅ Git worktree❌ 共享内存❌ 共享状态⚠️ 每节点状态
动态重规划✅ Coordinator 重规划✅ 嵌套对话⚠️ 手动✅ 条件边
实用最大 agent 数5-1010+5-7无硬限制
学习曲线中等
Token 效率✅ 最优(cache 复用)⚠️ 一般⚠️ 一般⚠️ 一般
graph TD
    START["任务结构是什么?"] --> Q1{"子任务是否<br/>独立?"}

    Q1 -->|"是,完全独立"| FORK["✅ 使用 Fork<br/>最大并行度<br/>最佳 cache 复用"]

    Q1 -->|"有一些依赖"| Q2{"需要动态<br/>重规划?"}

    Q2 -->|"不需要,固定计划"| Q3{"有几个<br/>依赖层级?"}
    Q3 -->|"1 层"| FORK2["✅ 使用 Fork<br/>分组顺序执行"]
    Q3 -->|"2 层以上"| COORD["✅ 使用 Coordinator<br/>管理依赖 DAG"]

    Q2 -->|"需要"| COORD

    Q1 -->|"深度交织"| Q4{"需要多元<br/>专业知识?"}
    Q4 -->|"需要"| TEAM["✅ 使用 Team<br/>对等协作"]
    Q4 -->|"不需要"| COORD2["✅ 使用 Coordinator<br/>带迭代"]

    style FORK fill:#4ade80
    style FORK2 fill:#4ade80
    style COORD fill:#60a5fa
    style COORD2 fill:#60a5fa
    style TEAM fill:#c084fc
场景推荐模式
独立审查 5 个文件Fork
跨多个文件实现一个功能Coordinator
调试复杂的跨系统问题Team
并行运行测试Fork
带依赖分析的重构Coordinator
前端 + 后端 + 安全视角的设计评审Team
为 10 种语言生成翻译Fork
多步骤数据流水线Coordinator
// ============================================
// 通用 Agent 编排器
// ============================================
type OrchestrationType = 'fork' | 'coordinator' | 'team';
interface OrchestrationConfig {
type: OrchestrationType;
maxAgents: number;
maxRounds: number;
timeout: number;
cacheReuse: boolean;
}
interface OrchestratorResult {
type: OrchestrationType;
results: AgentResult[];
totalTokens: number;
wallTime: number;
}
async function orchestrate(
goal: string,
context: AgentContext,
config: OrchestrationConfig,
): Promise<OrchestratorResult> {
const start = performance.now();
switch (config.type) {
case 'fork': {
// 第一步:分解为独立任务
const tasks = await decomposeTasks(goal, context);
// 第二步:全部并行运行
const results = await Promise.all(
tasks.slice(0, config.maxAgents).map(task =>
runSubAgent({
system: context.systemPrompt,
messages: [...context.messages, { role: 'user', content: task }],
maxTurns: config.maxRounds,
})
)
);
return {
type: 'fork',
results,
totalTokens: results.reduce((sum, r) => sum + r.tokensUsed, 0),
wallTime: performance.now() - start,
};
}
case 'coordinator': {
const coordinator = new CoordinatorPattern(context.llm, config.maxAgents);
const result = await coordinator.execute(goal);
return {
type: 'coordinator',
results: [{ finalMessage: result, tokensUsed: 0 }],
totalTokens: 0, // 内部跟踪
wallTime: performance.now() - start,
};
}
case 'team': {
const team = new TeamPattern();
// 根据目标添加专业化 agent
const roles = await identifyNeededRoles(goal, context);
roles.forEach(role => team.addAgent(role.id, role.name, role.specialization));
const result = await team.execute(goal, config.maxRounds);
return {
type: 'team',
results: [{ finalMessage: result, tokensUsed: 0 }],
totalTokens: 0,
wallTime: performance.now() - start,
};
}
}
}

演进路径:从简单开始,逐步扩展

Section titled “演进路径:从简单开始,逐步扩展”
graph LR
    S1["单 Agent<br/>1 个,顺序"] -->|"任务过大"| S2["Fork<br/>N 个,并行"]
    S2 -->|"需要协调"| S3["Coordinator<br/>1 个管理者 + N 个 worker"]
    S3 -->|"需要协作"| S4["Team<br/>N 个对等 + mailbox"]

    style S1 fill:#94a3b8
    style S2 fill:#4ade80
    style S3 fill:#60a5fa
    style S4 fill:#c084fc
反模式问题更好的做法
对独立任务使用 Team不必要的通信开销使用 Fork
对有依赖的任务使用 Fork竞态条件,输入缺失使用 Coordinator
Agent 数量过多(>10 个)指数级通信成本拆分为子组
没有每个 agent 的超时一个卡住的 agent 阻塞所有人每个 agent 的超时 + 回退
共享可变状态竞态条件不可变消息或隔离的 worktree
Coordinator 做所有实际工作违背初衷,形成瓶颈将实际工作委托给 worker