Documentation Index
Fetch the complete documentation index at: https://ccb-863780bf-feat-local-memory-vault-wiring.mintlify.app/llms.txt
Use this file to discover all available pages before exploring further.
为什么需要流式
想象 AI 需要 30 秒才能生成完整回答——如果等 30 秒后才一次性显示,用户体验是灾难性的。
流式响应让用户实时看到 AI 的思考过程:
- 文字逐字出现,用户能提前判断方向是否正确
- 工具调用的参数在生成过程中就能预览
- 长时间任务不会让用户觉得”卡死了”
BetaRawMessageStreamEvent 核心事件类型
流式 API 返回的是一系列 BetaRawMessageStreamEvent,每种事件类型对应流式响应的不同阶段(src/services/api/claude.ts):
message_start ← 消息开始,包含 model、usage 初始值
├── content_block_start ← 内容块开始(text / tool_use / thinking)
│ ├── content_block_delta ← 增量数据(text_delta / input_json_delta / thinking_delta)
│ ├── content_block_delta ← ... 持续到达
│ └── content_block_stop ← 内容块结束,yield AssistantMessage
├── content_block_start ← 下一个内容块...
│ └── ...
└── message_delta ← stop_reason + 最终 usage
message_stop ← 消息结束
事件处理状态机
src/services/api/claude.ts 中 queryModelWithStreaming() 函数的事件处理循环实现了一个基于 switch(part.type) 的状态机:
| 事件类型 | 处理逻辑 | 状态变更 |
|---|
message_start | 初始化 partialMessage,记录 TTFT(首字节延迟) | usage 初始化 |
content_block_start | 按 part.index 创建对应类型的内容块 | contentBlocks[index] 初始化 |
content_block_delta | 按子类型增量追加数据 | text / thinking / input 累加 |
content_block_stop | 构建完整 AssistantMessage 并 yield | 消息推入 newMessages |
message_delta | 更新 stop_reason 和最终 usage | 写回最后一条消息 |
message_stop | 无操作(流结束标记) | — |
内容块类型及其增量数据
content_block_start 中的 content_block.type 决定了如何处理后续 delta:
| 内容块类型 | Delta 类型 | 累加逻辑 |
|---|
text | text_delta | text += delta.text |
thinking | thinking_delta + signature_delta | thinking += delta.thinking,signature = delta.signature |
tool_use | input_json_delta | input += delta.partial_json(JSON 字符串增量拼接) |
server_tool_use | input_json_delta | 同 tool_use |
connector_text | connector_text_delta | 特殊连接器文本(feature flag 控制) |
关键设计:content_block_start 时所有文本字段初始化为空字符串,只通过 content_block_delta 累加。这是因为 SDK 有时在 start 和 delta 中重复发送相同文本。
一次 AI 响应可能包含多个内容块,交替出现:
content_block_start (text, index=0) "我来帮你修复这个 bug。"
content_block_delta (text_delta) "首先..."
content_block_stop (index=0)
content_block_start (tool_use, index=1) { name: "Read", input: "..." }
content_block_delta (input_json_delta) '{"file_p' → 'ath":' → '"src/foo.ts"}'
content_block_stop (index=1)
content_block_start (text, index=2) "我已经看到了问题所在..."
content_block_stop (index=2)
每个 content_block_stop 触发一次 yield,将完整的 AssistantMessage 推送给消费者。这意味着一个 AI 响应会产生多条 AssistantMessage——文本消息和工具调用消息交替产出。
stop_reason 要等到 message_delta 才确定(可能是 end_turn、tool_use、max_tokens 等),所以最后一条消息的 stop_reason 是回写的:
// claude.ts — stop_reason 回写逻辑(直接属性修改,不用对象替换)
// 因为 transcript 写队列持有 message.message 的引用
const lastMsg = newMessages.at(-1)
if (lastMsg) {
lastMsg.message.usage = usage
lastMsg.message.stop_reason = stopReason
}
流式中的错误处理
网络断开
流式连接依赖 SSE(Server-Sent Events)。当连接中断时,系统有两层检测机制:
- 被动停滞检测(
src/services/api/claude.ts 中 stall 检测逻辑):当下一个事件到达时,计算与上一个事件的时间间隔。超过阈值(30 秒,STALL_THRESHOLD_MS = 30_000)记录为一次 stall,累积计数并写入遥测日志。这是被动检测——仅在下一个 chunk 到达时才触发,不会主动中断流。
- 主动空闲超时看门狗(
src/services/api/claude.ts 中 STREAM_IDLE_TIMEOUT_MS 看门狗逻辑):使用 setTimeout 设置 90 秒(可通过 CLAUDE_STREAM_IDLE_TIMEOUT_MS 环境变量覆盖)的硬性超时。如果在此期间没有收到任何事件,主动终止流并抛出错误进入重试流程。
- 非流式降级:作为最后手段,设置
didFallBackToNonStreaming 标志,通过 executeNonStreamingRequest() 回退到非流式请求(一次性获取完整响应)。
// claude.ts — 被动停滞检测
const STALL_THRESHOLD_MS = 30_000 // 30 秒无事件视为停滞
let totalStallTime = 0
let stallCount = 0
// claude.ts — 主动空闲超时
const STREAM_IDLE_TIMEOUT_MS =
parseInt(process.env.CLAUDE_STREAM_IDLE_TIMEOUT_MS || '', 10) || 90_000
API 限流
当 API 返回限流错误时,系统使用 withRetry 包装器进行指数退避重试。重试逻辑考虑了:
- 错误类型(429 限流 vs 500 服务器错误)
- 重试次数上限
- 退避间隔
Token 超限
两种 token 超限场景有不同的处理:
| 场景 | stop_reason | 处理方式 |
|---|
| 输出超限 | max_tokens | 生成错误消息,建议设置 CLAUDE_CODE_MAX_OUTPUT_TOKENS |
| 上下文窗口超限 | model_context_window_exceeded | 触发 compaction 压缩对话历史后重试 |
// claude.ts — stop_reason 处理
if (stopReason === 'max_tokens') {
yield createAssistantAPIErrorMessage({ error: 'max_output_tokens', ... })
}
if (stopReason === 'model_context_window_exceeded') {
// 复用 max_output_tokens 的恢复路径
yield createAssistantAPIErrorMessage({ error: 'max_output_tokens', ... })
}
流式停滞检测
系统持续监控事件到达间隔,检测”停滞”(stall):
// claude.ts — stall 检测逻辑
const STALL_THRESHOLD_MS = 30_000 // 30 秒无事件视为停滞
if (timeSinceLastEvent > STALL_THRESHOLD_MS) {
stallCount++
totalStallTime += timeSinceLastEvent
logEvent('tengu_streaming_stall', { stall_duration_ms, stall_count, ... })
}
这是被动检测——仅在下一个 chunk 到达时才触发比较。与之互补的是 90 秒主动空闲超时看门狗(STREAM_IDLE_TIMEOUT_MS),会直接中断长时间无响应的流。
工具执行的流式反馈
BashTool 的命令执行也是流式的——通过 onProgress 回调逐行推送输出:
BashTool.call() → runShellCommand() → AsyncGenerator
├── 每秒轮询输出文件 → onProgress(lastLines, allLines, ...)
├── yield { type: 'progress', output, fullOutput, elapsedTimeSeconds }
└── return { code, stdout, interrupted, ... }
UI 层通过 useToolCallProgress hook 实时展示命令输出,而不是等命令完全结束。长时间运行的命令还支持自动后台化(shouldAutoBackground)。
多 Provider 适配
| Provider | 流式协议 | 特殊处理 |
|---|
| firstParty (Anthropic Direct) | 原生 SSE | 延迟最低,TTFT 最快 |
| AWS Bedrock | AWS SDK 流式接口 | 需要额外的 beta header 和认证 |
| Google Vertex | gRPC → 事件流 | 通过 getMergedBetas() 适配 |
| foundry | Anthropic 兼容 API | 内部部署 |
| openai | OpenAI 流式适配器 | 转换为 Anthropic 内部格式 |
| gemini | Gemini 流式适配器 | 转换为 Anthropic 内部格式 |
| grok (xAI) | Grok 流式适配器 | 转换为 Anthropic 内部格式 |
所有 Provider 通过统一的 Stream<BetaRawMessageStreamEvent> 抽象层屏蔽差异。上层代码(QueryEngine、REPL)不需要关心底层用的是哪个 Provider。
Provider 选择
src/utils/model/providers.ts 中的 getAPIProvider() 根据配置决定使用哪个 Provider:
// 根据 api_provider 配置选择:
// "anthropic" → 直连
// "bedrock" → AWS SDK
// "vertex" → Google SDK
// 第三方 base URL → 自动检测
每个 Provider 需要适配的细节包括:认证方式、beta header、请求参数格式、错误码映射——但这些差异在 claude.ts 的 queryStream() 函数中被统一处理。