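Streaming Executor
StreamingToolExecutor is Claude Code's primary tool execution engine. It lives in src/services/tools/StreamingToolExecutor.ts and overlaps tool execution with API streaming: tools start running before the model has finished its response. This chapter walks through its internals.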
Why Streaming Execution?
In a typical agentic turn, the model may emit several tool calls. Without streaming execution:

```
API streams tool_use A ──────────────┐
API streams tool_use B ──────────────┤
API streams text ────────────────────┤
API done ────────────────────────────┘
Execute tool A ────────────── 2s
Execute tool B ────────────── 1.5s
                              Total: API time + 3.5s
```

With streaming execution:

```
API streams tool_use A ──────┐
  Execute tool A starts ─────┤────── 2s
API streams tool_use B ──────┤
  Execute tool B starts ─────┤────── 1.5s
API streams text ────────────┤
API done ────────────────────┘
  Tool A completes ──────────┘
  Tool B completes ───────────┘
  Total: API time + max(0, tools - overlap)
```

The savings are significant: 40-80% of tool execution time can overlap with streaming.
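The two totals in the diagrams can be worked through numerically. The sketch below is illustrative only: the helper names and the 4 s API time and 2.5 s overlap are assumptions, not measurements from the source. Only the 3.5 s of tool work comes from the diagram.

```typescript
// Hypothetical helpers; all durations are in milliseconds.

/** Tools run only after the API response has finished. */
function sequentialLatency(apiMs: number, toolsMs: number): number {
  return apiMs + toolsMs;
}

/** Tools start during streaming; only the non-overlapped remainder adds latency. */
function streamingLatency(apiMs: number, toolsMs: number, overlapMs: number): number {
  return apiMs + Math.max(0, toolsMs - overlapMs);
}

// Assumed figures: a 4 s API stream, 3.5 s of tool work, 2.5 s of overlap.
const withoutStreaming = sequentialLatency(4000, 3500);
const withStreaming = streamingLatency(4000, 3500, 2500);
```

With these assumed numbers the sequential total is 7500 ms and the streaming total is 5000 ms. Once the overlap exceeds the tool time, the `Math.max(0, ...)` clamp means the tools add no latency at all.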
```typescript
export class StreamingToolExecutor {
  private tools: TrackedTool[] = [];
  private toolUseContext: ToolUseContext;
  private hasErrored = false;
  private erroredToolDescription = '';
  private siblingAbortController: AbortController;
  private discarded = false;
  private progressAvailableResolve?: () => void;

  constructor(
    private readonly toolDefinitions: Tools,
    private readonly canUseTool: CanUseToolFn,
    toolUseContext: ToolUseContext,
  ) {
    this.toolUseContext = toolUseContext;
    this.siblingAbortController = createChildAbortController(
      toolUseContext.abortController,
    );
  }
}
```

Key fields:
- tools: ordered list of all tracked tools
- siblingAbortController: child controller used to cascade Bash errors
- hasErrored: flag that cancels the remaining tools after a Bash error
- discarded: flag for streaming fallback (the entire response is discarded)
- progressAvailableResolve: Promise resolver used for progress notification
The TrackedTool Type
Each tool call is wrapped in a TrackedTool:

```typescript
type ToolStatus = 'queued' | 'executing' | 'completed' | 'yielded';

type TrackedTool = {
  id: string;                      // tool_use block ID
  block: ToolUseBlock;             // original API block
  assistantMessage: AssistantMessage;
  status: ToolStatus;
  isConcurrencySafe: boolean;
  promise?: Promise<void>;         // execution promise
  results?: Message[];             // completed results
  pendingProgress: Message[];      // progress messages (yielded immediately)
  contextModifiers?: Array<(context: ToolUseContext) => ToolUseContext>;
};
```

A tracked tool moves through these states:

```mermaid
stateDiagram-v2
    [*] --> queued: addTool()
    queued --> executing: processQueue() canExecute=true
    queued --> completed: abort before execution
    executing --> completed: tool finishes or abort during execution
    completed --> yielded: getCompletedResults() yields
```
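The state diagram can also be read as a transition table. The following is a minimal sketch of such a table with a guard function. Both `allowedTransitions` and `transition` are hypothetical names introduced for illustration; the executor described here assigns `status` directly rather than going through a guard like this.

```typescript
type ToolStatus = 'queued' | 'executing' | 'completed' | 'yielded';

// Each status maps to the statuses it may move to, mirroring the diagram's edges.
const allowedTransitions: Record<ToolStatus, readonly ToolStatus[]> = {
  queued: ['executing', 'completed'], // 'completed' covers an abort before execution
  executing: ['completed'],           // tool finished, or was aborted mid-run
  completed: ['yielded'],             // results handed to the caller
  yielded: [],                        // terminal state
};

/** Returns the new status, or throws if the diagram has no such edge. */
function transition(from: ToolStatus, to: ToolStatus): ToolStatus {
  if (allowedTransitions[from].indexOf(to) === -1) {
    throw new Error(`Illegal transition: ${from} -> ${to}`);
  }
  return to;
}
```

A guard of this kind is mainly useful in tests or debug builds, where an unexpected jump such as `yielded -> executing` should fail loudly.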
Transition: queued → executing

```typescript
private canExecuteTool(isConcurrencySafe: boolean): boolean {
  const executingTools = this.tools.filter(t => t.status === 'executing');
  return (
    executingTools.length === 0 ||
    (isConcurrencySafe && executingTools.every(t => t.isConcurrencySafe))
  );
}
```

A tool moves from queued to executing when either of the following holds:
- No tool is currently executing, in which case any tool may start
- Only concurrency-safe tools are executing, and the new tool is also concurrency-safe
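The predicate is easy to check against its cases in isolation. Below is a standalone sketch of the same rule as a free function. The `canExecute` name and the `Running` type are assumptions made for this example, not part of the executor's API.

```typescript
type Running = { isConcurrencySafe: boolean };

/** Standalone version of the admission rule: the candidate may start if nothing
 *  is running, or if it and everything currently running are concurrency-safe. */
function canExecute(candidateIsSafe: boolean, executing: Running[]): boolean {
  return (
    executing.length === 0 ||
    (candidateIsSafe && executing.every(t => t.isConcurrencySafe))
  );
}

const safe: Running = { isConcurrencySafe: true };
const unsafe: Running = { isConcurrencySafe: false };

const cases = {
  unsafeOnIdle: canExecute(false, []),         // true: nothing is running
  safeJoinsSafe: canExecute(true, [safe]),     // true: all parties are safe
  unsafeJoinsSafe: canExecute(false, [safe]),  // false: needs exclusive access
  safeJoinsUnsafe: canExecute(true, [unsafe]), // false: the running tool is exclusive
};
```

The rule is symmetric in effect: a non-concurrent tool blocks everything behind it, and is itself blocked by anything already running.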
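Transition: executing → completed
A tool completes when its runToolUse generator is exhausted (all results have been collected) or when it is aborted (sibling error, user interrupt, or streaming fallback).
Transition: completed → yielded
Results are yielded in order through getCompletedResults(). For non-concurrent tools, a completed tool's results are yielded only after all preceding tools have been yielded. For concurrent tools, results are yielded immediately, while still following the overall queue order.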
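The ordering rule is subtle enough to deserve a worked example. This is a simplified sketch of the selection logic only: `yieldableNow` and the `Tracked` type are hypothetical, and it ignores progress messages and result payloads.

```typescript
type Status = 'queued' | 'executing' | 'completed' | 'yielded';
type Tracked = { id: string; status: Status; isConcurrencySafe: boolean };

/** Returns the IDs whose results could be yielded right now, in queue order. */
function yieldableNow(tools: Tracked[]): string[] {
  const ids: string[] = [];
  for (const tool of tools) {
    if (tool.status === 'yielded') continue;
    if (tool.status === 'completed') {
      ids.push(tool.id);
    } else if (tool.status === 'executing' && !tool.isConcurrencySafe) {
      break; // nothing may be yielded past a running non-concurrent tool
    }
  }
  return ids;
}

// A concurrency-safe tool that is still running does not hold back a later one.
const concurrentCase = yieldableNow([
  { id: 'A', status: 'executing', isConcurrencySafe: true },
  { id: 'B', status: 'completed', isConcurrencySafe: true },
]);

// A running non-concurrent tool holds back everything behind it.
const exclusiveCase = yieldableNow([
  { id: 'W', status: 'executing', isConcurrencySafe: false },
  { id: 'X', status: 'completed', isConcurrencySafe: true },
]);
```

In the first case only B is yieldable; in the second nothing is, even though X has already completed (as happens, for example, with an unknown tool name that completes immediately with an error).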
Adding Tools During Streaming

```typescript
addTool(block: ToolUseBlock, assistantMessage: AssistantMessage): void {
  const toolDefinition = findToolByName(this.toolDefinitions, block.name);

  if (!toolDefinition) {
    // Unknown tool → immediate "completed" with error
    this.tools.push({
      id: block.id,
      block,
      assistantMessage,
      status: 'completed',
      isConcurrencySafe: true,
      pendingProgress: [],
      results: [createUserMessage({
        content: [{
          type: 'tool_result',
          content: `<tool_use_error>Error: No such tool: ${block.name}</tool_use_error>`,
          is_error: true,
          tool_use_id: block.id,
        }],
      })],
    });
    return;
  }

  // Determine concurrency safety from parsed input
  const parsedInput = toolDefinition.inputSchema.safeParse(block.input);
  const isConcurrencySafe = parsedInput?.success
    ? (() => {
        try {
          return Boolean(toolDefinition.isConcurrencySafe(parsedInput.data));
        } catch {
          return false;
        }
      })()
    : false;

  this.tools.push({
    id: block.id,
    block,
    assistantMessage,
    status: 'queued',
    isConcurrencySafe,
    pendingProgress: [],
  });

  void this.processQueue();
}

private async processQueue(): Promise<void> {
  for (const tool of this.tools) {
    if (tool.status !== 'queued') continue;

    if (this.canExecuteTool(tool.isConcurrencySafe)) {
      await this.executeTool(tool);
    } else {
      // Non-concurrent tool must wait for exclusive access
      if (!tool.isConcurrencySafe) break;
    }
  }
}
```

The break for non-concurrent tools is critical because it preserves execution order. If tools arrive in the order [Read, Read, Write, Read], the executor will:
- Run the two Reads concurrently
- Run the Write alone (after the Reads finish)
- Run the final Read alone (after the Write finishes)
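The grouping in the list above can be reproduced with a small planner. This is a simplified sketch: the real executor is event-driven and starts tools as they arrive, whereas the hypothetical `planBatches` function below computes the resulting groups up front, assuming every tool in a group starts before any of them finishes.

```typescript
type Call = { name: string; isConcurrencySafe: boolean };

/** Groups calls into batches that may run together, preserving arrival order. */
function planBatches(calls: Call[]): string[][] {
  const batches: string[][] = [];
  let currentBatchIsSafe = false;
  for (const call of calls) {
    const last = batches[batches.length - 1];
    if (last && call.isConcurrencySafe && currentBatchIsSafe) {
      last.push(call.name); // joins the running concurrency-safe batch
    } else {
      batches.push([call.name]); // starts a new batch once the previous one is done
      currentBatchIsSafe = call.isConcurrencySafe;
    }
  }
  return batches;
}

const plan = planBatches([
  { name: 'Read', isConcurrencySafe: true },
  { name: 'Read', isConcurrencySafe: true },
  { name: 'Write', isConcurrencySafe: false },
  { name: 'Read', isConcurrencySafe: true },
]);
```

For this input `plan` is `[['Read', 'Read'], ['Write'], ['Read']]`, matching the three steps listed above. Without the break, the final Read could start alongside the first two and observe the file before the Write has run.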
Tool Execution

```typescript
private async executeTool(tool: TrackedTool): Promise<void> {
  tool.status = 'executing';
  this.toolUseContext.setInProgressToolUseIDs(prev => new Set(prev).add(tool.id));
  this.updateInterruptibleState();

  // Abridged excerpt: the declarations of messages, thisToolErrored, and
  // isErrorResult are omitted here.
  const collectResults = async () => {
    // Check for pre-existing abort
    const initialAbortReason = this.getAbortReason(tool);
    if (initialAbortReason) {
      messages.push(this.createSyntheticErrorMessage(tool.id, initialAbortReason, ...));
      tool.status = 'completed';
      return;
    }

    // Create per-tool abort controller
    const toolAbortController = createChildAbortController(this.siblingAbortController);

    // Bubble up non-sibling aborts to the query controller
    toolAbortController.signal.addEventListener('abort', () => {
      if (toolAbortController.signal.reason !== 'sibling_error' &&
          !this.toolUseContext.abortController.signal.aborted &&
          !this.discarded) {
        this.toolUseContext.abortController.abort(toolAbortController.signal.reason);
      }
    }, { once: true });

    // Execute via runToolUse generator
    const generator = runToolUse(
      tool.block,
      tool.assistantMessage,
      this.canUseTool,
      { ...this.toolUseContext, abortController: toolAbortController },
    );

    for await (const update of generator) {
      // Check for mid-execution abort
      const abortReason = this.getAbortReason(tool);
      if (abortReason && !thisToolErrored) {
        messages.push(this.createSyntheticErrorMessage(...));
        break;
      }

      // Handle Bash errors → cascade to siblings
      if (isErrorResult && tool.block.name === BASH_TOOL_NAME) {
        this.hasErrored = true;
        this.siblingAbortController.abort('sibling_error');
      }

      // Route progress messages for immediate yielding
      if (update.message.type === 'progress') {
        tool.pendingProgress.push(update.message);
        this.progressAvailableResolve?.();
      } else {
        messages.push(update.message);
      }
    }

    tool.results = messages;
    tool.status = 'completed';
  };

  const promise = collectResults();
  tool.promise = promise;

  // Re-trigger queue when this tool completes
  void promise.finally(() => void this.processQueue());
}
```

AbortController Hierarchy
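```mermaid
graph TD
    A[toolUseContext.abortController] --> B[siblingAbortController]
    B --> C[toolAbortController - Tool 1]
    B --> D[toolAbortController - Tool 2]
    B --> E[toolAbortController - Tool N]
    style A fill:#f9f,stroke:#333
    style B fill:#ff9,stroke:#333
```

- Parent (toolUseContext): aborting this controller ends the entire turn
- Sibling level: aborting this controller kills all tool subprocesses but does not end the turn
- Per-tool level: aborting this controller kills a single tool's subprocess

The bubbling logic ensures that a permission rejection, which aborts the per-tool controller, propagates to the parent and therefore ends the turn. Otherwise the query loop would keep running with partial results.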
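The downward propagation in this hierarchy can be sketched with the standard AbortController API. The implementation of createChildAbortController is not shown in this chapter, so `createChildController` below is a hypothetical stand-in that assumes the simplest behavior: a child aborts whenever its parent does, with the same reason, and never the other way around.

```typescript
/** Hypothetical stand-in: returns a controller that aborts when `parent` aborts. */
function createChildController(parent: AbortController): AbortController {
  const child = new AbortController();
  if (parent.signal.aborted) {
    // Parent is already aborted: the child starts out aborted too.
    child.abort(parent.signal.reason);
  } else {
    parent.signal.addEventListener(
      'abort',
      () => child.abort(parent.signal.reason),
      { once: true },
    );
  }
  return child;
}

// Build the three levels from the diagram.
const turn = new AbortController();
const siblings = createChildController(turn);
const toolA = createChildController(siblings);
const toolB = createChildController(siblings);

// A sibling-level abort reaches both tools but leaves the turn running.
siblings.abort('sibling_error');
```

After the last line, `toolA.signal.aborted` and `toolB.signal.aborted` are both true while `turn.signal.aborted` stays false. Upward propagation is deliberately not part of this helper; in the executor it is the explicit abort listener on each per-tool controller that bubbles non-sibling aborts to the parent.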
During Streaming: getCompletedResults()
A synchronous generator that yields completed results without blocking:

```typescript
*getCompletedResults(): Generator<MessageUpdate, void> {
  if (this.discarded) return;

  for (const tool of this.tools) {
    // Always yield progress immediately
    while (tool.pendingProgress.length > 0) {
      yield { message: tool.pendingProgress.shift()!, newContext: this.toolUseContext };
    }

    if (tool.status === 'yielded') continue;

    if (tool.status === 'completed' && tool.results) {
      tool.status = 'yielded';
      for (const message of tool.results) {
        yield { message, newContext: this.toolUseContext };
      }
      markToolUseAsComplete(this.toolUseContext, tool.id);
    } else if (tool.status === 'executing' && !tool.isConcurrencySafe) {
      break; // Can't yield past an executing non-concurrent tool
    }
  }
}
```

After Streaming: getRemainingResults()
An async generator that waits for all tools to finish:

```typescript
async *getRemainingResults(): AsyncGenerator<MessageUpdate, void> {
  if (this.discarded) return;

  while (this.hasUnfinishedTools()) {
    await this.processQueue();
    for (const result of this.getCompletedResults()) yield result;

    // Wait for any tool to complete OR progress to arrive
    if (this.hasExecutingTools() &&
        !this.hasCompletedResults() &&
        !this.hasPendingProgress()) {
      const executingPromises = this.tools
        .filter(t => t.status === 'executing' && t.promise)
        .map(t => t.promise!);

      const progressPromise = new Promise<void>(resolve => {
        this.progressAvailableResolve = resolve;
      });

      if (executingPromises.length > 0) {
        await Promise.race([...executingPromises, progressPromise]);
      }
    }
  }

  // Final flush
  for (const result of this.getCompletedResults()) yield result;
}
```

The Promise.race pattern wakes the loop when either a tool completes or progress arrives, so progress is still displayed in real time while waiting on a slow tool.
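The wake-up pattern can be isolated from the executor. The sketch below is a minimal illustration under stated assumptions: `deferred`, `waitForWake`, and `demo` are hypothetical helpers, and the wake reason is returned as a label only so the example can be observed; the real loop simply re-checks its state after waking.

```typescript
type Wake = 'tool' | 'progress';

/** A promise whose resolve function is exposed to the caller. */
function deferred<T>(): { promise: Promise<T>; resolve: (value: T) => void } {
  let resolve!: (value: T) => void;
  const promise = new Promise<T>(r => { resolve = r; });
  return { promise, resolve };
}

/** Sleeps until any tool promise settles or the registered progress callback fires. */
function waitForWake(
  toolPromises: Promise<void>[],
  registerProgress: (notify: () => void) => void,
): Promise<Wake> {
  const progress = new Promise<Wake>(resolve => {
    registerProgress(() => resolve('progress'));
  });
  const tools = toolPromises.map(p => p.then((): Wake => 'tool'));
  return Promise.race([...tools, progress]);
}

async function demo(): Promise<Wake[]> {
  const wakes: Wake[] = [];
  const slowTool = deferred<void>();
  let notifyProgress: () => void = () => {};

  // First wait: progress arrives while the tool is still running.
  const first = waitForWake([slowTool.promise], n => { notifyProgress = n; });
  notifyProgress();
  wakes.push(await first);

  // Second wait: a fresh progress promise is registered, then the tool finishes.
  const second = waitForWake([slowTool.promise], n => { notifyProgress = n; });
  slowTool.resolve();
  wakes.push(await second);
  return wakes;
}
```

Note that a new progress promise is created on every iteration, as in getRemainingResults(): a promise resolves only once, so reusing the previous one would make every later wait return immediately.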
Synthetic Error Messages
When a tool is aborted, it receives a synthetic error result so that the API invariants are maintained:

```typescript
private createSyntheticErrorMessage(
  toolUseId: string,
  reason: 'sibling_error' | 'user_interrupted' | 'streaming_fallback',
  assistantMessage: AssistantMessage,
): Message {
  if (reason === 'user_interrupted') {
    return createUserMessage({
      content: [{
        type: 'tool_result',
        content: withMemoryCorrectionHint(REJECT_MESSAGE),
        is_error: true,
        tool_use_id: toolUseId,
      }],
      toolUseResult: 'User rejected tool use',
    });
  }
  // ... similar for sibling_error and streaming_fallback
}
```

Tools declare how they handle user interruption:

```typescript
private getToolInterruptBehavior(tool: TrackedTool): 'cancel' | 'block' {
  const definition = findToolByName(this.toolDefinitions, tool.block.name);
  if (!definition?.interruptBehavior) return 'block'; // default: keep running
  try {
    return definition.interruptBehavior();
  } catch {
    return 'block';
  }
}
```

When the user submits a new message (reason === 'interrupt'):
- cancel tools are aborted immediately and receive a synthetic error
- block tools keep running; the new message waits for them to finish
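To make the two behaviors concrete, the sketch below applies the same fallback rule to a few tool definitions and splits the running tools accordingly. Everything here is hypothetical: the tool names (SlowSearch, FileWrite, Flaky), the `ToolDefinition` shape, and `splitOnInterrupt` are invented for illustration and are not taken from the codebase.

```typescript
type InterruptBehavior = 'cancel' | 'block';
type ToolDefinition = { name: string; interruptBehavior?: () => InterruptBehavior };

/** Same fallback rule as above: missing or throwing declarations mean 'block'. */
function resolveInterruptBehavior(definition?: ToolDefinition): InterruptBehavior {
  if (!definition?.interruptBehavior) return 'block';
  try {
    return definition.interruptBehavior();
  } catch {
    return 'block';
  }
}

// Hypothetical tool definitions.
const definitions: ToolDefinition[] = [
  { name: 'SlowSearch', interruptBehavior: () => 'cancel' },
  { name: 'FileWrite' }, // declares nothing, so it defaults to 'block'
  { name: 'Flaky', interruptBehavior: () => { throw new Error('boom'); } },
];

/** Splits running tool names into those aborted on interrupt and those left running. */
function splitOnInterrupt(running: string[]): { aborted: string[]; stillRunning: string[] } {
  const aborted: string[] = [];
  const stillRunning: string[] = [];
  for (const name of running) {
    const definition = definitions.find(d => d.name === name);
    const bucket = resolveInterruptBehavior(definition) === 'cancel' ? aborted : stillRunning;
    bucket.push(name);
  }
  return { aborted, stillRunning };
}

const outcome = splitOnInterrupt(['SlowSearch', 'FileWrite', 'Flaky']);
```

Here only SlowSearch is aborted. Defaulting to 'block' is the conservative choice: a tool that has not opted in to cancellation is never interrupted midway through a side effect.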
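When a streaming fallback occurs (the model is switched mid-stream), the entire executor is discarded:

```typescript
discard(): void {
  this.discarded = true;
}
```

After discard:
- addTool still works, but the tool goes straight to the completed state with a fallback error
- getCompletedResults and getRemainingResults return immediately
- The query loop creates a fresh executor for the retry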
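The three post-discard behaviors can be sketched with a toy executor. This is a heavily simplified illustration, not the real class: `MiniExecutor` is hypothetical, tools are reduced to an ID and a status, and the discard check is placed directly in `addTool`, whereas the code shown earlier performs the abort check when a tool is about to execute.

```typescript
type MiniTool = { id: string; status: 'queued' | 'completed'; error?: string };

class MiniExecutor {
  private discarded = false;
  readonly tools: MiniTool[] = [];

  discard(): void {
    this.discarded = true;
  }

  addTool(id: string): void {
    // After discard, tools are never run; they complete with a fallback error.
    this.tools.push(
      this.discarded
        ? { id, status: 'completed', error: 'streaming_fallback' }
        : { id, status: 'queued' },
    );
  }

  /** Returns the IDs of completed tools, or nothing once discarded. */
  completedResults(): string[] {
    if (this.discarded) return [];
    return this.tools.filter(t => t.status === 'completed').map(t => t.id);
  }
}

// The old executor is discarded and a fresh one handles the retry.
let executor = new MiniExecutor();
executor.addTool('toolu_1');
executor.discard();
executor.addTool('toolu_2');
const staleResults = executor.completedResults();
const staleTools = executor.tools;
executor = new MiniExecutor();
```

Replacing the executor rather than resetting it means any promise still held by the old instance can settle without affecting the retry's state.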