Back to Blog

Claude Code Harness Chapter 4: 工具执行编排:并发、流式输出和中断控制

2026-04-04
AI Claude Code Tool Orchestration Concurrency Harness Engineering

Claude Code Harness Chapter 4: 工具执行编排:并发、流式输出和中断控制

引言:指挥家的艺术

在第 2 章中,我们探讨了单个工具是如何工作的。但在实际使用中,Claude Code 经常需要同时执行多个工具、管理长时间运行的操作,并实时向用户报告进度——同时保持对所有操作的细粒度控制。

这就是工具执行编排(Tool Orchestration)要解决的问题。它就像一个交响乐团的指挥家,协调多个"音乐家"(工具)同时演奏,确保整体效果和谐而非混乱。

一个复杂的任务可能涉及:

  • 🔀 并发执行:同时运行多个独立工具
  • 📡 流式输出:实时显示工具进度
  • ⏸️ 中断控制:随时取消特定操作
  • 📊 状态管理:跟踪每个工具的状态
  • 🔄 结果排序:保持输出的一致性

本章将深入剖析 Claude Code 的工具执行编排系统,揭示如何通过精心设计的架构,实现复杂的多工具协作。

编排架构概览

核心组件

工具执行由 StreamingToolExecutor 管理:

graph TB
    subgraph "Orchestration layer"
        A[StreamingToolExecutor]
        B[ToolQueue]
        C[ExecutionScheduler]
    end

    subgraph "Execution layer"
        D[ActiveToolExecutions]
        E[AbortControllers]
        F[ResultBuffer]
    end

    subgraph "Output layer"
        G[StatusTracker]
        H[StreamProcessor]
        I[ResultOrdering]
    end

    A --> B
    A --> C
    C --> D
    D --> E
    D --> F
    D --> G
    F --> H
    H --> I

    style A fill:#e1f5ff
    style D fill:#e1f5ff
    style H fill:#e1f5ff

工具执行状态

每个工具都经历明确的状态转换:

stateDiagram-v2
    [*] --> Queued: Request execution
    Queued --> Executing: Start execution
    Executing --> Streaming: Generate output
    Executing --> Completed: Completed
    Executing --> Failed: Execution failed
    Streaming --> Completed: Output complete
    Streaming --> Failed: Execution failed
    Streaming --> Yielded: 产生部分Result
    Executing --> Cancelled: 用户取消
    Streaming --> Cancelled: 用户取消

    Completed --> [*]
    Failed --> [*]
    Cancelled --> [*]
    Yielded --> [*]

    note right of Executing
        运行工具 handler
        管理 AbortController
        Collect output
    end note

    note right of Streaming
        Stream real-time output
        Update progress
        Handle partial results
    end note

并发执行策略

何时并发?

不是所有工具都应该并发执行。决策取决于多个因素:

flowchart TD
    A[Multiple tool requests] --> B{Tools independent?}
    B -->|No| C[Sequential execution]
    B -->|Yes| D{Resource needs?}

    D -->|CPU intensive| E[Limit concurrency]
    D -->|I/O intensive| F[Full concurrency]

    E --> G[Execution pool
Max N tools] F --> H[Full concurrency] G --> I[Monitor resources] I --> J{Overloaded?} J -->|Yes| K[Reduce concurrency] J -->|No| L[Maintain or increase] style C fill:#fc6 style H fill:#6f6

并发决策矩阵

场景 并发策略 原因
搜索多个目录 完全并发 I/O 操作,无共享状态
运行多个测试 完全并发 独立进程,无冲突
编辑同一文件 顺序执行 需要保证顺序
部署多个服务 限制并发(3-5) 避免资源耗尽
数据库迁移 顺序执行 需要事务一致性

实战:并发文件搜索

