
Streaming Executor

The StreamingToolExecutor is Claude Code’s primary tool execution engine. Located at src/services/tools/StreamingToolExecutor.ts, it overlaps tool execution with API streaming — starting tools before the model finishes responding. This chapter walks through its internals in detail.

In a typical agentic turn, the model may emit multiple tool calls. Without streaming execution:

```text
API streams tool_use A ──────────────┐
API streams tool_use B ──────────────┤
API streams text ────────────────────┤
API done ────────────────────────────┘
Execute tool A ────────────── 2s
Execute tool B ────────────── 1.5s
Total: API time + 3.5s
```

With streaming execution:

```text
API streams tool_use A ──────┐
Execute tool A starts ─────┤────── 2s
API streams tool_use B ──────┤
Execute tool B starts ─────┤────── 1.5s
API streams text ────────────┤
API done ────────────────────┘
Tool A completes ──────────┘
Tool B completes ───────────┘
Total: API time + max(0, tool time - overlap)
```

The savings are significant: 40-80% of tool execution time can overlap with streaming.
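The sketch below works through that arithmetic by comparing the two schedules. It is a hypothetical timing model, not code from Claude Code: `sequentialLatency`, `streamingLatency` and `ToolTiming` are illustrative names, the durations match the diagrams (tool A takes 2s, tool B 1.5s), and it assumes both tools are concurrency-safe so they can run side by side. The streamed total is max(API time, last tool finish), which equals API time + max(0, last tool finish - API time).

```typescript
// Hypothetical timing model: a tool is emitted `emittedAt` seconds into the
// API stream and then runs for `duration` seconds.
interface ToolTiming {
  emittedAt: number;
  duration: number;
}

// Without streaming execution: wait for the stream, then run tools one by one.
function sequentialLatency(apiDone: number, tools: ToolTiming[]): number {
  return apiDone + tools.reduce((sum, t) => sum + t.duration, 0);
}

// With streaming execution (assuming every tool is concurrency-safe): each tool
// starts as soon as it is emitted, and the turn ends when both the stream and
// the slowest tool have finished.
function streamingLatency(apiDone: number, tools: ToolTiming[]): number {
  const lastToolDone = Math.max(0, ...tools.map(t => t.emittedAt + t.duration));
  return Math.max(apiDone, lastToolDone);
}

const tools: ToolTiming[] = [
  { emittedAt: 1, duration: 2 },   // tool A
  { emittedAt: 2, duration: 1.5 }, // tool B
];

console.log(sequentialLatency(4, tools)); // 7.5
console.log(streamingLatency(4, tools));  // 4
```

In this example the 3.5s of tool work is hidden entirely behind a 4s stream; a tool that outlives the stream adds only its overhang.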

src/services/tools/StreamingToolExecutor.ts
```typescript
export class StreamingToolExecutor {
  private tools: TrackedTool[] = [];
  private toolUseContext: ToolUseContext;
  private hasErrored = false;
  private erroredToolDescription = '';
  private siblingAbortController: AbortController;
  private discarded = false;
  private progressAvailableResolve?: () => void;

  constructor(
    private readonly toolDefinitions: Tools,
    private readonly canUseTool: CanUseToolFn,
    toolUseContext: ToolUseContext,
  ) {
    this.toolUseContext = toolUseContext;
    // Child of the turn's controller: aborting the turn aborts every tool,
    // while aborting the sibling controller leaves the turn alive.
    this.siblingAbortController = createChildAbortController(
      toolUseContext.abortController,
    );
  }
}
```

Key fields:

  • tools: Ordered list of all tracked tools
  • siblingAbortController: Child controller for Bash error cascading
  • hasErrored: Flag to cancel remaining tools on Bash error
  • discarded: Flag for streaming fallback (entire response abandoned)
  • progressAvailableResolve: Promise resolver for progress notification

Each tool call is wrapped in a TrackedTool:

```typescript
type ToolStatus = 'queued' | 'executing' | 'completed' | 'yielded';

type TrackedTool = {
  id: string;                 // tool_use block ID
  block: ToolUseBlock;        // original API block
  assistantMessage: AssistantMessage;
  status: ToolStatus;
  isConcurrencySafe: boolean;
  promise?: Promise<void>;    // execution promise
  results?: Message[];        // completed results
  pendingProgress: Message[]; // progress messages (yielded immediately)
  contextModifiers?: Array<(context: ToolUseContext) => ToolUseContext>;
};
```
```mermaid
stateDiagram-v2
    [*] --> queued: addTool()
    queued --> executing: processQueue() canExecute=true
    queued --> completed: abort before execution
    executing --> completed: tool finishes or abort during
    completed --> yielded: getCompletedResults() yields
```
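To enforce the lifecycle rather than only document it, one option is a transition table. The guard below is a hypothetical sketch (`TRANSITIONS` and `transition` are illustrative names, not part of the executor shown here) that accepts exactly the edges in the state diagram and throws on anything else, which can be handy in tests or debug builds.

```typescript
type ToolStatus = 'queued' | 'executing' | 'completed' | 'yielded';

// Allowed transitions, read off the state diagram above.
const TRANSITIONS: Record<ToolStatus, readonly ToolStatus[]> = {
  queued: ['executing', 'completed'], // start, or aborted before it ran
  executing: ['completed'],           // finished, or aborted mid-run
  completed: ['yielded'],             // results handed to the query loop
  yielded: [],                        // terminal
};

// Hypothetical guard: returns the new status, or throws on an illegal move.
function transition(from: ToolStatus, to: ToolStatus): ToolStatus {
  if (!TRANSITIONS[from].includes(to)) {
    throw new Error(`illegal transition ${from} -> ${to}`);
  }
  return to;
}

let current: ToolStatus = 'queued';
current = transition(current, 'executing');
current = transition(current, 'completed');
current = transition(current, 'yielded');
console.log(current); // yielded
```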
```typescript
private canExecuteTool(isConcurrencySafe: boolean): boolean {
  const executingTools = this.tools.filter(t => t.status === 'executing');
  return (
    executingTools.length === 0 ||
    (isConcurrencySafe && executingTools.every(t => t.isConcurrencySafe))
  );
}
```

A tool can move from queued to executing when:

  1. No tools are executing — any tool can start
  2. Only concurrent-safe tools are executing AND the new tool is also concurrent-safe

A tool completes when its runToolUse generator is exhausted (all results collected) OR when it’s aborted (sibling error, user interrupt, streaming fallback).

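Results are yielded via getCompletedResults(), which walks the tool list in queue order and yields a completed tool's results when the walk reaches it. The walk stops at the first non-concurrent tool that is still executing, so nothing queued behind an exclusive tool can overtake it. Concurrent tools that are still running do not stop the walk, so a fast concurrent tool may be yielded before a slower concurrent tool that precedes it.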

```typescript
addTool(block: ToolUseBlock, assistantMessage: AssistantMessage): void {
  const toolDefinition = findToolByName(this.toolDefinitions, block.name);
  if (!toolDefinition) {
    // Unknown tool → immediately "completed" with an error result
    this.tools.push({
      id: block.id,
      block,
      assistantMessage,
      status: 'completed',
      isConcurrencySafe: true,
      pendingProgress: [],
      results: [
        createUserMessage({
          content: [
            {
              type: 'tool_result',
              content: `<tool_use_error>Error: No such tool: ${block.name}</tool_use_error>`,
              is_error: true,
              tool_use_id: block.id,
            },
          ],
        }),
      ],
    });
    return;
  }

  // Determine concurrency safety from the parsed input; invalid input or a
  // throwing isConcurrencySafe() makes the tool exclusive.
  const parsedInput = toolDefinition.inputSchema.safeParse(block.input);
  const isConcurrencySafe = parsedInput?.success
    ? (() => {
        try {
          return Boolean(toolDefinition.isConcurrencySafe(parsedInput.data));
        } catch {
          return false;
        }
      })()
    : false;

  this.tools.push({
    id: block.id,
    block,
    assistantMessage,
    status: 'queued',
    isConcurrencySafe,
    pendingProgress: [],
  });
  void this.processQueue();
}
```
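The concurrency decision in addTool() can be isolated into a small pure function. This sketch assumes a simplified Zod-like schema interface; `SafeParseResult`, `ToolDefinition`, `classifyConcurrency` and the toy `exampleShell` tool are all hypothetical. It keeps the same fail-closed rule: if validation fails or isConcurrencySafe() throws, the tool is treated as exclusive.

```typescript
// Hypothetical, simplified stand-ins for a Zod-style schema and a tool definition.
type SafeParseResult<T> = { success: true; data: T } | { success: false };

interface ToolDefinition<T> {
  inputSchema: { safeParse(input: unknown): SafeParseResult<T> };
  isConcurrencySafe(input: T): unknown;
}

// Mirrors the decision in addTool(): input that fails validation, or an
// isConcurrencySafe() that throws, makes the tool exclusive.
function classifyConcurrency<T>(def: ToolDefinition<T>, rawInput: unknown): boolean {
  const parsed = def.inputSchema.safeParse(rawInput);
  if (!parsed.success) return false;
  try {
    return Boolean(def.isConcurrencySafe(parsed.data));
  } catch {
    return false;
  }
}

type ShellInput = { command: string };

// Toy tool for illustration: only `ls` is considered safe to run alongside others.
const exampleShell: ToolDefinition<ShellInput> = {
  inputSchema: {
    safeParse: (input: unknown): SafeParseResult<ShellInput> =>
      typeof input === 'object' &&
      input !== null &&
      typeof (input as { command?: unknown }).command === 'string'
        ? { success: true, data: input as ShellInput }
        : { success: false },
  },
  isConcurrencySafe: input => input.command === 'ls',
};

console.log(classifyConcurrency(exampleShell, { command: 'ls' }));   // true
console.log(classifyConcurrency(exampleShell, { command: 'rm x' })); // false
console.log(classifyConcurrency(exampleShell, { cmd: 'ls' }));       // false: input fails validation
```

Failing closed costs only parallelism: an exclusive tool still runs, just alone.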
```typescript
private async processQueue(): Promise<void> {
  for (const tool of this.tools) {
    if (tool.status !== 'queued') continue;
    if (this.canExecuteTool(tool.isConcurrencySafe)) {
      await this.executeTool(tool);
    } else {
      // Non-concurrent tool must wait for exclusive access; stop here so
      // no later tool can start ahead of it.
      if (!tool.isConcurrencySafe) break;
    }
  }
}
```

The break on non-concurrent tools is crucial: it maintains execution order. If tools arrive as [Read, Read, Write, Read], the executor runs:

  1. Both Reads concurrently
  2. Write alone (after Reads complete)
  3. Last Read alone (after Write completes)
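This ordering can be checked with a small synchronous simulation that reuses the canExecuteTool() rule and the processQueue() walk. It simplifies in one important way: all tools are queued up front, and every running tool is assumed to finish before the next pass, whereas the real executor re-runs the queue after each individual completion. `SimTool`, `processQueueOnce` and `simulateWaves` are hypothetical names.

```typescript
type SimStatus = 'queued' | 'executing' | 'completed';

interface SimTool {
  name: string;
  isConcurrencySafe: boolean;
  status: SimStatus;
}

// Same rule as canExecuteTool(), with the tool list passed in explicitly.
function canExecute(tools: SimTool[], isConcurrencySafe: boolean): boolean {
  const executing = tools.filter(t => t.status === 'executing');
  return executing.length === 0 || (isConcurrencySafe && executing.every(t => t.isConcurrencySafe));
}

// One pass of the processQueue() walk; returns the names of the tools it started.
function processQueueOnce(tools: SimTool[]): string[] {
  const started: string[] = [];
  for (const tool of tools) {
    if (tool.status !== 'queued') continue;
    if (canExecute(tools, tool.isConcurrencySafe)) {
      tool.status = 'executing';
      started.push(tool.name);
    } else if (!tool.isConcurrencySafe) {
      break; // a blocked exclusive tool keeps later tools from jumping ahead
    }
  }
  return started;
}

// Simplification: every executing tool completes before the next pass.
function simulateWaves(specs: Array<[string, boolean]>): string[][] {
  const tools = specs.map(
    ([name, safe]): SimTool => ({ name, isConcurrencySafe: safe, status: 'queued' }),
  );
  const waves: string[][] = [];
  while (tools.some(t => t.status !== 'completed')) {
    waves.push(processQueueOnce(tools));
    for (const t of tools) {
      if (t.status === 'executing') t.status = 'completed';
    }
  }
  return waves;
}

console.log(JSON.stringify(simulateWaves([['Read1', true], ['Read2', true], ['Write', false], ['Read3', true]])));
// [["Read1","Read2"],["Write"],["Read3"]]
```

Each pass is guaranteed to start at least one tool, because with nothing executing the first queued tool always passes canExecute(), so the loop terminates.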
```typescript
private async executeTool(tool: TrackedTool): Promise<void> {
  tool.status = 'executing';
  this.toolUseContext.setInProgressToolUseIDs(prev => new Set(prev).add(tool.id));
  this.updateInterruptibleState();

  const collectResults = async () => {
    const messages: Message[] = [];
    // Set when this tool's own Bash error triggers the sibling abort, so the
    // resulting sibling_error does not replace its real error result.
    let thisToolErrored = false;

    // Check for a pre-existing abort
    const initialAbortReason = this.getAbortReason(tool);
    if (initialAbortReason) {
      messages.push(this.createSyntheticErrorMessage(tool.id, initialAbortReason, ...));
      tool.results = messages; // record it so getCompletedResults() yields it
      tool.status = 'completed';
      return;
    }

    // Create a per-tool abort controller under the sibling controller
    const toolAbortController = createChildAbortController(this.siblingAbortController);

    // Bubble non-sibling aborts (e.g. permission denials) up to the query controller
    toolAbortController.signal.addEventListener(
      'abort',
      () => {
        if (
          toolAbortController.signal.reason !== 'sibling_error' &&
          !this.toolUseContext.abortController.signal.aborted &&
          !this.discarded
        ) {
          this.toolUseContext.abortController.abort(toolAbortController.signal.reason);
        }
      },
      { once: true },
    );

    // Execute via the runToolUse generator
    const generator = runToolUse(
      tool.block,
      tool.assistantMessage,
      this.canUseTool,
      { ...this.toolUseContext, abortController: toolAbortController },
    );

    for await (const update of generator) {
      // Check for a mid-execution abort
      const abortReason = this.getAbortReason(tool);
      if (abortReason && !thisToolErrored) {
        messages.push(this.createSyntheticErrorMessage(...));
        break;
      }

      // Bash errors cascade to siblings. isErrorResult (whether this update
      // carries an error tool_result) is computed in code elided from this excerpt.
      if (isErrorResult && tool.block.name === BASH_TOOL_NAME) {
        thisToolErrored = true;
        this.hasErrored = true;
        this.siblingAbortController.abort('sibling_error');
      }

      // Route progress messages for immediate yielding
      if (update.message.type === 'progress') {
        tool.pendingProgress.push(update.message);
        this.progressAvailableResolve?.();
      } else {
        messages.push(update.message);
      }
    }

    tool.results = messages;
    tool.status = 'completed';
  };

  const promise = collectResults();
  tool.promise = promise;
  // Re-run the queue when this tool completes so waiting tools can start
  void promise.finally(() => void this.processQueue());
}
```
```mermaid
graph TD
    A[toolUseContext.abortController] --> B[siblingAbortController]
    B --> C[toolAbortController - Tool 1]
    B --> D[toolAbortController - Tool 2]
    B --> E[toolAbortController - Tool N]
    style A fill:#f9f,stroke:#333
    style B fill:#ff9,stroke:#333
```
  • Parent (toolUseContext): Aborting this ends the entire turn
  • Sibling: Aborting this kills all tool subprocesses but NOT the turn
  • Per-tool: Aborting this kills one tool’s subprocess

The bubble-up listener in executeTool() forwards every per-tool abort except 'sibling_error' to the parent. This ensures that permission denials (which abort the per-tool controller) propagate to the parent and end the turn; without it, the query loop would continue with partial results.
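The hierarchy and the bubble-up rule can be reproduced with plain `AbortController`s. The `createChildAbortController` below is a hypothetical implementation (the real helper's body is not shown in this chapter), `createToolController` mirrors the listener in executeTool() minus the `discarded` check, and `'permission_denied'` is a made-up abort reason. Abort reasons require Node.js 17.2+ or a current browser.

```typescript
// Hypothetical helper: a child that aborts when its parent does, while its
// own abort leaves the parent untouched.
function createChildAbortController(parent: AbortController): AbortController {
  const child = new AbortController();
  if (parent.signal.aborted) {
    child.abort(parent.signal.reason);
    return child;
  }
  const forward = () => child.abort(parent.signal.reason);
  parent.signal.addEventListener('abort', forward, { once: true });
  // Stop listening to the parent once the child has been aborted
  child.signal.addEventListener(
    'abort',
    () => parent.signal.removeEventListener('abort', forward),
    { once: true },
  );
  return child;
}

// Per-tool controller with the bubble-up rule: every abort reason except
// 'sibling_error' is forwarded to the turn-level controller.
function createToolController(turn: AbortController, sibling: AbortController): AbortController {
  const tool = createChildAbortController(sibling);
  tool.signal.addEventListener(
    'abort',
    () => {
      if (tool.signal.reason !== 'sibling_error' && !turn.signal.aborted) {
        turn.abort(tool.signal.reason);
      }
    },
    { once: true },
  );
  return tool;
}

const turn = new AbortController();
const sibling = createChildAbortController(turn);
const toolA = createToolController(turn, sibling);
const toolB = createToolController(turn, sibling);

// A Bash error aborts the sibling controller: both tools stop, the turn survives.
sibling.abort('sibling_error');
console.log(toolA.signal.aborted, toolB.signal.aborted, turn.signal.aborted); // true true false
```

Because `abort()` dispatches its event synchronously and sets `aborted` before dispatching, the cascade settles immediately and the `!turn.signal.aborted` check prevents a loop when the turn's own abort flows back down to the tools.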

getCompletedResults() is a synchronous generator that yields whatever results are already available, without blocking:

```typescript
*getCompletedResults(): Generator<MessageUpdate, void> {
  if (this.discarded) return;
  for (const tool of this.tools) {
    // Always yield progress immediately
    while (tool.pendingProgress.length > 0) {
      yield { message: tool.pendingProgress.shift()!, newContext: this.toolUseContext };
    }
    if (tool.status === 'yielded') continue;
    if (tool.status === 'completed' && tool.results) {
      tool.status = 'yielded';
      for (const message of tool.results) {
        yield { message, newContext: this.toolUseContext };
      }
      markToolUseAsComplete(this.toolUseContext, tool.id);
    } else if (tool.status === 'executing' && !tool.isConcurrencySafe) {
      break; // Can't yield past an executing non-concurrent tool
    }
  }
}
```
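To see the ordering rule in isolation, here is a stripped-down version of the same walk over plain objects. `SketchTool` and `completedResults` are hypothetical names; progress flushing, markToolUseAsComplete() and the context plumbing are omitted.

```typescript
type SketchStatus = 'queued' | 'executing' | 'completed' | 'yielded';

interface SketchTool {
  name: string;
  status: SketchStatus;
  isConcurrencySafe: boolean;
  results?: string[];
}

// Same walk as getCompletedResults(), minus progress, context and bookkeeping.
function* completedResults(tools: SketchTool[]): Generator<string, void> {
  for (const tool of tools) {
    if (tool.status === 'yielded') continue;
    if (tool.status === 'completed' && tool.results) {
      tool.status = 'yielded';
      yield* tool.results;
    } else if (tool.status === 'executing' && !tool.isConcurrencySafe) {
      break; // nothing behind a running exclusive tool is yielded yet
    }
  }
}

// Two concurrent reads: ReadB finished first while ReadA is still running.
const reads: SketchTool[] = [
  { name: 'ReadA', status: 'executing', isConcurrencySafe: true },
  { name: 'ReadB', status: 'completed', isConcurrencySafe: true, results: ['ReadB result'] },
];
console.log(JSON.stringify([...completedResults(reads)])); // ["ReadB result"]

// An exclusive Write is running; the unknown tool behind it was completed by addTool().
const exclusive: SketchTool[] = [
  { name: 'Write', status: 'executing', isConcurrencySafe: false },
  { name: 'NoSuchTool', status: 'completed', isConcurrencySafe: true, results: ['NoSuchTool error'] },
];
console.log(JSON.stringify([...completedResults(exclusive)])); // []
exclusive[0].status = 'completed';
exclusive[0].results = ['Write result'];
console.log(JSON.stringify([...completedResults(exclusive)])); // ["Write result","NoSuchTool error"]
```

The first case shows a fast concurrent tool overtaking a slower concurrent one; the second shows an exclusive tool holding back everything behind it until it finishes.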

getRemainingResults() is an async generator that waits for all tools to complete, yielding results as they become available:

```typescript
async *getRemainingResults(): AsyncGenerator<MessageUpdate, void> {
  if (this.discarded) return;
  while (this.hasUnfinishedTools()) {
    await this.processQueue();
    for (const result of this.getCompletedResults()) yield result;

    // Wait for any tool to complete OR progress to arrive
    if (this.hasExecutingTools() && !this.hasCompletedResults() && !this.hasPendingProgress()) {
      const executingPromises = this.tools
        .filter(t => t.status === 'executing' && t.promise)
        .map(t => t.promise!);
      const progressPromise = new Promise<void>(resolve => {
        this.progressAvailableResolve = resolve;
      });
      if (executingPromises.length > 0) {
        await Promise.race([...executingPromises, progressPromise]);
      }
    }
  }
  // Final flush
  for (const result of this.getCompletedResults()) yield result;
}
```

The Promise.race pattern ensures the loop wakes up on EITHER tool completion OR progress availability — enabling real-time progress display while waiting for slow tools.
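The wake-up mechanism can be sketched on its own. `ProgressSignal` and `waitForToolOrProgress` are hypothetical names; like `progressAvailableResolve`, the signal keeps a single stored resolver, so each wait arms a fresh promise before anything can notify it. Unlike the executor, the sketch reports which side won, purely for demonstration.

```typescript
// Hypothetical one-shot progress signal, modelled on progressAvailableResolve:
// next() arms a fresh promise, notify() resolves whichever one is armed.
class ProgressSignal {
  private resolve?: () => void;

  next(): Promise<void> {
    return new Promise<void>(resolve => {
      this.resolve = resolve;
    });
  }

  notify(): void {
    this.resolve?.();
  }
}

// Resolves with whichever happens first: some tool settles, or progress arrives.
function waitForToolOrProgress(
  executing: Promise<void>[],
  progress: ProgressSignal,
): Promise<'tool' | 'progress'> {
  const toolDone = Promise.race(executing).then(() => 'tool' as const);
  const progressed = progress.next().then(() => 'progress' as const);
  return Promise.race([toolDone, progressed]);
}

const sleep = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms));

const progress = new ProgressSignal();
const slowTool = sleep(200);
setTimeout(() => progress.notify(), 10); // progress arrives long before the tool ends
waitForToolOrProgress([slowTool], progress).then(winner => console.log(winner)); // progress
```

A single stored resolver is enough because only one consumer (the getRemainingResults() loop) ever waits; a stale resolver from a previous iteration is simply overwritten.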

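When a tool is aborted, it receives a synthetic error result. This maintains the API invariant that every tool_use block in an assistant message is answered by a tool_result block with the same tool_use_id: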

```typescript
private createSyntheticErrorMessage(
  toolUseId: string,
  reason: 'sibling_error' | 'user_interrupted' | 'streaming_fallback',
  assistantMessage: AssistantMessage,
): Message {
  if (reason === 'user_interrupted') {
    return createUserMessage({
      content: [
        {
          type: 'tool_result',
          content: withMemoryCorrectionHint(REJECT_MESSAGE),
          is_error: true,
          tool_use_id: toolUseId,
        },
      ],
      toolUseResult: 'User rejected tool use',
    });
  }
  // ... similar for sibling_error and streaming_fallback
}
```
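The same invariant can be stated at the level of a whole assistant turn. The sketch below is hypothetical (simplified block types, the made-up name `fillMissingToolResults`, and placeholder error text that is not Claude Code's wording): given the tool_use IDs a turn emitted and the tool_result blocks actually produced, it returns one result per ID, in emission order, substituting an error for any ID left unanswered.

```typescript
interface ToolResultBlock {
  type: 'tool_result';
  tool_use_id: string;
  content: string;
  is_error?: boolean;
}

type AbortReason = 'sibling_error' | 'user_interrupted' | 'streaming_fallback';

// Hypothetical helper: one tool_result per tool_use ID, in emission order,
// with a synthetic error for every ID that never received a result.
function fillMissingToolResults(
  toolUseIds: string[],
  results: ToolResultBlock[],
  reason: AbortReason,
): ToolResultBlock[] {
  const byId = new Map(results.map(r => [r.tool_use_id, r] as const));
  return toolUseIds.map(
    (id): ToolResultBlock =>
      byId.get(id) ?? {
        type: 'tool_result',
        tool_use_id: id,
        content: `Tool use did not complete (${reason})`, // placeholder wording
        is_error: true,
      },
  );
}

const filled = fillMissingToolResults(
  ['toolu_a', 'toolu_b'],
  [{ type: 'tool_result', tool_use_id: 'toolu_a', content: 'ok' }],
  'sibling_error',
);
console.log(filled.map(r => `${r.tool_use_id}:${r.is_error === true}`).join(' ')); // toolu_a:false toolu_b:true
```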

Tools declare how they handle user interrupts:

```typescript
private getToolInterruptBehavior(tool: TrackedTool): 'cancel' | 'block' {
  const definition = findToolByName(this.toolDefinitions, tool.block.name);
  if (!definition?.interruptBehavior) return 'block'; // default: keep running
  try {
    return definition.interruptBehavior();
  } catch {
    return 'block';
  }
}
```

When a user submits a new message (reason === 'interrupt'):

  • cancel tools are immediately aborted with a synthetic error
  • block tools keep running; the new message waits for them to finish
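The cancel/block split amounts to partitioning the tools still executing when the interrupt arrives. `ToolDef`, `interruptBehaviorOf`, `partitionOnInterrupt` and the example tool names are hypothetical; the defaulting matches getToolInterruptBehavior(), where a missing or throwing interruptBehavior counts as 'block'.

```typescript
type InterruptBehavior = 'cancel' | 'block';

interface ToolDef {
  name: string;
  interruptBehavior?: () => InterruptBehavior;
}

// Same defaulting as getToolInterruptBehavior(): absent or throwing => 'block'.
function interruptBehaviorOf(def: ToolDef): InterruptBehavior {
  if (!def.interruptBehavior) return 'block';
  try {
    return def.interruptBehavior();
  } catch {
    return 'block';
  }
}

// 'cancel' tools are aborted now (and get a synthetic error); 'block' tools
// keep running and the new message waits for them.
function partitionOnInterrupt(executing: ToolDef[]): { cancel: string[]; block: string[] } {
  const cancel: string[] = [];
  const block: string[] = [];
  for (const def of executing) {
    (interruptBehaviorOf(def) === 'cancel' ? cancel : block).push(def.name);
  }
  return { cancel, block };
}

const split = partitionOnInterrupt([
  { name: 'SlowSearch', interruptBehavior: () => 'cancel' },
  { name: 'FileEdit' },
  { name: 'Flaky', interruptBehavior: () => { throw new Error('boom'); } },
]);
console.log(JSON.stringify(split)); // {"cancel":["SlowSearch"],"block":["FileEdit","Flaky"]}
```

Defaulting to 'block' is the conservative choice: a tool that never declared its behaviour is left to finish rather than being cut off mid-operation.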

When a streaming fallback occurs (model switch mid-stream), the entire executor is discarded:

```typescript
discard(): void {
  this.discarded = true;
}
```

After discard:

  • addTool still works but tools go straight to completed with fallback errors
  • getCompletedResults and getRemainingResults return immediately
  • The query loop creates a fresh executor for the retry