Overview
Streaming lets you receive response tokens as they are generated instead of waiting for the complete response. This is ideal for:
- Chatbots - render the reply as it is being typed
- Real-time assistants - show progress live
- Long responses - start displaying content immediately
- Better user experience - reduce perceived latency
How It Works
1. Enable streaming - set stream: true in your request
2. Receive chunks - get response tokens incrementally over SSE
3. Process events - parse the data: events containing JSON chunks
4. Handle completion - listen for the [DONE] signal to know when the stream is finished (a raw-HTTP sketch follows)
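If you are not using an SDK, the four steps map directly onto raw HTTP. Below is a minimal sketch using the requests library; the Bearer authorization header and the your-api-key placeholder are assumptions here, see Authentication for the supported schemes:
import json
import requests

response = requests.post(
    "https://ai.megallm.io/v1/chat/completions",
    headers={"Authorization": "Bearer your-api-key"},  # assumed auth scheme
    json={
        "model": "gpt-4",
        "messages": [{"role": "user", "content": "Tell me a story"}],
        "stream": True,  # Step 1: enable streaming
    },
    stream=True,  # tell requests not to buffer the response body
)

for line in response.iter_lines():  # Step 2: read the SSE stream line by line
    if not line:
        continue  # skip keep-alive blank lines
    decoded = line.decode("utf-8")
    if not decoded.startswith("data: "):
        continue
    payload = decoded[len("data: "):]
    if payload == "[DONE]":  # Step 4: end-of-stream signal
        break
    chunk = json.loads(payload)  # Step 3: each event carries a JSON chunk
    delta = chunk["choices"][0]["delta"]
    if delta.get("content"):
        print(delta["content"], end="", flush=True)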
Endpoints
Streaming works with both API formats:
POST https://ai.megallm.io/v1/chat/completions
POST https://ai.megallm.io/v1/messages
Both endpoints accept the stream: true parameter.
Request Format
OpenAI Format
{
"model": "gpt-4",
"messages": [
{"role": "user", "content": "Tell me a story"}
],
"stream": true
}
Anthropic Format
{
"model": "claude-3.5-sonnet",
"max_tokens": 500,
"messages": [
{"role": "user", "content": "Tell me a story"}
],
"stream": true
}
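Because the Messages endpoint follows the Anthropic format, the official anthropic Python SDK should also be able to consume it by overriding the base URL. A sketch under that compatibility assumption (the SDK appends /v1/messages itself and handles SSE parsing):
from anthropic import Anthropic

client = Anthropic(
    base_url="https://ai.megallm.io",  # SDK appends /v1/messages
    api_key="your-api-key",
)

# text_stream yields the content deltas as plain strings
with client.messages.stream(
    model="claude-3.5-sonnet",
    max_tokens=500,
    messages=[{"role": "user", "content": "Tell me a story"}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)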
Response Format
Event Stream Structure
Responses are sent as Server-Sent Events (SSE):
data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","created":1677858242,"model":"gpt-4","choices":[{"index":0,"delta":{"role":"assistant"},"finish_reason":null}]}
data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","created":1677858242,"model":"gpt-4","choices":[{"index":0,"delta":{"content":"Once"},"finish_reason":null}]}
data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","created":1677858242,"model":"gpt-4","choices":[{"index":0,"delta":{"content":" upon"},"finish_reason":null}]}
data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","created":1677858242,"model":"gpt-4","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}
data: [DONE]
Stream Lifecycle
- Initial chunk - contains the role:
{"choices": [{"delta": {"role": "assistant"}}]}
- Content chunks - incremental text:
{"choices": [{"delta": {"content": "Hello"}}]}
{"choices": [{"delta": {"content": " world"}}]}
- Final chunk - includes the finish_reason:
{"choices": [{"delta": {}, "finish_reason": "stop"}]}
- Stream end - terminated by the [DONE] sentinel (the sketch below assembles these chunks into a full message):
data: [DONE]
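Taken together, a client reconstructs the full message by folding the deltas in arrival order. A minimal sketch, assuming chunks is an iterable of parsed chunk objects like those above:
role = None
parts = []
finish_reason = None

for chunk in chunks:  # parsed JSON chunks, in arrival order
    choice = chunk["choices"][0]
    delta = choice["delta"]
    if "role" in delta:  # initial chunk
        role = delta["role"]
    if "content" in delta:  # content chunks
        parts.append(delta["content"])
    if choice.get("finish_reason"):  # final chunk
        finish_reason = choice["finish_reason"]

message = {"role": role, "content": "".join(parts)}
print(message, finish_reason)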
Implementation Examples
Python
from openai import OpenAI

client = OpenAI(
    base_url="https://ai.megallm.io/v1",
    api_key="your-api-key"
)

# Create streaming completion
stream = client.chat.completions.create(
    model="gpt-4",
    messages=[
        {"role": "user", "content": "Tell me a story"}
    ],
    stream=True
)

# Process the stream
for chunk in stream:
    if chunk.choices[0].delta.content is not None:
        print(chunk.choices[0].delta.content, end="", flush=True)
Async Python
import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI(
    base_url="https://ai.megallm.io/v1",
    api_key="your-api-key"
)

async def stream_chat():
    stream = await client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": "Tell me a story"}],
        stream=True
    )
    async for chunk in stream:
        if chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="", flush=True)

asyncio.run(stream_chat())
Advanced Features
Function Calling in Streams
stream = client.chat.completions.create(
    model="gpt-4",
    messages=messages,
    tools=tools,
    stream=True
)

function_call = {"name": "", "arguments": ""}

for chunk in stream:
    delta = chunk.choices[0].delta
    if delta.tool_calls:
        tool_call = delta.tool_calls[0]
        if tool_call.function.name:
            function_call["name"] = tool_call.function.name
        if tool_call.function.arguments:
            function_call["arguments"] += tool_call.function.arguments
    elif delta.content:
        print(delta.content, end="", flush=True)

# Execute function when complete
if function_call["name"]:
    result = execute_function(function_call)
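The loop above tracks a single call. When the model emits several tool calls in one response, the SDK tags each delta with an index field you can accumulate by; a sketch under that assumption:
# Accumulate parallel tool calls keyed by their stream index
calls = {}  # index -> {"name": ..., "arguments": ...}

for chunk in stream:
    delta = chunk.choices[0].delta
    for tool_call in delta.tool_calls or []:
        entry = calls.setdefault(tool_call.index, {"name": "", "arguments": ""})
        if tool_call.function.name:
            entry["name"] = tool_call.function.name
        if tool_call.function.arguments:
            entry["arguments"] += tool_call.function.arguments

for entry in calls.values():
    result = execute_function(entry)  # same placeholder helper as above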
Progress Tracking
import time

class StreamProgress:
    def __init__(self):
        self.tokens = 0
        self.chunks = 0
        self.start_time = time.time()

    def update(self, chunk):
        self.chunks += 1
        if chunk.choices[0].delta.content:
            # Approximate token count
            self.tokens += len(chunk.choices[0].delta.content.split())

    def get_stats(self):
        elapsed = time.time() - self.start_time
        return {
            "chunks": self.chunks,
            "tokens": self.tokens,
            "time": elapsed,
            "tokens_per_second": self.tokens / elapsed if elapsed > 0 else 0
        }

progress = StreamProgress()
for chunk in stream:
    progress.update(chunk)
    # Process chunk...

print(f"\nStats: {progress.get_stats()}")
Buffering for Performance
class StreamBuffer {
  constructor(onFlush, bufferSize = 10, flushInterval = 100) {
    this.buffer = [];
    this.onFlush = onFlush;
    this.bufferSize = bufferSize;
    this.flushInterval = flushInterval;
    this.timer = null;
  }

  add(chunk) {
    this.buffer.push(chunk);
    if (this.buffer.length >= this.bufferSize) {
      this.flush();
    } else if (!this.timer) {
      this.timer = setTimeout(() => this.flush(), this.flushInterval);
    }
  }

  flush() {
    if (this.buffer.length > 0) {
      this.onFlush(this.buffer.join(''));
      this.buffer = [];
    }
    if (this.timer) {
      clearTimeout(this.timer);
      this.timer = null;
    }
  }
}

// Usage
const buffer = new StreamBuffer((text) => {
  document.getElementById('output').innerHTML += text;
});

for await (const chunk of stream) {
  const content = chunk.choices[0]?.delta?.content || '';
  buffer.add(content);
}
buffer.flush(); // Final flush
Error Handling
Streaming connections can fail mid-stream. Always implement retry logic.
import time

def stream_with_retry(client, messages, max_retries=3):
    for attempt in range(max_retries):
        full_response = ""  # defined before the try so the except block can read it
        try:
            stream = client.chat.completions.create(
                model="gpt-4",
                messages=messages,
                stream=True
            )
            for chunk in stream:
                if chunk.choices[0].delta.content:
                    content = chunk.choices[0].delta.content
                    full_response += content
                    yield content
            return  # Success
        except Exception as e:
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt  # Exponential backoff
                print(f"Stream interrupted, retrying in {wait_time}s...")
                time.sleep(wait_time)
                # Continue from the partial response, if any was received
                if full_response:
                    messages.append({"role": "assistant", "content": full_response})
                    messages.append({"role": "user", "content": "continue"})
            else:
                raise e
Best Practices
- Buffer UI updates - don't update the DOM for every chunk (batching improves performance)
- Show loading indicators - display a typing indicator while streaming
- Implement timeouts - set reasonable connection timeouts (see the sketch after this list)
- Handle interruptions - use retry logic with exponential backoff
- Clean up resources - always close streams properly
- Test error scenarios - make sure your application handles network failures gracefully
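For the timeout and cleanup points, the OpenAI Python SDK accepts a timeout when constructing the client, and recent SDK versions expose close() on the stream object; a sketch (the 30-second value is an arbitrary choice):
from openai import OpenAI

client = OpenAI(
    base_url="https://ai.megallm.io/v1",
    api_key="your-api-key",
    timeout=30.0,  # fail fast instead of hanging on a stalled connection
)

stream = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Tell me a story"}],
    stream=True,
)
try:
    for chunk in stream:
        if chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="", flush=True)
finally:
    stream.close()  # release the underlying HTTP connection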
Performance Tips
Buffer small chunks together before updating the UI to avoid excessive DOM updates.
- Use flush=True with Python's print for immediate output
- Debounce frequent UI updates
- Consider virtualizing long responses
- Use Web Workers in the browser for parsing
- Monitor memory usage on long streams
Related
- Chat Completions API - OpenAI-compatible streaming
- Messages API - Anthropic-compatible streaming
- Function Calling - using functions with streaming
- Authentication - API authentication methods

