Обзор
Потоковая передача позволяет получать токены ответа по мере их генерации, а не ждать полного ответа. Это идеально подходит для:
- Чат-ботов - Отображение ответов по мере их ввода
- Живых ассистентов - Показ прогресса в реальном времени
- Длинных ответов - Немедленное начало отображения контента
- Лучшего UX - Снижение воспринимаемой задержки
Как это работает
Включение потоковой передачи
Установите stream: true в вашем запросе
Получение фрагментов
Получайте токены ответа постепенно через SSE
Обработка событий
Парсите события data: содержащие фрагменты JSON
Обработка завершения
Следите за сигналом [DONE], чтобы узнать о завершении
Конечные точки
Потоковая передача работает с обоими форматами API:
POST https://ai.megallm.io/v1/chat/completions
POST https://ai.megallm.io/v1/messages
Обе конечные точки поддерживают параметр stream: true.
Формат запроса
Формат OpenAI
{
"model": "gpt-4",
"messages": [
{"role": "user", "content": "Расскажи мне историю"}
],
"stream": true
}
Формат Anthropic
{
"model": "claude-3.5-sonnet",
"max_tokens": 500,
"messages": [
{"role": "user", "content": "Расскажи мне историю"}
],
"stream": true
}
Формат ответа
Структура потока событий
Ответы отправляются как Server-Sent Events:
data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","created":1677858242,"model":"gpt-4","choices":[{"index":0,"delta":{"role":"assistant"},"finish_reason":null}]}
data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","created":1677858242,"model":"gpt-4","choices":[{"index":0,"delta":{"content":"Однажды"},"finish_reason":null}]}
data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","created":1677858242,"model":"gpt-4","choices":[{"index":0,"delta":{"content":" давным"},"finish_reason":null}]}
data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","created":1677858242,"model":"gpt-4","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}
data: [DONE]
Жизненный цикл потока
- Начальный фрагмент - Содержит роль:
{"choices": [{"delta": {"role": "assistant"}}]}
- Фрагменты контента - Инкрементальный текст:
{"choices": [{"delta": {"content": "Привет"}}]}
{"choices": [{"delta": {"content": " мир"}}]}
- Финальный фрагмент - Включает finish_reason:
{"choices": [{"delta": {}, "finish_reason": "stop"}]}
- Завершение потока:
Примеры реализации
Python
JavaScript
Browser
React
cURL
from openai import OpenAI
client = OpenAI(
base_url="https://ai.megallm.io/v1",
api_key="your-api-key"
)
# Создание потокового завершения
stream = client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "user", "content": "Расскажи мне историю"}
],
stream=True
)
# Обработка потока
for chunk in stream:
if chunk.choices[0].delta.content is not None:
print(chunk.choices[0].delta.content, end="", flush=True)
Async Python
import asyncio
from openai import AsyncOpenAI
client = AsyncOpenAI(
base_url="https://ai.megallm.io/v1",
api_key="your-api-key"
)
async def stream_chat():
stream = await client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": "Расскажи мне историю"}],
stream=True
)
async for chunk in stream:
if chunk.choices[0].delta.content:
print(chunk.choices[0].delta.content, end="", flush=True)
asyncio.run(stream_chat())
Расширенные функции
Вызов функций с потоковой передачей
stream = client.chat.completions.create(
model="gpt-4",
messages=messages,
tools=tools,
stream=True
)
function_call = {"name": "", "arguments": ""}
for chunk in stream:
delta = chunk.choices[0].delta
if delta.tool_calls:
tool_call = delta.tool_calls[0]
if tool_call.function.name:
function_call["name"] = tool_call.function.name
if tool_call.function.arguments:
function_call["arguments"] += tool_call.function.arguments
elif delta.content:
print(delta.content, end="", flush=True)
# Выполнение функции при завершении
if function_call["name"]:
result = execute_function(function_call)
Отслеживание прогресса
import time
class StreamProgress:
def __init__(self):
self.tokens = 0
self.chunks = 0
self.start_time = time.time()
def update(self, chunk):
self.chunks += 1
if chunk.choices[0].delta.content:
# Приблизительный подсчет токенов
self.tokens += len(chunk.choices[0].delta.content.split())
def get_stats(self):
elapsed = time.time() - self.start_time
return {
"chunks": self.chunks,
"tokens": self.tokens,
"time": elapsed,
"tokens_per_second": self.tokens / elapsed if elapsed > 0 else 0
}
progress = StreamProgress()
for chunk in stream:
progress.update(chunk)
# Обработка фрагмента...
print(f"\nСтатистика: {progress.get_stats()}")
Буферизация для производительности
class StreamBuffer {
constructor(onFlush, bufferSize = 10, flushInterval = 100) {
this.buffer = [];
this.onFlush = onFlush;
this.bufferSize = bufferSize;
this.flushInterval = flushInterval;
this.timer = null;
}
add(chunk) {
this.buffer.push(chunk);
if (this.buffer.length >= this.bufferSize) {
this.flush();
} else if (!this.timer) {
this.timer = setTimeout(() => this.flush(), this.flushInterval);
}
}
flush() {
if (this.buffer.length > 0) {
this.onFlush(this.buffer.join(''));
this.buffer = [];
}
if (this.timer) {
clearTimeout(this.timer);
this.timer = null;
}
}
}
// Использование
const buffer = new StreamBuffer((text) => {
document.getElementById('output').innerHTML += text;
});
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content || '';
buffer.add(content);
}
buffer.flush(); // Финальная очистка
Обработка ошибок
Потоковые соединения могут прерваться в середине потока. Всегда реализуйте логику повторных попыток.
import time
def stream_with_retry(client, messages, max_retries=3):
for attempt in range(max_retries):
try:
stream = client.chat.completions.create(
model="gpt-4",
messages=messages,
stream=True
)
full_response = ""
for chunk in stream:
if chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
full_response += content
yield content
return # Успех
except Exception as e:
if attempt < max_retries - 1:
wait_time = 2 ** attempt # Экспоненциальная задержка
print(f"Поток прерван, повтор через {wait_time}с...")
time.sleep(wait_time)
# Продолжение с частичного ответа
messages.append({"role": "assistant", "content": full_response})
messages.append({"role": "user", "content": "продолжи"})
else:
raise e
Лучшие практики
- Буферизация для обновлений UI - Не обновляйте DOM для каждого фрагмента (пакетная обработка улучшает производительность)
- Показывайте индикаторы загрузки - Отображайте индикаторы печати во время потоковой передачи
- Реализуйте таймауты - Устанавливайте разумные таймауты для соединений
- Обрабатывайте прерывания - Используйте логику повторных попыток с экспоненциальной задержкой
- Освобождайте ресурсы - Всегда правильно закрывайте потоки
- Тестируйте сценарии ошибок - Убедитесь, что ваше приложение корректно обрабатывает сбои сети
Советы по производительности
Объединяйте небольшие фрагменты перед обновлением UI, чтобы избежать избыточных обновлений DOM.
- Используйте
flush=True в print Python для немедленного вывода
- Реализуйте debouncing для частых обновлений UI
- Рассмотрите виртуализацию для длинных ответов
- Используйте Web Workers для парсинга в браузерах
- Отслеживайте использование памяти для длинных потоков
Связанное