跳转到内容

Pattern: Async Generator Loop

Async Generator Loop 是 Claude Code agentic 架构的核心心跳。Claude Code 没有使用传统的递归、事件发射器或有限状态机,而是将核心 agent loop 实现为一个 async generator 函数 —— 一个在跨越异步边界时能保持执行 context、并不断 yield 中间结果的函数。

// 基本模式:一个在每一步都 yield 的 async generator
async function* agenticLoop(
messages: Message[],
tools: Tool[],
options: LoopOptions
): AsyncGenerator<AgentEvent> {
while (true) {
// 1. 调用 LLM
const response = yield* streamCompletion(messages, tools);
// 2. yield 助手回复以供观察
yield { type: 'assistant_message', content: response };
// 3. 检查停止条件
if (response.stopReason === 'end_turn' && !hasToolUse(response)) {
return; // generator 自然结束
}
// 4. 执行 tool,逐一 yield 结果
for (const toolUse of response.toolCalls) {
const result = yield* executeToolWithProgress(toolUse);
yield { type: 'tool_result', toolUse, result };
messages.push(toToolResultMessage(toolUse, result));
}
// 5. 追加助手消息并继续循环
messages.push(toAssistantMessage(response));
}
}
维度递归事件发射器状态机Async Generator
栈溢出风险❌ 有✅ 无✅ 无✅ 无
线性可读性✅ 好❌ 差❌ 差极好
可暂停❌ 否❌ 困难✅ 是原生支持
可恢复❌ 否❌ 困难✅ 是原生支持
Backpressure❌ 否❌ 否⚠️ 手动内置
可观察❌ 否✅ 是⚠️ 手动每次 yield
可取消❌ 困难⚠️ 手动⚠️ 手动.return()
每步内存📈 增长✅ 恒定✅ 恒定恒定

朴素的递归 agent loop 在每次迭代时都会累积栈帧:

// ❌ 递归:每轮都增长栈
async function recursiveLoop(messages: Message[]): Promise<void> {
const response = await callLLM(messages);
messages.push(response);
if (hasToolUse(response)) {
const results = await executeTools(response.toolCalls);
messages.push(...results);
return recursiveLoop(messages); // 栈帧累积
}
// 即使有尾调用优化(JS 不保证),
// 也无法观察中间状态
}

对于需要 50+ 次 tool 调用的深度研究任务,这存在栈溢出风险。更关键的是,没有观察点 —— 调用方无法看到执行过程中发生了什么。

事件发射器将观察与执行解耦,但引入了”意大利面回调”问题:

// ❌ 事件发射器:控制流变得隐式
class AgentLoop extends EventEmitter {
async run(messages: Message[]) {
const response = await callLLM(messages);
this.emit('response', response); // 谁来处理?
this.emit('tool_start', response); // 什么顺序?
// ... 状态分散在各监听器中
this.emit('tool_end', results); // 竞态条件?
}
}

执行流现在被拆散到多个监听器注册中,很难推理顺序、错误处理和取消逻辑。

显式状态机是正确的,但过于冗长:

// ❌ 状态机:正确但难以阅读
type State = 'idle' | 'calling_llm' | 'executing_tools' | 'checking_stop' | 'done';
class AgentStateMachine {
state: State = 'idle';
async transition() {
switch (this.state) {
case 'idle':
this.state = 'calling_llm';
break;
case 'calling_llm':
this.response = await callLLM(this.messages);
this.state = 'executing_tools';
break;
case 'executing_tools':
// 还有 20 行状态转换逻辑...
this.state = 'checking_stop';
break;
// ...以此类推
}
}
}

每增加一个步骤,就要添加一个状态和两个转换。业务逻辑淹没在状态管理的样板代码中。

在 Claude Code 源码中,核心 loop 位于 processConversationTurn 区域。以下是提炼出的模式:

