The Loop
agentic loop 是 Claude Code 的核心引擎。它将用户的单条消息转化为与 Claude API 的多步骤、使用 tool 的对话。本章深入剖析 src/query.ts 中的 while(true) 循环——整个代码库中最关键的约 1700 行代码。
Claude Code 的 agentic loop 分为两层:
| 层级 | 文件 | 职责 |
|---|---|---|
| 外层 | src/QueryEngine.ts | 会话生命周期、消息持久化、预算执行、SDK 消息 yield |
| 内层 | src/query.ts | while(true) 循环——API 调用、streaming、tool 分发、继续决策 |
外层(QueryEngine.submitMessage)是一个 AsyncGenerator,驱动内层(query())并对其输出做后处理以供 SDK 消费者使用。内层是实际 agentic 推理发生的地方。
graph TD
A[User Message] --> B[QueryEngine.submitMessage]
B --> C[processUserInput]
C --> D[query loop entry]
D --> E{while true}
E --> F[Message Build & Compact]
F --> G[API Call & Stream]
G --> H{needsFollowUp?}
H -- yes --> I[Tool Dispatch]
I --> J[Result Collection]
J --> K[Attachment Injection]
K --> L{maxTurns reached?}
L -- no --> E
L -- yes --> M[Return max_turns]
H -- no --> N{Stop Hooks}
N --> O[Return completed]
B --> P[Yield SDK Messages]
while(true) 模式
Section titled “while(true) 模式”核心循环位于 src/query.ts 的 queryLoop() 中。它是一个字面意义上的 while (true)——没有迭代计数,没有 for 循环边界。循环持续运行,直到某个显式 return 语句触发。
// src/query.ts — the core loop structureasync function* queryLoop( params: QueryParams, consumedCommandUuids: string[],): AsyncGenerator< StreamEvent | RequestStartEvent | Message | TombstoneMessage | ToolUseSummaryMessage, Terminal> { const { systemPrompt, userContext, systemContext, canUseTool, ... } = params;
let state: State = { messages: params.messages, toolUseContext: params.toolUseContext, autoCompactTracking: undefined, maxOutputTokensRecoveryCount: 0, hasAttemptedReactiveCompact: false, turnCount: 1, transition: undefined, // ... };
// eslint-disable-next-line no-constant-condition while (true) { let { toolUseContext } = state; const { messages, turnCount, ... } = state;
// === STAGE 1-7 happen here ===
state = next; // transition to next iteration }}为何选择 while(true)?
Section titled “为何选择 while(true)?”该循环无法用有界迭代来表达,原因如下:
- 不可预测的 tool 调用链:模型在完成任务前可能调用 1 个或 50 个 tool。
- 恢复循环:触发
max_output_tokens→ 注入恢复消息 → 重新进入。每个 turn 最多可发生 3 次。 - 响应式 compaction:若 context 过大,执行 compact 并重试——但每次 turn 只做一次。
- Stop hooks:hook 可以阻断响应,注入错误消息并强制再次迭代。
- token budget 续行:启用时,若模型相对于其 token budget 提前停止,循环会自动继续。
每次迭代的 7 个阶段
Section titled “每次迭代的 7 个阶段”每次经过 while(true) 循环执行 7 个概念阶段:
阶段 1:消息构建与 context 准备
Section titled “阶段 1:消息构建与 context 准备”第一阶段为 API 调用准备消息数组,包含若干子步骤:
// 1a. Get messages after the last compact boundarylet messagesForQuery = [...getMessagesAfterCompactBoundary(messages)];
// 1b. Apply tool result budget (persist large results to disk)messagesForQuery = await applyToolResultBudget( messagesForQuery, toolUseContext.contentReplacementState, // ...);
// 1c. Apply snip compaction (HISTORY_SNIP feature)if (feature('HISTORY_SNIP')) { const snipResult = snipModule!.snipCompactIfNeeded(messagesForQuery); messagesForQuery = snipResult.messages; snipTokensFreed = snipResult.tokensFreed;}
// 1d. Apply microcompact (remove stale tool results)const microcompactResult = await deps.microcompact( messagesForQuery, toolUseContext, querySource);messagesForQuery = microcompactResult.messages;
// 1e. Apply context collapse (if enabled)if (feature('CONTEXT_COLLAPSE') && contextCollapse) { const collapseResult = await contextCollapse.applyCollapsesIfNeeded( messagesForQuery, toolUseContext, querySource ); messagesForQuery = collapseResult.messages;}
// 1f. Build the full system promptconst fullSystemPrompt = asSystemPrompt( appendSystemContext(systemPrompt, systemContext));
// 1g. Auto-compact if neededconst { compactionResult } = await deps.autocompact( messagesForQuery, toolUseContext, /* ... */);消息准备 pipeline 有严格的顺序:tool result budget → snip → microcompact → context collapse → autocompact。每个阶段都可以减少 token 数量,从而可能阻止下一阶段触发。
阶段 2:API 调用配置
Section titled “阶段 2:API 调用配置”消息准备完毕后,循环配置并发起 API 请求:
for await (const message of deps.callModel({ messages: prependUserContext(messagesForQuery, userContext), systemPrompt: fullSystemPrompt, thinkingConfig: toolUseContext.options.thinkingConfig, tools: toolUseContext.options.tools, signal: toolUseContext.abortController.signal, options: { model: currentModel, fallbackModel, querySource, maxOutputTokensOverride, taskBudget: params.taskBudget && { /* ... */ }, // ... },})) { // Stage 3: Stream processing}deps.callModel 默认为 src/services/api/claude.ts 中的 queryModelWithStreaming,它封装了 Anthropic SDK 的 messages.stream()。
阶段 3:stream 处理
Section titled “阶段 3:stream 处理”随着 API stream 传入内容块,循环逐一处理:
for await (const message of deps.callModel({ /* ... */ })) { // Handle streaming fallback (model switch mid-stream) if (streamingFallbackOccured) { assistantMessages.length = 0; toolResults.length = 0; // Reset and retry with fallback model }
// Withhold recoverable errors (prompt-too-long, max-output-tokens) let withheld = false; if (reactiveCompact?.isWithheldPromptTooLong(message)) withheld = true; if (isWithheldMaxOutputTokens(message)) withheld = true; if (!withheld) yield yieldMessage;
// Track assistant messages and detect tool_use blocks if (message.type === 'assistant') { assistantMessages.push(message); const msgToolUseBlocks = message.message.content.filter( content => content.type === 'tool_use' ); if (msgToolUseBlocks.length > 0) { toolUseBlocks.push(...msgToolUseBlocks); needsFollowUp = true; }
// Feed tool blocks to StreamingToolExecutor for parallel execution if (streamingToolExecutor) { for (const toolBlock of msgToolUseBlocks) { streamingToolExecutor.addTool(toolBlock, message); } } }}关键洞察:tool 执行可以在 API 仍在 streaming 时启动。StreamingToolExecutor 在 tool 块到达时立即接收并开始执行并发安全的 tool。
阶段 4:tool 分发
Section titled “阶段 4:tool 分发”API 响应完成后,收集剩余的 tool 结果:
const toolUpdates = streamingToolExecutor ? streamingToolExecutor.getRemainingResults() : runTools(toolUseBlocks, assistantMessages, canUseTool, toolUseContext);
for await (const update of toolUpdates) { if (update.message) { yield update.message; toolResults.push( ...normalizeMessagesForAPI([update.message], tools).filter(_ => _.type === 'user') ); } if (update.newContext) { updatedToolUseContext = { ...update.newContext, queryTracking }; }}存在两条执行路径:
- StreamingToolExecutor(默认):tool 在其
tool_use块 stream 进来后立即开始执行。并发安全的 tool 并行运行。 - runTools(回退):来自
toolOrchestration.ts的传统顺序/批量执行。
阶段 5:结果收集与附件
Section titled “阶段 5:结果收集与附件”tool 完成后,循环收集需要注入的额外 context:
// Get queued commands (btw messages, task notifications)const queuedCommandsSnapshot = getCommandsByMaxPriority( sleepRan ? 'later' : 'next');
// Inject file change attachments, memory attachments, skill discoveryfor await (const attachment of getAttachmentMessages( null, updatedToolUseContext, null, queuedCommandsSnapshot, [...messagesForQuery, ...assistantMessages, ...toolResults], querySource,)) { yield attachment; toolResults.push(attachment);}
// Memory prefetch consumeif (pendingMemoryPrefetch?.settledAt !== null) { const memoryAttachments = filterDuplicateMemoryAttachments( await pendingMemoryPrefetch.promise, toolUseContext.readFileState, ); // yield memory attachments}阶段 6:继续决策
Section titled “阶段 6:继续决策”这是关键的 fork 点。所有 tool 结果收集完毕后:
if (!needsFollowUp) { // No tool_use blocks → model is done (or errored)
// Handle recoverable errors: prompt-too-long, max-output-tokens // Handle stop hooks // Handle token budget continuation
return { reason: 'completed' };}
// Tool results exist → check limits and continueconst nextTurnCount = turnCount + 1;if (maxTurns && nextTurnCount > maxTurns) { yield createAttachmentMessage({ type: 'max_turns_reached', maxTurns, turnCount: nextTurnCount }); return { reason: 'max_turns', turnCount: nextTurnCount };}
// Prepare next iterationstate = { messages: [...messagesForQuery, ...assistantMessages, ...toolResults], toolUseContext: toolUseContextWithQueryTracking, turnCount: nextTurnCount, transition: { reason: 'next_turn' }, // ...};// implicit continue → back to while(true)阶段 7:状态转换
Section titled “阶段 7:状态转换”循环体通过写入新的 State 对象并落入下一次 while(true) 迭代来结束。State 类型捕获迭代间所有变化的内容:
type State = { messages: Message[]; toolUseContext: ToolUseContext; autoCompactTracking: AutoCompactTrackingState | undefined; maxOutputTokensRecoveryCount: number; hasAttemptedReactiveCompact: boolean; maxOutputTokensOverride: number | undefined; pendingToolUseSummary: Promise<ToolUseSummaryMessage | null> | undefined; stopHookActive: boolean | undefined; turnCount: number; transition: Continue | undefined; // why we continued};transition 字段记录了循环为何继续——对调试和测试极为宝贵:
| 转换原因 | 含义 |
|---|---|
next_turn | 正常:tool 结果需要后续 API 调用 |
reactive_compact_retry | prompt 过长,已 compact 并重试 |
collapse_drain_retry | context collapse 排空了暂存的折叠 |
max_output_tokens_recovery | 触达输出 token 上限,注入恢复消息 |
max_output_tokens_escalate | 从 8K 提升至 64K 最大输出 token |
stop_hook_blocking | stop hook 注入了阻断错误 |
token_budget_continuation | 模型提前停止,budget 指示继续 |
AsyncGenerator yield 点
Section titled “AsyncGenerator yield 点”queryLoop 函数是一个 AsyncGenerator,这意味着它在消息可用时即 yield 给调用方。这是一个关键设计选择,它允许:
- 实时 streaming:SDK 消费者在内容 stream 进来时即可看到助手文本,而无需等待完整 turn。
- 背压:调用方控制消费速度。若 SDK 消费者处理较慢,generator 会暂停。
- 提前终止:调用方可以对 generator 调用
.return()以在任意时刻中止循环。
关键 yield 点:
yield { type: 'stream_request_start' }; // Turn boundary markeryield yieldMessage; // Streamed content blocksyield result; // Completed tool resultsyield attachment; // Context attachmentsyield createAttachmentMessage({ type: 'max_turns_reached' }); // Limit hit外层 QueryEngine.submitMessage 消费这些 yield 并将其转换为 SDK 输出:
for await (const message of query({ messages, systemPrompt, userContext, systemContext, canUseTool, toolUseContext, /* ... */})) { switch (message.type) { case 'assistant': this.mutableMessages.push(message); yield* normalizeMessage(message); break; case 'stream_event': if (message.event.type === 'message_start') { currentMessageUsage = updateUsage(currentMessageUsage, message.event.message.usage); } if (message.event.type === 'message_stop') { this.totalUsage = accumulateUsage(this.totalUsage, currentMessageUsage); } break; // ... }
// Check USD budget after each message if (maxBudgetUsd !== undefined && getTotalCost() >= maxBudgetUsd) { yield { type: 'result', subtype: 'error_max_budget_usd', /* ... */ }; return; }}跨迭代的状态管理
Section titled “跨迭代的状态管理”状态通过一个可变的 State 对象管理,该对象在每个继续点被完整替换。循环中共有 7 个继续点,每个都构建一个全新的 State:
graph LR
A[next_turn] --> S[State]
B[reactive_compact_retry] --> S
C[collapse_drain_retry] --> S
D[max_output_tokens_recovery] --> S
E[max_output_tokens_escalate] --> S
F[stop_hook_blocking] --> S
G[token_budget_continuation] --> S
每个继续点都显式构建完整状态,确保不同恢复路径之间不会有陈旧值泄漏。例如,reactive_compact_retry 将 autoCompactTracking 重置为 undefined,但保留 hasAttemptedReactiveCompact: true 以防止无限重试循环。
错误恢复路径
Section titled “错误恢复路径”循环在控制流中内置了复杂的错误恢复机制:
Prompt 过长(413)
Section titled “Prompt 过长(413)”1. API 返回 prompt-too-long 错误2. 错误从 SDK stream 中被 WITHHELD(扣押)3. 尝试 context-collapse drain(成本低,保留细粒度 context)4. 若仍失败,尝试 reactive compact(完整摘要)5. 若两者均失败,暴露被扣押的错误并返回最大输出 token
Section titled “最大输出 token”1. API 返回 max_output_tokens 停止原因2. 错误从 SDK stream 中被 WITHHELD(扣押)3. 首先:尝试从 8K 提升至 64K(单次重试,无用户消息)4. 若仍触达上限:注入"Resume directly — no recap"消息5. 最多允许 3 次恢复尝试6. 若耗尽,暴露被扣押的错误streaming 回退
Section titled “streaming 回退”1. API stream 在响应中途失败2. 捕获 FallbackTriggeredError3. 将孤立消息标记为 tombstone(无效的 thinking 签名)4. 切换至回退模型5. 重试整个请求将所有内容整合,以下是循环中单次迭代的完整生命周期:
sequenceDiagram
participant L as Loop Entry
participant P as Message Prep
participant A as API Call
participant S as Stream
participant T as Tool Dispatch
participant R as Result Collection
participant D as Continue Decision
L->>P: Destructure state
P->>P: Tool result budget
P->>P: Snip compaction
P->>P: Microcompact
P->>P: Context collapse
P->>P: Auto-compact
P->>A: Configure API params
A->>S: Stream response
S->>S: Yield content blocks
S->>T: Feed tool_use to StreamingToolExecutor
S-->>S: Collect completed results during stream
Note over S: Stream ends
T->>T: Await remaining tool results
T->>R: Yield tool results
R->>R: Inject attachments
R->>R: Memory prefetch consume
R->>D: Check needsFollowUp
alt No tool use
D->>D: Run stop hooks
D-->>L: return {reason: 'completed'}
else Has tool results
D->>D: Check maxTurns
D-->>L: state = next; continue
end
循环在运行过程中维护若干不变量:
-
每个
tool_use都有对应的tool_result:即使在中止时,也会生成合成的错误tool_result块。API 要求成对匹配。 -
消息在单次迭代内只追加不修改:
messagesForQuery在每次迭代时全新构建,但在迭代内部,消息只通过push增长。 -
每次迭代最多执行一次 compaction:
hasAttemptedReactiveCompact标志防止无限的 compact→重试→compact 循环。 -
stop hook 在每个终止位置最多运行一次:
stopHookActive标志防止在 hook 注入阻断错误后的重试中重复触发。 -
预算检查在 yield 之后:
QueryEngine在每条 yield 消息后检查maxBudgetUsd,而不是在循环内部。这将预算执行集中在一处。
| 指标 | 典型值 | 备注 |
|---|---|---|
| 每条用户消息的迭代次数 | 2-15 | 取决于任务复杂度 |
| 每次迭代耗时 | 2-30s | 主要受 API 延迟影响 |
| tool 与 streaming 的重叠度 | 40-80% | StreamingToolExecutor 在 stream 期间启动 tool |
| compact 频率 | 每 5-20 个 turn | 取决于 context window 使用情况 |
| 每次迭代内存占用 | ~2-5MB | 消息数组是主要开销 |
streaming tool executor 是循环中最大的性能优化。通过在 API 仍在 streaming 时启动 tool 执行,它可以在多 tool 响应的每次迭代中节省 1-5 秒。