Skip to content

OpenRouter callModel 的流式输出基于可复用流架构,支持多个并发消费者。提供 getTextStream()(文本 delta)、getReasoningStream()(推理过程)、getItemsStream()(推荐,完整 items)、getFullResponsesStream()(所有事件)、getToolCallsStream()(tool 调用)等多种消费方式,并支持 React、SSE 等前端集成,以及流取消操作。

文本流式

getTextStream()

实时接收文本内容:

typescript
import { OpenRouter } from '@openrouter/agent';

const openrouter = new OpenRouter({
  apiKey: process.env.OPENROUTER_API_KEY,
});

const result = openrouter.callModel({
  model: 'openai/gpt-5-nano',
  input: 'Write a short poem about the ocean.',
});

for await (const delta of result.getTextStream()) {
  process.stdout.write(delta);
}

每次迭代返回一小段文本(通常是几个字符或一个词)。

推理流式

getReasoningStream()

适用于支持推理的模型(如 o1、Claude 思考模式):

typescript
const result = openrouter.callModel({
  model: 'openai/o1-preview',
  input: 'Solve this step by step: If x + 5 = 12, what is x?',
});

console.log('Reasoning:');
for await (const delta of result.getReasoningStream()) {
  process.stdout.write(delta);
}

console.log('\n\nFinal answer:');
const text = await result.getText();
console.log(text);

Items 流式(推荐)

getItemsStream()

流式接收完整 items,是处理多种输出类型(消息、tool 调用、推理)的推荐方式

typescript
import type { StreamableOutputItem } from '@openrouter/agent';

const result = openrouter.callModel({
  model: 'anthropic/claude-sonnet-4',
  input: 'Hello!',
  tools: [myTool],
});

for await (const item of result.getItemsStream()) {
  switch (item.type) {
    case 'message':
      console.log('Message:', item.content);
      break;
    case 'function_call':
      console.log('Tool call:', item.name, item.arguments);
      break;
    case 'reasoning':
      console.log('Thinking:', item.summary);
      break;
    case 'function_call_output':
      console.log('Tool result:', item.output);
      break;
  }
}

关键说明:每次迭代返回同一 ID 的 item 的最新快照(内容已更新)。应按 ID 替换,而不是累积 delta。

所有 item 类型:

类型说明
message助手文本响应
function_calltool 调用及参数
reasoning模型思考过程(扩展思考)
web_search_call网络搜索操作
file_search_call文件搜索操作
image_generation_call图片生成操作
function_call_outputtool 执行结果

完整事件流式

getFullResponsesStream()

接收所有响应事件,包括 tool 中间结果:

typescript
const result = openrouter.callModel({
  model: 'openai/gpt-5-nano',
  input: 'Search for documents',
  tools: [searchTool], // 带 eventSchema 的 generator tool
});

for await (const event of result.getFullResponsesStream()) {
  switch (event.type) {
    case 'response.output_text.delta':
      process.stdout.write(event.delta);
      break;
    case 'response.completed':
      console.log('Response complete');
      break;
    case 'tool.preliminary_result':
      // generator tool 的中间进度
      console.log('Progress:', event.result);
      break;
    case 'tool.result':
      // tool 执行完成
      console.log('Tool completed:', event.toolCallId);
      console.log('Result:', event.result);
      if (event.preliminaryResults) {
        console.log('All progress events:', event.preliminaryResults);
      }
      break;
  }
}

事件类型一览

事件类型说明
response.created响应对象已创建
response.in_progress开始生成
response.output_text.delta文本 chunk
response.output_text.done文本完成
response.reasoning.delta推理内容 chunk
response.reasoning.done推理完成
response.function_call_arguments.deltatool 参数 chunk
response.function_call_arguments.donetool 参数完成
response.completed完整响应完成
tool.preliminary_resultgenerator tool 中间进度
tool.resulttool 执行最终结果

Tool 调用流式

getToolCallsStream()

流式接收结构化 tool 调用:

