# Streaming Infrastructure
Claude Code’s streaming infrastructure is the backbone that connects the Anthropic API to tool execution and user display. Understanding the streaming protocol, the content block types, and the `StreamingToolExecutor` is essential to following how the agentic loop works.
## Anthropic Streaming Protocol
Claude Code uses the Anthropic SDK’s streaming API via Server-Sent Events (SSE). The streaming protocol consists of a sequence of typed events:
```mermaid
sequenceDiagram
    participant CC as Claude Code
    participant API as Anthropic API
    CC->>API: POST /v1/messages (stream: true)
    API-->>CC: message_start
    API-->>CC: content_block_start (thinking)
    API-->>CC: content_block_delta (thinking text)
    API-->>CC: content_block_stop
    API-->>CC: content_block_start (text)
    API-->>CC: content_block_delta (text chunks)
    API-->>CC: content_block_stop
    API-->>CC: content_block_start (tool_use)
    API-->>CC: content_block_delta (tool input JSON)
    API-->>CC: content_block_stop
    API-->>CC: message_delta (stop_reason, usage)
    API-->>CC: message_stop
```
### Event Types

The SDK exposes these as `BetaRawMessageStreamEvent`:
```typescript
// From @anthropic-ai/sdk/resources/beta/messages/messages.mjs
type BetaRawMessageStreamEvent =
  | { type: 'message_start'; message: BetaMessage }
  | { type: 'content_block_start'; index: number; content_block: BetaContentBlock }
  | { type: 'content_block_delta'; index: number; delta: ContentDelta }
  | { type: 'content_block_stop'; index: number }
  | { type: 'message_delta'; delta: { stop_reason: string }; usage: BetaMessageDeltaUsage }
  | { type: 'message_stop' }
```
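Putting the union to work, here is a minimal sketch of folding such events into completed content blocks. The event and block shapes are simplified from the SDK union, and `accumulate` is an illustrative helper, not Claude Code’s actual implementation:

```typescript
// Sketch: fold a sequence of stream events into assembled content blocks.
type StreamEvent =
  | { type: 'content_block_start'; index: number; content_block: { type: string } }
  | { type: 'content_block_delta'; index: number; delta: { text?: string; partial_json?: string } }
  | { type: 'content_block_stop'; index: number }

interface AccumulatedBlock {
  type: string      // 'thinking' | 'text' | 'tool_use' | ...
  text: string      // assembled text or thinking content
  inputJson: string // assembled tool_use input JSON fragments
}

export function accumulate(events: StreamEvent[]): AccumulatedBlock[] {
  const blocks: AccumulatedBlock[] = []
  for (const event of events) {
    switch (event.type) {
      case 'content_block_start':
        // A new block opens at `index`; later deltas append to it.
        blocks[event.index] = { type: event.content_block.type, text: '', inputJson: '' }
        break
      case 'content_block_delta': {
        const block = blocks[event.index]
        if (event.delta.text) block.text += event.delta.text
        if (event.delta.partial_json) block.inputJson += event.delta.partial_json
        break
      }
      case 'content_block_stop':
        // Block at `index` is complete; tool_use blocks can now be dispatched.
        break
    }
  }
  return blocks
}
```

Note that tool input arrives as `partial_json` string fragments that must be concatenated before parsing, which is why dispatch waits for `content_block_stop`.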
## Content Block Types

The API response contains multiple content blocks that Claude Code must process:
| Block Type | Description | How Claude Code Handles It |
|---|---|---|
| `thinking` | Extended thinking/reasoning | Displayed in TUI, optionally redacted |
| `text` | Natural language response | Rendered to user, extracted for SDK result |
| `tool_use` | Tool call request | Dispatched to `StreamingToolExecutor` |
| `tool_result` | Tool execution result | Constructed by Claude Code, sent back |
### Tool Use Block Structure
```typescript
// A tool_use block from the API
interface ToolUseBlock {
  type: 'tool_use'
  id: string     // Unique ID like "toolu_abc123"
  name: string   // Tool name like "Bash" or "FileRead"
  input: object  // Tool-specific input (JSON)
}
```
```typescript
// The corresponding tool_result sent back
interface ToolResultBlockParam {
  type: 'tool_result'
  tool_use_id: string  // Must match the tool_use id
  content: string | Array<ContentBlockParam>
  is_error?: boolean
}
```
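To illustrate the pairing, a sketch of turning a `tool_use` block into its matching `tool_result`. `runTool` is a hypothetical stand-in for real tool execution; only the two interfaces mirror the definitions above:

```typescript
// Sketch: build the tool_result that answers a given tool_use block.
interface ToolUseBlock { type: 'tool_use'; id: string; name: string; input: object }
interface ToolResultBlockParam {
  type: 'tool_result'
  tool_use_id: string
  content: string
  is_error?: boolean
}

export async function toToolResult(
  block: ToolUseBlock,
  runTool: (name: string, input: object) => Promise<string>,
): Promise<ToolResultBlockParam> {
  try {
    const output = await runTool(block.name, block.input)
    // tool_use_id must echo the id of the originating tool_use block,
    // otherwise the API rejects the follow-up request.
    return { type: 'tool_result', tool_use_id: block.id, content: output }
  } catch (err) {
    // Errors still produce a result, flagged so the model can react to them.
    return {
      type: 'tool_result',
      tool_use_id: block.id,
      content: String(err),
      is_error: true,
    }
  }
}
```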
## AsyncGenerator Pipeline

Claude Code uses `AsyncGenerator` throughout its streaming pipeline. This provides natural backpressure — producers only generate values when consumers are ready for them:
```mermaid
graph LR
    A["claude.ts<br/>createStreamedResponse()"] -->|"AsyncGenerator<br/>Message"| B["query.ts<br/>query()"]
    B -->|"AsyncGenerator<br/>Message"| C["QueryEngine.ts<br/>submitMessage()"]
    C -->|"AsyncGenerator<br/>SDKMessage"| D["Consumer<br/>(REPL or SDK)"]
```
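The layering above can be sketched with `yield*` delegation: each layer wraps the one below it and forwards both the values and the consumer’s pulls. Layer names mirror the diagram; the message payloads are illustrative:

```typescript
// Sketch of the layered pipeline as chained AsyncGenerators.
async function* apiLayer(): AsyncGenerator<string> {
  yield 'assistant: hello'
  yield 'tool_use: Grep'
}

async function* queryLayer(): AsyncGenerator<string> {
  // yield* forwards every value, and every pull from the consumer,
  // straight through to the inner generator.
  yield* apiLayer()
  yield 'tool_result: 3 matches'
}

async function* engineLayer(): AsyncGenerator<string> {
  for await (const msg of queryLayer()) {
    // A layer can also transform messages as it forwards them.
    yield `sdk:${msg}`
  }
}

export async function collect(): Promise<string[]> {
  const out: string[] = []
  for await (const msg of engineLayer()) out.push(msg)
  return out
}
```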
### The query() Generator

The core agentic loop in `query.ts` is itself an `AsyncGenerator`:
```typescript
// src/query.ts (conceptual)
export async function* query({
  messages,
  systemPrompt,
  canUseTool,
  toolUseContext,
  // ...
}): AsyncGenerator<Message, void, unknown> {
  while (true) {
    // 1. Call the API
    const stream = createStreamedResponse(messages, systemPrompt, ...)

    // 2. Process streaming response
    for await (const event of stream) {
      if (event.type === 'assistant') {
        yield event // Pass to consumer
      }
      // Collect tool_use blocks...
    }

    // 3. Execute tools
    const toolResults = await executeTools(toolBlocks)
    yield* toolResults // Pass results to consumer

    // 4. If no tool calls, we're done
    if (toolBlocks.length === 0) break

    // 5. Otherwise, loop with tool results added to messages
    messages.push(...toolResults)
  }
}
```
### The withRetry() Generator

Even the retry logic uses `AsyncGenerator`, yielding status messages during retries:
```typescript
export async function* withRetry<T>(
  getClient: () => Promise<Anthropic>,
  operation: (client: Anthropic, attempt: number, context: RetryContext) => Promise<T>,
  options: RetryOptions,
): AsyncGenerator<SystemAPIErrorMessage, T> {
  for (let attempt = 1; attempt <= maxRetries + 1; attempt++) {
    try {
      return await operation(client, attempt, retryContext)
    } catch (error) {
      // Yield error message for UI display
      if (error instanceof APIError) {
        yield createSystemAPIErrorMessage(error, delayMs, attempt, maxRetries)
      }
      await sleep(delayMs, options.signal)
    }
  }
}
```

The `return` statement makes this generator produce a final value of type `T` (the API response), while `yield` produces intermediate status messages of type `SystemAPIErrorMessage`. The consumer uses `returnValue()` to extract the final result.
## StreamingToolExecutor
The `StreamingToolExecutor` class (`src/services/tools/StreamingToolExecutor.ts`) is responsible for executing tools as they stream in from the API, with sophisticated concurrency control.
### Concurrency Model
```mermaid
graph TB
    subgraph "Streaming from API"
        T1["tool_use: Grep<br/>(concurrent-safe)"]
        T2["tool_use: Grep<br/>(concurrent-safe)"]
        T3["tool_use: FileEdit<br/>(NOT concurrent-safe)"]
    end
    subgraph "Execution"
        P1["T1: Execute immediately"]
        P2["T2: Execute in parallel"]
        P3["T3: Queue until T1,T2 finish"]
    end
    subgraph "Result Ordering"
        R1["Result 1 (from T1)"]
        R2["Result 2 (from T2)"]
        R3["Result 3 (from T3)"]
    end
    T1 --> P1
    T2 --> P2
    T3 --> P3
    P1 --> R1
    P2 --> R2
    P3 --> R3
```

Rules:
- Concurrent-safe tools (e.g., `Grep`, `Glob`, `FileRead`) execute in parallel
- Non-concurrent tools (e.g., `FileEdit`, `Bash`) execute exclusively — they wait for all in-flight tools to complete
- Results are always yielded in order — even if T2 finishes before T1, T2’s result waits
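These rules can be sketched as a miniature scheduler. This is an illustrative model only: the real executor additionally accepts tools incrementally as they stream in and tracks per-tool status:

```typescript
// A minimal model of the scheduling rules above.
export interface Task<T> {
  run: () => Promise<T>
  concurrencySafe: boolean
}

export async function* executeOrdered<T>(tasks: Task<T>[]): AsyncGenerator<T> {
  const ordered: Promise<T>[] = [] // results, kept in input order
  let inFlight: Promise<T>[] = []  // currently-running concurrent-safe tasks

  for (const task of tasks) {
    if (task.concurrencySafe) {
      const p = task.run() // start immediately; runs alongside other safe tasks
      inFlight.push(p)
      ordered.push(p)
    } else {
      await Promise.all(inFlight) // wait for every in-flight task to finish
      inFlight = []
      const p = task.run() // then run exclusively...
      ordered.push(p)
      await p              // ...and let nothing else start until it completes
    }
  }

  // Yield in input order, even when a later task finished earlier.
  for (const p of ordered) yield await p
}
```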
### Tool Status Lifecycle
```typescript
type ToolStatus = 'queued' | 'executing' | 'completed' | 'yielded'
```
```typescript
type TrackedTool = {
  id: string
  block: ToolUseBlock
  assistantMessage: AssistantMessage
  status: ToolStatus
  isConcurrencySafe: boolean
  promise?: Promise<void>
  results?: Message[]
  pendingProgress: Message[]
  contextModifiers?: Array<(context: ToolUseContext) => ToolUseContext>
}
```
### Streaming Tool Addition

When the API streams a `content_block_stop` for a `tool_use` block, the executor receives it:
```typescript
export class StreamingToolExecutor {
  addTool(block: ToolUseBlock, assistantMessage: AssistantMessage): void {
    const toolDefinition = findToolByName(this.toolDefinitions, block.name)
    if (!toolDefinition) {
      // Unknown tool — create error result immediately
      this.tools.push({
        id: block.id,
        status: 'completed',
        results: [createUserMessage({
          content: [{
            type: 'tool_result',
            content: `Error: No such tool available: ${block.name}`,
            is_error: true,
            tool_use_id: block.id,
          }],
        })],
        // ...
      })
      return
    }

    // Queue the tool and attempt execution
    this.tools.push({
      id: block.id,
      block,
      status: 'queued',
      isConcurrencySafe: toolDefinition.isConcurrencySafe(block.input),
      // ...
    })
    this.tryExecuteNext()
  }
}
```
### Error Propagation

When a Bash tool errors, the executor aborts sibling tools immediately:
```typescript
constructor(toolDefinitions, canUseTool, toolUseContext) {
  // Child of toolUseContext.abortController
  // Fires when a Bash tool errors — siblings die immediately
  // Does NOT abort the parent — query.ts won't end the turn
  this.siblingAbortController = createChildAbortController(
    toolUseContext.abortController,
  )
}
```
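A plausible sketch of `createChildAbortController` using the standard `AbortController` API. The helper name comes from the snippet above; this implementation is an assumption about its behavior, not the actual source:

```typescript
// Aborting the parent aborts the child, but aborting the child
// leaves the parent untouched.
export function createChildAbortController(parent: AbortController): AbortController {
  const child = new AbortController()
  if (parent.signal.aborted) {
    // Parent already dead: the child starts aborted.
    child.abort(parent.signal.reason)
  } else {
    // Propagate downward only: parent abort triggers child abort.
    parent.signal.addEventListener(
      'abort',
      () => child.abort(parent.signal.reason),
      { once: true },
    )
  }
  return child
}
```

This one-way linkage is what lets a failing `Bash` tool kill its siblings without ending the whole turn.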
### Discard on Fallback

When a streaming fallback occurs (e.g., switching from streaming to non-streaming mode), the executor can discard all pending work:
```typescript
discard(): void {
  this.discarded = true
  // Queued tools won't start
  // In-progress tools will receive synthetic errors
}
```
## Message Types

The streaming pipeline uses a rich message type system defined in `src/types/message.ts`:
```typescript
type Message =
  | UserMessage            // User input or tool results
  | AssistantMessage       // Model responses
  | SystemMessage          // System events (errors, compaction boundaries)
  | AttachmentMessage      // File attachments, structured output
  | ProgressMessage        // Tool execution progress
  | StreamEvent            // Raw stream events (for SDK consumers)
  | RequestStartEvent      // API request tracking
  | ToolUseSummaryMessage  // Collapsed tool use summaries
  | TombstoneMessage       // Control signal for message removal
```
### Stream Events to Messages

`claude.ts` transforms raw SDK stream events into typed messages:
```mermaid
graph LR
    SE["content_block_start<br/>type: tool_use"] --> AM["AssistantMessage<br/>with tool_use content"]
    SE2["message_delta<br/>stop_reason: end_turn"] --> AM2["AssistantMessage<br/>updated stop_reason"]
    SE3["content_block_delta<br/>type: text_delta"] --> AM3["AssistantMessage<br/>with text content"]
```
## Backpressure Handling

The AsyncGenerator pattern provides natural backpressure:
- API → `claude.ts`: The SDK’s `Stream` object buffers SSE events
- `claude.ts` → `query.ts`: `for await (const msg of stream)` pulls one message at a time
- `query.ts` → `QueryEngine`: `yield` pauses the generator until the consumer calls `.next()`
- `QueryEngine` → REPL: The Ink React renderer consumes at its own pace
This means if the UI can’t keep up with rendering, the entire pipeline naturally slows down — no explicit flow control needed.
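A small demonstration of that pull-based pausing: the code after a `yield` does not run until the consumer asks for the next value, so a stalled consumer automatically stalls the producer:

```typescript
// Count how many values the producer actually generates.
let produced = 0

async function* producer(): AsyncGenerator<number> {
  for (let i = 0; i < 100; i++) {
    produced++
    yield i // execution suspends here until the consumer calls .next()
  }
}

export async function consumeThree(): Promise<number> {
  const gen = producer()
  await gen.next() // produced = 1
  await gen.next() // produced = 2
  await gen.next() // produced = 3
  // The remaining 97 values are never produced: no buffering, no overflow.
  return produced
}
```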
## Transcript Recording Optimization
In the SDK (headless) path, transcript recording is carefully optimized so that it does not block the pipeline:
```typescript
// src/QueryEngine.ts — Fire-and-forget for assistant messages
if (message.type === 'assistant') {
  void recordTranscript(messages) // Non-blocking
} else {
  await recordTranscript(messages) // Blocking for user messages
}
```

The comment explains why: `claude.ts` yields one assistant message per content block, then mutates the last one’s `usage`/`stop_reason` on `message_delta`. Awaiting would block the generator, preventing the mutation.
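The `void`-vs-`await` distinction can be demonstrated in isolation. `recordTranscript` here is a stand-in for the real function, just simulating async I/O:

```typescript
// `void fn()` starts async work without suspending the caller;
// `await fn()` suspends until the work settles.
const order: string[] = []

async function recordTranscript(kind: string): Promise<void> {
  await Promise.resolve() // simulate async I/O
  order.push(`recorded:${kind}`)
}

export async function emit(): Promise<string[]> {
  void recordTranscript('assistant') // fire-and-forget: keep streaming
  order.push('yielded assistant')    // runs before the recording lands

  await recordTranscript('user')     // blocking: recording lands first
  order.push('yielded user')
  return order
}
```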
## Stream Processing Architecture
```mermaid
graph TB
    subgraph "API Layer"
        SDK["Anthropic SDK<br/>SSE Stream"]
    end
    subgraph "claude.ts"
        PARSE["Parse stream events"]
        NORM["Normalize to Message types"]
        RETRY["withRetry wrapper"]
    end
    subgraph "query.ts"
        LOOP["Agentic loop"]
        STE["StreamingToolExecutor"]
        COMPACT["Auto-compaction check"]
    end
    subgraph "QueryEngine.ts"
        SESSION["Session management"]
        RECORD["Transcript recording"]
        YIELD_SDK["Yield SDKMessage"]
    end
    subgraph "Consumer"
        REPL["REPL (React/Ink)"]
        SDK_OUT["SDK (stdout JSON)"]
    end
    SDK --> RETRY
    RETRY --> PARSE
    PARSE --> NORM
    NORM --> LOOP
    LOOP --> STE
    STE --> LOOP
    LOOP --> COMPACT
    LOOP --> SESSION
    SESSION --> RECORD
    SESSION --> YIELD_SDK
    YIELD_SDK --> REPL
    YIELD_SDK --> SDK_OUT
```