深入剖析 Claude Code:关键功能与实现细节(第三部分)
2026-03-31
Claude Code AI Agent TypeScript 工具实现 Git集成
深入剖析 Claude Code:关键功能与实现细节
系列文章第三部分:核心工具与功能实现
在前两篇文章中,我们探讨了 Claude Code 的整体架构和设计模式。今天,我们将深入其核心工具的实现细节,看看这些关键功能是如何构建的。
BashTool:命令执行的精密机制
BashTool 是 Claude Code 中最复杂的工具之一,它负责安全地执行 shell 命令并流式返回结果。
执行流程
// src/tools/BashTool/BashTool.ts (简化版)
export class BashTool implements Tool {
name = 'bash'
description = 'Execute shell commands'
async call(
params: { command: string },
context: ToolUseContext
): AsyncGenerator {
const { command } = params
const { signal } = context.abortController
// 1. 验证命令安全性
this.validateCommand(command)
// 2. 发送进度更新
yield { type: 'progress', data: `Executing: ${command}` }
// 3. 启动子进程
const shell = process.env.SHELL || '/bin/bash'
const args = ['-c', command]
const childProcess = spawn(shell, args, {
cwd: getCwd(),
env: process.env,
timeout: 120000 // 2 分钟超时
})
// 4. 流式读取输出
const outputStream = this.streamOutput(childProcess, signal)
for await (const chunk of outputStream) {
yield { type: 'data', data: chunk }
}
// 5. 等待进程结束
const exitCode = await this.waitForExit(childProcess)
yield { type: 'result', data: { exitCode } }
}
private validateCommand(command: string): void {
// 检测交互式命令
const interactive = [
'vim', 'vi', 'nano', 'emacs',
'top', 'htop', 'less', 'more'
]
if (interactive.some(cmd => command.includes(cmd))) {
throw new Error(
`Interactive commands not supported: ${command}`
)
}
}
private async* streamOutput(
process: ChildProcess,
signal: AbortSignal
): AsyncGenerator {
const { stdout, stderr } = process
// 合并 stdout 和 stderr
const streams = [stdout, stderr].filter(Boolean)
for (const stream of streams) {
stream.on('data', (chunk) => {
if (!signal.aborted) {
// 逐块输出,而不是缓冲
yield chunk.toString()
}
})
if (signal.aborted) {
process.kill()
return
}
}
}
} 超时处理
const DEFAULT_TIMEOUT = 120000 // 2 分钟
async function executeWithTimeout(
command: string,
timeout: number = DEFAULT_TIMEOUT
): Promise {
const controller = new AbortController()
const timeoutId = setTimeout(() => {
controller.abort()
}, timeout)
try {
const result = await executeCommand(command, controller)
clearTimeout(timeoutId)
return result
} catch (error) {
if (error.name === 'AbortError') {
throw new Error(
`Command timed out after ${timeout}ms: ${command}`
)
}
throw error
}
} 安全限制
BashTool 有多个安全机制:
// 1. 禁用交互式命令
function isInteractiveCommand(command: string): boolean {
const interactivePatterns = [
/vi\s+/, /vim\s+/, /nano\s+/,
/top\b/, /htop\b/,
/sudo\s+/, // 权限提升
]
return interactivePatterns.some(pattern => pattern.test(command))
}
// 2. 命令长度限制
const MAX_COMMAND_LENGTH = 10000
if (command.length > MAX_COMMAND_LENGTH) {
throw new Error('Command too long')
}
// 3. 特殊字符过滤
function hasDangerousCharacters(command: string): boolean {
const dangerous = ['\x00', '\r', '\n']
return dangerous.some(char => command.includes(char))
}文件操作工具
FileReadTool:智能文件读取
// src/tools/FileReadTool/FileReadTool.ts
export class FileReadTool implements Tool {
name = 'read_file'
async call(
params: { file_path: string; offset?: number; limit?: number },
context: ToolUseContext
): AsyncGenerator {
const { file_path, offset = 0, limit } = params
// 检查缓存
const cached = this.checkCache(file_path)
if (cached && cached.mtime === this.getMtime(file_path)) {
yield { type: 'data', data: cached.content }
return
}
// 读取文件
const content = await fs.readFile(file_path, 'utf-8')
// 分页支持
const lines = content.split('\n')
const start = offset
const end = limit ? offset + limit : lines.length
const paginated = lines.slice(start, end).join('\n')
// 添加行号
const numbered = this.addLineNumbers(paginated, start)
yield { type: 'data', data: numbered }
// 更新缓存
this.updateCache(file_path, content)
}
private addLineNumbers(content: string, start: number): string {
return content
.split('\n')
.map((line, index) => {
const num = start + index + 1
return `${num.toString().padStart(6)}│${line}`
})
.join('\n')
}
private checkCache(path: string): { content: string; mtime: number } | null {
const timestamps = context.readFileTimestamps
if (timestamps[path]) {
return {
content: timestamps[path].content,
mtime: timestamps[path].mtime
}
}
return null
}
} FileEditTool:精确字符串替换
// src/tools/FileEditTool/FileEditTool.ts
export class FileEditTool implements Tool {
name = 'str_replace'
async call(
params: {
file_path: string
old_string: string
new_string: string
},
context: ToolUseContext
): AsyncGenerator {
const { file_path, old_string, new_string } = params
// 读取文件
const content = await fs.readFile(file_path, 'utf-8')
// 检查 old_string 是否唯一
const occurrences = (content.match(old_string) || []).length
if (occurrences === 0) {
throw new Error('old_string not found in file')
}
if (occurrences > 1) {
throw new Error(
`old_string appears ${occurrences} times. ` +
'Must be unique for safe replacement.'
)
}
// 执行替换
const newContent = content.replace(old_string, new_string)
// 写入文件
await fs.writeFile(file_path, newContent, 'utf-8')
yield {
type: 'result',
data: {
success: true,
file_path,
replacement_count: 1
}
}
}
} FileWriteTool:创建新文件
// src/tools/FileWriteTool/FileWriteTool.ts
export class FileWriteTool implements Tool {
name = 'write_file'
async call(
params: { file_path: string; content: string },
context: ToolUseContext
): AsyncGenerator {
const { file_path, content } = params
// 检查文件是否已存在
if (await fs.pathExists(file_path)) {
throw new Error(
`File already exists: ${file_path}. ` +
'Use str_replace to edit existing files.'
)
}
// 确保目录存在
const dir = path.dirname(file_path)
await fs.ensureDir(dir)
// 写入文件
await fs.writeFile(file_path, content, 'utf-8')
yield {
type: 'result',
data: {
success: true,
file_path,
size: content.length
}
}
}
} AgentTool:子 Agent 系统
AgentTool 是最强大的工具之一,它可以生成专门的子 Agent 来处理特定任务。
Agent 类型
// src/tools/AgentTool/AgentTool.ts
type SubagentType =
| 'Explore' // 快速代码库探索
| 'Plan' // 实现计划设计
| 'BugAnalyzer' // 深度调试
| 'GeneralPurpose' // 通用代理
| 'code-reviewer' // 代码审查
| 'ui-sketcher' // UI 设计
export class AgentTool implements Tool {
name = 'agent'
description = 'Launch specialized sub-agents'
async call(
params: {
subagent_type: SubagentType
prompt: string
run_in_background?: boolean
},
context: ToolUseContext
): AsyncGenerator {
const { subagent_type, prompt, run_in_background = false } = params
// 创建子 Agent
const agent = this.createAgent(subagent_type)
if (run_in_background) {
// 后台执行
const taskId = await this.launchBackgroundAgent(agent, prompt)
yield {
type: 'progress',
data: `Agent ${subagent_type} running in background (ID: ${taskId})`
}
yield { type: 'result', data: { task_id: taskId, status: 'running' } }
} else {
// 前台执行
yield { type: 'progress', data: `Starting ${subagent_type} agent...` }
const results = []
for await (const result of agent.execute(prompt, context)) {
yield result
results.push(result)
}
yield {
type: 'result',
data: { subagent_type, results }
}
}
}
private createAgent(type: SubagentType): SubAgent {
switch (type) {
case 'Explore':
return new ExploreAgent()
case 'Plan':
return new PlanAgent()
case 'BugAnalyzer':
return new BugAnalyzerAgent()
case 'code-reviewer':
return new CodeReviewerAgent()
case 'ui-sketcher':
return new UiSketcherAgent()
default:
return new GeneralPurposeAgent()
}
}
} Explore Agent 实现
// ExploreAgent 专注于快速代码库探索
class ExploreAgent implements SubAgent {
async* execute(
prompt: string,
context: ToolUseContext
): AsyncGenerator {
// 1. 解析用户意图
const intent = this.parseIntent(prompt)
// 2. 使用 Glob 查找相关文件
const files = await this.findFiles(intent.pattern)
// 3. 使用 Grep 搜索关键词
const matches = await this.searchContent(intent.keywords)
// 4. 读取关键文件
const contents = await this.readKeyFiles(files.slice(0, 10))
// 5. 综合分析
const summary = this.summarizeFindings(contents, matches)
yield {
type: 'data',
data: summary
}
}
private async findFiles(pattern: string): Promise {
const globTool = new GlobTool()
const result = await globTool.call({ pattern }, context)
return result.files
}
private async searchContent(keywords: string[]): Promise {
const grepTool = new GrepTool()
const matches = []
for (const keyword of keywords) {
const result = await grepTool.call({
pattern: keyword,
path: '.'
}, context)
matches.push(...result.matches)
}
return matches
}
} Plan Agent 实现
// Plan Agent 专注于创建实现计划
class PlanAgent implements SubAgent {
async* execute(
prompt: string,
context: ToolUseContext
): AsyncGenerator {
yield {
type: 'progress',
data: 'Analyzing requirements...'
}
// 1. 理解需求
const requirements = await this.analyzeRequirements(prompt)
// 2. 探索代码库
const codebaseInfo = await this.exploreCodebase(requirements)
// 3. 识别关键文件
const keyFiles = this.identifyKeyFiles(codebaseInfo)
// 4. 创建实施步骤
const steps = this.createImplementationSteps(
requirements,
keyFiles
)
// 5. 生成计划文档
const plan = this.formatPlan(steps)
yield {
type: 'data',
data: plan
}
}
private createImplementationSteps(
requirements: Requirements,
keyFiles: string[]
): ImplementationStep[] {
return [
{
title: 'Update data model',
files: ['src/types.ts'],
description: 'Add new types for the feature'
},
{
title: 'Implement API endpoint',
files: ['src/api/routes.ts'],
description: 'Create REST endpoint'
},
// ... 更多步骤
]
}
} 后台 Agent 执行
private async launchBackgroundAgent(
agent: SubAgent,
prompt: string
): Promise {
const taskId = uuidv4()
const abortController = new AbortController()
// 在后台运行
setImmediate(async () => {
try {
for await (const result of agent.execute(prompt, {
abortController
})) {
// 保存到任务存储
await saveTaskResult(taskId, result)
}
} catch (error) {
await saveTaskError(taskId, error)
}
})
return taskId
}
// 查询后台任务状态
async function getBackgroundTaskStatus(
taskId: string
): Promise {
const task = await loadTask(taskId)
return {
id: taskId,
status: task.completed ? 'completed' : 'running',
results: task.results,
error: task.error
}
} MCP 集成:扩展工具生态
MCP(Model Context Protocol)允许连接外部工具服务器。
MCP 客户端实现
// src/services/mcpClient.ts
export class MCPClient {
private servers: Map = new Map()
async connectServer(config: MCPConfig): Promise {
const server = new MCPServer(config)
// 启动服务器进程
await server.start()
// 列出可用工具
const tools = await server.listTools()
// 注册工具
for (const tool of tools) {
this.registerMCPTool(server, tool)
}
this.servers.set(config.name, server)
}
private registerMCPTool(
server: MCPServer,
toolDef: MCPToolDefinition
): void {
const tool: Tool = {
name: `${server.name}/${toolDef.name}`,
description: toolDef.description,
parameters: toolDef.inputSchema,
async call(params, context): AsyncGenerator {
try {
const result = await server.callTool(toolDef.name, params)
yield { type: 'result', data: result }
} catch (error) {
yield {
type: 'error',
data: `MCP tool error: ${error.message}`
}
}
}
}
// 注册到工具注册表
registerTool(tool)
}
} MCP 配置
// .mcprc.json
{
"mcpServers": {
"filesystem": {
"command": "npx",
"args": ["-y", "@anthropic/mcp-server-filesystem", "/Users/yaya/projects"],
"disabled": false
},
"database": {
"command": "npx",
"args": ["-y", "@anthropic/mcp-server-postgres", "postgresql://..."],
"disabled": false
}
}
}MCP 权限管理
// 首次使用 MCP 工具时请求批准
async function requestMCPToolApproval(
serverName: string,
toolName: string,
toolDef: MCPToolDefinition
): Promise {
const prompt = `
MCP server '${serverName}' wants to register tool '${toolName}':
Description: ${toolDef.description}
This tool will have access to: ${toolDef.capabilities}
Approve? (once/always/deny)
`
const response = await askUser(prompt)
if (response === 'always') {
await saveApproval(serverName, toolName, 'always')
}
return response !== 'deny'
} Git 深度集成
Git 状态获取
// src/context.ts
export const getGitStatus = memoize(async (): Promise => {
if (!(await getIsGit())) {
return null
}
try {
const [branch, mainBranch, status, log, authorLog] =
await Promise.all([
execFile('git', ['branch', '--show-current']),
execFile('git', ['rev-parse', '--abbrev-ref', 'origin/HEAD']),
execFile('git', ['status', '--short']),
execFile('git', ['log', '--oneline', '-n', '5']),
execFile('git', ['log', '--oneline', '-n', '5',
'--author', await getGitEmail()])
])
// 限制状态输出长度
const statusLines = status.split('\n')
const truncatedStatus = statusLines.length > 200
? statusLines.slice(0, 200).join('\n') + '\n... (truncated)'
: status
return `
Git status (session start snapshot):
Current branch: ${branch}
Main branch: ${mainBranch.replace('origin/', '')}
Status:
${truncatedStatus || '(clean)'}
Recent commits:
${log}
Your recent commits:
${authorLog || '(none)'}
`
} catch (error) {
logError(error)
return null
}
}) 提交工作流辅助
// src/commands/commit.ts (简化版)
export async function createCommit(
message: string,
files?: string[]
): Promise {
// 1. 添加文件
if (files) {
await execFile('git', ['add', ...files])
} else {
await execFile('git', ['add', '-A'])
}
// 2. 检查状态
const status = await execFile('git', ['status', '--short'])
if (!status.trim()) {
throw new Error('No changes to commit')
}
// 3. 显示预览
console.log('Changes to be committed:')
console.log(status)
// 4. 确认
const confirmed = await askUser('Commit these changes? (y/n)')
if (!confirmed) {
return
}
// 5. 创建提交
const fullMessage = formatCommitMessage(message)
await execFile('git', ['commit', '-m', fullMessage])
console.log('✓ Commit created')
}
function formatCommitMessage(message: string): string {
const coAuthor = '\n\nCo-Authored-By: Claude Sonnet 4.5 '
return message + coAuthor
} PR 创建
// src/commands/pr.ts
export async function createPullRequest(
baseBranch?: string
): Promise {
// 1. 获取分支信息
const currentBranch = await execFile('git', ['branch', '--show-current'])
const mainBranch = baseBranch || await getMainBranch()
// 2. 检查差异
const hasDiff = await checkBranchDifference(currentBranch, mainBranch)
if (!hasDiff) {
throw new Error('No differences with main branch')
}
// 3. 生成 PR 标题和描述
const commits = await execFile('git', ['log', '--oneline',
`${mainBranch}..HEAD`])
const title = generatePRTitle(commits)
const description = await generatePRDescription(commits)
// 4. 使用 gh CLI 创建 PR
const args = [
'pr', 'create',
'--base', mainBranch,
'--title', title,
'--body', description
]
const result = await execFile('gh', args)
// 从输出中提取 PR URL
const urlMatch = result.match(/https:\/\/github\.com\/[^\/]+\/[^\/]+\/pull\/\d+/)
return urlMatch ? urlMatch[0] : ''
} 冲突解决指导
// src/commands/merge.ts
export async function resolveMergeConflicts(): Promise {
// 1. 检查冲突状态
const status = await execFile('git', ['status', '--short'])
const conflicts = status.split('\n')
.filter(line => line.includes('UU')) // UU = both modified
if (conflicts.length === 0) {
console.log('No merge conflicts found')
return
}
console.log(`Found ${conflicts.length} conflicts:`)
for (const conflict of conflicts) {
const file = conflict.split('\t')[1]
console.log(` - ${file}`)
}
// 2. 提供解决指导
for (const conflict of conflicts) {
const file = conflict.split('\t')[1]
console.log(`\nResolving: ${file}`)
console.log('Conflict markers:')
console.log(' <<<<<<< HEAD')
console.log(' Your changes')
console.log(' =======')
console.log(' Their changes')
console.log(' >>>>>>> branch-name')
// 3. 使用 AI 帮助解决
const resolution = await askAIForResolution(file)
if (resolution) {
await fs.writeFile(file, resolution, 'utf-8')
await execFile('git', ['add', file])
console.log(`✓ Resolved: ${file}`)
}
}
// 4. 完成合并
console.log('\nAll conflicts resolved. Complete merge with:')
console.log(' git commit')
} 上下文管理实现
目录结构生成
// src/context.ts
export const getDirectoryStructure = memoize(
async function (): Promise {
const abortController = new AbortController()
// 限制超时
setTimeout(() => abortController.abort(), 1_000)
try {
const model = await getSlowAndCapableModel()
// 使用 LSTool 生成树状结构
const resultsGen = LSTool.call(
{ path: '.' },
{
abortController,
options: {
commands: [],
tools: [],
slowAndCapableModel: model,
forkNumber: 0,
messageLogName: 'directory-structure',
maxThinkingTokens: 0
}
}
)
const result = await lastX(resultsGen)
const lines = result.data
return `Project file structure (session start snapshot):
${lines}
Note: This snapshot will not update during the conversation.
`
} catch (error) {
logError(error)
return ''
}
}
) 代码风格检测
// src/utils/style.ts
export function getCodeStyle(): string | null {
const projectConfig = getCurrentProjectConfig()
if (projectConfig.context?.codeStyle) {
return projectConfig.context.codeStyle
}
// 检测项目代码风格
const style = detectCodeStyle()
if (style) {
// 保存到配置
saveCurrentProjectConfig({
...projectConfig,
context: {
...projectConfig.context,
codeStyle: style
}
})
}
return style
}
function detectCodeStyle(): string | null {
const files = findSourceFiles()
if (files.length === 0) {
return null
}
// 分析前 10 个文件
const samples = files.slice(0, 10)
const styles = samples.map(analyzeFileStyle)
// 找出最常见的风格
const indentStyle = majorityOf(styles.map(s => s.indent))
const quoteStyle = majorityOf(styles.map(s => s.quotes))
const semicolonStyle = majorityOf(styles.map(s => s.semicolons))
return `
Code style detected from this project:
Indentation: ${indentStyle === 'tab' ? 'Tabs' : `${indentStyle} spaces`}
Quotes: ${quoteStyle === 'single' ? 'Single quotes' : 'Double quotes'}
Semicolons: ${semicolons ? 'Yes' : 'No'}
Please follow this style when writing code.
`
}
function analyzeFileStyle(filePath: string): FileStyle {
const content = fs.readFileSync(filePath, 'utf-8')
// 检测缩进
const tabMatches = content.match(/^\t/mg)
const space2Matches = content.match(/^ /mg)
const space4Matches = content.match(/^ /mg)
let indent = 'unknown'
if (tabMatches && tabMatches.length > (space2Matches?.length || 0)) {
indent = 'tab'
} else if (space4Matches && space4Matches.length > (space2Matches?.length || 0)) {
indent = 4
} else if (space2Matches) {
indent = 2
}
// 检测引号
const singleQuotes = (content.match(/'/g) || []).length
const doubleQuotes = (content.match(/"/g) || []).length
const quotes = singleQuotes > doubleQuotes ? 'single' : 'double'
// 检测分号
const semicolons = /\}\s*\n\s*\}/.test(content) ? false : true
return { indent, quotes, semicolons }
}成本追踪实现
// src/cost-tracker.ts
export class CostTracker {
private totalTokens = 0
private totalCost = 0
private sessionTokens = 0
private sessionCost = 0
trackUsage(usage: TokenUsage): void {
const inputCost = this.calculateCost(usage.input_tokens, 'input')
const outputCost = this.calculateCost(usage.output_tokens, 'output')
this.sessionTokens += usage.input_tokens + usage.output_tokens
this.sessionCost += inputCost + outputCost
this.totalTokens += usage.input_tokens + usage.output_tokens
this.totalCost += inputCost + outputCost
}
private calculateCost(tokens: number, type: 'input' | 'output'): number {
const prices = {
'input': 3.00 / 1_000_000, // $3 per million
'output': 15.00 / 1_000_000 // $15 per million
}
return tokens * prices[type]
}
getSessionSummary(): CostSummary {
return {
tokens: this.sessionTokens,
cost: this.sessionCost,
formatted: `$${this.sessionCost.toFixed(4)}`
}
}
getTotalSummary(): CostSummary {
return {
tokens: this.totalTokens,
cost: this.totalCost,
formatted: `$${this.totalCost.toFixed(2)}`
}
}
}下一步
在下一篇文章中,我们将通过实际代码示例,探讨:
- 最佳实践:从代码中学到的设计模式
- 错误处理:优雅的错误处理策略
- 测试策略:如何测试 AI Agent 系统
- 性能优化:实用优化技巧
- 扩展指南:如何添加新工具
系列目录