跳转到内容

Tool 分发

Tool 分发是连接 Claude 意图(以 API 响应中的 tool_use 块表达)与实际代码执行的机制。本章涵盖从块检测到结果注入的完整分发 pipeline。

graph TD
    A[API Stream] --> B{tool_use block?}
    B -- yes --> C{StreamingToolExecutor enabled?}
    C -- yes --> D[addTool to executor]
    C -- no --> E[Collect in toolUseBlocks array]
    B -- no --> F[Continue streaming]

    D --> G{isConcurrencySafe?}
    G -- yes --> H[Start immediately if safe]
    G -- no --> I[Queue for serial execution]

    E --> J[Stream ends]
    J --> K[runTools - batch execution]

    H --> L[Yield results during stream]
    I --> M[getRemainingResults after stream]
    K --> M

随着 API 响应 stream 进来,queryLoop 检查每条助手消息中的 tool_use 内容块:

// src/query.ts — inside the streaming loop
if (message.type === 'assistant') {
assistantMessages.push(message);
const msgToolUseBlocks = message.message.content.filter(
content => content.type === 'tool_use',
) as ToolUseBlock[];
if (msgToolUseBlocks.length > 0) {
toolUseBlocks.push(...msgToolUseBlocks);
needsFollowUp = true; // signals the loop to continue after this iteration
}
// Feed to streaming executor for immediate execution
if (streamingToolExecutor && !toolUseContext.abortController.signal.aborted) {
for (const toolBlock of msgToolUseBlocks) {
streamingToolExecutor.addTool(toolBlock, message);
}
}
}

ToolUseBlock 的结构如下:

type ToolUseBlock = {
type: 'tool_use';
id: string; // e.g., "toolu_01XFDUDYJgAACzvnptvVer6u"
name: string; // e.g., "Bash", "FileRead", "mcp__server__tool"
input: unknown; // tool-specific parameters
};

src/services/tools/StreamingToolExecutor.ts 中的 StreamingToolExecutor 是默认执行引擎。它在 API 仍在 streaming 时就启动 tool 执行,每个 turn 可节省数秒时间。

executor 追踪的每个 tool 会经历以下状态:

stateDiagram-v2
    [*] --> queued: addTool()
    queued --> executing: processQueue()
    executing --> completed: tool finishes
    completed --> yielded: getCompletedResults()
    queued --> completed: abort (synthetic error)
type ToolStatus = 'queued' | 'executing' | 'completed' | 'yielded';
type TrackedTool = {
id: string;
block: ToolUseBlock;
assistantMessage: AssistantMessage;
status: ToolStatus;
isConcurrencySafe: boolean;
promise?: Promise<void>;
results?: Message[];
pendingProgress: Message[];
contextModifiers?: Array<(context: ToolUseContext) => ToolUseContext>;
};

executor 使用简单而有效的并发模型:

private canExecuteTool(isConcurrencySafe: boolean): boolean {
const executingTools = this.tools.filter(t => t.status === 'executing');
return (
executingTools.length === 0 ||
(isConcurrencySafe && executingTools.every(t => t.isConcurrencySafe))
);
}

规则:

  • 若无 tool 正在执行,任何 tool 均可启动
  • 并发安全的 tool 可与其他并发安全的 tool 并行运行
  • 非并发 tool 需要独占访问(无其他 tool 正在执行)

哪些 tool 是并发安全的?取决于 tool 的 isConcurrencySafe(input) 方法。一般而言:

  • 并发安全FileReadGlobGrepWebFetch——只读操作
  • 非并发安全BashFileWriteFileEdit——有副作用的操作

添加 tool 时,executor 立即尝试处理队列:

addTool(block: ToolUseBlock, assistantMessage: AssistantMessage): void {
const toolDefinition = findToolByName(this.toolDefinitions, block.name);
// ... setup TrackedTool ...
this.tools.push(tracked);
void this.processQueue();
}
private async processQueue(): Promise<void> {
for (const tool of this.tools) {
if (tool.status !== 'queued') continue;
if (this.canExecuteTool(tool.isConcurrencySafe)) {
await this.executeTool(tool);
} else {
if (!tool.isConcurrencySafe) break; // maintain order for non-concurrent
}
}
}

tool 完成后,会重新触发 processQueue() 以启动等待中的 tool:

const promise = collectResults();
tool.promise = promise;
void promise.finally(() => {
void this.processQueue();
});

当 Bash tool 出错时,所有兄弟 tool 都会被取消:

if (isErrorResult && tool.block.name === BASH_TOOL_NAME) {
this.hasErrored = true;
this.erroredToolDescription = this.getToolDescription(tool);
this.siblingAbortController.abort('sibling_error');
}

siblingAbortController 是主 toolUseContext.abortController子控制器——中止它会终止兄弟 tool,但不会结束当前 turn。只有 Bash 错误会触发此机制;只读 tool 的失败(如 FileRead 读取不存在的文件)是独立的。

