BUZZ AI Gateway
文档 · 实战示例 · Agent 循环

Agent 循环

一个会调用工具直到完成任务的多轮 agent。循环主体很短,真正难的是四道护栏:停止条件、最大迭代、token 预算、工具失败恢复。

POST https://buzzai.cc/v1/messages

循环骨架

剥到最简,七行就够:

# Minimal agent loop: keep calling the model until it stops asking for tools.
while True:
    response = call_claude(messages)
    if response.stop_reason != "tool_use":
        return response
    tool_results = run_all_tool_uses(response)
    # Record the assistant turn first, then answer it with the tool results.
    messages.append(assistant_turn(response))
    messages.append(user_turn(tool_results))

这套用来跑 demo 没问题。生产要再加四道护栏。

停止条件

Claude 通过 stop_reason 告诉你为什么停下来。每一种都要显式处理:

| stop_reason | 含义 | 循环动作 |
| --- | --- | --- |
| tool_use | 想调用一个或多个工具 | 执行工具、追加结果、继续 |
| end_turn | 完成,返回最终答案 | 退出循环,返回 text 内容 |
| max_tokens | 响应中途撞到 max_tokens | 把已有 assistant 消息留在历史里,追加 user nudge 让它续写,或者把状态抛给用户 |
| stop_sequence | 命中自定义停止串 | 退出,把输出当作截断处理 |
| pause_turn | 服务端暂停(罕见,长 thinking、server tool) | 把同一段对话再发一次让它续上 |
| refusal | 模型拒绝 | 抛给用户,不要无脑重试 |

最大迭代安全阀

有些任务确实需要 20 多轮。有些会陷入死循环,反复换略微不同的参数调同一个工具。设一个硬上限,例如 MAX_ITERATIONS = 30(下文完整样例用的就是这个值)。

到上限不要直接 abort。注入一条 user 消息,让 Claude 总结并停下:

"已达迭代上限。停止调用工具。
最终消息里总结你完成了什么、还剩什么、有什么阻塞。"

Token 预算

每轮读 response.usage 累加。超阈值就强制收尾:

# Cumulative token budgets for one agent run.
budget_input  = 1_000_000   # tokens
budget_output =   200_000

total_in = total_out = 0
for turn in range(MAX_TURNS):
    resp = client.messages.create(...)
    # cache_read_input_tokens can be None, so fall back to 0 before adding.
    total_in  += resp.usage.input_tokens + (resp.usage.cache_read_input_tokens or 0)
    total_out += resp.usage.output_tokens
    if total_in > budget_input or total_out > budget_output:
        # Inject a "wrap up" instruction, then let the model speak one last time.
        ...

缓存读按 10% 输入价计费,但仍然占 token。如果你按 token 向终端用户计费,单独追踪;如果按金额,只追踪美元等价。

工具失败恢复

工具会抛异常,网络会超时,模型会传错参。不要让循环崩溃,把失败结构化包成 tool_result 让 Claude 自己反应。

{
  "type": "tool_result",
  "tool_use_id": "toolu_...",
  "content": "ERROR: file not found at path 'src/foo.py'. Try listing the directory first.",
  "is_error": true
}

三档失败,处理方式不同:

| 类型 | 例子 | 恢复 |
| --- | --- | --- |
| 可恢复,模型的锅 | 路径错、参数类型错 | 返回 is_error: true + 提示。Claude 通常会修一下重来。 |
| 可恢复,瞬时 | 下游 API 503、网络抖动 | 在工具内部带退避重试,2-3 次封顶,再返回。 |
| 不可恢复 | 权限拒绝、依赖缺失、凭据错 | 返回错误同时中止循环。不要烧 token 看 Claude 反复失败。 |

完整可运行样例

"""
Agent 循环:迭代上限、token 预算、工具失败恢复。
依赖:pip install anthropic
"""
import os, time, random
from anthropic import Anthropic
from anthropic import APIStatusError

# SDK client pointed at the BUZZ gateway. The key is read from the
# BUZZ_API_KEY environment variable; indexing os.environ raises KeyError
# at import time when the variable is not set.
client = Anthropic(
    base_url="https://buzzai.cc",
    api_key=os.environ["BUZZ_API_KEY"],
)

# === Tunable guardrails ===
MAX_ITERATIONS = 30  # upper bound on loop iterations in run_agent
BUDGET_INPUT_TOKENS  = 1_000_000  # cumulative input-token threshold
BUDGET_OUTPUT_TOKENS =   200_000  # cumulative output-token threshold

