Перейти к основному содержанию

Обзор

Потоковая передача позволяет получать токены ответа по мере их генерации, а не ждать полного ответа. Это идеально подходит для:
  • Чат-ботов - Отображение ответов по мере их ввода
  • Живых ассистентов - Показ прогресса в реальном времени
  • Длинных ответов - Немедленное начало отображения контента
  • Лучшего UX - Снижение воспринимаемой задержки

Как это работает

1

Включение потоковой передачи

Установите stream: true в вашем запросе
2

Получение фрагментов

Получайте токены ответа постепенно через SSE
3

Обработка событий

Парсите события data: содержащие фрагменты JSON
4

Обработка завершения

Следите за сигналом [DONE], чтобы узнать о завершении

Конечные точки

Потоковая передача работает с обоими форматами API:
POST https://ai.megallm.io/v1/chat/completions
POST https://ai.megallm.io/v1/messages
Обе конечные точки поддерживают параметр stream: true.

Формат запроса

Формат OpenAI

{
  "model": "gpt-4",
  "messages": [
    {"role": "user", "content": "Расскажи мне историю"}
  ],
  "stream": true
}

Формат Anthropic

{
  "model": "claude-3.5-sonnet",
  "max_tokens": 500,
  "messages": [
    {"role": "user", "content": "Расскажи мне историю"}
  ],
  "stream": true
}

Формат ответа

Структура потока событий

Ответы отправляются как Server-Sent Events:
data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","created":1677858242,"model":"gpt-4","choices":[{"index":0,"delta":{"role":"assistant"},"finish_reason":null}]}

data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","created":1677858242,"model":"gpt-4","choices":[{"index":0,"delta":{"content":"Однажды"},"finish_reason":null}]}

data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","created":1677858242,"model":"gpt-4","choices":[{"index":0,"delta":{"content":" давным"},"finish_reason":null}]}

data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","created":1677858242,"model":"gpt-4","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}

data: [DONE]

Жизненный цикл потока

  1. Начальный фрагмент - Содержит роль:
{"choices": [{"delta": {"role": "assistant"}}]}
  1. Фрагменты контента - Инкрементальный текст:
{"choices": [{"delta": {"content": "Привет"}}]}
{"choices": [{"delta": {"content": " мир"}}]}
  1. Финальный фрагмент - Включает finish_reason:
{"choices": [{"delta": {}, "finish_reason": "stop"}]}
  1. Завершение потока:
data: [DONE]

Примеры реализации

  • Python
  • JavaScript
  • Browser
  • React
  • cURL
from openai import OpenAI

client = OpenAI(
    base_url="https://ai.megallm.io/v1",
    api_key="your-api-key"
)

# Создание потокового завершения
stream = client.chat.completions.create(
    model="gpt-4",
    messages=[
        {"role": "user", "content": "Расскажи мне историю"}
    ],
    stream=True
)

# Обработка потока
for chunk in stream:
    if chunk.choices[0].delta.content is not None:
        print(chunk.choices[0].delta.content, end="", flush=True)

Async Python

import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI(
    base_url="https://ai.megallm.io/v1",
    api_key="your-api-key"
)

async def stream_chat():
    stream = await client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": "Расскажи мне историю"}],
        stream=True
    )

    async for chunk in stream:
        if chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="", flush=True)

asyncio.run(stream_chat())

Расширенные функции

Вызов функций с потоковой передачей

stream = client.chat.completions.create(
    model="gpt-4",
    messages=messages,
    tools=tools,
    stream=True
)

function_call = {"name": "", "arguments": ""}

for chunk in stream:
    delta = chunk.choices[0].delta

    if delta.tool_calls:
        tool_call = delta.tool_calls[0]
        if tool_call.function.name:
            function_call["name"] = tool_call.function.name
        if tool_call.function.arguments:
            function_call["arguments"] += tool_call.function.arguments

    elif delta.content:
        print(delta.content, end="", flush=True)

