Streaming Infrastructure
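Claude Code's streaming infrastructure is the backbone that connects the Anthropic API to tool execution and to what the user sees. Understanding the streaming protocol, the content block types, and the StreamingToolExecutor is essential to understanding how the agentic loop works.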
Anthropic Streaming Protocol
Claude Code uses the Anthropic SDK's streaming API, which delivers responses as Server-Sent Events (SSE). The streaming protocol consists of a sequence of typed events:
```mermaid
sequenceDiagram
participant CC as Claude Code
participant API as Anthropic API
CC->>API: POST /v1/messages (stream: true)
API-->>CC: message_start
API-->>CC: content_block_start (thinking)
API-->>CC: content_block_delta (thinking text)
API-->>CC: content_block_stop
API-->>CC: content_block_start (text)
API-->>CC: content_block_delta (text chunks)
API-->>CC: content_block_stop
API-->>CC: content_block_start (tool_use)
API-->>CC: content_block_delta (tool input JSON)
API-->>CC: content_block_stop
API-->>CC: message_delta (stop_reason, usage)
API-->>CC: message_stop
```
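On the wire, each of these events is one SSE frame. An `event:` line names the type, a `data:` line carries the JSON payload, and a blank line ends the frame. The API also interleaves `ping` events. The SDK handles this framing, so the sketch below only illustrates the format. `parseSse` and `SseFrame` are hypothetical names, and the parser deliberately ignores the `id` and `retry` fields and comment lines.

```typescript
// One dispatched SSE event: the `event:` name and the joined `data:` lines.
interface SseFrame {
  event: string
  data: string
}

// Simplified SSE parser: only the `event` and `data` fields are handled.
function parseSse(text: string): SseFrame[] {
  const frames: SseFrame[] = []
  let event = 'message' // default event type per the SSE spec
  let dataLines: string[] = []
  for (const line of text.split(/\r?\n/)) {
    if (line === '') {
      // A blank line ends the frame; frames without data are dropped
      if (dataLines.length > 0) frames.push({ event, data: dataLines.join('\n') })
      event = 'message'
      dataLines = []
      continue
    }
    const colon = line.indexOf(':')
    // colon === 0 is a comment line; -1 is a bare field name (ignored here)
    if (colon <= 0) continue
    const field = line.slice(0, colon)
    const value = line.slice(colon + 1).replace(/^ /, '') // strip one leading space
    if (field === 'event') event = value
    else if (field === 'data') dataLines.push(value)
  }
  return frames
}
```

For example, a `message_start` frame followed by a `ping` frame parses into two `SseFrame` objects. Each `data` string is then passed to `JSON.parse`.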
The SDK exposes these events as `BetaRawMessageStreamEvent`:

```typescript
// From @anthropic-ai/sdk/resources/beta/messages/messages.mjs (simplified)
type BetaRawMessageStreamEvent =
  | { type: 'message_start'; message: BetaMessage }
  | { type: 'content_block_start'; index: number; content_block: BetaContentBlock }
  | { type: 'content_block_delta'; index: number; delta: ContentDelta }
  | { type: 'content_block_stop'; index: number }
  | { type: 'message_delta'; delta: { stop_reason: string }; usage: BetaMessageDeltaUsage }
  | { type: 'message_stop' }
```

A turn involves several kinds of content blocks, and Claude Code must handle each one. The first three arrive in the API response. `tool_result` blocks are built by Claude Code and sent back in the next request:
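| Block type | Description | How Claude Code handles it |
|---|---|---|
| `thinking` | Extended thinking / reasoning | Shown in the TUI, optionally masked |
| `text` | Natural-language response | Rendered to the user; extracted as the SDK result |
| `tool_use` | Tool call request | Dispatched to StreamingToolExecutor |
| `tool_result` | Tool execution result | Built by Claude Code and sent back to the API |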
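The pairing between the two tool blocks can be sketched as follows. The API expects every `tool_use` block in an assistant turn to be answered by a `tool_result` with the same id in the next user message. `ToolUse`, `ToolResult`, `Outcome`, `toToolResult` and `missingResults` are hypothetical, simplified names used for illustration. They are not Claude Code or SDK APIs.

```typescript
// Simplified shapes mirroring the tool_use / tool_result fields
interface ToolUse { type: 'tool_use'; id: string; name: string; input: unknown }
interface ToolResult { type: 'tool_result'; tool_use_id: string; content: string; is_error?: boolean }

// Hypothetical outcome of running one tool
type Outcome = { ok: true; output: string } | { ok: false; error: string }

// Build the tool_result that answers a given tool_use (ids must match)
function toToolResult(use: ToolUse, outcome: Outcome): ToolResult {
  return outcome.ok
    ? { type: 'tool_result', tool_use_id: use.id, content: outcome.output }
    : { type: 'tool_result', tool_use_id: use.id, content: `Error: ${outcome.error}`, is_error: true }
}

// Ids of tool_use blocks that still lack a matching tool_result
function missingResults(uses: ToolUse[], results: ToolResult[]): string[] {
  const answered = new Set(results.map((r) => r.tool_use_id))
  return uses.filter((u) => !answered.has(u.id)).map((u) => u.id)
}
```

A check like `missingResults` is a cheap guard before sending the next request. An unanswered `tool_use` id makes the request invalid.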
Tool Use Block Structure
```typescript
// A tool_use block from the API
interface ToolUseBlock {
  type: 'tool_use'
  id: string    // Unique ID like "toolu_abc123"
  name: string  // Tool name like "Bash" or "FileRead"
  input: object // Tool-specific input (JSON)
}

// The corresponding tool_result sent back
interface ToolResultBlockParam {
  type: 'tool_result'
  tool_use_id: string // Must match the tool_use id
  content: string | Array<ContentBlockParam>
  is_error?: boolean
}
```

AsyncGenerator Pipeline
Claude Code uses AsyncGenerators throughout its streaming pipeline. This provides natural backpressure: a producer only produces the next value when the consumer asks for it:
```mermaid
graph LR
A["claude.ts<br/>createStreamedResponse()"] -->|"AsyncGenerator<br/>Message"| B["query.ts<br/>query()"]
B -->|"AsyncGenerator<br/>Message"| C["QueryEngine.ts<br/>submitMessage()"]
C -->|"AsyncGenerator<br/>SDKMessage"| D["Consumer<br/>(REPL or SDK)"]
```
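The layering can be sketched with three toy generators. Each stage wraps the one below it with `for await` or `yield*`. `apiStage`, `loopStage`, `sessionStage` and the message shapes are hypothetical stand-ins for `createStreamedResponse()`, `query()` and `submitMessage()`. They do not reproduce those functions' real signatures.

```typescript
// Hypothetical message shapes; the real Message and SDKMessage types are richer.
type Msg = { role: 'assistant'; text: string }
type SdkMsg = { type: 'assistant'; text: string; seq: number }

// Stand-in for createStreamedResponse(): one API call's worth of messages.
async function* apiStage(chunks: string[]): AsyncGenerator<Msg> {
  for (const text of chunks) yield { role: 'assistant', text }
}

// Stand-in for query(): calls the API stage once per turn and forwards
// everything it yields with yield*.
async function* loopStage(turns: string[][]): AsyncGenerator<Msg> {
  for (const turn of turns) yield* apiStage(turn)
}

// Stand-in for submitMessage(): maps internal messages to the SDK shape.
async function* sessionStage(source: AsyncGenerator<Msg>): AsyncGenerator<SdkMsg> {
  let seq = 0
  for await (const m of source) yield { type: 'assistant', text: m.text, seq: seq++ }
}

// Consumer: pulls every message through all three stages.
async function collect<T>(gen: AsyncGenerator<T>): Promise<T[]> {
  const out: T[] = []
  for await (const item of gen) out.push(item)
  return out
}
```

Because each stage is only a function over the stage below it, wrappers such as retry or transcript recording can be slotted in without changing the other stages.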
query() Generator
The core agentic loop in query.ts is itself an AsyncGenerator:
```typescript
// src/query.ts (conceptual)
export async function* query({
  messages,
  systemPrompt,
  canUseTool,
  toolUseContext,
  // ...
}): AsyncGenerator<Message, void, unknown> {
  while (true) {
    // 1. Call the API
    const stream = createStreamedResponse(messages, systemPrompt /* , ... */)

    // 2. Process the streaming response
    const assistantMessages: AssistantMessage[] = []
    const toolBlocks: ToolUseBlock[] = []
    for await (const event of stream) {
      if (event.type === 'assistant') {
        assistantMessages.push(event)
        yield event // Pass to consumer
        // Collect this message's tool_use blocks into toolBlocks...
      }
    }

    // 3. If there were no tool calls, we're done
    if (toolBlocks.length === 0) break

    // 4. Execute tools and pass the results to the consumer
    const toolResults = await executeTools(toolBlocks)
    yield* toolResults

    // 5. Loop again with the assistant turn and tool results appended,
    //    so every tool_use is followed by its tool_result
    messages.push(...assistantMessages, ...toolResults)
  }
}
```

withRetry() Generator
Even the retry logic is an AsyncGenerator, yielding status messages while retries are in progress:
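```typescript
export async function* withRetry<T>(
  getClient: () => Promise<Anthropic>,
  operation: (client: Anthropic, attempt: number, context: RetryContext) => Promise<T>,
  options: RetryOptions,
): AsyncGenerator<SystemAPIErrorMessage, T> {
  // Setup omitted: client, maxRetries, retryContext and delayMs are derived
  // from getClient() and options; retryability checks are also omitted.
  for (let attempt = 1; attempt <= maxRetries + 1; attempt++) {
    try {
      return await operation(client, attempt, retryContext)
    } catch (error) {
      // Out of attempts: rethrow instead of retrying again
      if (attempt > maxRetries) throw error
      // Yield an error message for UI display
      if (error instanceof APIError) {
        yield createSystemAPIErrorMessage(error, delayMs, attempt, maxRetries)
      }
      await sleep(delayMs, options.signal)
    }
  }
  // Not reached: the final attempt either returns or throws
  throw new Error('unreachable')
}
```

The `return` statement completes the generator with a final value of type `T` (the API response). Each `yield` emits an intermediate status message of type `SystemAPIErrorMessage`. Because `for await...of` discards a generator's return value, consumers use `returnValue()` to extract the final result.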
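To make the split between yielded status and returned result concrete, here is a hypothetical sketch. `retrySketch` copies only the shape of `withRetry`, with no delays and no error classification. `drainWithReturn` shows the manual `.next()` loop that a helper like `returnValue()` has to perform. Neither is Claude Code's actual code, and the exact signature of `returnValue()` is not assumed here.

```typescript
// Drive an async generator by hand so its return value is not lost
// (for await...of would discard it). Yielded values go to onYield.
async function drainWithReturn<Y, R>(
  gen: AsyncGenerator<Y, R>,
  onYield: (value: Y) => void,
): Promise<R> {
  while (true) {
    const step = await gen.next()
    if (step.done) return step.value // the generator's `return` value
    onYield(step.value)
  }
}

// Toy retry generator in the same shape as withRetry: yields one status
// string per failed attempt, returns the operation's result, and rethrows
// once maxRetries retries have been used up.
async function* retrySketch<T>(
  op: (attempt: number) => Promise<T>,
  maxRetries: number,
): AsyncGenerator<string, T> {
  for (let attempt = 1; ; attempt++) {
    try {
      return await op(attempt)
    } catch (error) {
      if (attempt > maxRetries) throw error
      yield `attempt ${attempt} failed, retrying`
    }
  }
}
```

For example, an operation that fails twice and then succeeds produces two status strings through `onYield`, and `drainWithReturn` resolves to the successful result.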
StreamingToolExecutor
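The StreamingToolExecutor class (src/services/tools/StreamingToolExecutor.ts) executes tools while their tool_use blocks are still streaming in from the API. It applies fine-grained concurrency control to decide which tools may run at the same time.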
```mermaid
graph TB
subgraph "Streaming from API"
T1["tool_use: Grep<br/>(concurrent-safe)"]
T2["tool_use: Grep<br/>(concurrent-safe)"]
T3["tool_use: FileEdit<br/>(NOT concurrent-safe)"]
end
subgraph "Execution"
P1["T1: Execute immediately"]
P2["T2: Execute in parallel"]
P3["T3: Queue until T1,T2 finish"]
end
subgraph "Result Ordering"
R1["Result 1 (from T1)"]
R2["Result 2 (from T2)"]
R3["Result 3 (from T3)"]
end
T1 --> P1
T2 --> P2
T3 --> P3
P1 --> R1
P2 --> R2
P3 --> R3
```
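The rules:

- Concurrency-safe tools (such as `Grep`, `Glob`, `FileRead`) run in parallel.
- Non-concurrency-safe tools (such as `FileEdit`, `Bash`) run exclusively: they wait for all in-flight tools to finish before starting.
- Results are always yielded in order: if T2 finishes before T1, T2's result waits until T1's has been yielded.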
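The three rules can be sketched as a small scheduler. This is a simplified, hypothetical illustration, and `ToyToolExecutor`, `Job` and their fields are invented here. It is not the real StreamingToolExecutor: it has no abort handling and no progress messages, and it treats concurrency safety as a fixed per-job flag. As a design choice of this sketch, tools start in arrival order, and a tool that must wait also holds back the tools queued behind it.

```typescript
type Status = 'queued' | 'executing' | 'completed' | 'yielded'

// Hypothetical job: a tool call plus its concurrency-safety answer.
interface Job {
  name: string
  concurrencySafe: boolean
  run: () => Promise<string>
}

interface Tracked {
  job: Job
  status: Status
  result?: string
  done: Promise<void> // settles once the job has a result
  resolveDone: () => void
}

class ToyToolExecutor {
  private tools: Tracked[] = []

  add(job: Job): void {
    let resolveDone!: () => void
    const done = new Promise<void>((resolve) => {
      resolveDone = resolve
    })
    this.tools.push({ job, status: 'queued', done, resolveDone })
    this.startReady()
  }

  // Rules 1 and 2: safe tools may overlap other safe tools; an unsafe
  // tool only starts when nothing else is executing.
  private canStart(t: Tracked): boolean {
    const running = this.tools.filter((x) => x.status === 'executing')
    if (running.length === 0) return true
    return t.job.concurrencySafe && running.every((x) => x.job.concurrencySafe)
  }

  private startReady(): void {
    for (const t of this.tools) {
      if (t.status !== 'queued') continue
      if (!this.canStart(t)) break // later tools wait behind this one
      t.status = 'executing'
      void t.job
        .run()
        .catch((e: unknown) => `Error: ${String(e)}`) // failures become results
        .then((r) => {
          t.result = r
          t.status = 'completed'
          t.resolveDone()
          this.startReady() // a finished tool may unblock queued ones
        })
    }
  }

  // Rule 3: yield results strictly in arrival order, even if a later
  // tool finished first.
  async *results(): AsyncGenerator<string> {
    for (const t of this.tools) {
      await t.done
      t.status = 'yielded'
      yield t.result as string
    }
  }
}
```

With the diagram's example (two safe `Grep` calls followed by an unsafe `FileEdit`), both `Grep` jobs start immediately. `FileEdit` starts only after both finish, and results come out as T1, T2, T3 even when T2 completes first.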
Tool Status Lifecycle
Section titled “Tool 状态生命周期”type ToolStatus = 'queued' | 'executing' | 'completed' | 'yielded'
type TrackedTool = { id: string block: ToolUseBlock assistantMessage: AssistantMessage status: ToolStatus isConcurrencySafe: boolean promise?: Promise<void> results?: Message[] pendingProgress: Message[] contextModifiers?: Array<(context: ToolUseContext) => ToolUseContext>}Streaming Tool 添加
When the API streams the content_block_stop event for a tool_use block, the executor receives the completed block:
```typescript
export class StreamingToolExecutor {
  addTool(block: ToolUseBlock, assistantMessage: AssistantMessage): void {
    const toolDefinition = findToolByName(this.toolDefinitions, block.name)
    if (!toolDefinition) {
      // Unknown tool: create an error result immediately
      this.tools.push({
        id: block.id,
        status: 'completed',
        results: [
          createUserMessage({
            content: [
              {
                type: 'tool_result',
                content: `Error: No such tool available: ${block.name}`,
                is_error: true,
                tool_use_id: block.id,
              },
            ],
          }),
        ],
        // ...
      })
      return
    }

    // Queue the tool and attempt execution
    this.tools.push({
      id: block.id,
      block,
      status: 'queued',
      isConcurrencySafe: toolDefinition.isConcurrencySafe(block.input),
      // ...
    })
    this.tryExecuteNext()
  }
}
```

Error Propagation
When a Bash tool errors, the executor immediately aborts its sibling tools:
```typescript
constructor(toolDefinitions, canUseTool, toolUseContext) {
  // Child of toolUseContext.abortController.
  // Fires when a Bash tool errors: sibling tools are aborted immediately.
  // Does NOT abort the parent, so query.ts won't end the turn.
  this.siblingAbortController = createChildAbortController(
    toolUseContext.abortController,
  )
}
```

When a streaming fallback occurs (for example, a switch from streaming to non-streaming mode), the executor can discard all pending work:
```typescript
discard(): void {
  this.discarded = true
  // Queued tools won't start
  // In-progress tools will receive synthetic errors
}
```

The streaming pipeline relies on a rich system of message types defined in src/types/message.ts:
```typescript
type Message =
  | UserMessage           // User input or tool results
  | AssistantMessage      // Model responses
  | SystemMessage         // System events (errors, compaction boundaries)
  | AttachmentMessage     // File attachments, structured output
  | ProgressMessage       // Tool execution progress
  | StreamEvent           // Raw stream events (for SDK consumers)
  | RequestStartEvent     // API request tracking
  | ToolUseSummaryMessage // Collapsed tool use summaries
  | TombstoneMessage      // Control signal for message removal
```

Stream Events to Messages
claude.ts converts raw SDK stream events into typed messages:
```mermaid
graph LR
SE["content_block_start<br/>type: tool_use"] --> AM["AssistantMessage<br/>with tool_use content"]
SE2["message_delta<br/>stop_reason: end_turn"] --> AM2["AssistantMessage<br/>updated stop_reason"]
SE3["content_block_delta<br/>type: text_delta"] --> AM3["AssistantMessage<br/>with text content"]
```
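The conversion can be sketched as a fold over the events. `content_block_start` opens a block, and `content_block_delta` appends text, thinking, or a fragment of tool-input JSON. `content_block_stop` finalizes the block, since tool input is only parseable once complete, and `message_delta` supplies the stop reason. The event types below are simplified stand-ins for the SDK's, and `accumulate` is a hypothetical function, not claude.ts code. Unlike claude.ts, it returns finished blocks instead of yielding one AssistantMessage per block.

```typescript
// Simplified stand-ins for the SDK event types (only the fields used here).
type Block =
  | { type: 'text'; text: string }
  | { type: 'thinking'; thinking: string }
  | { type: 'tool_use'; id: string; name: string; input: unknown }

type Delta =
  | { type: 'text_delta'; text: string }
  | { type: 'thinking_delta'; thinking: string }
  | { type: 'input_json_delta'; partial_json: string }

type StreamEvent =
  | { type: 'message_start' }
  | { type: 'content_block_start'; index: number; content_block: Block }
  | { type: 'content_block_delta'; index: number; delta: Delta }
  | { type: 'content_block_stop'; index: number }
  | { type: 'message_delta'; delta: { stop_reason: string | null } }
  | { type: 'message_stop' }

// Hypothetical fold from raw events to finished content blocks.
function accumulate(events: StreamEvent[]): { blocks: Block[]; stopReason: string | null } {
  const blocks: Block[] = []
  const partialJson = new Map<number, string>() // tool input fragments by block index
  let stopReason: string | null = null
  for (const ev of events) {
    switch (ev.type) {
      case 'content_block_start':
        blocks[ev.index] = { ...ev.content_block }
        break
      case 'content_block_delta': {
        const block = blocks[ev.index]
        const d = ev.delta
        if (d.type === 'text_delta' && block.type === 'text') block.text += d.text
        else if (d.type === 'thinking_delta' && block.type === 'thinking') block.thinking += d.thinking
        else if (d.type === 'input_json_delta')
          partialJson.set(ev.index, (partialJson.get(ev.index) ?? '') + d.partial_json)
        break
      }
      case 'content_block_stop': {
        const block = blocks[ev.index]
        // Tool input is only valid JSON once every fragment has arrived
        if (block.type === 'tool_use') {
          const raw = partialJson.get(ev.index) ?? ''
          block.input = raw === '' ? {} : JSON.parse(raw)
        }
        break
      }
      case 'message_delta':
        stopReason = ev.delta.stop_reason
        break
    }
  }
  return { blocks, stopReason }
}
```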
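The AsyncGenerator pattern provides natural backpressure at each hop:

- API → claude.ts: the SDK's `Stream` object buffers SSE events
- claude.ts → query.ts: `for await (const msg of stream)` pulls one message at a time
- query.ts → QueryEngine: `yield` suspends the generator until the consumer calls `.next()`
- QueryEngine → REPL: the Ink React renderer consumes messages at its own pace

This means that if UI rendering cannot keep up, the entire pipeline slows down on its own, with no explicit flow control needed.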
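The pull-based behavior is easy to observe with a toy producer and a deliberately slow consumer. This is a self-contained illustration, not Claude Code code. It models only the generator-to-generator hops, not the SDK's network buffering. `producer` and `consume` are hypothetical names.

```typescript
// A producer that logs each step, so its interleaving with the consumer is visible.
async function* producer(log: string[]): AsyncGenerator<number> {
  for (let i = 1; i <= 3; i++) {
    log.push(`produce ${i}`)
    yield i // suspends here until the consumer asks for the next value
  }
}

// A slow consumer: the producer cannot run ahead while this awaits.
async function consume(log: string[]): Promise<void> {
  for await (const n of producer(log)) {
    await new Promise<void>((resolve) => setTimeout(resolve, 10)) // simulated slow render
    log.push(`render ${n}`)
  }
}
```

After `await consume(log)`, the log strictly alternates: `produce 1`, `render 1`, `produce 2`, `render 2`, `produce 3`, `render 3`. The producer never gets more than one value ahead.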
Transcript Recording Optimization
In the SDK (headless) path, transcript recording is carefully arranged so that it does not block the pipeline:
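```typescript
// src/QueryEngine.ts: fire-and-forget for assistant messages
if (message.type === 'assistant') {
  void recordTranscript(messages) // Non-blocking
} else {
  await recordTranscript(messages) // Blocking for user messages
}
```

A comment in the source explains why. claude.ts yields one assistant message per content block, then mutates the last message's usage and stop_reason when the message_delta event arrives. Awaiting the transcript write there would block the generator and keep that mutation from happening.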
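The ordering difference can be reproduced with a toy generator that mutates a message after yielding it. `assistantStream`, `recordTranscriptSketch` and `simulate` are hypothetical stand-ins, and the real transcript writer and message types differ. The sketch only shows the effect of each choice. With `void`, the generator resumes and applies the mutation before the write finishes. With `await`, the mutation is held until the write completes.

```typescript
type AssistantMsg = { text: string; stop_reason: string | null }

// Stand-in for claude.ts: yields a message, then (once resumed) mutates it,
// the way claude.ts updates stop_reason on message_delta.
async function* assistantStream(log: string[]): AsyncGenerator<AssistantMsg> {
  const msg: AssistantMsg = { text: 'hi', stop_reason: null }
  yield msg
  msg.stop_reason = 'end_turn'
  log.push('mutated stop_reason')
}

// Stand-in transcript write that completes on a later timer tick.
function recordTranscriptSketch(log: string[]): Promise<void> {
  return new Promise((resolve) =>
    setTimeout(() => {
      log.push('transcript written')
      resolve()
    }, 0),
  )
}

async function simulate(fireAndForget: boolean): Promise<string[]> {
  const log: string[] = []
  for await (const msg of assistantStream(log)) {
    log.push(`yielded ${msg.text}`)
    if (fireAndForget) void recordTranscriptSketch(log)
    else await recordTranscriptSketch(log)
  }
  // Let any pending fire-and-forget write finish before returning the log
  await new Promise<void>((resolve) => setTimeout(resolve, 5))
  return log
}
```

`simulate(true)` logs the mutation before the write. `simulate(false)` logs the write first, because the generator stays suspended until the awaited write resolves.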
Stream Processing Architecture
```mermaid
graph TB
subgraph "API Layer"
SDK["Anthropic SDK<br/>SSE Stream"]
end
subgraph "claude.ts"
PARSE["Parse stream events"]
NORM["Normalize to Message types"]
RETRY["withRetry wrapper"]
end
subgraph "query.ts"
LOOP["Agentic loop"]
STE["StreamingToolExecutor"]
COMPACT["Auto-compaction check"]
end
subgraph "QueryEngine.ts"
SESSION["Session management"]
RECORD["Transcript recording"]
YIELD_SDK["Yield SDKMessage"]
end
subgraph "Consumer"
REPL["REPL (React/Ink)"]
SDK_OUT["SDK (stdout JSON)"]
end
SDK --> RETRY
RETRY --> PARSE
PARSE --> NORM
NORM --> LOOP
LOOP --> STE
STE --> LOOP
LOOP --> COMPACT
LOOP --> SESSION
SESSION --> RECORD
SESSION --> YIELD_SDK
YIELD_SDK --> REPL
YIELD_SDK --> SDK_OUT
```