
Streaming Infrastructure

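Claude Code's streaming infrastructure is the backbone that connects the Anthropic API to tool execution and to what the user sees. Understanding the streaming protocol, the content block types, and StreamingToolExecutor is essential to understanding how the agentic loop works.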

Claude Code consumes the Anthropic SDK's streaming API, which is delivered as Server-Sent Events (SSE). The stream is a sequence of typed events:

```mermaid
sequenceDiagram
    participant CC as Claude Code
    participant API as Anthropic API

    CC->>API: POST /v1/messages (stream: true)
    API-->>CC: message_start
    API-->>CC: content_block_start (thinking)
    API-->>CC: content_block_delta (thinking text)
    API-->>CC: content_block_stop
    API-->>CC: content_block_start (text)
    API-->>CC: content_block_delta (text chunks)
    API-->>CC: content_block_stop
    API-->>CC: content_block_start (tool_use)
    API-->>CC: content_block_delta (tool input JSON)
    API-->>CC: content_block_stop
    API-->>CC: message_delta (stop_reason, usage)
    API-->>CC: message_stop
```

The SDK exposes these events as BetaRawMessageStreamEvent:

```typescript
// From @anthropic-ai/sdk/resources/beta/messages/messages.mjs
type BetaRawMessageStreamEvent =
  | { type: 'message_start'; message: BetaMessage }
  | { type: 'content_block_start'; index: number; content_block: BetaContentBlock }
  | { type: 'content_block_delta'; index: number; delta: ContentDelta }
  | { type: 'content_block_stop'; index: number }
  | { type: 'message_delta'; delta: { stop_reason: string }; usage: BetaMessageDeltaUsage }
  | { type: 'message_stop' }
```
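
To make the event sequence concrete, here is a minimal sketch of how a consumer might fold these events into complete content blocks, keyed by index. The event and delta shapes are a simplified, hand-written subset (text_delta, thinking_delta, input_json_delta) rather than the SDK's full types, and accumulate is a hypothetical helper, not Claude Code's actual implementation.

```typescript
// Simplified subset of the streaming event shapes (assumption: not the SDK's full types)
type Delta =
  | { type: 'text_delta'; text: string }
  | { type: 'thinking_delta'; thinking: string }
  | { type: 'input_json_delta'; partial_json: string }

type StartBlock =
  | { type: 'text'; text: string }
  | { type: 'thinking'; thinking: string }
  | { type: 'tool_use'; id: string; name: string; input: unknown }

type StreamEvent =
  | { type: 'message_start' }
  | { type: 'content_block_start'; index: number; content_block: StartBlock }
  | { type: 'content_block_delta'; index: number; delta: Delta }
  | { type: 'content_block_stop'; index: number }
  | { type: 'message_delta'; delta: { stop_reason: string | null } }
  | { type: 'message_stop' }

interface Accumulated {
  blocks: StartBlock[]
  stopReason: string | null
}

// Hypothetical helper: fold a finished event sequence into complete content blocks
function accumulate(events: StreamEvent[]): Accumulated {
  const blocks: StartBlock[] = []
  const partialJson = new Map<number, string>() // raw tool input text per block index
  let stopReason: string | null = null

  for (const ev of events) {
    switch (ev.type) {
      case 'content_block_start':
        blocks[ev.index] = { ...ev.content_block }
        break
      case 'content_block_delta': {
        const block = blocks[ev.index]
        const d = ev.delta
        if (d.type === 'text_delta' && block.type === 'text') block.text += d.text
        else if (d.type === 'thinking_delta' && block.type === 'thinking') block.thinking += d.thinking
        else if (d.type === 'input_json_delta')
          partialJson.set(ev.index, (partialJson.get(ev.index) ?? '') + d.partial_json)
        break
      }
      case 'content_block_stop': {
        const block = blocks[ev.index]
        // Tool input is only valid JSON once all fragments have arrived
        if (block.type === 'tool_use') block.input = JSON.parse(partialJson.get(ev.index) || '{}')
        break
      }
      case 'message_delta':
        stopReason = ev.delta.stop_reason
        break
    }
  }
  return { blocks, stopReason }
}
```

Deferring JSON.parse until content_block_stop matters because an individual input_json_delta fragment is generally not valid JSON on its own.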

Claude Code must handle several kinds of content blocks: three arrive in the API response, and one it builds and sends back itself.

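| Block type | Description | How Claude Code handles it |
| --- | --- | --- |
| thinking | Extended thinking / reasoning | Displayed in the TUI; can optionally be hidden |
| text | Natural-language reply | Rendered to the user; extracted as the SDK result |
| tool_use | Tool invocation request | Dispatched to StreamingToolExecutor |
| tool_result | Tool execution result | Built by Claude Code and sent back to the API |
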
```typescript
// A tool_use block from the API
interface ToolUseBlock {
  type: 'tool_use'
  id: string // Unique ID like "toolu_abc123"
  name: string // Tool name like "Bash" or "FileRead"
  input: object // Tool-specific input (JSON)
}

// The corresponding tool_result sent back
interface ToolResultBlockParam {
  type: 'tool_result'
  tool_use_id: string // Must match the tool_use id
  content: string | Array<ContentBlockParam>
  is_error?: boolean
}
```
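
The pairing rule is strict: each tool_use block in an assistant turn needs a tool_result carrying the same id in the following user message. A minimal sketch of building that reply follows; the ToolOutcome type and buildToolResults helper are hypothetical, and the interfaces are simplified copies of the ones above with content narrowed to a string.

```typescript
interface ToolUseBlock {
  type: 'tool_use'
  id: string
  name: string
  input: object
}

interface ToolResultBlockParam {
  type: 'tool_result'
  tool_use_id: string
  content: string
  is_error?: boolean
}

// Hypothetical outcome of running one tool, looked up by tool_use id
type ToolOutcome = { ok: true; output: string } | { ok: false; error: string }

// Build one tool_result per tool_use, in the same order, echoing each id.
// A missing outcome becomes an error result so no tool_use is left unanswered.
function buildToolResults(
  blocks: ToolUseBlock[],
  outcomes: Map<string, ToolOutcome>,
): ToolResultBlockParam[] {
  return blocks.map((block): ToolResultBlockParam => {
    const outcome = outcomes.get(block.id)
    if (!outcome) {
      return { type: 'tool_result', tool_use_id: block.id, content: `Error: no result for ${block.name}`, is_error: true }
    }
    return outcome.ok
      ? { type: 'tool_result', tool_use_id: block.id, content: outcome.output }
      : { type: 'tool_result', tool_use_id: block.id, content: outcome.error, is_error: true }
  })
}
```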

Claude Code uses AsyncGenerators throughout its streaming pipeline. This gives natural backpressure: a producer only produces a value when its consumer is ready to take it:

```mermaid
graph LR
    A["claude.ts<br/>createStreamedResponse()"] -->|"AsyncGenerator<br/>Message"| B["query.ts<br/>query()"]
    B -->|"AsyncGenerator<br/>Message"| C["QueryEngine.ts<br/>submitMessage()"]
    C -->|"AsyncGenerator<br/>SDKMessage"| D["Consumer<br/>(REPL or SDK)"]
```

The core agentic loop in query.ts is itself an AsyncGenerator:

```typescript
// src/query.ts (conceptual)
export async function* query({
  messages,
  systemPrompt,
  canUseTool,
  toolUseContext,
  // ...
}): AsyncGenerator<Message, void, unknown> {
  while (true) {
    const assistantMessages: AssistantMessage[] = []
    const toolBlocks: ToolUseBlock[] = []

    // 1. Call the API
    const stream = createStreamedResponse(messages, systemPrompt, ...)

    // 2. Process the streaming response
    for await (const event of stream) {
      if (event.type === 'assistant') {
        assistantMessages.push(event)
        yield event // Pass to consumer
        // Collect tool_use blocks...
      }
    }

    // 3. If there were no tool calls, the turn is done
    if (toolBlocks.length === 0) break

    // 4. Execute tools and pass the results to the consumer
    const toolResults = await executeTools(toolBlocks)
    yield* toolResults

    // 5. Loop again with the assistant turn and tool results appended
    messages.push(...assistantMessages, ...toolResults)
  }
}
```
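
The control flow of that loop can be exercised without calling the API. The sketch below replaces createStreamedResponse with a scripted fake model and runs tools one at a time; every name in it (FakeModel, agentLoop, runTool) is hypothetical and the message shapes are heavily simplified. It shows the two properties that matter: the generator yields assistant and tool-result messages as it goes, and the loop ends on the first assistant turn with no tool calls.

```typescript
// Heavily simplified message shapes (assumption, not Claude Code's real types)
type ToolCall = { id: string; name: string; input: string }
type Msg =
  | { role: 'user'; text: string }
  | { role: 'assistant'; text: string; toolCalls: ToolCall[] }
  | { role: 'tool_result'; toolUseId: string; output: string }

// Hypothetical stand-in for the model: picks a reply based on the history so far
type FakeModel = (history: Msg[]) => Promise<Extract<Msg, { role: 'assistant' }>>

async function* agentLoop(
  history: Msg[],
  model: FakeModel,
  runTool: (call: ToolCall) => Promise<string>,
): AsyncGenerator<Msg, void, unknown> {
  while (true) {
    // 1. Ask the "model" for the next assistant turn and hand it to the consumer
    const assistant = await model(history)
    yield assistant
    history.push(assistant)

    // 2. No tool calls: the turn is over
    if (assistant.toolCalls.length === 0) return

    // 3. Run each tool and yield its result, then loop with the results appended
    for (const call of assistant.toolCalls) {
      const result: Msg = { role: 'tool_result', toolUseId: call.id, output: await runTool(call) }
      yield result
      history.push(result)
    }
  }
}
```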

Even the retry logic is an AsyncGenerator, yielding status messages while it retries:

```typescript
// src/services/api/withRetry.ts (simplified)
export async function* withRetry<T>(
  getClient: () => Promise<Anthropic>,
  operation: (client: Anthropic, attempt: number, context: RetryContext) => Promise<T>,
  options: RetryOptions,
): AsyncGenerator<SystemAPIErrorMessage, T> {
  // client, maxRetries, retryContext and delayMs are set up from getClient/options (elided)
  for (let attempt = 1; attempt <= maxRetries + 1; attempt++) {
    try {
      return await operation(client, attempt, retryContext)
    } catch (error) {
      // Out of attempts: surface the error to the caller
      if (attempt > maxRetries) throw error
      // Yield error message for UI display
      if (error instanceof APIError) {
        yield createSystemAPIErrorMessage(error, delayMs, attempt, maxRetries)
      }
      await sleep(delayMs, options.signal)
    }
  }
}
```

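The return statement supplies the generator's final value of type T (the API response), while yield emits intermediate status messages of type SystemAPIErrorMessage. The consumer extracts the final result with returnValue().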
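
The split between yielded values and the return value is plain JavaScript generator semantics: each intermediate value arrives as { done: false, value }, the return value arrives once as { done: true, value }, and it is also the value of a yield* expression. A for await loop discards that final value, which is why a helper is needed. The sketch below uses a hypothetical retrying generator and a hypothetical drain helper in the spirit of returnValue(); neither is Claude Code's implementation.

```typescript
// Hypothetical retry generator: yields a status string per failed attempt,
// returns the successful result, rethrows once attempts run out.
async function* retrying<T>(
  op: (attempt: number) => Promise<T>,
  maxAttempts: number,
): AsyncGenerator<string, T, unknown> {
  for (let attempt = 1; ; attempt++) {
    try {
      return await op(attempt)
    } catch (err) {
      if (attempt >= maxAttempts) throw err
      yield `attempt ${attempt} failed, retrying`
    }
  }
}

// Hypothetical helper: forward every yielded value to a callback and
// resolve with the generator's final (return) value.
async function drain<Y, R>(gen: AsyncGenerator<Y, R, unknown>, onYield: (y: Y) => void): Promise<R> {
  while (true) {
    const step = await gen.next()
    if (step.done) return step.value
    onYield(step.value)
  }
}

// The same final value is also the result of yield* inside another generator
async function* outer(): AsyncGenerator<string, number, unknown> {
  const result = yield* retrying(async (n) => {
    if (n < 2) throw new Error('flaky')
    return 42
  }, 3)
  return result + 1
}
```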

The StreamingToolExecutor class (src/services/tools/StreamingToolExecutor.ts) executes tools as they stream in from the API, with fine-grained concurrency control:

```mermaid
graph TB
    subgraph "Streaming from API"
        T1["tool_use: Grep<br/>(concurrent-safe)"]
        T2["tool_use: Grep<br/>(concurrent-safe)"]
        T3["tool_use: FileEdit<br/>(NOT concurrent-safe)"]
    end

    subgraph "Execution"
        P1["T1: Execute immediately"]
        P2["T2: Execute in parallel"]
        P3["T3: Queue until T1,T2 finish"]
    end

    subgraph "Result Ordering"
        R1["Result 1 (from T1)"]
        R2["Result 2 (from T2)"]
        R3["Result 3 (from T3)"]
    end

    T1 --> P1
    T2 --> P2
    T3 --> P3
    P1 --> R1
    P2 --> R2
    P3 --> R3
```

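Rules:

  1. Concurrency-safe tools (such as Grep, Glob, FileRead) run in parallel.
  2. Non-concurrency-safe tools (such as FileEdit, Bash) run exclusively: they wait for every in-flight tool to finish.
  3. Results are always yielded in order: even if T2 finishes before T1, T2's result waits until T1's has been yielded.
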
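The three rules can be sketched as a small scheduler. This is a simplified model, not the real StreamingToolExecutor: tools are plain async functions tagged with a hypothetical concurrencySafe flag, all tasks are known up front instead of arriving mid-stream, and Task and runOrdered are illustrative names.

```typescript
// Hypothetical task shape for illustration
type Task<R> = { name: string; concurrencySafe: boolean; run: () => Promise<R> }

// Simplified scheduler illustrating the three rules (not the real StreamingToolExecutor)
async function* runOrdered<R>(tasks: Task<R>[]): AsyncGenerator<R, void, unknown> {
  const started: Promise<R>[] = []
  const running = new Set<number>()

  // Rules 1 and 2: a task may start if nothing is running, or if it and
  // everything currently running are concurrency-safe
  const canStart = (i: number): boolean =>
    running.size === 0 ||
    (tasks[i].concurrencySafe && [...running].every((j) => tasks[j].concurrencySafe))

  // Start tasks strictly in submission order, stopping at the first that must wait
  const pump = (): void => {
    while (started.length < tasks.length && canStart(started.length)) {
      const i = started.length
      running.add(i)
      const p = tasks[i].run().finally(() => {
        running.delete(i)
        pump() // a finished task may unblock the next one
      })
      p.catch(() => {}) // avoid an unhandled rejection; the error surfaces when yielded below
      started.push(p)
    }
  }

  pump()
  // Rule 3: yield in submission order. Once every earlier task has settled,
  // nothing is running, so pump() has already started task i.
  for (let i = 0; i < tasks.length; i++) {
    yield await started[i]
  }
}
```

Stopping at the first task that cannot start keeps a later safe tool from jumping ahead of a queued exclusive one, which is what makes the ordering rule cheap to enforce.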
```typescript
// src/services/tools/StreamingToolExecutor.ts
type ToolStatus = 'queued' | 'executing' | 'completed' | 'yielded'

type TrackedTool = {
  id: string
  block: ToolUseBlock
  assistantMessage: AssistantMessage
  status: ToolStatus
  isConcurrencySafe: boolean
  promise?: Promise<void>
  results?: Message[]
  pendingProgress: Message[]
  contextModifiers?: Array<(context: ToolUseContext) => ToolUseContext>
}
```

When the API streams content_block_stop for a tool_use block, the executor receives it:

```typescript
// src/services/tools/StreamingToolExecutor.ts
export class StreamingToolExecutor {
  addTool(block: ToolUseBlock, assistantMessage: AssistantMessage): void {
    const toolDefinition = findToolByName(this.toolDefinitions, block.name)
    if (!toolDefinition) {
      // Unknown tool: create an error result immediately
      this.tools.push({
        id: block.id,
        status: 'completed',
        results: [createUserMessage({
          content: [{
            type: 'tool_result',
            content: `Error: No such tool available: ${block.name}`,
            is_error: true,
            tool_use_id: block.id,
          }],
        })],
        // ...
      })
      return
    }
    // Queue the tool and attempt execution
    this.tools.push({
      id: block.id,
      block,
      status: 'queued',
      isConcurrencySafe: toolDefinition.isConcurrencySafe(block.input),
      // ...
    })
    this.tryExecuteNext()
  }
}
```

When a Bash tool errors, the executor immediately aborts its sibling tools:

```typescript
// src/services/tools/StreamingToolExecutor.ts
constructor(toolDefinitions, canUseTool, toolUseContext) {
  // Child of toolUseContext.abortController.
  // Fires when a Bash tool errors: siblings die immediately.
  // Does NOT abort the parent, so query.ts won't end the turn.
  this.siblingAbortController = createChildAbortController(
    toolUseContext.abortController,
  )
}
```
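
The key property is that abort propagates one way: aborting the parent aborts the child, but aborting the child (as a Bash error does) leaves the parent untouched. A minimal sketch of such a helper using only the standard AbortController and AbortSignal APIs follows; makeChildAbortController is a hypothetical name, and this is an assumption about how createChildAbortController could behave, not its actual source.

```typescript
// Hypothetical helper: a controller that follows its parent's abort,
// but whose own abort() does not propagate upward
function makeChildAbortController(parent: AbortController): AbortController {
  const child = new AbortController()
  if (parent.signal.aborted) {
    child.abort()
    return child
  }
  const onParentAbort = () => child.abort()
  parent.signal.addEventListener('abort', onParentAbort, { once: true })
  // If the child is aborted first, drop the listener so the parent
  // does not keep a reference to a dead child
  child.signal.addEventListener(
    'abort',
    () => parent.signal.removeEventListener('abort', onParentAbort),
    { once: true },
  )
  return child
}
```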

When a streaming fallback happens (for example, switching from streaming to non-streaming mode), the executor can discard all pending work:

```typescript
discard(): void {
  this.discarded = true
  // Queued tools won't start
  // In-progress tools will receive synthetic errors
}
```
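
A sketch of those two discard behaviours under simplifying assumptions: a hypothetical DiscardableRunner tracks only a status per tool, and substitutes a fixed synthetic error string for any result that lands after discard(). The real executor's bookkeeping is richer, and the error text here is made up for illustration.

```typescript
type Status = 'queued' | 'executing' | 'completed'

// Hypothetical, heavily simplified model of discard(); not the real executor
class DiscardableRunner {
  private discarded = false
  readonly status = new Map<string, Status>()
  readonly results = new Map<string, string>()

  enqueue(id: string): void {
    this.status.set(id, 'queued')
  }

  // Start a queued tool; after discard() nothing new starts
  async start(id: string, run: () => Promise<string>): Promise<void> {
    if (this.discarded || this.status.get(id) !== 'queued') return
    this.status.set(id, 'executing')
    const output = await run()
    // A tool already running when discard() happened gets a synthetic error instead
    this.results.set(id, this.discarded ? 'Error: result discarded after streaming fallback' : output)
    this.status.set(id, 'completed')
  }

  discard(): void {
    this.discarded = true
  }
}
```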

The streaming pipeline relies on the rich message type system defined in src/types/message.ts:

```typescript
type Message =
  | UserMessage // User input or tool results
  | AssistantMessage // Model responses
  | SystemMessage // System events (errors, compaction boundaries)
  | AttachmentMessage // File attachments, structured output
  | ProgressMessage // Tool execution progress
  | StreamEvent // Raw stream events (for SDK consumers)
  | RequestStartEvent // API request tracking
  | ToolUseSummaryMessage // Collapsed tool use summaries
  | TombstoneMessage // Control signal for message removal
```

claude.ts converts raw SDK stream events into typed messages:

```mermaid
graph LR
    SE["content_block_start<br/>type: tool_use"] --> AM["AssistantMessage<br/>with tool_use content"]
    SE2["message_delta<br/>stop_reason: end_turn"] --> AM2["AssistantMessage<br/>updated stop_reason"]
    SE3["content_block_delta<br/>type: text_delta"] --> AM3["AssistantMessage<br/>with text content"]
```

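The AsyncGenerator pattern provides natural backpressure:

  1. API → claude.ts: the SDK's Stream object buffers SSE events.
  2. claude.ts → query.ts: for await (const msg of stream) pulls one message at a time.
  3. query.ts → QueryEngine: yield suspends the generator until the consumer calls .next().
  4. QueryEngine → REPL: the Ink (React) renderer consumes at its own pace.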

As a result, if UI rendering can't keep up, the whole pipeline slows down on its own, with no explicit flow control.
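
The pull-based behaviour is easy to observe with plain async generators: the producer below never runs ahead of the slow consumer, because each yield suspends it until the next .next() call propagates back through the pass-through stage. The names are illustrative only, and the sketch ignores the buffering the SDK's Stream performs at the network layer.

```typescript
// Illustrative producer: logs each value as it is produced
async function* produce(log: string[], count: number): AsyncGenerator<number> {
  for (let i = 0; i < count; i++) {
    log.push(`produce ${i}`)
    yield i // suspended here until the consumer asks for the next value
  }
}

// Illustrative pass-through stage, like query.ts sitting between claude.ts and QueryEngine
async function* stage(source: AsyncGenerator<number>, log: string[]): AsyncGenerator<number> {
  for await (const value of source) {
    log.push(`forward ${value}`)
    yield value
  }
}

// Slow consumer, standing in for a renderer that takes time per message
async function consume(source: AsyncGenerator<number>, log: string[]): Promise<void> {
  for await (const value of source) {
    log.push(`render ${value}`)
    await new Promise((resolve) => setTimeout(resolve, 5))
  }
}
```

Running consume(stage(produce(log, 3), log), log) leaves the log strictly interleaved (produce 0, forward 0, render 0, produce 1, ...): no value is produced before the previous one has been rendered.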

In the SDK (headless) path, transcript recording is deliberately arranged so that it does not block the pipeline:

```typescript
// src/QueryEngine.ts: fire-and-forget for assistant messages
if (message.type === 'assistant') {
  void recordTranscript(messages) // Non-blocking
} else {
  await recordTranscript(messages) // Blocking for user messages
}
```

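The code comment explains why: claude.ts yields one assistant message per content block and then, when message_delta arrives, mutates the last message's usage/stop_reason. Awaiting the write would keep the generator suspended and prevent that mutation from happening.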
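
The ordering problem can be reproduced in a few lines. In this sketch a hypothetical stream generator yields a message object and only fills in stop_reason when it is resumed, mirroring what the text describes for message_delta; slowWrite stands in for recordTranscript with an artificial delay. Awaiting the write keeps the generator suspended for the whole write, while fire-and-forget lets it resume and apply the mutation immediately.

```typescript
type Msg = { text: string; stop_reason: string | null }

// Yields a message, then mutates it when resumed (like usage/stop_reason on message_delta)
async function* stream(log: string[]): AsyncGenerator<Msg> {
  const msg: Msg = { text: 'hello', stop_reason: null }
  yield msg
  msg.stop_reason = 'end_turn'
  log.push('mutated')
}

// Hypothetical stand-in for recordTranscript with an artificial delay
async function slowWrite(log: string[]): Promise<void> {
  log.push('write started')
  await new Promise((resolve) => setTimeout(resolve, 10))
  log.push('write finished')
}

async function run(blocking: boolean): Promise<string[]> {
  const log: string[] = []
  const pending: Promise<void>[] = []
  for await (const msg of stream(log)) {
    log.push(`received ${msg.stop_reason}`)
    if (blocking) await slowWrite(log)
    else pending.push(slowWrite(log)) // fire-and-forget; kept only so the demo can wait at the end
  }
  await Promise.all(pending)
  return log
}
```

With blocking writes the log ends "write finished, mutated"; with fire-and-forget it ends "mutated, write finished".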

The end-to-end pipeline:

```mermaid
graph TB
    subgraph "API Layer"
        SDK["Anthropic SDK<br/>SSE Stream"]
    end

    subgraph "claude.ts"
        PARSE["Parse stream events"]
        NORM["Normalize to Message types"]
        RETRY["withRetry wrapper"]
    end

    subgraph "query.ts"
        LOOP["Agentic loop"]
        STE["StreamingToolExecutor"]
        COMPACT["Auto-compaction check"]
    end

    subgraph "QueryEngine.ts"
        SESSION["Session management"]
        RECORD["Transcript recording"]
        YIELD_SDK["Yield SDKMessage"]
    end

    subgraph "Consumer"
        REPL["REPL (React/Ink)"]
        SDK_OUT["SDK (stdout JSON)"]
    end

    SDK --> RETRY
    RETRY --> PARSE
    PARSE --> NORM
    NORM --> LOOP
    LOOP --> STE
    STE --> LOOP
    LOOP --> COMPACT
    LOOP --> SESSION
    SESSION --> RECORD
    SESSION --> YIELD_SDK
    YIELD_SDK --> REPL
    YIELD_SDK --> SDK_OUT
```