Streaming with History Accumulation

Lexilux provides automatic text accumulation during streaming, allowing you to access the complete accumulated text at any point during iteration and seamlessly integrate with conversation history.

Overview

When using streaming, Lexilux automatically accumulates the text as chunks arrive. This allows you to:

  • Access the current accumulated text at any time during streaming

  • Convert streaming results to ChatResult for history management

  • Update conversation history in real-time during streaming

  • Handle interruptions gracefully

Key Concepts

  1. StreamingResult: Automatically accumulates text from chunks

  2. StreamingIterator: Wraps the chunk iterator and provides accumulated result

  3. Real-time Access: Get current accumulated text at any point

  4. Seamless Integration: Convert to ChatResult for history management

Basic Usage

Direct Streaming (Simple)

The simplest way to use streaming is to iterate directly over chat.stream():

from lexilux import Chat

chat = Chat(base_url="https://api.example.com/v1", api_key="key", model="gpt-4")

# Stream directly
for chunk in chat.stream("Tell me a story"):
    print(chunk.delta, end="", flush=True)
    if chunk.done:
        print(f"\\n\\nUsage: {chunk.usage.total_tokens} tokens")
        print(f"Finish reason: {chunk.finish_reason}")

Automatic Accumulation (Default Behavior)

chat.stream() now returns StreamingIterator automatically, providing automatic text accumulation:

from lexilux import Chat

chat = Chat(base_url="https://api.example.com/v1", api_key="key", model="gpt-4")

# chat.stream() returns StreamingIterator automatically
iterator = chat.stream("Tell me a story")

# Iterate chunks
for chunk in iterator:
    print(chunk.delta, end="")
    # Access accumulated text at any time
    current_text = iterator.result.text
    print(f"\\n[Current length: {len(current_text)}]")

# After streaming, get complete result
complete_result = iterator.result.to_chat_result()
print(f"\\nComplete text: {complete_result.text}")
print(f"Finish reason: {complete_result.finish_reason}")

Note

chat.stream() now returns StreamingIterator by default. You no longer need to manually wrap it. The iterator provides automatic text accumulation and real-time access to the accumulated result.

Accessing Accumulated Result

When using StreamingIterator, you can access the accumulated result at any point:

from lexilux.chat import StreamingIterator

iterator = StreamingIterator(chat.stream("Write a long story"))

# Before iteration
assert iterator.result.text == ""

# During iteration
for chunk in iterator:
    # Result is updated automatically
    if len(iterator.result.text) > 100:
        print("Story is getting long...")
        print(f"Current: {iterator.result.text[:100]}...")

# After iteration
assert iterator.result.done is True
assert len(iterator.result.text) > 0

Integration with History

Manual History Updates

Pass history explicitly and manually update it after streaming completes:

from lexilux import Chat, ChatHistory

chat = Chat(...)
history = ChatHistory()

# Pass history explicitly - original history is NOT modified
iterator = chat.stream("Tell me a story", history=history)
for chunk in iterator:
    print(chunk.delta, end="")

# Get result and manually update history
result = iterator.result.to_chat_result()
history.add_user("Tell me a story")
history.append_result(result)
assert len(history.messages) == 2
assert history.messages[0]["role"] == "user"
assert history.messages[1]["role"] == "assistant"
assert history.messages[1]["content"] == result.text

Manual History Updates After Streaming

History is immutable - you must manually update it after streaming completes:

from lexilux import Chat, ChatHistory

history = ChatHistory()

# Pass history to stream - original history is NOT modified
iterator = chat.stream("Tell me a story", history=history)

for chunk in iterator:
    print(chunk.delta, end="")

# Get result and manually update history
result = iterator.result.to_chat_result()
history.add_user("Tell me a story")
history.append_result(result)

# After manual update, history contains complete response
assert history.messages[1]["content"] == result.text

Note

History is immutable - it is not updated automatically when you pass it to stream(). You must manually add the user message and append the result after streaming completes.

Handling Interruptions

If streaming is interrupted, the accumulated result still contains what was received:

# chat.stream() returns StreamingIterator automatically
iterator = chat.stream("Write a long story")

try:
    for chunk in iterator:
        print(chunk.delta, end="")
        # Simulate interruption
        if len(iterator.result.text) > 50:
            raise ConnectionError("Network interrupted")
except ConnectionError as e:
    # Even though interrupted, we have partial result
    partial_text = iterator.result.text
    print(f"\\nInterrupted, but got: {partial_text}")
    # partial_text contains what was accumulated before interruption

# Result reflects partial state
assert iterator.result.done is False  # Not completed
assert len(iterator.result.text) > 0   # But has partial text

