The Loop

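The agentic loop is the core engine of Claude Code. It turns a single user message into a multi-step, tool-using conversation with the Claude API. This chapter dissects the while(true) loop in src/query.ts: roughly 1,700 lines that are the most critical code in the codebase.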

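Claude Code's agentic loop has two layers:

| Layer | File | Responsibility |
| --- | --- | --- |
| Outer | src/QueryEngine.ts | Session lifecycle, message persistence, budget enforcement, yielding SDK messages |
| Inner | src/query.ts | The while(true) loop: API calls, streaming, tool dispatch, continue decisions |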

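The outer layer (QueryEngine.submitMessage) is an AsyncGenerator that drives the inner layer (query()) and post-processes its output for SDK consumers. The inner layer is where the actual agentic reasoning happens.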
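The delegation between the two layers can be sketched as one async generator wrapping another. This is a minimal illustration, not the real implementation: `innerQuery`, `submitMessage`, `InnerMessage`, and `SdkMessage` are simplified, hypothetical stand-ins for `query()`, `QueryEngine.submitMessage`, and their message types.

```typescript
// Hypothetical, simplified message shapes (not the real Claude Code types).
type InnerMessage =
  | { type: 'assistant'; text: string }
  | { type: 'stream_event'; tokens: number };
type SdkMessage = { type: 'sdk_assistant'; text: string };

// Inner layer: stands in for query(); yields raw loop output.
async function* innerQuery(prompt: string): AsyncGenerator<InnerMessage, string> {
  yield { type: 'stream_event', tokens: 3 };
  yield { type: 'assistant', text: `echo: ${prompt}` };
  return 'completed';
}

// Outer layer: stands in for QueryEngine.submitMessage; persists and reshapes.
async function* submitMessage(
  prompt: string,
  transcript: InnerMessage[],
): AsyncGenerator<SdkMessage, void> {
  for await (const message of innerQuery(prompt)) {
    if (message.type === 'assistant') {
      transcript.push(message); // persistence is the outer layer's job
      yield { type: 'sdk_assistant', text: message.text };
    }
    // Stream events are consumed for bookkeeping but not forwarded in this sketch.
  }
}
```

The outer generator never needs to know how many turns the inner one takes; it only reacts to whatever is yielded.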

graph TD
    A[User Message] --> B[QueryEngine.submitMessage]
    B --> C[processUserInput]
    C --> D[query loop entry]
    D --> E{while true}
    E --> F[Message Build & Compact]
    F --> G[API Call & Stream]
    G --> H{needsFollowUp?}
    H -- yes --> I[Tool Dispatch]
    I --> J[Result Collection]
    J --> K[Attachment Injection]
    K --> L{maxTurns reached?}
    L -- no --> E
    L -- yes --> M[Return max_turns]
    H -- no --> N{Stop Hooks}
    N --> O[Return completed]
    B --> P[Yield SDK Messages]

The core loop lives in queryLoop() in src/query.ts. It is a literal while (true): no iteration counter, no for-loop bound. The loop keeps running until one of its explicit return statements fires.

// src/query.ts: the core loop structure
async function* queryLoop(
  params: QueryParams,
  consumedCommandUuids: string[],
): AsyncGenerator<
  StreamEvent | RequestStartEvent | Message | TombstoneMessage | ToolUseSummaryMessage,
  Terminal
> {
  const { systemPrompt, userContext, systemContext, canUseTool /* , ... */ } = params;
  let state: State = {
    messages: params.messages,
    toolUseContext: params.toolUseContext,
    autoCompactTracking: undefined,
    maxOutputTokensRecoveryCount: 0,
    hasAttemptedReactiveCompact: false,
    turnCount: 1,
    transition: undefined,
    // ...
  };
  // eslint-disable-next-line no-constant-condition
  while (true) {
    let { toolUseContext } = state;
    const { messages, turnCount /* , ... */ } = state;
    // === STAGE 1-7 happen here ===
    state = next; // transition to next iteration
  }
}

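The loop cannot be expressed as a bounded iteration, for the following reasons:

  1. Unpredictable tool chains: the model may call 1 tool or 50 before it finishes the task.
  2. Recovery loops: hitting max_output_tokens injects a recovery message and re-enters the loop. This can happen up to 3 times per turn.
  3. Reactive compaction: if the context is too large, the loop compacts and retries, but only once per turn.
  4. Stop hooks: a hook can block the response, inject an error message, and force another iteration.
  5. Token budget continuation: when enabled, the loop continues automatically if the model stops early relative to its token budget.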
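To make the "explicit return only" shape concrete, here is a minimal synchronous sketch. It assumes a scripted list of model replies in place of real API calls; `ModelReply`, `LoopTerminal`, and `runLoop` are hypothetical names, and only the tool-chain and turn-limit exits are modeled.

```typescript
// Hypothetical stand-ins: a scripted "model" that returns tool calls or a final answer.
type ModelReply = { toolCalls: string[]; text: string };
type LoopTerminal = { reason: 'completed' | 'max_turns'; turnCount: number };

function runLoop(replies: ModelReply[], maxTurns?: number): LoopTerminal {
  let turnCount = 1;
  let cursor = 0;
  while (true) {
    // Running past the script is treated as "no more tool calls".
    const reply = replies[cursor++] ?? { toolCalls: [], text: '' };
    // No tool calls: the model is done, so return explicitly.
    if (reply.toolCalls.length === 0) {
      return { reason: 'completed', turnCount };
    }
    // Tool calls present: check the turn limit before continuing.
    const nextTurnCount = turnCount + 1;
    if (maxTurns !== undefined && nextTurnCount > maxTurns) {
      return { reason: 'max_turns', turnCount: nextTurnCount };
    }
    turnCount = nextTurnCount;
  }
}
```

The number of iterations is decided entirely by the replies, which is why a `for` loop with a fixed bound would not fit.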

Each pass through the while(true) loop runs 7 conceptual stages:

Stage 1 prepares the message array for the API call and consists of several substeps:

// 1a. Get messages after the last compact boundary
let messagesForQuery = [...getMessagesAfterCompactBoundary(messages)];
// 1b. Apply tool result budget (persist large results to disk)
messagesForQuery = await applyToolResultBudget(
  messagesForQuery,
  toolUseContext.contentReplacementState,
  // ...
);
// 1c. Apply snip compaction (HISTORY_SNIP feature)
if (feature('HISTORY_SNIP')) {
  const snipResult = snipModule!.snipCompactIfNeeded(messagesForQuery);
  messagesForQuery = snipResult.messages;
  snipTokensFreed = snipResult.tokensFreed;
}
// 1d. Apply microcompact (remove stale tool results)
const microcompactResult = await deps.microcompact(
  messagesForQuery, toolUseContext, querySource
);
messagesForQuery = microcompactResult.messages;
// 1e. Apply context collapse (if enabled)
if (feature('CONTEXT_COLLAPSE') && contextCollapse) {
  const collapseResult = await contextCollapse.applyCollapsesIfNeeded(
    messagesForQuery, toolUseContext, querySource
  );
  messagesForQuery = collapseResult.messages;
}
// 1f. Build the full system prompt
const fullSystemPrompt = asSystemPrompt(
  appendSystemContext(systemPrompt, systemContext)
);
// 1g. Auto-compact if needed
const { compactionResult } = await deps.autocompact(
  messagesForQuery, toolUseContext, /* ... */
);

The message preparation pipeline has a strict order: tool result budget → snip → microcompact → context collapse → autocompact. Each stage can reduce the token count, which may keep the next stage from firing at all.
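The effect of that ordering can be sketched with a generic staged reducer. This is an illustration under assumptions: the `Stage` shape, the per-stage `threshold`, and the two example reducers are hypothetical, and the real stages decide internally whether to act.

```typescript
type Msg = { role: 'user' | 'assistant'; tokens: number };
type Stage = { label: string; threshold: number; reduce: (msgs: Msg[]) => Msg[] };

const totalTokens = (msgs: Msg[]): number =>
  msgs.reduce((sum, m) => sum + m.tokens, 0);

function prepareMessages(
  msgs: Msg[],
  stages: Stage[],
): { messages: Msg[]; fired: string[] } {
  const fired: string[] = [];
  let current = msgs;
  for (const stage of stages) {
    // A stage fires only if the context is still above its threshold.
    if (totalTokens(current) > stage.threshold) {
      current = stage.reduce(current);
      fired.push(stage.label);
    }
  }
  return { messages: current, fired };
}

// Hypothetical stages: a cheap one that caps each message at 30 tokens,
// then an expensive one that keeps only the last message.
const exampleStages: Stage[] = [
  {
    label: 'microcompact',
    threshold: 100,
    reduce: msgs => msgs.map(m => ({ ...m, tokens: Math.min(m.tokens, 30) })),
  },
  { label: 'autocompact', threshold: 100, reduce: msgs => msgs.slice(-1) },
];
```

With a 140-token history, the cheap stage brings the total down to 70, so the expensive stage never runs. Placing cheap reductions first is what makes the order matter.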

Once the messages are ready, the loop configures and issues the API request:

for await (const message of deps.callModel({
  messages: prependUserContext(messagesForQuery, userContext),
  systemPrompt: fullSystemPrompt,
  thinkingConfig: toolUseContext.options.thinkingConfig,
  tools: toolUseContext.options.tools,
  signal: toolUseContext.abortController.signal,
  options: {
    model: currentModel,
    fallbackModel,
    querySource,
    maxOutputTokensOverride,
    taskBudget: params.taskBudget && { /* ... */ },
    // ...
  },
})) {
  // Stage 3: Stream processing
}

deps.callModel defaults to queryModelWithStreaming in src/services/api/claude.ts, which wraps the Anthropic SDK's messages.stream().

As the API stream delivers content blocks, the loop processes them one at a time:

for await (const message of deps.callModel({ /* ... */ })) {
  // Handle streaming fallback (model switch mid-stream)
  if (streamingFallbackOccured) {
    assistantMessages.length = 0;
    toolResults.length = 0;
    // Reset and retry with fallback model
  }
  // Withhold recoverable errors (prompt-too-long, max-output-tokens)
  let withheld = false;
  if (reactiveCompact?.isWithheldPromptTooLong(message)) withheld = true;
  if (isWithheldMaxOutputTokens(message)) withheld = true;
  if (!withheld) yield yieldMessage;
  // Track assistant messages and detect tool_use blocks
  if (message.type === 'assistant') {
    assistantMessages.push(message);
    const msgToolUseBlocks = message.message.content.filter(
      content => content.type === 'tool_use'
    );
    if (msgToolUseBlocks.length > 0) {
      toolUseBlocks.push(...msgToolUseBlocks);
      needsFollowUp = true;
    }
    // Feed tool blocks to StreamingToolExecutor for parallel execution
    if (streamingToolExecutor) {
      for (const toolBlock of msgToolUseBlocks) {
        streamingToolExecutor.addTool(toolBlock, message);
      }
    }
  }
}

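Key insight: tool execution can start while the API is still streaming. StreamingToolExecutor receives tool blocks as they arrive and immediately starts executing the ones that are concurrency-safe.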
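The idea can be sketched with a small class that starts safe tools on arrival and defers the rest. This is not the real StreamingToolExecutor: `SketchStreamingExecutor`, `ToolBlock`, and the `concurrencySafe` flag are hypothetical, and error handling is omitted.

```typescript
type ToolBlock = { id: string; run: () => Promise<string>; concurrencySafe: boolean };
type ToolResult = { id: string; output: string };

class SketchStreamingExecutor {
  private running: Promise<ToolResult>[] = [];
  private deferred: ToolBlock[] = [];

  // Called as each tool_use block arrives from the stream.
  addTool(block: ToolBlock): void {
    if (block.concurrencySafe) {
      // Start immediately; do not wait for the stream to finish.
      this.running.push(block.run().then(output => ({ id: block.id, output })));
    } else {
      // Unsafe tools wait until the stream has ended.
      this.deferred.push(block);
    }
  }

  // Called after the stream ends: drain started tools, then run the rest in order.
  async *getRemainingResults(): AsyncGenerator<ToolResult, void> {
    for (const pending of this.running) yield await pending;
    for (const block of this.deferred) {
      yield { id: block.id, output: await block.run() };
    }
  }
}
```

By the time the caller asks for remaining results, the concurrency-safe tools have already been running for as long as the rest of the stream took to arrive.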

After the API response completes, the loop collects the remaining tool results:

const toolUpdates = streamingToolExecutor
  ? streamingToolExecutor.getRemainingResults()
  : runTools(toolUseBlocks, assistantMessages, canUseTool, toolUseContext);
for await (const update of toolUpdates) {
  if (update.message) {
    yield update.message;
    toolResults.push(
      ...normalizeMessagesForAPI([update.message], tools).filter(_ => _.type === 'user')
    );
  }
  if (update.newContext) {
    updatedToolUseContext = { ...update.newContext, queryTracking };
  }
}

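There are two execution paths:

  • StreamingToolExecutor (default): a tool starts executing as soon as its tool_use block has streamed in. Concurrency-safe tools run in parallel.
  • runTools (fallback): the traditional sequential/batched execution from toolOrchestration.ts.

After the tools finish, the loop gathers the additional context that needs to be injected: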

// Get queued commands (btw messages, task notifications)
const queuedCommandsSnapshot = getCommandsByMaxPriority(
  sleepRan ? 'later' : 'next'
);
// Inject file change attachments, memory attachments, skill discovery
for await (const attachment of getAttachmentMessages(
  null, updatedToolUseContext, null, queuedCommandsSnapshot,
  [...messagesForQuery, ...assistantMessages, ...toolResults],
  querySource,
)) {
  yield attachment;
  toolResults.push(attachment);
}
// Memory prefetch consume (only when a prefetch exists and has settled)
if (pendingMemoryPrefetch && pendingMemoryPrefetch.settledAt !== null) {
  const memoryAttachments = filterDuplicateMemoryAttachments(
    await pendingMemoryPrefetch.promise,
    toolUseContext.readFileState,
  );
  // yield memory attachments
}

This is the key fork point. Once all tool results have been collected:

if (!needsFollowUp) {
  // No tool_use blocks → model is done (or errored)
  // Handle recoverable errors: prompt-too-long, max-output-tokens
  // Handle stop hooks
  // Handle token budget continuation
  return { reason: 'completed' };
}
// Tool results exist → check limits and continue
const nextTurnCount = turnCount + 1;
if (maxTurns && nextTurnCount > maxTurns) {
  yield createAttachmentMessage({ type: 'max_turns_reached', maxTurns, turnCount: nextTurnCount });
  return { reason: 'max_turns', turnCount: nextTurnCount };
}
// Prepare next iteration
state = {
  messages: [...messagesForQuery, ...assistantMessages, ...toolResults],
  toolUseContext: toolUseContextWithQueryTracking,
  turnCount: nextTurnCount,
  transition: { reason: 'next_turn' },
  // ...
};
// implicit continue → back to while(true)
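The loop body ends by writing a new State object and falling through to the next while(true) iteration. The State type captures everything that changes between iterations: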

type State = {
  messages: Message[];
  toolUseContext: ToolUseContext;
  autoCompactTracking: AutoCompactTrackingState | undefined;
  maxOutputTokensRecoveryCount: number;
  hasAttemptedReactiveCompact: boolean;
  maxOutputTokensOverride: number | undefined;
  pendingToolUseSummary: Promise<ToolUseSummaryMessage | null> | undefined;
  stopHookActive: boolean | undefined;
  turnCount: number;
  transition: Continue | undefined; // why we continued
};

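The transition field records why the loop continued, which is invaluable for debugging and testing:

| Transition reason | Meaning |
| --- | --- |
| next_turn | Normal: tool results require a follow-up API call |
| reactive_compact_retry | Prompt too long; compacted and retrying |
| collapse_drain_retry | Context collapse drained its staged collapses |
| max_output_tokens_recovery | Hit the output token limit; recovery message injected |
| max_output_tokens_escalate | Raised max output tokens from 8K to 64K |
| stop_hook_blocking | A stop hook injected a blocking error |
| token_budget_continuation | Model stopped early; the budget says to continue |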
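As an illustration of how the transition field can support tests, the sketch below tallies recorded transitions by reason. The reason strings come from the table above; the `Continue` shape is reduced to a single field as an assumption, and `countTransitions` is a hypothetical helper.

```typescript
type ContinueReason =
  | 'next_turn'
  | 'reactive_compact_retry'
  | 'collapse_drain_retry'
  | 'max_output_tokens_recovery'
  | 'max_output_tokens_escalate'
  | 'stop_hook_blocking'
  | 'token_budget_continuation';

// Assumption: the real Continue type may carry more than just the reason.
type Continue = { reason: ContinueReason };

// Tally how often each transition occurred across the recorded iterations.
function countTransitions(
  log: Array<Continue | undefined>,
): Map<ContinueReason, number> {
  const counts = new Map<ContinueReason, number>();
  for (const entry of log) {
    if (entry === undefined) continue; // the first iteration has no transition
    counts.set(entry.reason, (counts.get(entry.reason) ?? 0) + 1);
  }
  return counts;
}
```

A test could then assert, for example, that a run involved exactly one `reactive_compact_retry` and no `stop_hook_blocking`.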

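The queryLoop function is an AsyncGenerator, which means it yields messages to the caller as soon as they are available. This is a key design choice that enables:

  1. Real-time streaming: SDK consumers see assistant text as it streams in, without waiting for the full turn.
  2. Backpressure: the caller controls the consumption rate. If the SDK consumer is slow, the generator pauses.
  3. Early termination: the caller can call .return() on the generator to abort the loop at any point.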
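Backpressure and early termination are standard async generator behavior and can be observed in a few lines. The sketch below uses a hypothetical `producer` that would run forever if the consumer let it.

```typescript
async function* producer(log: string[]): AsyncGenerator<number, void> {
  try {
    for (let i = 1; ; i++) {
      log.push(`produce ${i}`);
      yield i; // execution pauses here until the consumer asks for the next value
    }
  } finally {
    log.push('cleanup'); // runs when the consumer calls .return() or breaks out
  }
}

async function consumeTwo(): Promise<string[]> {
  const log: string[] = [];
  const gen = producer(log);
  await gen.next(); // runs the producer up to its first yield
  await gen.next(); // resumes it up to the second yield
  await gen.return(undefined); // early termination; triggers the finally block
  return log;
}
```

The producer never gets ahead of the consumer: a third value is not produced because it was never requested, and the `finally` block gives the generator a chance to clean up when it is aborted.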

Key yield points:

yield { type: 'stream_request_start' }; // Turn boundary marker
yield yieldMessage; // Streamed content blocks
yield result; // Completed tool results
yield attachment; // Context attachments
yield createAttachmentMessage({ type: 'max_turns_reached' }); // Limit hit

The outer QueryEngine.submitMessage consumes these yields and converts them into SDK output:

for await (const message of query({
  messages, systemPrompt, userContext, systemContext,
  canUseTool, toolUseContext, /* ... */
})) {
  switch (message.type) {
    case 'assistant':
      this.mutableMessages.push(message);
      yield* normalizeMessage(message);
      break;
    case 'stream_event':
      if (message.event.type === 'message_start') {
        currentMessageUsage = updateUsage(currentMessageUsage, message.event.message.usage);
      }
      if (message.event.type === 'message_stop') {
        this.totalUsage = accumulateUsage(this.totalUsage, currentMessageUsage);
      }
      break;
    // ...
  }
  // Check USD budget after each message
  if (maxBudgetUsd !== undefined && getTotalCost() >= maxBudgetUsd) {
    yield { type: 'result', subtype: 'error_max_budget_usd', /* ... */ };
    return;
  }
}
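The check-after-yield pattern can be isolated into a small wrapper generator. This is a sketch under assumptions: `withBudget`, `CostedMessage`, and the per-message `costUsd` field are hypothetical (the code above reads cost from `getTotalCost()` instead), and the `'success'` subtype is a placeholder for the normal completion result.

```typescript
type CostedMessage = { type: 'assistant'; costUsd: number };
type BudgetResult = {
  type: 'result';
  subtype: 'error_max_budget_usd' | 'success'; // 'success' is a placeholder here
  totalCostUsd: number;
};

async function* withBudget(
  source: AsyncIterable<CostedMessage>,
  maxBudgetUsd?: number,
): AsyncGenerator<CostedMessage | BudgetResult, void> {
  let totalCostUsd = 0;
  for await (const message of source) {
    totalCostUsd += message.costUsd;
    yield message; // forward first, then check the budget
    if (maxBudgetUsd !== undefined && totalCostUsd >= maxBudgetUsd) {
      yield { type: 'result', subtype: 'error_max_budget_usd', totalCostUsd };
      return; // leaving the for await loop also closes the source iterator
    }
  }
  yield { type: 'result', subtype: 'success', totalCostUsd };
}
```

Because the check sits in the wrapper, the inner loop needs no knowledge of budgets; returning from the wrapper is enough to stop it.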

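State is managed through a single State variable that is replaced wholesale at every continuation point. The loop has 7 continuation points, and each one builds a brand-new State object: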

graph LR
    A[next_turn] --> S[State]
    B[reactive_compact_retry] --> S
    C[collapse_drain_retry] --> S
    D[max_output_tokens_recovery] --> S
    E[max_output_tokens_escalate] --> S
    F[stop_hook_blocking] --> S
    G[token_budget_continuation] --> S

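Every continuation point builds the complete state explicitly, which ensures that no stale values leak between recovery paths. For example, reactive_compact_retry resets autoCompactTracking to undefined but keeps hasAttemptedReactiveCompact: true to prevent an infinite retry loop.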
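A minimal sketch of what "building the complete state explicitly" might look like for the reactive compact path is shown below. `LoopState` is a trimmed, hypothetical version of State, and which of the remaining fields are carried over or reset in the real code is an assumption.

```typescript
// Trimmed, hypothetical version of State for illustration.
type LoopState = {
  messages: string[];
  autoCompactTracking: { turnsSinceCompact: number } | undefined;
  maxOutputTokensRecoveryCount: number;
  hasAttemptedReactiveCompact: boolean;
  turnCount: number;
  transition: { reason: string } | undefined;
};

// No object spread: every field is written out, so the compiler rejects
// the function if a field is forgotten.
function reactiveCompactRetry(prev: LoopState, compacted: string[]): LoopState {
  return {
    messages: compacted,
    autoCompactTracking: undefined, // reset, per the text
    maxOutputTokensRecoveryCount: prev.maxOutputTokensRecoveryCount, // assumed carried over
    hasAttemptedReactiveCompact: true, // blocks a second reactive compact
    turnCount: prev.turnCount, // assumed unchanged on a retry
    transition: { reason: 'reactive_compact_retry' },
  };
}
```

Avoiding `{ ...prev, ... }` is the design point: a spread would silently carry over any field the author forgot to think about.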

The loop builds sophisticated error recovery directly into its control flow:

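Prompt-too-long recovery:

1. The API returns a prompt-too-long error
2. The error is withheld from the SDK stream
3. Try a context-collapse drain (cheap, preserves fine-grained context)
4. If that still fails, try reactive compact (full summarization)
5. If both fail, surface the withheld error and return

Max-output-tokens recovery:

1. The API returns a max_output_tokens stop reason
2. The error is withheld from the SDK stream
3. First, try escalating from 8K to 64K (single retry, no user message)
4. If the limit is hit again, inject a "Resume directly — no recap" message
5. At most 3 recovery attempts are allowed
6. If they are exhausted, surface the withheld error

Streaming fallback:

1. The API stream fails mid-response
2. FallbackTriggeredError is caught
3. Orphaned messages are tombstoned (their thinking signatures are no longer valid)
4. Switch to the fallback model
5. Retry the entire request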
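The max_output_tokens sequence (escalate once, then inject up to 3 resume messages, then give up) can be sketched as a pure decision function. Names such as `onMaxOutputTokens` and `RecoveryDecision` are hypothetical, and the exact value behind "64K" is an assumption.

```typescript
type RecoveryState = {
  maxOutputTokensOverride: number | undefined;
  maxOutputTokensRecoveryCount: number;
};
type RecoveryDecision =
  | { action: 'escalate'; next: RecoveryState }
  | { action: 'inject_resume_message'; next: RecoveryState }
  | { action: 'surface_error' };

const ESCALATED_MAX_OUTPUT_TOKENS = 64000; // "64K" per the text; exact value assumed
const MAX_RECOVERY_ATTEMPTS = 3;

function onMaxOutputTokens(state: RecoveryState): RecoveryDecision {
  // Step 1: a silent retry with a higher output limit, tried only once.
  if (state.maxOutputTokensOverride === undefined) {
    return {
      action: 'escalate',
      next: { ...state, maxOutputTokensOverride: ESCALATED_MAX_OUTPUT_TOKENS },
    };
  }
  // Step 2: ask the model to resume, up to the attempt limit.
  if (state.maxOutputTokensRecoveryCount < MAX_RECOVERY_ATTEMPTS) {
    return {
      action: 'inject_resume_message',
      next: {
        ...state,
        maxOutputTokensRecoveryCount: state.maxOutputTokensRecoveryCount + 1,
      },
    };
  }
  // Step 3: recovery is exhausted; the withheld error is shown to the consumer.
  return { action: 'surface_error' };
}
```

Feeding each returned `next` state back into the function yields one escalation, three resume injections, and then the surfaced error.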

Putting it all together, here is the full lifecycle of a single iteration of the loop:

sequenceDiagram
    participant L as Loop Entry
    participant P as Message Prep
    participant A as API Call
    participant S as Stream
    participant T as Tool Dispatch
    participant R as Result Collection
    participant D as Continue Decision

    L->>P: Destructure state
    P->>P: Tool result budget
    P->>P: Snip compaction
    P->>P: Microcompact
    P->>P: Context collapse
    P->>P: Auto-compact
    P->>A: Configure API params
    A->>S: Stream response
    S->>S: Yield content blocks
    S->>T: Feed tool_use to StreamingToolExecutor
    S-->>S: Collect completed results during stream
    Note over S: Stream ends
    T->>T: Await remaining tool results
    T->>R: Yield tool results
    R->>R: Inject attachments
    R->>R: Memory prefetch consume
    R->>D: Check needsFollowUp
    alt No tool use
        D->>D: Run stop hooks
        D-->>L: return {reason: 'completed'}
    else Has tool results
        D->>D: Check maxTurns
        D-->>L: state = next; continue
    end

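The loop maintains several invariants while it runs:

  1. Every tool_use has a matching tool_result: even on abort, synthetic error tool_result blocks are generated. The API requires the pairing.

  2. Messages are append-only within an iteration: messagesForQuery is rebuilt from scratch on every iteration, but within an iteration messages only grow via push.

  3. At most one reactive compaction per turn: the hasAttemptedReactiveCompact flag prevents an infinite compact → retry → compact loop.

  4. Stop hooks run at most once per termination point: the stopHookActive flag prevents them from firing again on the retry that follows a hook-injected blocking error.

  5. Budget checks come after the yield: QueryEngine checks maxBudgetUsd after each yielded message rather than inside the loop. This keeps budget enforcement in one place.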
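The first invariant can be sketched as a small repair function that pairs every unanswered tool_use with a synthetic error result. The block fields follow the Messages API content block format (`tool_use_id`, `content`, `is_error`); the function name and the way the abort reason is passed in are assumptions.

```typescript
type ToolUse = { type: 'tool_use'; id: string };
type ToolResultBlock = {
  type: 'tool_result';
  tool_use_id: string;
  content: string;
  is_error?: boolean;
};

// Return the existing results plus a synthetic error for each unanswered tool_use.
function fillMissingToolResults(
  toolUses: ToolUse[],
  results: ToolResultBlock[],
  reason: string,
): ToolResultBlock[] {
  const answered = new Set(results.map(r => r.tool_use_id));
  const synthetic: ToolResultBlock[] = toolUses
    .filter(use => !answered.has(use.id))
    .map(use => ({
      type: 'tool_result',
      tool_use_id: use.id,
      content: reason,
      is_error: true,
    }));
  return [...results, ...synthetic];
}
```

Running this before the next API call keeps the conversation valid even when an abort interrupts some tools midway.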

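| Metric | Typical value | Notes |
| --- | --- | --- |
| Iterations per user message | 2-15 | Depends on task complexity |
| Time per iteration | 2-30 s | Dominated by API latency |
| Tool/streaming overlap | 40-80% | StreamingToolExecutor starts tools during the stream |
| Compaction frequency | Every 5-20 turns | Depends on context window usage |
| Memory per iteration | ~2-5 MB | The message array is the main cost |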

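The streaming tool executor is the single biggest performance optimization in the loop. By starting tool execution while the API is still streaming, it can save 1-5 seconds per iteration on multi-tool responses.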
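Where the savings come from can be illustrated with a back-of-the-envelope timing model. This is a simplification under stated assumptions: every tool is concurrency-safe, the fallback path runs tools one after another once the stream ends, and all numbers are invented for illustration.

```typescript
// Hypothetical timing model; all times are in seconds.
type ToolTiming = { arrivesAt: number; duration: number };

// Fallback-style path: tools run one after another once the stream has ended.
function sequentialLatency(streamLength: number, tools: ToolTiming[]): number {
  return streamLength + tools.reduce((sum, t) => sum + t.duration, 0);
}

// Streaming path: each tool starts the moment its block arrives.
function overlappedLatency(streamLength: number, tools: ToolTiming[]): number {
  return Math.max(streamLength, ...tools.map(t => t.arrivesAt + t.duration));
}

const exampleTools: ToolTiming[] = [
  { arrivesAt: 1, duration: 2 },
  { arrivesAt: 2, duration: 3 },
];
// A 4-second stream: 9 s sequentially versus 5 s overlapped.
const savedSeconds =
  sequentialLatency(4, exampleTools) - overlappedLatency(4, exampleTools);
```

In this toy case the overlap saves 4 seconds, which falls within the 1-5 second range quoted above.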