
Streaming Executor

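StreamingToolExecutor is Claude Code's main tool execution engine. It lives in src/services/tools/StreamingToolExecutor.ts and overlaps tool execution with API streaming: tools start running before the model has finished its response. This chapter covers its internals in detail.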

In a typical agentic turn, the model may emit several tool calls. Without streaming execution:

API streams tool_use A ──────────────┐
API streams tool_use B ──────────────┤
API streams text ────────────────────┤
API done ────────────────────────────┘
Execute tool A ────────────── 2s
Execute tool B ────────────── 1.5s
Total: API time + 3.5s

With streaming execution:

API streams tool_use A ──────┐
Execute tool A starts ─────┤────── 2s
API streams tool_use B ──────┤
Execute tool B starts ─────┤────── 1.5s
API streams text ────────────┤
API done ────────────────────┘
Tool A completes ──────────┘
Tool B completes ───────────┘
Total: API time + max(0, tool time - overlap)

The savings are substantial: 40-80% of tool execution time can be overlapped with streaming.
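A simplified timing model makes the comparison concrete. It is a sketch under stated assumptions: each tool starts the moment its tool_use block finishes streaming, all tools are concurrency-safe and run in parallel, and scheduling overhead is ignored. The names ToolTiming, sequentialTotal and streamingTotal and the millisecond figures are illustrative, not taken from Claude Code.

```typescript
// Hypothetical timing model for illustration; not Claude Code code.
interface ToolTiming {
  streamedAtMs: number; // when the tool_use block finished streaming
  durationMs: number;   // how long the tool takes to run
}

// Without streaming execution: wait for the whole response, then run tools one by one.
function sequentialTotal(apiMs: number, tools: ToolTiming[]): number {
  return apiMs + tools.reduce((sum, t) => sum + t.durationMs, 0);
}

// With streaming execution: each tool starts as soon as its block is streamed and,
// assuming every tool is concurrency-safe, runs in parallel with the others.
function streamingTotal(apiMs: number, tools: ToolTiming[]): number {
  return tools.reduce(
    (latest, t) => Math.max(latest, t.streamedAtMs + t.durationMs),
    apiMs,
  );
}

const tools: ToolTiming[] = [
  { streamedAtMs: 1000, durationMs: 2000 }, // tool A
  { streamedAtMs: 2000, durationMs: 1500 }, // tool B
];
console.log(sequentialTotal(3000, tools)); // 6500
console.log(streamingTotal(3000, tools));  // 3500
```

With these invented numbers the overlapped schedule ends at 3.5 s instead of 6.5 s; in practice the saving depends on when each block arrives and on whether the tools may run concurrently.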

src/services/tools/StreamingToolExecutor.ts
export class StreamingToolExecutor {
  private tools: TrackedTool[] = [];
  private toolUseContext: ToolUseContext;
  private hasErrored = false;
  private erroredToolDescription = '';
  private siblingAbortController: AbortController;
  private discarded = false;
  private progressAvailableResolve?: () => void;

  constructor(
    private readonly toolDefinitions: Tools,
    private readonly canUseTool: CanUseToolFn,
    toolUseContext: ToolUseContext,
  ) {
    this.toolUseContext = toolUseContext;
    this.siblingAbortController = createChildAbortController(
      toolUseContext.abortController,
    );
  }
}

Key fields:

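  • tools: ordered list of every tracked tool
  • siblingAbortController: child controller used to cascade Bash errors to sibling tools
  • hasErrored: flag set on a Bash error so that the remaining tools are cancelled
  • discarded: flag for streaming fallback (the whole response is thrown away)
  • progressAvailableResolve: Promise resolver used to signal that progress is available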
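The constructor depends on a createChildAbortController helper whose source is not shown in this chapter. The following is a minimal sketch of what such a helper could look like, assuming that a child aborts (with the parent's reason) whenever its parent aborts, while aborting a child leaves the parent untouched. It is a hypothetical reimplementation, not the Claude Code helper.

```typescript
// Hypothetical stand-in for createChildAbortController; the real helper may differ.
function createChildAbortController(parent: AbortController): AbortController {
  const child = new AbortController();
  if (parent.signal.aborted) {
    // Parent already aborted: the child starts out aborted with the same reason.
    child.abort(parent.signal.reason);
    return child;
  }
  const onParentAbort = () => child.abort(parent.signal.reason);
  parent.signal.addEventListener("abort", onParentAbort, { once: true });
  // If the child is aborted on its own, stop listening to the parent so a
  // long-lived parent does not keep listeners for finished children.
  child.signal.addEventListener(
    "abort",
    () => parent.signal.removeEventListener("abort", onParentAbort),
    { once: true },
  );
  return child;
}

const query = new AbortController();
const siblings = createChildAbortController(query);
siblings.abort("sibling_error");
console.log(query.signal.aborted); // false: aborting a child does not affect the parent
```

Detaching the parent listener when the child aborts first is a design choice of this sketch, not something the chapter states about the real helper.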

Each tool call is wrapped in a TrackedTool:

type ToolStatus = 'queued' | 'executing' | 'completed' | 'yielded';

type TrackedTool = {
  id: string;                 // tool_use block ID
  block: ToolUseBlock;        // original API block
  assistantMessage: AssistantMessage;
  status: ToolStatus;
  isConcurrencySafe: boolean;
  promise?: Promise<void>;    // execution promise
  results?: Message[];        // completed results
  pendingProgress: Message[]; // progress messages (yielded immediately)
  contextModifiers?: Array<(context: ToolUseContext) => ToolUseContext>;
};

stateDiagram-v2
    [*] --> queued: addTool()
    queued --> executing: processQueue() canExecute=true
    queued --> completed: abort before execution
    executing --> completed: tool finishes or abort during
    completed --> yielded: getCompletedResults() yields
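The state diagram can be expressed as a transition table. In the excerpts in this chapter the executor simply assigns tool.status directly, so the allowedTransitions map and transition function below are a hypothetical guard for illustration:

```typescript
type ToolStatus = "queued" | "executing" | "completed" | "yielded";

// Transitions permitted by the state diagram; any other move is a bug.
const allowedTransitions: Record<ToolStatus, readonly ToolStatus[]> = {
  queued: ["executing", "completed"], // start running, or abort before execution
  executing: ["completed"],           // finish normally, or abort mid-run
  completed: ["yielded"],             // results handed to the caller
  yielded: [],                        // terminal state
};

function transition(from: ToolStatus, to: ToolStatus): ToolStatus {
  if (!allowedTransitions[from].includes(to)) {
    throw new Error(`illegal transition ${from} -> ${to}`);
  }
  return to;
}

let s: ToolStatus = "queued";
s = transition(s, "executing");
s = transition(s, "completed");
s = transition(s, "yielded");
console.log(s); // yielded
```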

private canExecuteTool(isConcurrencySafe: boolean): boolean {
  const executingTools = this.tools.filter(t => t.status === 'executing');
  return (
    executingTools.length === 0 ||
    (isConcurrencySafe && executingTools.every(t => t.isConcurrencySafe))
  );
}

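A tool moves from queued to executing when either:

  1. No tool is executing, in which case any tool can start
  2. Only concurrency-safe tools are executing, and the new tool is concurrency-safe as well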
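The rule is easy to check in isolation. This sketch lifts the predicate out of the class into a free function, canExecute, that takes the tool list as an argument (an assumption made for testability; the real canExecuteTool reads this.tools):

```typescript
interface ToolLike {
  name: string;
  status: "queued" | "executing" | "completed" | "yielded";
  isConcurrencySafe: boolean;
}

// Same predicate as canExecuteTool, but over an explicit list of tools.
function canExecute(tools: ToolLike[], isConcurrencySafe: boolean): boolean {
  const executing = tools.filter(t => t.status === "executing");
  return (
    executing.length === 0 ||
    (isConcurrencySafe && executing.every(t => t.isConcurrencySafe))
  );
}

const readRunning: ToolLike[] = [
  { name: "Read", status: "executing", isConcurrencySafe: true },
];
console.log(canExecute(readRunning, true));  // true: another Read may join
console.log(canExecute(readRunning, false)); // false: a Write must wait
console.log(canExecute([], false));          // true: nothing is running
```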

A tool completes when its runToolUse generator is exhausted (all results collected) or when it is aborted (sibling error, user interrupt, or streaming fallback).

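Results are yielded in order through getCompletedResults(). A completed non-concurrent tool's results are yielded only after the results of every preceding tool have been yielded; a concurrent tool's results are yielded as soon as they are ready, though still in overall queue order.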

addTool(block: ToolUseBlock, assistantMessage: AssistantMessage): void {
  const toolDefinition = findToolByName(this.toolDefinitions, block.name);
  if (!toolDefinition) {
    // Unknown tool → immediate "completed" with error
    this.tools.push({
      id: block.id, block, assistantMessage,
      status: 'completed',
      isConcurrencySafe: true,
      pendingProgress: [],
      results: [createUserMessage({
        content: [{
          type: 'tool_result',
          content: `<tool_use_error>Error: No such tool: ${block.name}</tool_use_error>`,
          is_error: true,
          tool_use_id: block.id,
        }],
      })],
    });
    return;
  }
  // Determine concurrency safety from parsed input
  const parsedInput = toolDefinition.inputSchema.safeParse(block.input);
  const isConcurrencySafe = parsedInput?.success
    ? (() => {
        try { return Boolean(toolDefinition.isConcurrencySafe(parsedInput.data)); }
        catch { return false; }
      })()
    : false;
  this.tools.push({
    id: block.id, block, assistantMessage,
    status: 'queued', isConcurrencySafe,
    pendingProgress: [],
  });
  void this.processQueue();
}

private async processQueue(): Promise<void> {
  for (const tool of this.tools) {
    if (tool.status !== 'queued') continue;
    if (this.canExecuteTool(tool.isConcurrencySafe)) {
      await this.executeTool(tool);
    } else {
      // Non-concurrent tool must wait for exclusive access
      if (!tool.isConcurrencySafe) break;
    }
  }
}

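The break on a non-concurrent tool is essential: it preserves execution order. If tools arrive in the order [Read, Read, Write, Read], the executor:

  1. Runs the two Reads concurrently
  2. Runs the Write on its own (after both Reads finish)
  3. Runs the last Read on its own (after the Write finishes)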
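This ordering can be reproduced with a small synchronous simulation. It is a sketch, not the real executor: tools are plain records, processQueue only flips statuses instead of calling executeTool, and the hypothetical finishRunning helper stands in for the tool promises settling (which in the real code re-triggers processQueue):

```typescript
type Status = "queued" | "executing" | "completed";
interface SimTool { name: string; safe: boolean; status: Status; }

function canExecute(tools: SimTool[], safe: boolean): boolean {
  const running = tools.filter(t => t.status === "executing");
  return running.length === 0 || (safe && running.every(t => t.safe));
}

// Mirrors processQueue: start whatever can start, and stop scanning at the
// first non-concurrency-safe tool that has to wait. Returns the names started.
function processQueue(tools: SimTool[]): string[] {
  const started: string[] = [];
  for (const t of tools) {
    if (t.status !== "queued") continue;
    if (canExecute(tools, t.safe)) {
      t.status = "executing";
      started.push(t.name);
    } else if (!t.safe) {
      break;
    }
  }
  return started;
}

// Mark every executing tool as finished, as if all their promises settled.
function finishRunning(tools: SimTool[]): void {
  for (const t of tools) if (t.status === "executing") t.status = "completed";
}

const queue: SimTool[] = [
  { name: "Read#1", safe: true, status: "queued" },
  { name: "Read#2", safe: true, status: "queued" },
  { name: "Write", safe: false, status: "queued" },
  { name: "Read#3", safe: true, status: "queued" },
];
const waves: string[][] = [];
while (queue.some(t => t.status === "queued")) {
  waves.push(processQueue(queue));
  finishRunning(queue);
}
console.log(JSON.stringify(waves)); // [["Read#1","Read#2"],["Write"],["Read#3"]]
```

Deleting the else-if branch that breaks on a waiting non-concurrency-safe tool lets Read#3 start in the first wave, overtaking the Write it is supposed to follow.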
private async executeTool(tool: TrackedTool): Promise<void> {
  tool.status = 'executing';
  this.toolUseContext.setInProgressToolUseIDs(prev => new Set(prev).add(tool.id));
  this.updateInterruptibleState();
  const collectResults = async () => {
    // Non-progress messages collected for this tool
    const messages: Message[] = [];
    // Check for pre-existing abort
    const initialAbortReason = this.getAbortReason(tool);
    if (initialAbortReason) {
      messages.push(this.createSyntheticErrorMessage(tool.id, initialAbortReason, ...));
      tool.results = messages; // getCompletedResults only yields tools with results set
      tool.status = 'completed';
      return;
    }
    // Create per-tool abort controller
    const toolAbortController = createChildAbortController(this.siblingAbortController);
    // Bubble up non-sibling aborts to the query controller
    toolAbortController.signal.addEventListener('abort', () => {
      if (
        toolAbortController.signal.reason !== 'sibling_error' &&
        !this.toolUseContext.abortController.signal.aborted &&
        !this.discarded
      ) {
        this.toolUseContext.abortController.abort(toolAbortController.signal.reason);
      }
    }, { once: true });
    // Execute via runToolUse generator
    const generator = runToolUse(
      tool.block, tool.assistantMessage, this.canUseTool,
      { ...this.toolUseContext, abortController: toolAbortController },
    );
    for await (const update of generator) {
      // isErrorResult and thisToolErrored are local flags whose
      // computation is elided from this excerpt
      // Check for mid-execution abort
      const abortReason = this.getAbortReason(tool);
      if (abortReason && !thisToolErrored) {
        messages.push(this.createSyntheticErrorMessage(...));
        break;
      }
      // Handle Bash errors → cascade to siblings
      if (isErrorResult && tool.block.name === BASH_TOOL_NAME) {
        this.hasErrored = true;
        this.siblingAbortController.abort('sibling_error');
      }
      // Route progress messages for immediate yielding
      if (update.message.type === 'progress') {
        tool.pendingProgress.push(update.message);
        this.progressAvailableResolve?.();
      } else {
        messages.push(update.message);
      }
    }
    tool.results = messages;
    tool.status = 'completed';
  };
  const promise = collectResults();
  tool.promise = promise;
  // Re-trigger queue when this tool completes
  void promise.finally(() => void this.processQueue());
}
graph TD
    A[toolUseContext.abortController] --> B[siblingAbortController]
    B --> C[toolAbortController - Tool 1]
    B --> D[toolAbortController - Tool 2]
    B --> E[toolAbortController - Tool N]

    style A fill:#f9f,stroke:#333
    style B fill:#ff9,stroke:#333
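
  • Parent (toolUseContext): aborting this controller ends the entire turn
  • Sibling: aborting this controller kills every tool subprocess but does not end the turn
  • Per-tool: aborting this controller kills a single tool's subprocess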

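The bubbling logic ensures that a permission denial (which aborts the per-tool controller) propagates to the parent and ends the turn. Without it, the query loop would carry on with partial results.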
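The three levels and the bubbling rule can be exercised without any real tools. In this sketch childOf is a hypothetical stand-in for createChildAbortController, perToolController copies the listener registered in executeTool (minus the discarded check), and 'permission_denied' is an invented abort reason, since the reason actually used for permission denials is not shown here:

```typescript
// Hypothetical stand-in for createChildAbortController.
function childOf(parentCtl: AbortController): AbortController {
  const child = new AbortController();
  if (parentCtl.signal.aborted) {
    child.abort(parentCtl.signal.reason);
  } else {
    parentCtl.signal.addEventListener(
      "abort", () => child.abort(parentCtl.signal.reason), { once: true });
  }
  return child;
}

// Per-tool controller whose non-sibling aborts bubble up to the turn's controller,
// mirroring the listener in executeTool (the discarded check is omitted).
function perToolController(turn: AbortController, siblings: AbortController): AbortController {
  const tool = childOf(siblings);
  tool.signal.addEventListener("abort", () => {
    if (tool.signal.reason !== "sibling_error" && !turn.signal.aborted) {
      turn.abort(tool.signal.reason);
    }
  }, { once: true });
  return tool;
}

// Scenario 1: a Bash error cancels all siblings but keeps the turn alive.
const turn = new AbortController();
const siblings = childOf(turn);
const tool1 = perToolController(turn, siblings);
const tool2 = perToolController(turn, siblings);
siblings.abort("sibling_error");
console.log(tool1.signal.aborted, tool2.signal.aborted, turn.signal.aborted); // true true false

// Scenario 2: one tool's non-sibling abort ends the whole turn.
const turn2 = new AbortController();
const siblings2 = childOf(turn2);
const denied = perToolController(turn2, siblings2);
const other = perToolController(turn2, siblings2);
denied.abort("permission_denied"); // invented reason string
console.log(turn2.signal.aborted, other.signal.aborted); // true true
```

In scenario 2 the abort travels up to turn2 and then back down through siblings2 to the other tool; the other tool's own listener does not re-abort the turn because turn2 is already aborted by then.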

getCompletedResults() is a synchronous generator that yields completed results without blocking:

*getCompletedResults(): Generator<MessageUpdate, void> {
  if (this.discarded) return;
  for (const tool of this.tools) {
    // Always yield progress immediately
    while (tool.pendingProgress.length > 0) {
      yield { message: tool.pendingProgress.shift()!, newContext: this.toolUseContext };
    }
    if (tool.status === 'yielded') continue;
    if (tool.status === 'completed' && tool.results) {
      tool.status = 'yielded';
      for (const message of tool.results) {
        yield { message, newContext: this.toolUseContext };
      }
      markToolUseAsComplete(this.toolUseContext, tool.id);
    } else if (tool.status === 'executing' && !tool.isConcurrencySafe) {
      break; // Can't yield past an executing non-concurrent tool
    }
  }
}
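The traversal rules can be tested with a simplified model. Assumptions: messages are plain strings, toolUseContext and markToolUseAsComplete are left out, and completedResults is a hypothetical free-function version of the generator. The scenario uses the one case in which a later tool is already completed while an earlier non-concurrency-safe tool is still running: a call to an unknown tool, which addTool marks as completed immediately.

```typescript
type Status = "queued" | "executing" | "completed" | "yielded";
interface Tracked {
  id: string;
  status: Status;
  isConcurrencySafe: boolean;
  results?: string[];
  pendingProgress: string[];
}

// Same traversal as getCompletedResults: flush progress for every tool,
// yield finished results in order, and stop at a running non-safe tool.
function* completedResults(tools: Tracked[]): Generator<string, void> {
  for (const tool of tools) {
    while (tool.pendingProgress.length > 0) yield tool.pendingProgress.shift()!;
    if (tool.status === "yielded") continue;
    if (tool.status === "completed" && tool.results) {
      tool.status = "yielded";
      yield* tool.results;
    } else if (tool.status === "executing" && !tool.isConcurrencySafe) {
      break;
    }
  }
}

const tools: Tracked[] = [
  { id: "bash", status: "executing", isConcurrencySafe: false, pendingProgress: ["bash: running"] },
  // An unknown tool is completed (with an error) as soon as it is added.
  { id: "bogus", status: "completed", isConcurrencySafe: true, results: ["bogus: No such tool"], pendingProgress: [] },
];
const first = Array.from(completedResults(tools));
tools[0].status = "completed";
tools[0].results = ["bash: done"];
const second = Array.from(completedResults(tools));
console.log(JSON.stringify(first));  // ["bash: running"]
console.log(JSON.stringify(second)); // ["bash: done","bogus: No such tool"]
```

The unknown tool's error is held back until the Bash call before it has been yielded, so the caller sees results in the order the model requested them.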

After streaming ends: getRemainingResults()


An async generator that waits for every tool to finish:

async *getRemainingResults(): AsyncGenerator<MessageUpdate, void> {
  if (this.discarded) return;
  while (this.hasUnfinishedTools()) {
    await this.processQueue();
    for (const result of this.getCompletedResults()) yield result;
    // Wait for any tool to complete OR progress to arrive
    if (this.hasExecutingTools() && !this.hasCompletedResults() && !this.hasPendingProgress()) {
      const executingPromises = this.tools
        .filter(t => t.status === 'executing' && t.promise)
        .map(t => t.promise!);
      const progressPromise = new Promise<void>(resolve => {
        this.progressAvailableResolve = resolve;
      });
      if (executingPromises.length > 0) {
        await Promise.race([...executingPromises, progressPromise]);
      }
    }
  }
  // Final flush
  for (const result of this.getCompletedResults()) yield result;
}

The Promise.race pattern wakes the loop when either a tool completes or progress arrives, so progress is still displayed in real time while a slow tool is running.
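The wake-up pattern can be isolated in a few lines. This is a sketch with hypothetical names (Waiter, waitForToolOrProgress, sleep, demo): it races the executing tool promises against a one-shot progress promise whose resolver is stored for the producer to call, as getRemainingResults does with progressAvailableResolve. Keeping the resolver on a per-waiter object rather than in a module-level variable lets several waits run independently.

```typescript
const sleep = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms));

// Holds the resolver for the current wait, like progressAvailableResolve.
interface Waiter { progressAvailableResolve?: () => void; }

// Resolves as soon as any tool promise settles OR a progress message is signalled.
async function waitForToolOrProgress(w: Waiter, executing: Promise<void>[]): Promise<void> {
  const progressPromise = new Promise<void>(resolve => {
    w.progressAvailableResolve = resolve;
  });
  await Promise.race([...executing, progressPromise]);
}

async function demo(): Promise<string[]> {
  const w: Waiter = {};
  const log: string[] = [];
  const slowTool = sleep(200).then(() => { log.push("tool done"); });
  // A progress update arrives long before the slow tool finishes.
  setTimeout(() => { log.push("progress"); w.progressAvailableResolve?.(); }, 20);
  await waitForToolOrProgress(w, [slowTool]);
  log.push("woke up");
  await slowTool;
  return log;
}

demo().then(log => console.log(log.join(" -> "))); // progress -> woke up -> tool done
```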

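When a tool is aborted, it receives a synthetic error result. This preserves the API invariant that every tool_use block gets a matching tool_result: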

private createSyntheticErrorMessage(
  toolUseId: string,
  reason: 'sibling_error' | 'user_interrupted' | 'streaming_fallback',
  assistantMessage: AssistantMessage,
): Message {
  if (reason === 'user_interrupted') {
    return createUserMessage({
      content: [{
        type: 'tool_result',
        content: withMemoryCorrectionHint(REJECT_MESSAGE),
        is_error: true,
        tool_use_id: toolUseId,
      }],
      toolUseResult: 'User rejected tool use',
    });
  }
  // ... similar for sibling_error and streaming_fallback
}
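The invariant comes from the Messages API, which expects every tool_use block in an assistant message to be answered by a tool_result block with the same tool_use_id in the next user message. A hypothetical checker makes this explicit; the block types are simplified stand-ins for the SDK types, and the "Cancelled: ..." text is invented (the real wording comes from REJECT_MESSAGE and related constants):

```typescript
// Simplified block shapes; not the SDK's type definitions.
interface ToolUseBlock { type: "tool_use"; id: string; name: string; }
interface ToolResultBlock { type: "tool_result"; tool_use_id: string; content: string; is_error?: boolean; }

// Returns the ids of tool_use blocks that have no matching tool_result.
function missingToolResults(uses: ToolUseBlock[], results: ToolResultBlock[]): string[] {
  const answered = new Set(results.map(r => r.tool_use_id));
  return uses.filter(u => !answered.has(u.id)).map(u => u.id);
}

// Fills the gaps with error results, similar in spirit to createSyntheticErrorMessage.
function withSyntheticErrors(uses: ToolUseBlock[], results: ToolResultBlock[], reason: string): ToolResultBlock[] {
  const missing = missingToolResults(uses, results);
  return [
    ...results,
    ...missing.map(id => ({
      type: "tool_result" as const,
      tool_use_id: id,
      content: `Cancelled: ${reason}`, // invented wording
      is_error: true,
    })),
  ];
}

const uses: ToolUseBlock[] = [
  { type: "tool_use", id: "toolu_1", name: "Bash" },
  { type: "tool_use", id: "toolu_2", name: "Read" },
];
const partial: ToolResultBlock[] = [
  { type: "tool_result", tool_use_id: "toolu_1", content: "exit 1", is_error: true },
];
console.log(JSON.stringify(missingToolResults(uses, partial))); // ["toolu_2"]
const repaired = withSyntheticErrors(uses, partial, "sibling_error");
console.log(missingToolResults(uses, repaired).length); // 0
```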

Each tool declares how it handles user interrupts:

private getToolInterruptBehavior(tool: TrackedTool): 'cancel' | 'block' {
  const definition = findToolByName(this.toolDefinitions, tool.block.name);
  if (!definition?.interruptBehavior) return 'block'; // default: keep running
  try {
    return definition.interruptBehavior();
  } catch {
    return 'block';
  }
}

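When the user submits a new message (reason === 'interrupt'):

  • cancel tools are aborted immediately and receive a synthetic error
  • block tools keep running; the new message waits for them to finish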
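A sketch of how running tools could be split on an interrupt, using a simplified tool definition. ToolDef, behaviorOf and partitionOnInterrupt are hypothetical, and the tool names are invented; the defaults mirror getToolInterruptBehavior, so a missing or throwing interruptBehavior counts as 'block':

```typescript
type InterruptBehavior = "cancel" | "block";
interface ToolDef { name: string; interruptBehavior?: () => InterruptBehavior; }

function behaviorOf(def: ToolDef | undefined): InterruptBehavior {
  if (!def?.interruptBehavior) return "block"; // default: keep running
  try {
    return def.interruptBehavior();
  } catch {
    return "block"; // a throwing declaration is treated conservatively
  }
}

// Splits running tools into those to abort now and those the new message waits for.
function partitionOnInterrupt(running: ToolDef[]): { cancel: string[]; block: string[] } {
  const out = { cancel: [] as string[], block: [] as string[] };
  for (const def of running) out[behaviorOf(def)].push(def.name);
  return out;
}

const running: ToolDef[] = [
  { name: "searchTool", interruptBehavior: () => "cancel" },
  { name: "editTool" }, // no declaration, so block
  { name: "flakyTool", interruptBehavior: () => { throw new Error("boom"); } },
];
console.log(JSON.stringify(partitionOnInterrupt(running)));
// {"cancel":["searchTool"],"block":["editTool","flakyTool"]}
```

Defaulting to 'block' is the conservative choice: a tool that never opted in to cancellation is never killed halfway through by an interrupt.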

When a streaming fallback occurs (the model is switched mid-stream), the whole executor is discarded:

discard(): void {
  this.discarded = true;
}

After a discard:

  • addTool still works, but tools go straight to completed with a fallback error
  • getCompletedResults and getRemainingResults return immediately
  • the query loop creates a fresh executor for the retry
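The discard contract can be illustrated with a toy executor. ToyExecutor is hypothetical: tools "run" synchronously, messages are strings, and the fallback error text is invented. It follows the first two rules above, and the last lines imitate the query loop by creating a fresh executor for the retry:

```typescript
// Hypothetical toy model of the discard contract; not the real StreamingToolExecutor.
class ToyExecutor {
  private discarded = false;
  private readonly results: string[] = [];

  // Tools "run" synchronously here; after discard they complete with a fallback error.
  addTool(id: string): void {
    this.results.push(this.discarded ? `${id}: error (streaming fallback)` : `${id}: ok`);
  }

  discard(): void {
    this.discarded = true;
  }

  // Yields nothing once the executor has been discarded.
  *getCompletedResults(): Generator<string, void> {
    if (this.discarded) return;
    while (this.results.length > 0) yield this.results.shift()!;
  }
}

let executor = new ToyExecutor();
executor.addTool("toolu_1");
executor.discard(); // e.g. the model is switched mid-stream
executor.addTool("toolu_2");
console.log(Array.from(executor.getCompletedResults()).length); // 0

executor = new ToyExecutor(); // the query loop retries with a fresh executor
executor.addTool("toolu_3");
console.log(JSON.stringify(Array.from(executor.getCompletedResults()))); // ["toolu_3: ok"]
```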