Best Practices

  1. Check Completion: Always check if streaming completed:

    # chat.stream() returns StreamingIterator automatically
    iterator = chat.stream("Tell me a story")
    for chunk in iterator:
        print(chunk.delta, end="")
    
    if iterator.result.done:
        # Completed successfully
        result = iterator.result.to_chat_result()
        history.append_result(result)
    else:
        # Interrupted or incomplete
        print("Streaming was interrupted")
    
  2. Update History Efficiently: Don’t update history on every chunk:

    # Less efficient - updates on every chunk
    iterator = chat.stream("Tell me a story")
    for chunk in iterator:
        history.update_last_assistant(iterator.result.text)
    
    # More efficient - update only at the end
    iterator = chat.stream("Tell me a story")
    for chunk in iterator:
        print(chunk.delta, end="")
    if iterator.result.done:
        history.update_last_assistant(iterator.result.text)
    
  3. Handle Partial Results: Be prepared for incomplete results:

    iterator = chat.stream("Write a story")
    try:
        for chunk in iterator:
            print(chunk.delta, end="")
    except Exception as e:
        # Handle error, but still use partial result
        if len(iterator.result.text) > 0:
            # Save partial result
            partial_result = iterator.result.to_chat_result()
            history.append_result(partial_result)
    
  4. Monitor Progress: Use accumulated text to monitor progress:

    iterator = chat.stream("Write a long story")
    for chunk in iterator:
        print(chunk.delta, end="")
        # Monitor progress
        if len(iterator.result.text) % 100 == 0:
            print(f"\\n[Progress: {len(iterator.result.text)} chars]")
    

Common Pitfalls

  1. Assuming Completion: Don’t assume streaming completed just because the loop ended. Always check iterator.result.done:

    # Wrong - may be incomplete
    iterator = chat.stream("Tell me a story")
    for chunk in iterator:
        pass
    result = iterator.result.to_chat_result()  # May be incomplete!
    
    # Correct - check completion
    iterator = chat.stream("Tell me a story")
    for chunk in iterator:
        pass
    if iterator.result.done:
        result = iterator.result.to_chat_result()
    
  2. Multiple Iterations: Don’t iterate the same iterator multiple times:

    iterator = chat.stream("Tell me a story")
    list(iterator)  # First iteration - consumes all chunks
    list(iterator)  # Second iteration - empty! No chunks left
    
  3. Result State During Iteration: The result is updated during iteration, but done and finish_reason are only set when a chunk with done=True arrives:

    iterator = chat.stream("Tell me a story")
    for chunk in iterator:
        # result.text is updated immediately
        # but result.done is False until done=True chunk arrives
        if iterator.result.done:
            # This only happens when done=True chunk is processed
            break
    
  4. Usage Statistics: Usage statistics are only available in the final chunk (when done=True). Don’t rely on usage during intermediate chunks:

    iterator = chat.stream("Tell me a story")
    for chunk in iterator:
        # chunk.usage may be empty for intermediate chunks
        if chunk.done:
            # Now usage is complete
            print(f"Tokens: {chunk.usage.total_tokens}")
    
    # Or access from iterator.result after completion
    if iterator.result.done:
        print(f"Total tokens: {iterator.result.usage.total_tokens}")
    

Examples

Complete Streaming Workflow

from lexilux import Chat
from lexilux.chat import ChatHistory, ChatHistoryFormatter

chat = Chat(base_url="https://api.example.com/v1", api_key="key", model="gpt-4")

history = ChatHistory(system="You are a storyteller")
history.add_user("Tell me a story about Python")

# chat.stream() returns StreamingIterator automatically
iterator = chat.stream(history.get_messages())

# Stream and display
for chunk in iterator:
    print(chunk.delta, end="", flush=True)

# Check completion
if iterator.result.done:
    # Add to history
    result = iterator.result.to_chat_result()
    history.append_result(result)

    # Export conversation
    ChatHistoryFormatter.save(history, "story.md")

Streaming with History

from lexilux import Chat, ChatHistory

chat = Chat(...)
history = ChatHistory()

# Pass history explicitly - original history is NOT modified
iterator = chat.stream("Tell me a story", history=history)
for chunk in iterator:
    print(chunk.delta, end="")

# Get result and manually update history
result = iterator.result.to_chat_result()
history.add_user("Tell me a story")
history.append_result(result)
assert len(history.messages) == 2  # user + assistant
assert len(history.messages[1]["content"]) == len(result.text)

Progress Monitoring

iterator = chat.stream("Write a long article")

last_length = 0
for chunk in iterator:
    print(chunk.delta, end="")
    current_length = len(iterator.result.text)

    # Report progress every 100 characters
    if current_length - last_length >= 100:
        print(f"\\n[Progress: {current_length} characters]", end="", flush=True)
        last_length = current_length

print(f"\\n\\nComplete: {len(iterator.result.text)} characters")

Error Recovery

iterator = chat.stream("Write a long story")

try:
    for chunk in iterator:
        print(chunk.delta, end="")
except Exception as e:
    print(f"\\nError: {e}")

    # Check if we got anything
    if len(iterator.result.text) > 0:
        print(f"\\nPartial result ({len(iterator.result.text)} chars):")
        print(iterator.result.text[:200] + "...")

        # Save partial result
        partial = iterator.result.to_chat_result()
        # Note: finish_reason will be None for incomplete results