职责概述

解决的问题:大模型不会自己调用工具——它只返回文本。需要一个循环引擎反复调用 API、解析模型请求的工具调用、执行工具、把结果喂回模型,直到任务完成。这就是 Query 循环干的事。

应用场景:① 用户输入一条消息后,引擎自动驱动"模型思考→调工具→再思考"的多轮循环 ② 处理需要多次工具调用的复杂任务(如跨文件重构) ③ 在 token 预算耗尽、用户中断或模型表示"完成"时优雅终止。

一句话理解:就像你和 AI 下棋——你走一步(用户输入),AI 走一步(模型回复),AI 可能需要查棋谱(调工具),查完再想,直到分出胜负(任务完成)。

架构设计

外部调用层
QueryEngine.submitMessage()
REPL ask() / SDK print.ts
输入处理层
processUserInput()
fetchSystemPromptParts()
skills/plugins 加载
Query 循环核心
query() AsyncGenerator
queryLoop()
State 不可变状态传递
API 与工具执行层
deps.callModel() 流式调用
StreamingToolExecutor
runTools() 批量执行
上下文管理层
autocompact / microcompact
snip / context-collapse
token budget / task budget

核心数据流

Phase 1: QueryEngine.submitMessage() 输入处理
QueryEngine.ts:209submitMessage() 是 SDK/REPL 调用查询的入口。它首先构建 processUserInputContext,处理孤立权限,然后调用 processUserInput() 解析用户输入(包括斜杠命令):
// QueryEngine.ts:410-428
const { messages: messagesFromUserInput, shouldQuery, allowedTools,
        model: modelFromUserInput, resultText } =
  await processUserInput({ input: prompt, mode: 'prompt', ... });
如果 shouldQuery 为 false(斜杠命令已处理),直接 yield 结果并返回。
Phase 2: 系统提示构建
QueryEngine.ts:288-325 组装系统提示词,由三部分组成:
  • defaultSystemPrompt — 工具描述 + 模型感知的默认提示
  • memoryMechanicsPrompt — 记忆系统指令(当有自定义提示 + 内存覆盖时注入)
  • appendSystemPrompt — 用户追加的系统提示
// QueryEngine.ts:321-325
const systemPrompt = asSystemPrompt([
  ...(customPrompt !== undefined ? [customPrompt] : defaultSystemPrompt),
  ...(memoryMechanicsPrompt ? [memoryMechanicsPrompt] : []),
  ...(appendSystemPrompt ? [appendSystemPrompt] : []),
]);
Phase 3: query() 进入主循环
query.ts:219query() 是一个 AsyncGenerator,委托给内部 queryLoop()
// query.ts:219-239
export async function* query(params: QueryParams): AsyncGenerator<...> {
  const consumedCommandUuids: string[] = []
  const terminal = yield* queryLoop(params, consumedCommandUuids)
  for (const uuid of consumedCommandUuids) {
    notifyCommandLifecycle(uuid, 'completed')
  }
  return terminal
}
QueryEngine.submitMessage() 通过 for await (const message of query({...})) 消费 yield 的每条消息。
Phase 4: 迭代初始化与上下文压缩
query.ts:307while(true) 循环每次迭代开始时:
  1. 技能发现预取(并行于模型流式传输)
  2. yield stream_request_start — 通知消费者请求开始
  3. 查询链追踪 — 分配 chainId + depth
  4. 工具结果预算 — 限制工具结果大小
  5. Snip — 历史消息裁剪
  6. Microcompact — 缓存编辑级压缩
  7. Context Collapse — 投影式上下文折叠
  8. Autocompact — 全文摘要压缩(达到阈值时触发)
// query.ts:454-468 核心压缩调用
const { compactionResult, consecutiveFailures } = await deps.autocompact(
  messagesForQuery, toolUseContext, { systemPrompt, userContext, ... }
);
Phase 5: API 流式调用
query.ts:654-659 通过 deps.callModel() 发起流式 API 请求:
// query.ts:659-708
for await (const message of deps.callModel({
  messages: prependUserContext(messagesForQuery, userContext),
  systemPrompt: fullSystemPrompt,
  thinkingConfig, tools, signal, options: {
    model: currentModel, fallbackModel, querySource,
    ...(params.taskBudget && { taskBudget: { total, remaining } }),
  }
})) { ... }
流式响应中逐块处理:收集 assistant 消息、检测 tool_use 块、执行流式工具、处理 fallback 切换。错误通过 FallbackTriggeredError 触发模型降级重试。
Phase 6: 工具执行
query.ts:1366-1408 工具执行阶段。支持两种模式:
  • StreamingToolExecutor — 在模型流式输出 tool_use 块时即开始执行工具,与模型输出并行
  • runTools() — 传统模式,等模型完成后批量执行
