Query 循环
QueryEngine 与 query() 的核心循环机制——用户输入到 API 调用、流式响应、工具调度、结果收集的完整迭代过程
职责概述
解决的问题:大模型不会自己调用工具——它只返回文本。需要一个循环引擎反复调用 API、解析模型请求的工具调用、执行工具、把结果喂回模型,直到任务完成。这就是 Query 循环干的事。
应用场景:① 用户输入一条消息后,引擎自动驱动"模型思考→调工具→再思考"的多轮循环 ② 处理需要多次工具调用的复杂任务(如跨文件重构) ③ 在 token 预算耗尽、用户中断或模型表示"完成"时优雅终止。
一句话理解:就像你和 AI 下棋——你走一步(用户输入),AI 走一步(模型回复),AI 可能需要查棋谱(调工具),查完再想,直到分出胜负(任务完成)。
架构设计
核心数据流
QueryEngine.ts:209 的 submitMessage() 是 SDK/REPL 调用查询的入口。它首先构建 processUserInputContext,处理孤立权限,然后调用 processUserInput() 解析用户输入(包括斜杠命令):
// QueryEngine.ts:410-428
const { messages: messagesFromUserInput, shouldQuery, allowedTools,
model: modelFromUserInput, resultText } =
await processUserInput({ input: prompt, mode: 'prompt', ... });
如果 shouldQuery 为 false(斜杠命令已处理),直接 yield 结果并返回。
QueryEngine.ts:288-325 组装系统提示词,由三部分组成:
defaultSystemPrompt— 工具描述 + 模型感知的默认提示memoryMechanicsPrompt— 记忆系统指令(当有自定义提示 + 内存覆盖时注入)appendSystemPrompt— 用户追加的系统提示
// QueryEngine.ts:321-325
const systemPrompt = asSystemPrompt([
...(customPrompt !== undefined ? [customPrompt] : defaultSystemPrompt),
...(memoryMechanicsPrompt ? [memoryMechanicsPrompt] : []),
...(appendSystemPrompt ? [appendSystemPrompt] : []),
]);
query.ts:219 的 query() 是一个 AsyncGenerator,委托给内部 queryLoop():
// query.ts:219-239
export async function* query(params: QueryParams): AsyncGenerator<...> {
const consumedCommandUuids: string[] = []
const terminal = yield* queryLoop(params, consumedCommandUuids)
for (const uuid of consumedCommandUuids) {
notifyCommandLifecycle(uuid, 'completed')
}
return terminal
}
QueryEngine.submitMessage() 通过 for await (const message of query({...})) 消费 yield 的每条消息。
query.ts:307 的 while(true) 循环每次迭代开始时:
- 技能发现预取(并行于模型流式传输)
- yield stream_request_start — 通知消费者请求开始
- 查询链追踪 — 分配 chainId + depth
- 工具结果预算 — 限制工具结果大小
- Snip — 历史消息裁剪
- Microcompact — 缓存编辑级压缩
- Context Collapse — 投影式上下文折叠
- Autocompact — 全文摘要压缩(达到阈值时触发)
// query.ts:454-468 核心压缩调用
const { compactionResult, consecutiveFailures } = await deps.autocompact(
messagesForQuery, toolUseContext, { systemPrompt, userContext, ... }
);
query.ts:654-659 通过 deps.callModel() 发起流式 API 请求:
// query.ts:659-708
for await (const message of deps.callModel({
messages: prependUserContext(messagesForQuery, userContext),
systemPrompt: fullSystemPrompt,
thinkingConfig, tools, signal, options: {
model: currentModel, fallbackModel, querySource,
...(params.taskBudget && { taskBudget: { total, remaining } }),
}
})) { ... }
流式响应中逐块处理:收集 assistant 消息、检测 tool_use 块、执行流式工具、处理 fallback 切换。错误通过 FallbackTriggeredError 触发模型降级重试。
query.ts:1366-1408 工具执行阶段。支持两种模式:
- StreamingToolExecutor — 在模型流式输出 tool_use 块时即开始执行工具,与模型输出并行
- runTools() — 传统模式,等模型完成后批量执行
// query.ts:1380-1408
const toolUpdates = streamingToolExecutor
? streamingToolExecutor.getRemainingResults()
: runTools(toolUseBlocks, assistantMessages, canUseTool, toolUseContext);
for await (const update of toolUpdates) {
if (update.message) yield update.message;
if (update.newContext) updatedToolUseContext = { ...update.newContext, queryTracking };
}
- 无工具调用 (
!needsFollowUp) — 检查是否需要恢复(prompt-too-long、max-output-tokens),否则处理 stop hooks 后终止 - 有工具调用 — 构建新的
State,合并工具结果,continue回到循环顶部
State 对象在迭代间传递,所有继续点使用 state = { ... } 而非可变赋值:
// query.ts:204-217 迭代间状态
type State = {
messages: Message[]
toolUseContext: ToolUseContext
autoCompactTracking: AutoCompactTrackingState | undefined
maxOutputTokensRecoveryCount: number
hasAttemptedReactiveCompact: boolean
pendingToolUseSummary: Promise<...> | undefined
stopHookActive: boolean | undefined
turnCount: number
transition: Continue | undefined // 上次迭代为何继续
}
关键类型与接口
QueryEngineConfig
// QueryEngine.ts:130-173
export type QueryEngineConfig = {
cwd: string
tools: Tools
commands: Command[]
mcpClients: MCPServerConnection[]
agents: AgentDefinition[]
canUseTool: CanUseToolFn
getAppState: () => AppState
setAppState: (f: (prev: AppState) => AppState) => void
initialMessages?: Message[]
readFileCache: FileStateCache
customSystemPrompt?: string
appendSystemPrompt?: string
thinkingConfig?: ThinkingConfig
maxTurns?: number
maxBudgetUsd?: number
taskBudget?: { total: number }
jsonSchema?: Record<string, unknown>
abortController?: AbortController
snipReplay?: (yieldedSystemMsg, store) => { messages, executed } | undefined
}
QueryParams
// query.ts:181-199
export type QueryParams = {
messages: Message[]
systemPrompt: SystemPrompt
userContext: { [k: string]: string }
systemContext: { [k: string]: string }
canUseTool: CanUseToolFn
toolUseContext: ToolUseContext
fallbackModel?: string
querySource: QuerySource
maxOutputTokensOverride?: number
maxTurns?: number
taskBudget?: { total: number }
deps?: QueryDeps
}
State(迭代间可变状态)
// query.ts:204-217
type State = {
messages: Message[]
toolUseContext: ToolUseContext
autoCompactTracking: AutoCompactTrackingState | undefined
maxOutputTokensRecoveryCount: number
hasAttemptedReactiveCompact: boolean
maxOutputTokensOverride: number | undefined
pendingToolUseSummary: Promise<ToolUseSummaryMessage | null> | undefined
stopHookActive: boolean | undefined
turnCount: number
transition: Continue | undefined
}
QueryEngine 类签名
// QueryEngine.ts:184-1177
export class QueryEngine {
private config: QueryEngineConfig
private mutableMessages: Message[]
private abortController: AbortController
private permissionDenials: SDKPermissionDenial[]
private totalUsage: NonNullableUsage
private readFileState: FileStateCache
constructor(config: QueryEngineConfig)
async *submitMessage(prompt, options?): AsyncGenerator<SDKMessage, void, unknown>
interrupt(): void
getMessages(): readonly Message[]
getReadFileState(): FileStateCache
getSessionId(): string
setModel(model: string): void
}
设计模式与亮点
1. AsyncGenerator 消费者模式
query() 使用 AsyncGenerator 而非回调或 Promise,这使得消费者(无论是 REPL 的 ask() 还是 SDK 的 submitMessage())可以自然地用 for await...of 逐步处理流式消息,同时保持对中断(.return())和错误传播(throw())的支持。
2. 不可变状态传递 (Immutable State Transition)
queryLoop 中的 State 对象在每次 continue 时完整替换(state = { ... }),而非逐字段修改。这使得每个 continue 点的意图完全可追溯——transition 字段记录了上次为何继续。
3. 流式工具执行 (Streaming Tool Execution)
传统的 Agentic 循环是"模型完成 → 执行工具 → 继续"。Claude Code 的 StreamingToolExecutor 在模型还在流式输出时就开始并行执行工具,显著减少了多工具调用的延迟。模型输出 tool_use 块后立即调用 addTool(),工具在后台执行,结果通过 getCompletedResults() 在流式循环中逐个收集。
4. 多层压缩策略 (Multi-Layer Compression)
上下文管理采用四层渐进压缩,每层都有不同的粒度和成本:
- Snip — 最细粒度,裁剪历史消息尾部
- Microcompact — 缓存编辑级压缩,保留工具结果 ID
- Context Collapse — 投影式折叠,保留粒度上下文
- Autocompact — 最粗粒度,全文摘要
5. 恢复循环 (Recovery Loop)
query 循环内嵌多个恢复机制:prompt-too-long 通过 collapse drain 或 reactive compact 恢复;max-output-tokens 通过提升输出上限或注入恢复消息继续;model fallback 通过降级到备用模型重试。每种恢复都有递增计数器防止无限循环。
开发者实践指南
CLAUDE_CODE_DEBUG=1 和 --verbose 标志可以看到每一步的详细信息,包括压缩决策、工具执行时间和 token 使用量。添加新的继续条件
在 query.ts 的 queryLoop 中添加新的 continue 点时,必须:
- 在
State类型中添加相关追踪字段 - 在 continue 点构建完整的
State对象 - 设置
transition字段记录原因 - 确保恢复计数器递增(防止无限循环)
- 添加对应的
logEvent用于遥测
添加新的 yield 消息类型
query() 的 AsyncGenerator 可以 yield 多种消息类型:StreamEvent、RequestStartEvent、Message、TombstoneMessage、ToolUseSummaryMessage。添加新类型需要更新 QueryParams 的 yield 类型联合和所有消费者的 switch/case。
QueryEngine vs query() 的分工
QueryEngine 负责会话级管理(消息历史、转录持久化、SDK 消息格式化),query() 负责单次查询的循环逻辑。新的会话级功能(如消息持久化策略)应放在 QueryEngine 中;新的循环行为(如新的压缩策略)应放在 query() 中。
架构师决策指南
AsyncGenerator 的优势与代价
使用 AsyncGenerator 作为核心抽象,使得流式处理、背压控制和中止语义可以统一处理。消费者可以随时通过 generator.return() 优雅终止,或通过 AbortController 中断正在进行的工具执行。代价是调试复杂度增加——生成器的调用栈在 yield 点是不连续的。
并行化的边界
流式工具执行将模型输出与工具执行重叠,但工具执行本身是串行的(for await...of toolUpdates)。完全并行的工具执行会增加竞态风险(文件系统冲突、权限决策冲突)。当前设计在并行度和安全性之间取得了平衡。
discard() 方法在 model fallback 时被调用,确保旧 tool_use_id 对应的工具结果不会泄露到重试请求中。这是正确性的关键点。压缩策略的扩展性
四层压缩系统的执行顺序是有意设计的:Snip 在 Microcompact 之前(因为 snip 移除的消息不需要微压缩);Context Collapse 在 Autocompact 之前(折叠可能避免昂贵的全文压缩)。添加新的压缩层必须仔细考虑它在管道中的位置。
任务预算 (Task Budget) 的设计
taskBudget 与 tokenBudget 是独立的机制:前者是 API 端的输出 token 预算(跨压缩边界累计计算),后者是客户端侧的 Agentic turn token 预算。两者独立操作,可以组合使用。Task budget 在每次压缩后通过 taskBudgetRemaining 传递累计消耗,因为压缩后的 API 无法看到被摘要掉的历史。
◈ 可视化处理拓扑图
Query 循环是 Claude Code 的心脏。QueryEngine.submitMessage 准备上下文,query() AsyncGenerator 驱动核心循环,queryLoop() 在 while(true) 中迭代处理:4 层压缩管道 → API 流式调用 → 流式工具执行 → 继续决策。模型降级、错误恢复和死亡螺旋检测是循环内的关键安全机制。
queryLoop 使用"不可变 State + continue"模式——每个迭代开始时解构 state(L311-321),结束时通过 state = next; continue 跳回顶部。StreamingToolExecutor 在模型还在流式输出时就并行执行工具,大幅降低多工具调用的延迟。模型降级时 discard() 确保旧 tool_use_id 的结果不会泄露到重试请求中。⇉ 核心处理流程详解
查询循环是 Claude Code 的心脏。从用户输入到模型响应再到工具执行,数据经过一条精心编排的流水线:QueryEngine.submitMessage 准备上下文,query() 生成器驱动核心循环,queryLoop() 在其中迭代处理压缩、API 调用、工具执行和错误恢复。每一轮迭代都是一个完整的"思考-行动"周期。
QueryEngine.ts:209-236 接收用户提示和配置,解构出 cwd/tools/mcpClients/canUseTool 等参数。L244-271 创建 wrappedCanUseTool 包装器追踪权限拒绝。L284-300 调用 fetchSystemPromptParts() 构建系统提示词,包括默认提示、用户上下文、MCP 协调器上下文和自定义提示的合并(L321-325)。query.ts:219-239 是一个 async generator,调用 queryLoop() 并用 yield* 代理所有事件。循环正常退出后(L235-238),通知所有已消费的命令 UUID 完成。这种设计确保生成器的异常和中断路径都能正确传播。query.ts:241-279 解构不可变参数(systemPrompt/userContext 等),创建可变状态 State:消息数组、工具上下文、压缩追踪、输出 token 恢复计数等。L280 初始化 budget tracker,L291 初始化 taskBudgetRemaining 用于跨压缩边界的 token 预算追踪。L295 调用 buildQueryConfig() 快照不可变环境状态。while(true) 主循环后(L307),依次执行四层压缩:Snip(L401-410,移除已处理工具调用释放 token)→ Microcompact(L414-426,缓存编辑压缩工具结果)→ Context Collapse(L440-447,折叠历史消息为摘要)→ Autocompact(L454-468,完整上下文摘要)。顺序精心设计:Snip 在 Microcompact 前(被 snip 移除的不需要微压缩),Collapse 在 Autocompact 前(折叠可能避免昂贵的全文压缩)。query.ts:654-954 通过 deps.callModel()(L659)发起流式 API 请求。流式循环(L659-863)处理每个到达的消息块:提取 tool_use 块(L829-835),通过 StreamingToolExecutor 并行执行工具(L841-844),yield 完成的工具结果(L851-861)。模型回退机制在 FallbackTriggeredError 时切换模型重试(L894-953),丢弃旧结果并重建执行器。query.ts:1062-1256 处理"无后续操作"场景的多重恢复:Prompt-too-long(L1085-1183)先尝试 context collapse drain(廉价),再尝试 reactive compact(昂贵);max_output_tokens(L1188-1256)先升级到 64k token 限制重试,再注入恢复消息让模型继续。每次恢复都通过 state = next; continue 回到循环顶部,保持状态一致性。query.ts:1360-1409 执行工具调用,由 toolOrchestration.ts 控制并发策略。核心机制是 isConcurrencySafe 分类:并发安全工具(Read/Glob/Grep/WebFetch/LSP 等,isConcurrencySafe=true)在批次内最多 10 个并行(CLAUDE_CODE_MAX_TOOL_USE_CONCURRENCY 可配置);非安全工具(Edit/Write/Bash 写操作等,默认 false)串行排他执行,必须等当前所有工具完成。partitionToolCalls()(toolOrchestration.ts:91)将连续的安全工具合并为一个并发批次,非安全工具单独串行。StreamingToolExecutor 在模型流式输出时即开始执行,通过 canExecuteTool()(StreamingToolExecutor.ts:129)判断:新工具可执行当且仅当当前无工具在跑,或新工具与所有正在执行的工具都是并发安全的。query.ts:1537-1590+ 将工具结果、附件消息和排队命令合并为新消息数组,通过 state = { messages: [...], ... } 模式传递到下一轮迭代。整个状态传递使用不可变更新(创建新 State 对象而非修改字段),使得每个 continue 点都有明确的语义。maxTurns 检查(L1568-1574)在每轮结束时判断是否终止循环。state = next; continue 跳回顶部。所有 7 个 continue 站点(压缩恢复、prompt-too-long 恢复、max_output_tokens 恢复、模型回退、stop-hook 阻塞、token budget 继续)都遵循同一模式。这种设计避免了深度嵌套的 if-else,使控制流扁平且可审计。★ 设计精华
1. Async Generator 作为流式协程
query() 和 queryLoop() 都是 async generator 函数,通过 yield 向调用方实时推送事件(流式消息、工具结果、系统通知),同时内部维护完整的循环状态。调用方(REPL 或 SDK)可以按需消费事件,也可以通过 .return() 中断生成器。这种设计将"生产"和"消费"解耦——query 不需要知道谁在监听。
// query.ts:219-239 — yield* 代理整个循环
export async function* query(params: QueryParams):
AsyncGenerator {
const consumedCommandUuids: string[] = []
const terminal = yield* queryLoop(params, consumedCommandUuids)
// 正常退出后通知命令完成
for (const uuid of consumedCommandUuids) {
notifyCommandLifecycle(uuid, 'completed')
}
return terminal
}
for await...of 循环退出时自动调用 .return(),触发生成器的 finally 块。这确保了即使 REPL 被 Ctrl+C 中断,资源清理(abort controller、MCP 锁释放)也能正确执行。2. 四层压缩管道的顺序依赖
压缩不是一次性的操作,而是精心排序的四层管道。Snip 先于 Microcompact(被移除的消息不需要微压缩,且 snip 释放的 token 数要传给 autocompact 的阈值计算)。Context Collapse 先于 Autocompact(折叠保留粒度上下文,而 autocompact 生成单一摘要)。这个顺序使得系统可以"渐进式"地回收 token——先用廉价操作,必要时才用昂贵操作。
// query.ts:401-468 — 压缩管道执行顺序
// Layer 1: Snip — 移除已处理的工具调用(最廉价)
let snipTokensFreed = 0
if (feature('HISTORY_SNIP')) {
const snipResult = snipModule!.snipCompactIfNeeded(messagesForQuery)
messagesForQuery = snipResult.messages
snipTokensFreed = snipResult.tokensFreed // 传递给 autocompact 阈值计算
}
// Layer 2: Microcompact — 缓存编辑压缩工具结果
const microcompactResult = await deps.microcompact(messagesForQuery, ...)
// Layer 3: Context Collapse — 折叠历史为摘要(保留粒度)
if (feature('CONTEXT_COLLAPSE') && contextCollapse) {
const collapseResult = await contextCollapse.applyCollapsesIfNeeded(...)
}
// Layer 4: Autocompact — 完整上下文摘要(最昂贵)
const { compactionResult } = await deps.autocompact(messagesForQuery, ...)
tokenCountWithEstimation 读取 usage 字段,但 snip 移除的是尾部之前的消息——usage 字段(来自尾部 assistant)不会反映 snip 的效果。因此必须显式传递 snipTokensFreed 并在阈值计算中减去。3. StreamingToolExecutor — 并行工具执行
传统模式是"收集所有 tool_use → 串行执行 → 返回结果"。StreamingToolExecutor 改为"流式接收 tool_use → 立即开始执行 → 完成即返回结果",使工具执行与模型流式输出并行。这在大批量工具调用时显著降低延迟——模型还在输出后续 tool_use 时,前面的工具已经开始执行。
// query.ts:838-862 — 流式工具执行
if (streamingToolExecutor && !toolUseContext.abortController.signal.aborted) {
for (const toolBlock of msgToolUseBlocks) {
streamingToolExecutor.addTool(toolBlock, message) // 立即入队执行
}
}
// 轮询已完成的结果
for (const result of streamingToolExecutor.getCompletedResults()) {
if (result.message) {
yield result.message // 完成即推送,不等所有工具
}
}
discard() 方法(query.ts:734)在模型回退时被调用,确保旧 tool_use_id 对应的工具结果不会泄露到重试请求中。这是正确性的关键——如果保留旧结果,API 会因 tool_use/tool_result 不匹配而报错。4. 读写分离与并发控制 — isConcurrencySafe
CC 不使用简单的"全部串行"或"全部并行",而是通过 isConcurrencySafe() 对每个工具分类。每个 Tool 实现此方法,返回 true 的工具可以在批次内并行(上限 10),返回 false 的工具必须串行排他执行。partitionToolCalls()(toolOrchestration.ts:91)将连续的安全工具合并为一个并发批次,非安全工具单独成批。
// toolOrchestration.ts:10 — 并发上限
function getMaxToolUseConcurrency(): number {
return parseInt(process.env.CLAUDE_CODE_MAX_TOOL_USE_CONCURRENCY || '', 10) || 10
}
// StreamingToolExecutor.ts:129 — 流式路径的安全判断
private canExecuteTool(isConcurrencySafe: boolean): boolean {
const executingTools = this.tools.filter(t => t.status === 'executing')
return (
executingTools.length === 0 ||
(isConcurrencySafe && executingTools.every(t => t.isConcurrencySafe))
)
}
// 工具分类示例(Tool.ts 各实现)
// Read/Glob/Grep/WebFetch/LSP → isConcurrencySafe = true(并发安全)
// Edit/Write → isConcurrencySafe = false(串行排他)
// Bash → this.isReadOnly?.(input) ?? false(条件安全)
false(Tool.ts:759)——新工具不显式覆盖就默认串行。这是"安全优先"的设计:宁可牺牲并行度,也不冒竞态风险。Bash 工具最复杂:根据命令内容动态判断(grep/find 安全,rm/npm install 不安全)。5. 可 withheld 消息的延迟恢复
某些错误(prompt-too-long、max-output-tokens)不应立即展示给用户,因为可能通过自动恢复机制解决。queryLoop 在 L799-822 实现了"withhold"机制:错误消息仍然推入 assistantMessages(供恢复逻辑检查),但不 yield 给调用方。如果恢复成功,用户永远不会看到这个错误;如果恢复失败,才在 L1255 yield 展示。
// query.ts:799-825 — 错误消息暂扣逻辑
let withheld = false
if (contextCollapse?.isWithheldPromptTooLong(message, ...)) {
withheld = true
}
if (reactiveCompact?.isWithheldPromptTooLong(message)) {
withheld = true
}
if (isWithheldMaxOutputTokens(message)) {
withheld = true
}
if (!withheld) {
yield yieldMessage // 正常消息直接推送
}
if (message.type === 'assistant') {
assistantMessages.push(message) // 无论是否 withheld 都记录
}
◈ Agent 实践借鉴 — 客服 Agent 对话循环设计
场景映射:客服对话循环的真实挑战
客户说"我要退货",背后是一串工具调用链:
- 多轮工具链:查订单(orderService.query)→ 查物流状态(logisticsService.track)→ 判断是否符合退货条件(业务规则)→ 调用退单接口(refundService.create)→ 通知仓库(warehouseService.notify)。这不是一轮 chat 能搞定的。
- 中途取消:客户在退单流程走到一半说"算了不退了"。Agent 必须能中断正在执行的退单接口调用,不能提交了退款请求再告诉客户"已经退了改不了"。
- 高并发:10 万客户同时在线,每个客户的对话循环不能阻塞其他客户。一个退单接口超时 5 秒,不能让其他 9999 个客户跟着等。
- 会话超时:客户发了消息后去接电话,3 分钟没回复。对话循环需要自动超时释放资源,不能一直占着。
借鉴 CC:AsyncGenerator 循环 + AbortController 中断
CC 的 query() 用 AsyncGenerator 实现了"接收输入 → 调用模型 → 执行工具 → yield 结果 → 继续循环"的核心模式。消费者通过 for await...of 流式接收事件,通过 AbortController 中断。客服场景的对话循环结构完全一样:
// CC 的 query 循环(简化)
async function* query(params): AsyncGenerator<Event> {
while (true) {
// 1. 压缩上下文
// 2. 流式调用 LLM
for await (const chunk of callModel({ signal })) {
if (signal.aborted) return;
if (chunk.type === 'tool_use') startToolExecution(chunk);
yield event;
}
// 3. 执行工具,结果追加到消息历史
// 4. state = { ...state, messages: [...results] }; continue;
}
}
客服 Agent 怎么改:对话循环 AsyncGenerator 模式
核心结构照搬 CC 的 query 循环,但去掉 tool_use_count 预算限制(客服场景不限制工具调用次数,业务需要几轮就几轮),增加三种中断场景:
// 客服 Agent 对话循环 — AsyncGenerator 模式
type DialogEvent =
| { type: 'text_delta'; text: string } // 流式文本回复
| { type: 'tool_call'; tool: string; args: any } // 工具调用通知
| { type: 'tool_result'; tool: string; data: any } // 工具执行结果
| { type: 'interrupted'; reason: string } // 中断通知
| { type: 'complete'; summary: string }; // 对话完成
async function* dialogLoop(
session: CustomerSession
): AsyncGenerator<DialogEvent> {
let messages: Message[] = session.initialMessages;
const abortSignal = session.abortController.signal;
try {
while (true) {
if (abortSignal.aborted) {
yield { type: 'interrupted', reason: session.abortReason };
return;
}
// 阶段 1:调用 LLM(流式)
yield { type: 'text_delta', text: '' }; // 让消费者知道请求开始
const stream = callLLM({
messages,
tools: session.availableTools,
systemPrompt: session.systemPrompt,
signal: abortSignal,
});
let toolCalls: ToolCall[] = [];
let assistantText = '';
// 阶段 2:流式处理 LLM 响应
for await (const chunk of stream) {
if (abortSignal.aborted) {
yield { type: 'interrupted', reason: session.abortReason };
return;
}
if (chunk.type === 'text') {
assistantText += chunk.text;
yield { type: 'text_delta', text: chunk.text }; // 实时推送给客户
}
if (chunk.type === 'tool_use') {
toolCalls.push(chunk);
yield { type: 'tool_call', tool: chunk.name, args: chunk.input };
}
}
// 阶段 3:无工具调用 → LLM 认为对话可以结束
if (toolCalls.length === 0) {
yield { type: 'complete', summary: assistantText };
return;
}
// 阶段 4:执行工具 + 收集结果
// 客服场景不限制工具调用次数,业务需要几轮就几轮
const toolResults: Message[] = [];
for (const toolCall of toolCalls) {
if (abortSignal.aborted) {
yield { type: 'interrupted', reason: session.abortReason };
return;
}
const result = await executeTool(toolCall, session);
yield { type: 'tool_result', tool: toolCall.name, data: result };
toolResults.push({ role: 'tool_result', content: result, toolUseId: toolCall.id });
}
// 阶段 5:累积消息到对话历史(消息累积器,照搬 CC 的 State 模式)
messages = [
...messages,
{ role: 'assistant', content: assistantText, toolCalls },
...toolResults,
];
// continue → 回到 while(true) 顶部,LLM 根据工具结果决定下一步
}
} finally {
// 资源清理:释放会话锁、通知外部系统会话结束
session.cleanup();
}
}
三种中断场景的实现
// 客服场景的三种中断
class CustomerSession {
abortController = new AbortController();
abortReason: string = '';
// 中断 1:客户主动取消("算了不退了")
cancelByCustomer(reason: string) {
this.abortReason = 'customer_cancel';
this.abortController.abort();
// dialogLoop 的 finally 块自动执行资源清理
}
// 中断 2:会话超时(客户 3 分钟没回复)
startTimeout(timeoutMs: number = 180_000) {
setTimeout(() => {
this.abortReason = 'session_timeout';
this.abortController.abort();
}, timeoutMs);
}
// 中断 3:坐席接手(人工客服介入,Agent 退出)
handoverToAgent(agentId: string) {
this.abortReason = 'agent_takeover';
this.abortController.abort();
// 坐席接手后,dialogLoop yield interrupted 事件
// 上层调度器将对话路由到人工坐席
}
}
// 消费者:每个客户一个独立的 dialogLoop 实例
// 10 万客户 = 10 万个独立的 AsyncGenerator,互不阻塞
async function handleCustomer(customerId: string) {
const session = new CustomerSession(customerId);
session.startTimeout(180_000); // 3 分钟超时
for await (const event of dialogLoop(session)) {
switch (event.type) {
case 'text_delta':
sendToCustomer(customerId, event.text); // 实时推送
break;
case 'tool_result':
logToolUsage(customerId, event.tool, event.data); // 审计日志
break;
case 'interrupted':
handleInterruption(customerId, event.reason);
break;
}
}
}
对话历史管理:工具结果怎么追加到上下文
// 消息累积器 — 照搬 CC 的不可变 State 模式
// 每轮工具调用后,结果追加到消息历史,下轮 LLM 调用自动带上
// 关键:客服场景的对话历史需要持久化(客户可能断线重连)
interface DialogState {
messages: Message[];
turnCount: number;
lastActiveAt: number; // 用于超时判断
customerId: string;
orderId?: string; // 当前关联订单(工具调用后填充)
}
// 不可变更新:创建新 State 而非修改字段(照搬 CC 的 state = {...} 模式)
function accumulateResults(state: DialogState, results: Message[]): DialogState {
return {
...state,
messages: [...state.messages, ...results],
turnCount: state.turnCount + 1,
lastActiveAt: Date.now(),
};
}
// 压缩策略:客服场景比 CC 简单
// CC 有 4 层压缩(snip → microcompact → collapse → autocompact)
// 客服场景只需要 2 层:
// 1. 摘要:把已解决的工具调用结果替换为一句话摘要
// 2. 截断:保留最近 N 轮完整对话 + 更早的摘要
async function compressIfNeeded(state: DialogState): Promise<DialogState> {
const tokenCount = estimateTokens(state.messages);
if (tokenCount < MAX_TOKENS * 0.8) return state;
// 保留最近 5 轮完整对话
const recentMessages = state.messages.slice(-10); // 5 轮 ≈ 10 条
// 早期消息用 LLM 生成摘要
const oldMessages = state.messages.slice(0, -10);
const summary = await generateSummary(oldMessages);
return {
...state,
messages: [
{ role: 'system', content: `之前的对话摘要:${summary}` },
...recentMessages,
],
};
}
落地清单
- AsyncGenerator 流式输出:对话循环用
async function*+yield。客户能实时看到 Agent 的回复(text_delta),不用等整个工具链跑完。 - AbortController 中断:客户取消、超时、坐席接手三种场景共用一个 AbortController。循环内的每个
await点都检查signal.aborted。 - 消息累积:工具结果追加到 messages 数组,下轮 LLM 自动看到。用不可变更新(
state = {...state, messages: [...]}),避免引用混乱。 - finally 资源清理:generator 的 finally 块释放会话锁、通知外部系统。无论正常结束还是中断,清理都会执行。
- tool_use_count 预算限制:CC 用
maxTurns防止无限循环。客服场景不限制——退单流程需要 5 轮工具调用就 5 轮,没必要人为设上限。 - StreamingToolExecutor 并行工具执行:CC 在模型流式输出时就开始并行执行工具。客服场景的工具调用有先后依赖(先查订单才能查物流),并行收益不大。
- 4 层压缩:CC 的 snip/microcompact/collapse/autocompact 是为代码编辑场景设计的。客服对话更简单,2 层压缩(摘要 + 截断)足够。
- 工具执行不检查 abortSignal:
refundService.create()执行了 2 秒,期间客户说"不退了"。如果退单接口不检查 signal,退款已经提交了。解法:所有外部调用传signal,接口层在发起请求前检查signal.aborted。 - 每个客户一个进程而不是一个 generator:10 万客户 = 10 万进程,内存直接爆掉。正确做法是 10 万个 AsyncGenerator 在少量 worker 线程中调度。
- 中断后不通知客户:客户说"不退了",Agent 内部中断了循环但没给客户发消息。客户看到的是 Agent 突然不说话了。正确做法:yield
interrupted事件后,上层发送"好的,已为您取消退货流程"。 - 对话历史丢失工具调用结果:压缩时把工具结果全删了,LLM 不知道订单号是什么,又开始问客户。解法:压缩时保留关键实体(订单号、退款金额)在摘要中。
代码索引
| 文件 | 行数 | 说明 |
|---|---|---|
QueryEngine.ts | ~1296 | QueryEngine 类:会话管理、submitMessage 入口、SDK 消息 yield |
query.ts | ~1730 | query() 生成器:核心循环、压缩、API 调用、工具调度 |
coordinator/coordinatorMode.ts | ~370 | 协调器模式:多 Agent 调度系统提示生成 |
state/AppStateStore.ts | ~1200+ | AppState 类型:toolPermissionContext、mcp、tasks 等 |
state/store.ts | ~35 | 通用状态容器(不可变比较 + 监听器通知) |
tools/toolOrchestration.ts | ~180 | 工具并发编排:partitionToolCalls 读写分离、并发上限 10 |
tools/StreamingToolExecutor.ts | ~200 | 流式工具执行器:canExecuteTool 安全判断、discard 丢弃 |