
Pattern: Streaming Tool Execution

Traditional LLM tool execution follows a strict sequence: wait for the complete API response, parse the JSON tool calls, then execute them one by one. Streaming Tool Execution breaks this barrier — it begins executing tools while the API is still streaming, by parsing partial JSON incrementally.

sequenceDiagram
participant API as Claude API
participant Parser as Stream Parser
participant Executor as Tool Executor
participant UI as Terminal UI
API->>Parser: {"type":"tool_use","name":"Read
API->>Parser: File","input":{"path":"/src
Parser-->>UI: Show: "Reading /src..."
API->>Parser: /index.ts"}}
Parser->>Executor: Execute ReadFile({path: "/src/index.ts"})
Note over Executor: Tool starts BEFORE API stream ends
API->>Parser: (more content blocks...)
Executor-->>Parser: File content result
API->>Parser: [stream end]
Note over Parser: All tools already executed!
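The core mechanic in the diagram, retrying `JSON.parse` on a growing buffer until it succeeds, can be sketched in a few lines (the chunk boundaries here are illustrative, not actual API framing):

```typescript
// The tool input arrives split across SSE deltas; JSON.parse only
// succeeds once the final chunk closes the value.
const chunks = ['{"path":"/src', '/index.ts"}'];

let buffer = '';
const parsedOnChunk: number[] = [];
chunks.forEach((chunk, i) => {
  buffer += chunk;
  try {
    JSON.parse(buffer);
    parsedOnChunk.push(i); // parse succeeded after this chunk
  } catch {
    // Still incomplete: keep buffering and wait for the next delta
  }
});
// parsedOnChunk → [1]: only the second chunk completes the value
```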

Claude Code’s StreamingToolExecutor manages a sophisticated concurrency model that handles the inherent tension between “start early” and “execute correctly.”

stateDiagram-v2
[*] --> Idle
Idle --> Buffering: content_block_start (tool_use)
Buffering --> Buffering: content_block_delta (partial JSON)
Buffering --> Ready: content_block_stop (JSON complete)
Buffering --> Ready: JSON parseable early
Ready --> Executing: Permission granted
Ready --> Blocked: Permission required
Blocked --> Executing: User approves
Blocked --> Skipped: User denies
Executing --> Complete: Tool returns result
Executing --> Failed: Tool throws error
Complete --> [*]
Failed --> [*]
Skipped --> [*]
interface ExecutionStrategy {
  // Strategy 1: Sequential — wait for each tool to complete
  sequential: 'one-at-a-time';
  // Strategy 2: Parallel — execute all ready tools simultaneously
  parallel: 'all-at-once';
  // Strategy 3: Streaming — execute as soon as each tool's JSON is complete
  streaming: 'as-soon-as-ready'; // ← Claude Code uses this
}
| Strategy   | Latency | Complexity | Safety                         |
|------------|---------|------------|--------------------------------|
| Sequential | Highest | Low        | Safest                         |
| Parallel   | Medium  | Medium     | Needs isolation                |
| Streaming  | Lowest  | High       | Needs careful state management |

Here’s the distilled architecture of the streaming tool executor:

class StreamingToolExecutor {
  // Keyed by content block index: delta events reference the block's index,
  // not the tool_use id, so the index is what routes partial JSON to a tool
  private pendingTools = new Map<number, ToolCallState>();
  private completedTools = new Map<string, ToolResult>();
  private activeExecutions = new Set<Promise<void>>();

  // Called for every SSE event from the API stream
  async processStreamEvent(event: StreamEvent): Promise<ToolExecutionEvent[]> {
    const events: ToolExecutionEvent[] = [];
    switch (event.type) {
      case 'content_block_start': {
        if (event.content_block.type === 'tool_use') {
          // Register a new pending tool call under its block index
          this.pendingTools.set(event.index, {
            id: event.content_block.id,
            name: event.content_block.name,
            jsonBuffer: '',
            status: 'buffering',
          });
          events.push({ type: 'tool_detected', name: event.content_block.name });
        }
        break;
      }
      case 'content_block_delta': {
        if (event.delta.type === 'input_json_delta') {
          const tool = this.pendingTools.get(event.index);
          if (tool) {
            tool.jsonBuffer += event.delta.partial_json;
            // Attempt early parse — if the JSON is valid, start execution
            const parsed = this.tryParseJSON(tool.jsonBuffer);
            if (parsed !== null && tool.status === 'buffering') {
              tool.status = 'ready';
              tool.input = parsed;
              events.push(...await this.tryExecute(tool));
            }
          }
        }
        break;
      }
      case 'content_block_stop': {
        const tool = this.pendingTools.get(event.index);
        if (tool && tool.status === 'buffering') {
          // Final parse on block completion
          tool.input = JSON.parse(tool.jsonBuffer);
          tool.status = 'ready';
          events.push(...await this.tryExecute(tool));
        }
        break;
      }
      case 'message_stop': {
        // Wait for all in-flight executions
        await Promise.all(this.activeExecutions);
        events.push({ type: 'all_tools_complete' });
        break;
      }
    }
    return events;
  }
  private tryParseJSON(partial: string): unknown | null {
    try {
      return JSON.parse(partial);
    } catch {
      return null; // Not yet valid JSON, keep buffering
    }
  }

  private async tryExecute(tool: ToolCallState): Promise<ToolExecutionEvent[]> {
    const events: ToolExecutionEvent[] = [];
    // Check if this tool can execute without permission
    const permCheck = await checkPermission(tool.name, tool.input);
    if (permCheck === 'allowed') {
      const execution = this.executeToolAsync(tool);
      this.activeExecutions.add(execution);
      execution.finally(() => this.activeExecutions.delete(execution));
      events.push({ type: 'tool_executing', id: tool.id, name: tool.name });
    } else if (permCheck === 'needs_approval') {
      tool.status = 'blocked';
      events.push({ type: 'tool_needs_permission', id: tool.id, name: tool.name });
    } else {
      tool.status = 'denied';
      events.push({ type: 'tool_denied', id: tool.id, name: tool.name });
    }
    return events;
  }

  private async executeToolAsync(tool: ToolCallState): Promise<void> {
    try {
      tool.status = 'executing';
      const result = await executeTool(tool.name, tool.input!);
      tool.status = 'complete';
      this.completedTools.set(tool.id, result);
    } catch (error) {
      tool.status = 'failed';
      this.completedTools.set(tool.id, {
        type: 'error',
        error: String(error),
      });
    }
  }
}

The key challenge is knowing when a partial JSON string is “complete enough” to start execution. Claude Code uses a pragmatic approach:

// Simplified incremental JSON parser strategy
class IncrementalJSONParser {
  private buffer = '';
  private depth = 0;
  private inString = false;
  private escaped = false;

  // Feed partial chunks and get notified when a complete value is found
  feed(chunk: string): { complete: boolean; value?: unknown } {
    for (const char of chunk) {
      this.buffer += char;
      if (this.escaped) {
        this.escaped = false;
        continue;
      }
      if (char === '\\' && this.inString) {
        this.escaped = true;
        continue;
      }
      if (char === '"') {
        this.inString = !this.inString;
        continue;
      }
      if (this.inString) continue;
      if (char === '{' || char === '[') this.depth++;
      if (char === '}' || char === ']') this.depth--;
      // When depth returns to 0, we have a complete JSON value
      if (this.depth === 0 && this.buffer.trim().length > 0) {
        try {
          const value = JSON.parse(this.buffer);
          return { complete: true, value };
        } catch {
          // Malformed JSON, continue buffering
        }
      }
    }
    return { complete: false };
  }
}
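The same depth-tracking idea can be condensed into a single-pass predicate. This standalone sketch (not the executor's actual API) is a cheap pre-check to run before attempting `JSON.parse`, not a validator:

```typescript
// Scan a buffer once and report whether the top-level JSON value has closed.
// Only handles object/array roots, which is all a tool_use input ever is.
function jsonLooksComplete(buffer: string): boolean {
  let depth = 0;
  let inString = false;
  let escaped = false;
  let opened = false;
  for (const ch of buffer) {
    if (escaped) { escaped = false; continue; }
    if (ch === '\\' && inString) { escaped = true; continue; }
    if (ch === '"') { inString = !inString; continue; }
    if (inString) continue; // braces inside strings don't count
    if (ch === '{' || ch === '[') { depth++; opened = true; }
    if (ch === '}' || ch === ']') depth--;
  }
  return opened && depth === 0 && !inString;
}

jsonLooksComplete('{"path":"/src');          // → false: object still open
jsonLooksComplete('{"path":"/src/a.ts"}');   // → true
jsonLooksComplete('{"cmd":"echo \\"}\\""}'); // → true: brace inside a string
```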

Not all tools can execute in parallel. Some tools have implicit dependencies:

// Tool dependency resolution
interface ToolDependencyResolver {
  canExecuteInParallel(toolA: ToolCall, toolB: ToolCall): boolean;
}

class FileSystemDependencyResolver implements ToolDependencyResolver {
  canExecuteInParallel(a: ToolCall, b: ToolCall): boolean {
    // Read + Read: safe in parallel
    if (a.name === 'ReadFile' && b.name === 'ReadFile') return true;
    // Write + Write to same file: NOT safe
    if (a.name === 'WriteFile' && b.name === 'WriteFile') {
      return a.input.path !== b.input.path;
    }
    // Write + Read to same file: NOT safe
    if (
      (a.name === 'WriteFile' && b.name === 'ReadFile') ||
      (a.name === 'ReadFile' && b.name === 'WriteFile')
    ) {
      return a.input.path !== b.input.path;
    }
    // Default: allow parallel execution
    return true;
  }
}
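One way to consume such a resolver is to greedily pack a turn's tool calls into waves whose members are pairwise parallel-safe. Here `planWaves` and the inline `conflicts` predicate are hypothetical helpers that encode the same rules (only same-path pairs with at least one write conflict), not Claude Code internals:

```typescript
interface ToolCall { name: string; input: { path: string } }

// Two calls conflict only if they touch the same path and at least one writes
function conflicts(a: ToolCall, b: ToolCall): boolean {
  const writes = (t: ToolCall) => t.name === 'WriteFile';
  return a.input.path === b.input.path && (writes(a) || writes(b));
}

// Greedy scheduler: put each call into the first wave it doesn't conflict with
function planWaves(calls: ToolCall[]): ToolCall[][] {
  const waves: ToolCall[][] = [];
  for (const call of calls) {
    const wave = waves.find((w) => w.every((c) => !conflicts(c, call)));
    if (wave) wave.push(call);
    else waves.push([call]);
  }
  return waves;
}

const plan = planWaves([
  { name: 'ReadFile', input: { path: '/src/a.ts' } },
  { name: 'WriteFile', input: { path: '/src/a.ts' } }, // conflicts with the read
  { name: 'ReadFile', input: { path: '/src/b.ts' } },
]);
// plan → two waves: [read a, read b] in parallel, then [write a]
```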
graph LR
subgraph "Turn: 3 tool calls"
T1["ReadFile(/src/a.ts)"]
T2["ReadFile(/src/b.ts)"]
T3["WriteFile(/src/c.ts)"]
end
T1 -->|"parallel ✅"| T2
T1 -->|"parallel ✅"| T3
T2 -->|"parallel ✅"| T3
style T1 fill:#4ade80
style T2 fill:#4ade80
style T3 fill:#60a5fa

Streaming execution must integrate with the permission system. A tool that needs user approval cannot start early:

enum ToolPermissionLevel {
  // Always allowed — can start immediately during streaming
  AlwaysAllow = 'always_allow',
  // Needs one-time approval — blocks until user responds
  RequireApproval = 'require_approval',
  // Never allowed — immediately rejected
  NeverAllow = 'never_allow',
}

// In the streaming executor:
async function handleToolReady(tool: ToolCallState) {
  const level = getPermissionLevel(tool.name, tool.input);
  switch (level) {
    case ToolPermissionLevel.AlwaysAllow:
      // 🚀 Execute immediately — this is the streaming advantage
      return executeNow(tool);
    case ToolPermissionLevel.RequireApproval:
      // ⏸️ Queue for permission — but other tools can still proceed
      return queueForApproval(tool);
    case ToolPermissionLevel.NeverAllow:
      // ❌ Reject immediately
      return rejectTool(tool);
  }
}
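A plausible shape for `getPermissionLevel` is a lookup against configured allow/deny lists, with everything else defaulting to approval. The `config` table and tool names below are hypothetical, not the actual permission rules:

```typescript
type PermissionLevel = 'always_allow' | 'require_approval' | 'never_allow';

// Hypothetical configuration: which tools skip the prompt entirely
const config = {
  alwaysAllow: new Set(['ReadFile', 'Glob']),
  neverAllow: new Set(['DeleteRepo']),
};

function getPermissionLevel(toolName: string): PermissionLevel {
  if (config.neverAllow.has(toolName)) return 'never_allow';
  if (config.alwaysAllow.has(toolName)) return 'always_allow';
  // Everything else (writes, shell commands, ...) waits for the user
  return 'require_approval';
}

getPermissionLevel('ReadFile'); // → 'always_allow': may start mid-stream
getPermissionLevel('Bash');     // → 'require_approval': queued while the stream continues
```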
// ============================================
// Reusable Streaming Tool Executor Template
// ============================================
type ToolStatus = 'buffering' | 'ready' | 'executing' | 'complete' | 'failed';

interface StreamingExecutorConfig {
  maxConcurrency: number;
  parseTimeout: number;
  canAutoExecute: (toolName: string) => boolean;
}

function createStreamingExecutor(config: StreamingExecutorConfig) {
  const tools = new Map<string, { status: ToolStatus; buffer: string; input?: unknown }>();
  const results = new Map<string, unknown>();
  const semaphore = new Semaphore(config.maxConcurrency);
  return {
    // Process each SSE event
    async onEvent(event: SSEEvent) {
      if (event.type === 'tool_use_start') {
        tools.set(event.id, { status: 'buffering', buffer: '' });
      }
      if (event.type === 'tool_use_delta') {
        const tool = tools.get(event.id)!;
        tool.buffer += event.json;
        // Try early execution
        if (tool.status === 'buffering') {
          try {
            tool.input = JSON.parse(tool.buffer);
            tool.status = 'ready';
            if (config.canAutoExecute(event.name)) {
              await semaphore.acquire();
              this.execute(event.id, event.name, tool);
            }
          } catch { /* not yet valid */ }
        }
      }
    },
    async execute(id: string, name: string, tool: { status: ToolStatus; input?: unknown }) {
      tool.status = 'executing';
      try {
        results.set(id, await runTool(name, tool.input));
        tool.status = 'complete';
      } catch (e) {
        results.set(id, { error: e });
        tool.status = 'failed';
      } finally {
        semaphore.release();
      }
    },
    getResults: () => results,
  };
}
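The template references a `Semaphore` that isn't defined above. A minimal promise-based sketch that fits the `acquire`/`release` calls used there:

```typescript
// FIFO counting semaphore: acquire() resolves immediately while permits
// remain; otherwise the caller waits until someone calls release().
class Semaphore {
  private waiters: Array<() => void> = [];

  constructor(private permits: number) {}

  async acquire(): Promise<void> {
    if (this.permits > 0) {
      this.permits--;
      return;
    }
    await new Promise<void>((resolve) => this.waiters.push(resolve));
  }

  release(): void {
    const next = this.waiters.shift();
    if (next) {
      next(); // hand the permit straight to the oldest waiter
    } else {
      this.permits++;
    }
  }
}
```

Handing the freed permit directly to the next waiter (instead of incrementing `permits` and letting waiters race) keeps the queue fair and the count consistent.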

Real-world latency improvement in a typical multi-tool turn:

Traditional (Sequential):
API Stream: ████████████████████ 3.2s
Parse JSON: █ 0.01s
Tool 1 (ReadFile): ████ 0.8s
Tool 2 (ReadFile): ████ 0.8s
Tool 3 (Bash): ██████████ 2.1s
Total: 6.91s
Streaming (Overlapped):
API Stream: ████████████████████ 3.2s
Tool 1: ████ 0.8s (started at 0.4s)
Tool 2: ████ 0.8s (started at 0.9s)
Tool 3: ██████████ 2.1s (started at 1.5s)
Total: 3.6s ← 48% faster
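The arithmetic behind those bars fits a small model. This sketch ignores the 10 ms parse step, so the sequential total comes out at 6.9 s rather than 6.91 s; the start offsets are taken from the chart above:

```typescript
// Hypothetical timing model: each tool's JSON becomes parseable `readyAt`
// seconds into the stream and takes `duration` seconds to run.
interface ToolTiming { readyAt: number; duration: number }

// Sequential: nothing starts until the stream ends; tools run back to back
function sequentialTotal(streamEnd: number, tools: ToolTiming[]): number {
  return streamEnd + tools.reduce((sum, t) => sum + t.duration, 0);
}

// Streaming: each tool starts as soon as its JSON completes; total time is
// the latest finish, bounded below by the stream itself
function streamingTotal(streamEnd: number, tools: ToolTiming[]): number {
  const lastFinish = Math.max(...tools.map((t) => t.readyAt + t.duration));
  return Math.max(streamEnd, lastFinish);
}

const tools = [
  { readyAt: 0.4, duration: 0.8 }, // Tool 1 (ReadFile)
  { readyAt: 0.9, duration: 0.8 }, // Tool 2 (ReadFile)
  { readyAt: 1.5, duration: 2.1 }, // Tool 3 (Bash)
];
sequentialTotal(3.2, tools); // → ≈6.9s
streamingTotal(3.2, tools);  // → ≈3.6s
```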

AI Agent Systems

Any system where an LLM produces tool calls via streaming API. The earlier you start, the faster the user sees results.

Build Systems

Streaming build configurations where independent compilation tasks can start before the full manifest is parsed.

API Orchestration

Microservice orchestration where independent API calls can fire as soon as their parameters are known.

Data Pipelines

Stream processing systems where downstream stages begin before upstream completes.

| Pitfall                  | Description                               | Mitigation                                  |
|--------------------------|-------------------------------------------|---------------------------------------------|
| Premature execution      | Tool executes with incomplete input       | Only execute when JSON is fully parseable   |
| Race conditions          | Two tools modify the same resource        | Dependency resolution + file-level locking  |
| Permission bypass        | Auto-execute a tool that needs approval   | Check permissions before any execution      |
| Error cascading          | One tool failure corrupts others          | Isolate each tool execution in try/catch    |
| Memory pressure          | Too many concurrent tool executions       | Semaphore/concurrency limit                 |
| Partial results on abort | Stream ends mid-tool                      | Track in-flight executions, await on abort  |
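The last row deserves a concrete shape: keep a live set of executions so an abort can drain them instead of orphaning half-finished tools. `track` and `abortTurn` are illustrative names, not actual internals:

```typescript
// Every started execution is registered here and removed once it settles
const inFlight = new Set<Promise<unknown>>();

function track<T>(p: Promise<T>): Promise<T> {
  inFlight.add(p);
  const remove = () => { inFlight.delete(p); };
  p.then(remove, remove); // remove on both paths; avoids an unhandled rejection
  return p;
}

async function abortTurn(): Promise<void> {
  // allSettled: a failing tool must not mask the others during shutdown
  await Promise.allSettled([...inFlight]);
}
```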