BUZZ AI Gateway
文档 · 指南 · SSE 流式处理

SSE 流式处理

用 Server-Sent Events 实时接收 Claude 的输出。本文涵盖什么时候该开 stream、7 种事件类型详解、Python / Node SDK 和原生 fetch 三种解析方式,以及流中错误处理。BUZZ 对 SSE 帧做透明转发,不做任何缓冲或重组。

POST https://buzzai.cc/v1/messages + "stream": true

什么时候用 streaming

开启 stream 后,响应会通过同一条 HTTP 连接以 SSE 帧的形式持续下发,你的应用可以边收边渲染。下面这些场景应该开:

启用方法

请求体加 "stream": true。其他参数全部不变。

curl -N -X POST https://buzzai.cc/v1/messages \
  -H "Authorization: Bearer $BUZZ_API_KEY" \
  -H "anthropic-version: 2023-06-01" \
  -H "Content-Type: application/json" \
  -d '{
    "model": "claude-haiku-4-5-20251001",
    "max_tokens": 200,
    "stream": true,
    "messages": [
      {"role": "user", "content": "从 1 数到 5"}
    ]
  }'
from anthropic import Anthropic

client = Anthropic(
    base_url="https://buzzai.cc",
    api_key="",
)

with client.messages.stream(
    model="claude-haiku-4-5-20251001",
    max_tokens=200,
    messages=[{"role": "user", "content": "从 1 数到 5"}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
import Anthropic from "@anthropic-ai/sdk";

const client = new Anthropic({
  baseURL: "https://buzzai.cc",
  apiKey: process.env.BUZZ_API_KEY,
});

const stream = await client.messages.stream({
  model: "claude-haiku-4-5-20251001",
  max_tokens: 200,
  messages: [{ role: "user", content: "从 1 数到 5" }],
});

for await (const chunk of stream) {
  if (chunk.type === "content_block_delta" && chunk.delta.type === "text_delta") {
    process.stdout.write(chunk.delta.text);
  }
}

curl 的 -N 参数关闭输出缓冲,SSE 帧到达终端后立即刷新。

7 种事件类型

每条流都是一组命名事件的序列。日常使用时大多数客户端只关心 content_block_delta,但每个事件都有明确职责。

事件含义典型动作
message_start响应头帧。带有 idmodel 和初始 usage记录消息 id 用于日志或重试关联。
content_block_start新内容块开始,块类型为 texttool_usethinkingindex 分配缓冲区。
content_block_delta当前内容块的增量,见下面的 4 种 delta 形式。追加到对应 index 的缓冲区。
content_block_stop当前内容块结束。固化缓冲区。tool_use 块需要把累积的 partial_json 解析成 input 对象。
message_delta最终 stop_reason 与累计 usage更新 token 计费,检查停止原因。
message_stop流结束,后面不会再有帧。关闭响应句柄。
ping心跳保活帧,任意时机可能下发。忽略(或重置空闲计时器)。
error200 响应之后的流中错误,见 流中错误停止读取,把错误抛给上层。

content_block_delta 内的 4 种 delta

delta.type取值字段用途
text_deltadelta.text可见文本生成
input_json_deltadelta.partial_jsonTool 参数 JSON 增量
thinking_deltadelta.thinkingExtended Thinking 推理痕迹(Opus 4.7)
signature_deltadelta.signature已签名 thinking 块的签名段

实测帧样本

以下是从 https://buzzai.cc/v1/messages 实测捕获的"从 1 数到 3"流帧:

event: message_start
data: {"type":"message_start","message":{"model":"claude-haiku-4-5-20251001","id":"msg_01YkyqfgStqigCHAgJ6uUDfd","type":"message","role":"assistant","content":[],"stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":7,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"output_tokens":1,"service_tier":"standard"}}}

event: content_block_start
data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"1\n2\n3"}}

event: content_block_stop
data: {"type":"content_block_stop","index":0}

event: message_delta
data: {"type":"message_delta","delta":{"stop_reason":"end_turn","stop_sequence":null},"usage":{"input_tokens":7,"output_tokens":5}}

event: message_stop
data: {"type":"message_stop"}

Python SDK 解析

官方 Anthropic Python SDK 用上下文管理器封装了流式接口,指向 BUZZ 只需要改一处 base URL。

from anthropic import Anthropic

client = Anthropic(
    base_url="https://buzzai.cc",
    api_key="",
)

with client.messages.stream(
    model="claude-haiku-4-5-20251001",
    max_tokens=400,
    messages=[{"role": "user", "content": "写一首关于缓存的俳句"}],
) as stream:
    # 方式 A:只迭代文本
    for text in stream.text_stream:
        print(text, end="", flush=True)

    # 方式 B:遍历完整事件并按类型分发
    # for event in stream:
    #     if event.type == "content_block_delta":
    #         if event.delta.type == "text_delta":
    #             print(event.delta.text, end="", flush=True)
    #         elif event.delta.type == "input_json_delta":
    #             handle_tool_input_chunk(event.index, event.delta.partial_json)

    final_message = stream.get_final_message()
    print()
    print("stop_reason:", final_message.stop_reason)
    print("usage:", final_message.usage)