# Выполнение функции при завершении
if function_call["name"]:
    result = execute_function(function_call)

Отслеживание прогресса

import time

class StreamProgress:
    def __init__(self):
        self.tokens = 0
        self.chunks = 0
        self.start_time = time.time()

    def update(self, chunk):
        self.chunks += 1
        if chunk.choices[0].delta.content:
            # Приблизительный подсчет токенов
            self.tokens += len(chunk.choices[0].delta.content.split())

    def get_stats(self):
        elapsed = time.time() - self.start_time
        return {
            "chunks": self.chunks,
            "tokens": self.tokens,
            "time": elapsed,
            "tokens_per_second": self.tokens / elapsed if elapsed > 0 else 0
        }

progress = StreamProgress()

for chunk in stream:
    progress.update(chunk)
    # Обработка фрагмента...

print(f"\nСтатистика: {progress.get_stats()}")

Буферизация для производительности

class StreamBuffer {
  constructor(onFlush, bufferSize = 10, flushInterval = 100) {
    this.buffer = [];
    this.onFlush = onFlush;
    this.bufferSize = bufferSize;
    this.flushInterval = flushInterval;
    this.timer = null;
  }

  add(chunk) {
    this.buffer.push(chunk);

    if (this.buffer.length >= this.bufferSize) {
      this.flush();
    } else if (!this.timer) {
      this.timer = setTimeout(() => this.flush(), this.flushInterval);
    }
  }

  flush() {
    if (this.buffer.length > 0) {
      this.onFlush(this.buffer.join(''));
      this.buffer = [];
    }
    if (this.timer) {
      clearTimeout(this.timer);
      this.timer = null;
    }
  }
}

// Использование
const buffer = new StreamBuffer((text) => {
  document.getElementById('output').innerHTML += text;
});

for await (const chunk of stream) {
  const content = chunk.choices[0]?.delta?.content || '';
  buffer.add(content);
}
buffer.flush(); // Финальная очистка

Обработка ошибок

Потоковые соединения могут прерваться в середине потока. Всегда реализуйте логику повторных попыток.
import time

def stream_with_retry(client, messages, max_retries=3):
    for attempt in range(max_retries):
        try:
            stream = client.chat.completions.create(
                model="gpt-4",
                messages=messages,
                stream=True
            )

            full_response = ""
            for chunk in stream:
                if chunk.choices[0].delta.content:
                    content = chunk.choices[0].delta.content
                    full_response += content
                    yield content

            return  # Успех

        except Exception as e:
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt  # Экспоненциальная задержка
                print(f"Поток прерван, повтор через {wait_time}с...")
                time.sleep(wait_time)
                # Продолжение с частичного ответа
                messages.append({"role": "assistant", "content": full_response})
                messages.append({"role": "user", "content": "продолжи"})
            else:
                raise e

Лучшие практики

  1. Буферизация для обновлений UI - Не обновляйте DOM для каждого фрагмента (пакетная обработка улучшает производительность)
  2. Показывайте индикаторы загрузки - Отображайте индикаторы печати во время потоковой передачи
  3. Реализуйте таймауты - Устанавливайте разумные таймауты для соединений
  4. Обрабатывайте прерывания - Используйте логику повторных попыток с экспоненциальной задержкой
  5. Освобождайте ресурсы - Всегда правильно закрывайте потоки
  6. Тестируйте сценарии ошибок - Убедитесь, что ваше приложение корректно обрабатывает сбои сети

Советы по производительности

Объединяйте небольшие фрагменты перед обновлением UI, чтобы избежать избыточных обновлений DOM.
  • Используйте flush=True в print Python для немедленного вывода
  • Реализуйте debouncing для частых обновлений UI
  • Рассмотрите виртуализацию для длинных ответов
  • Используйте Web Workers для парсинга в браузерах
  • Отслеживайте использование памяти для длинных потоков

Связанное