Traditional LLM tool execution follows a strict sequence: wait for the complete API response, parse the JSON tool calls, then execute them one by one. Streaming Tool Execution breaks this barrier — it begins executing tools while the API is still streaming, by parsing partial JSON incrementally.
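The core trick is cheap to demonstrate: after every streamed delta, attempt a full parse of the accumulated buffer, and start the tool the moment a parse succeeds. A minimal sketch (the delta chunks here are hypothetical):

```typescript
// Attempt a full parse after every streamed delta; the first buffer state
// that yields valid JSON lets tool execution begin before the stream ends.
function tryParse(buffer: string): unknown | null {
  try { return JSON.parse(buffer); } catch { return null; }
}

// Hypothetical input_json_delta chunks for one tool_use block
const deltas = ['{"path":', ' "/src/index', '.ts"}'];

let buffer = '';
let input: unknown = null;
for (const chunk of deltas) {
  buffer += chunk;
  input = tryParse(buffer);
  if (input !== null) break; // JSON complete — the tool can start now
}

console.log(input); // { path: '/src/index.ts' }
```

Note that `JSON.parse` only succeeds once the value is syntactically complete, so a truncated buffer like `{"path": "/src` can never trigger a premature start.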
```mermaid
sequenceDiagram
    participant API as Claude API
    participant Parser as Stream Parser
    participant Executor as Tool Executor
    participant UI as Terminal UI

    API->>Parser: {"type":"tool_use","name":"Read
    API->>Parser: File","input":{"path":"/src
    Parser-->>UI: Show: "Reading /src..."
    API->>Parser: /index.ts"}}
    Parser->>Executor: Execute ReadFile({path: "/src/index.ts"})
    Note over Executor: Tool starts BEFORE API stream ends
    API->>Parser: (more content blocks...)
    Executor-->>Parser: File content result
    API->>Parser: [stream end]
    Note over Parser: All tools already executed!
```

Claude Code’s StreamingToolExecutor manages a sophisticated concurrency model that handles the inherent tension between “start early” and “execute correctly.”
```mermaid
stateDiagram-v2
    [*] --> Idle

    Idle --> Buffering: content_block_start (tool_use)
    Buffering --> Buffering: content_block_delta (partial JSON)
    Buffering --> Ready: content_block_stop (JSON complete)
    Buffering --> Ready: JSON parseable early

    Ready --> Executing: Permission granted
    Ready --> Blocked: Permission required
    Blocked --> Executing: User approves
    Blocked --> Skipped: User denies

    Executing --> Complete: Tool returns result
    Executing --> Failed: Tool throws error

    Complete --> [*]
    Failed --> [*]
    Skipped --> [*]
```

```typescript
interface ExecutionStrategy {
  // Strategy 1: Sequential — wait for each tool to complete
  sequential: 'one-at-a-time';

  // Strategy 2: Parallel — execute all ready tools simultaneously
  parallel: 'all-at-once';

  // Strategy 3: Streaming — execute as soon as each tool's JSON is complete
  streaming: 'as-soon-as-ready'; // ← Claude Code uses this
}
```

| Strategy | Latency | Complexity | Safety |
|---|---|---|---|
| Sequential | Highest | Low | Safest |
| Parallel | Medium | Medium | Needs isolation |
| Streaming | Lowest | High | Needs careful state management |
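The latency gap between the strategies can be made concrete with a toy model (durations are illustrative, not measured): sequential execution runs every tool after the stream finishes, while streaming execution starts each tool at the stream offset where its JSON becomes parseable and lets them overlap.

```typescript
// Toy latency model. streamSeconds = total API streaming time; each tool has
// a duration and the stream offset at which its JSON becomes parseable.
interface ToolTiming { readyAt: number; duration: number }

function sequentialLatency(streamSeconds: number, tools: ToolTiming[]): number {
  // All tools run back-to-back after the stream finishes.
  return streamSeconds + tools.reduce((sum, t) => sum + t.duration, 0);
}

function streamingLatency(streamSeconds: number, tools: ToolTiming[]): number {
  // Each tool runs concurrently, starting as soon as its JSON is complete.
  const finishTimes = tools.map((t) => t.readyAt + t.duration);
  return Math.max(streamSeconds, ...finishTimes);
}

const tools: ToolTiming[] = [
  { readyAt: 0.4, duration: 0.8 }, // e.g. a ReadFile call
  { readyAt: 0.9, duration: 0.8 }, // e.g. a second ReadFile call
  { readyAt: 1.5, duration: 2.1 }, // e.g. a Bash call
];

console.log(sequentialLatency(3.2, tools)); // ≈ 6.9s
console.log(streamingLatency(3.2, tools));  // ≈ 3.6s
```

The model ignores parallel-execution conflicts and permission prompts, but it shows why streaming wins: the turn's latency collapses to whichever finishes last, the stream or the slowest tool.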
Here’s the distilled architecture of the streaming tool executor:
```typescript
class StreamingToolExecutor {
  private pendingTools = new Map<string, ToolCallState>();
  private completedTools = new Map<string, ToolResult>();
  private activeExecutions = new Set<Promise<void>>();

  // Called for every SSE event from the API stream
  async processStreamEvent(event: StreamEvent): Promise<ToolExecutionEvent[]> {
    const events: ToolExecutionEvent[] = [];

    switch (event.type) {
      case 'content_block_start': {
        if (event.content_block.type === 'tool_use') {
          // Register a new pending tool call
          this.pendingTools.set(event.content_block.id, {
            id: event.content_block.id,
            name: event.content_block.name,
            jsonBuffer: '',
            status: 'buffering',
          });
          events.push({ type: 'tool_detected', name: event.content_block.name });
        }
        break;
      }

      case 'content_block_delta': {
        if (event.delta.type === 'input_json_delta') {
          const tool = this.pendingTools.get(event.index_id);
          if (tool) {
            tool.jsonBuffer += event.delta.partial_json;

            // Attempt early parse — if the JSON is valid, start execution
            const parsed = this.tryParseJSON(tool.jsonBuffer);
            if (parsed !== null && tool.status === 'buffering') {
              tool.status = 'ready';
              tool.input = parsed;
              events.push(...await this.tryExecute(tool));
            }
          }
        }
        break;
      }

      case 'content_block_stop': {
        const tool = this.pendingTools.get(event.index_id);
        if (tool && tool.status === 'buffering') {
          // Final parse on block completion
          tool.input = JSON.parse(tool.jsonBuffer);
          tool.status = 'ready';
          events.push(...await this.tryExecute(tool));
        }
        break;
      }

      case 'message_stop': {
        // Wait for all in-flight executions
        await Promise.all(this.activeExecutions);
        events.push({ type: 'all_tools_complete' });
        break;
      }
    }

    return events;
  }

  private tryParseJSON(partial: string): unknown | null {
    try {
      return JSON.parse(partial);
    } catch {
      return null; // Not yet valid JSON, keep buffering
    }
  }

  private async tryExecute(tool: ToolCallState): Promise<ToolExecutionEvent[]> {
    const events: ToolExecutionEvent[] = [];

    // Check if this tool can execute without permission
    const permCheck = await checkPermission(tool.name, tool.input);

    if (permCheck === 'allowed') {
      const execution = this.executeToolAsync(tool);
      this.activeExecutions.add(execution);
      execution.finally(() => this.activeExecutions.delete(execution));
      events.push({ type: 'tool_executing', id: tool.id, name: tool.name });
    } else if (permCheck === 'needs_approval') {
      tool.status = 'blocked';
      events.push({ type: 'tool_needs_permission', id: tool.id, name: tool.name });
    } else {
      tool.status = 'denied';
      events.push({ type: 'tool_denied', id: tool.id, name: tool.name });
    }

    return events;
  }

  private async executeToolAsync(tool: ToolCallState): Promise<void> {
    try {
      tool.status = 'executing';
      const result = await executeTool(tool.name, tool.input!);
      tool.status = 'complete';
      this.completedTools.set(tool.id, result);
    } catch (error) {
      tool.status = 'failed';
      this.completedTools.set(tool.id, {
        type: 'error',
        error: String(error),
      });
    }
  }
}
```

The key challenge is knowing when a partial JSON string is “complete enough” to start execution. Claude Code uses a pragmatic approach:
```typescript
// Simplified incremental JSON parser strategy
class IncrementalJSONParser {
  private buffer = '';
  private depth = 0;
  private inString = false;
  private escaped = false;

  // Feed partial chunks and get notified when a complete value is found
  feed(chunk: string): { complete: boolean; value?: unknown } {
    for (const char of chunk) {
      this.buffer += char;

      if (this.escaped) {
        this.escaped = false;
        continue;
      }

      if (char === '\\' && this.inString) {
        this.escaped = true;
        continue;
      }

      if (char === '"') {
        this.inString = !this.inString;
        continue;
      }

      if (this.inString) continue;

      if (char === '{' || char === '[') this.depth++;
      if (char === '}' || char === ']') this.depth--;

      // When depth returns to 0, we have a complete JSON value
      if (this.depth === 0 && this.buffer.trim().length > 0) {
        try {
          const value = JSON.parse(this.buffer);
          return { complete: true, value };
        } catch {
          // Malformed JSON, continue buffering
        }
      }
    }

    return { complete: false };
  }
}
```

Not all tools can execute in parallel. Some tools have implicit dependencies:
```typescript
// Tool dependency resolution
interface ToolDependencyResolver {
  canExecuteInParallel(toolA: ToolCall, toolB: ToolCall): boolean;
}

class FileSystemDependencyResolver implements ToolDependencyResolver {
  canExecuteInParallel(a: ToolCall, b: ToolCall): boolean {
    // Read + Read: safe in parallel
    if (a.name === 'ReadFile' && b.name === 'ReadFile') return true;

    // Write + Write to same file: NOT safe
    if (a.name === 'WriteFile' && b.name === 'WriteFile') {
      return a.input.path !== b.input.path;
    }

    // Write + Read to same file: NOT safe
    if (
      (a.name === 'WriteFile' && b.name === 'ReadFile') ||
      (a.name === 'ReadFile' && b.name === 'WriteFile')
    ) {
      return a.input.path !== b.input.path;
    }

    // Default: allow parallel execution
    return true;
  }
}
```

```mermaid
graph LR
    subgraph "Turn: 3 tool calls"
        T1["ReadFile(/src/a.ts)"]
        T2["ReadFile(/src/b.ts)"]
        T3["WriteFile(/src/c.ts)"]
    end

    T1 -->|"parallel ✅"| T2
    T1 -->|"parallel ✅"| T3
    T2 -->|"parallel ✅"| T3

    style T1 fill:#4ade80
    style T2 fill:#4ade80
    style T3 fill:#60a5fa
```

Streaming execution must integrate with the permission system. A tool that needs user approval cannot start early:
```typescript
enum ToolPermissionLevel {
  // Always allowed — can start immediately during streaming
  AlwaysAllow = 'always_allow',

  // Needs one-time approval — blocks until user responds
  RequireApproval = 'require_approval',

  // Never allowed — immediately rejected
  NeverAllow = 'never_allow',
}

// In the streaming executor:
async function handleToolReady(tool: ToolCallState) {
  const level = getPermissionLevel(tool.name, tool.input);

  switch (level) {
    case ToolPermissionLevel.AlwaysAllow:
      // 🚀 Execute immediately — this is the streaming advantage
      return executeNow(tool);

    case ToolPermissionLevel.RequireApproval:
      // ⏸️ Queue for permission — but other tools can still proceed
      return queueForApproval(tool);

    case ToolPermissionLevel.NeverAllow:
      // ❌ Reject immediately
      return rejectTool(tool);
  }
}
```

```typescript
// ============================================
// Reusable Streaming Tool Executor Template
// ============================================

type ToolStatus = 'buffering' | 'ready' | 'executing' | 'complete' | 'failed';

interface StreamingExecutorConfig {
  maxConcurrency: number;
  parseTimeout: number;
  canAutoExecute: (toolName: string) => boolean;
}

function createStreamingExecutor(config: StreamingExecutorConfig) {
  const tools = new Map<string, { status: ToolStatus; buffer: string; input?: unknown }>();
  const results = new Map<string, unknown>();
  const semaphore = new Semaphore(config.maxConcurrency);

  return {
    // Process each SSE event
    async onEvent(event: SSEEvent) {
      if (event.type === 'tool_use_start') {
        tools.set(event.id, { status: 'buffering', buffer: '' });
      }

      if (event.type === 'tool_use_delta') {
        const tool = tools.get(event.id)!;
        tool.buffer += event.json;

        // Try early execution
        if (tool.status === 'buffering') {
          try {
            tool.input = JSON.parse(tool.buffer);
            tool.status = 'ready';
            if (config.canAutoExecute(event.name)) {
              await semaphore.acquire();
              this.execute(event.id, event.name, tool);
            }
          } catch { /* not yet valid */ }
        }
      }
    },

    async execute(id: string, name: string, tool: { status: ToolStatus; input?: unknown }) {
      tool.status = 'executing';
      try {
        results.set(id, await runTool(name, tool.input));
        tool.status = 'complete';
      } catch (e) {
        results.set(id, { error: e });
        tool.status = 'failed';
      } finally {
        semaphore.release();
      }
    },

    getResults: () => results,
  };
}
```

Real-world latency improvement in a typical multi-tool turn:
```
Traditional (Sequential):
  API Stream:         ████████████████████ 3.2s
  Parse JSON:         █ 0.01s
  Tool 1 (ReadFile):  ████ 0.8s
  Tool 2 (ReadFile):  ████ 0.8s
  Tool 3 (Bash):      ██████████ 2.1s
  Total: 6.91s

Streaming (Overlapped):
  API Stream:         ████████████████████ 3.2s
  Tool 1:             ████ 0.8s (started at 0.4s)
  Tool 2:             ████ 0.8s (started at 0.9s)
  Tool 3:             ██████████ 2.1s (started at 1.5s)
  Total: 3.6s ← 48% faster
```

AI Agent Systems
Any system where an LLM produces tool calls via streaming API. The earlier you start, the faster the user sees results.
Build Systems
Streaming build configurations where independent compilation tasks can start before the full manifest is parsed.
API Orchestration
Microservice orchestration where independent API calls can fire as soon as their parameters are known.
Data Pipelines
Stream processing systems where downstream stages begin before upstream completes.
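The common pattern across these use cases can be sketched generically: launch each independent unit of work the moment its parameters arrive, instead of waiting for the full spec to be assembled. The `fetchLike` helper below is a hypothetical stand-in for any API call, simulated with a timer so the sketch runs standalone.

```typescript
// "Start as soon as parameters are known": each task is launched immediately
// when its input arrives on the stream, and results are gathered at the end.
// fetchLike is a hypothetical stand-in for a real API call.
const fetchLike = (name: string, ms: number) =>
  new Promise<string>((resolve) => setTimeout(() => resolve(`${name}:done`), ms));

async function orchestrate(paramStream: AsyncIterable<{ name: string; ms: number }>) {
  const inflight: Promise<string>[] = [];
  for await (const p of paramStream) {
    inflight.push(fetchLike(p.name, p.ms)); // fire immediately — no await here
  }
  return Promise.all(inflight); // gather once everything has been launched
}

// Hypothetical parameter stream: each yield is a set of call parameters
// becoming available mid-stream.
async function* params() {
  yield { name: 'user', ms: 30 };
  yield { name: 'orders', ms: 10 };
}

orchestrate(params()).then((r) => console.log(r)); // [ 'user:done', 'orders:done' ]
```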
| Pitfall | Description | Mitigation |
|---|---|---|
| Premature execution | Tool executes with incomplete input | Only execute when JSON is fully parseable |
| Race conditions | Two tools modify the same resource | Dependency resolution + file-level locking |
| Permission bypass | Auto-execute a tool that needs approval | Check permissions before any execution |
| Error cascading | One tool failure corrupts others | Isolate each tool execution in try/catch |
| Memory pressure | Too many concurrent tool executions | Semaphore/concurrency limit |
| Partial results on abort | Stream ends mid-tool | Track in-flight executions, await on abort |
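The `Semaphore` referenced in the executor template and in the memory-pressure row above is left undefined; a minimal promise-based version (a generic sketch, not Claude Code's implementation) is enough to cap concurrent tool executions:

```typescript
// Minimal counting semaphore: acquire() resolves immediately while permits
// remain, otherwise parks the caller until another task calls release().
class Semaphore {
  private queue: (() => void)[] = [];
  constructor(private permits: number) {}

  async acquire(): Promise<void> {
    if (this.permits > 0) {
      this.permits--;
      return;
    }
    await new Promise<void>((resolve) => this.queue.push(resolve));
  }

  release(): void {
    const next = this.queue.shift();
    if (next) next(); // hand the permit directly to the oldest waiter
    else this.permits++;
  }
}
```

With `maxConcurrency = 2`, a third tool's `acquire()` simply waits until one of the first two calls `release()`, bounding memory and file-handle usage no matter how many tool calls a turn streams in.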