sequenceDiagram
    participant Model
    participant Executor
    participant Search1
    participant Search2
    participant Search3

    Model->>Executor: Search 3 directories
    par Concurrent execution
        Executor->>Search1: Search src/
        Executor->>Search2: Search tests/
        Executor->>Search3: Search docs/
    end

    Search2-->>Executor: Complete(第 1)
    Search3-->>Executor: Complete(第 2)
    Search1-->>Executor: Complete(第 3)

    Executor->>Executor: Collect results
    Executor-->>Model: Return ordered results

代码示例:

// 并发执行多个搜索工具
const searchTasks = [
  grepTool({ pattern: "function", path: "src/" }),
  grepTool({ pattern: "function", path: "tests/" }),
  grepTool({ pattern: "function", path: "docs/" })
]

const results = await Promise.all(searchTasks)
// 结果按任务顺序,而非完成顺序

资源管理

并发执行需要 careful 的资源管理:

pie title Concurrency limit configuration
    "File operations" : 10
    "Network requests" : 5
    "CPU intensive" : 3
    "数据库" : 2

资源池策略:

资源类型 并发限制 超限处理
文件读取 10 个 排队等待
网络请求 5 个 限流
Bash 进程 3 个 池管理
内存密集 2 个 串行化

流式输出系统

为什么需要流式输出?

长时间运行的工具如果等到完成才输出,用户体验会很差:

❌ 非流式:
[运行 30 秒...]
[运行 60 秒...]
[运行 90 秒...]
突然输出: "完成!找到了 1000 个结果"

✅ 流式:
"正在搜索..."
"[====      ] 20% - 找到 200 个结果"
"[========  ] 50% - 找到 500 个结果"
"[==========] 100% - 找到 1000 个结果"

流式输出架构

flowchart LR
    A[Tool execution] --> B[Generate output]
    B --> C[Output buffer]
    C --> D{Buffer full?}
    D -->|Yes| E[Flush to user]
    D -->|No| F[Continue accumulating]
    E --> G[User sees progress]
    F --> B

    style E fill:#e1f5ff
    style G fill:#e1f5ff

流式输出类型

输出类型 流式方式 示例
进度更新 百分比 + 状态 [50%] 处理中...
增量结果 逐项输出 找到: file1.ts, file2.ts...
日志输出 实时日志 INFO: 连接数据库...
状态变化 状态转换 开始 → 运行中 → 完成

实战:流式测试运行

sequenceDiagram
    participant User
    participant Executor
    participant TestTool

    User->>Executor: Run tests
    Executor->>TestTool: Start execution

    loop Test run
        TestTool-->>Executor: Progress update
        Executor-->>User: [====  ] 40% - 10/25 通过
    end

    TestTool-->>Executor: Final result
    Executor-->>User: [=======] 100% - 25/25 通过 ✓

    note over User,Executor
        User sees progress in real-time
        Can spot issues early
        Better UX
    end note

流式输出格式:

interface StreamUpdate {
  toolCallId: string
  status: 'running' | 'completed' | 'failed'
  progress?: number // 0-100
  message?: string
  partialResult?: any
}

// 示例输出
{
  toolCallId: "bash_123",
  status: "running",
  progress: 45,
  message: "运行测试中...",
  partialResult: { passed: 10, failed: 2, total: 25 }
}

细粒度中断控制

中断层次

Claude Code 实现了多层次的中断控制:

graph TB
    A[User interrupt] --> B{Interrupt level}

    B -->|Soft interrupt| C[Complete当前工具]
    B -->|Hard interrupt| D[Stop all immediately]
    B -->|Selective interrupt| E[Stop specific tool]

    C --> F[Clean up state]
    D --> G[Force terminate]
    E --> H[Cancel selected tools]

    F --> I[Report status]
    G --> I
    H --> I

    style C fill:#6f6
    style D fill:#f66
    style E fill:#fc6

AbortController 架构

每个工具都有独立的 AbortController:

flowchart LR
    A[ToolExecutionManager] --> B1[Tool 1
AbortController] A --> B2[Tool 2
AbortController] A --> B3[Tool 3
AbortController] C[User interrupt] --> D[选择要中断工具] D --> E1[取消 Tool 1] D --> E2[取消 Tool 2] D --> E3[取消 Tool 3] E1 --> F1[Cleanup resources] E2 --> F2[Cleanup resources] E3 --> F3[Cleanup resources] style A fill:#e1f5ff style D fill:#f66