// 简化自 Claude Code 的实际架构
async function* processConversation(
apiClient: ApiClient,
messages: Message[],
systemPrompt: string,
tools: Tool[],
abortSignal: AbortSignal,
): AsyncGenerator<StreamEvent> {
let continueLoop = true;
while (continueLoop) {
// 阶段 1:stream API 响应
const streamEvents = apiClient.stream({
system: systemPrompt,
messages,
tools: tools.map(t => t.schema),
});
const assistantBlocks: ContentBlock[] = [];
for await (const event of streamEvents) {
// yield 每个 streaming token 以实时展示
yield { type: 'stream', event };
if (event.type === 'content_block') {
assistantBlocks.push(event.block);
}
}
// 阶段 2:处理 tool 调用
const toolUseBlocks = assistantBlocks.filter(isToolUse);
if (toolUseBlocks.length === 0) {
continueLoop = false;
yield { type: 'turn_complete' };
break;
}
// 阶段 3:逐一 yield 执行 tool
const toolResults: ToolResult[] = [];
for (const toolUse of toolUseBlocks) {
yield { type: 'tool_start', tool: toolUse.name };
// 在 tool 执行间隙检查是否中止
if (abortSignal.aborted) {
yield { type: 'aborted' };
return;
}
const result = await executeTool(toolUse, tools);
toolResults.push(result);
yield { type: 'tool_end', tool: toolUse.name, result };
}
// 阶段 4:追加到对话并继续
messages.push({ role: 'assistant', content: assistantBlocks });
messages.push({ role: 'user', content: toolResults.map(toToolResultBlock) });
}
}

generator 的魔力在看到消费者时才真正显现:

// 消费者控制节奏
async function runAgent(config: AgentConfig) {
const loop = processConversation(
config.apiClient,
config.messages,
config.systemPrompt,
config.tools,
config.abortController.signal,
);
for await (const event of loop) {
// 每次迭代从 generator 拉取下一个值。
// 直到我们请求下一个值,generator 才会继续执行。
switch (event.type) {
case 'stream':
renderToTerminal(event); // 实时更新 UI
break;
case 'tool_start':
if (needsPermission(event.tool)) {
const allowed = await askUser(`Allow ${event.tool}?`);
if (!allowed) {
loop.return(undefined); // 优雅取消
return;
}
}
showSpinner(event.tool);
break;
case 'tool_end':
hideSpinner();
logToolResult(event.result);
break;
case 'turn_complete':
showFinalResponse();
break;
case 'aborted':
showAbortMessage();
return;
}
}
}
graph TD
    A[取消请求] --> B{策略}
    B -->|协作式| C["abortSignal.aborted 检查<br/>在 tool 执行间隙"]
    B -->|立即| D["generator.return()<br/>从消费者侧调用"]
    B -->|超时| E["AbortSignal.timeout(ms)<br/>传给 API client"]

    C --> F[干净关闭<br/>保留部分结果]
    D --> G[generator 完成收尾<br/>finally 块执行]
    E --> H[API 调用中止<br/>释放网络资源]
// 策略 1:协作式(在 loop 中检查 abort signal)
if (abortSignal.aborted) {
yield { type: 'aborted' };
return;
}
// 策略 2:立即(消费者调用 .return())
const generator = agenticLoop(messages, tools);
// ... 之后:
generator.return(undefined); // generator 的 finally 块执行
// 策略 3:超时(内置于 AbortSignal)
const signal = AbortSignal.timeout(300_000); // 5 分钟超时
const generator = agenticLoop(messages, tools, { signal });

以下是可直接使用的生产就绪模板:

// ============================================
// 可复用 Agentic Loop 模板
// ============================================
interface AgentEvent {
type: 'thinking' | 'response' | 'tool_call' | 'tool_result' | 'done' | 'error';
data: unknown;
}
interface AgentConfig {
maxTurns: number;
tools: Map<string, ToolHandler>;
shouldContinue: (response: LLMResponse) => boolean;
onBeforeToolCall?: (call: ToolCall) => Promise<boolean>;
}
async function* createAgentLoop(
llm: LLMClient,
initialMessages: Message[],
config: AgentConfig,
): AsyncGenerator<AgentEvent> {
const messages = [...initialMessages];
let turns = 0;
try {
while (turns < config.maxTurns) {
turns++;
// 调用 LLM
yield { type: 'thinking', data: { turn: turns } };
const response = await llm.complete(messages);
yield { type: 'response', data: response };
// 检查是否应该停止
if (!config.shouldContinue(response)) {
yield { type: 'done', data: { turns, reason: 'stop_condition' } };
return;
}
// 执行 tool 调用
for (const call of response.toolCalls ?? []) {
// 执行前 hook(permission 检查等)
if (config.onBeforeToolCall) {
const allowed = await config.onBeforeToolCall(call);
if (!allowed) {
yield { type: 'done', data: { turns, reason: 'user_denied' } };
return;
}
}
yield { type: 'tool_call', data: call };
const handler = config.tools.get(call.name);
if (!handler) {
const error = `Unknown tool: ${call.name}`;
messages.push(toolErrorMessage(call.id, error));
yield { type: 'error', data: { call, error } };
continue;
}
const result = await handler(call.input);
messages.push(toolResultMessage(call.id, result));
yield { type: 'tool_result', data: { call, result } };
}
// 追加助手消息供下一轮使用
messages.push(assistantMessage(response));
}
yield { type: 'done', data: { turns, reason: 'max_turns' } };
} finally {
// 即使调用 .return() 或抛出错误也会执行清理
console.log(`Agent loop finished after ${turns} turns`);
}
}