// query.ts:1380-1408
const toolUpdates = streamingToolExecutor
  ? streamingToolExecutor.getRemainingResults()
  : runTools(toolUseBlocks, assistantMessages, canUseTool, toolUseContext);
for await (const update of toolUpdates) {
  if (update.message) yield update.message;
  if (update.newContext) updatedToolUseContext = { ...update.newContext, queryTracking };
}
Phase 7: 继续决策与状态传递
工具执行完成后,循环进入继续决策阶段:
  • 无工具调用 (!needsFollowUp) — 检查是否需要恢复(prompt-too-long、max-output-tokens),否则处理 stop hooks 后终止
  • 有工具调用 — 构建新的 State,合并工具结果,continue 回到循环顶部
状态通过不可变 State 对象在迭代间传递,所有继续点使用 state = { ... } 而非可变赋值:
// query.ts:204-217 迭代间状态
type State = {
  messages: Message[]
  toolUseContext: ToolUseContext
  autoCompactTracking: AutoCompactTrackingState | undefined
  maxOutputTokensRecoveryCount: number
  hasAttemptedReactiveCompact: boolean
  pendingToolUseSummary: Promise<...> | undefined
  stopHookActive: boolean | undefined
  turnCount: number
  transition: Continue | undefined  // 上次迭代为何继续
}

关键类型与接口

QueryEngineConfig

// QueryEngine.ts:130-173
export type QueryEngineConfig = {
  cwd: string
  tools: Tools
  commands: Command[]
  mcpClients: MCPServerConnection[]
  agents: AgentDefinition[]
  canUseTool: CanUseToolFn
  getAppState: () => AppState
  setAppState: (f: (prev: AppState) => AppState) => void
  initialMessages?: Message[]
  readFileCache: FileStateCache
  customSystemPrompt?: string
  appendSystemPrompt?: string
  thinkingConfig?: ThinkingConfig
  maxTurns?: number
  maxBudgetUsd?: number
  taskBudget?: { total: number }
  jsonSchema?: Record<string, unknown>
  abortController?: AbortController
  snipReplay?: (yieldedSystemMsg, store) => { messages, executed } | undefined
}

QueryParams

// query.ts:181-199
export type QueryParams = {
  messages: Message[]
  systemPrompt: SystemPrompt
  userContext: { [k: string]: string }
  systemContext: { [k: string]: string }
  canUseTool: CanUseToolFn
  toolUseContext: ToolUseContext
  fallbackModel?: string
  querySource: QuerySource
  maxOutputTokensOverride?: number
  maxTurns?: number
  taskBudget?: { total: number }
  deps?: QueryDeps
}

State(迭代间可变状态)

// query.ts:204-217
type State = {
  messages: Message[]
  toolUseContext: ToolUseContext
  autoCompactTracking: AutoCompactTrackingState | undefined
  maxOutputTokensRecoveryCount: number
  hasAttemptedReactiveCompact: boolean
  maxOutputTokensOverride: number | undefined
  pendingToolUseSummary: Promise<ToolUseSummaryMessage | null> | undefined
  stopHookActive: boolean | undefined
  turnCount: number
  transition: Continue | undefined
}

QueryEngine 类签名

// QueryEngine.ts:184-1177
export class QueryEngine {
  private config: QueryEngineConfig
  private mutableMessages: Message[]
  private abortController: AbortController
  private permissionDenials: SDKPermissionDenial[]
  private totalUsage: NonNullableUsage
  private readFileState: FileStateCache

  constructor(config: QueryEngineConfig)
  async *submitMessage(prompt, options?): AsyncGenerator<SDKMessage, void, unknown>
  interrupt(): void
  getMessages(): readonly Message[]
  getReadFileState(): FileStateCache
  getSessionId(): string
  setModel(model: string): void
}

设计模式与亮点

1. AsyncGenerator 消费者模式

query() 使用 AsyncGenerator 而非回调或 Promise,这使得消费者(无论是 REPL 的 ask() 还是 SDK 的 submitMessage())可以自然地用 for await...of 逐步处理流式消息,同时保持对中断(.return())和错误传播(throw())的支持。

2. 不可变状态传递 (Immutable State Transition)

queryLoop 中的 State 对象在每次 continue 时完整替换(state = { ... }),而非逐字段修改。这使得每个 continue 点的意图完全可追溯——transition 字段记录了上次为何继续。

State 的 7 个 continue 点都有对应的 transition reason:reactive_compact_retry、collapse_drain_retry、max_output_tokens_recovery、max_output_tokens_escalate、stop_hook_blocking、token_budget_continuation、model_fallback。

3. 流式工具执行 (Streaming Tool Execution)

传统的 Agentic 循环是"模型完成 → 执行工具 → 继续"。Claude Code 的 StreamingToolExecutor 在模型还在流式输出时就开始并行执行工具,显著减少了多工具调用的延迟。模型输出 tool_use 块后立即调用 addTool(),工具在后台执行,结果通过 getCompletedResults() 在流式循环中逐个收集。