实现示例:

class ToolExecution {
  private abortController = new AbortController()
  private cleanupCallbacks: Array<() => void> = []

  async execute(tool: Tool, input: any) {
    try {
      // 设置信号
      const signal = this.abortController.signal

      // 执行工具
      const result = await tool.handler(input, { signal })

      return result
    } catch (error) {
      if (error.name === 'AbortError') {
        // 清理资源
        this.cleanup()
        throw new ToolCancelledError()
      }
      throw error
    }
  }

  cancel() {
    this.abortController.abort()
  }

  registerCleanup(callback: () => void) {
    this.cleanupCallbacks.push(callback)
  }

  private cleanup() {
    this.cleanupCallbacks.forEach(cb => cb())
    this.cleanupCallbacks = []
  }
}

中断场景

场景 中断类型 行为
用户按 Ctrl+C 硬中断 立即停止所有操作
点击"取消"按钮 软中断 完成当前工具后停止
取消特定工具 选择性中断 只停止选定的工具
超时 自动中断 达到时间限制后中断

结果缓冲和排序

结果顺序问题

当工具并发执行时,完成的顺序是不可预测的:

sequenceDiagram
    participant Executor
    participant Tool1
    participant Tool2
    participant Tool3

    par Concurrent execution
        Executor->>Tool1: Slow operation (5s)
        Executor->>Tool2: Fast operation (1s)
        Executor->>Tool3: Medium operation (3s)
    end

    Note over Tool2,Tool3: Complete顺序: Tool2 → Tool3 → Tool1
    Note over Executor: Expected order: Tool1 → Tool2 → Tool3

    Tool2-->>Executor: Result(第 1)
    Tool3-->>Executor: Result(第 2)
    Tool1-->>Executor: Result(第 3)

    Executor->>Executor: Reorder
    Executor-->>User: Tool1, Tool2, Tool3

结果缓冲系统

flowchart TD
    A[Tool start] --> B[Assign index]
    B --> C[Execute tool]
    C --> D[Complete]
    D --> E{所有工具Complete?}

    E -->|No| F[Buffer result]
    E -->|Yes| G[Sort by index]
    F --> H[Wait for other tools]
    H --> C

    G --> I[Return ordered results]

    style F fill:#e1f5ff
    style G fill:#e1f5ff

实现示例:

class ResultBuffer {
  private buffer = new Map()
  private nextExpectedIndex = 0
  private readonly totalTools: number

  constructor(totalTools: number) {
    this.totalTools = totalTools
  }

  addResult(index: number, result: ToolResult) {
    this.buffer.set(index, result)
  }

  *getOrderedResults(): Generator {
    while (this.buffer.size < this.totalTools) {
      if (this.buffer.has(this.nextExpectedIndex)) {
        const result = this.buffer.get(this.nextExpectedIndex)!
        yield result
        this.buffer.delete(this.nextExpectedIndex)
        this.nextExpectedIndex++
      } else {
        // 等待下一个结果
        break
      }
    }
  }

  isComplete(): boolean {
    return this.buffer.size === this.totalTools &&
           this.nextExpectedIndex === this.totalTools
  }
}

状态管理和追踪

工具状态模型

每个工具都有详细的状态追踪:

graph TB
    subgraph "Tool state"
        A[Queued
📋] --> B[Executing
⚙️] B --> C[Streaming
📡] C --> D[Completed
✅] B --> E[Failed
❌] C --> E B --> F[Cancelled
⏸️] C --> F C --> G[Yielded
⏸️] end subgraph "User visible" H[Waiting...] --> A I[Running...] --> B J[Complete] --> D K[Fail] --> E L[Cancelled] --> F end

状态更新流

