Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.megallm.io/llms.txt

Use this file to discover all available pages before exploring further.

Overview

Streaming allows you to receive response tokens as they’re generated, rather than waiting for the complete response. This is perfect for:
  • Chatbots - Display responses as they’re generated
  • Live assistants - Show progress in real-time
  • Long responses - Start displaying content immediately
  • Better UX - Reduce perceived latency

How It Works

1

Enable Streaming

Set stream: true in your request
2

Receive Chunks

Get response tokens incrementally via SSE
3

Process Events

Parse the data: events, each of which contains one JSON chunk
4

Handle Completion

Watch for the [DONE] signal, which marks the end of the stream

Endpoints

Streaming works with both API formats:
POST https://ai.megallm.io/v1/chat/completions
POST https://ai.megallm.io/v1/messages
Both endpoints support the stream: true parameter.

Request Format

OpenAI Format

{
  "model": "gpt-4",
  "messages": [
    {"role": "user", "content": "Tell me a story"}
  ],
  "stream": true
}

Anthropic Format

{
  "model": "claude-3.5-sonnet",
  "max_tokens": 500,
  "messages": [
    {"role": "user", "content": "Tell me a story"}
  ],
  "stream": true
}

Response Format

Event Stream Structure

Responses are sent as Server-Sent Events:
data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","created":1677858242,"model":"gpt-4","choices":[{"index":0,"delta":{"role":"assistant"},"finish_reason":null}]}

data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","created":1677858242,"model":"gpt-4","choices":[{"index":0,"delta":{"content":"Once"},"finish_reason":null}]}

data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","created":1677858242,"model":"gpt-4","choices":[{"index":0,"delta":{"content":" upon"},"finish_reason":null}]}

data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","created":1677858242,"model":"gpt-4","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}

data: [DONE]

Stream Lifecycle

  1. Initial chunk - Contains role:
{"choices": [{"delta": {"role": "assistant"}}]}
  2. Content chunks - Incremental text:
{"choices": [{"delta": {"content": "Hello"}}]}
{"choices": [{"delta": {"content": " world"}}]}
  3. Final chunk - Includes finish_reason:
{"choices": [{"delta": {}, "finish_reason": "stop"}]}
  4. Stream end:
data: [DONE]

Implementation Examples

from openai import OpenAI

# Point the OpenAI SDK at the MegaLLM endpoint. Replace the placeholder key;
# in real code, load it from the environment instead of hard-coding it.
client = OpenAI(
    base_url="https://ai.megallm.io/v1",
    api_key="your-api-key"
)

# Create streaming completion
stream = client.chat.completions.create(
    model="gpt-4",
    messages=[
        {"role": "user", "content": "Tell me a story"}
    ],
    stream=True
)

# Process the stream
for chunk in stream:
    # OpenAI-compatible backends may emit chunks whose `choices` list is
    # empty (for example a trailing usage-only chunk); indexing [0] on those
    # would raise IndexError, so skip them.
    if not chunk.choices:
        continue
    content = chunk.choices[0].delta.content
    # The role-only first chunk and the final chunk carry no content.
    if content is not None:
        print(content, end="", flush=True)

Async Python

import asyncio
from openai import AsyncOpenAI

# Async SDK client pointed at the MegaLLM endpoint. Replace the placeholder
# key; in real code, load it from the environment instead of hard-coding it.
client = AsyncOpenAI(
    base_url="https://ai.megallm.io/v1",
    api_key="your-api-key"
)

async def stream_chat():
    """Stream one chat completion and echo its text to stdout as it arrives.

    Uses the module-level ``client``. Returns ``None``; the generated text is
    only printed, not collected.
    """
    stream = await client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": "Tell me a story"}],
        stream=True
    )

    async for chunk in stream:
        # OpenAI-compatible backends may emit chunks with an empty `choices`
        # list (for example a usage-only chunk); skip them rather than let
        # choices[0] raise IndexError.
        if not chunk.choices:
            continue
        content = chunk.choices[0].delta.content
        # Role-only and final chunks have no content to print.
        if content:
            print(content, end="", flush=True)

# Start an event loop, run stream_chat() to completion, then close the loop.
asyncio.run(stream_chat())

Advanced Features

Function Calling with Streaming

stream = client.chat.completions.create(
    model="gpt-4",
    messages=messages,
    tools=tools,
    stream=True
)

# Tool calls are streamed as fragments. Each fragment carries the `index` of
# the call it belongs to, so several parallel calls can be interleaved.
# Accumulate per index instead of assuming there is only ever one call.
function_calls = {}  # index -> {"name": str, "arguments": str}

for chunk in stream:
    # Skip chunks without choices (e.g. usage-only chunks).
    if not chunk.choices:
        continue
    delta = chunk.choices[0].delta

    if delta.tool_calls:
        for fragment in delta.tool_calls:
            call = function_calls.setdefault(
                fragment.index, {"name": "", "arguments": ""}
            )
            fn = fragment.function
            if fn is None:
                continue
            # The name arrives once; the JSON arguments arrive in pieces.
            if fn.name:
                call["name"] = fn.name
            if fn.arguments:
                call["arguments"] += fn.arguments

    elif delta.content:
        print(delta.content, end="", flush=True)