4. 多层压缩策略 (Multi-Layer Compression)

上下文管理采用四层渐进压缩,每层都有不同的粒度和成本:

5. 恢复循环 (Recovery Loop)

query 循环内嵌多个恢复机制:prompt-too-long 通过 collapse drain 或 reactive compact 恢复;max-output-tokens 通过提升输出上限或注入恢复消息继续;model fallback 通过降级到备用模型重试。每种恢复都有递增计数器防止无限循环。

开发者实践指南

在调试查询循环时,使用环境变量 CLAUDE_CODE_DEBUG=1--verbose 标志可以看到每一步的详细信息,包括压缩决策、工具执行时间和 token 使用量。

添加新的继续条件

query.tsqueryLoop 中添加新的 continue 点时,必须:

  1. State 类型中添加相关追踪字段
  2. 在 continue 点构建完整的 State 对象
  3. 设置 transition 字段记录原因
  4. 确保恢复计数器递增(防止无限循环)
  5. 添加对应的 logEvent 用于遥测

添加新的 yield 消息类型

query() 的 AsyncGenerator 可以 yield 多种消息类型:StreamEventRequestStartEventMessageTombstoneMessageToolUseSummaryMessage。添加新类型需要更新 QueryParams 的 yield 类型联合和所有消费者的 switch/case。

QueryEngine vs query() 的分工

QueryEngine 负责会话级管理(消息历史、转录持久化、SDK 消息格式化),query() 负责单次查询的循环逻辑。新的会话级功能(如消息持久化策略)应放在 QueryEngine 中;新的循环行为(如新的压缩策略)应放在 query() 中。

架构师决策指南

AsyncGenerator 的优势与代价

使用 AsyncGenerator 作为核心抽象,使得流式处理、背压控制和中止语义可以统一处理。消费者可以随时通过 generator.return() 优雅终止,或通过 AbortController 中断正在进行的工具执行。代价是调试复杂度增加——生成器的调用栈在 yield 点是不连续的。

并行化的边界

流式工具执行将模型输出与工具执行重叠,但工具执行本身是串行的(for await...of toolUpdates)。完全并行的工具执行会增加竞态风险(文件系统冲突、权限决策冲突)。当前设计在并行度和安全性之间取得了平衡。

StreamingToolExecutor 的 discard() 方法在 model fallback 时被调用,确保旧 tool_use_id 对应的工具结果不会泄露到重试请求中。这是正确性的关键点。

压缩策略的扩展性

四层压缩系统的执行顺序是有意设计的:Snip 在 Microcompact 之前(因为 snip 移除的消息不需要微压缩);Context Collapse 在 Autocompact 之前(折叠可能避免昂贵的全文压缩)。添加新的压缩层必须仔细考虑它在管道中的位置。

任务预算 (Task Budget) 的设计

taskBudgettokenBudget 是独立的机制:前者是 API 端的输出 token 预算(跨压缩边界累计计算),后者是客户端侧的 Agentic turn token 预算。两者独立操作,可以组合使用。Task budget 在每次压缩后通过 taskBudgetRemaining 传递累计消耗,因为压缩后的 API 无法看到被摘要掉的历史。

可视化处理拓扑图

Query 循环是 Claude Code 的心脏。QueryEngine.submitMessage 准备上下文,query() AsyncGenerator 驱动核心循环,queryLoop()while(true) 中迭代处理:4 层压缩管道 → API 流式调用 → 流式工具执行 → 继续决策。模型降级、错误恢复和死亡螺旋检测是循环内的关键安全机制。

