
Streaming Infrastructure


Claude Code’s streaming infrastructure is the backbone that connects the Anthropic API to tool execution and user display. A solid grasp of the streaming protocol, the content block types, and the StreamingToolExecutor is essential to understanding how the agentic loop works.

Claude Code uses the Anthropic SDK’s streaming API via Server-Sent Events (SSE). The streaming protocol consists of a sequence of typed events:

sequenceDiagram
participant CC as Claude Code
participant API as Anthropic API
CC->>API: POST /v1/messages (stream: true)
API-->>CC: message_start
API-->>CC: content_block_start (thinking)
API-->>CC: content_block_delta (thinking text)
API-->>CC: content_block_stop
API-->>CC: content_block_start (text)
API-->>CC: content_block_delta (text chunks)
API-->>CC: content_block_stop
API-->>CC: content_block_start (tool_use)
API-->>CC: content_block_delta (tool input JSON)
API-->>CC: content_block_stop
API-->>CC: message_delta (stop_reason, usage)
API-->>CC: message_stop

The SDK exposes these as BetaRawMessageStreamEvent:

// From @anthropic-ai/sdk/resources/beta/messages/messages.mjs
type BetaRawMessageStreamEvent =
  | { type: 'message_start'; message: BetaMessage }
  | { type: 'content_block_start'; index: number; content_block: BetaContentBlock }
  | { type: 'content_block_delta'; index: number; delta: ContentDelta }
  | { type: 'content_block_stop'; index: number }
  | { type: 'message_delta'; delta: { stop_reason: string }; usage: BetaMessageDeltaUsage }
  | { type: 'message_stop' }
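A minimal sketch, not the actual claude.ts implementation, of folding these events into finished content blocks. The delta subtypes `text_delta` and `input_json_delta` match the Messages API; the event shapes are simplified here:

```typescript
type StreamEvent =
  | { type: 'content_block_start'; index: number; content_block: { type: string; name?: string } }
  | { type: 'content_block_delta'; index: number; delta: { type: string; text?: string; partial_json?: string } }
  | { type: 'content_block_stop'; index: number }

function foldEvents(events: StreamEvent[]) {
  const blocks: Array<{ type: string; text: string; inputJson: string }> = []
  for (const ev of events) {
    if (ev.type === 'content_block_start') {
      // A new block begins at this index
      blocks[ev.index] = { type: ev.content_block.type, text: '', inputJson: '' }
    } else if (ev.type === 'content_block_delta') {
      const block = blocks[ev.index]
      if (!block) continue
      // Text arrives as text_delta chunks; tool input as partial JSON strings
      if (ev.delta.type === 'text_delta') block.text += ev.delta.text ?? ''
      if (ev.delta.type === 'input_json_delta') block.inputJson += ev.delta.partial_json ?? ''
    }
    // content_block_stop: the block at ev.index is now complete
  }
  return blocks
}

const blocks = foldEvents([
  { type: 'content_block_start', index: 0, content_block: { type: 'text' } },
  { type: 'content_block_delta', index: 0, delta: { type: 'text_delta', text: 'Hel' } },
  { type: 'content_block_delta', index: 0, delta: { type: 'text_delta', text: 'lo' } },
  { type: 'content_block_stop', index: 0 },
  { type: 'content_block_start', index: 1, content_block: { type: 'tool_use', name: 'Bash' } },
  { type: 'content_block_delta', index: 1, delta: { type: 'input_json_delta', partial_json: '{"command":' } },
  { type: 'content_block_delta', index: 1, delta: { type: 'input_json_delta', partial_json: '"ls"}' } },
  { type: 'content_block_stop', index: 1 },
])
```

Note that tool input is accumulated as a raw string and only parsed as JSON once the block stops; partial chunks are rarely valid JSON on their own.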

The API response contains multiple content blocks that Claude Code must process:

| Block Type | Description | How Claude Code Handles It |
|---|---|---|
| `thinking` | Extended thinking/reasoning | Displayed in TUI, optionally redacted |
| `text` | Natural language response | Rendered to user, extracted for SDK result |
| `tool_use` | Tool call request | Dispatched to StreamingToolExecutor |
| `tool_result` | Tool execution result | Constructed by Claude Code, sent back |

// A tool_use block from the API
interface ToolUseBlock {
  type: 'tool_use'
  id: string     // Unique ID like "toolu_abc123"
  name: string   // Tool name like "Bash" or "FileRead"
  input: object  // Tool-specific input (JSON)
}

// The corresponding tool_result sent back
interface ToolResultBlockParam {
  type: 'tool_result'
  tool_use_id: string  // Must match the tool_use id
  content: string | Array<ContentBlockParam>
  is_error?: boolean
}
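The pairing contract is small enough to sketch with a hypothetical helper (`makeToolResult` is not from the codebase, and `content` is simplified to a string here): the only hard requirement is that `tool_use_id` echoes the `id` of the block it answers.

```typescript
interface ToolUseBlock {
  type: 'tool_use'
  id: string
  name: string
  input: object
}

interface ToolResultBlockParam {
  type: 'tool_result'
  tool_use_id: string
  content: string
  is_error?: boolean
}

// Hypothetical helper: builds the result block that answers a tool_use block
function makeToolResult(
  block: ToolUseBlock,
  content: string,
  isError = false,
): ToolResultBlockParam {
  return { type: 'tool_result', tool_use_id: block.id, content, is_error: isError }
}

const call: ToolUseBlock = {
  type: 'tool_use',
  id: 'toolu_abc123',
  name: 'Bash',
  input: { command: 'ls' },
}
const result = makeToolResult(call, 'file.txt')
```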

Claude Code uses AsyncGenerator throughout its streaming pipeline. This provides natural backpressure — producers only generate values when consumers are ready to consume:

graph LR
A["claude.ts<br/>createStreamedResponse()"] -->|"AsyncGenerator<br/>Message"| B["query.ts<br/>query()"]
B -->|"AsyncGenerator<br/>Message"| C["QueryEngine.ts<br/>submitMessage()"]
C -->|"AsyncGenerator<br/>SDKMessage"| D["Consumer<br/>(REPL or SDK)"]
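The chain above can be sketched with toy stages (none of these names are from the codebase): one `for await` at the end pulls values through every layer, one at a time.

```typescript
// Stage 1: the producer (stands in for createStreamedResponse)
async function* source(): AsyncGenerator<number> {
  yield 1
  yield 2
}

// Stage 2: a pass-through that transforms values (stands in for query())
async function* stage(inner: AsyncGenerator<number>): AsyncGenerator<string> {
  for await (const n of inner) yield `msg ${n}`
}

// The consumer at the end of the chain drives the whole pipeline
async function collect(): Promise<string[]> {
  const out: string[] = []
  for await (const m of stage(source())) out.push(m)
  return out
}
```

Because each stage only pulls from the one before it when asked, pausing the consumer pauses every stage upstream.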

The core agentic loop in query.ts is itself an AsyncGenerator:

// src/query.ts (conceptual)
export async function* query({
  messages,
  systemPrompt,
  canUseTool,
  toolUseContext,
  // ...
}): AsyncGenerator<Message, void, unknown> {
  while (true) {
    // 1. Call the API
    const stream = createStreamedResponse(messages, systemPrompt, ...)

    // 2. Process the streaming response
    const toolBlocks = []
    for await (const event of stream) {
      if (event.type === 'assistant') {
        yield event // Pass to consumer
      }
      // Collect tool_use blocks into toolBlocks...
    }

    // 3. If the model made no tool calls, the turn is done
    if (toolBlocks.length === 0) break

    // 4. Execute tools and pass the results to the consumer
    const toolResults = await executeTools(toolBlocks)
    yield* toolResults

    // 5. Loop with the tool results appended to the conversation
    messages.push(...toolResults)
  }
}

Even the retry logic uses AsyncGenerator to yield status messages during retries:

src/services/api/withRetry.ts
export async function* withRetry<T>(
  getClient: () => Promise<Anthropic>,
  operation: (client: Anthropic, attempt: number, context: RetryContext) => Promise<T>,
  options: RetryOptions,
): AsyncGenerator<SystemAPIErrorMessage, T> {
  for (let attempt = 1; attempt <= maxRetries + 1; attempt++) {
    try {
      return await operation(client, attempt, retryContext)
    } catch (error) {
      // Yield error message for UI display
      if (error instanceof APIError) {
        yield createSystemAPIErrorMessage(error, delayMs, attempt, maxRetries)
      }
      await sleep(delayMs, options.signal)
    }
  }
}

The return statement makes this generator produce a final value of type T (the API response), while yield produces intermediate status messages of type SystemAPIErrorMessage. The consumer uses returnValue() to extract the final result.

The StreamingToolExecutor class (src/services/tools/StreamingToolExecutor.ts) is responsible for executing tools as they stream in from the API, with sophisticated concurrency control.

graph TB
subgraph "Streaming from API"
T1["tool_use: Grep<br/>(concurrent-safe)"]
T2["tool_use: Grep<br/>(concurrent-safe)"]
T3["tool_use: FileEdit<br/>(NOT concurrent-safe)"]
end
subgraph "Execution"
P1["T1: Execute immediately"]
P2["T2: Execute in parallel"]
P3["T3: Queue until T1,T2 finish"]
end
subgraph "Result Ordering"
R1["Result 1 (from T1)"]
R2["Result 2 (from T2)"]
R3["Result 3 (from T3)"]
end
T1 --> P1
T2 --> P2
T3 --> P3
P1 --> R1
P2 --> R2
P3 --> R3

Rules:

  1. Concurrent-safe tools (e.g., Grep, Glob, FileRead) execute in parallel
  2. Non-concurrent tools (e.g., FileEdit, Bash) execute exclusively — they wait for all in-flight tools to complete
  3. Results are always yielded in order — even if T2 finishes before T1, T2’s result waits
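Rule 3 can be sketched in isolation, as a simplification of what the executor does: even when later promises settle first, awaiting them in declaration order releases results in declaration order.

```typescript
const sleep = (ms: number) => new Promise<void>(r => setTimeout(r, ms))

// Results may settle out of order, but are released strictly in input order
async function* inOrder<T>(promises: Array<Promise<T>>): AsyncGenerator<T> {
  for (const p of promises) yield await p
}

async function demo(): Promise<string[]> {
  const slow = sleep(20).then(() => 'T1')  // finishes last
  const fast = Promise.resolve('T2')       // finishes first
  const out: string[] = []
  // T2 is already resolved, but its result waits behind T1's
  for await (const r of inOrder([slow, fast])) out.push(r)
  return out
}
```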
src/services/tools/StreamingToolExecutor.ts
type ToolStatus = 'queued' | 'executing' | 'completed' | 'yielded'

type TrackedTool = {
  id: string
  block: ToolUseBlock
  assistantMessage: AssistantMessage
  status: ToolStatus
  isConcurrencySafe: boolean
  promise?: Promise<void>
  results?: Message[]
  pendingProgress: Message[]
  contextModifiers?: Array<(context: ToolUseContext) => ToolUseContext>
}

When the API streams a content_block_stop for a tool_use block, the executor receives it:

src/services/tools/StreamingToolExecutor.ts
export class StreamingToolExecutor {
  addTool(block: ToolUseBlock, assistantMessage: AssistantMessage): void {
    const toolDefinition = findToolByName(this.toolDefinitions, block.name)
    if (!toolDefinition) {
      // Unknown tool — create error result immediately
      this.tools.push({
        id: block.id,
        status: 'completed',
        results: [createUserMessage({
          content: [{
            type: 'tool_result',
            content: `Error: No such tool available: ${block.name}`,
            is_error: true,
            tool_use_id: block.id,
          }],
        })],
        // ...
      })
      return
    }
    // Queue the tool and attempt execution
    this.tools.push({
      id: block.id,
      block,
      status: 'queued',
      isConcurrencySafe: toolDefinition.isConcurrencySafe(block.input),
      // ...
    })
    this.tryExecuteNext()
  }
}

When a Bash tool errors, the executor aborts sibling tools immediately:

src/services/tools/StreamingToolExecutor.ts
constructor(toolDefinitions, canUseTool, toolUseContext) {
  // Child of toolUseContext.abortController
  // Fires when a Bash tool errors — siblings die immediately
  // Does NOT abort the parent — query.ts won't end the turn
  this.siblingAbortController = createChildAbortController(
    toolUseContext.abortController,
  )
}
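The helper's name appears in the snippet above but its body is not shown; the sketch below is one plausible implementation, assuming only the standard AbortController API: a parent abort cascades down to the child, while a child abort leaves the parent untouched.

```typescript
// Hypothetical implementation: the real createChildAbortController may differ
function createChildAbortController(parent: AbortController): AbortController {
  const child = new AbortController()
  if (parent.signal.aborted) {
    // Parent already aborted: the child starts aborted too
    child.abort()
  } else {
    // Parent abort cascades to the child; the reverse never happens
    parent.signal.addEventListener('abort', () => child.abort(), { once: true })
  }
  return child
}

const parent = new AbortController()
const child = createChildAbortController(parent)
child.abort() // sibling-level abort: the parent survives
```

This one-way propagation is exactly what the sibling abort needs: killing in-flight tools without ending the whole turn.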

When a streaming fallback occurs (e.g., switching from streaming to non-streaming mode), the executor can discard all pending work:

discard(): void {
  this.discarded = true
  // Queued tools won't start
  // In-progress tools will receive synthetic errors
}

The streaming pipeline uses a rich message type system defined in src/types/message.ts:

type Message =
  | UserMessage            // User input or tool results
  | AssistantMessage       // Model responses
  | SystemMessage          // System events (errors, compaction boundaries)
  | AttachmentMessage      // File attachments, structured output
  | ProgressMessage        // Tool execution progress
  | StreamEvent            // Raw stream events (for SDK consumers)
  | RequestStartEvent      // API request tracking
  | ToolUseSummaryMessage  // Collapsed tool use summaries
  | TombstoneMessage       // Control signal for message removal
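Consumers branch on the `type` discriminant. A toy sketch with simplified variants (the real variants carry different fields): each `case` narrows the union to exactly one shape.

```typescript
type Msg =
  | { type: 'user'; content: string }
  | { type: 'assistant'; content: string }
  | { type: 'progress'; toolUseId: string }

function describe(m: Msg): string {
  switch (m.type) {
    case 'user':
      return `user: ${m.content}`
    case 'assistant':
      return `assistant: ${m.content}`
    case 'progress':
      // Only this branch sees toolUseId
      return `progress for ${m.toolUseId}`
    default: {
      // Exhaustiveness check: adding a variant without a case fails to compile
      const unreachable: never = m
      throw new Error(`unhandled message: ${unreachable}`)
    }
  }
}
```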

claude.ts transforms raw SDK stream events into typed messages:

graph LR
SE["content_block_start<br/>type: tool_use"] --> AM["AssistantMessage<br/>with tool_use content"]
SE2["message_delta<br/>stop_reason: end_turn"] --> AM2["AssistantMessage<br/>updated stop_reason"]
SE3["content_block_delta<br/>type: text_delta"] --> AM3["AssistantMessage<br/>with text content"]

The AsyncGenerator pattern provides natural backpressure:

  1. API → claude.ts: The SDK’s Stream object buffers SSE events
  2. claude.ts → query.ts: for await (const msg of stream) pulls one message at a time
  3. query.ts → QueryEngine: yield pauses the generator until the consumer calls .next()
  4. QueryEngine → REPL: The Ink React renderer consumes at its own pace

This means if the UI can’t keep up with rendering, the entire pipeline naturally slows down — no explicit flow control needed.
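The pull-based behavior is visible with a plain generator: the producer body only advances when the consumer calls `.next()`, so even an infinite loop does bounded work.

```typescript
let produced = 0

function* producer(): Generator<number> {
  while (true) {
    produced++        // side effect marks each unit of real work
    yield produced    // execution pauses here until the next pull
  }
}

const it = producer() // lazy: nothing runs yet
it.next()             // producer runs up to the first yield
it.next()             // ...and exactly once more
// The infinite loop has only executed twice: the consumer sets the pace.
```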

In the SDK (headless) path, transcript recording is carefully optimized to not block the pipeline:

// src/QueryEngine.ts — Fire-and-forget for assistant messages
if (message.type === 'assistant') {
  void recordTranscript(messages) // Non-blocking
} else {
  await recordTranscript(messages) // Blocking for user messages
}

The comment explains why: claude.ts yields one assistant message per content block, then mutates the last one’s usage/stop_reason on message_delta. Awaiting would block the generator, preventing the mutation.

graph TB
subgraph "API Layer"
SDK["Anthropic SDK<br/>SSE Stream"]
end
subgraph "claude.ts"
PARSE["Parse stream events"]
NORM["Normalize to Message types"]
RETRY["withRetry wrapper"]
end
subgraph "query.ts"
LOOP["Agentic loop"]
STE["StreamingToolExecutor"]
COMPACT["Auto-compaction check"]
end
subgraph "QueryEngine.ts"
SESSION["Session management"]
RECORD["Transcript recording"]
YIELD_SDK["Yield SDKMessage"]
end
subgraph "Consumer"
REPL["REPL (React/Ink)"]
SDK_OUT["SDK (stdout JSON)"]
end
SDK --> RETRY
RETRY --> PARSE
PARSE --> NORM
NORM --> LOOP
LOOP --> STE
STE --> LOOP
LOOP --> COMPACT
LOOP --> SESSION
SESSION --> RECORD
SESSION --> YIELD_SDK
YIELD_SDK --> REPL
YIELD_SDK --> SDK_OUT