AI Agent 系统
任何通过 streaming API 让 LLM 产生 tool 调用的系统。越早开始,用户越快看到结果。
传统的 LLM tool 执行遵循严格的顺序:等待完整的 API 响应,解析 JSON 中的 tool 调用,然后逐一执行。Streaming Tool Execution 打破了这一壁垒 —— 它通过增量解析 JSON,在 API 仍在 streaming 时就开始执行 tool。
sequenceDiagram
participant API as Claude API
participant Parser as Stream Parser
participant Executor as Tool Executor
participant UI as Terminal UI
API->>Parser: {"type":"tool_use","name":"Read
API->>Parser: File","input":{"path":"/src
Parser-->>UI: Show: "Reading /src..."
API->>Parser: /index.ts"}}
Parser->>Executor: Execute ReadFile({path: "/src/index.ts"})
Note over Executor: Tool starts BEFORE API stream ends
API->>Parser: (more content blocks...)
Executor-->>Parser: File content result
API->>Parser: [stream end]
Note over Parser: All tools already executed!
Claude Code 的 StreamingToolExecutor 管理着一套复杂的并发模型,平衡”尽早开始”与”正确执行”之间的内在张力。
stateDiagram-v2
[*] --> Idle
Idle --> Buffering: content_block_start (tool_use)
Buffering --> Buffering: content_block_delta (partial JSON)
Buffering --> Ready: content_block_stop (JSON complete)
Buffering --> Ready: JSON parseable early
Ready --> Executing: Permission granted
Ready --> Blocked: Permission required
Blocked --> Executing: User approves
Blocked --> Skipped: User denies
Executing --> Complete: Tool returns result
Executing --> Failed: Tool throws error
Complete --> [*]
Failed --> [*]
Skipped --> [*]
interface ExecutionStrategy { // 策略 1:顺序 —— 等待每个 tool 完成 sequential: 'one-at-a-time';
// 策略 2:并行 —— 同时执行所有就绪的 tool parallel: 'all-at-once';
// 策略 3:Streaming —— 每个 tool 的 JSON 完整后立即执行 streaming: 'as-soon-as-ready'; // ← Claude Code 采用此策略}| 策略 | 延迟 | 复杂度 | 安全性 |
|---|---|---|---|
| 顺序 | 最高 | 低 | 最安全 |
| 并行 | 中等 | 中等 | 需要隔离 |
| Streaming | 最低 | 高 | 需要谨慎的状态管理 |
以下是 streaming tool 执行器的精炼架构:
class StreamingToolExecutor { private pendingTools = new Map<string, ToolCallState>(); private completedTools = new Map<string, ToolResult>(); private activeExecutions = new Set<Promise<void>>();
// 对 API stream 的每个 SSE 事件调用 async processStreamEvent(event: StreamEvent): Promise<ToolExecutionEvent[]> { const events: ToolExecutionEvent[] = [];
switch (event.type) { case 'content_block_start': { if (event.content_block.type === 'tool_use') { // 注册一个新的待处理 tool 调用 this.pendingTools.set(event.content_block.id, { id: event.content_block.id, name: event.content_block.name, jsonBuffer: '', status: 'buffering', }); events.push({ type: 'tool_detected', name: event.content_block.name }); } break; }
case 'content_block_delta': { if (event.delta.type === 'input_json_delta') { const tool = this.pendingTools.get(event.index_id); if (tool) { tool.jsonBuffer += event.delta.partial_json;
// 尝试提前解析 —— 如果 JSON 有效,立即开始执行 const parsed = this.tryParseJSON(tool.jsonBuffer); if (parsed !== null && tool.status === 'buffering') { tool.status = 'ready'; tool.input = parsed; events.push(...await this.tryExecute(tool)); } } } break; }
case 'content_block_stop': { const tool = this.pendingTools.get(event.index_id); if (tool && tool.status === 'buffering') { // block 完成时的最终解析 tool.input = JSON.parse(tool.jsonBuffer); tool.status = 'ready'; events.push(...await this.tryExecute(tool)); } break; }
case 'message_stop': { // 等待所有正在执行的任务 await Promise.all(this.activeExecutions); events.push({ type: 'all_tools_complete' }); break; } }
return events; }
private tryParseJSON(partial: string): unknown | null { try { return JSON.parse(partial); } catch { return null; // 尚未是有效 JSON,继续缓冲 } }
private async tryExecute(tool: ToolCallState): Promise<ToolExecutionEvent[]> { const events: ToolExecutionEvent[] = [];
// 检查此 tool 是否可以无需 permission 直接执行 const permCheck = await checkPermission(tool.name, tool.input);
if (permCheck === 'allowed') { const execution = this.executeToolAsync(tool); this.activeExecutions.add(execution); execution.finally(() => this.activeExecutions.delete(execution)); events.push({ type: 'tool_executing', id: tool.id, name: tool.name }); } else if (permCheck === 'needs_approval') { tool.status = 'blocked'; events.push({ type: 'tool_needs_permission', id: tool.id, name: tool.name }); } else { tool.status = 'denied'; events.push({ type: 'tool_denied', id: tool.id, name: tool.name }); }
return events; }
private async executeToolAsync(tool: ToolCallState): Promise<void> { try { tool.status = 'executing'; const result = await executeTool(tool.name, tool.input!); tool.status = 'complete'; this.completedTools.set(tool.id, result); } catch (error) { tool.status = 'failed'; this.completedTools.set(tool.id, { type: 'error', error: String(error), }); } }}关键挑战在于判断部分 JSON 字符串何时”足够完整”可以开始执行。Claude Code 采用了一种务实的方法:
// 简化的增量 JSON 解析器策略class IncrementalJSONParser { private buffer = ''; private depth = 0; private inString = false; private escaped = false;
// 输入部分 chunk,当发现完整值时通知 feed(chunk: string): { complete: boolean; value?: unknown } { for (const char of chunk) { this.buffer += char;
if (this.escaped) { this.escaped = false; continue; }
if (char === '\\' && this.inString) { this.escaped = true; continue; }
if (char === '"') { this.inString = !this.inString; continue; }
if (this.inString) continue;
if (char === '{' || char === '[') this.depth++; if (char === '}' || char === ']') this.depth--;
// 当深度回到 0 时,我们得到了一个完整的 JSON 值 if (this.depth === 0 && this.buffer.trim().length > 0) { try { const value = JSON.parse(this.buffer); return { complete: true, value }; } catch { // 格式不正确的 JSON,继续缓冲 } } }
return { complete: false }; }}并非所有 tool 都能并行执行。某些 tool 之间存在隐式依赖:
// Tool 依赖关系解析interface ToolDependencyResolver { canExecuteInParallel(toolA: ToolCall, toolB: ToolCall): boolean;}
class FileSystemDependencyResolver implements ToolDependencyResolver { canExecuteInParallel(a: ToolCall, b: ToolCall): boolean { // 读 + 读:可以并行 if (a.name === 'ReadFile' && b.name === 'ReadFile') return true;
// 写 + 写同一文件:不安全 if (a.name === 'WriteFile' && b.name === 'WriteFile') { return a.input.path !== b.input.path; }
// 写 + 读同一文件:不安全 if ( (a.name === 'WriteFile' && b.name === 'ReadFile') || (a.name === 'ReadFile' && b.name === 'WriteFile') ) { return a.input.path !== b.input.path; }
// 默认:允许并行执行 return true; }}graph LR
subgraph "Turn: 3 tool calls"
T1["ReadFile(/src/a.ts)"]
T2["ReadFile(/src/b.ts)"]
T3["WriteFile(/src/c.ts)"]
end
T1 -->|"parallel ✅"| T2
T1 -->|"parallel ✅"| T3
T2 -->|"parallel ✅"| T3
style T1 fill:#4ade80
style T2 fill:#4ade80
style T3 fill:#60a5fa
Streaming 执行必须与 permission 系统集成。需要用户审批的 tool 不能提前开始:
enum ToolPermissionLevel { // 始终允许 —— 可在 streaming 时立即开始 AlwaysAllow = 'always_allow',
// 需要一次性审批 —— 阻塞直到用户响应 RequireApproval = 'require_approval',
// 永远不允许 —— 立即拒绝 NeverAllow = 'never_allow',}
// 在 streaming 执行器中:async function handleToolReady(tool: ToolCallState) { const level = getPermissionLevel(tool.name, tool.input);
switch (level) { case ToolPermissionLevel.AlwaysAllow: // 🚀 立即执行 —— 这就是 streaming 的优势 return executeNow(tool);
case ToolPermissionLevel.RequireApproval: // ⏸️ 排队等待 permission —— 但其他 tool 仍可继续 return queueForApproval(tool);
case ToolPermissionLevel.NeverAllow: // ❌ 立即拒绝 return rejectTool(tool); }}// ============================================// 可复用 Streaming Tool 执行器模板// ============================================
type ToolStatus = 'buffering' | 'ready' | 'executing' | 'complete' | 'failed';
interface StreamingExecutorConfig { maxConcurrency: number; parseTimeout: number; canAutoExecute: (toolName: string) => boolean;}
function createStreamingExecutor(config: StreamingExecutorConfig) { const tools = new Map<string, { status: ToolStatus; buffer: string; input?: unknown }>(); const results = new Map<string, unknown>(); const semaphore = new Semaphore(config.maxConcurrency);
return { // 处理每个 SSE 事件 async onEvent(event: SSEEvent) { if (event.type === 'tool_use_start') { tools.set(event.id, { status: 'buffering', buffer: '' }); }
if (event.type === 'tool_use_delta') { const tool = tools.get(event.id)!; tool.buffer += event.json;
// 尝试提前执行 if (tool.status === 'buffering') { try { tool.input = JSON.parse(tool.buffer); tool.status = 'ready'; if (config.canAutoExecute(event.name)) { await semaphore.acquire(); this.execute(event.id, event.name, tool); } } catch { /* 尚未有效 */ } } } },
async execute(id: string, name: string, tool: { status: ToolStatus; input?: unknown }) { tool.status = 'executing'; try { results.set(id, await runTool(name, tool.input)); tool.status = 'complete'; } catch (e) { results.set(id, { error: e }); tool.status = 'failed'; } finally { semaphore.release(); } },
getResults: () => results, };}典型多 tool 轮次的真实世界延迟对比:
传统方式(顺序): API Stream: ████████████████████ 3.2s 解析 JSON: █ 0.01s Tool 1 (ReadFile): ████ 0.8s Tool 2 (ReadFile): ████ 0.8s Tool 3 (Bash): ██████████ 2.1s 总计: 6.91s
Streaming 方式(重叠): API Stream: ████████████████████ 3.2s Tool 1: ████ 0.8s(0.4s 时启动) Tool 2: ████ 0.8s(0.9s 时启动) Tool 3: ██████████ 2.1s(1.5s 时启动) 总计: 3.6s ← 快 48%AI Agent 系统
任何通过 streaming API 让 LLM 产生 tool 调用的系统。越早开始,用户越快看到结果。
构建系统
流式构建配置,独立的编译任务可在完整清单解析前就开始。
API 编排
微服务编排,独立的 API 调用可在参数已知后立即发出。
数据流水线
流式处理系统,下游阶段在上游完成前就开始处理。
| 陷阱 | 描述 | 规避方案 |
|---|---|---|
| 过早执行 | tool 以不完整的输入执行 | 仅在 JSON 完全可解析时才执行 |
| 竞态条件 | 两个 tool 修改同一资源 | 依赖关系解析 + 文件级锁定 |
| 绕过 permission | 自动执行需要审批的 tool | 执行前检查 permission |
| 错误级联 | 一个 tool 失败导致其他 tool 受影响 | 用 try/catch 隔离每个 tool 执行 |
| 内存压力 | 过多并发 tool 执行 | Semaphore/并发数限制 |
| 中止时的部分结果 | stream 在 tool 执行中途结束 | 跟踪正在执行的任务,中止时 await |