Phase 1 — 输入准备与系统提示构建
QueryEngine.submitMessage(prompt)QueryEngine.ts:209
processUserInput — 解析用户输入QueryEngine.ts:410-428
shouldQuery ?斜杠命令处理
yield 结果 + 返回斜杠命令已处理
是 ↓
fetchSystemPromptParts — 系统提示组装QueryEngine.ts:288-325
defaultPrompt工具描述 + 默认
memoryPrompt记忆系统指令
appendPrompt用户追加提示
权限包装 — wrappedCanUseToolQueryEngine.ts:244-271
↓ 进入 query() 主循环
Phase 2 — while(true) 主循环(queryLoop)
while(true) 迭代 — query.ts:307+
四层压缩管道
Layer 1: Snip — 历史消息裁剪query.ts:401-410 — 释放 token
Layer 2: Microcompact — 缓存编辑压缩query.ts:414-426
Layer 3: Context Collapse — 投影式折叠query.ts:440-447
Layer 4: Autocompact — 全文摘要压缩query.ts:454-468 — 最昂贵
deps.callModel — 流式 API 请求query.ts:654-708
流式响应处理 — 逐块解析query.ts:659-863
tool_use 块检测query.ts:829-835
StreamingToolExecutor.addToolquery.ts:841-844 — 并行执行
工具结果 yieldquery.ts:851-861
纯文本输出无 tool_use
yield assistant 消息query.ts:799-825
终止循环needsFollowUp=false
FallbackTriggeredError?query.ts:894-953
模型降级重试discard 旧结果 + 切换模型
工具执行 — 读写分离 + 并发上限 10toolOrchestration.ts + StreamingToolExecutor
并发安全(Read/Glob/Grep/WebFetch)isConcurrencySafe=true · 最多 10 并行
非安全(Edit/Write/Bash写操作)isConcurrencySafe=false · 串行排他
继续决策 — state = next; continuequery.ts:1537-1590+
有工具 → continue合并工具结果回循环
prompt-too-long → 恢复collapse drain / reactive compact
max-output → 恢复升级上限 + 注入消息
maxTurns → 终止query.ts:1568-1574
7 个 continue 站点的 State 传递
1
reactive_compact_retry
prompt-too-long 后触发 reactive compact 重试 · query.ts:1085-1183
2
collapse_drain_retry
context collapse drain 释放 token 后重试 · query.ts:1120
3
max_output_tokens_recovery
输出超限后升级到 64k 重试 · query.ts:1188-1256
4
model_fallback
FallbackTriggeredError → 降级模型重试 · query.ts:894-953
5
token_budget_continuation
task budget 未耗尽 → 继续循环 · query.ts:616
状态机设计queryLoop 使用"不可变 State + continue"模式——每个迭代开始时解构 state(L311-321),结束时通过 state = next; continue 跳回顶部。StreamingToolExecutor 在模型还在流式输出时就并行执行工具,大幅降低多工具调用的延迟。模型降级时 discard() 确保旧 tool_use_id 的结果不会泄露到重试请求中。

核心处理流程详解

查询循环是 Claude Code 的心脏。从用户输入到模型响应再到工具执行,数据经过一条精心编排的流水线:QueryEngine.submitMessage 准备上下文,query() 生成器驱动核心循环,queryLoop() 在其中迭代处理压缩、API 调用、工具执行和错误恢复。每一轮迭代都是一个完整的"思考-行动"周期。

1. QueryEngine.submitMessage — 入口准备
QueryEngine.ts:209-236 接收用户提示和配置,解构出 cwd/tools/mcpClients/canUseTool 等参数。L244-271 创建 wrappedCanUseTool 包装器追踪权限拒绝。L284-300 调用 fetchSystemPromptParts() 构建系统提示词,包括默认提示、用户上下文、MCP 协调器上下文和自定义提示的合并(L321-325)。
2. query() — 生成器入口
query.ts:219-239 是一个 async generator,调用 queryLoop() 并用 yield* 代理所有事件。循环正常退出后(L235-238),通知所有已消费的命令 UUID 完成。这种设计确保生成器的异常和中断路径都能正确传播。
3. queryLoop 初始化 — 状态构建
query.ts:241-279 解构不可变参数(systemPrompt/userContext 等),创建可变状态 State:消息数组、工具上下文、压缩追踪、输出 token 恢复计数等。L280 初始化 budget tracker,L291 初始化 taskBudgetRemaining 用于跨压缩边界的 token 预算追踪。L295 调用 buildQueryConfig() 快照不可变环境状态。
4. 四层压缩管道
进入 while(true) 主循环后(L307),依次执行四层压缩:Snip(L401-410,移除已处理工具调用释放 token)→ Microcompact(L414-426,缓存编辑压缩工具结果)→ Context Collapse(L440-447,折叠历史消息为摘要)→ Autocompact(L454-468,完整上下文摘要)。顺序精心设计:Snip 在 Microcompact 前(被 snip 移除的不需要微压缩),Collapse 在 Autocompact 前(折叠可能避免昂贵的全文压缩)。
5. API 调用与流式处理
query.ts:654-954 通过 deps.callModel()(L659)发起流式 API 请求。流式循环(L659-863)处理每个到达的消息块:提取 tool_use 块(L829-835),通过 StreamingToolExecutor 并行执行工具(L841-844),yield 完成的工具结果(L851-861)。模型回退机制在 FallbackTriggeredError 时切换模型重试(L894-953),丢弃旧结果并重建执行器。
6. 错误恢复 — 多重回退策略
query.ts:1062-1256 处理"无后续操作"场景的多重恢复:Prompt-too-long(L1085-1183)先尝试 context collapse drain(廉价),再尝试 reactive compact(昂贵);max_output_tokens(L1188-1256)先升级到 64k token 限制重试,再注入恢复消息让模型继续。每次恢复都通过 state = next; continue 回到循环顶部,保持状态一致性。
7. 工具执行 — 读写分离 + 并发上限 10
query.ts:1360-1409 执行工具调用,由 toolOrchestration.ts 控制并发策略。核心机制是 isConcurrencySafe 分类:并发安全工具(Read/Glob/Grep/WebFetch/LSP 等,isConcurrencySafe=true)在批次内最多 10 个并行CLAUDE_CODE_MAX_TOOL_USE_CONCURRENCY 可配置);非安全工具(Edit/Write/Bash 写操作等,默认 false)串行排他执行,必须等当前所有工具完成。partitionToolCalls()(toolOrchestration.ts:91)将连续的安全工具合并为一个并发批次,非安全工具单独串行。StreamingToolExecutor 在模型流式输出时即开始执行,通过 canExecuteTool()(StreamingToolExecutor.ts:129)判断:新工具可执行当且仅当当前无工具在跑,或新工具与所有正在执行的工具都是并发安全的。
8. 循环继续 — 状态传递
query.ts:1537-1590+ 将工具结果、附件消息和排队命令合并为新消息数组,通过 state = { messages: [...], ... } 模式传递到下一轮迭代。整个状态传递使用不可变更新(创建新 State 对象而非修改字段),使得每个 continue 点都有明确的语义。maxTurns 检查(L1568-1574)在每轮结束时判断是否终止循环。
queryLoop 的核心设计模式是"状态机 + continue":每个迭代开始时解构 state(L311-321),结束时通过 state = next; continue 跳回顶部。所有 7 个 continue 站点(压缩恢复、prompt-too-long 恢复、max_output_tokens 恢复、模型回退、stop-hook 阻塞、token budget 继续)都遵循同一模式。这种设计避免了深度嵌套的 if-else,使控制流扁平且可审计。