# === Tools ===
def _single_string_tool(name, description, param, **param_extras):
    """Build a tool definition that takes exactly one required string parameter.

    ``param_extras`` are merged into the parameter's JSON-schema entry after
    ``"type": "string"`` (for example ``format="uri"``).
    """
    return {
        "name": name,
        "description": description,
        "input_schema": {
            "type": "object",
            "properties": {param: {"type": "string", **param_extras}},
            "required": [param],
        },
    }


# Tool definitions passed to the model on every request.
TOOLS = [
    _single_string_tool("search", "搜索知识库,返回 top 匹配。", "query"),
    _single_string_tool("fetch_url", "拉取 URL,返回正文文本。", "url", format="uri"),
]


class UnrecoverableToolError(Exception):
    """Raised by a tool for a failure that retrying cannot fix.

    The agent loop reports it to the model as a FATAL tool_result and then
    stops instead of letting the model try again.
    """


def execute_tool(name, args):
    """Run one tool call requested by the model and return its text output.

    Args:
        name: Tool name as it appears in ``TOOLS``.
        args: The tool input produced by the model (a dict).

    Returns:
        The tool output as a string. ``fetch_url`` output is truncated to
        8000 characters.

    Raises:
        UnrecoverableToolError: Unknown tool name, or HTTP 403/404 from
            ``fetch_url``.
        ValueError: ``fetch_url`` was given something that is not an absolute
            http(s) URL. The model can correct this, so it is recoverable.
        RuntimeError: ``fetch_url`` still failed after three attempts; the
            last underlying error is chained as ``__cause__``.
    """
    if name == "search":
        # ... real search goes here ...
        return f"Top 3 results for {args['query']!r}: ..."
    if name == "fetch_url":
        import urllib.error
        import urllib.parse
        import urllib.request

        url = args["url"]
        # The URL is model-generated, i.e. untrusted. urlopen also accepts
        # schemes such as file://, so restrict it to plain web URLs.
        parts = urllib.parse.urlsplit(url) if isinstance(url, str) else None
        if parts is None or parts.scheme.lower() not in ("http", "https") or not parts.netloc:
            raise ValueError(f"fetch_url needs an absolute http(s) URL, got {url!r}")

        max_attempts = 3
        last_error = None
        for attempt in range(max_attempts):
            try:
                with urllib.request.urlopen(url, timeout=10) as r:
                    return r.read().decode("utf-8", errors="replace")[:8000]
            except urllib.error.HTTPError as e:
                if e.code in (403, 404):
                    raise UnrecoverableToolError(f"HTTP {e.code} for {args['url']}") from e
                last_error = e
            except OSError as e:
                # URLError, connection resets and timeouts are all OSError
                # subclasses: transient network trouble, worth a retry.
                last_error = e
            # Back off only when another attempt follows.
            if attempt < max_attempts - 1:
                time.sleep(2 ** attempt + random.random())
        raise RuntimeError(f"fetch_url failed after retries: {args['url']}") from last_error
    raise UnrecoverableToolError(f"unknown tool: {name}")


def call_with_retry(**kwargs):
    """Call ``client.messages.create`` and retry transient API errors.

    HTTP 429/500/503/529 responses are retried, up to five attempts in total,
    with exponential backoff plus jitter capped at 60 seconds between
    attempts. Any other ``APIStatusError`` is re-raised immediately.

    Args:
        **kwargs: Forwarded unchanged to ``client.messages.create``.

    Returns:
        The response of the first successful call.

    Raises:
        APIStatusError: The API answered with a non-retryable status.
        RuntimeError: Every attempt hit a retryable status; the last API
            error is chained as ``__cause__``.
    """
    max_attempts = 5
    last_error = None
    for attempt in range(max_attempts):
        try:
            return client.messages.create(**kwargs)
        except APIStatusError as e:
            if e.status_code not in (429, 500, 503, 529):
                raise
            last_error = e
        # Sleeping after the final attempt would only delay the failure.
        if attempt < max_attempts - 1:
            time.sleep(min(2 ** attempt + random.random(), 60))
    raise RuntimeError("API failed after retries") from last_error


