Streaming Executor
The StreamingToolExecutor is Claude Code’s primary tool execution engine. Located at src/services/tools/StreamingToolExecutor.ts, it overlaps tool execution with API streaming — starting tools before the model finishes responding. This chapter walks through its internals in detail.
Why Streaming Execution?
In a typical agentic turn, the model may emit multiple tool calls. Without streaming execution:

    API streams tool_use A ──────────────┐
    API streams tool_use B ──────────────┤
    API streams text ────────────────────┤
    API done ────────────────────────────┘
    Execute tool A ────────────── 2s
    Execute tool B ────────────── 1.5s
    Total: API time + 3.5s

With streaming execution:

    API streams tool_use A ──────┐
      Execute tool A starts ─────┤────── 2s
    API streams tool_use B ──────┤
      Execute tool B starts ─────┤────── 1.5s
    API streams text ────────────┤
    API done ────────────────────┘
      Tool A completes ──────────┘
      Tool B completes ───────────┘
    Total: API time + max(0, tools - overlap)

The savings are significant: 40-80% of tool execution time can overlap with streaming.
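The two timelines can be compared with a small latency model. This is a sketch only: the `ToolTiming` type, both functions, and the 4-second stream duration are assumptions made for illustration, and the streaming case assumes every tool is concurrency-safe.

```typescript
// Hypothetical latency model; none of these names come from the executor source.
interface ToolTiming {
  arrivesAtMs: number; // when the tool_use block has fully streamed in
  durationMs: number;  // how long the tool takes to run
}

// Without streaming execution: tools run one after another once the stream ends.
function sequentialTotalMs(apiMs: number, tools: ToolTiming[]): number {
  return apiMs + tools.reduce((sum, t) => sum + t.durationMs, 0);
}

// With streaming execution: each tool starts as soon as its block arrives.
// Assumes every tool is concurrency-safe, so none waits for another.
function streamingTotalMs(apiMs: number, tools: ToolTiming[]): number {
  const lastToolEndMs = Math.max(0, ...tools.map(t => t.arrivesAtMs + t.durationMs));
  return Math.max(apiMs, lastToolEndMs);
}

const timings: ToolTiming[] = [
  { arrivesAtMs: 1000, durationMs: 2000 }, // tool A
  { arrivesAtMs: 2000, durationMs: 1500 }, // tool B
];

const withoutStreaming = sequentialTotalMs(4000, timings); // 4000 + 3500 = 7500
const withStreaming = streamingTotalMs(4000, timings);     // max(4000, 3000, 3500) = 4000
console.log({ withoutStreaming, withStreaming });
```

In this made-up scenario both tools finish before the stream does, so the tool time is hidden entirely. A tool that arrives late or runs long would still extend the total past the API time.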
Class Structure
    export class StreamingToolExecutor {
      private tools: TrackedTool[] = [];
      private toolUseContext: ToolUseContext;
      private hasErrored = false;
      private erroredToolDescription = '';
      private siblingAbortController: AbortController;
      private discarded = false;
      private progressAvailableResolve?: () => void;

      constructor(
        private readonly toolDefinitions: Tools,
        private readonly canUseTool: CanUseToolFn,
        toolUseContext: ToolUseContext,
      ) {
        this.toolUseContext = toolUseContext;
        this.siblingAbortController = createChildAbortController(
          toolUseContext.abortController,
        );
      }
    }

Key fields:
- tools: Ordered list of all tracked tools
- siblingAbortController: Child controller for Bash error cascading
- hasErrored: Flag to cancel remaining tools on Bash error
- discarded: Flag for streaming fallback (entire response abandoned)
- progressAvailableResolve: Promise resolver for progress notification
The TrackedTool Type
Each tool call is wrapped in a TrackedTool:

    type ToolStatus = 'queued' | 'executing' | 'completed' | 'yielded';

    type TrackedTool = {
      id: string;                    // tool_use block ID
      block: ToolUseBlock;           // original API block
      assistantMessage: AssistantMessage;
      status: ToolStatus;
      isConcurrencySafe: boolean;
      promise?: Promise<void>;       // execution promise
      results?: Message[];           // completed results
      pendingProgress: Message[];    // progress messages (yielded immediately)
      contextModifiers?: Array<(context: ToolUseContext) => ToolUseContext>;
    };

State Machine
    stateDiagram-v2
      [*] --> queued: addTool()
      queued --> executing: processQueue() canExecute=true
      queued --> completed: abort before execution
      executing --> completed: tool finishes or abort during execution
      completed --> yielded: getCompletedResults() yields

Transition: queued → executing

    private canExecuteTool(isConcurrencySafe: boolean): boolean {
      const executingTools = this.tools.filter(t => t.status === 'executing');
      return (
        executingTools.length === 0 ||
        (isConcurrencySafe && executingTools.every(t => t.isConcurrencySafe))
      );
    }

A tool can move from queued to executing when:
- No tools are executing — any tool can start
- Only concurrent-safe tools are executing AND the new tool is also concurrent-safe
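The two conditions above can be checked in isolation. The sketch below restates the rule as a free function; `Running`, `canExecute`, and the sample values are stand-ins invented for this example, not part of the executor.

```typescript
// Standalone restatement of the admission rule. `Running` is a hypothetical
// stand-in for the TrackedTool entries whose status is 'executing'.
type Running = { isConcurrencySafe: boolean };

function canExecute(executing: Running[], candidateIsSafe: boolean): boolean {
  return (
    executing.length === 0 ||
    (candidateIsSafe && executing.every(t => t.isConcurrencySafe))
  );
}

const safeTool: Running = { isConcurrencySafe: true };
const exclusiveTool: Running = { isConcurrencySafe: false };

const admission = {
  exclusiveWhenIdle: canExecute([], false),                  // true: nothing is running
  safeJoinsSafe: canExecute([safeTool], true),               // true: all parties are safe
  exclusiveWaitsForSafe: canExecute([safeTool], false),      // false: needs exclusive access
  safeWaitsForExclusive: canExecute([exclusiveTool], true),  // false: an exclusive tool is running
};
console.log(admission);
```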
Transition: executing → completed
A tool completes when its runToolUse generator is exhausted (all results collected) or when it is aborted (sibling error, user interrupt, streaming fallback).
Transition: completed → yielded
getCompletedResults() yields results by walking the tool list in queue order. It emits the results of each completed tool it reaches and stops at the first non-concurrent tool that is still executing, so nothing queued behind that tool is yielded until it finishes. A concurrency-safe tool that is still executing does not block completed tools that follow it.
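The ordering rule is easier to see on a reduced model. The following sketch assumes a simplified `Slot` type and a `drain` generator (both hypothetical) that keep only the status checks from getCompletedResults(), with progress messages and context updates left out.

```typescript
// Reduced model of the yield order. `Slot` and `drain` are hypothetical names.
type Status = 'queued' | 'executing' | 'completed' | 'yielded';

interface Slot {
  id: string;
  status: Status;
  isConcurrencySafe: boolean;
  results: string[];
}

function* drain(slots: Slot[]): Generator<string> {
  for (const slot of slots) {
    if (slot.status === 'yielded') continue;
    if (slot.status === 'completed') {
      slot.status = 'yielded';
      yield* slot.results;
    } else if (slot.status === 'executing' && !slot.isConcurrencySafe) {
      break; // nothing behind a running exclusive tool may be yielded yet
    }
  }
}

const slots: Slot[] = [
  { id: 'a', status: 'completed', isConcurrencySafe: true, results: ['a-result'] },
  { id: 'b', status: 'executing', isConcurrencySafe: false, results: [] },
  // 'c' models an unknown tool, which is recorded as completed on arrival.
  { id: 'c', status: 'completed', isConcurrencySafe: true, results: ['c-error'] },
];

const firstPass = [...drain(slots)]; // only 'a-result': 'b' blocks 'c'

slots[1].status = 'completed';
slots[1].results = ['b-result'];
const secondPass = [...drain(slots)]; // 'b-result', then 'c-error'
console.log(firstPass, secondPass);
```

Marking a slot as yielded before emitting its results is what makes repeated calls safe: a later pass skips it instead of emitting the same results twice.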
Adding Tools During Streaming
    addTool(block: ToolUseBlock, assistantMessage: AssistantMessage): void {
      const toolDefinition = findToolByName(this.toolDefinitions, block.name);

      if (!toolDefinition) {
        // Unknown tool → immediate "completed" with error
        this.tools.push({
          id: block.id,
          block,
          assistantMessage,
          status: 'completed',
          isConcurrencySafe: true,
          pendingProgress: [],
          results: [createUserMessage({
            content: [{
              type: 'tool_result',
              content: `<tool_use_error>Error: No such tool: ${block.name}</tool_use_error>`,
              is_error: true,
              tool_use_id: block.id,
            }],
          })],
        });
        return;
      }

      // Determine concurrency safety from parsed input
      const parsedInput = toolDefinition.inputSchema.safeParse(block.input);
      const isConcurrencySafe = parsedInput?.success
        ? (() => {
            try {
              return Boolean(toolDefinition.isConcurrencySafe(parsedInput.data));
            } catch {
              return false;
            }
          })()
        : false;

      this.tools.push({
        id: block.id,
        block,
        assistantMessage,
        status: 'queued',
        isConcurrencySafe,
        pendingProgress: [],
      });

      void this.processQueue();
    }

Queue Processing
    private async processQueue(): Promise<void> {
      for (const tool of this.tools) {
        if (tool.status !== 'queued') continue;

        if (this.canExecuteTool(tool.isConcurrencySafe)) {
          await this.executeTool(tool);
        } else {
          // Non-concurrent tool must wait for exclusive access
          if (!tool.isConcurrencySafe) break;
        }
      }
    }

The break on non-concurrent tools is crucial: it maintains execution order. Without it, a later concurrency-safe tool could start ahead of an earlier non-concurrent tool that is still waiting for exclusive access. If tools arrive as [Read, Read, Write, Read], the executor runs:
- Both Reads concurrently
- Write alone (after Reads complete)
- Last Read alone (after Write completes)
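The grouping above can be reproduced with a small planner. This is a simplified model, not the executor's code: `planBatches` and `QueuedTool` are hypothetical, and the planner assumes every tool in a batch finishes before the next pass, whereas the real executor re-runs the queue each time any single tool completes.

```typescript
// Simplified batch planner; all names here are hypothetical.
interface QueuedTool {
  name: string;
  isConcurrencySafe: boolean;
}

function planBatches(queue: QueuedTool[]): string[][] {
  const batches: string[][] = [];
  let pending = [...queue];

  while (pending.length > 0) {
    const batch: QueuedTool[] = [];
    const deferred: QueuedTool[] = [];
    let blocked = false; // set once a waiting exclusive tool is reached (the `break`)

    for (const tool of pending) {
      const canRun =
        !blocked &&
        (batch.length === 0 ||
          (tool.isConcurrencySafe && batch.every(t => t.isConcurrencySafe)));
      if (canRun) {
        batch.push(tool);
        continue;
      }
      deferred.push(tool);
      if (!tool.isConcurrencySafe) blocked = true;
    }

    batches.push(batch.map(t => t.name));
    pending = deferred;
  }
  return batches;
}

const plan = planBatches([
  { name: 'Read#1', isConcurrencySafe: true },
  { name: 'Read#2', isConcurrencySafe: true },
  { name: 'Write', isConcurrencySafe: false },
  { name: 'Read#3', isConcurrencySafe: true },
]);
console.log(plan); // [['Read#1', 'Read#2'], ['Write'], ['Read#3']]
```

The first tool of each pass always runs because nothing else is executing at that point, which guarantees the loop terminates.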
Tool Execution
    private async executeTool(tool: TrackedTool): Promise<void> {
      tool.status = 'executing';
      this.toolUseContext.setInProgressToolUseIDs(prev => new Set(prev).add(tool.id));
      this.updateInterruptibleState();

      // Abridged excerpt: the declarations of messages, thisToolErrored,
      // and isErrorResult are not shown here.
      const collectResults = async () => {
        // Check for pre-existing abort
        const initialAbortReason = this.getAbortReason(tool);
        if (initialAbortReason) {
          messages.push(this.createSyntheticErrorMessage(tool.id, initialAbortReason, ...));
          tool.status = 'completed';
          return;
        }

        // Create per-tool abort controller
        const toolAbortController = createChildAbortController(this.siblingAbortController);

        // Bubble up non-sibling aborts to the query controller
        toolAbortController.signal.addEventListener('abort', () => {
          if (toolAbortController.signal.reason !== 'sibling_error'
              && !this.toolUseContext.abortController.signal.aborted
              && !this.discarded) {
            this.toolUseContext.abortController.abort(toolAbortController.signal.reason);
          }
        }, { once: true });

        // Execute via runToolUse generator
        const generator = runToolUse(
          tool.block,
          tool.assistantMessage,
          this.canUseTool,
          { ...this.toolUseContext, abortController: toolAbortController },
        );

        for await (const update of generator) {
          // Check for mid-execution abort
          const abortReason = this.getAbortReason(tool);
          if (abortReason && !thisToolErrored) {
            messages.push(this.createSyntheticErrorMessage(...));
            break;
          }

          // Handle Bash errors → cascade to siblings
          if (isErrorResult && tool.block.name === BASH_TOOL_NAME) {
            this.hasErrored = true;
            this.siblingAbortController.abort('sibling_error');
          }

          // Route progress messages for immediate yielding
          if (update.message.type === 'progress') {
            tool.pendingProgress.push(update.message);
            this.progressAvailableResolve?.();
          } else {
            messages.push(update.message);
          }
        }

        tool.results = messages;
        tool.status = 'completed';
      };

      const promise = collectResults();
      tool.promise = promise;

      // Re-trigger queue when this tool completes
      void promise.finally(() => void this.processQueue());
    }

Abort Controller Hierarchy
    graph TD
      A[toolUseContext.abortController] --> B[siblingAbortController]
      B --> C[toolAbortController - Tool 1]
      B --> D[toolAbortController - Tool 2]
      B --> E[toolAbortController - Tool N]

      style A fill:#f9f,stroke:#333
      style B fill:#ff9,stroke:#333

- Parent (toolUseContext): Aborting this ends the entire turn
- Sibling: Aborting this kills all tool subprocesses but NOT the turn
- Per-tool: Aborting this kills one tool’s subprocess
The bubble-up logic ensures that permission denials (which abort the per-tool controller) propagate to the parent — ending the turn. Without this, the query loop would continue with partial results.
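The three levels can be modeled with the standard AbortController. In the sketch below, `createChild` is a hypothetical stand-in for createChildAbortController (whose implementation is not shown in this chapter), `buildHierarchy` is invented for the demo, and `'permission_denied'` is an assumed abort reason. The `discarded` check from the real listener is left out.

```typescript
// Hypothetical stand-in for createChildAbortController: the child aborts when
// the parent aborts, but aborting the child does not touch the parent.
function createChild(parent: AbortController): AbortController {
  const child = new AbortController();
  if (parent.signal.aborted) {
    child.abort(parent.signal.reason);
  } else {
    parent.signal.addEventListener(
      'abort',
      () => child.abort(parent.signal.reason),
      { once: true },
    );
  }
  return child;
}

function buildHierarchy() {
  const turn = new AbortController();
  const sibling = createChild(turn);
  const makeTool = (): AbortController => {
    const tool = createChild(sibling);
    // Bubble-up: any abort other than a sibling error ends the whole turn.
    tool.signal.addEventListener('abort', () => {
      if (tool.signal.reason !== 'sibling_error' && !turn.signal.aborted) {
        turn.abort(tool.signal.reason);
      }
    }, { once: true });
    return tool;
  };
  return { turn, sibling, toolA: makeTool(), toolB: makeTool() };
}

// Case 1: a sibling error cancels every tool but leaves the turn alive.
const cascade = buildHierarchy();
cascade.sibling.abort('sibling_error');

// Case 2: a per-tool abort with another reason bubbles up and ends the turn,
// which in turn cancels the other tool.
const denial = buildHierarchy();
denial.toolA.abort('permission_denied'); // assumed reason, for illustration

console.log({
  cascadeTurnAborted: cascade.turn.signal.aborted, // false
  denialTurnAborted: denial.turn.signal.aborted,   // true
  denialToolBAborted: denial.toolB.signal.aborted, // true
});
```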
Result Yielding
During Streaming: getCompletedResults()
A synchronous generator that yields completed results without blocking:

    *getCompletedResults(): Generator<MessageUpdate, void> {
      if (this.discarded) return;

      for (const tool of this.tools) {
        // Always yield progress immediately
        while (tool.pendingProgress.length > 0) {
          yield { message: tool.pendingProgress.shift()!, newContext: this.toolUseContext };
        }

        if (tool.status === 'yielded') continue;

        if (tool.status === 'completed' && tool.results) {
          tool.status = 'yielded';
          for (const message of tool.results) {
            yield { message, newContext: this.toolUseContext };
          }
          markToolUseAsComplete(this.toolUseContext, tool.id);
        } else if (tool.status === 'executing' && !tool.isConcurrencySafe) {
          break; // Can't yield past an executing non-concurrent tool
        }
      }
    }

After Streaming: getRemainingResults()
An async generator that waits for all tools to complete:

    async *getRemainingResults(): AsyncGenerator<MessageUpdate, void> {
      if (this.discarded) return;

      while (this.hasUnfinishedTools()) {
        await this.processQueue();
        for (const result of this.getCompletedResults()) yield result;

        // Wait for any tool to complete OR progress to arrive
        if (this.hasExecutingTools() && !this.hasCompletedResults() && !this.hasPendingProgress()) {
          const executingPromises = this.tools
            .filter(t => t.status === 'executing' && t.promise)
            .map(t => t.promise!);

          const progressPromise = new Promise<void>(resolve => {
            this.progressAvailableResolve = resolve;
          });

          if (executingPromises.length > 0) {
            await Promise.race([...executingPromises, progressPromise]);
          }
        }
      }

      // Final flush
      for (const result of this.getCompletedResults()) yield result;
    }

The Promise.race pattern ensures the loop wakes up on either tool completion or progress availability, enabling real-time progress display while waiting for slow tools.
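The wake-up pattern can be tried on its own. In this sketch, `ProgressSignal`, `waitForWakeup`, `sleep`, and `demo` are all hypothetical helpers. `ProgressSignal` plays the role of progressAvailableResolve: it hands out a promise and resolves it when progress arrives.

```typescript
// Hypothetical helper that mirrors the progressAvailableResolve field.
class ProgressSignal {
  private resolve?: () => void;

  wait(): Promise<void> {
    return new Promise<void>(resolve => {
      this.resolve = resolve;
    });
  }

  notify(): void {
    this.resolve?.();
    this.resolve = undefined;
  }
}

function sleep(ms: number): Promise<void> {
  return new Promise(resolve => setTimeout(resolve, ms));
}

// Resolves with whichever event happens first.
async function waitForWakeup(
  executing: Promise<void>[],
  progress: ProgressSignal,
): Promise<'tool' | 'progress'> {
  return Promise.race([
    ...executing.map(p => p.then(() => 'tool' as const)),
    progress.wait().then(() => 'progress' as const),
  ]);
}

async function demo(): Promise<'tool' | 'progress'> {
  const signal = new ProgressSignal();
  const slowTool = sleep(200);              // a tool that takes a while
  setTimeout(() => signal.notify(), 10);    // progress arrives much sooner
  return waitForWakeup([slowTool], signal); // wakes on the progress signal
}

demo().then(reason => console.log('woke up because of:', reason));
```

Racing the two kinds of promise lets a single await cover both cases, so the loop needs no polling interval to surface progress promptly.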
Error Handling
Synthetic Error Messages
When a tool is aborted, it receives a synthetic error result. This maintains the API invariant that every tool_use block has a matching tool_result:

    private createSyntheticErrorMessage(
      toolUseId: string,
      reason: 'sibling_error' | 'user_interrupted' | 'streaming_fallback',
      assistantMessage: AssistantMessage,
    ): Message {
      if (reason === 'user_interrupted') {
        return createUserMessage({
          content: [{
            type: 'tool_result',
            content: withMemoryCorrectionHint(REJECT_MESSAGE),
            is_error: true,
            tool_use_id: toolUseId,
          }],
          toolUseResult: 'User rejected tool use',
        });
      }
      // ... similar for sibling_error and streaming_fallback
    }

Interrupt Behavior
Tools declare how they handle user interrupts:

    private getToolInterruptBehavior(tool: TrackedTool): 'cancel' | 'block' {
      const definition = findToolByName(this.toolDefinitions, tool.block.name);
      if (!definition?.interruptBehavior) return 'block'; // default: keep running
      try {
        return definition.interruptBehavior();
      } catch {
        return 'block';
      }
    }

When a user submits a new message (reason === 'interrupt'):
- cancel tools are immediately aborted with a synthetic error
- block tools keep running; the new message waits for them to finish
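The fallback rules can be exercised with a reduced tool definition. In this sketch, `ToolDef`, `resolveInterruptBehavior`, and the tool names Search, Write, and Flaky are made up for illustration; only the 'cancel' and 'block' values and the default-to-block logic come from the method above.

```typescript
type InterruptBehavior = 'cancel' | 'block';

// Hypothetical, reduced tool definition with an optional behavior hook.
interface ToolDef {
  name: string;
  interruptBehavior?: () => InterruptBehavior;
}

function resolveInterruptBehavior(defs: ToolDef[], toolName: string): InterruptBehavior {
  const definition = defs.find(d => d.name === toolName);
  if (!definition?.interruptBehavior) return 'block'; // default: keep running
  try {
    return definition.interruptBehavior();
  } catch {
    return 'block'; // a throwing hook is treated like the default
  }
}

const defs: ToolDef[] = [
  { name: 'Search', interruptBehavior: () => 'cancel' },
  { name: 'Write' }, // declares nothing
  { name: 'Flaky', interruptBehavior: () => { throw new Error('boom'); } },
];

const behaviors = ['Search', 'Write', 'Flaky', 'Missing'].map(
  name => `${name}:${resolveInterruptBehavior(defs, name)}`,
);
console.log(behaviors); // ['Search:cancel', 'Write:block', 'Flaky:block', 'Missing:block']
```

Defaulting to block is the conservative choice: a tool has to opt in explicitly before an interrupt may cut it off mid-operation.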
The Discard Mechanism
When a streaming fallback occurs (model switch mid-stream), the entire executor is discarded:

    discard(): void {
      this.discarded = true;
    }

After discard:
- addTool still works but tools go straight to completed with fallback errors
- getCompletedResults and getRemainingResults return immediately
- The query loop creates a fresh executor for the retry
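The discard flow can be pictured with a toy executor. Everything in this sketch is hypothetical, including the `MiniExecutor` class, its string results, and the retry step. It collapses the real abort path into a direct status assignment, so it illustrates the three rules above and is not the actual control flow.

```typescript
// Toy model of the discard rules; not the real StreamingToolExecutor.
type MiniStatus = 'queued' | 'completed';

class MiniExecutor {
  private discarded = false;
  private tools: { id: string; status: MiniStatus; result?: string }[] = [];

  discard(): void {
    this.discarded = true;
  }

  addTool(id: string): void {
    if (this.discarded) {
      // Still accepted, but recorded as completed with a fallback error.
      this.tools.push({ id, status: 'completed', result: 'error:streaming_fallback' });
      return;
    }
    this.tools.push({ id, status: 'queued' });
  }

  finish(id: string, result: string): void {
    const tool = this.tools.find(t => t.id === id);
    if (tool) {
      tool.status = 'completed';
      tool.result = result;
    }
  }

  *getCompletedResults(): Generator<string> {
    if (this.discarded) return; // a discarded executor yields nothing
    for (const tool of this.tools) {
      if (tool.status === 'completed' && tool.result) yield `${tool.id}=${tool.result}`;
    }
  }

  statuses(): MiniStatus[] {
    return this.tools.map(t => t.status);
  }
}

// The first attempt is abandoned mid-stream.
const abandoned = new MiniExecutor();
abandoned.addTool('t1');
abandoned.discard();
abandoned.addTool('t2');
const abandonedResults = [...abandoned.getCompletedResults()]; // []

// The retry starts from a fresh executor.
const retry = new MiniExecutor();
retry.addTool('t1');
retry.finish('t1', 'ok');
const retryResults = [...retry.getCompletedResults()]; // ['t1=ok']
console.log(abandonedResults, retryResults);
```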