设计精华

1. Async Generator 作为流式协程

query()queryLoop() 都是 async generator 函数,通过 yield 向调用方实时推送事件(流式消息、工具结果、系统通知),同时内部维护完整的循环状态。调用方(REPL 或 SDK)可以按需消费事件,也可以通过 .return() 中断生成器。这种设计将"生产"和"消费"解耦——query 不需要知道谁在监听。

Generator-based Streaming Pipeline
yield 既用于输出也用于背压控制
// query.ts:219-239 — yield* 代理整个循环
export async function* query(params: QueryParams):
  AsyncGenerator {
  const consumedCommandUuids: string[] = []
  const terminal = yield* queryLoop(params, consumedCommandUuids)
  // 正常退出后通知命令完成
  for (const uuid of consumedCommandUuids) {
    notifyCommandLifecycle(uuid, 'completed')
  }
  return terminal
}
async generator 的独特优势是自然支持中断:for await...of 循环退出时自动调用 .return(),触发生成器的 finally 块。这确保了即使 REPL 被 Ctrl+C 中断,资源清理(abort controller、MCP 锁释放)也能正确执行。

2. 四层压缩管道的顺序依赖

压缩不是一次性的操作,而是精心排序的四层管道。Snip 先于 Microcompact(被移除的消息不需要微压缩,且 snip 释放的 token 数要传给 autocompact 的阈值计算)。Context Collapse 先于 Autocompact(折叠保留粒度上下文,而 autocompact 生成单一摘要)。这个顺序使得系统可以"渐进式"地回收 token——先用廉价操作,必要时才用昂贵操作。

Layered Compression Pipeline
四层压缩按成本从低到高排列
// query.ts:401-468 — 压缩管道执行顺序
// Layer 1: Snip — 移除已处理的工具调用(最廉价)
let snipTokensFreed = 0
if (feature('HISTORY_SNIP')) {
  const snipResult = snipModule!.snipCompactIfNeeded(messagesForQuery)
  messagesForQuery = snipResult.messages
  snipTokensFreed = snipResult.tokensFreed // 传递给 autocompact 阈值计算
}
// Layer 2: Microcompact — 缓存编辑压缩工具结果
const microcompactResult = await deps.microcompact(messagesForQuery, ...)

// Layer 3: Context Collapse — 折叠历史为摘要(保留粒度)
if (feature('CONTEXT_COLLAPSE') && contextCollapse) {
  const collapseResult = await contextCollapse.applyCollapsesIfNeeded(...)
}

// Layer 4: Autocompact — 完整上下文摘要(最昂贵)
const { compactionResult } = await deps.autocompact(messagesForQuery, ...)
snipTokensFreed 的传递是精妙的细节:autocompact 的阈值检查使用 tokenCountWithEstimation 读取 usage 字段,但 snip 移除的是尾部之前的消息——usage 字段(来自尾部 assistant)不会反映 snip 的效果。因此必须显式传递 snipTokensFreed 并在阈值计算中减去。

3. StreamingToolExecutor — 并行工具执行

