Appearance
Kimi API 支持流式输出(stream=True),通过 SSE 逐步返回 token,显著提升用户感知响应速度。本文提供 Python、Node.js 和原生 HTTP 的流式处理示例,以及 thinking 模式下 reasoning_content 的分离读取方法。
使用 Kimi API 的流式输出
为什么要用流式输出
- 用户感知响应更快(边生成边展示)
- 长回答不会因超时失败
- 可以随时中断,不浪费已生成内容
Python 示例
python
from openai import OpenAI
client = OpenAI(
api_key="MOONSHOT_API_KEY",
base_url="https://api.moonshot.cn/v1",
)
stream = client.chat.completions.create(
model="kimi-k2.6",
messages=[
{"role": "system", "content": "你是 Kimi,由 Moonshot AI 提供的 AI 助手。"},
{"role": "user", "content": "写一首关于秋天的诗"},
],
stream=True,
)
for chunk in stream:
delta = chunk.choices[0].delta
if delta.content:
print(delta.content, end="", flush=True)
print()Node.js / TypeScript 示例
typescript
import OpenAI from "openai";
const client = new OpenAI({
apiKey: process.env.MOONSHOT_API_KEY,
baseURL: "https://api.moonshot.cn/v1",
});
const stream = await client.chat.completions.create({
model: "kimi-k2.6",
messages: [
{ role: "system", content: "你是 Kimi,由 Moonshot AI 提供的 AI 助手。" },
{ role: "user", content: "写一首关于秋天的诗" },
],
stream: true,
});
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content;
if (content) process.stdout.write(content);
}
console.log();Thinking 模式下的流式处理
kimi-k2.6 默认开启 thinking,流式时 reasoning_content 和 content 分离输出:
python
stream = client.chat.completions.create(
model="kimi-k2.6",
messages=[{"role": "user", "content": "解题:..."}],
stream=True,
max_tokens=16000, # thinking 模式建议 ≥ 16000
)
reasoning_buffer = []
content_buffer = []
for chunk in stream:
delta = chunk.choices[0].delta
# reasoning_content 先输出(思考过程)
if hasattr(delta, "reasoning_content") and delta.reasoning_content:
reasoning_buffer.append(delta.reasoning_content)
# content 后输出(最终答案)
elif delta.content:
content_buffer.append(delta.content)
print("思考过程:", "".join(reasoning_buffer))
print("最终答案:", "".join(content_buffer))不使用 SDK 直接解析 SSE
python
import httpx
with httpx.Client() as http_client:
with http_client.stream(
"POST",
"https://api.moonshot.cn/v1/chat/completions",
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}",
},
json={
"model": "kimi-k2.6",
"messages": [{"role": "user", "content": "你好"}],
"stream": True,
},
) as response:
for line in response.iter_lines():
if line.startswith("data: "):
data = line[6:]
if data == "[DONE]":
break
import json
chunk = json.loads(data)
content = chunk["choices"][0]["delta"].get("content", "")
if content:
print(content, end="", flush=True)流式输出中的 Token 统计
流式输出默认不包含 usage,需要开启:
python
stream = client.chat.completions.create(
model="kimi-k2.6",
messages=messages,
stream=True,
stream_options={"include_usage": True}, # 开启 usage 统计
)
for chunk in stream:
if chunk.usage: # 最后一个 chunk 包含完整 usage
print(f"总 token 消耗: {chunk.usage.total_tokens}")常见问题
Q: 流式输出中途断开怎么处理?
A: 记录已接收的内容,使用 Partial Mode 续写。见 Partial Mode 指南。
Q: 流式输出比非流式更贵吗?
A: 费用相同,都按实际消耗的 token 计费。
Q: thinking 模式下 reasoning_content 也算入费用吗?
A: 是的,reasoning_content 的 token 也会计入总费用。
Kimi API 支持流式输出,通过 stream=True 开启 SSE 传输,模型每生成一个 token 立即推送给客户端,大幅降低用户感知延迟。本文详解流式输出原理、token 统计方法和不用 SDK 时的 HTTP 直接解析方式。
使用 Kimi API 的流式输出功能
为什么要用流式输出
非流式模式下,需要等待模型生成完所有内容才能看到结果,复杂问题可能要等 10~20 秒。流式输出(Streaming)让第一个 token 几乎立即到达,用户体验大幅提升。
与 OpenAI 相同:OpenAI、DeepSeek、Kimi 的流式接口格式完全一致,都基于 SSE(Server-Sent Events)协议。
基本用法
typescript
import OpenAI from "openai";
const client = new OpenAI({
apiKey: process.env.MOONSHOT_API_KEY,
baseURL: "https://api.moonshot.cn/v1",
});
const stream = await client.chat.completions.create({
model: "kimi-k2.6",
messages: [
{ role: "system", content: "你是 Kimi,由 Moonshot AI 提供的人工智能助手。" },
{ role: "user", content: "你好,1+1 等于多少?" },
],
stream: true,
});
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content;
if (content) {
process.stdout.write(content);
}
}python
from openai import OpenAI
client = OpenAI(
api_key="MOONSHOT_API_KEY",
base_url="https://api.moonshot.cn/v1",
)
stream = client.chat.completions.create(
model="kimi-k2.6",
messages=[
{"role": "system", "content": "你是 Kimi,由 Moonshot AI 提供的人工智能助手。"},
{"role": "user", "content": "你好,1+1 等于多少?"},
],
stream=True,
)
for chunk in stream:
delta = chunk.choices[0].delta
if delta.content:
print(delta.content, end="", flush=True)SSE 数据格式
流式响应的 Content-Type 为 text/event-stream,每个数据块格式如下:
data: {"id":"...","choices":[{"index":0,"delta":{"role":"assistant","content":""},"finish_reason":null}]}
data: {"id":"...","choices":[{"index":0,"delta":{"content":"你好"},"finish_reason":null}]}
data: {"id":"...","choices":[{"index":0,"delta":{},"finish_reason":"stop","usage":{"prompt_tokens":19,"completion_tokens":13,"total_tokens":32}}]}
data: [DONE]关键规则:
- 第一个数据块包含
role字段 - 后续数据块只有
content(逐 token 追加) - 最后一个数据块包含
finish_reason和usage(token 统计) data: [DONE]标志传输结束
Token 统计
流式输出时,token 用量在最后一个数据块的 usage 字段中:
typescript
let totalTokens = 0;
for await (const chunk of stream) {
const choice = chunk.choices[0];
if (choice.finish_reason === "stop") {
// 从最后一个 chunk 读取 usage
// 注意:需要在创建时传入 stream_options
}
}
// 更推荐的方式:最后读 stream.finalMessage()
const finalMessage = await stream.finalMessage();
console.log("总 token 数:", finalMessage.usage?.total_tokens);或者使用 Estimate API 预估 token 数:
typescript
const estimateResponse = await fetch(
"https://api.moonshot.cn/v1/tokenizers/estimate-token-count",
{
method: "POST",
headers: {
Authorization: `Bearer ${process.env.MOONSHOT_API_KEY}`,
"Content-Type": "application/json",
},
body: JSON.stringify({
model: "kimi-k2.6",
messages: [{ role: "user", content: "你好" }],
}),
}
);
const data = await estimateResponse.json();
console.log("预估 token 数:", data.data.total_tokens);不使用 SDK 时直接解析 SSE
typescript
const response = await fetch("https://api.moonshot.cn/v1/chat/completions", {
method: "POST",
headers: {
Authorization: `Bearer ${process.env.MOONSHOT_API_KEY}`,
"Content-Type": "application/json",
},
body: JSON.stringify({
model: "kimi-k2.6",
messages: [{ role: "user", content: "你好" }],
stream: true,
}),
});
const reader = response.body!.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n");
buffer = lines.pop() ?? "";
for (const line of lines) {
if (line.startsWith("data: ")) {
const data = line.slice(6);
if (data === "[DONE]") break;
const chunk = JSON.parse(data);
const content = chunk.choices[0]?.delta?.content;
if (content) process.stdout.write(content);
}
}
}多回复(n>1)处理
当 n > 1 时,用 chunk.choices[0].index 区分属于哪个回复:
typescript
const stream = await client.chat.completions.create({
model: "kimi-k2.6",
messages: [...],
stream: true,
n: 2,
});
const messages: Record<number, string> = {};
for await (const chunk of stream) {
for (const choice of chunk.choices) {
const idx = choice.index;
if (choice.delta.content) {
messages[idx] = (messages[idx] ?? "") + choice.delta.content;
}
}
}提前终止
直接 break 跳出循环即可,不需要额外操作:
typescript
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content;
if (content) {
process.stdout.write(content);
if (content.includes("结束")) break; // 条件终止
}
}常见问题
Q: 流式输出时 token 统计不准确怎么办?
A: 流式输出的 token 统计在最后一个数据块的 usage 字段中。如果中途断开,已接收到的内容可以用 Estimate API 重新计算。注意:Kimi API 在每个 choice 的结束数据块中也会放置 usage 信息,可以按 choice 独立统计。
Q: 流式输出和非流式输出结果一样吗?
A: 最终内容相同,但延迟体验不同。流式模式下第一个 token 更快到达,适合用户交互场景;非流式适合批量处理场景。
Q: 怎么判断流式输出是否正常结束?
A: 等待 data: [DONE] 标志。在收到 [DONE] 之前,消息应视为不完整。