# Execute function(s) when complete, in the order the model issued them.
results = []
for _, function_call in sorted(function_calls.items()):
    if function_call["name"]:
        results.append(execute_function(function_call))

Progress Tracking

import time

class StreamProgress:
    """Track chunk count, approximate token count and throughput of a stream.

    The clock starts when the instance is created, so construct it right
    before you start consuming the stream.
    """

    def __init__(self):
        self.tokens = 0  # approximate: whitespace-separated words seen so far
        self.chunks = 0  # every chunk passed to update(), with or without text
        self.start_time = time.time()

    def update(self, chunk):
        """Record one streamed chunk.

        Chunks with an empty ``choices`` list or with no text content still
        count as a chunk but add no tokens.
        """
        self.chunks += 1
        # Guard against chunks that carry no choices (e.g. usage-only
        # chunks); indexing [0] on them would raise IndexError.
        if not chunk.choices:
            return
        content = chunk.choices[0].delta.content
        if content:
            # Approximate token count
            self.tokens += len(content.split())

    def get_stats(self):
        """Return a dict with chunks, tokens, elapsed seconds and tokens/second."""
        elapsed = time.time() - self.start_time
        return {
            "chunks": self.chunks,
            "tokens": self.tokens,
            "time": elapsed,
            # Avoid ZeroDivisionError when queried immediately after creation.
            "tokens_per_second": self.tokens / elapsed if elapsed > 0 else 0
        }

# One tracker per stream; its clock starts when it is constructed.
progress = StreamProgress()

for chunk in stream:
    # Record every chunk as it arrives.
    progress.update(chunk)
    # Process chunk...

# Report the totals once the stream has been fully consumed.
print(f"\nStats: {progress.get_stats()}")

Buffering for Performance

/**
 * Collects streamed text pieces and hands them to `onFlush` in batches.
 *
 * A batch is emitted when `bufferSize` pieces have accumulated, or
 * `flushInterval` milliseconds after the first piece of a batch was added,
 * whichever happens first.
 */
class StreamBuffer {
  /**
   * @param {(text: string) => void} onFlush - receives the joined batch
   * @param {number} bufferSize - number of pieces that forces a flush
   * @param {number} flushInterval - maximum wait in ms before a flush
   */
  constructor(onFlush, bufferSize = 10, flushInterval = 100) {
    this.onFlush = onFlush;
    this.bufferSize = bufferSize;
    this.flushInterval = flushInterval;
    this.buffer = [];
    this.timer = null;
  }

  /** Queue one piece; flush at once when the batch is full. */
  add(chunk) {
    this.buffer.push(chunk);

    const batchIsFull = this.buffer.length >= this.bufferSize;
    if (batchIsFull) {
      this.flush();
      return;
    }

    // A pending timer already covers this batch; do not start another.
    if (this.timer) {
      return;
    }
    this.timer = setTimeout(() => this.flush(), this.flushInterval);
  }

  /** Emit whatever is queued (if anything) and cancel the pending timer. */
  flush() {
    const pending = this.buffer;
    if (pending.length > 0) {
      this.onFlush(pending.join(''));
      this.buffer = [];
    }

    if (!this.timer) {
      return;
    }
    clearTimeout(this.timer);
    this.timer = null;
  }
}

// Usage
const output = document.getElementById('output');
const buffer = new StreamBuffer((text) => {
  // Model output is untrusted: append it as a text node so it is never
  // parsed as HTML (innerHTML += would allow markup/script injection and
  // re-parse the whole element on every flush).
  output.append(text);
});

for await (const chunk of stream) {
  const content = chunk.choices[0]?.delta?.content || '';
  // Role-only and final chunks have no text; don't let them fill the buffer.
  if (content) {
    buffer.add(content);
  }
}
buffer.flush(); // Final flush

Error Handling

Streaming connections can fail mid-stream. Always implement retry logic.
import time

def stream_with_retry(client, messages, max_retries=3):
    for attempt in range(max_retries):
        try:
            stream = client.chat.completions.create(
                model="gpt-4",
                messages=messages,
                stream=True
            )

            full_response = ""
            for chunk in stream:
                if chunk.choices[0].delta.content:
                    content = chunk.choices[0].delta.content
                    full_response += content
                    yield content

            return  # Success

        except Exception as e:
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt  # Exponential backoff
                print(f"Stream interrupted, retrying in {wait_time}s...")
                time.sleep(wait_time)
                # Continue from partial response
                messages.append({"role": "assistant", "content": full_response})
                messages.append({"role": "user", "content": "continue"})
            else:
                raise e

Best Practices

  1. Buffer for UI updates - Don’t update DOM for every chunk (batching improves performance)
  2. Show loading indicators - Display typing indicators during streaming
  3. Implement timeouts - Set reasonable timeouts for connections
  4. Handle interruptions - Use retry logic with exponential backoff
  5. Clean up resources - Always close streams properly
  6. Test error scenarios - Ensure your app handles network failures gracefully

Performance Tips

Buffer small chunks together before updating the UI to avoid excessive DOM updates.
  • Use flush=True in Python’s print for immediate output
  • Implement debouncing for frequent UI updates
  • Consider virtualization for long responses
  • Use Web Workers for parsing in browsers
  • Monitor memory usage for long streams