传统模式是"收集所有 tool_use → 串行执行 → 返回结果"。StreamingToolExecutor 改为"流式接收 tool_use → 立即开始执行 → 完成即返回结果",使工具执行与模型流式输出并行。这在大批量工具调用时显著降低延迟——模型还在输出后续 tool_use 时,前面的工具已经开始执行。

Streaming Tool Execution
工具执行与模型流式输出并行,完成后立即 yield
// query.ts:838-862 — 流式工具执行
if (streamingToolExecutor && !toolUseContext.abortController.signal.aborted) {
  for (const toolBlock of msgToolUseBlocks) {
    streamingToolExecutor.addTool(toolBlock, message) // 立即入队执行
  }
}
// 轮询已完成的结果
for (const result of streamingToolExecutor.getCompletedResults()) {
  if (result.message) {
    yield result.message // 完成即推送,不等所有工具
  }
}
StreamingToolExecutor 的 discard() 方法(query.ts:734)在模型回退时被调用,确保旧 tool_use_id 对应的工具结果不会泄露到重试请求中。这是正确性的关键——如果保留旧结果,API 会因 tool_use/tool_result 不匹配而报错。

4. 读写分离与并发控制 — isConcurrencySafe

CC 不使用简单的"全部串行"或"全部并行",而是通过 isConcurrencySafe() 对每个工具分类。每个 Tool 实现此方法,返回 true 的工具可以在批次内并行(上限 10),返回 false 的工具必须串行排他执行。partitionToolCalls()(toolOrchestration.ts:91)将连续的安全工具合并为一个并发批次,非安全工具单独成批。

Concurrency Safety Classification
isConcurrencySafe 决定并行还是串行,默认 false(安全优先)
// toolOrchestration.ts:10 — 并发上限
function getMaxToolUseConcurrency(): number {
  return parseInt(process.env.CLAUDE_CODE_MAX_TOOL_USE_CONCURRENCY || '', 10) || 10
}

// StreamingToolExecutor.ts:129 — 流式路径的安全判断
private canExecuteTool(isConcurrencySafe: boolean): boolean {
  const executingTools = this.tools.filter(t => t.status === 'executing')
  return (
    executingTools.length === 0 ||
    (isConcurrencySafe && executingTools.every(t => t.isConcurrencySafe))
  )
}

// 工具分类示例(Tool.ts 各实现)
// Read/Glob/Grep/WebFetch/LSP → isConcurrencySafe = true(并发安全)
// Edit/Write → isConcurrencySafe = false(串行排他)
// Bash → this.isReadOnly?.(input) ?? false(条件安全)
默认值是 false(Tool.ts:759)——新工具不显式覆盖就默认串行。这是"安全优先"的设计:宁可牺牲并行度,也不冒竞态风险。Bash 工具最复杂:根据命令内容动态判断(grep/find 安全,rm/npm install 不安全)。

5. 可 withheld 消息的延迟恢复

某些错误(prompt-too-long、max-output-tokens)不应立即展示给用户,因为可能通过自动恢复机制解决。queryLoop 在 L799-822 实现了"withhold"机制:错误消息仍然推入 assistantMessages(供恢复逻辑检查),但不 yield 给调用方。如果恢复成功,用户永远不会看到这个错误;如果恢复失败,才在 L1255 yield 展示。

Withheld Message Pattern
可恢复错误先"暂扣",恢复成功则不展示
// query.ts:799-825 — 错误消息暂扣逻辑
let withheld = false
if (contextCollapse?.isWithheldPromptTooLong(message, ...)) {
  withheld = true
}
if (reactiveCompact?.isWithheldPromptTooLong(message)) {
  withheld = true
}
if (isWithheldMaxOutputTokens(message)) {
  withheld = true
}
if (!withheld) {
  yield yieldMessage // 正常消息直接推送
}
if (message.type === 'assistant') {
  assistantMessages.push(message) // 无论是否 withheld 都记录
}
withheld 模式要解决的核心问题是"避免闪烁":如果先展示错误再恢复,用户会看到"出错了...等一下,又好了"的闪烁。暂扣机制确保只有真正无法恢复的错误才会展示。但暂扣也增加了复杂度——每个 withhold 点必须有对应的恢复路径,否则消息会被静默吞掉。

Agent 实践借鉴 — 客服 Agent 对话循环设计

场景映射:客服对话循环的真实挑战

客户说"我要退货",背后是一串工具调用链:

借鉴 CC:AsyncGenerator 循环 + AbortController 中断

CC 的 query()AsyncGenerator 实现了"接收输入 → 调用模型 → 执行工具 → yield 结果 → 继续循环"的核心模式。消费者通过 for await...of 流式接收事件,通过 AbortController 中断。客服场景的对话循环结构完全一样:

