Streaming with History Accumulation¶
Lexilux provides automatic text accumulation during streaming, allowing you to access the complete accumulated text at any point during iteration and seamlessly integrate with conversation history.
Overview¶
When using streaming, Lexilux automatically accumulates the text as chunks arrive. This allows you to:
Access the current accumulated text at any time during streaming
Convert streaming results to ChatResult for history management
Update conversation history in real-time during streaming
Handle interruptions gracefully
Key Concepts¶
StreamingResult: Automatically accumulates text from chunks
StreamingIterator: Wraps the chunk iterator and provides accumulated result
Real-time Access: Get current accumulated text at any point
Seamless Integration: Convert to ChatResult for history management
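To make the accumulation idea concrete, here is a minimal sketch of how a wrapping iterator could fold each chunk into a running result. It is an illustration only and not Lexilux's implementation: the `SketchChunk`, `SketchResult`, and `SketchIterator` names are hypothetical stand-ins for the concepts listed above.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator, Optional


@dataclass
class SketchChunk:
    """Hypothetical stand-in for a streamed chunk (not a Lexilux class)."""
    delta: str
    done: bool = False
    finish_reason: Optional[str] = None


@dataclass
class SketchResult:
    """Hypothetical stand-in for the accumulated result."""
    text: str = ""
    done: bool = False
    finish_reason: Optional[str] = None


class SketchIterator:
    """Wraps a chunk iterable and folds each chunk into `result` as it passes through."""

    def __init__(self, chunks: Iterable[SketchChunk]) -> None:
        self._chunks = iter(chunks)
        self.result = SketchResult()

    def __iter__(self) -> Iterator[SketchChunk]:
        return self

    def __next__(self) -> SketchChunk:
        # StopIteration from the source propagates and ends the loop
        chunk = next(self._chunks)
        self.result.text += chunk.delta
        if chunk.done:
            # done and finish_reason are only set by the final chunk
            self.result.done = True
            self.result.finish_reason = chunk.finish_reason
        return chunk
```

Because the result is updated inside `__next__`, the caller sees the accumulated text as soon as each chunk is yielded, and a stream that stops early simply leaves `done` as `False`.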
Basic Usage¶
Direct Streaming (Simple)¶
The simplest way to use streaming is to iterate directly over chat.stream():
from lexilux import Chat

chat = Chat(base_url="https://api.example.com/v1", api_key="key", model="gpt-4")

# Stream directly
for chunk in chat.stream("Tell me a story"):
    print(chunk.delta, end="", flush=True)
    if chunk.done:
        print(f"\n\nUsage: {chunk.usage.total_tokens} tokens")
        print(f"Finish reason: {chunk.finish_reason}")
Automatic Accumulation (Default Behavior)¶
chat.stream() returns a StreamingIterator, which accumulates the text of
every chunk as you iterate:
from lexilux import Chat

chat = Chat(base_url="https://api.example.com/v1", api_key="key", model="gpt-4")

# chat.stream() returns StreamingIterator automatically
iterator = chat.stream("Tell me a story")

# Iterate chunks
for chunk in iterator:
    print(chunk.delta, end="")
    # Access accumulated text at any time
    current_text = iterator.result.text
    print(f"\n[Current length: {len(current_text)}]")

# After streaming, get complete result
complete_result = iterator.result.to_chat_result()
print(f"\nComplete text: {complete_result.text}")
print(f"Finish reason: {complete_result.finish_reason}")
Note
chat.stream() now returns StreamingIterator by default. You no longer
need to manually wrap it. The iterator provides automatic text accumulation
and real-time access to the accumulated result.
Accessing Accumulated Result¶
When using StreamingIterator, you can access the accumulated result at any point:
# chat.stream() already returns a StreamingIterator, so no manual wrapping is needed
iterator = chat.stream("Write a long story")

# Before iteration
assert iterator.result.text == ""

# During iteration
for chunk in iterator:
    # Result is updated automatically
    if len(iterator.result.text) > 100:
        print("Story is getting long...")
        print(f"Current: {iterator.result.text[:100]}...")

# After iteration
assert iterator.result.done is True
assert len(iterator.result.text) > 0
Integration with History¶
Manual History Updates¶
Pass history explicitly and manually update it after streaming completes:
from lexilux import Chat, ChatHistory

chat = Chat(...)
history = ChatHistory()

# Pass history explicitly - original history is NOT modified
iterator = chat.stream("Tell me a story", history=history)
for chunk in iterator:
    print(chunk.delta, end="")

# Get result and manually update history
result = iterator.result.to_chat_result()
history.add_user("Tell me a story")
history.append_result(result)

assert len(history.messages) == 2
assert history.messages[0]["role"] == "user"
assert history.messages[1]["role"] == "assistant"
assert history.messages[1]["content"] == result.text
Note
stream() does not modify the history you pass in, so the history is not updated automatically.
You must manually add the user message and append the result after streaming completes.
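The manual update can be wrapped in a small helper so the two history calls are never forgotten. The following is a sketch, not part of Lexilux: the function name `stream_into_history` and its `on_delta` parameter are hypothetical, and it assumes `chat` and `history` behave like the Chat and ChatHistory objects used in this guide.

```python
def _print_delta(text):
    """Default sink: echo each delta without adding a newline."""
    print(text, end="", flush=True)


def stream_into_history(chat, history, prompt, on_delta=_print_delta):
    """Stream `prompt`, then record the exchange in `history` only if it completed.

    Hypothetical helper. Returns the converted result on completion,
    or None if the stream ended without a done=True chunk.
    """
    iterator = chat.stream(prompt, history=history)
    for chunk in iterator:
        on_delta(chunk.delta)

    if not iterator.result.done:
        # Leave the history untouched when the response is incomplete
        return None

    result = iterator.result.to_chat_result()
    history.add_user(prompt)
    history.append_result(result)
    return result
```

Skipping the update on an incomplete stream is a design choice made for this sketch; if partial responses are worth keeping, the caller can still read `iterator.result.text` in its own loop instead.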
Handling Interruptions¶
If streaming is interrupted, the accumulated result still contains what was received:
# chat.stream() returns StreamingIterator automatically
iterator = chat.stream("Write a long story")
try:
    for chunk in iterator:
        print(chunk.delta, end="")
        # Simulate interruption
        if len(iterator.result.text) > 50:
            raise ConnectionError("Network interrupted")
except ConnectionError as e:
    # Even though interrupted, we have partial result
    partial_text = iterator.result.text
    print(f"\nInterrupted, but got: {partial_text}")
    # partial_text contains what was accumulated before interruption

# Result reflects partial state
assert iterator.result.done is False  # Not completed
assert len(iterator.result.text) > 0  # But has partial text
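The interruption pattern above can be packaged so that callers always get back the text, a completion flag, and the error (if any) in one place. This is a minimal sketch under stated assumptions: `drain` is a hypothetical name, and the iterator is only assumed to expose `result.text` and `result.done` as described in this guide.

```python
def drain(iterator, on_delta=None):
    """Consume a streaming iterator and report what was received.

    Hypothetical helper. Returns a tuple (text, completed, error), where
    `error` is the exception that interrupted the stream, or None.
    """
    error = None
    try:
        for chunk in iterator:
            if on_delta is not None:
                on_delta(chunk.delta)
    except Exception as exc:  # broad on purpose: any failure ends the stream
        error = exc
    # The accumulated result survives the exception, so read it afterwards
    return iterator.result.text, bool(iterator.result.done), error
```

A caller might then write `text, completed, error = drain(chat.stream("Write a long story"))` and decide whether to retry, keep the partial text, or discard it.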
Best Practices¶
Check Completion: Always check if streaming completed:
# chat.stream() returns StreamingIterator automatically
iterator = chat.stream("Tell me a story")
for chunk in iterator:
    print(chunk.delta, end="")

if iterator.result.done:
    # Completed successfully
    result = iterator.result.to_chat_result()
    history.append_result(result)
else:
    # Interrupted or incomplete
    print("Streaming was interrupted")
Update History Efficiently: Don’t update history on every chunk:
# Less efficient - updates on every chunk
iterator = chat.stream("Tell me a story")
for chunk in iterator:
    history.update_last_assistant(iterator.result.text)

# More efficient - update only at the end
iterator = chat.stream("Tell me a story")
for chunk in iterator:
    print(chunk.delta, end="")
if iterator.result.done:
    history.update_last_assistant(iterator.result.text)
Handle Partial Results: Be prepared for incomplete results:
iterator = chat.stream("Write a story")
try:
    for chunk in iterator:
        print(chunk.delta, end="")
except Exception as e:
    # Handle error, but still use partial result
    if len(iterator.result.text) > 0:
        # Save partial result
        partial_result = iterator.result.to_chat_result()
        history.append_result(partial_result)
Monitor Progress: Use accumulated text to monitor progress:
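iterator = chat.stream("Write a long story")
last_reported = 0
for chunk in iterator:
    print(chunk.delta, end="")
    # Monitor progress: report after every 100 newly accumulated characters.
    # Chunk sizes vary, so an exact `% 100 == 0` check would rarely fire.
    current_length = len(iterator.result.text)
    if current_length - last_reported >= 100:
        print(f"\n[Progress: {current_length} chars]")
        last_reported = current_length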
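Because chunks arrive in variable sizes, the accumulated length can jump past a multiple of 100 without ever being equal to one. One way to detect that a progress milestone was crossed is to compare which 100-character "bucket" the length fell in before and after a chunk. The function below is a hypothetical helper written for this guide, not a Lexilux API.

```python
def crossed_milestone(previous_length: int, current_length: int, step: int = 100) -> bool:
    """Return True if the text grew past a multiple of `step` since the last check."""
    if step <= 0:
        raise ValueError("step must be positive")
    # Integer division maps a length to its bucket; a higher bucket means
    # at least one multiple of `step` was passed.
    return current_length // step > previous_length // step
```

Inside a streaming loop, you would remember the length from the previous chunk, call `crossed_milestone(previous, len(iterator.result.text))`, and print a progress line whenever it returns `True`.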
Common Pitfalls¶
Assuming Completion: Don’t assume streaming completed just because the loop ended. Always check iterator.result.done:
# Wrong - may be incomplete
iterator = chat.stream("Tell me a story")
for chunk in iterator:
    pass
result = iterator.result.to_chat_result()  # May be incomplete!

# Correct - check completion
iterator = chat.stream("Tell me a story")
for chunk in iterator:
    pass
if iterator.result.done:
    result = iterator.result.to_chat_result()
Multiple Iterations: Don’t iterate the same iterator multiple times:
iterator = chat.stream("Tell me a story")
list(iterator)  # First iteration - consumes all chunks
list(iterator)  # Second iteration - empty! No chunks left
Result State During Iteration: The result is updated during iteration, but done and finish_reason are only set when a chunk with done=True arrives:
iterator = chat.stream("Tell me a story")
for chunk in iterator:
    # result.text is updated immediately
    # but result.done is False until done=True chunk arrives
    if iterator.result.done:
        # This only happens when done=True chunk is processed
        break
Usage Statistics: Usage statistics are only available in the final chunk (when done=True). Don’t rely on usage during intermediate chunks:
iterator = chat.stream("Tell me a story")
for chunk in iterator:
    # chunk.usage may be empty for intermediate chunks
    if chunk.done:
        # Now usage is complete
        print(f"Tokens: {chunk.usage.total_tokens}")

# Or access from iterator.result after completion
if iterator.result.done:
    print(f"Total tokens: {iterator.result.usage.total_tokens}")
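The "Multiple Iterations" pitfall is ordinary single-pass iterator behavior in Python, which can be seen without any network call. The sketch below uses a plain generator as a stand-in for a chunk stream; `fake_stream` is a hypothetical name and is not related to Lexilux internals.

```python
def fake_stream():
    """Stand-in for a one-shot chunk stream (a plain generator, not Lexilux)."""
    yield from ["Once ", "upon ", "a time"]


stream = fake_stream()
first_pass = list(stream)   # consumes every item
second_pass = list(stream)  # the generator is already exhausted

# If the chunks are needed again, keep what the first pass produced
replayable_text = "".join(first_pass)
```

With a streaming iterator the same idea applies: read the accumulated text from `iterator.result.text` rather than trying to loop a second time.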
Examples¶
Complete Streaming Workflow¶
from lexilux import Chat
from lexilux.chat import ChatHistory, ChatHistoryFormatter

chat = Chat(base_url="https://api.example.com/v1", api_key="key", model="gpt-4")
history = ChatHistory(system="You are a storyteller")
history.add_user("Tell me a story about Python")

# chat.stream() returns StreamingIterator automatically
iterator = chat.stream(history.get_messages())

# Stream and display
for chunk in iterator:
    print(chunk.delta, end="", flush=True)

# Check completion
if iterator.result.done:
    # Add to history
    result = iterator.result.to_chat_result()
    history.append_result(result)

# Export conversation
ChatHistoryFormatter.save(history, "story.md")
Streaming with History¶
from lexilux import Chat, ChatHistory

chat = Chat(...)
history = ChatHistory()

# Pass history explicitly - original history is NOT modified
iterator = chat.stream("Tell me a story", history=history)
for chunk in iterator:
    print(chunk.delta, end="")

# Get result and manually update history
result = iterator.result.to_chat_result()
history.add_user("Tell me a story")
history.append_result(result)

assert len(history.messages) == 2  # user + assistant
assert len(history.messages[1]["content"]) == len(result.text)
Progress Monitoring¶
iterator = chat.stream("Write a long article")
last_length = 0
for chunk in iterator:
    print(chunk.delta, end="")
    current_length = len(iterator.result.text)
    # Report progress every 100 characters
    if current_length - last_length >= 100:
        print(f"\n[Progress: {current_length} characters]", end="", flush=True)
        last_length = current_length

print(f"\n\nComplete: {len(iterator.result.text)} characters")
Error Recovery¶
iterator = chat.stream("Write a long story")
try:
    for chunk in iterator:
        print(chunk.delta, end="")
except Exception as e:
    print(f"\nError: {e}")
    # Check if we got anything
    if len(iterator.result.text) > 0:
        print(f"\nPartial result ({len(iterator.result.text)} chars):")
        print(iterator.result.text[:200] + "...")
        # Save partial result
        partial = iterator.result.to_chat_result()
        # Note: finish_reason will be None for incomplete results