def run_agent(user_request: str):
    """Run a tool-using agent loop until the model finishes or a guardrail trips.

    Guardrails: at most ``MAX_ITERATIONS`` loop turns, the
    ``BUDGET_INPUT_TOKENS`` / ``BUDGET_OUTPUT_TOKENS`` thresholds, and an
    immediate stop on ``UnrecoverableToolError``. When the iteration cap or
    the token budget is hit, the model gets exactly one more turn, with tools
    disabled, to summarise its progress.

    Args:
        user_request: The task, sent as the first user message.

    Returns:
        A ``(text, stats)`` tuple. ``text`` is the final answer, a wrap-up
        summary, ``"[refused]"`` or ``"[aborted: unrecoverable tool error]"``.
        ``stats`` is a dict with ``iters`` (model calls made), ``in`` and
        ``out`` (accumulated input / output tokens; cache reads count as
        input).

    Raises:
        Whatever ``call_with_retry`` raises once its retries are exhausted.
    """
    model = "claude-sonnet-4-6"
    messages = [{"role": "user", "content": user_request}]
    total_in = total_out = 0

    def record_usage(resp):
        """Add one response's token usage to the running totals."""
        nonlocal total_in, total_out
        total_in += resp.usage.input_tokens + (resp.usage.cache_read_input_tokens or 0)
        total_out += resp.usage.output_tokens

    def stats(iters):
        """Snapshot the counters in the shape callers expect."""
        return {"iters": iters, "in": total_in, "out": total_out}

    def wrap_up(instruction, iters):
        """Ask for a final summary with tools disabled and return it."""
        messages.append({"role": "user", "content": instruction})
        # Keep `tools` in the request and switch them off with tool_choice:
        # the history contains tool_use/tool_result blocks, and a request
        # carrying such blocks is expected to define its tools.
        resp = call_with_retry(
            model=model,
            max_tokens=2048,
            tools=TOOLS,
            tool_choice={"type": "none"},
            messages=messages,
        )
        record_usage(resp)  # the wrap-up call costs tokens too
        return _final_text(resp), stats(iters)

    for iteration in range(1, MAX_ITERATIONS + 1):
        # Over budget: one last tool-free turn, then stop for good.
        if total_in > BUDGET_INPUT_TOKENS or total_out > BUDGET_OUTPUT_TOKENS:
            return wrap_up(
                "Token 预算已耗尽。停止调用工具。"
                "立即给出进展摘要和剩余事项。",
                iteration,
            )

        resp = call_with_retry(
            model=model,
            max_tokens=4096,
            tools=TOOLS,
            messages=messages,
        )
        record_usage(resp)
        messages.append({"role": "assistant", "content": resp.content})

        reason = resp.stop_reason
        if reason == "end_turn":
            return _final_text(resp), stats(iteration)
        if reason == "refusal":
            return "[refused]", stats(iteration)
        if reason == "max_tokens":
            cut_off_calls = [b for b in resp.content if b.type == "tool_use"]
            if cut_off_calls:
                # The output limit hit inside a tool call: its input may be
                # incomplete, so do not execute it. Every tool_use still needs
                # a tool_result, so answer each one with an error.
                messages.append({
                    "role": "user",
                    "content": [
                        {
                            "type": "tool_result",
                            "tool_use_id": b.id,
                            "content": "ERROR: tool call was cut off by max_tokens "
                                       "before its input was complete. "
                                       "Retry with a shorter input.",
                            "is_error": True,
                        }
                        for b in cut_off_calls
                    ],
                })
            else:
                messages.append({"role": "user", "content": "从刚才停下的地方继续。"})
            continue
        if reason == "pause_turn":
            # Server-side pause: send the same conversation again to resume.
            continue
        if reason != "tool_use":
            # stop_sequence or an unknown reason: hand back what we have
            # (possibly truncated) instead of looping until the cap.
            return _final_text(resp), stats(iteration)

        tool_results = []
        abort = False
        for block in resp.content:
            if block.type != "tool_use":
                continue
            try:
                output = execute_tool(block.name, block.input)
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": output,
                })
            except UnrecoverableToolError as e:
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": f"FATAL: {e}",
                    "is_error": True,
                })
                abort = True
            except Exception as e:
                # Deliberately broad: any other tool failure is reported to
                # the model as a recoverable error instead of crashing the loop.
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": f"ERROR (recoverable): {e}",
                    "is_error": True,
                })

        messages.append({"role": "user", "content": tool_results})
        if abort:
            return "[aborted: unrecoverable tool error]", stats(iteration)

    # MAX_ITERATIONS reached.
    return wrap_up(
        "已达迭代上限。停止调用工具,"
        "总结完成了什么、还剩什么、有什么阻塞。",
        MAX_ITERATIONS + 1,
    )


def _final_text(resp):
    return "\n".join(b.text for b in resp.content if b.type == "text")