// CC 的 query 循环(简化)
async function* query(params): AsyncGenerator<Event> {
  while (true) {
    // 1. 压缩上下文
    // 2. 流式调用 LLM
    for await (const chunk of callModel({ signal })) {
      if (signal.aborted) return;
      if (chunk.type === 'tool_use') startToolExecution(chunk);
      yield event;
    }
    // 3. 执行工具,结果追加到消息历史
    // 4. state = { ...state, messages: [...results] }; continue;
  }
}

客服 Agent 怎么改:对话循环 AsyncGenerator 模式

核心结构照搬 CC 的 query 循环,但去掉 tool_use_count 预算限制(客服场景不限制工具调用次数,业务需要几轮就几轮),增加三种中断场景:

// 客服 Agent 对话循环 — AsyncGenerator 模式
type DialogEvent =
  | { type: 'text_delta'; text: string }         // 流式文本回复
  | { type: 'tool_call'; tool: string; args: any } // 工具调用通知
  | { type: 'tool_result'; tool: string; data: any } // 工具执行结果
  | { type: 'interrupted'; reason: string }       // 中断通知
  | { type: 'complete'; summary: string };        // 对话完成

async function* dialogLoop(
  session: CustomerSession
): AsyncGenerator<DialogEvent> {
  let messages: Message[] = session.initialMessages;
  const abortSignal = session.abortController.signal;

  try {
    while (true) {
      if (abortSignal.aborted) {
        yield { type: 'interrupted', reason: session.abortReason };
        return;
      }

      // 阶段 1:调用 LLM(流式)
      yield { type: 'text_delta', text: '' }; // 让消费者知道请求开始
      const stream = callLLM({
        messages,
        tools: session.availableTools,
        systemPrompt: session.systemPrompt,
        signal: abortSignal,
      });

      let toolCalls: ToolCall[] = [];
      let assistantText = '';

      // 阶段 2:流式处理 LLM 响应
      for await (const chunk of stream) {
        if (abortSignal.aborted) {
          yield { type: 'interrupted', reason: session.abortReason };
          return;
        }
        if (chunk.type === 'text') {
          assistantText += chunk.text;
          yield { type: 'text_delta', text: chunk.text }; // 实时推送给客户
        }
        if (chunk.type === 'tool_use') {
          toolCalls.push(chunk);
          yield { type: 'tool_call', tool: chunk.name, args: chunk.input };
        }
      }

      // 阶段 3:无工具调用 → LLM 认为对话可以结束
      if (toolCalls.length === 0) {
        yield { type: 'complete', summary: assistantText };
        return;
      }

      // 阶段 4:执行工具 + 收集结果
      // 客服场景不限制工具调用次数,业务需要几轮就几轮
      const toolResults: Message[] = [];
      for (const toolCall of toolCalls) {
        if (abortSignal.aborted) {
          yield { type: 'interrupted', reason: session.abortReason };
          return;
        }
        const result = await executeTool(toolCall, session);
        yield { type: 'tool_result', tool: toolCall.name, data: result };
        toolResults.push({ role: 'tool_result', content: result, toolUseId: toolCall.id });
      }

      // 阶段 5:累积消息到对话历史(消息累积器,照搬 CC 的 State 模式)
      messages = [
        ...messages,
        { role: 'assistant', content: assistantText, toolCalls },
        ...toolResults,
      ];
      // continue → 回到 while(true) 顶部,LLM 根据工具结果决定下一步
    }
  } finally {
    // 资源清理:释放会话锁、通知外部系统会话结束
    session.cleanup();
  }
}

三种中断场景的实现

// 客服场景的三种中断
class CustomerSession {
  abortController = new AbortController();
  abortReason: string = '';

  // 中断 1:客户主动取消("算了不退了")
  cancelByCustomer(reason: string) {
    this.abortReason = 'customer_cancel';
    this.abortController.abort();
    // dialogLoop 的 finally 块自动执行资源清理
  }

  // 中断 2:会话超时(客户 3 分钟没回复)
  startTimeout(timeoutMs: number = 180_000) {
    setTimeout(() => {
      this.abortReason = 'session_timeout';
      this.abortController.abort();
    }, timeoutMs);
  }

  // 中断 3:坐席接手(人工客服介入,Agent 退出)
  handoverToAgent(agentId: string) {
    this.abortReason = 'agent_takeover';
    this.abortController.abort();
    // 坐席接手后,dialogLoop yield interrupted 事件
    // 上层调度器将对话路由到人工坐席
  }
}

// 消费者:每个客户一个独立的 dialogLoop 实例
// 10 万客户 = 10 万个独立的 AsyncGenerator,互不阻塞
async function handleCustomer(customerId: string) {
  const session = new CustomerSession(customerId);
  session.startTimeout(180_000); // 3 分钟超时

  for await (const event of dialogLoop(session)) {
    switch (event.type) {
      case 'text_delta':
        sendToCustomer(customerId, event.text); // 实时推送
        break;
      case 'tool_result':
        logToolUsage(customerId, event.tool, event.data); // 审计日志
        break;
      case 'interrupted':
        handleInterruption(customerId, event.reason);
        break;
    }
  }
}