StreamingToolExecutor 被禁用时,src/services/tools/toolOrchestration.ts 中的回退路径会在 stream 完成后运行 tool:

src/services/tools/toolOrchestration.ts
export async function* runTools(
toolUseMessages: ToolUseBlock[],
assistantMessages: AssistantMessage[],
canUseTool: CanUseToolFn,
toolUseContext: ToolUseContext,
): AsyncGenerator<MessageUpdate, void> {
for (const { isConcurrencySafe, blocks } of partitionToolCalls(
toolUseMessages, toolUseContext
)) {
if (isConcurrencySafe) {
yield* runToolsConcurrently(blocks, assistantMessages, canUseTool, toolUseContext);
} else {
yield* runToolsSerially(blocks, assistantMessages, canUseTool, toolUseContext);
}
}
}

tool 调用被分区为批次:

function partitionToolCalls(toolUseMessages, toolUseContext): Batch[] {
// Groups consecutive concurrent-safe tools into one batch
// Non-concurrent tools get their own batch
// Example: [Read, Read, Write, Read, Read] → [Read,Read], [Write], [Read,Read]
}

并发批次使用 src/utils/generators.ts 中的 all() 函数,最大并发数为 10:

async function* runToolsConcurrently(
toolUseMessages, assistantMessages, canUseTool, toolUseContext,
): AsyncGenerator<MessageUpdateLazy, void> {
yield* all(
toolUseMessages.map(async function* (toolUse) {
yield* runToolUse(toolUse, assistantMessage, canUseTool, toolUseContext);
}),
getMaxToolUseConcurrency(), // default: 10
);
}

无论使用 StreamingToolExecutor 还是 runTools,每个独立的 tool 调用都经过 src/services/tools/toolExecution.ts 中的 runToolUse()

graph TD
    A[runToolUse] --> B[Parse input with Zod]
    B --> C{Valid?}
    C -- no --> D[Return parse error]
    C -- yes --> E[Run PreToolUse hooks]
    E --> F{Hook decision?}
    F -- deny --> G[Return denial message]
    F -- allow/ask --> H[Check permissions]
    H --> I{Allowed?}
    I -- no --> J[Return permission denial]
    I -- yes --> K[Execute tool.call()]
    K --> L[Run PostToolUse hooks]
    L --> M[Format result]
    M --> N[Yield result message]

tool 输入针对 tool 的 Zod schema 进行验证:

const parsedInput = tool.inputSchema.safeParse(block.input);
if (!parsedInput.success) {
// Return error with Zod validation details
}

permission 经过多层检查(参见 toolHooks.ts 中的 resolveHookPermissionDecision):

  1. PreToolUse hook 可返回 allowdenyask
  2. hook allow 不会绕过 settings.json 中的 deny/ask 规则
  3. 基于规则的 permissioncheckRuleBasedPermissions)检查设置
  4. canUseTool callback 在需要时显示 permission 对话框

tool 结果通过 mapToolResultToToolResultBlockParam 转换为 ToolResultBlockParam

const resultContent = tool.mapToolResultToToolResultBlockParam(
result.data, toolUseID
);
// → { type: 'tool_result', tool_use_id: '...', content: '...', is_error: false }

超出 tool.maxResultSizeChars 的大型结果会被持久化到磁盘,并用预览 + 文件路径替代。

所有 tool 完成后,其结果流回消息数组:

// src/query.ts — at the continue site
state = {
messages: [...messagesForQuery, ...assistantMessages, ...toolResults],
// ...
transition: { reason: 'next_turn' },
};

下一次 API 调用将看到:

[prior messages]
[assistant message with tool_use blocks]
[user message with tool_result blocks] ← injected here
[user message with attachment blocks] ← if any

当用户中断(Ctrl+C 或 submit-interrupt)时,tool 分发负责清理:

// In StreamingToolExecutor
private getAbortReason(tool): 'sibling_error' | 'user_interrupted' | 'streaming_fallback' | null {
if (this.discarded) return 'streaming_fallback';
if (this.hasErrored) return 'sibling_error';
if (this.toolUseContext.abortController.signal.aborted) {
if (this.toolUseContext.abortController.signal.reason === 'interrupt') {
return this.getToolInterruptBehavior(tool) === 'cancel'
? 'user_interrupted' : null;
}
return 'user_interrupted';
}
return null;
}

interruptBehavior: 'block'(默认)的 tool 会在中断期间继续运行。interruptBehavior: 'cancel' 的 tool 会收到合成错误结果。

中止后,query loop 发出中断消息:

if (toolUseContext.abortController.signal.reason !== 'interrupt') {
yield createUserInterruptionMessage({ toolUse: true });
}
return { reason: 'aborted_tools' };