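Tool Dispatch
Tool dispatch is the mechanism that connects Claude's intent (expressed as tool_use blocks in the API response) to actual code execution. This chapter covers the full dispatch pipeline, from block detection to result injection.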
graph TD
A[API Stream] --> B{tool_use block?}
B -- yes --> C{StreamingToolExecutor enabled?}
C -- yes --> D[addTool to executor]
C -- no --> E[Collect in toolUseBlocks array]
B -- no --> F[Continue streaming]
D --> G{isConcurrencySafe?}
G -- yes --> H[Start immediately if safe]
G -- no --> I[Queue for serial execution]
E --> J[Stream ends]
J --> K[runTools - batch execution]
H --> L[Yield results during stream]
I --> M[getRemainingResults after stream]
K --> M
Tool detection during streaming
As the API response streams in, queryLoop checks each assistant message for tool_use content blocks:

    // src/query.ts, inside the streaming loop
    if (message.type === 'assistant') {
      assistantMessages.push(message);

      const msgToolUseBlocks = message.message.content.filter(
        content => content.type === 'tool_use',
      ) as ToolUseBlock[];

      if (msgToolUseBlocks.length > 0) {
        toolUseBlocks.push(...msgToolUseBlocks);
        needsFollowUp = true; // signals the loop to continue after this iteration
      }

      // Feed to streaming executor for immediate execution
      if (streamingToolExecutor && !toolUseContext.abortController.signal.aborted) {
        for (const toolBlock of msgToolUseBlocks) {
          streamingToolExecutor.addTool(toolBlock, message);
        }
      }
    }

ToolUseBlock has the following structure:
    type ToolUseBlock = {
      type: 'tool_use';
      id: string;     // e.g., "toolu_01XFDUDYJgAACzvnptvVer6u"
      name: string;   // e.g., "Bash", "FileRead", "mcp__server__tool"
      input: unknown; // tool-specific parameters
    };

StreamingToolExecutor
StreamingToolExecutor, defined in src/services/tools/StreamingToolExecutor.ts, is the default execution engine. It starts executing tools while the API is still streaming, which can save several seconds per turn.
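To make the time saving concrete, here is a minimal sketch of starting work as each block arrives instead of waiting for the stream to end. Everything in it (Block, fakeRunTool, dispatchWhileStreaming) is a hypothetical stand-in, not the real executor, and the "stream" is simulated with awaited microtasks.

```typescript
// Hypothetical sketch: none of these names come from the real codebase.
type Block = { id: string; name: string };

// Stand-in for running a tool; it finishes on a later microtask.
async function fakeRunTool(block: Block, log: string[]): Promise<string> {
  log.push(`start ${block.id}`);
  await Promise.resolve();
  log.push(`finish ${block.id}`);
  return `result of ${block.name}`;
}

async function dispatchWhileStreaming(
  blocks: Block[],
): Promise<{ log: string[]; results: string[] }> {
  const log: string[] = [];
  const pending: Promise<string>[] = [];
  for (const block of blocks) {
    await Promise.resolve(); // stand-in for waiting on the next stream chunk
    // Kick off execution right away rather than after the stream ends.
    pending.push(fakeRunTool(block, log));
  }
  log.push('stream ended');
  const results = await Promise.all(pending); // results keep arrival order
  return { log, results };
}
```

In this sketch every tool has already started by the time the stream ends, which is the overlap the paragraph above describes.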
Tool state machine
Each tool tracked by the executor moves through the following states:
stateDiagram-v2
[*] --> queued: addTool()
queued --> executing: processQueue()
executing --> completed: tool finishes
completed --> yielded: getCompletedResults()
queued --> completed: abort (synthetic error)
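The transitions in the diagram can be encoded as a lookup table, which makes illegal moves easy to reject. This is only an illustrative sketch: Status, allowedTransitions, canTransition, and advance are assumed names, and nothing here claims the executor validates transitions this way.

```typescript
// Illustrative sketch; the table mirrors the diagram, the helpers are assumptions.
type Status = 'queued' | 'executing' | 'completed' | 'yielded';

const allowedTransitions: Record<Status, Status[]> = {
  queued: ['executing', 'completed'], // straight to 'completed' on abort (synthetic error)
  executing: ['completed'],
  completed: ['yielded'],
  yielded: [], // terminal state
};

function canTransition(from: Status, to: Status): boolean {
  return allowedTransitions[from].some(next => next === to);
}

// Returns the new status, or throws if the diagram has no such edge.
function advance(from: Status, to: Status): Status {
  if (!canTransition(from, to)) {
    throw new Error(`illegal transition: ${from} -> ${to}`);
  }
  return to;
}
```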
    type ToolStatus = 'queued' | 'executing' | 'completed' | 'yielded';

    type TrackedTool = {
      id: string;
      block: ToolUseBlock;
      assistantMessage: AssistantMessage;
      status: ToolStatus;
      isConcurrencySafe: boolean;
      promise?: Promise<void>;
      results?: Message[];
      pendingProgress: Message[];
      contextModifiers?: Array<(context: ToolUseContext) => ToolUseContext>;
    };

The executor uses a simple but effective concurrency model:

    private canExecuteTool(isConcurrencySafe: boolean): boolean {
      const executingTools = this.tools.filter(t => t.status === 'executing');
      return (
        executingTools.length === 0 ||
        (isConcurrencySafe && executingTools.every(t => t.isConcurrencySafe))
      );
    }

Rules:
- If no tool is executing, any tool may start
- Concurrency-safe tools may run in parallel with other concurrency-safe tools
- Non-concurrent tools need exclusive access (no other tool executing)
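Which tools are concurrency-safe? That is determined by the tool's isConcurrencySafe(input) method. In general:
- Concurrency-safe: FileRead, Glob, Grep, WebFetch (read-only operations)
- Not concurrency-safe: Bash, FileWrite, FileEdit (operations with side effects)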
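The admission rule and the per-input safety check can be tried out in isolation. The sketch below restates the rule as a pure function; SketchTool, Running, canStart, and the sample tools (including the input-dependent shellLike tool) are hypothetical and only illustrate why isConcurrencySafe takes the input as an argument.

```typescript
// Hypothetical sketch: the tool objects below are illustrative, not real definitions.
type SketchTool = { name: string; isConcurrencySafe: (input: unknown) => boolean };
type Running = { name: string; isConcurrencySafe: boolean };

const readTool: SketchTool = { name: 'FileRead', isConcurrencySafe: () => true };
const writeTool: SketchTool = { name: 'FileWrite', isConcurrencySafe: () => false };

// Assumed example of an input-dependent answer: safe only for read-only invocations.
const shellLike: SketchTool = {
  name: 'ShellLike',
  isConcurrencySafe: input =>
    typeof input === 'object' &&
    input !== null &&
    (input as { readOnly?: unknown }).readOnly === true,
};

// Same rule as canExecuteTool, written over an explicit list of running tools.
function canStart(candidateIsSafe: boolean, running: Running[]): boolean {
  return (
    running.length === 0 ||
    (candidateIsSafe && running.every(t => t.isConcurrencySafe))
  );
}

// A read may join another read; a write has to wait for an idle executor.
const readsRunning: Running[] = [{ name: 'FileRead', isConcurrencySafe: true }];
const readCanJoin = canStart(readTool.isConcurrencySafe({ path: 'a.txt' }), readsRunning);
const writeCanJoin = canStart(writeTool.isConcurrencySafe({ path: 'a.txt' }), readsRunning);
```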
When a tool is added, the executor immediately tries to process the queue:

    addTool(block: ToolUseBlock, assistantMessage: AssistantMessage): void {
      const toolDefinition = findToolByName(this.toolDefinitions, block.name);
      // ... setup TrackedTool ...
      this.tools.push(tracked);
      void this.processQueue();
    }

    private async processQueue(): Promise<void> {
      for (const tool of this.tools) {
        if (tool.status !== 'queued') continue;
        if (this.canExecuteTool(tool.isConcurrencySafe)) {
          await this.executeTool(tool);
        } else {
          if (!tool.isConcurrencySafe) break; // maintain order for non-concurrent
        }
      }
    }

After a tool finishes, processQueue() is triggered again to start any waiting tools:
    const promise = collectResults();
    tool.promise = promise;
    void promise.finally(() => {
      void this.processQueue();
    });

Sibling tool error cancellation
When a Bash tool errors, all sibling tools are cancelled:

    if (isErrorResult && tool.block.name === BASH_TOOL_NAME) {
      this.hasErrored = true;
      this.erroredToolDescription = this.getToolDescription(tool);
      this.siblingAbortController.abort('sibling_error');
    }

siblingAbortController is a child of the main toolUseContext.abortController: aborting it terminates the sibling tools but does not end the current turn. Only Bash errors trigger this mechanism; a failure in a read-only tool (such as FileRead reading a nonexistent file) does not affect its siblings.
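The parent/child relationship can be sketched with the standard AbortController API. The helper name linkChildController is an assumption, and the real code may wire the two controllers differently; the point is only that aborts flow from parent to child and never the other way.

```typescript
// Minimal sketch, assuming the child is linked via an 'abort' listener on the parent.
function linkChildController(parent: AbortController): AbortController {
  const child = new AbortController();
  if (parent.signal.aborted) {
    // Parent already aborted: the child starts out aborted with the same reason.
    child.abort(parent.signal.reason);
  } else {
    // Parent abort propagates down; a child abort never propagates up.
    parent.signal.addEventListener(
      'abort',
      () => child.abort(parent.signal.reason),
      { once: true },
    );
  }
  return child;
}

// Aborting the sibling controller leaves the turn-level controller untouched.
const turnController = new AbortController();
const siblingController = linkChildController(turnController);
siblingController.abort('sibling_error');

// Aborting the turn-level controller also aborts its child.
const interruptedTurn = new AbortController();
const interruptedSiblings = linkChildController(interruptedTurn);
interruptedTurn.abort('interrupt');
```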
Fallback path: runTools (batch execution)
When StreamingToolExecutor is disabled, the fallback path in src/services/tools/toolOrchestration.ts runs tools after the stream completes:

    export async function* runTools(
      toolUseMessages: ToolUseBlock[],
      assistantMessages: AssistantMessage[],
      canUseTool: CanUseToolFn,
      toolUseContext: ToolUseContext,
    ): AsyncGenerator<MessageUpdate, void> {
      for (const { isConcurrencySafe, blocks } of partitionToolCalls(
        toolUseMessages,
        toolUseContext,
      )) {
        if (isConcurrencySafe) {
          yield* runToolsConcurrently(blocks, assistantMessages, canUseTool, toolUseContext);
        } else {
          yield* runToolsSerially(blocks, assistantMessages, canUseTool, toolUseContext);
        }
      }
    }

Tool calls are partitioned into batches:
    function partitionToolCalls(toolUseMessages, toolUseContext): Batch[] {
      // Groups consecutive concurrent-safe tools into one batch
      // Non-concurrent tools get their own batch
      // Example: [Read, Read, Write, Read, Read] → [Read,Read], [Write], [Read,Read]
    }

Concurrent batches use the all() function from src/utils/generators.ts, with a default maximum concurrency of 10:

    async function* runToolsConcurrently(
      toolUseMessages,
      assistantMessages,
      canUseTool,
      toolUseContext,
    ): AsyncGenerator<MessageUpdateLazy, void> {
      yield* all(
        toolUseMessages.map(async function* (toolUse) {
          // assistantMessage: the entry of assistantMessages that owns this
          // tool_use block (the lookup is elided in this excerpt)
          yield* runToolUse(toolUse, assistantMessage, canUseTool, toolUseContext);
        }),
        getMaxToolUseConcurrency(), // default: 10
      );
    }

Tool execution pipeline
Whether StreamingToolExecutor or runTools is used, every individual tool call goes through runToolUse() in src/services/tools/toolExecution.ts:
graph TD
A[runToolUse] --> B[Parse input with Zod]
B --> C{Valid?}
C -- no --> D[Return parse error]
C -- yes --> E[Run PreToolUse hooks]
E --> F{Hook decision?}
F -- deny --> G[Return denial message]
F -- allow/ask --> H[Check permissions]
H --> I{Allowed?}
I -- no --> J[Return permission denial]
I -- yes --> K[Execute tool.call()]
K --> L[Run PostToolUse hooks]
L --> M[Format result]
M --> N[Yield result message]
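The stages in the diagram can be read as a chain of early returns. The sketch below is hypothetical throughout: SketchTool, Stages, runPipeline, and echoTool are illustrative names, validation is a hand-written parse function rather than Zod, and the stages are synchronous to keep the example short.

```typescript
// Hypothetical sketch: every type and helper here is an illustrative assumption.
type Outcome = { content: string; isError: boolean };
type HookDecision = 'allow' | 'deny' | 'ask';
type ParseResult<I> = { ok: true; value: I } | { ok: false; error: string };

interface SketchTool<I> {
  parse(raw: unknown): ParseResult<I>;
  call(input: I): string;
}

interface Stages<I> {
  preToolUse(input: I): HookDecision;
  isPermitted(input: I, hookDecision: HookDecision): boolean;
  postToolUse(output: string): string;
}

function runPipeline<I>(tool: SketchTool<I>, stages: Stages<I>, raw: unknown): Outcome {
  // 1. Validate input; invalid input never reaches hooks or the tool.
  const parsed = tool.parse(raw);
  if (parsed.ok === false) {
    return { content: `parse error: ${parsed.error}`, isError: true };
  }
  // 2. Pre-hook: a deny short-circuits before any permission check.
  const decision = stages.preToolUse(parsed.value);
  if (decision === 'deny') return { content: 'denied by hook', isError: true };
  // 3. Permission check still runs for both allow and ask.
  if (!stages.isPermitted(parsed.value, decision)) {
    return { content: 'permission denied', isError: true };
  }
  // 4. Execute, then let the post-hook see the output before formatting.
  const output = tool.call(parsed.value);
  return { content: stages.postToolUse(output), isError: false };
}

// A toy tool used to exercise the pipeline.
const echoTool: SketchTool<{ text: string }> = {
  parse(raw: unknown): ParseResult<{ text: string }> {
    const candidate = raw as { text?: unknown } | null;
    if (typeof candidate === 'object' && candidate !== null && typeof candidate.text === 'string') {
      return { ok: true, value: { text: candidate.text } };
    }
    return { ok: false, error: 'expected { text: string }' };
  },
  call: input => input.text.toUpperCase(),
};
```

Each failure path returns an ordinary result rather than throwing, so in every case something can be yielded back as a result message.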
Tool input is validated against the tool's Zod schema:

    const parsedInput = tool.inputSchema.safeParse(block.input);
    if (!parsedInput.success) {
      // Return error with Zod validation details
    }

Permission resolution
Permissions pass through several layers of checks (see resolveHookPermissionDecision in toolHooks.ts):
- A PreToolUse hook can return allow, deny, or ask
- A hook allow does not bypass deny/ask rules in settings.json
- Rule-based permissions (checkRuleBasedPermissions) check the settings
- The canUseTool callback shows a permission dialog when needed
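One way to picture the layering is a "most restrictive wins" merge of the hook decision and the rule decision. This is a sketch under that assumption only: resolveDecision is a hypothetical function, not resolveHookPermissionDecision, and the text above fixes just one case (a hook allow cannot override a deny or ask rule); the handling of every other combination is a guess.

```typescript
// Hypothetical sketch; null means "this layer had no opinion".
type Decision = 'allow' | 'deny' | 'ask';

function resolveDecision(hookDecision: Decision | null, ruleDecision: Decision | null): Decision {
  // Any deny wins, so a hook allow cannot override a deny rule.
  if (hookDecision === 'deny' || ruleDecision === 'deny') return 'deny';
  // Any ask beats allow, so a hook allow cannot override an ask rule either.
  if (hookDecision === 'ask' || ruleDecision === 'ask') return 'ask';
  if (hookDecision === 'allow' || ruleDecision === 'allow') return 'allow';
  // Assumed default: with no opinion from either layer, ask the user.
  return 'ask';
}

// An 'ask' outcome is the point where a permission dialog would be shown.
function needsDialog(decision: Decision): boolean {
  return decision === 'ask';
}
```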
Tool results are converted to a ToolResultBlockParam via mapToolResultToToolResultBlockParam:

    const resultContent = tool.mapToolResultToToolResultBlockParam(
      result.data,
      toolUseID,
    );
    // → { type: 'tool_result', tool_use_id: '...', content: '...', is_error: false }

Large results that exceed tool.maxResultSizeChars are persisted to disk and replaced with a preview plus the file path.
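The size cap can be sketched as a small wrapper around the result text. capResult, the injected persist callback, the previewChars default, and the wording of the truncation notice are all assumptions for illustration; only the behavior (persist, then substitute a preview plus path) comes from the text above.

```typescript
// Hypothetical sketch: persist() stands in for writing to disk and returns the saved path.
type ToolResultBlock = {
  type: 'tool_result';
  tool_use_id: string;
  content: string;
  is_error: boolean;
};

function capResult(
  toolUseId: string,
  text: string,
  maxChars: number,
  persist: (fullText: string) => string,
  previewChars: number = 200,
): ToolResultBlock {
  if (text.length <= maxChars) {
    // Small enough: pass the text through unchanged.
    return { type: 'tool_result', tool_use_id: toolUseId, content: text, is_error: false };
  }
  const savedPath = persist(text);
  const preview = text.slice(0, Math.min(previewChars, maxChars));
  return {
    type: 'tool_result',
    tool_use_id: toolUseId,
    content: `${preview}\n[output truncated: ${text.length} chars, full output saved to ${savedPath}]`,
    is_error: false,
  };
}

// Usage with an in-memory stand-in for the disk.
const savedFiles = new Map<string, string>();
const persistInMemory = (fullText: string): string => {
  const path = `/tmp/sketch-${savedFiles.size}.txt`;
  savedFiles.set(path, fullText);
  return path;
};
const smallResult = capResult('toolu_1', 'ok', 10, persistInMemory);
const largeResult = capResult('toolu_2', 'x'.repeat(50), 10, persistInMemory, 5);
```

Injecting the persistence step keeps the sketch testable without touching the file system; the model still receives enough context (a preview and a path) to decide whether to read the full output.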
Injecting results back into the conversation
Once all tools have finished, their results flow back into the messages array:

    // src/query.ts, at the continue site
    state = {
      messages: [...messagesForQuery, ...assistantMessages, ...toolResults],
      // ...
      transition: { reason: 'next_turn' },
    };

The next API call will see:
    [prior messages]
    [assistant message with tool_use blocks]
    [user message with tool_result blocks]   ← injected here
    [user message with attachment blocks]    ← if any

When the user interrupts (Ctrl+C or submit-interrupt), tool dispatch is responsible for cleanup:

    // In StreamingToolExecutor
    private getAbortReason(tool): 'sibling_error' | 'user_interrupted' | 'streaming_fallback' | null {
      if (this.discarded) return 'streaming_fallback';
      if (this.hasErrored) return 'sibling_error';
      if (this.toolUseContext.abortController.signal.aborted) {
        if (this.toolUseContext.abortController.signal.reason === 'interrupt') {
          return this.getToolInterruptBehavior(tool) === 'cancel' ? 'user_interrupted' : null;
        }
        return 'user_interrupted';
      }
      return null;
    }

Tools with interruptBehavior: 'block' (the default) keep running during an interrupt. Tools with interruptBehavior: 'cancel' receive a synthetic error result.
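The effect of the two interrupt behaviors can be shown on a list of pending tools. This is a sketch with assumed names (PendingTool, SyntheticResult, splitOnInterrupt) and an assumed error message; it covers only the case where the abort reason is 'interrupt'.

```typescript
// Hypothetical sketch: shapes and message text are assumptions for illustration.
type InterruptBehavior = 'block' | 'cancel';
type PendingTool = { id: string; interruptBehavior?: InterruptBehavior };
type SyntheticResult = {
  type: 'tool_result';
  tool_use_id: string;
  content: string;
  is_error: true;
};

function splitOnInterrupt(
  tools: PendingTool[],
): { keepRunning: string[]; cancelled: SyntheticResult[] } {
  const keepRunning: string[] = [];
  const cancelled: SyntheticResult[] = [];
  for (const tool of tools) {
    const behavior = tool.interruptBehavior ?? 'block'; // 'block' is the default
    if (behavior === 'cancel') {
      // Cancelled tools still get a result, so each tool_use keeps a matching tool_result.
      cancelled.push({
        type: 'tool_result',
        tool_use_id: tool.id,
        content: 'Interrupted by user',
        is_error: true,
      });
    } else {
      keepRunning.push(tool.id);
    }
  }
  return { keepRunning, cancelled };
}

const interruptOutcome = splitOnInterrupt([
  { id: 'toolu_a' },
  { id: 'toolu_b', interruptBehavior: 'cancel' },
  { id: 'toolu_c', interruptBehavior: 'block' },
]);
```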
After an abort, the query loop emits an interruption message:

    if (toolUseContext.abortController.signal.reason !== 'interrupt') {
      yield createUserInterruptionMessage({ toolUse: true });
    }
    return { reason: 'aborted_tools' };