对话历史管理:工具结果怎么追加到上下文

// 消息累积器 — 照搬 CC 的不可变 State 模式
// 每轮工具调用后,结果追加到消息历史,下轮 LLM 调用自动带上

// 关键:客服场景的对话历史需要持久化(客户可能断线重连)
interface DialogState {
  messages: Message[];
  turnCount: number;
  lastActiveAt: number;  // 用于超时判断
  customerId: string;
  orderId?: string;       // 当前关联订单(工具调用后填充)
}

// 不可变更新:创建新 State 而非修改字段(照搬 CC 的 state = {...} 模式)
function accumulateResults(state: DialogState, results: Message[]): DialogState {
  return {
    ...state,
    messages: [...state.messages, ...results],
    turnCount: state.turnCount + 1,
    lastActiveAt: Date.now(),
  };
}

// 压缩策略:客服场景比 CC 简单
// CC 有 4 层压缩(snip → microcompact → collapse → autocompact)
// 客服场景只需要 2 层:
// 1. 摘要:把已解决的工具调用结果替换为一句话摘要
// 2. 截断:保留最近 N 轮完整对话 + 更早的摘要
async function compressIfNeeded(state: DialogState): Promise<DialogState> {
  const tokenCount = estimateTokens(state.messages);
  if (tokenCount < MAX_TOKENS * 0.8) return state;

  // 保留最近 5 轮完整对话
  const recentMessages = state.messages.slice(-10); // 5 轮 ≈ 10 条
  // 早期消息用 LLM 生成摘要
  const oldMessages = state.messages.slice(0, -10);
  const summary = await generateSummary(oldMessages);

  return {
    ...state,
    messages: [
      { role: 'system', content: `之前的对话摘要:${summary}` },
      ...recentMessages,
    ],
  };
}

落地清单

必须抄的:
  • AsyncGenerator 流式输出:对话循环用 async function* + yield。客户能实时看到 Agent 的回复(text_delta),不用等整个工具链跑完。
  • AbortController 中断:客户取消、超时、坐席接手三种场景共用一个 AbortController。循环内的每个 await 点都检查 signal.aborted
  • 消息累积:工具结果追加到 messages 数组,下轮 LLM 自动看到。用不可变更新(state = {...state, messages: [...]}),避免引用混乱。
  • finally 资源清理:generator 的 finally 块释放会话锁、通知外部系统。无论正常结束还是中断,清理都会执行。
不需要抄的:
  • tool_use_count 预算限制:CC 用 maxTurns 防止无限循环。客服场景不限制——退单流程需要 5 轮工具调用就 5 轮,没必要人为设上限。
  • StreamingToolExecutor 并行工具执行:CC 在模型流式输出时就开始并行执行工具。客服场景的工具调用有先后依赖(先查订单才能查物流),并行收益不大。
  • 4 层压缩:CC 的 snip/microcompact/collapse/autocompact 是为代码编辑场景设计的。客服对话更简单,2 层压缩(摘要 + 截断)足够。
常见坑:
  • 工具执行不检查 abortSignalrefundService.create() 执行了 2 秒,期间客户说"不退了"。如果退单接口不检查 signal,退款已经提交了。解法:所有外部调用传 signal,接口层在发起请求前检查 signal.aborted
  • 每个客户一个进程而不是一个 generator:10 万客户 = 10 万进程,内存直接爆掉。正确做法是 10 万个 AsyncGenerator 在少量 worker 线程中调度。
  • 中断后不通知客户:客户说"不退了",Agent 内部中断了循环但没给客户发消息。客户看到的是 Agent 突然不说话了。正确做法:yield interrupted 事件后,上层发送"好的,已为您取消退货流程"。
  • 对话历史丢失工具调用结果:压缩时把工具结果全删了,LLM 不知道订单号是什么,又开始问客户。解法:压缩时保留关键实体(订单号、退款金额)在摘要中。

代码索引

文件行数说明
QueryEngine.ts~1296QueryEngine 类:会话管理、submitMessage 入口、SDK 消息 yield
query.ts~1730query() 生成器:核心循环、压缩、API 调用、工具调度
coordinator/coordinatorMode.ts~370协调器模式:多 Agent 调度系统提示生成
state/AppStateStore.ts~1200+AppState 类型:toolPermissionContext、mcp、tasks 等
state/store.ts~35通用状态容器(不可变比较 + 监听器通知)
tools/toolOrchestration.ts~180工具并发编排:partitionToolCalls 读写分离、并发上限 10
tools/StreamingToolExecutor.ts~200流式工具执行器:canExecuteTool 安全判断、discard 丢弃