if __name__ == "__main__":
    # Demo run: print the final answer, then the iteration and token counters.
    answer, usage = run_agent("找出 2025 年关于 prompt caching 引用最多的三篇论文,总结它们的发现。")
    print(answer)
    iters, tokens_in, tokens_out = usage["iters"], usage["in"], usage["out"]
    print(f"\n[stats] iterations={iters} input_tokens={tokens_in} output_tokens={tokens_out}")
// Agent 循环:迭代上限、token 预算、工具失败恢复。
// 依赖:npm i @anthropic-ai/sdk
import Anthropic, { APIError } from "@anthropic-ai/sdk";

// SDK client pointed at the BUZZ gateway; the key is read from the
// BUZZ_API_KEY environment variable.
const client = new Anthropic({
  baseURL: "https://buzzai.cc",
  apiKey: process.env.BUZZ_API_KEY,
});

// Tunable guardrails for the agent loop.
const MAX_ITERATIONS = 30; // upper bound on loop iterations in runAgent
const BUDGET_INPUT_TOKENS = 1_000_000; // cumulative input-token threshold
const BUDGET_OUTPUT_TOKENS = 200_000; // cumulative output-token threshold

// Tool definitions passed to the model on every request.
const TOOLS = [
  {
    name: "search",
    description: "搜索知识库,返回 top 匹配。",
    input_schema: {
      type: "object",
      properties: { query: { type: "string" } },
      required: ["query"],
    },
  },
  {
    name: "fetch_url",
    description: "拉取 URL,返回正文文本。",
    input_schema: {
      type: "object",
      properties: { url: { type: "string", format: "uri" } },
      required: ["url"],
    },
  },
];

/**
 * Thrown by a tool for a failure that retrying cannot fix. The agent loop
 * reports it to the model as a FATAL tool_result and then stops.
 */
class UnrecoverableToolError extends Error {}

/** Return a promise that resolves after `ms` milliseconds. */
function sleep(ms) {
  return new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
}

/**
 * Run one tool call requested by the model and return its text output.
 *
 * @param name Tool name as it appears in TOOLS.
 * @param args Tool input produced by the model.
 * @returns The tool output as a string; fetch_url output is cut to 8000 characters.
 * @throws UnrecoverableToolError for an unknown tool or HTTP 403/404 from fetch_url.
 * @throws Error when fetch_url gets something that is not an http(s) URL
 *   (recoverable: the model can correct it), or when it still fails after
 *   three attempts (the last underlying error is attached as `cause`).
 */
async function executeTool(name, args) {
  if (name === "search") {
    return `Top 3 results for "${args.query}": ...`;
  }
  if (name === "fetch_url") {
    // The URL is model-generated, i.e. untrusted. Reject anything that is not
    // a plain web URL up front instead of burning retries and backoff on it.
    let target;
    try {
      target = new URL(args.url);
    } catch {
      throw new Error(`fetch_url needs an absolute http(s) URL, got: ${args.url}`);
    }
    if (target.protocol !== "http:" && target.protocol !== "https:") {
      throw new Error(`fetch_url needs an absolute http(s) URL, got: ${args.url}`);
    }

    const maxAttempts = 3;
    let lastError;
    for (let attempt = 0; attempt < maxAttempts; attempt++) {
      try {
        const r = await fetch(args.url, { signal: AbortSignal.timeout(10000) });
        if (r.status === 403 || r.status === 404) {
          throw new UnrecoverableToolError(`HTTP ${r.status} for ${args.url}`);
        }
        if (!r.ok) throw new Error(`HTTP ${r.status}`);
        const text = await r.text();
        return text.slice(0, 8000);
      } catch (e) {
        if (e instanceof UnrecoverableToolError) throw e;
        lastError = e;
      }
      // Back off only when another attempt follows.
      if (attempt < maxAttempts - 1) {
        await sleep(1000 * 2 ** attempt + Math.random() * 1000);
      }
    }
    throw new Error(`fetch_url failed after retries: ${args.url}`, { cause: lastError });
  }
  throw new UnrecoverableToolError(`unknown tool: ${name}`);
}

/**
 * Call client.messages.create and retry transient API errors.
 *
 * HTTP 429/500/503/529 are retried, up to five attempts in total, with
 * exponential backoff plus jitter capped at 60 seconds between attempts.
 * Any other error is rethrown immediately.
 *
 * @param params Request parameters, forwarded unchanged.
 * @returns The response of the first successful call.
 * @throws Error "API failed after retries" once every attempt has failed;
 *   the last API error is attached as `cause`.
 */
