Claude Code Harness Chapter 4: 工具执行编排:并发、流式输出和中断控制
2026-04-04
AI Claude Code Tool Orchestration Concurrency Harness Engineering
Claude Code Harness Chapter 4: 工具执行编排:并发、流式输出和中断控制
引言:指挥家的艺术
在第 2 章中,我们探讨了单个工具是如何工作的。但在实际使用中,Claude Code 经常需要同时执行多个工具、管理长时间运行的操作,并实时向用户报告进度——同时保持对所有操作的细粒度控制。
这就是工具执行编排(Tool Orchestration)要解决的问题。它就像一个交响乐团的指挥家,协调多个"音乐家"(工具)同时演奏,确保整体效果和谐而非混乱。
一个复杂的任务可能涉及:
- 🔀 并发执行:同时运行多个独立工具
- 📡 流式输出:实时显示工具进度
- ⏸️ 中断控制:随时取消特定操作
- 📊 状态管理:跟踪每个工具的状态
- 🔄 结果排序:保持输出的一致性
本章将深入剖析 Claude Code 的工具执行编排系统,揭示如何通过精心设计的架构,实现复杂的多工具协作。
编排架构概览
核心组件
工具执行由 StreamingToolExecutor 管理:
graph TB
subgraph "Orchestration layer"
A[StreamingToolExecutor]
B[ToolQueue]
C[ExecutionScheduler]
end
subgraph "Execution layer"
D[ActiveToolExecutions]
E[AbortControllers]
F[ResultBuffer]
end
subgraph "Output layer"
G[StatusTracker]
H[StreamProcessor]
I[ResultOrdering]
end
A --> B
A --> C
C --> D
D --> E
D --> F
D --> G
F --> H
H --> I
style A fill:#e1f5ff
style D fill:#e1f5ff
style H fill:#e1f5ff工具执行状态
每个工具都经历明确的状态转换:
stateDiagram-v2
[*] --> Queued: Request execution
Queued --> Executing: Start execution
Executing --> Streaming: Generate output
Executing --> Completed: Completed
Executing --> Failed: Execution failed
Streaming --> Completed: Output complete
Streaming --> Failed: Execution failed
Streaming --> Yielded: 产生部分Result
Executing --> Cancelled: 用户取消
Streaming --> Cancelled: 用户取消
Completed --> [*]
Failed --> [*]
Cancelled --> [*]
Yielded --> [*]
note right of Executing
运行工具 handler
管理 AbortController
Collect output
end note
note right of Streaming
Stream real-time output
Update progress
Handle partial results
end note并发执行策略
何时并发?
不是所有工具都应该并发执行。决策取决于多个因素:
flowchart TD
A[Multiple tool requests] --> B{Tools independent?}
B -->|No| C[Sequential execution]
B -->|Yes| D{Resource needs?}
D -->|CPU intensive| E[Limit concurrency]
D -->|I/O intensive| F[Full concurrency]
E --> G[Execution pool
Max N tools]
F --> H[Full concurrency]
G --> I[Monitor resources]
I --> J{Overloaded?}
J -->|Yes| K[Reduce concurrency]
J -->|No| L[Maintain or increase]
style C fill:#fc6
style H fill:#6f6并发决策矩阵
| 场景 | 并发策略 | 原因 |
|---|---|---|
| 搜索多个目录 | 完全并发 | I/O 操作,无共享状态 |
| 运行多个测试 | 完全并发 | 独立进程,无冲突 |
| 编辑同一文件 | 顺序执行 | 需要保证顺序 |
| 部署多个服务 | 限制并发(3-5) | 避免资源耗尽 |
| 数据库迁移 | 顺序执行 | 需要事务一致性 |
实战:并发文件搜索
sequenceDiagram
participant Model
participant Executor
participant Search1
participant Search2
participant Search3
Model->>Executor: Search 3 directories
par Concurrent execution
Executor->>Search1: Search src/
Executor->>Search2: Search tests/
Executor->>Search3: Search docs/
end
Search2-->>Executor: Complete(第 1)
Search3-->>Executor: Complete(第 2)
Search1-->>Executor: Complete(第 3)
Executor->>Executor: Collect results
Executor-->>Model: Return ordered results代码示例:
// 并发执行多个搜索工具
const searchTasks = [
grepTool({ pattern: "function", path: "src/" }),
grepTool({ pattern: "function", path: "tests/" }),
grepTool({ pattern: "function", path: "docs/" })
]
const results = await Promise.all(searchTasks)
// 结果按任务顺序,而非完成顺序资源管理
并发执行需要 careful 的资源管理:
pie title Concurrency limit configuration
"File operations" : 10
"Network requests" : 5
"CPU intensive" : 3
"数据库" : 2资源池策略:
| 资源类型 | 并发限制 | 超限处理 |
|---|---|---|
| 文件读取 | 10 个 | 排队等待 |
| 网络请求 | 5 个 | 限流 |
| Bash 进程 | 3 个 | 池管理 |
| 内存密集 | 2 个 | 串行化 |
流式输出系统
为什么需要流式输出?
长时间运行的工具如果等到完成才输出,用户体验会很差:
❌ 非流式:
[运行 30 秒...]
[运行 60 秒...]
[运行 90 秒...]
突然输出: "完成!找到了 1000 个结果"
✅ 流式:
"正在搜索..."
"[==== ] 20% - 找到 200 个结果"
"[======== ] 50% - 找到 500 个结果"
"[==========] 100% - 找到 1000 个结果"流式输出架构
flowchart LR
A[Tool execution] --> B[Generate output]
B --> C[Output buffer]
C --> D{Buffer full?}
D -->|Yes| E[Flush to user]
D -->|No| F[Continue accumulating]
E --> G[User sees progress]
F --> B
style E fill:#e1f5ff
style G fill:#e1f5ff流式输出类型
| 输出类型 | 流式方式 | 示例 |
|---|---|---|
| 进度更新 | 百分比 + 状态 | [50%] 处理中... |
| 增量结果 | 逐项输出 | 找到: file1.ts, file2.ts... |
| 日志输出 | 实时日志 | INFO: 连接数据库... |
| 状态变化 | 状态转换 | 开始 → 运行中 → 完成 |
实战:流式测试运行
sequenceDiagram
participant User
participant Executor
participant TestTool
User->>Executor: Run tests
Executor->>TestTool: Start execution
loop Test run
TestTool-->>Executor: Progress update
Executor-->>User: [==== ] 40% - 10/25 通过
end
TestTool-->>Executor: Final result
Executor-->>User: [=======] 100% - 25/25 通过 ✓
note over User,Executor
User sees progress in real-time
Can spot issues early
Better UX
end note流式输出格式:
interface StreamUpdate {
toolCallId: string
status: 'running' | 'completed' | 'failed'
progress?: number // 0-100
message?: string
partialResult?: any
}
// 示例输出
{
toolCallId: "bash_123",
status: "running",
progress: 45,
message: "运行测试中...",
partialResult: { passed: 10, failed: 2, total: 25 }
}细粒度中断控制
中断层次
Claude Code 实现了多层次的中断控制:
graph TB
A[User interrupt] --> B{Interrupt level}
B -->|Soft interrupt| C[Complete当前工具]
B -->|Hard interrupt| D[Stop all immediately]
B -->|Selective interrupt| E[Stop specific tool]
C --> F[Clean up state]
D --> G[Force terminate]
E --> H[Cancel selected tools]
F --> I[Report status]
G --> I
H --> I
style C fill:#6f6
style D fill:#f66
style E fill:#fc6AbortController 架构
每个工具都有独立的 AbortController:
flowchart LR
A[ToolExecutionManager] --> B1[Tool 1
AbortController]
A --> B2[Tool 2
AbortController]
A --> B3[Tool 3
AbortController]
C[User interrupt] --> D[选择要中断工具]
D --> E1[取消 Tool 1]
D --> E2[取消 Tool 2]
D --> E3[取消 Tool 3]
E1 --> F1[Cleanup resources]
E2 --> F2[Cleanup resources]
E3 --> F3[Cleanup resources]
style A fill:#e1f5ff
style D fill:#f66实现示例:
class ToolExecution {
private abortController = new AbortController()
private cleanupCallbacks: Array<() => void> = []
async execute(tool: Tool, input: any) {
try {
// 设置信号
const signal = this.abortController.signal
// 执行工具
const result = await tool.handler(input, { signal })
return result
} catch (error) {
if (error.name === 'AbortError') {
// 清理资源
this.cleanup()
throw new ToolCancelledError()
}
throw error
}
}
cancel() {
this.abortController.abort()
}
registerCleanup(callback: () => void) {
this.cleanupCallbacks.push(callback)
}
private cleanup() {
this.cleanupCallbacks.forEach(cb => cb())
this.cleanupCallbacks = []
}
}中断场景
| 场景 | 中断类型 | 行为 |
|---|---|---|
| 用户按 Ctrl+C | 硬中断 | 立即停止所有操作 |
| 点击"取消"按钮 | 软中断 | 完成当前工具后停止 |
| 取消特定工具 | 选择性中断 | 只停止选定的工具 |
| 超时 | 自动中断 | 达到时间限制后中断 |
结果缓冲和排序
结果顺序问题
当工具并发执行时,完成的顺序是不可预测的:
sequenceDiagram
participant Executor
participant Tool1
participant Tool2
participant Tool3
par Concurrent execution
Executor->>Tool1: Slow operation (5s)
Executor->>Tool2: Fast operation (1s)
Executor->>Tool3: Medium operation (3s)
end
Note over Tool2,Tool3: Complete顺序: Tool2 → Tool3 → Tool1
Note over Executor: Expected order: Tool1 → Tool2 → Tool3
Tool2-->>Executor: Result(第 1)
Tool3-->>Executor: Result(第 2)
Tool1-->>Executor: Result(第 3)
Executor->>Executor: Reorder
Executor-->>User: Tool1, Tool2, Tool3结果缓冲系统
flowchart TD
A[Tool start] --> B[Assign index]
B --> C[Execute tool]
C --> D[Complete]
D --> E{所有工具Complete?}
E -->|No| F[Buffer result]
E -->|Yes| G[Sort by index]
F --> H[Wait for other tools]
H --> C
G --> I[Return ordered results]
style F fill:#e1f5ff
style G fill:#e1f5ff实现示例:
class ResultBuffer {
private buffer = new Map()
private nextExpectedIndex = 0
private readonly totalTools: number
constructor(totalTools: number) {
this.totalTools = totalTools
}
addResult(index: number, result: ToolResult) {
this.buffer.set(index, result)
}
*getOrderedResults(): Generator {
while (this.buffer.size < this.totalTools) {
if (this.buffer.has(this.nextExpectedIndex)) {
const result = this.buffer.get(this.nextExpectedIndex)!
yield result
this.buffer.delete(this.nextExpectedIndex)
this.nextExpectedIndex++
} else {
// 等待下一个结果
break
}
}
}
isComplete(): boolean {
return this.buffer.size === this.totalTools &&
this.nextExpectedIndex === this.totalTools
}
} 状态管理和追踪
工具状态模型
每个工具都有详细的状态追踪:
graph TB
subgraph "Tool state"
A[Queued
📋] --> B[Executing
⚙️]
B --> C[Streaming
📡]
C --> D[Completed
✅]
B --> E[Failed
❌]
C --> E
B --> F[Cancelled
⏸️]
C --> F
C --> G[Yielded
⏸️]
end
subgraph "User visible"
H[Waiting...] --> A
I[Running...] --> B
J[Complete] --> D
K[Fail] --> E
L[Cancelled] --> F
end状态更新流
flowchart LR
A[Tool execution] --> B[State change]
B --> C[Update status tracker]
C --> D[Generate event]
D --> E[Notify UI]
D --> F[Log events]
D --> G[Update memory]
E --> H[User sees new state]
F --> I[Audit trail]
G --> J[上下文更新]
style C fill:#e1f5ff
style E fill:#e1f5ff状态追踪接口:
interface ToolStatus {
toolCallId: string
toolName: string
status: ToolState
startTime: number
endTime?: number
progress?: number
error?: Error
result?: any
}
enum ToolState {
Queued = 'queued',
Executing = 'executing',
Streaming = 'streaming',
Completed = 'completed',
Failed = 'failed',
Cancelled = 'cancelled',
Yielded = 'yielded'
}高级编排模式
模式 1:工具依赖链
flowchart TD
A[Tool 1
Read config] --> B{Success?}
B -->|Yes| C[Tool 2
Validate config]
B -->|No| D[Fail并停止]
C --> E{Success?}
E -->|Yes| F[Tool 3
Apply config]
E -->|No| D
F --> G{Success?}
G -->|Yes| H[Complete]
G -->|No| D
style A fill:#e1f5ff
style C fill:#e1f5ff
style F fill:#e1f5ff模式 2:条件分支
flowchart TD
A[Check condition] --> B{Condition A?}
B -->|Yes| C[Execute tool A]
B -->|No| D{Condition B?}
D -->|Yes| E[Execute tool B]
D -->|No| F[Execute tool C]
C --> G[Merge results]
E --> G
F --> G
style A fill:#e1f5ff
style G fill:#e1f5ff模式 3:重试循环
flowchart TD
A[Execute tool] --> B{Success?}
B -->|Yes| C[Return result]
B -->|No| D{Retry count < max?}
D -->|Yes| E[Wait]
E --> F[Increment retry count]
F --> A
D -->|No| G[Fail]
style A fill:#e1f5ff
style C fill:#6f6
style G fill:#f66重试策略配置:
| 工具类型 | 最大重试 | 退避策略 | 超时 |
|---|---|---|---|
| 网络请求 | 3 | 指数退避 | 30s |
| 文件操作 | 1 | 无 | 5s |
| Bash 命令 | 0 | 无 | 60s |
| 数据库操作 | 2 | 固定延迟 | 10s |
性能优化
1. 批处理
flowchart LR
A[Multiple similar requests] --> B[Batch]
B --> C[Single execution]
C --> D[Split results]
D --> E[Return individual results]
style B fill:#6f6
style C fill:#6f6示例:批量文件读取
// ❌ 逐个读取
for (const file of files) {
await readFile(file)
}
// ✅ 批量读取
const results = await Promise.all(
files.map(file => readFile(file))
)2. 缓存策略
flowchart TD
A[Tool request] --> B{Cache hit?}
B -->|Yes| C[Return cached result]
B -->|No| D[Execute tool]
D --> E[Cache result]
E --> F[Return result]
style C fill:#6f6
style E fill:#e1f5ff缓存决策:
| 工具类型 | 缓存 | TTL | 失效条件 |
|---|---|---|---|
| 文件读取 | ✅ | 5min | 文件修改 |
| Git 状态 | ✅ | 5min | 文件系统变化 |
| Bash 执行 | ❌ | - | - |
| 网络请求 | ✅ | 1min | 时间过期 |
3. 资源池化
pie title Resource pool allocation
"File read pool" : 10
"Bash process pool" : 3
"Network request pool" : 5
"Database connection pool" : 2错误处理和恢复
错误分类
graph TD
A[Tool error] --> B{Error type}
B -->|Retryable| C[Network error]
B -->|User intervention| D[Permission error]
B -->|Auto fix| E[Parameter error]
B -->|Fatal| F[System error]
C --> G[Auto retry]
D --> H[Request approval]
E --> I[Fix parameters]
F --> J[Report and stop]
style G fill:#fc6
style H fill:#fc6
style I fill:#6f6
style J fill:#f66错误恢复策略
| 错误类型 | 恢复策略 | 用户干预 |
|---|---|---|
| 超时 | 重试 3 次,指数退避 | ❌ |
| 权限拒绝 | 请求用户批准 | ✅ |
| 参数无效 | 修正并重试 | ❌ |
| 资源不足 | 减少并发,重试 | ❌ |
| 依赖缺失 | 安装依赖或报告错误 | ✅ |
实战案例:复杂编排场景
场景:部署微服务应用
flowchart TD
A[Start deployment] --> B[Concurrent execution]
B --> C1[Build service A]
B --> C2[Build service B]
B --> C3[Build service C]
C1 --> D1{Success?}
C2 --> D2{Success?}
C3 --> D3{Success?}
D1 -->|No| E1[Rollback A]
D2 -->|No| E2[Rollback B]
D3 -->|No| E3[Rollback C]
D1 -->|Yes| F1[Deploy A]
D2 -->|Yes| F2[Deploy B]
D3 -->|Yes| F3[Deploy C]
F1 --> G1{Success?}
F2 --> G2{Success?}
F3 --> G3{Success?}
G1 -->|No| H1[Rollback A]
G2 -->|No| H2[Rollback B]
G3 -->|No| H3[Rollback C]
G1 -->|Yes| I1[Health check A]
G2 -->|Yes| I2[Health check B]
G3 -->|Yes| I3[Health check C]
I1 --> J{All healthy?}
I2 --> J
I3 --> J
J -->|Yes| K[Deployment successful]
J -->|No| L[Rollback all]
style B fill:#e1f5ff
style K fill:#6f6
style L fill:#f66执行日志:
=== 阶段 1: 并发构建 ===
[Tool 1] 构建 service-a: [==== ] 40%
[Tool 2] 构建 service-b: [======== ] 80%
[Tool 3] 构建 service-c: [==========] 100% ✓
=== 阶段 2: 部署 ===
[Tool 3] 部署 service-c: [==== ] 40%
[Tool 1] 构建 service-a: [==========] 100% ✓
[Tool 2] 构建 service-b: [==========] 100% ✓
[Tool 3] 部署 service-c: [==========] 100% ✓
=== 阶段 3: 健康检查 ===
[Tool 1] 健康检查 service-a: ✓ 健康
[Tool 2] 健康检查 service-b: ✓ 健康
[Tool 3] 健康检查 service-c: ✓ 健康
=== 结果 ===
✅ 所有服务部署成功结论:编排的艺术
工具执行编排是 Claude Code 的"指挥家",它实现了:
- 并发执行:充分利用系统资源
- 流式输出:实时反馈和更好的用户体验
- 细粒度控制:独立管理每个工具的生命周期
- 结果一致性:保持输出顺序的可预测性
- 错误恢复:优雅处理各种失败情况
- 资源优化:智能管理并发和缓存
核心设计原则:
| 原则 | 实现 | 价值 |
|---|---|---|
| 独立性 | 每个工具有独立的控制器 | 可单独中断和重试 |
| 可观察性 | 详细的状态追踪和日志 | 易于调试和监控 |
| 用户控制 | 多层次的中断机制 | 保持用户主导 |
| 性能优先 | 并发、缓存、批处理 | 快速响应 |
| 优雅降级 | 错误恢复和回滚 | 可靠的系统 |
这种编排架构展示了 Harness Engineering 的精髓:不是简单地执行工具,而是创建一个智能的、可控制的、高性能的执行环境,让 AI 智能与实际能力完美协作。
在 Part 2 中,我们将转向 Prompt 工程,探讨如何通过精心设计的 System Prompts 来引导 AI 的行为——这是 Claude Code 的"控制平面"。
关键要点
- 编排是指挥艺术:协调多个工具和谐工作
- 并发执行策略:根据工具特性选择并发级别
- 流式输出:实时反馈改善用户体验
- 细粒度中断:独立控制每个工具
- 结果排序:保持输出一致性
- 性能优化:批处理、缓存、资源池化
延伸阅读
- 第 5 章:系统提示词架构——控制平面
- 第 6 章:通过提示词引导行为
- StreamingToolExecutor 源码:
src/services/tools/StreamingToolExecutor.ts - 工具执行源码:
src/services/tools/toolExecution.ts