AI Agent Loop
任何 LLM 驱动的 loop,需要观察每一步、检查 permission,以及潜在的暂停/恢复。
Async Generator Loop 是 Claude Code agentic 架构的核心心跳。Claude Code 没有使用传统的递归、事件发射器或有限状态机,而是将核心 agent loop 实现为一个 async generator 函数 —— 一个在跨越异步边界时能保持执行 context、并不断 yield 中间结果的函数。
// 基本模式:一个在每一步都 yield 的 async generatorasync function* agenticLoop( messages: Message[], tools: Tool[], options: LoopOptions): AsyncGenerator<AgentEvent> { while (true) { // 1. 调用 LLM const response = yield* streamCompletion(messages, tools);
// 2. yield 助手回复以供观察 yield { type: 'assistant_message', content: response };
// 3. 检查停止条件 if (response.stopReason === 'end_turn' && !hasToolUse(response)) { return; // generator 自然结束 }
// 4. 执行 tool,逐一 yield 结果 for (const toolUse of response.toolCalls) { const result = yield* executeToolWithProgress(toolUse); yield { type: 'tool_result', toolUse, result }; messages.push(toToolResultMessage(toolUse, result)); }
// 5. 追加助手消息并继续循环 messages.push(toAssistantMessage(response)); }}| 维度 | 递归 | 事件发射器 | 状态机 | Async Generator |
|---|---|---|---|---|
| 栈溢出风险 | ❌ 有 | ✅ 无 | ✅ 无 | ✅ 无 |
| 线性可读性 | ✅ 好 | ❌ 差 | ❌ 差 | ✅ 极好 |
| 可暂停 | ❌ 否 | ❌ 困难 | ✅ 是 | ✅ 原生支持 |
| 可恢复 | ❌ 否 | ❌ 困难 | ✅ 是 | ✅ 原生支持 |
| Backpressure | ❌ 否 | ❌ 否 | ⚠️ 手动 | ✅ 内置 |
| 可观察 | ❌ 否 | ✅ 是 | ⚠️ 手动 | ✅ 每次 yield |
| 可取消 | ❌ 困难 | ⚠️ 手动 | ⚠️ 手动 | ✅ .return() |
| 每步内存 | 📈 增长 | ✅ 恒定 | ✅ 恒定 | ✅ 恒定 |
朴素的递归 agent loop 在每次迭代时都会累积栈帧:
// ❌ 递归:每轮都增长栈async function recursiveLoop(messages: Message[]): Promise<void> { const response = await callLLM(messages); messages.push(response);
if (hasToolUse(response)) { const results = await executeTools(response.toolCalls); messages.push(...results); return recursiveLoop(messages); // 栈帧累积 } // 即使有尾调用优化(JS 不保证), // 也无法观察中间状态}对于需要 50+ 次 tool 调用的深度研究任务,这存在栈溢出风险。更关键的是,没有观察点 —— 调用方无法看到执行过程中发生了什么。
事件发射器将观察与执行解耦,但引入了”意大利面回调”问题:
// ❌ 事件发射器:控制流变得隐式class AgentLoop extends EventEmitter { async run(messages: Message[]) { const response = await callLLM(messages); this.emit('response', response); // 谁来处理? this.emit('tool_start', response); // 什么顺序? // ... 状态分散在各监听器中 this.emit('tool_end', results); // 竞态条件? }}执行流现在被拆散到多个监听器注册中,很难推理顺序、错误处理和取消逻辑。
显式状态机是正确的,但过于冗长:
// ❌ 状态机:正确但难以阅读type State = 'idle' | 'calling_llm' | 'executing_tools' | 'checking_stop' | 'done';
class AgentStateMachine { state: State = 'idle';
async transition() { switch (this.state) { case 'idle': this.state = 'calling_llm'; break; case 'calling_llm': this.response = await callLLM(this.messages); this.state = 'executing_tools'; break; case 'executing_tools': // 还有 20 行状态转换逻辑... this.state = 'checking_stop'; break; // ...以此类推 } }}每增加一个步骤,就要添加一个状态和两个转换。业务逻辑淹没在状态管理的样板代码中。
在 Claude Code 源码中,核心 loop 位于 processConversationTurn 区域。以下是提炼出的模式:
// 简化自 Claude Code 的实际架构async function* processConversation( apiClient: ApiClient, messages: Message[], systemPrompt: string, tools: Tool[], abortSignal: AbortSignal,): AsyncGenerator<StreamEvent> { let continueLoop = true;
while (continueLoop) { // 阶段 1:stream API 响应 const streamEvents = apiClient.stream({ system: systemPrompt, messages, tools: tools.map(t => t.schema), });
const assistantBlocks: ContentBlock[] = [];
for await (const event of streamEvents) { // yield 每个 streaming token 以实时展示 yield { type: 'stream', event };
if (event.type === 'content_block') { assistantBlocks.push(event.block); } }
// 阶段 2:处理 tool 调用 const toolUseBlocks = assistantBlocks.filter(isToolUse);
if (toolUseBlocks.length === 0) { continueLoop = false; yield { type: 'turn_complete' }; break; }
// 阶段 3:逐一 yield 执行 tool const toolResults: ToolResult[] = [];
for (const toolUse of toolUseBlocks) { yield { type: 'tool_start', tool: toolUse.name };
// 在 tool 执行间隙检查是否中止 if (abortSignal.aborted) { yield { type: 'aborted' }; return; }
const result = await executeTool(toolUse, tools); toolResults.push(result);
yield { type: 'tool_end', tool: toolUse.name, result }; }
// 阶段 4:追加到对话并继续 messages.push({ role: 'assistant', content: assistantBlocks }); messages.push({ role: 'user', content: toolResults.map(toToolResultBlock) }); }}generator 的魔力在看到消费者时才真正显现:
// 消费者控制节奏async function runAgent(config: AgentConfig) { const loop = processConversation( config.apiClient, config.messages, config.systemPrompt, config.tools, config.abortController.signal, );
for await (const event of loop) { // 每次迭代从 generator 拉取下一个值。 // 直到我们请求下一个值,generator 才会继续执行。
switch (event.type) { case 'stream': renderToTerminal(event); // 实时更新 UI break;
case 'tool_start': if (needsPermission(event.tool)) { const allowed = await askUser(`Allow ${event.tool}?`); if (!allowed) { loop.return(undefined); // 优雅取消 return; } } showSpinner(event.tool); break;
case 'tool_end': hideSpinner(); logToolResult(event.result); break;
case 'turn_complete': showFinalResponse(); break;
case 'aborted': showAbortMessage(); return; } }}graph TD
A[取消请求] --> B{策略}
B -->|协作式| C["abortSignal.aborted 检查<br/>在 tool 执行间隙"]
B -->|立即| D["generator.return()<br/>从消费者侧调用"]
B -->|超时| E["AbortSignal.timeout(ms)<br/>传给 API client"]
C --> F[干净关闭<br/>保留部分结果]
D --> G[generator 完成收尾<br/>finally 块执行]
E --> H[API 调用中止<br/>释放网络资源]
// 策略 1:协作式(在 loop 中检查 abort signal)if (abortSignal.aborted) { yield { type: 'aborted' }; return;}
// 策略 2:立即(消费者调用 .return())const generator = agenticLoop(messages, tools);// ... 之后:generator.return(undefined); // generator 的 finally 块执行
// 策略 3:超时(内置于 AbortSignal)const signal = AbortSignal.timeout(300_000); // 5 分钟超时const generator = agenticLoop(messages, tools, { signal });以下是可直接使用的生产就绪模板:
// ============================================// 可复用 Agentic Loop 模板// ============================================
interface AgentEvent { type: 'thinking' | 'response' | 'tool_call' | 'tool_result' | 'done' | 'error'; data: unknown;}
interface AgentConfig { maxTurns: number; tools: Map<string, ToolHandler>; shouldContinue: (response: LLMResponse) => boolean; onBeforeToolCall?: (call: ToolCall) => Promise<boolean>;}
async function* createAgentLoop( llm: LLMClient, initialMessages: Message[], config: AgentConfig,): AsyncGenerator<AgentEvent> { const messages = [...initialMessages]; let turns = 0;
try { while (turns < config.maxTurns) { turns++;
// 调用 LLM yield { type: 'thinking', data: { turn: turns } }; const response = await llm.complete(messages); yield { type: 'response', data: response };
// 检查是否应该停止 if (!config.shouldContinue(response)) { yield { type: 'done', data: { turns, reason: 'stop_condition' } }; return; }
// 执行 tool 调用 for (const call of response.toolCalls ?? []) { // 执行前 hook(permission 检查等) if (config.onBeforeToolCall) { const allowed = await config.onBeforeToolCall(call); if (!allowed) { yield { type: 'done', data: { turns, reason: 'user_denied' } }; return; } }
yield { type: 'tool_call', data: call };
const handler = config.tools.get(call.name); if (!handler) { const error = `Unknown tool: ${call.name}`; messages.push(toolErrorMessage(call.id, error)); yield { type: 'error', data: { call, error } }; continue; }
const result = await handler(call.input); messages.push(toolResultMessage(call.id, result)); yield { type: 'tool_result', data: { call, result } }; }
// 追加助手消息供下一轮使用 messages.push(assistantMessage(response)); }
yield { type: 'done', data: { turns, reason: 'max_turns' } }; } finally { // 即使调用 .return() 或抛出错误也会执行清理 console.log(`Agent loop finished after ${turns} turns`); }}yield* 组合嵌套 GeneratorClaude Code 使用 yield* 来组合 generator,形成模块化的流水线:
// 父 generator 委托给子 generatorasync function* outerLoop(): AsyncGenerator<AgentEvent> { // yield* 透明地转发内部 generator 的所有 yield yield* innerStreamPhase(); yield* innerToolPhase(); yield* innerCompactionPhase();}
// 每个阶段都是独立的 generatorasync function* innerStreamPhase(): AsyncGenerator<AgentEvent> { for await (const token of apiStream) { yield { type: 'stream_token', data: token }; }}
async function* innerToolPhase(): AsyncGenerator<AgentEvent> { for (const tool of pendingTools) { yield { type: 'tool_start', data: tool }; const result = await tool.execute(); yield { type: 'tool_end', data: result }; }}这种 yield* 委托创建了可组合的流水线,同时保持了扁平的控制流。
| 特性 | Async Generator | RxJS Observable |
|---|---|---|
| 学习曲线 | 低(原生 JS) | 高(100+ 操作符) |
| 包体积 | 0 KB(内置) | ~30 KB min |
| Backpressure | 自动(拉取式) | 手动(需要策略) |
| 取消 | .return() / AbortSignal | subscription.unsubscribe() |
| 组合 | yield* 委托 | pipe() + 操作符 |
| Hot/Cold | 始终 cold(惰性) | 两者均支持 |
| 错误处理 | try/catch | .pipe(catchError(...)) |
| 多播 | 手动(需要 tee) | 内置 subject |
| 时间操作符 | 手动 | debounce、throttle 等 |
| 最适合 | 顺序异步工作流 | 复杂事件 stream |
AI Agent Loop
任何 LLM 驱动的 loop,需要观察每一步、检查 permission,以及潜在的暂停/恢复。
ETL 流水线
提取 → 转换 → 加载工作流,每个阶段 yield 进度并支持取消。
长时运行任务
构建系统、测试运行器或部署流水线,需要实时进度上报。
交互式工作流
任何在自动化步骤与用户输入/审批之间交替的流程。
// ❌ 不要缓冲所有结果再最后 yieldasync function* badGenerator() { const allResults = []; for (const item of items) { allResults.push(await process(item)); } yield allResults; // 这违背了 streaming 的目的}
// ✅ 每个结果产出时立即 yieldasync function* goodGenerator() { for (const item of items) { yield await process(item); // 消费者立即看到每个结果 }}
// ❌ 不要在 generator 内部静默吞噬错误async function* badErrorHandling() { try { yield await riskyOperation(); } catch (e) { // 静默吞噬 —— 消费者不知道发生了错误 }}
// ✅ 将错误作为事件 yield 或重新抛出async function* goodErrorHandling() { try { yield await riskyOperation(); } catch (e) { yield { type: 'error', error: e }; // 消费者可以决定如何处理 }}Async Generator Loop pattern 是 Claude Code 最根本的架构决策,它提供了:
yield 都是观察 loop 状态的窗口yield* 使模块化子 generator 成为可能.return() 与 AbortSignal 协同工作这一单一的模式选择消除了整类 bug(栈溢出、事件丢失、竞态条件),同时保持了对任何 TypeScript 开发者都友好的代码可读性。