# Source code for lexilux.chat.streaming

"""
Streaming result accumulation.

Provides StreamingResult and StreamingIterator for automatic text accumulation
during streaming, allowing history to be updated in real-time.

Also provides async versions: AsyncStreamingIterator for async streaming.
"""

from __future__ import annotations

from collections.abc import AsyncIterator, Iterator
from typing import TYPE_CHECKING

from lexilux.chat.exceptions import ChatIncompleteResponseError
from lexilux.chat.models import ChatResult, ChatStreamChunk
from lexilux.usage import Usage

if TYPE_CHECKING:
    pass


class StreamingResult:
    """
    Text accumulator for a chat stream that can stand in for a ChatResult.

    Each chunk passed to :meth:`update` extends the collected text and may
    record a finish reason, usage statistics, and completion. ``str()`` of
    the object is its current text, and :meth:`to_chat_result` snapshots the
    state as a :class:`ChatResult`.
    """

    __slots__ = ("_text_parts", "_text_cache", "_finish_reason", "_usage", "_done")

    def __init__(self) -> None:
        """Start with no text, no finish reason, fresh usage, and not done."""
        # Deltas are buffered in a list and joined lazily: appending is cheap
        # and the join only happens when ``text`` is read after a change,
        # avoiding quadratic repeated string concatenation.
        self._text_parts: list[str] = []
        self._text_cache: str | None = None  # None means "must re-join"
        self._finish_reason: str | None = None
        self._usage: Usage = Usage()
        self._done: bool = False

    def update(self, chunk: ChatStreamChunk) -> None:
        """
        Fold one stream chunk into the accumulated state (internal use).

        Args:
            chunk: The chunk just received from the stream.
        """
        delta = chunk.delta
        if delta:
            self._text_parts.append(delta)
            self._text_cache = None  # previously joined text is now stale

        if chunk.done:
            self._done = True

        # A chunk that carries no finish_reason must not erase one seen earlier.
        reason = chunk.finish_reason
        if reason is not None:
            self._finish_reason = reason

        usage = chunk.usage
        if usage:
            self._usage = usage

    @property
    def text(self) -> str:
        """All text received so far, joined into a single string."""
        cached = self._text_cache
        if cached is None:
            cached = self._text_cache = "".join(self._text_parts)
        return cached

    @property
    def finish_reason(self) -> str | None:
        """Most recent non-None finish_reason reported by the stream."""
        return self._finish_reason

    @property
    def usage(self) -> Usage:
        """Latest truthy usage object reported by the stream."""
        return self._usage

    @property
    def done(self) -> bool:
        """True once a chunk flagged ``done`` (or :meth:`set_result`) was seen."""
        return self._done

    def set_result(
        self,
        text: str,
        finish_reason: str | None,
        usage: Usage,
    ) -> None:
        """
        Overwrite the accumulated state with a complete result.

        Intended for merged streaming results. Only the attributes declared
        in ``__slots__`` are assigned, and the result is marked done.

        Args:
            text: Complete text content.
            finish_reason: Reason why generation stopped.
            usage: Usage statistics.
        """
        # Store the text as a single buffered part and prime the join cache.
        self._text_parts, self._text_cache = [text], text
        self._finish_reason, self._usage, self._done = finish_reason, usage, True

    def to_chat_result(self) -> ChatResult:
        """Snapshot the current state as a ChatResult (e.g. for history)."""
        snapshot_text = self.text
        return ChatResult(
            text=snapshot_text,
            finish_reason=self._finish_reason,
            usage=self._usage,
        )

    def __str__(self) -> str:
        """Render as the accumulated text."""
        return self.text

    def __repr__(self) -> str:
        """Debug representation showing text, completion flag, and finish reason."""
        return (
            f"StreamingResult(text={self.text!r}, "
            f"done={self._done}, finish_reason={self._finish_reason!r})"
        )
class StreamingIterator:
    """
    Pass-through wrapper around a chunk iterator that records what it yields.

    Every chunk that flows through iteration is first folded into a
    :class:`StreamingResult`, so :attr:`result` always reflects the chunks
    consumed so far, even while iteration is still in progress.
    """

    def __init__(self, chunk_iterator: Iterator[ChatStreamChunk]) -> None:
        """
        Wrap ``chunk_iterator`` and start an empty accumulator.

        Args:
            chunk_iterator: Source of stream chunks.
        """
        self._iterator = chunk_iterator
        self._result = StreamingResult()

    @property
    def result(self) -> StreamingResult:
        """Accumulator holding everything consumed so far."""
        return self._result

    def __iter__(self) -> Iterator[ChatStreamChunk]:
        """Yield each source chunk unchanged, after recording it in ``result``."""
        for piece in self._iterator:
            self._result.update(piece)
            yield piece
