SSE 流式处理
用 Server-Sent Events 实时接收 Claude 的输出。本文涵盖什么时候该开 stream、7 种事件类型详解、Python / Node SDK 和原生 fetch 三种解析方式,以及流中错误处理。BUZZ 对 SSE 帧做透明转发,不做任何缓冲或重组。
"stream": true
什么时候用 streaming
开启 stream 后,响应会通过同一条 HTTP 连接以 SSE 帧的形式持续下发,你的应用可以边收边渲染。下面这些场景应该开:
- 长响应。输出超过几百 token 的请求都该开。用户在几十毫秒内看到第一个字,而不是等模型把整段写完。
- 交互式 UX。聊天 UI、代码助手、任何对感知延迟敏感的产品。第一个 token 在模型还没"想完"时就到了。
- Tool Use 循环。模型生成
tool_use块时,JSON 入参以input_json_delta形式逐步流出,你的应用可以提前安排工具调度,而不是等整段消息结束。 - Extended Thinking。Opus 4.7 开启 thinking 后,
thinking_delta流让你实时显示推理进度,而不阻塞最终答案。 - 长连接保活。
ping心跳事件能在慢速生成或代理空闲超时的情况下保住 TCP 连接。
启用方法
请求体加 "stream": true。其他参数全部不变。
curl -N -X POST https://buzzai.cc/v1/messages \
-H "Authorization: Bearer $BUZZ_API_KEY" \
-H "anthropic-version: 2023-06-01" \
-H "Content-Type: application/json" \
-d '{
"model": "claude-haiku-4-5-20251001",
"max_tokens": 200,
"stream": true,
"messages": [
{"role": "user", "content": "从 1 数到 5"}
]
}'from anthropic import Anthropic
client = Anthropic(
base_url="https://buzzai.cc",
api_key="",
)
with client.messages.stream(
model="claude-haiku-4-5-20251001",
max_tokens=200,
messages=[{"role": "user", "content": "从 1 数到 5"}],
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True) import Anthropic from "@anthropic-ai/sdk";
const client = new Anthropic({
baseURL: "https://buzzai.cc",
apiKey: process.env.BUZZ_API_KEY,
});
const stream = await client.messages.stream({
model: "claude-haiku-4-5-20251001",
max_tokens: 200,
messages: [{ role: "user", content: "从 1 数到 5" }],
});
for await (const chunk of stream) {
if (chunk.type === "content_block_delta" && chunk.delta.type === "text_delta") {
process.stdout.write(chunk.delta.text);
}
}curl 的 -N 参数关闭输出缓冲,SSE 帧到达终端后立即刷新。
7 种事件类型
每条流都是一组命名事件的序列。日常使用时大多数客户端只关心 content_block_delta,但每个事件都有明确职责。
| 事件 | 含义 | 典型动作 |
|---|---|---|
| message_start | 响应头帧。带有 id、model 和初始 usage。 | 记录消息 id 用于日志或重试关联。 |
| content_block_start | 新内容块开始,块类型为 text、tool_use 或 thinking。 | 按 index 分配缓冲区。 |
| content_block_delta | 当前内容块的增量,见下面的 4 种 delta 形式。 | 追加到对应 index 的缓冲区。 |
| content_block_stop | 当前内容块结束。 | 固化缓冲区。tool_use 块需要把累积的 partial_json 解析成 input 对象。 |
| message_delta | 最终 stop_reason 与累计 usage。 | 更新 token 计费,检查停止原因。 |
| message_stop | 流结束,后面不会再有帧。 | 关闭响应句柄。 |
| ping | 心跳保活帧,任意时机可能下发。 | 忽略(或重置空闲计时器)。 |
| error | 200 响应之后的流中错误,见 流中错误。 | 停止读取,把错误抛给上层。 |
content_block_delta 内的 4 种 delta
| delta.type | 取值字段 | 用途 |
|---|---|---|
| text_delta | delta.text | 可见文本生成 |
| input_json_delta | delta.partial_json | Tool 参数 JSON 增量 |
| thinking_delta | delta.thinking | Extended Thinking 推理痕迹(Opus 4.7) |
| signature_delta | delta.signature | 已签名 thinking 块的签名段 |
实测帧样本
以下是从 https://buzzai.cc/v1/messages 实测捕获的"从 1 数到 3"流帧:
event: message_start
data: {"type":"message_start","message":{"model":"claude-haiku-4-5-20251001","id":"msg_01YkyqfgStqigCHAgJ6uUDfd","type":"message","role":"assistant","content":[],"stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":7,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"output_tokens":1,"service_tier":"standard"}}}
event: content_block_start
data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}
event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"1\n2\n3"}}
event: content_block_stop
data: {"type":"content_block_stop","index":0}
event: message_delta
data: {"type":"message_delta","delta":{"stop_reason":"end_turn","stop_sequence":null},"usage":{"input_tokens":7,"output_tokens":5}}
event: message_stop
data: {"type":"message_stop"}
Python SDK 解析
官方 Anthropic Python SDK 用上下文管理器封装了流式接口,指向 BUZZ 只需要改一处 base URL。
from anthropic import Anthropic
client = Anthropic(
base_url="https://buzzai.cc",
api_key="",
)
with client.messages.stream(
model="claude-haiku-4-5-20251001",
max_tokens=400,
messages=[{"role": "user", "content": "写一首关于缓存的俳句"}],
) as stream:
# 方式 A:只迭代文本
for text in stream.text_stream:
print(text, end="", flush=True)
# 方式 B:遍历完整事件并按类型分发
# for event in stream:
# if event.type == "content_block_delta":
# if event.delta.type == "text_delta":
# print(event.delta.text, end="", flush=True)
# elif event.delta.type == "input_json_delta":
# handle_tool_input_chunk(event.index, event.delta.partial_json)
final_message = stream.get_final_message()
print()
print("stop_reason:", final_message.stop_reason)
print("usage:", final_message.usage)
上下文管理器保证即使消费者中途退出,底层 HTTP 响应也会被正确关闭。get_final_message() 在流结束后返回组装好的 Message 对象。
Node.js SDK 解析
import Anthropic from "@anthropic-ai/sdk";
const client = new Anthropic({
baseURL: "https://buzzai.cc",
apiKey: process.env.BUZZ_API_KEY,
});
const stream = await client.messages.stream({
model: "claude-haiku-4-5-20251001",
max_tokens: 400,
messages: [{ role: "user", content: "写一首关于缓存的俳句" }],
});
for await (const event of stream) {
switch (event.type) {
case "content_block_delta":
if (event.delta.type === "text_delta") {
process.stdout.write(event.delta.text);
} else if (event.delta.type === "input_json_delta") {
// 按 event.index 累积 event.delta.partial_json
}
break;
case "message_delta":
// 最终 stop_reason 和累计 usage
break;
}
}
const finalMessage = await stream.finalMessage();
console.log("\nstop_reason:", finalMessage.stop_reason);
console.log("usage:", finalMessage.usage);
SDK 已经处理了 SSE 分帧、JSON 解析、断帧重组缓冲。生产代码优先用 SDK,不要自己解析 wire 格式。
原生 fetch 解析 SSE
无法引入完整 SDK 时(比如边缘运行时),或需要完全控制分帧时使用。SSE wire 格式很朴素:event: 开头的行是事件名,data: 开头的行是 JSON 载荷,空行分隔每一帧。
async function streamMessages() {
const resp = await fetch("https://buzzai.cc/v1/messages", {
method: "POST",
headers: {
"Authorization": `Bearer ${process.env.BUZZ_API_KEY}`,
"anthropic-version": "2023-06-01",
"Content-Type": "application/json",
},
body: JSON.stringify({
model: "claude-haiku-4-5-20251001",
max_tokens: 400,
stream: true,
messages: [{ role: "user", content: "写一首关于缓存的俳句" }],
}),
});
if (!resp.ok) {
throw new Error(`HTTP ${resp.status} ${await resp.text()}`);
}
const reader = resp.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { value, done } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
// SSE 帧之间用空行分隔
const frames = buffer.split("\n\n");
buffer = frames.pop() ?? "";
for (const frame of frames) {
const lines = frame.split("\n");
let eventName = "message";
let dataLines = [];
for (const line of lines) {
if (line.startsWith("event:")) eventName = line.slice(6).trim();
else if (line.startsWith("data:")) dataLines.push(line.slice(5).trim());
}
if (dataLines.length === 0) continue;
const payload = JSON.parse(dataLines.join("\n"));
if (eventName === "content_block_delta" && payload.delta?.type === "text_delta") {
process.stdout.write(payload.delta.text);
} else if (eventName === "error") {
throw new Error(`stream error: ${payload.error?.type} ${payload.error?.message}`);
} else if (eventName === "message_stop") {
return;
}
}
}
}
第一次手写 SSE 解析常踩两个坑:
- 帧可能跨
read()切分。永远要用一个buffer在多次读之间留住未完整的帧,只消费到最后一个完整\n\n。 - 一帧可以有多行
data:。先用\n把它们拼起来,再 JSON 解析。
流中错误与断连
一旦网关已经发出 HTTP 200 OK 头,后面的所有失败都不会再以非 200 状态形式出现,而是以 SSE 事件的形式到达。三种情况都要显式处理:
1. event: error 帧
上游可能在生成已经开始之后才触发 rate_limit_error 或 overloaded_error。帧形如:
event: error
data: {"type":"error","error":{"type":"overloaded_error","message":"..."}}
按硬失败处理:停止读取、把 error.type 抛给上层、决定是否重试。具体退避策略见错误处理指南。
2. 流中 TCP 断连
连接在没有 message_stop 的情况下断开。客户端表现为流提前结束。SDK 会抛异常(Python 是 APIConnectionError,Node 是 APIConnectionError)。
恢复需要重新发起整条请求。Claude 流没有原生 resume token。如果你需要幂等性,自己用 metadata.user_id 加业务侧的关联 id。
3. 客户端主动取消
关闭响应(Python 退出 with 块,Node 调 stream.controller.abort(),原生 fetch 调 reader.cancel())会通知上游停止生成。已经生成的 token 会照常计费,所以用户关掉聊天时尽快取消。
BUZZ 透明转发承诺
BUZZ 不缓冲、不重组、不改写 SSE 流。你看到的帧边界就是上游模型生成的帧边界,顺序也保持原样。具体表现:
- 不合并事件帧。上游下发 8 个
content_block_delta帧,你就收到 8 个帧。网关不会把多个小 text_delta 合并成大块以减少帧数。 - 不规范化 JSON。
data:载荷按字节透传。上游引入的未知字段在你的客户端原样出现。 - 不改写请求。你发的字节就是上游模型收到的字节,包括所有
cache_control标记。这是 Prompt Cache 端到端可用的前提(实测:同一 payload 第一次cache_creation_input_tokens=1200,第二次cache_read_input_tokens=1200)。 - BUZZ 只可能加字段,不会去字段。你可能看到
usage.iterations[](逐迭代计费透明化)或context_management.applied_edits。解析器要兼容未知字段。
生产清单
- 能用官方 SDK 就用。手写 SSE 解析迟早踩到分帧 bug。
event: error帧和HTTP 4xx/5xx要分别处理。- 按
index分别维护content_block_delta缓冲。tool_use块需要全部partial_json累积完才能解析。 - 最终 token 数读
message_delta.usage,不是message_start.usage。 - 429 遵守
retry-after,529 用更长退避。详见错误处理指南。 - 用户中途取消时立刻 abort 流,避免为没用上的 token 付费。