上下文管理器保证即使消费者中途退出,底层 HTTP 响应也会被正确关闭。get_final_message() 在流结束后返回组装好的 Message 对象。

Node.js SDK 解析

import Anthropic from "@anthropic-ai/sdk";

const client = new Anthropic({
  baseURL: "https://buzzai.cc",
  apiKey: process.env.BUZZ_API_KEY,
});

const stream = await client.messages.stream({
  model: "claude-haiku-4-5-20251001",
  max_tokens: 400,
  messages: [{ role: "user", content: "写一首关于缓存的俳句" }],
});

for await (const event of stream) {
  switch (event.type) {
    case "content_block_delta":
      if (event.delta.type === "text_delta") {
        process.stdout.write(event.delta.text);
      } else if (event.delta.type === "input_json_delta") {
        // 按 event.index 累积 event.delta.partial_json
      }
      break;
    case "message_delta":
      // 最终 stop_reason 和累计 usage
      break;
  }
}

const finalMessage = await stream.finalMessage();
console.log("\nstop_reason:", finalMessage.stop_reason);
console.log("usage:", finalMessage.usage);

SDK 已经处理了 SSE 分帧、JSON 解析、断帧重组缓冲。生产代码优先用 SDK,不要自己解析 wire 格式。

原生 fetch 解析 SSE

无法引入完整 SDK 时(比如边缘运行时),或需要完全控制分帧时使用。SSE wire 格式很朴素:event: 开头的行是事件名,data: 开头的行是 JSON 载荷,空行分隔每一帧。

async function streamMessages() {
  const resp = await fetch("https://buzzai.cc/v1/messages", {
    method: "POST",
    headers: {
      "Authorization": `Bearer ${process.env.BUZZ_API_KEY}`,
      "anthropic-version": "2023-06-01",
      "Content-Type": "application/json",
    },
    body: JSON.stringify({
      model: "claude-haiku-4-5-20251001",
      max_tokens: 400,
      stream: true,
      messages: [{ role: "user", content: "写一首关于缓存的俳句" }],
    }),
  });

  if (!resp.ok) {
    throw new Error(`HTTP ${resp.status} ${await resp.text()}`);
  }

  const reader = resp.body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";

  while (true) {
    const { value, done } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });

    // SSE 帧之间用空行分隔
    const frames = buffer.split("\n\n");
    buffer = frames.pop() ?? "";

    for (const frame of frames) {
      const lines = frame.split("\n");
      let eventName = "message";
      let dataLines = [];
      for (const line of lines) {
        if (line.startsWith("event:")) eventName = line.slice(6).trim();
        else if (line.startsWith("data:")) dataLines.push(line.slice(5).trim());
      }
      if (dataLines.length === 0) continue;
      const payload = JSON.parse(dataLines.join("\n"));

      if (eventName === "content_block_delta" && payload.delta?.type === "text_delta") {
        process.stdout.write(payload.delta.text);
      } else if (eventName === "error") {
        throw new Error(`stream error: ${payload.error?.type} ${payload.error?.message}`);
      } else if (eventName === "message_stop") {
        return;
      }
    }
  }
}

第一次手写 SSE 解析常踩两个坑:

流中错误与断连

一旦网关已经发出 HTTP 200 OK 头,后面的所有失败都不会再以非 200 状态形式出现,而是以 SSE 事件的形式到达。三种情况都要显式处理:

1. event: error

上游可能在生成已经开始之后才触发 rate_limit_erroroverloaded_error。帧形如:

event: error
data: {"type":"error","error":{"type":"overloaded_error","message":"..."}}

按硬失败处理:停止读取、把 error.type 抛给上层、决定是否重试。具体退避策略见错误处理指南

2. 流中 TCP 断连

连接在没有 message_stop 的情况下断开。客户端表现为流提前结束。SDK 会抛异常(Python 是 APIConnectionError,Node 是 APIConnectionError)。

恢复需要重新发起整条请求。Claude 流没有原生 resume token。如果你需要幂等性,自己用 metadata.user_id 加业务侧的关联 id。

3. 客户端主动取消

关闭响应(Python 退出 with 块,Node 调 stream.controller.abort(),原生 fetch 调 reader.cancel())会通知上游停止生成。已经生成的 token 会照常计费,所以用户关掉聊天时尽快取消。

HTTP 状态在第一帧之前就定了。 4xx / 5xx 表示生成根本没开始,响应体是标准错误包络(见错误码表)。一旦你拿到 200,后续所有失败都在 SSE 帧里。

BUZZ 透明转发承诺

BUZZ 不缓冲、不重组、不改写 SSE 流。你看到的帧边界就是上游模型生成的帧边界,顺序也保持原样。具体表现:

生产清单

相关链接