async function callWithRetry(params) {
  const maxAttempts = 5;
  let lastError;
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      return await client.messages.create(params);
    } catch (e) {
      const retryable = e instanceof APIError && [429, 500, 503, 529].includes(e.status);
      if (!retryable) throw e;
      lastError = e;
    }
    // Sleeping after the final attempt would only delay the failure.
    if (attempt < maxAttempts - 1) {
      await sleep(Math.min(60000, 1000 * 2 ** attempt + Math.random() * 1000));
    }
  }
  throw new Error("API failed after retries", { cause: lastError });
}

/**
 * Return the text of every "text" block in resp.content, joined with
 * newlines. Blocks of other types are skipped.
 */
function finalText(resp) {
  const pieces = [];
  for (const block of resp.content) {
    if (block.type === "text") pieces.push(block.text);
  }
  return pieces.join("\n");
}

/**
 * Run a tool-using agent loop until the model finishes or a guardrail trips.
 *
 * Guardrails: at most MAX_ITERATIONS loop turns, the BUDGET_INPUT_TOKENS /
 * BUDGET_OUTPUT_TOKENS thresholds, and an immediate stop on
 * UnrecoverableToolError. When the iteration cap or the token budget is hit,
 * the model gets exactly one more turn, with tools disabled, to summarise.
 *
 * @param userRequest The task, sent as the first user message.
 * @returns An object { text, stats }. `text` is the final answer, a wrap-up
 *   summary, "[refused]" or "[aborted: unrecoverable tool error]". `stats`
 *   holds `iters` (model calls made), `in` and `out` (accumulated input /
 *   output tokens; cache reads count as input).
 */
export async function runAgent(userRequest) {
  const model = "claude-sonnet-4-6";
  const messages = [{ role: "user", content: userRequest }];
  let totalIn = 0;
  let totalOut = 0;

  // Add one response's token usage to the running totals.
  const recordUsage = (resp) => {
    totalIn += (resp.usage.input_tokens || 0) + (resp.usage.cache_read_input_tokens || 0);
    totalOut += resp.usage.output_tokens || 0;
  };

  // Build the return value in the shape callers expect.
  const result = (text, iters) => ({
    text,
    stats: { iters, in: totalIn, out: totalOut },
  });

  // Ask for a final summary with tools disabled and return it.
  const wrapUp = async (instruction, iters) => {
    messages.push({ role: "user", content: instruction });
    // Keep `tools` in the request and switch them off with tool_choice: the
    // history contains tool_use/tool_result blocks, and a request carrying
    // such blocks is expected to define its tools.
    const resp = await callWithRetry({
      model,
      max_tokens: 2048,
      tools: TOOLS,
      tool_choice: { type: "none" },
      messages,
    });
    recordUsage(resp); // the wrap-up call costs tokens too
    return result(finalText(resp), iters);
  };

  for (let iteration = 1; iteration <= MAX_ITERATIONS; iteration++) {
    // Over budget: one last tool-free turn, then stop for good.
    if (totalIn > BUDGET_INPUT_TOKENS || totalOut > BUDGET_OUTPUT_TOKENS) {
      return wrapUp(
        "Token 预算已耗尽。停止调用工具。" +
          "立即给出进展摘要和剩余事项。",
        iteration
      );
    }

    const resp = await callWithRetry({
      model,
      max_tokens: 4096,
      tools: TOOLS,
      messages,
    });
    recordUsage(resp);
    messages.push({ role: "assistant", content: resp.content });

    const reason = resp.stop_reason;
    if (reason === "end_turn") {
      return result(finalText(resp), iteration);
    }
    if (reason === "refusal") {
      return result("[refused]", iteration);
    }
    if (reason === "max_tokens") {
      const cutOffCalls = resp.content.filter((b) => b.type === "tool_use");
      if (cutOffCalls.length > 0) {
        // The output limit hit inside a tool call: its input may be
        // incomplete, so do not execute it. Every tool_use still needs a
        // tool_result, so answer each one with an error.
        messages.push({
          role: "user",
          content: cutOffCalls.map((b) => ({
            type: "tool_result",
            tool_use_id: b.id,
            content:
              "ERROR: tool call was cut off by max_tokens before its input " +
              "was complete. Retry with a shorter input.",
            is_error: true,
          })),
        });
      } else {
        messages.push({ role: "user", content: "从刚才停下的地方继续。" });
      }
      continue;
    }
    if (reason === "pause_turn") {
      // Server-side pause: send the same conversation again to resume.
      continue;
    }
    if (reason !== "tool_use") {
      // stop_sequence or an unknown reason: hand back what we have (possibly
      // truncated) instead of looping until the cap.
      return result(finalText(resp), iteration);
    }

    const toolResults = [];
    let abort = false;
    for (const block of resp.content) {
      if (block.type !== "tool_use") continue;
      try {
        const output = await executeTool(block.name, block.input);
        toolResults.push({ type: "tool_result", tool_use_id: block.id, content: output });
      } catch (e) {
        // Any tool failure becomes an error tool_result; only an
        // UnrecoverableToolError also ends the run.
        const isFatal = e instanceof UnrecoverableToolError;
        toolResults.push({
          type: "tool_result",
          tool_use_id: block.id,
          content: `${isFatal ? "FATAL" : "ERROR (recoverable)"}: ${e.message}`,
          is_error: true,
        });
        if (isFatal) abort = true;
      }
    }
    messages.push({ role: "user", content: toolResults });
    if (abort) {
      return result("[aborted: unrecoverable tool error]", iteration);
    }
  }

  // MAX_ITERATIONS reached.
  return wrapUp(
    "已达迭代上限。停止调用工具," +
      "总结完成了什么、还剩什么、有什么阻塞。",
    MAX_ITERATIONS + 1
  );
}