flowchart LR
    A[Tool execution] --> B[State change]
    B --> C[Update status tracker]
    C --> D[Generate event]
    D --> E[Notify UI]
    D --> F[Log events]
    D --> G[Update memory]

    E --> H[User sees new state]
    F --> I[Audit trail]
    G --> J[上下文更新]

    style C fill:#e1f5ff
    style E fill:#e1f5ff

状态追踪接口:

interface ToolStatus {
  toolCallId: string
  toolName: string
  status: ToolState
  startTime: number
  endTime?: number
  progress?: number
  error?: Error
  result?: any
}

enum ToolState {
  Queued = 'queued',
  Executing = 'executing',
  Streaming = 'streaming',
  Completed = 'completed',
  Failed = 'failed',
  Cancelled = 'cancelled',
  Yielded = 'yielded'
}

高级编排模式

模式 1:工具依赖链

flowchart TD
    A[Tool 1
Read config] --> B{Success?} B -->|Yes| C[Tool 2
Validate config] B -->|No| D[Fail并停止] C --> E{Success?} E -->|Yes| F[Tool 3
Apply config] E -->|No| D F --> G{Success?} G -->|Yes| H[Complete] G -->|No| D style A fill:#e1f5ff style C fill:#e1f5ff style F fill:#e1f5ff

模式 2:条件分支

flowchart TD
    A[Check condition] --> B{Condition A?}
    B -->|Yes| C[Execute tool A]
    B -->|No| D{Condition B?}

    D -->|Yes| E[Execute tool B]
    D -->|No| F[Execute tool C]

    C --> G[Merge results]
    E --> G
    F --> G

    style A fill:#e1f5ff
    style G fill:#e1f5ff

模式 3:重试循环

flowchart TD
    A[Execute tool] --> B{Success?}
    B -->|Yes| C[Return result]
    B -->|No| D{Retry count < max?}

    D -->|Yes| E[Wait]
    E --> F[Increment retry count]
    F --> A

    D -->|No| G[Fail]

    style A fill:#e1f5ff
    style C fill:#6f6
    style G fill:#f66

重试策略配置:

工具类型 最大重试 退避策略 超时
网络请求 3 指数退避 30s
文件操作 1 5s
Bash 命令 0 60s
数据库操作 2 固定延迟 10s

性能优化

1. 批处理

flowchart LR
    A[Multiple similar requests] --> B[Batch]
    B --> C[Single execution]
    C --> D[Split results]
    D --> E[Return individual results]

    style B fill:#6f6
    style C fill:#6f6

示例:批量文件读取

// ❌ 逐个读取
for (const file of files) {
  await readFile(file)
}

// ✅ 批量读取
const results = await Promise.all(
  files.map(file => readFile(file))
)

2. 缓存策略

flowchart TD
    A[Tool request] --> B{Cache hit?}
    B -->|Yes| C[Return cached result]
    B -->|No| D[Execute tool]
    D --> E[Cache result]
    E --> F[Return result]

    style C fill:#6f6
    style E fill:#e1f5ff

缓存决策:

工具类型 缓存 TTL 失效条件
文件读取 5min 文件修改
Git 状态 5min 文件系统变化
Bash 执行 - -
网络请求 1min 时间过期

3. 资源池化

pie title Resource pool allocation
    "File read pool" : 10
    "Bash process pool" : 3
    "Network request pool" : 5
    "Database connection pool" : 2

错误处理和恢复

错误分类

graph TD
    A[Tool error] --> B{Error type}

    B -->|Retryable| C[Network error]
    B -->|User intervention| D[Permission error]
    B -->|Auto fix| E[Parameter error]
    B -->|Fatal| F[System error]

    C --> G[Auto retry]
    D --> H[Request approval]
    E --> I[Fix parameters]
    F --> J[Report and stop]

    style G fill:#fc6
    style H fill:#fc6
    style I fill:#6f6
    style J fill:#f66

错误恢复策略

错误类型 恢复策略 用户干预
超时 重试 3 次,指数退避
权限拒绝 请求用户批准
参数无效 修正并重试
资源不足 减少并发,重试
依赖缺失 安装依赖或报告错误

