Server-Sent Events (SSE): streaming responses use the SSE format with content type text/event-stream

How Streaming Works

  1. Enable streaming - set stream: true in your request to receive incremental responses.
  2. Receive chunks - get response tokens as they are generated, without waiting for completion.
  3. Process events - handle data: events containing JSON chunks until the [DONE] signal arrives.

Implementation Example

from openai import OpenAI

client = OpenAI(
    base_url="https://ai.megallm.io/v1",
    api_key="your-api-key"
)

# Create a streaming completion
stream = client.chat.completions.create(
    model="gpt-4",
    messages=[
        {"role": "user", "content": "Write a haiku about programming"}
    ],
    stream=True
)

# Process the stream
for chunk in stream:
    if chunk.choices[0].delta.content is not None:
        print(chunk.choices[0].delta.content, end="")

Async Support

import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI(
    base_url="https://ai.megallm.io/v1",
    api_key="your-api-key"
)

async def stream_chat():
    stream = await client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": "Tell me a story"}],
        stream=True
    )

    async for chunk in stream:
        if chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="", flush=True)

asyncio.run(stream_chat())

Stream Event Format

Delta Events

Each streaming chunk follows this format:
data: {
  "id": "chatcmpl-abc123",
  "object": "chat.completion.chunk",
  "created": 1677858242,
  "model": "gpt-4",
  "choices": [
    {
      "index": 0,
      "delta": {
        "content": "Hello"
      },
      "finish_reason": null
    }
  ]
}
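Each chunk's data: payload is plain JSON, so extracting the text delta is a two-step lookup. A minimal sketch, using a chunk that mirrors the format above:

```python
import json

# The JSON body of one chunk, as shown above (without the "data: " prefix)
raw = ('{"id": "chatcmpl-abc123", "object": "chat.completion.chunk", '
       '"choices": [{"index": 0, "delta": {"content": "Hello"}, '
       '"finish_reason": null}]}')

chunk = json.loads(raw)
delta = chunk["choices"][0]["delta"]
content = delta.get("content", "")  # empty for role-only or final chunks
print(content)  # Hello
```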

Stream Lifecycle

  1. Initial chunk - contains the role but no content:
data: {"choices": [{"delta": {"role": "assistant"}}]}
  2. Content chunks - incremental text:
data: {"choices": [{"delta": {"content": "Hello, "}}]}
data: {"choices": [{"delta": {"content": "how "}}]}
data: {"choices": [{"delta": {"content": "are "}}]}
data: {"choices": [{"delta": {"content": "you?"}}]}
  3. Final chunk - includes finish_reason:
data: {"choices": [{"delta": {}, "finish_reason": "stop"}]}
  4. End-of-stream signal:
data: [DONE]
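The lifecycle above can be exercised without a live connection. A minimal sketch of a parser that walks raw data: lines, skips role-only and final chunks, accumulates content deltas, and stops at [DONE] (the helper name parse_sse_lines is illustrative, not part of any SDK):

```python
import json

def parse_sse_lines(lines):
    """Accumulate content deltas from raw SSE 'data:' lines until [DONE]."""
    text = []
    for line in lines:
        if not line.startswith("data: "):
            continue  # skip blank keep-alive lines and comments
        payload = line[len("data: "):]
        if payload == "[DONE]":
            break  # end-of-stream signal
        delta = json.loads(payload)["choices"][0]["delta"]
        if delta.get("content"):
            text.append(delta["content"])
    return "".join(text)

lines = [
    'data: {"choices": [{"delta": {"role": "assistant"}}]}',
    'data: {"choices": [{"delta": {"content": "Hello, "}}]}',
    'data: {"choices": [{"delta": {"content": "you?"}}]}',
    'data: {"choices": [{"delta": {}, "finish_reason": "stop"}]}',
    "data: [DONE]",
]
print(parse_sse_lines(lines))  # Hello, you?
```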

Advanced Streaming Features

Streaming Function Calls

Stream function calls as they are generated:
stream = client.chat.completions.create(
    model="gpt-4",
    messages=messages,
    tools=tools,
    stream=True
)

function_call = {"name": "", "arguments": ""}

for chunk in stream:
    delta = chunk.choices[0].delta

    if delta.tool_calls:
        tool_call = delta.tool_calls[0]
        if tool_call.function.name:
            function_call["name"] = tool_call.function.name
        if tool_call.function.arguments:
            function_call["arguments"] += tool_call.function.arguments

    elif delta.content:
        print(delta.content, end="")

Progress Tracking

import time

class StreamProgress:
    def __init__(self):
        self.tokens = 0
        self.chunks = 0
        self.start_time = time.time()

    def update(self, chunk):
        self.chunks += 1
        if chunk.choices[0].delta.content:
            # Approximate token count
            self.tokens += len(chunk.choices[0].delta.content.split())

    def get_stats(self):
        elapsed = time.time() - self.start_time
        return {
            "chunks": self.chunks,
            "tokens": self.tokens,
            "time": elapsed,
            "tokens_per_second": self.tokens / elapsed if elapsed > 0 else 0
        }

# Usage
progress = StreamProgress()

for chunk in stream:
    progress.update(chunk)
    # Process the chunk...

print(progress.get_stats())

Error Handling in Streams

Streaming connections can fail mid-response. Always implement proper error handling.
import time

def stream_with_retry(client, messages, max_retries=3):
    full_response = ""  # defined up front so the except branch can use it
    for attempt in range(max_retries):
        try:
            stream = client.chat.completions.create(
                model="gpt-4",
                messages=messages,
                stream=True
            )

            for chunk in stream:
                if chunk.choices[0].delta.content:
                    content = chunk.choices[0].delta.content
                    full_response += content
                    yield content

            return  # success

        except Exception:
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt  # exponential backoff
                print(f"Stream interrupted, retrying in {wait_time}s...")
                time.sleep(wait_time)
                # Append the partial response and ask the model to continue
                if full_response:
                    messages.append({"role": "assistant", "content": full_response})
                    messages.append({"role": "user", "content": "continue"})
            else:
                raise

Performance Optimization

Buffering Strategy

class StreamBuffer {
  constructor(onFlush, bufferSize = 10, flushInterval = 100) {
    this.buffer = [];
    this.onFlush = onFlush;
    this.bufferSize = bufferSize;
    this.flushInterval = flushInterval;
    this.timer = null;
  }

  add(chunk) {
    this.buffer.push(chunk);

    if (this.buffer.length >= this.bufferSize) {
      this.flush();
    } else if (!this.timer) {
      this.timer = setTimeout(() => this.flush(), this.flushInterval);
    }
  }

  flush() {
    if (this.buffer.length > 0) {
      this.onFlush(this.buffer.join(''));
      this.buffer = [];
    }
    if (this.timer) {
      clearTimeout(this.timer);
      this.timer = null;
    }
  }
}

// Usage
const buffer = new StreamBuffer((text) => {
  document.getElementById('output').innerHTML += text;
});

for await (const chunk of stream) {
  const content = chunk.choices[0]?.delta?.content || '';
  buffer.add(content);
}
buffer.flush(); // final flush

Use Cases

Real-Time Chat Interface

def chat_interface():
    print("Chat started. Type 'exit' to quit.")

    while True:
        user_input = input("\nYou: ")
        if user_input.lower() == 'exit':
            break

        print("Assistant: ", end="")
        stream = client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": user_input}],
            stream=True
        )

        for chunk in stream:
            if chunk.choices[0].delta.content:
                print(chunk.choices[0].delta.content, end="", flush=True)
        print()  # newline after the response
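The loop above sends each turn in isolation, so the model has no memory of earlier turns. To add memory, keep a running message history. A sketch with the transport factored out so the turn logic is testable; stream_fn is a hypothetical injection point, e.g. a wrapper that yields delta.content values from client.chat.completions.create(..., stream=True):

```python
def run_turn(history, user_input, stream_fn):
    """Append the user turn, stream the reply, and record it in history."""
    history.append({"role": "user", "content": user_input})
    reply = ""
    for token in stream_fn(history):
        reply += token
        print(token, end="", flush=True)
    history.append({"role": "assistant", "content": reply})
    return reply

# Usage with a fake transport; a real stream_fn would call the API
history = []
run_turn(history, "Hello", lambda messages: iter(["Hi ", "there!"]))
```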

Streaming Translation

def streaming_translator(text, target_language="Spanish"):
    stream = client.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": f"Translate to {target_language}. Output only the translation."},
            {"role": "user", "content": text}
        ],
        stream=True,
        temperature=0.3
    )

    translation = ""
    for chunk in stream:
        if chunk.choices[0].delta.content:
            translation += chunk.choices[0].delta.content
            yield chunk.choices[0].delta.content

    return translation

Best Practices

  1. Handle connection interruptions - implement retry logic with exponential backoff
  2. Buffer UI updates - avoid touching the DOM on every chunk to prevent performance problems
  3. Show loading indicators - display a typing indicator or progress bar
  4. Implement timeouts - set reasonable timeouts on streaming connections
  5. Clean up resources - always close streams properly to avoid memory leaks
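Practice 5 (resource cleanup) can be wrapped in a small helper that guarantees close() runs even if the consumer exits early. A sketch; consume_stream is a hypothetical helper, and the getattr guard avoids assuming any particular stream type:

```python
def consume_stream(stream):
    """Yield content deltas, closing the stream even on early exit."""
    try:
        for chunk in stream:
            content = chunk.choices[0].delta.content
            if content:
                yield content
    finally:
        close = getattr(stream, "close", None)  # not all iterables expose close()
        if callable(close):
            close()  # release the underlying HTTP connection

# Usage:
# for token in consume_stream(stream):
#     print(token, end="", flush=True)
```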

Next Steps