class AsyncStreamingIterator:
    """
    Async streaming iterator (wraps async iterator, provides accumulated result).

    Each chunk pulled through ``async for`` (or :meth:`__anext__`) is fed into
    a :class:`StreamingResult`, so :attr:`result` reflects everything received
    so far and can be inspected at any time.

    Examples:
        >>> iterator = chat.astream("Hello")
        >>> async for chunk in iterator:
        ...     print(chunk.delta, end="")
        >>> result = iterator.result.to_chat_result()
    """

    def __init__(self, chunk_iterator: AsyncIterator[ChatStreamChunk]) -> None:
        """
        Initialize async streaming iterator.

        Args:
            chunk_iterator: Async iterator yielding ChatStreamChunk objects.
        """
        self._iterator = chunk_iterator
        self._result = StreamingResult()

    def __aiter__(self) -> AsyncIterator[ChatStreamChunk]:
        """Return self as async iterator."""
        return self

    async def __anext__(self) -> ChatStreamChunk:
        """
        Get the next chunk and fold it into the accumulated result.

        Raises:
            StopAsyncIteration: When the wrapped iterator is exhausted. It
                propagates unchanged, so ``async for`` terminates normally.
        """
        chunk = await self._iterator.__anext__()
        self._result.update(chunk)
        return chunk

    @property
    def result(self) -> StreamingResult:
        """Get currently accumulated result (accessible at any time)."""
        return self._result

    async def collect(self) -> ChatResult:
        """
        Consume all remaining chunks and return the final ChatResult.

        This is a convenience method to consume the entire stream and get
        the final result.

        Returns:
            ChatResult with accumulated text and usage.

        Examples:
            >>> result = await chat.astream("Hello").collect()
            >>> print(result.text)
        """
        async for _ in self:
            pass
        return self._result.to_chat_result()


def _truncation_error(
    result: StreamingResult,
    max_continues: int,
) -> ChatIncompleteResponseError | None:
    """
    Build the error to raise when a finished stream is still truncated.

    Shared by the sync and async "complete" iterators so both report
    truncation with the same message and fields.

    Args:
        result: Accumulated result of the finished stream.
        max_continues: Continuation budget that was allowed. It is reported
            as both ``continue_count`` and ``max_continues`` on the error.

    Returns:
        A ChatIncompleteResponseError if the final finish_reason is
        ``"length"``, otherwise None.
    """
    final_result = result.to_chat_result()
    if final_result.finish_reason != "length":
        return None
    return ChatIncompleteResponseError(
        f"Response still truncated after {max_continues} continues. "
        "Consider increasing max_continues or max_tokens.",
        final_result=final_result,
        continue_count=max_continues,
        max_continues=max_continues,
    )


class CompleteStreamingIterator(StreamingIterator):
    """
    StreamingIterator wrapper that checks for truncation after iteration.

    Used by complete_stream() to ensure the final result is complete and
    raise an exception if still truncated after max_continues.
    """

    def __init__(
        self,
        chunk_gen: Iterator[ChatStreamChunk],
        max_continues: int,
        ensure_complete: bool,
    ) -> None:
        """
        Args:
            chunk_gen: Chunk iterator to wrap.
            max_continues: Continuation budget, used only in the error report.
            ensure_complete: If True, raise when the finished stream's
                finish_reason is still ``"length"``.
        """
        super().__init__(chunk_gen)
        self._max_continues = max_continues
        self._ensure_complete = ensure_complete

    def __iter__(self) -> Iterator[ChatStreamChunk]:
        """
        Yield all chunks (accumulating them), then verify completeness.

        Raises:
            ChatIncompleteResponseError: If ``ensure_complete`` is set and the
                stream finished with ``finish_reason == "length"``.
        """
        yield from super().__iter__()
        # Only reached once the stream is fully consumed; if the consumer
        # stops early, the generator is never resumed past the last yield.
        if self._ensure_complete:
            error = _truncation_error(self.result, self._max_continues)
            if error is not None:
                raise error


class AsyncCompleteStreamingIterator(AsyncStreamingIterator):
    """
    AsyncStreamingIterator wrapper that checks for truncation on completion.

    Used by acomplete_stream() to ensure the final result is complete and
    raise an exception if still truncated after max_continues.
    """

    def __init__(
        self,
        chunk_gen: AsyncIterator[ChatStreamChunk],
        max_continues: int,
        ensure_complete: bool,
    ) -> None:
        """
        Args:
            chunk_gen: Async chunk iterator to wrap.
            max_continues: Continuation budget, used only in the error report.
            ensure_complete: If True, raise when the finished stream's
                finish_reason is still ``"length"``.
        """
        super().__init__(chunk_gen)
        self._max_continues = max_continues
        self._ensure_complete = ensure_complete

    async def __anext__(self) -> ChatStreamChunk:
        """
        Get the next chunk; at end of stream, verify completeness.

        Raises:
            ChatIncompleteResponseError: If ``ensure_complete`` is set and the
                stream finished with ``finish_reason == "length"``.
            StopAsyncIteration: When the stream ends and no error applies.
        """
        try:
            return await super().__anext__()
        except StopAsyncIteration:
            if self._ensure_complete:
                error = _truncation_error(self.result, self._max_continues)
                if error is not None:
                    # ``from None`` hides the StopAsyncIteration that merely
                    # signalled end-of-stream from the error's traceback.
                    raise error from None
            raise