实战案例:复杂编排场景

场景:部署微服务应用

flowchart TD
    A[Start deployment] --> B[Concurrent execution]
    B --> C1[Build service A]
    B --> C2[Build service B]
    B --> C3[Build service C]

    C1 --> D1{Success?}
    C2 --> D2{Success?}
    C3 --> D3{Success?}

    D1 -->|No| E1[Rollback A]
    D2 -->|No| E2[Rollback B]
    D3 -->|No| E3[Rollback C]

    D1 -->|Yes| F1[Deploy A]
    D2 -->|Yes| F2[Deploy B]
    D3 -->|Yes| F3[Deploy C]

    F1 --> G1{Success?}
    F2 --> G2{Success?}
    F3 --> G3{Success?}

    G1 -->|No| H1[Rollback A]
    G2 -->|No| H2[Rollback B]
    G3 -->|No| H3[Rollback C]

    G1 -->|Yes| I1[Health check A]
    G2 -->|Yes| I2[Health check B]
    G3 -->|Yes| I3[Health check C]

    I1 --> J{All healthy?}
    I2 --> J
    I3 --> J

    J -->|Yes| K[Deployment successful]
    J -->|No| L[Rollback all]

    style B fill:#e1f5ff
    style K fill:#6f6
    style L fill:#f66

执行日志:

=== 阶段 1: 并发构建 ===
[Tool 1] 构建 service-a: [====      ] 40%
[Tool 2] 构建 service-b: [========  ] 80%
[Tool 3] 构建 service-c: [==========] 100% ✓

=== 阶段 2: 部署 ===
[Tool 3] 部署 service-c: [====      ] 40%
[Tool 1] 构建 service-a: [==========] 100% ✓
[Tool 2] 构建 service-b: [==========] 100% ✓
[Tool 3] 部署 service-c: [==========] 100% ✓

=== 阶段 3: 健康检查 ===
[Tool 1] 健康检查 service-a: ✓ 健康
[Tool 2] 健康检查 service-b: ✓ 健康
[Tool 3] 健康检查 service-c: ✓ 健康

=== 结果 ===
✅ 所有服务部署成功

结论:编排的艺术

工具执行编排是 Claude Code 的"指挥家",它实现了:

  1. 并发执行:充分利用系统资源
  2. 流式输出:实时反馈和更好的用户体验
  3. 细粒度控制:独立管理每个工具的生命周期
  4. 结果一致性:保持输出顺序的可预测性
  5. 错误恢复:优雅处理各种失败情况
  6. 资源优化:智能管理并发和缓存

核心设计原则:

原则 实现 价值
独立性 每个工具有独立的控制器 可单独中断和重试
可观察性 详细的状态追踪和日志 易于调试和监控
用户控制 多层次的中断机制 保持用户主导
性能优先 并发、缓存、批处理 快速响应
优雅降级 错误恢复和回滚 可靠的系统

这种编排架构展示了 Harness Engineering 的精髓:不是简单地执行工具,而是创建一个智能的、可控制的、高性能的执行环境,让 AI 智能与实际能力完美协作。

在 Part 2 中,我们将转向 Prompt 工程,探讨如何通过精心设计的 System Prompts 来引导 AI 的行为——这是 Claude Code 的"控制平面"。

关键要点

  1. 编排是指挥艺术:协调多个工具和谐工作
  2. 并发执行策略:根据工具特性选择并发级别
  3. 流式输出:实时反馈改善用户体验
  4. 细粒度中断:独立控制每个工具
  5. 结果排序:保持输出一致性
  6. 性能优化:批处理、缓存、资源池化

延伸阅读

  • 第 5 章:系统提示词架构——控制平面
  • 第 6 章:通过提示词引导行为
  • StreamingToolExecutor 源码src/services/tools/StreamingToolExecutor.ts
  • 工具执行源码src/services/tools/toolExecution.ts
Enjoyed this article? Share it with others!