// Demo run: print the final answer, then the iteration and token counters.
const outcome = await runAgent(
  "找出 2025 年关于 prompt caching 引用最多的三篇论文,总结它们的发现。"
);
const { iters, in: tokensIn, out: tokensOut } = outcome.stats;
console.log(outcome.text);
console.log(`\n[stats] iterations=${iters} input=${tokensIn} output=${tokensOut}`);

管理消息历史

长跑 agent 会堆积几百条 tool_result,每轮输入成本爆炸。三招:

1. 缓存前缀

稳定的 system prompt + tool 定义放进 system 缓存块。对增长的 message 尾部没用,但能消除固定成本。

2. 裁剪老的 tool_result

N 轮之后,把老 tool_result 的 content 替换成短摘要,保留结构:

def trim_old_results(messages, keep_last=8, *, max_chars=200, head_chars=180):
    """Shorten bulky tool_result payloads in older messages, in place.

    Only the text is cut; every message and every tool_result block stays in
    the history.

    Args:
        messages: Conversation history (list of message dicts). Mutated.
        keep_last: Number of trailing messages left untouched. ``0`` trims
            the whole history; negative values are treated as ``0``.
        max_chars: Text longer than this gets trimmed.
        head_chars: Number of leading characters kept before the
            ``"... [trimmed]"`` marker.

    Returns:
        None. The change is made in place.
    """
    def shorten(text):
        """Return ``text`` cut to ``head_chars`` plus a marker if it is too long."""
        if len(text) > max_chars:
            return text[:head_chars] + "... [trimmed]"
        return text

    # Compute the cut-off explicitly: messages[:-0] would be an empty slice,
    # so keep_last=0 would otherwise trim nothing.
    cutoff = max(len(messages) - max(keep_last, 0), 0)
    for msg in messages[:cutoff]:
        if msg["role"] != "user" or not isinstance(msg["content"], list):
            continue
        for block in msg["content"]:
            if not isinstance(block, dict) or block.get("type") != "tool_result":
                continue
            content = block.get("content")
            if isinstance(content, str):
                block["content"] = shorten(content)
            elif isinstance(content, list):
                # tool_result content may also be a list of blocks; trim the
                # text blocks inside it and leave everything else alone.
                for item in content:
                    if (
                        isinstance(item, dict)
                        and item.get("type") == "text"
                        and isinstance(item.get("text"), str)
                    ):
                        item["text"] = shorten(item["text"])

3. 摘要后重启

定期让 Claude 总结对话,然后用摘要起一个新对话。最重的招,适合 50+ 轮的超长会话。

在循环里用 Opus thinking

Opus 4.7 可以开 extended thinking。两件事注意:

# One loop turn with extended thinking switched on: budget_tokens=4096 for
# thinking, max_tokens=8192 for the response.
resp = client.messages.create(
    model="claude-opus-4-7",
    max_tokens=8192,
    thinking={"type": "enabled", "budget_tokens": 4096},
    tools=TOOLS,
    messages=messages,
)
# Append resp.content unchanged so the thinking blocks travel with the
# assistant turn.
messages.append({"role": "assistant", "content": resp.content})

相关链接