Server-Sent Events (SSE): streaming responses use the SSE format with the content type
text/event-stream.

How Streaming Works
1. Enable streaming - set stream: true in your request to receive incremental responses.
2. Receive chunks - get response tokens as they are generated, without waiting for completion.
3. Process events - handle the data: events containing JSON chunks until the [DONE] signal is received.

Implementation Examples
Python (the JavaScript, React, and cURL variants follow the same pattern):
from openai import OpenAI

client = OpenAI(
    base_url="https://ai.megallm.io/v1",
    api_key="your-api-key"
)

# Create a streaming completion
stream = client.chat.completions.create(
    model="gpt-4",
    messages=[
        {"role": "user", "content": "Write a haiku about programming"}
    ],
    stream=True
)

# Process the stream
for chunk in stream:
    if chunk.choices[0].delta.content is not None:
        print(chunk.choices[0].delta.content, end="")
With async support:
import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI(
    base_url="https://ai.megallm.io/v1",
    api_key="your-api-key"
)

async def stream_chat():
    stream = await client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": "Tell me a story"}],
        stream=True
    )
    async for chunk in stream:
        if chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="", flush=True)

asyncio.run(stream_chat())
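One benefit of the async client is that several responses can be consumed concurrently. A minimal sketch of that fan-out pattern (the helper names are illustrative, and each stream is modeled as a plain async iterator of text fragments):

```python
import asyncio

async def gather_streams(streams):
    """Consume several async text streams concurrently, returning one string per stream."""
    async def consume(stream):
        parts = []
        async for part in stream:
            parts.append(part)
        return "".join(parts)
    # asyncio.gather runs the consumers concurrently and preserves input order
    return await asyncio.gather(*(consume(s) for s in streams))
```

The same approach applies to real completion streams: extract the delta content inside `consume` instead of appending raw parts.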
Stream Event Format
Delta Events
Each streaming chunk follows this format:
data: {
  "id": "chatcmpl-abc123",
  "object": "chat.completion.chunk",
  "created": 1677858242,
  "model": "gpt-4",
  "choices": [
    {
      "index": 0,
      "delta": {
        "content": "Hello"
      },
      "finish_reason": null
    }
  ]
}
Stream Lifecycle
- Initial chunk - contains the role but no content:
data: {"choices": [{"delta": {"role": "assistant"}}]}
- Content chunks - incremental text:
data: {"choices": [{"delta": {"content": "Hello, "}}]}
data: {"choices": [{"delta": {"content": "how "}}]}
data: {"choices": [{"delta": {"content": "are "}}]}
data: {"choices": [{"delta": {"content": "you?"}}]}
- Final chunk - contains finish_reason:
data: {"choices": [{"delta": {}, "finish_reason": "stop"}]}
- End-of-stream signal:
data: [DONE]
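If you consume the raw SSE stream without an SDK, the lifecycle above reduces to a small parsing loop over the `data:` lines (a minimal sketch; the helper name is illustrative):

```python
import json

def parse_sse_chunks(lines):
    """Yield parsed JSON chunks from raw SSE lines, stopping at the [DONE] signal."""
    for line in lines:
        line = line.strip()
        if not line.startswith("data: "):
            continue  # skip blank lines and non-data fields
        payload = line[len("data: "):]
        if payload == "[DONE]":
            break  # end-of-stream signal
        yield json.loads(payload)
```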
Advanced Streaming Features
Streaming Function Calls
Stream function calls as they are generated:
stream = client.chat.completions.create(
    model="gpt-4",
    messages=messages,
    tools=tools,
    stream=True
)

function_call = {"name": "", "arguments": ""}

for chunk in stream:
    delta = chunk.choices[0].delta
    if delta.tool_calls:
        tool_call = delta.tool_calls[0]
        if tool_call.function.name:
            function_call["name"] = tool_call.function.name
        if tool_call.function.arguments:
            function_call["arguments"] += tool_call.function.arguments
    elif delta.content:
        print(delta.content, end="")
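When a response contains several tool calls, the streamed deltas carry an index identifying which call each fragment belongs to, so accumulation should be keyed by that index rather than assuming a single call. A minimal sketch (shown with plain dicts rather than SDK objects for clarity; the helper name is illustrative):

```python
def accumulate_tool_calls(deltas):
    """Merge streamed tool-call deltas into complete calls, keyed by index."""
    calls = {}
    for tc in deltas:
        entry = calls.setdefault(tc["index"], {"name": "", "arguments": ""})
        fn = tc.get("function", {})
        if fn.get("name"):
            entry["name"] = fn["name"]          # the name arrives once
        if fn.get("arguments"):
            entry["arguments"] += fn["arguments"]  # arguments arrive in fragments
    return calls
```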
Progress Tracking
import time

class StreamProgress:
    def __init__(self):
        self.tokens = 0
        self.chunks = 0
        self.start_time = time.time()

    def update(self, chunk):
        self.chunks += 1
        if chunk.choices[0].delta.content:
            # Approximate token count
            self.tokens += len(chunk.choices[0].delta.content.split())

    def get_stats(self):
        elapsed = time.time() - self.start_time
        return {
            "chunks": self.chunks,
            "tokens": self.tokens,
            "time": elapsed,
            "tokens_per_second": self.tokens / elapsed if elapsed > 0 else 0
        }

# Usage
progress = StreamProgress()
for chunk in stream:
    progress.update(chunk)
    # Process the chunk...
print(progress.get_stats())
Error Handling in Streams
Stream connections can fail mid-way. Always implement proper error handling.
import time

def stream_with_retry(client, messages, max_retries=3):
    for attempt in range(max_retries):
        full_response = ""
        try:
            stream = client.chat.completions.create(
                model="gpt-4",
                messages=messages,
                stream=True
            )
            for chunk in stream:
                if chunk.choices[0].delta.content:
                    content = chunk.choices[0].delta.content
                    full_response += content
                    yield content
            return  # Success
        except Exception:
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt  # Exponential backoff
                print(f"Stream interrupted, retrying in {wait_time}s...")
                time.sleep(wait_time)
                # Append the partial response so the model can continue from it
                if full_response:
                    messages.append({"role": "assistant", "content": full_response})
                    messages.append({"role": "user", "content": "continue"})
            else:
                raise
Performance Optimization
Buffering Strategy
class StreamBuffer {
  constructor(onFlush, bufferSize = 10, flushInterval = 100) {
    this.buffer = [];
    this.onFlush = onFlush;
    this.bufferSize = bufferSize;
    this.flushInterval = flushInterval;
    this.timer = null;
  }

  add(chunk) {
    this.buffer.push(chunk);
    if (this.buffer.length >= this.bufferSize) {
      this.flush();
    } else if (!this.timer) {
      this.timer = setTimeout(() => this.flush(), this.flushInterval);
    }
  }

  flush() {
    if (this.buffer.length > 0) {
      this.onFlush(this.buffer.join(''));
      this.buffer = [];
    }
    if (this.timer) {
      clearTimeout(this.timer);
      this.timer = null;
    }
  }
}

// Usage
const buffer = new StreamBuffer((text) => {
  document.getElementById('output').innerHTML += text;
});

for await (const chunk of stream) {
  const content = chunk.choices[0]?.delta?.content || '';
  buffer.add(content);
}
buffer.flush(); // Final flush
Use Cases
Real-Time Chat Interface
def chat_interface():
    print("Chat started. Type 'exit' to quit.")
    while True:
        user_input = input("\nYou: ")
        if user_input.lower() == 'exit':
            break
        print("Assistant: ", end="")
        stream = client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": user_input}],
            stream=True
        )
        for chunk in stream:
            if chunk.choices[0].delta.content:
                print(chunk.choices[0].delta.content, end="", flush=True)
        print()  # Newline after the response
Real-Time Translation
def streaming_translator(text, target_language="Spanish"):
    stream = client.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": f"Translate to {target_language}. Output only the translation."},
            {"role": "user", "content": text}
        ],
        stream=True,
        temperature=0.3
    )
    translation = ""
    for chunk in stream:
        if chunk.choices[0].delta.content:
            translation += chunk.choices[0].delta.content
            yield chunk.choices[0].delta.content
    return translation
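Because streaming_translator is a generator, the value it returns is delivered via StopIteration rather than as an ordinary return value. One way to capture both the streamed pieces and that final value (a sketch; the helper name is illustrative):

```python
def collect(gen):
    """Consume a generator, returning its yielded pieces and its return value (PEP 380 semantics)."""
    pieces = []
    result = None
    try:
        while True:
            pieces.append(next(gen))
    except StopIteration as e:
        result = e.value  # a generator's return value rides on StopIteration
    return pieces, result
```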
Best Practices
- Handle connection interruptions - implement retry logic with exponential backoff
- Buffer UI updates - don't update the DOM for every chunk, to avoid performance problems
- Show loading indicators - display a typing indicator or progress bar
- Implement timeouts - set reasonable timeouts for stream connections
- Clean up resources - always close streams properly to avoid memory leaks
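The timeout recommendation above can be enforced even without client-level support by checking elapsed time while consuming the stream. A minimal sketch (the helper name and limits are illustrative):

```python
import time

def consume_with_timeout(chunks, max_seconds=30.0):
    """Abort if consuming the chunk iterator takes longer than max_seconds overall."""
    start = time.time()
    parts = []
    for chunk in chunks:
        if time.time() - start > max_seconds:
            raise TimeoutError(f"stream exceeded {max_seconds}s")
        parts.append(chunk)
    return "".join(parts)
```

Note that this check only fires once a chunk arrives, so it catches slow streams but not a hung connection; a hard network timeout still belongs on the HTTP client itself.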

