Pattern: Streaming Tool Execution

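Traditional LLM tool execution follows a strict sequence: wait for the complete API response, parse the tool calls out of the JSON, then run them one by one. Streaming Tool Execution removes that barrier: it parses the JSON incrementally and starts running a tool while the API is still streaming.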

sequenceDiagram
    participant API as Claude API
    participant Parser as Stream Parser
    participant Executor as Tool Executor
    participant UI as Terminal UI

    API->>Parser: {"type":"tool_use","name":"Read
    API->>Parser: File","input":{"path":"/src
    Parser-->>UI: Show: "Reading /src..."
    API->>Parser: /index.ts"}}
    Parser->>Executor: Execute ReadFile({path: "/src/index.ts"})
    Note over Executor: Tool starts BEFORE API stream ends
    API->>Parser: (more content blocks...)
    Executor-->>Parser: File content result
    API->>Parser: [stream end]
    Note over Parser: All tools already executed!

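Claude Code's StreamingToolExecutor manages a fairly involved concurrency model that balances the inherent tension between "starting as early as possible" and "executing correctly".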

stateDiagram-v2
    [*] --> Idle

    Idle --> Buffering: content_block_start (tool_use)
    Buffering --> Buffering: content_block_delta (partial JSON)
    Buffering --> Ready: content_block_stop (JSON complete)
    Buffering --> Ready: JSON parseable early

    Ready --> Executing: Permission granted
    Ready --> Blocked: Permission required
    Blocked --> Executing: User approves
    Blocked --> Skipped: User denies

    Executing --> Complete: Tool returns result
    Executing --> Failed: Tool throws error

    Complete --> [*]
    Failed --> [*]
    Skipped --> [*]
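
One way to keep an executor honest about the diagram above is to encode it as a transition table, so that an illegal move (for example, executing a tool that was never approved) fails loudly. The following is a minimal sketch: `ToolState`, `ToolEvent`, and `transition` are hypothetical names derived from the diagram labels, not identifiers taken from Claude Code.

```typescript
// Hypothetical state and event names derived from the state diagram.
type ToolState =
  | "idle" | "buffering" | "ready" | "blocked"
  | "executing" | "complete" | "failed" | "skipped";

type ToolEvent =
  | "block_start" | "block_delta" | "json_complete"
  | "permission_granted" | "permission_required"
  | "user_approves" | "user_denies"
  | "tool_returns" | "tool_throws";

// Each state lists only the events it accepts; anything else is illegal.
// "json_complete" covers both content_block_stop and an early successful parse.
const transitions: Record<ToolState, Partial<Record<ToolEvent, ToolState>>> = {
  idle: { block_start: "buffering" },
  buffering: { block_delta: "buffering", json_complete: "ready" },
  ready: { permission_granted: "executing", permission_required: "blocked" },
  blocked: { user_approves: "executing", user_denies: "skipped" },
  executing: { tool_returns: "complete", tool_throws: "failed" },
  complete: {},
  failed: {},
  skipped: {},
};

// Returns the next state, or throws when the diagram has no such edge.
function transition(state: ToolState, event: ToolEvent): ToolState {
  const next = transitions[state][event];
  if (next === undefined) {
    throw new Error(`Illegal transition: ${state} on ${event}`);
  }
  return next;
}
```

Calling `transition("ready", "permission_required")` yields `"blocked"`, while any event applied to a terminal state throws.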
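
interface ExecutionStrategy {
  // Strategy 1: sequential. Wait for each tool to finish before starting the next.
  sequential: 'one-at-a-time';
  // Strategy 2: parallel. Run all ready tools at the same time.
  parallel: 'all-at-once';
  // Strategy 3: streaming. Run each tool as soon as its JSON is complete.
  streaming: 'as-soon-as-ready'; // ← the strategy Claude Code uses
}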
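
| Strategy | Latency | Complexity | Safety |
|---|---|---|---|
| Sequential | Highest | | Safest |
| Parallel | Medium | Medium | Requires isolation |
| Streaming | Lowest | | Requires careful state management |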

Below is a distilled architecture of the streaming tool executor:

class StreamingToolExecutor {
  private pendingTools = new Map<string, ToolCallState>();
  private completedTools = new Map<string, ToolResult>();
  private activeExecutions = new Set<Promise<void>>();
  // Delta and stop events identify their block by index, not by tool id,
  // so remember which tool id each content block index belongs to.
  private toolIdByIndex = new Map<number, string>();

  // Called for every SSE event on the API stream
  async processStreamEvent(event: StreamEvent): Promise<ToolExecutionEvent[]> {
    const events: ToolExecutionEvent[] = [];
    switch (event.type) {
      case 'content_block_start': {
        if (event.content_block.type === 'tool_use') {
          // Register a new pending tool call
          this.pendingTools.set(event.content_block.id, {
            id: event.content_block.id,
            name: event.content_block.name,
            jsonBuffer: '',
            status: 'buffering',
          });
          this.toolIdByIndex.set(event.index, event.content_block.id);
          events.push({ type: 'tool_detected', name: event.content_block.name });
        }
        break;
      }
      case 'content_block_delta': {
        if (event.delta.type === 'input_json_delta') {
          const tool = this.toolAt(event.index);
          if (tool) {
            tool.jsonBuffer += event.delta.partial_json;
            // Try an early parse: if the buffer is already valid JSON, start executing now.
            // JSON.parse only succeeds once the top-level value is syntactically complete.
            const parsed = this.tryParseJSON(tool.jsonBuffer);
            if (parsed !== null && tool.status === 'buffering') {
              tool.status = 'ready';
              tool.input = parsed;
              events.push(...await this.tryExecute(tool));
            }
          }
        }
        break;
      }
      case 'content_block_stop': {
        const tool = this.toolAt(event.index);
        if (tool && tool.status === 'buffering') {
          // Final parse when the block ends. An empty buffer is treated as an
          // empty input object; anything else that fails to parse is an error.
          const parsed = this.tryParseJSON(tool.jsonBuffer || '{}');
          if (parsed === null) {
            tool.status = 'failed';
            this.completedTools.set(tool.id, { type: 'error', error: 'Invalid tool input JSON' });
          } else {
            tool.input = parsed;
            tool.status = 'ready';
            events.push(...await this.tryExecute(tool));
          }
        }
        break;
      }
      case 'message_stop': {
        // Wait for every execution that is still in flight
        await Promise.all(this.activeExecutions);
        events.push({ type: 'all_tools_complete' });
        break;
      }
    }
    return events;
  }

  // Look up the pending tool call that owns a content block index
  private toolAt(index: number): ToolCallState | undefined {
    const id = this.toolIdByIndex.get(index);
    return id === undefined ? undefined : this.pendingTools.get(id);
  }

  private tryParseJSON(partial: string): unknown | null {
    try {
      return JSON.parse(partial);
    } catch {
      return null; // Not valid JSON yet; keep buffering
    }
  }

  private async tryExecute(tool: ToolCallState): Promise<ToolExecutionEvent[]> {
    const events: ToolExecutionEvent[] = [];
    // Check whether this tool may run without asking for permission
    const permCheck = await checkPermission(tool.name, tool.input);
    if (permCheck === 'allowed') {
      const execution = this.executeToolAsync(tool);
      this.activeExecutions.add(execution);
      execution.finally(() => this.activeExecutions.delete(execution));
      events.push({ type: 'tool_executing', id: tool.id, name: tool.name });
    } else if (permCheck === 'needs_approval') {
      tool.status = 'blocked';
      events.push({ type: 'tool_needs_permission', id: tool.id, name: tool.name });
    } else {
      tool.status = 'denied';
      events.push({ type: 'tool_denied', id: tool.id, name: tool.name });
    }
    return events;
  }

  // Never rejects: failures are recorded as error results so one tool cannot break the others
  private async executeToolAsync(tool: ToolCallState): Promise<void> {
    try {
      tool.status = 'executing';
      const result = await executeTool(tool.name, tool.input!);
      tool.status = 'complete';
      this.completedTools.set(tool.id, result);
    } catch (error) {
      tool.status = 'failed';
      this.completedTools.set(tool.id, {
        type: 'error',
        error: String(error),
      });
    }
  }
}

The key challenge is deciding when a partial JSON string is "complete enough" to start execution. Claude Code takes a pragmatic approach:

// Simplified incremental JSON parsing strategy
class IncrementalJSONParser {
  private buffer = '';
  private depth = 0;
  private inString = false;
  private escaped = false;

  // Feed a partial chunk; reports when a complete top-level object or array has been seen
  feed(chunk: string): { complete: boolean; value?: unknown } {
    for (const char of chunk) {
      this.buffer += char;
      if (this.escaped) {
        // The previous character was a backslash inside a string; skip this one
        this.escaped = false;
        continue;
      }
      if (char === '\\' && this.inString) {
        this.escaped = true;
        continue;
      }
      if (char === '"') {
        this.inString = !this.inString;
        continue;
      }
      // Brackets inside string literals do not affect nesting depth
      if (this.inString) continue;
      if (char === '{' || char === '[') this.depth++;
      if (char === '}' || char === ']') {
        this.depth--;
        // Depth back at 0 after a closing bracket: the top-level value is complete.
        // Checking only here avoids reporting a half-streamed scalar (e.g. "1" of "12").
        if (this.depth === 0) {
          try {
            return { complete: true, value: JSON.parse(this.buffer) };
          } catch {
            // Malformed JSON; keep buffering
          }
        }
      }
    }
    return { complete: false };
  }
}
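
The sequence diagram at the top shows the UI displaying a partial path ("Reading /src...") before the JSON is complete. Depth tracking alone cannot provide that, because it only reports on whole values. A minimal sketch of a best-effort preview follows; `previewStringField` is a hypothetical helper, it returns the raw (still escaped) text seen so far, and it is meant for display only, never as tool input.

```typescript
// Hypothetical helper: best-effort preview of a string field in incomplete JSON.
// Returns the raw characters received so far for `field`, or null if the
// field's opening quote has not arrived yet. Escape sequences are not decoded.
function previewStringField(partialJson: string, field: string): string | null {
  // Escape regex metacharacters in the field name
  const escapedField = field.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
  // Match `"field": "` followed by any run of non-quote or backslash-escaped characters
  const pattern = new RegExp(`"${escapedField}"\\s*:\\s*"((?:[^"\\\\]|\\\\.)*)`);
  const match = pattern.exec(partialJson);
  return match?.[1] ?? null;
}

// Chunks as in the sequence diagram: the path arrives in two pieces
console.log(previewStringField('{"path":"/src', "path"));
console.log(previewStringField('{"path":"/src/index.ts"}', "path"));
```

Because the match is purely textual, a nested object that happens to contain the same key would also match; a production parser would need to track structure as well.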

Not every tool can run in parallel. Some tools have implicit dependencies on one another:

// Tool dependency resolution
interface ToolDependencyResolver {
  canExecuteInParallel(toolA: ToolCall, toolB: ToolCall): boolean;
}

class FileSystemDependencyResolver implements ToolDependencyResolver {
  canExecuteInParallel(a: ToolCall, b: ToolCall): boolean {
    // Read + read: always safe to parallelize
    if (a.name === 'ReadFile' && b.name === 'ReadFile') return true;
    // Write + write: unsafe only when both target the same file
    if (a.name === 'WriteFile' && b.name === 'WriteFile') {
      return a.input.path !== b.input.path;
    }
    // Write + read: unsafe only when both target the same file
    if (
      (a.name === 'WriteFile' && b.name === 'ReadFile') ||
      (a.name === 'ReadFile' && b.name === 'WriteFile')
    ) {
      return a.input.path !== b.input.path;
    }
    // Default: allow parallel execution
    return true;
  }
}

graph LR
    subgraph "Turn: 3 tool calls"
        T1["ReadFile(/src/a.ts)"]
        T2["ReadFile(/src/b.ts)"]
        T3["WriteFile(/src/c.ts)"]
    end

    T1 -->|"parallel ✅"| T2
    T1 -->|"parallel ✅"| T3
    T2 -->|"parallel ✅"| T3

    style T1 fill:#4ade80
    style T2 fill:#4ade80
    style T3 fill:#60a5fa
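
A pairwise check like the resolver above still needs a scheduler that turns it into an execution order. The sketch below greedily groups calls into batches: calls within a batch run concurrently, and batches run one after another, so two conflicting calls always keep their original order. `planBatches`, `canRunInParallel`, and the trimmed-down `ToolCall` shape are assumptions made for illustration.

```typescript
// Hypothetical minimal shape for a tool call; only `name` and `input.path` are used.
interface ToolCall {
  name: string;
  input: { path?: string };
}

type ParallelCheck = (a: ToolCall, b: ToolCall) => boolean;

const isFileTool = (t: ToolCall): boolean =>
  t.name === "ReadFile" || t.name === "WriteFile";

// Same rules as the file-system resolver: two reads never conflict, and any
// pair involving a write conflicts only when both touch the same path.
const canRunInParallel: ParallelCheck = (a, b) => {
  if (!isFileTool(a) || !isFileTool(b)) return true;
  if (a.name === "ReadFile" && b.name === "ReadFile") return true;
  return a.input.path !== b.input.path;
};

// Greedy batching: add each call to the current batch if it is compatible with
// every call already in it; otherwise close the batch and start a new one.
function planBatches(calls: ToolCall[], ok: ParallelCheck): ToolCall[][] {
  const batches: ToolCall[][] = [];
  let current: ToolCall[] = [];
  for (const call of calls) {
    if (current.every((other) => ok(call, other))) {
      current.push(call);
    } else {
      batches.push(current);
      current = [call];
    }
  }
  if (current.length > 0) batches.push(current);
  return batches;
}
```

For the three calls in the graph above, all paths differ, so `planBatches` returns a single batch of three. A `WriteFile` followed by a `ReadFile` of the same path would be split into two batches.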

Streaming execution has to integrate with the permission system. A tool that needs user approval must not start early:

enum ToolPermissionLevel {
  // Always allowed: can start immediately while streaming
  AlwaysAllow = 'always_allow',
  // Needs a one-time approval: blocks until the user responds
  RequireApproval = 'require_approval',
  // Never allowed: rejected immediately
  NeverAllow = 'never_allow',
}

// Inside the streaming executor:
async function handleToolReady(tool: ToolCallState) {
  const level = getPermissionLevel(tool.name, tool.input);
  switch (level) {
    case ToolPermissionLevel.AlwaysAllow:
      // 🚀 Execute right away: this is where streaming pays off
      return executeNow(tool);
    case ToolPermissionLevel.RequireApproval:
      // ⏸️ Queue for permission, while other tools keep going
      return queueForApproval(tool);
    case ToolPermissionLevel.NeverAllow:
      // ❌ Reject immediately
      return rejectTool(tool);
  }
}
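
The handler above leaves open how a queued tool resumes once the user answers. One possible shape is a map of pending promises keyed by tool id, sketched below. `ApprovalQueue`, `request`, and `decide` are hypothetical names; the source does not show how `queueForApproval` is implemented.

```typescript
type Decision = "approved" | "denied";

// Hypothetical approval queue: each blocked tool gets a promise that settles
// when the UI layer reports the user's decision for that tool id.
class ApprovalQueue {
  private pending = new Map<string, (decision: Decision) => void>();

  // Called when a tool needs approval. Other tools are unaffected while this waits.
  request(toolId: string): Promise<Decision> {
    return new Promise<Decision>((resolve) => {
      this.pending.set(toolId, resolve);
    });
  }

  // Called by the UI layer. Returns false when nothing is waiting on this id,
  // which guards against double clicks and stale prompts.
  decide(toolId: string, decision: Decision): boolean {
    const settle = this.pending.get(toolId);
    if (!settle) return false;
    this.pending.delete(toolId);
    settle(decision);
    return true;
  }

  get size(): number {
    return this.pending.size;
  }
}
```

An executor could `await queue.request(tool.id)` inside its own async task and then either run or skip the tool, which keeps the stream-processing loop free to handle further events.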

// ============================================
// Reusable streaming tool executor template
// ============================================
type ToolStatus = 'buffering' | 'ready' | 'executing' | 'complete' | 'failed';

interface StreamingExecutorConfig {
  maxConcurrency: number;
  parseTimeout: number;
  canAutoExecute: (toolName: string) => boolean;
}

function createStreamingExecutor(config: StreamingExecutorConfig) {
  const tools = new Map<string, { status: ToolStatus; buffer: string; input?: unknown }>();
  const results = new Map<string, unknown>();
  const semaphore = new Semaphore(config.maxConcurrency);
  return {
    // Handle each SSE event
    async onEvent(event: SSEEvent) {
      if (event.type === 'tool_use_start') {
        tools.set(event.id, { status: 'buffering', buffer: '' });
      }
      if (event.type === 'tool_use_delta') {
        const tool = tools.get(event.id);
        if (!tool) return; // Delta for an unknown tool: ignore
        tool.buffer += event.json;
        // Try to execute early
        if (tool.status === 'buffering') {
          try {
            tool.input = JSON.parse(tool.buffer);
            tool.status = 'ready';
          } catch {
            return; // Not valid JSON yet
          }
          if (config.canAutoExecute(event.name)) {
            // Fire and forget: execute() waits for a concurrency slot itself,
            // so a saturated semaphore never stalls stream processing
            void this.execute(event.id, event.name, tool);
          }
        }
      }
    },
    async execute(id: string, name: string, tool: { status: ToolStatus; input?: unknown }) {
      await semaphore.acquire();
      tool.status = 'executing';
      try {
        results.set(id, await runTool(name, tool.input));
        tool.status = 'complete';
      } catch (e) {
        // Record the failure as a result so other tools are unaffected
        results.set(id, { error: e });
        tool.status = 'failed';
      } finally {
        semaphore.release();
      }
    },
    getResults: () => results,
  };
}
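
The template relies on a `Semaphore` class that it never defines. A minimal counting semaphore with the same `acquire`/`release` surface could look like the sketch below. This is an assumption about the intended behaviour (FIFO waiters, no timeouts), not the implementation used by Claude Code.

```typescript
// Minimal counting semaphore; an assumed implementation of the `Semaphore`
// used by the template. Waiters are served in FIFO order.
class Semaphore {
  private waiters: Array<() => void> = [];
  private available: number;

  constructor(maxConcurrency: number) {
    if (!Number.isInteger(maxConcurrency) || maxConcurrency < 1) {
      throw new RangeError("maxConcurrency must be a positive integer");
    }
    this.available = maxConcurrency;
  }

  // Resolves immediately when a slot is free; otherwise waits for a release.
  acquire(): Promise<void> {
    if (this.available > 0) {
      this.available--;
      return Promise.resolve();
    }
    return new Promise<void>((resolve) => {
      this.waiters.push(resolve);
    });
  }

  // Hands the slot directly to the next waiter, or returns it to the pool.
  release(): void {
    const next = this.waiters.shift();
    if (next) {
      next();
    } else {
      this.available++;
    }
  }
}
```

Handing the slot straight to the next waiter, rather than incrementing the counter and letting waiters race for it, keeps the number of concurrent holders from ever exceeding the limit.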

A real-world latency comparison for a typical multi-tool turn:

Traditional (sequential):
API Stream: ████████████████████ 3.2s
Parse JSON: █ 0.01s
Tool 1 (ReadFile): ████ 0.8s
Tool 2 (ReadFile): ████ 0.8s
Tool 3 (Bash): ██████████ 2.1s
Total: 6.91s
Streaming (overlapped):
API Stream: ████████████████████ 3.2s
Tool 1: ████ 0.8s (starts at 0.4s)
Tool 2: ████ 0.8s (starts at 0.9s)
Tool 3: ██████████ 2.1s (starts at 1.5s)
Total: 3.6s ← 48% faster
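
The totals in this comparison follow from two simple formulas: the sequential total is the sum of every phase, while the streaming total is the later of the stream end and the last tool's finish time. The sketch below reproduces the arithmetic with the figures from the chart; the function names are hypothetical, and the model assumes tools do not contend with each other.

```typescript
interface ToolTiming {
  start: number;    // seconds after the stream began
  duration: number; // seconds
}

// Sequential: the stream, then parsing, then each tool in turn.
function sequentialTotal(streamSec: number, parseSec: number, durations: number[]): number {
  return streamSec + parseSec + durations.reduce((sum, d) => sum + d, 0);
}

// Streaming: the turn ends when both the stream and the last tool have finished.
function streamingTotal(streamSec: number, tools: ToolTiming[]): number {
  return Math.max(streamSec, ...tools.map((t) => t.start + t.duration));
}

const seq = sequentialTotal(3.2, 0.01, [0.8, 0.8, 2.1]);
const str = streamingTotal(3.2, [
  { start: 0.4, duration: 0.8 },
  { start: 0.9, duration: 0.8 },
  { start: 1.5, duration: 2.1 },
]);
const saved = (seq - str) / seq;

console.log(seq.toFixed(2), str.toFixed(2), `${(saved * 100).toFixed(0)}%`);
// prints "6.91 3.60 48%"
```

Here the third tool, which starts at 1.5s and runs for 2.1s, finishes after the 3.2s stream and so determines the streaming total.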

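AI Agent Systems

Any system where an LLM emits tool calls over a streaming API. The earlier execution starts, the sooner users see results.

Build Systems

Streamed build configuration, where independent compile tasks can start before the full manifest has been parsed.

API Orchestration

Microservice orchestration, where independent API calls can be issued as soon as their parameters are known.

Data Pipelines

Stream-processing systems, where downstream stages start work before upstream stages finish.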

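| Pitfall | Description | Mitigation |
|---|---|---|
| Premature execution | A tool runs with incomplete input | Execute only once the JSON parses completely |
| Race conditions | Two tools modify the same resource | Dependency resolution + file-level locking |
| Permission bypass | A tool that needs approval is executed automatically | Check permission before executing |
| Error cascades | One tool's failure affects other tools | Isolate each tool execution with try/catch |
| Memory pressure | Too many tools executing concurrently | Semaphore / concurrency limit |
| Partial results on abort | The stream ends while a tool is mid-execution | Track in-flight executions and await them on abort |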
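
For the race-condition row, "file-level locking" can be as small as a promise chain per path: operations on the same file queue up behind each other, while operations on different files run freely. The following is a minimal in-process sketch under that assumption; `KeyedLock` is a hypothetical name, and it does not protect against other processes touching the same files.

```typescript
// Hypothetical per-key lock: tasks for the same key run strictly one after
// another; tasks for different keys do not wait on each other.
class KeyedLock {
  // For each key, a promise that resolves when the most recently queued task is done
  private tails = new Map<string, Promise<void>>();

  async run<T>(key: string, task: () => Promise<T>): Promise<T> {
    const previous = this.tails.get(key) ?? Promise.resolve();
    let release!: () => void;
    const gate = new Promise<void>((resolve) => {
      release = resolve;
    });
    // Later callers wait on this tail, which settles only after `release()`
    const tail = previous.then(() => gate);
    this.tails.set(key, tail);

    await previous;
    try {
      return await task();
    } finally {
      release();
      // Drop the entry if no later task has queued behind this one
      if (this.tails.get(key) === tail) this.tails.delete(key);
    }
  }
}
```

A tool runner could wrap each file operation as `lock.run(path, () => writeFile(...))`, where `writeFile` stands in for whatever performs the actual I/O. A failing task still releases the lock, so one error does not block later operations on the same path.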