Claude Code 使用 yield* 来组合 generator,形成模块化的流水线:

// 父 generator 委托给子 generator
async function* outerLoop(): AsyncGenerator<AgentEvent> {
// yield* 透明地转发内部 generator 的所有 yield
yield* innerStreamPhase();
yield* innerToolPhase();
yield* innerCompactionPhase();
}
// 每个阶段都是独立的 generator
async function* innerStreamPhase(): AsyncGenerator<AgentEvent> {
for await (const token of apiStream) {
yield { type: 'stream_token', data: token };
}
}
async function* innerToolPhase(): AsyncGenerator<AgentEvent> {
for (const tool of pendingTools) {
yield { type: 'tool_start', data: tool };
const result = await tool.execute();
yield { type: 'tool_end', data: result };
}
}

这种 yield* 委托创建了可组合的流水线,同时保持了扁平的控制流。

特性Async GeneratorRxJS Observable
学习曲线低(原生 JS)高(100+ 操作符)
包体积0 KB(内置)~30 KB min
Backpressure自动(拉取式)手动(需要策略)
取消.return() / AbortSignalsubscription.unsubscribe()
组合yield* 委托pipe() + 操作符
Hot/Cold始终 cold(惰性)两者均支持
错误处理try/catch.pipe(catchError(...))
多播手动(需要 tee)内置 subject
时间操作符手动debouncethrottle
最适合顺序异步工作流复杂事件 stream

AI Agent Loop

任何 LLM 驱动的 loop,需要观察每一步、检查 permission,以及潜在的暂停/恢复。

ETL 流水线

提取 → 转换 → 加载工作流,每个阶段 yield 进度并支持取消。

长时运行任务

构建系统、测试运行器或部署流水线,需要实时进度上报。

交互式工作流

任何在自动化步骤与用户输入/审批之间交替的流程。

// ❌ 不要缓冲所有结果再最后 yield
async function* badGenerator() {
const allResults = [];
for (const item of items) {
allResults.push(await process(item));
}
yield allResults; // 这违背了 streaming 的目的
}
// ✅ 每个结果产出时立即 yield
async function* goodGenerator() {
for (const item of items) {
yield await process(item); // 消费者立即看到每个结果
}
}
// ❌ 不要在 generator 内部静默吞噬错误
async function* badErrorHandling() {
try {
yield await riskyOperation();
} catch (e) {
// 静默吞噬 —— 消费者不知道发生了错误
}
}
// ✅ 将错误作为事件 yield 或重新抛出
async function* goodErrorHandling() {
try {
yield await riskyOperation();
} catch (e) {
yield { type: 'error', error: e }; // 消费者可以决定如何处理
}
}

Async Generator Loop pattern 是 Claude Code 最根本的架构决策,它提供了:

  1. 线性可读性 —— 复杂的异步工作流从上到下阅读
  2. 天然可观察 —— 每个 yield 都是观察 loop 状态的窗口
  3. 内置 backpressure —— 消费者控制节奏
  4. 可组合性 —— yield* 使模块化子 generator 成为可能
  5. 优雅取消 —— .return() 与 AbortSignal 协同工作
  6. 零依赖 —— 原生 JavaScript,无需任何库

这一单一的模式选择消除了整类 bug(栈溢出、事件丢失、竞态条件),同时保持了对任何 TypeScript 开发者都友好的代码可读性。