typescript
const result = openrouter.callModel({
  model: 'openai/gpt-5-nano',
  input: 'What is the weather in Paris and Tokyo?',
  tools: [weatherTool],
  maxToolRounds: 0, // 不自动执行,只获取 tool 调用
});

for await (const toolCall of result.getToolCallsStream()) {
  console.log(`Tool: ${toolCall.name}`);
  console.log(`Arguments:`, toolCall.arguments);
  console.log(`ID: ${toolCall.id}`);
}

getToolStream()

同时流式接收参数 delta 和中间结果:

typescript
const result = openrouter.callModel({
  model: 'openai/gpt-5-nano',
  input: 'Search for TypeScript tutorials',
  tools: [searchTool], // generator tool
});

for await (const event of result.getToolStream()) {
  if (event.type === 'delta') {
    process.stdout.write(event.content);
  } else if (event.type === 'preliminary_result') {
    console.log(`\nProgress (${event.toolCallId}):`, event.result);
  }
}

并发消费

多个消费者可以同时读取同一个 result:

typescript
const result = openrouter.callModel({
  model: 'openai/gpt-5-nano',
  input: 'Write a story.',
});

const [text, response] = await Promise.all([
  // 消费者 1:收集文本
  (async () => {
    let text = '';
    for await (const delta of result.getTextStream()) {
      text += delta;
    }
    return text;
  })(),

  // 消费者 2:获取完整响应
  result.getResponse(),
]);

console.log('Text length:', text.length);
console.log('Token usage:', response.usage);

底层 ReusableReadableStream 确保每个消费者都能接收到完整事件。

取消流式

typescript
const result = openrouter.callModel({
  model: 'openai/gpt-5-nano',
  input: 'Write a very long essay...',
});

let charCount = 0;
for await (const delta of result.getTextStream()) {
  process.stdout.write(delta);
  charCount += delta.length;

  if (charCount > 500) {
    await result.cancel();
    break;
  }
}

与 UI 框架集成

React 示例

typescript
import { useState, useEffect } from 'react';

function ChatResponse({ prompt }: { prompt: string }) {
  const [text, setText] = useState('');
  const [isStreaming, setIsStreaming] = useState(true);

  useEffect(() => {
    const openrouter = new OpenRouter({ apiKey: API_KEY });
    const result = openrouter.callModel({
      model: 'openai/gpt-5-nano',
      input: prompt,
    });

    (async () => {
      for await (const delta of result.getTextStream()) {
        setText(prev => prev + delta);
      }
      setIsStreaming(false);
    })();

    return () => { result.cancel(); };
  }, [prompt]);

  return (
    <div>
      <p>{text}</p>
      {isStreaming && <span className="cursor">|</span>}
    </div>
  );
}

Server-Sent Events(SSE)

typescript
import { Hono } from 'hono';
import { streamSSE } from 'hono/streaming';

const app = new Hono();

app.get('/stream', (c) => {
  return streamSSE(c, async (stream) => {
    const result = openrouter.callModel({
      model: 'openai/gpt-5-nano',
      input: c.req.query('prompt') || 'Hello!',
    });

    for await (const delta of result.getTextStream()) {
      await stream.writeSSE({
        data: JSON.stringify({ delta }),
        event: 'delta',
      });
    }

    await stream.writeSSE({
      data: JSON.stringify({ done: true }),
      event: 'done',
    });
  });
});

常见问题

Q: getItemsStream() 和 getNewMessagesStream() 有什么区别?

A: getNewMessagesStream() 已废弃。推荐用 getItemsStream(),它覆盖所有 item 类型,并遵循 items 模型(每次迭代是同一 item 的最新完整快照,而不是增量消息)。

Q: 并发消费者会互相影响吗?

A: 不会。每个消费者都从同一个 ReusableReadableStream 独立读取,互不干扰,都能接收到完整的事件序列。

Q: 流式和非流式(getText)在费用上有差异吗?

A: 没有差异。两者都调用同一个 API endpoint,消耗相同的 token。流式只是返回方式不同,不影响计费。