Source code for lexilux.chat.client

"""
Chat API client.

Provides a simple, function-like API for chat completions with support for
both non-streaming and streaming responses.
"""

from __future__ import annotations

import logging
from collections.abc import AsyncIterator, Callable, Iterator, Sequence
from typing import TYPE_CHECKING, Any

from lexilux._base import BaseAPIClient
from lexilux._rate_limit import RateLimiter
from lexilux.chat._request import (
    SSEChatStreamParser,
    build_api_messages,
    build_params_dict,
    build_payload,
    parse_chat_completion_response,
    prepare_messages_for_request,
)
from lexilux.chat.continuer import ConversationContinuer
from lexilux.chat.reasoning import (
    build_reasoning_request,
    normalize_reasoning,
)
from lexilux.providers.registry import detect_provider_from_url

from lexilux.chat.history import ChatHistory
from lexilux.chat.models import ChatResult, ChatStreamChunk, MessagesLike
from lexilux.chat.params import ChatParams
from lexilux.chat.streaming import (
    AsyncStreamingIterator,
    StreamingIterator,
)
from lexilux.chat.validation import (
    validate_chat_params,
    validate_messages,
    validate_model,
    validate_stop,
)
from lexilux.usage import Json

if TYPE_CHECKING:
    from lexilux.chat.tools import Tool

logger = logging.getLogger(__name__)


def _get_original_prompt(messages: MessagesLike) -> str:
    return messages if isinstance(messages, str) else str(messages)


class Chat(BaseAPIClient):
    """
    Chat API client.

    Provides a simple, function-like API for chat completions with support for
    both non-streaming and streaming responses.

    **Important:** Chat is STATELESS - each call is independent. For multi-turn
    conversations, use ChatHistory to manage context and pass it via the
    `history` parameter.

    Method Overview:
        - `chat()` / `acall()`: Single request (may be truncated)
        - `stream()` / `astream()`: Streaming response (may be truncated)
        - `complete()` / `acomplete()`: Auto-continue if truncated
        - `complete_stream()` / `acomplete_stream()`: Streaming + auto-continue

    Related Classes:
        - ChatHistory: Manages conversation state (pass via `history` parameter)
        - Conversation: Low-level utility for handling truncated responses
          (use `chat.complete()` instead for simplicity)

    Examples:
        >>> # Simple single-turn query
        >>> chat = Chat(base_url="...", api_key="...", model="gpt-4")
        >>> result = chat("Hello, world!")
        >>> print(result.text)

        >>> # Streaming
        >>> for chunk in chat.stream("Tell me a joke"):
        ...     print(chunk.delta, end="")

        >>> # Multi-turn conversation (use ChatHistory)
        >>> from lexilux import ChatHistory
        >>> history = ChatHistory(system="You are helpful")
        >>> history.add_user("My name is Alice")
        >>> result = chat(history.get_messages())
        >>> history.add_assistant(result.text)
        >>> history.add_user("What's my name?")
        >>> result = chat(history.get_messages())  # AI remembers!

        >>> # Long content (auto-continue)
        >>> result = chat.complete("Write an essay", max_tokens=100)
    """

    def __init__(
        self,
        *,
        base_url: str,
        api_key: str | None = None,
        model: str | None = None,
        timeout_s: float = 60.0,
        connect_timeout_s: float | None = None,
        read_timeout_s: float | None = None,
        max_retries: int = 0,
        headers: dict[str, str] | None = None,
        proxies: dict[str, str] | None = None,
        rate_limit: tuple[int, float] | None = None,
    ):
        """
        Initialize Chat client.

        Args:
            base_url: Base URL for the API (e.g., "https://api.openai.com/v1").
            api_key: API key for authentication (optional if provided in headers).
            model: Default model to use (can be overridden in __call__).
            timeout_s: Request timeout in seconds (default for both connect and
                read).
            connect_timeout_s: Connection timeout in seconds (overrides
                timeout_s).
            read_timeout_s: Read timeout in seconds (overrides timeout_s).
            max_retries: Maximum number of retries for failed requests
                (default: 0).
            headers: Additional headers to include in requests.
            proxies: Optional proxy configuration dict
                (e.g., {"http": "http://proxy:port"}). If None, uses environment
                variables (HTTP_PROXY, HTTPS_PROXY). To disable proxies, pass {}.
            rate_limit: Optional rate limiting as (max_rate, time_period) tuple.
                Example: (10, 60.0) for 10 requests per 60 seconds.
                Requires aiolimiter to be installed.

        Note:
            Each HTTP request creates a new connection that closes after
            completion.
        """
        # Initialize base client
        super().__init__(
            base_url=base_url,
            api_key=api_key,
            timeout_s=timeout_s,
            connect_timeout_s=connect_timeout_s,
            read_timeout_s=read_timeout_s,
            max_retries=max_retries,
            headers=headers,
            proxies=proxies,
        )

        # Chat-specific attributes
        self.model = model

        # Rate limiting (only consulted by the async request paths below)
        self._rate_limiter: RateLimiter | None = None
        if rate_limit is not None:
            max_rate, time_period = rate_limit
            self._rate_limiter = RateLimiter(max_rate=max_rate, time_period=time_period)

        # Conversation continuer for complete() methods
        self._continuer = ConversationContinuer(self)

    @property
    def timeout_s(self) -> float:
        """
        Backward compatibility property for timeout.

        Returns the timeout value (or read timeout if tuple).
        """
        if isinstance(self.timeout, tuple):
            return self.timeout[1]  # Return read timeout
        return self.timeout

    @staticmethod
    def _validated_param_dict(
        params: ChatParams | None,
        kwargs: dict[str, Any],
    ) -> dict[str, Any]:
        """
        Validate per-call keyword parameters and merge them with ``params``.

        Shared by ``_build_payload`` and ``_prepare_chat_request_with_history``
        so both paths validate in exactly the same way.

        Args:
            params: Optional ChatParams object forwarded to ``build_params_dict``.
            kwargs: The caller's own keyword-argument dict. ``kwargs["stop"]`` is
                replaced in place by the value returned from ``validate_stop``
                when a stop value was supplied.

        Returns:
            The result of ``build_params_dict(params=params, **kwargs)``.
        """
        # Validate chat parameters
        validate_chat_params(
            temperature=kwargs.get("temperature"),
            top_p=kwargs.get("top_p"),
            max_tokens=kwargs.get("max_tokens"),
            presence_penalty=kwargs.get("presence_penalty"),
            frequency_penalty=kwargs.get("frequency_penalty"),
        )

        # Validate stop sequences
        stop = kwargs.get("stop")
        if stop is not None:
            kwargs["stop"] = validate_stop(stop)

        return build_params_dict(params=params, **kwargs)

    def _build_payload(
        self,
        messages: MessagesLike,
        *,
        history: ChatHistory | None,
        model: str | None,
        system: str | None,
        params: ChatParams | None,
        extra: Json | None,
        stream: bool,
        include_usage: bool,
        reasoning: bool | dict[str, Any] | None = None,
        **kwargs: Any,
    ) -> Json:
        """
        Build request payload (read-only from history, no cloning).

        This is the fast path for basic __call__ and stream operations.
        History is only read, never modified.

        Args:
            messages: Input messages for this request.
            history: Optional history used as context.
            model: Per-call model; validated against the client default.
            system: Optional system prompt.
            params: Optional ChatParams object.
            extra: Extra payload fields forwarded to ``build_payload``.
            stream: Whether the payload is for a streaming request.
            include_usage: Forwarded to ``build_payload``.
            reasoning: Optional reasoning switch/config. Only applied when a
                provider can be detected from ``self.base_url``.
            **kwargs: Remaining chat parameters (temperature, stop, tools, ...).

        Returns:
            The request payload produced by ``build_payload``.
        """
        # Validate model
        final_model = validate_model(model, self.model)

        # Build messages (read-only from history)
        api_messages = build_api_messages(messages, system=system, history=history)
        validate_messages(api_messages)

        param_dict = self._validated_param_dict(params, kwargs)

        # Build reasoning params if enabled; silently skipped when the provider
        # cannot be detected from the base URL.
        reasoning_params = {}
        if reasoning is not None:
            provider_id = detect_provider_from_url(self.base_url)
            if provider_id:
                normalized = normalize_reasoning(reasoning)
                reasoning_params = build_reasoning_request(provider_id, normalized)

        # Merge reasoning params into param_dict
        if reasoning_params:
            param_dict.update(reasoning_params)

        return build_payload(
            model=final_model,
            messages=api_messages,
            params=param_dict,
            stream=stream,
            include_usage=include_usage,
            extra=extra,
        )

    def _prepare_chat_request_with_history(
        self,
        messages: MessagesLike,
        *,
        history: ChatHistory | None,
        model: str | None,
        system: str | None,
        params: ChatParams | None,
        extra: Json | None,
        stream: bool,
        include_usage: bool,
        clone_history: bool = True,
        **kwargs: Any,
    ) -> tuple[Json, ChatHistory | None]:
        """
        Prepare request payload with mutable history tracking.

        This is for complete() family methods that need to track conversation
        state across multiple API calls.

        Returns:
            A ``(payload, working_history)`` tuple. ``working_history`` is the
            history object returned by ``prepare_messages_for_request``, with the
            new user messages appended to it.
        """
        # Validate model
        final_model = validate_model(model, self.model)

        normalized_messages, working_history, user_messages_to_add = (
            prepare_messages_for_request(
                messages,
                system=system,
                history=history,
                clone_history=clone_history,
            )
        )

        # Validate messages
        validate_messages(normalized_messages)

        param_dict = self._validated_param_dict(params, kwargs)

        payload = build_payload(
            model=final_model,
            messages=normalized_messages,
            params=param_dict,
            stream=stream,
            include_usage=include_usage,
            extra=extra,
        )

        # NOTE(review): this is a truthiness check, not ``is not None``. If
        # ChatHistory defines ``__len__``/``__bool__``, an empty history would be
        # skipped here - confirm that is intended.
        if working_history:
            for user_msg in user_messages_to_add:
                working_history.add_user(user_msg)

        return payload, working_history

    def _process_chat_response_with_history(
        self,
        response_data: Json,
        working_history: ChatHistory | None,
        return_raw: bool,
    ) -> ChatResult:
        """Process response and update working history (for complete() family)."""
        result = parse_chat_completion_response(response_data, return_raw=return_raw)
        # NOTE(review): truthiness check, same caveat as in
        # _prepare_chat_request_with_history.
        if working_history:
            working_history.append_result(result)
        return result

    def __call__(
        self,
        messages: MessagesLike,
        *,
        history: ChatHistory | None = None,
        model: str | None = None,
        system: str | None = None,
        temperature: float | None = None,
        top_p: float | None = None,
        max_tokens: int | None = None,
        stop: str | Sequence[str] | None = None,
        presence_penalty: float | None = None,
        frequency_penalty: float | None = None,
        logit_bias: dict[int, float] | None = None,
        user: str | None = None,
        n: int | None = None,
        tools: list[Tool] | None = None,
        tool_choice: str | Any | None = None,
        parallel_tool_calls: bool | None = None,
        params: ChatParams | None = None,
        extra: Json | None = None,
        reasoning: bool | dict[str, Any] | None = None,
        return_raw: bool = False,
    ) -> ChatResult:
        """
        Make a single chat completion request.

        History is read-only - used for context but never modified.
        """
        payload = self._build_payload(
            messages=messages,
            history=history,
            model=model,
            system=system,
            params=params,
            extra=extra,
            stream=False,
            include_usage=False,
            reasoning=reasoning,
            temperature=temperature,
            top_p=top_p,
            max_tokens=max_tokens,
            stop=stop,
            presence_penalty=presence_penalty,
            frequency_penalty=frequency_penalty,
            logit_bias=logit_bias,
            user=user,
            n=n,
            tools=tools,
            tool_choice=tool_choice,
            parallel_tool_calls=parallel_tool_calls,
        )

        response = self._make_request("chat/completions", payload)
        return parse_chat_completion_response(response.json(), return_raw=return_raw)

    def stream(
        self,
        messages: MessagesLike,
        *,
        history: ChatHistory | None = None,
        model: str | None = None,
        system: str | None = None,
        temperature: float | None = None,
        top_p: float | None = None,
        max_tokens: int | None = None,
        stop: str | Sequence[str] | None = None,
        presence_penalty: float | None = None,
        frequency_penalty: float | None = None,
        logit_bias: dict[int, float] | None = None,
        user: str | None = None,
        tools: list[Tool] | None = None,
        tool_choice: str | Any | None = None,
        parallel_tool_calls: bool | None = None,
        params: ChatParams | None = None,
        extra: Json | None = None,
        reasoning: bool | dict[str, Any] | None = None,
        include_usage: bool = True,
        return_raw_events: bool = False,
        include_reasoning: bool = False,
    ) -> StreamingIterator:
        """
        Stream a single chat completion response.

        History is read-only - used for context but never modified.
        """
        payload = self._build_payload(
            messages=messages,
            history=history,
            model=model,
            system=system,
            params=params,
            extra=extra,
            stream=True,
            include_usage=include_usage,
            reasoning=reasoning,
            temperature=temperature,
            top_p=top_p,
            max_tokens=max_tokens,
            stop=stop,
            presence_penalty=presence_penalty,
            frequency_penalty=frequency_penalty,
            logit_bias=logit_bias,
            user=user,
            n=None,  # n>1 is not supported in streaming
            tools=tools,
            tool_choice=tool_choice,
            parallel_tool_calls=parallel_tool_calls,
        )

        # Create internal chunk generator using context manager
        def _chunk_generator() -> Iterator[ChatStreamChunk]:
            """Internal generator for streaming chunks."""
            parser = SSEChatStreamParser(
                return_raw_events=return_raw_events,
                include_reasoning=include_reasoning,
            )
            # Use context manager to ensure response is always closed
            with self._streaming_request_context(
                "chat/completions", payload
            ) as response:
                for line in response.iter_lines():
                    if not line:
                        continue
                    if isinstance(line, bytes):
                        try:
                            line_str = line.decode("utf-8")
                        except UnicodeDecodeError:
                            # Best effort: skip the bad line, but leave a trace
                            # instead of dropping data silently.
                            logger.warning(
                                "Skipping SSE line that is not valid UTF-8 "
                                "(%d bytes)",
                                len(line),
                            )
                            continue
                    else:
                        # Transport already decoded the line to text.
                        line_str = line
                    chunk = parser.feed_line(line_str)
                    if chunk is None:
                        continue
                    yield chunk
                    if parser.done:
                        break

        # Create and return iterator (no cleanup wrapper needed)
        return StreamingIterator(_chunk_generator())

    # =========================================================================
    # Async Methods
    # =========================================================================

    async def _amake_request(
        self,
        endpoint: str,
        payload: dict[str, Any],
    ) -> Any:
        """
        Send async POST request to API endpoint with rate limiting.

        Overrides base method to add rate limiting if configured.

        Args:
            endpoint: API endpoint (e.g., "chat/completions").
            payload: Request body as dict.

        Returns:
            httpx.Response object.

        Raises:
            LexiluxTimeoutError: On timeout.
            LexiluxConnectionError: On connection failure.
            AuthenticationError: On authentication failure.
            RateLimitError: On rate limit exceeded.
            APIError: On other API errors.
            ValidationError: On invalid input.
        """
        # Apply rate limiting if configured
        if self._rate_limiter is not None:
            await self._rate_limiter.acquire()

        # Call parent's _amake_request
        return await super()._amake_request(endpoint, payload)

    async def acall(
        self,
        messages: MessagesLike,
        *,
        history: ChatHistory | None = None,
        model: str | None = None,
        system: str | None = None,
        temperature: float | None = None,
        top_p: float | None = None,
        max_tokens: int | None = None,
        stop: str | Sequence[str] | None = None,
        presence_penalty: float | None = None,
        frequency_penalty: float | None = None,
        logit_bias: dict[int, float] | None = None,
        user: str | None = None,
        n: int | None = None,
        tools: list[Tool] | None = None,
        tool_choice: str | Any | None = None,
        parallel_tool_calls: bool | None = None,
        params: ChatParams | None = None,
        extra: Json | None = None,
        reasoning: bool | dict[str, Any] | None = None,
        return_raw: bool = False,
    ) -> ChatResult:
        """
        Make an async chat completion request.

        History is read-only - used for context but never modified.
        """
        payload = self._build_payload(
            messages=messages,
            history=history,
            model=model,
            system=system,
            params=params,
            extra=extra,
            stream=False,
            include_usage=False,
            reasoning=reasoning,
            temperature=temperature,
            top_p=top_p,
            max_tokens=max_tokens,
            stop=stop,
            presence_penalty=presence_penalty,
            frequency_penalty=frequency_penalty,
            logit_bias=logit_bias,
            user=user,
            n=n,
            tools=tools,
            tool_choice=tool_choice,
            parallel_tool_calls=parallel_tool_calls,
        )

        # Make async request
        response = await self._amake_request("chat/completions", payload)
        return parse_chat_completion_response(response.json(), return_raw=return_raw)

    async def astream(
        self,
        messages: MessagesLike,
        *,
        history: ChatHistory | None = None,
        model: str | None = None,
        system: str | None = None,
        temperature: float | None = None,
        top_p: float | None = None,
        max_tokens: int | None = None,
        stop: str | Sequence[str] | None = None,
        presence_penalty: float | None = None,
        frequency_penalty: float | None = None,
        logit_bias: dict[int, float] | None = None,
        user: str | None = None,
        tools: list[Tool] | None = None,
        tool_choice: str | Any | None = None,
        parallel_tool_calls: bool | None = None,
        params: ChatParams | None = None,
        extra: Json | None = None,
        reasoning: bool | dict[str, Any] | None = None,
        include_usage: bool = True,
        return_raw_events: bool = False,
        include_reasoning: bool = False,
    ) -> AsyncStreamingIterator:
        """
        Stream an async chat completion response.

        History is read-only - used for context but never modified.
        """
        payload = self._build_payload(
            messages=messages,
            history=history,
            model=model,
            system=system,
            params=params,
            extra=extra,
            stream=True,
            include_usage=include_usage,
            reasoning=reasoning,
            temperature=temperature,
            top_p=top_p,
            max_tokens=max_tokens,
            stop=stop,
            presence_penalty=presence_penalty,
            frequency_penalty=frequency_penalty,
            logit_bias=logit_bias,
            user=user,
            n=None,  # n>1 is not supported in streaming
            tools=tools,
            tool_choice=tool_choice,
            parallel_tool_calls=parallel_tool_calls,
        )

        async def _async_chunk_generator() -> AsyncIterator[ChatStreamChunk]:
            # Apply rate limiting if configured (before starting stream)
            if self._rate_limiter is not None:
                await self._rate_limiter.acquire()

            parser = SSEChatStreamParser(
                return_raw_events=return_raw_events,
                include_reasoning=include_reasoning,
            )
            stream = self._amake_streaming_request("chat/completions", payload)
            try:
                async for line in stream:
                    chunk = parser.feed_line(line)
                    if chunk is None:
                        continue
                    yield chunk
                    if parser.done:
                        break
            finally:
                # Runs on normal completion, early break, and consumer
                # cancellation alike.
                logger.debug(
                    "Closing async streaming response and releasing connection"
                )
                await stream.aclose()

        # Create and return async iterator (no cleanup wrapper needed)
        return AsyncStreamingIterator(_async_chunk_generator())

    def complete(
        self,
        messages: MessagesLike,
        *,
        history: ChatHistory | None = None,
        max_continues: int = 5,
        ensure_complete: bool = True,
        continue_prompt: str | Callable = "continue",
        on_progress: Callable | None = None,
        continue_delay: float | tuple[float, float] = 0.0,
        on_error: str = "raise",
        on_error_callback: Callable | None = None,
        **params: Any,
    ) -> ChatResult:
        """
        Ensure a complete response, automatically handling truncation.

        **Behavior**: Automatically continues generation if the response is
        truncated, ensuring the returned result is complete (or raises an
        exception).

        **History Immutability**: If history is provided, a clone is created and
        used internally. The original history is never modified.

        **History Management**:
            - If `history` is provided, uses it (for multi-turn conversations)
            - If `history` is None, creates a new history internally (for
              single-turn conversations)
            - The history is automatically updated with the prompt and response

        Use this when:
            - You need a complete response (e.g., JSON extraction)
            - You cannot accept partial responses
            - Reliability is more important than performance

        For single responses (even if truncated), use `chat()` instead.

        Args:
            messages: Input messages.
            history: Optional ChatHistory instance. If None, creates a new one
                internally.
            max_continues: Maximum number of continuation attempts.
            ensure_complete: If True, raises ChatIncompleteResponseError if
                result is still truncated after max_continues. If False, returns
                partial result.
            continue_prompt: User prompt for continuation requests. Can be a
                string or a callable with signature:
                (count: int, max_count: int, current_text: str,
                original_prompt: str) -> str
            on_progress: Optional progress callback function with signature:
                (count: int, max_count: int, current_result: ChatResult,
                all_results: List[ChatResult]) -> None
            continue_delay: Delay between continue requests (seconds). Can be a
                float (fixed delay) or tuple (min, max) for random delay.
                Delay is only applied after the first continue.
            on_error: Error handling strategy: "raise" (default) or
                "return_partial".
            on_error_callback: Optional error callback function with signature:
                (error: Exception, partial_result: ChatResult) -> dict
            params: Additional parameters to pass to chat and continue requests.

        Returns:
            Complete ChatResult (never truncated, unless max_continues exceeded).

        Raises:
            ChatIncompleteResponseError: If ensure_complete=True and result is
                still truncated after max_continues.

        Examples:
            Single-turn conversation (no history needed):

            >>> result = chat.complete("Write a long JSON", max_tokens=100)
            >>> import json
            >>> json_data = json.loads(result.text)  # Response is complete

            Multi-turn conversation (provide history):

            >>> history = ChatHistory()
            >>> result1 = chat.complete("First question", history=history)
            >>> result2 = chat.complete("Follow-up question", history=history)

            With progress tracking:

            >>> def on_progress(count, max_count, current, all_results):
            ...     print(f"Continuing generation {count}/{max_count}...")
            >>> result = chat.complete("Write JSON", on_progress=on_progress)
        """
        # NOTE(review): the docstring says both that the caller's history is
        # never modified and that it is "automatically updated"; the actual
        # behavior lives in ConversationContinuer - confirm and reconcile.
        return self._continuer.complete(
            messages=messages,
            history=history,
            max_continues=max_continues,
            ensure_complete=ensure_complete,
            continue_prompt=continue_prompt,
            on_progress=on_progress,
            continue_delay=continue_delay,
            on_error=on_error,
            on_error_callback=on_error_callback,
            **params,
        )

    def complete_stream(
        self,
        messages: MessagesLike,
        *,
        history: ChatHistory | None = None,
        max_continues: int = 5,
        ensure_complete: bool = True,
        continue_prompt: str | Callable = "continue",
        on_progress: Callable | None = None,
        continue_delay: float | tuple[float, float] = 0.0,
        on_error: str = "raise",
        on_error_callback: Callable | None = None,
        **params: Any,
    ) -> StreamingIterator:
        """
        Stream a complete response, automatically handling truncation.

        **Behavior**: Automatically continues streaming if the response is
        truncated, ensuring the final result is complete (or raises an
        exception).

        **History Immutability**: If history is provided, a clone is created and
        used internally. The original history is never modified.

        **History Management**:
            - If `history` is provided, uses it (for multi-turn conversations)
            - If `history` is None, creates a new history internally (for
              single-turn conversations)
            - The history is automatically updated with the prompt and response

        Use this when:
            - You need a complete response with real-time output
            - You cannot accept partial responses
            - You want both streaming and completeness

        For single streaming responses (even if truncated), use `chat.stream()`
        instead.

        Args:
            messages: Input messages.
            history: Optional ChatHistory instance. If None, creates a new one
                internally.
            max_continues: Maximum number of continuation attempts.
            ensure_complete: If True, raises ChatIncompleteResponseError if
                result is still truncated after max_continues. If False, returns
                partial result.
            continue_prompt: User prompt for continuation requests. Can be a
                string or a callable with signature:
                (count: int, max_count: int, current_text: str,
                original_prompt: str) -> str
            on_progress: Optional progress callback function with signature:
                (count: int, max_count: int, current_result: ChatResult,
                all_results: List[ChatResult]) -> None
            continue_delay: Delay between continue requests (seconds). Can be a
                float (fixed delay) or tuple (min, max) for random delay.
                Delay is only applied after the first continue.
            on_error: Error handling strategy: "raise" (default) or
                "return_partial".
            on_error_callback: Optional error callback function with signature:
                (error: Exception, partial_result: ChatResult) -> dict
            params: Additional parameters to pass to chat and continue requests.

        Returns:
            StreamingIterator: Iterator that yields ChatStreamChunk objects from
            initial request and all continue requests. Access accumulated result
            via iterator.result.

        Raises:
            ChatIncompleteResponseError: If ensure_complete=True and result is
                still truncated after max_continues.

        Examples:
            Single-turn conversation (no history needed):

            >>> iterator = chat.complete_stream("Write a long JSON", max_tokens=100)
            >>> for chunk in iterator:
            ...     print(chunk.delta, end="", flush=True)
            >>> result = iterator.result.to_chat_result()
            >>> import json
            >>> json_data = json.loads(result.text)  # Response is complete

            Multi-turn conversation (provide history):

            >>> history = ChatHistory()
            >>> iterator1 = chat.complete_stream("First question", history=history)
            >>> iterator2 = chat.complete_stream("Follow-up", history=history)
        """
        return self._continuer.complete_stream(
            messages=messages,
            history=history,
            max_continues=max_continues,
            ensure_complete=ensure_complete,
            continue_prompt=continue_prompt,
            on_progress=on_progress,
            continue_delay=continue_delay,
            on_error=on_error,
            on_error_callback=on_error_callback,
            **params,
        )

    async def acomplete(
        self,
        messages: MessagesLike,
        *,
        history: ChatHistory | None = None,
        max_continues: int = 5,
        ensure_complete: bool = True,
        continue_prompt: str | Callable = "continue",
        on_progress: Callable | None = None,
        continue_delay: float | tuple[float, float] = 0.0,
        on_error: str = "raise",
        on_error_callback: Callable | None = None,
        **params: Any,
    ) -> ChatResult:
        """
        Async version of complete().

        Ensure a complete response asynchronously, automatically handling
        truncation.

        **Behavior**: Automatically continues generation if the response is
        truncated, ensuring the returned result is complete (or raises an
        exception).

        **History Immutability**: If history is provided, a clone is created and
        used internally. The original history is never modified.

        Args:
            messages: Input messages.
            history: Optional ChatHistory instance.
            max_continues: Maximum number of continuation attempts.
            ensure_complete: If True, raises ChatIncompleteResponseError if
                result is still truncated after max_continues.
            continue_prompt: User prompt for continuation requests.
            on_progress: Optional progress callback function.
            continue_delay: Delay between continue requests (seconds).
            on_error: Error handling strategy: "raise" (default) or
                "return_partial".
            on_error_callback: Optional error callback function.
            params: Additional parameters to pass to chat and continue requests.

        Returns:
            Complete ChatResult (never truncated, unless max_continues exceeded).

        Examples:
            >>> result = await chat.acomplete("Write a long JSON", max_tokens=100)
            >>> import json
            >>> json_data = json.loads(result.text)  # Response is complete
        """
        return await self._continuer.acomplete(
            messages=messages,
            history=history,
            max_continues=max_continues,
            ensure_complete=ensure_complete,
            continue_prompt=continue_prompt,
            on_progress=on_progress,
            continue_delay=continue_delay,
            on_error=on_error,
            on_error_callback=on_error_callback,
            **params,
        )

    async def acomplete_stream(
        self,
        messages: MessagesLike,
        *,
        history: ChatHistory | None = None,
        max_continues: int = 5,
        ensure_complete: bool = True,
        continue_prompt: str | Callable = "continue",
        on_progress: Callable | None = None,
        continue_delay: float | tuple[float, float] = 0.0,
        on_error: str = "raise",
        on_error_callback: Callable | None = None,
        **params: Any,
    ) -> AsyncStreamingIterator:
        """
        Async version of complete_stream().

        Stream a complete response asynchronously, automatically handling
        truncation.

        **Behavior**: Automatically continues streaming if the response is
        truncated, ensuring the final result is complete (or raises an
        exception).

        **History Immutability**: If history is provided, a clone is created and
        used internally. The original history is never modified.

        Args:
            messages: Input messages.
            history: Optional ChatHistory instance.
            max_continues: Maximum number of continuation attempts.
            ensure_complete: If True, raises ChatIncompleteResponseError if
                result is still truncated after max_continues.
            continue_prompt: User prompt for continuation requests.
            on_progress: Optional progress callback function.
            continue_delay: Delay between continue requests (seconds).
            on_error: Error handling strategy: "raise" (default) or
                "return_partial".
            on_error_callback: Optional error callback function.
            params: Additional parameters to pass to chat and continue requests.

        Returns:
            AsyncStreamingIterator: Async iterator that yields ChatStreamChunk
            objects.

        Examples:
            >>> iterator = await chat.acomplete_stream("Write JSON")
            >>> async for chunk in iterator:
            ...     print(chunk.delta, end="", flush=True)
            >>> result = iterator.result.to_chat_result()
        """
        return await self._continuer.acomplete_stream(
            messages=messages,
            history=history,
            max_continues=max_continues,
            ensure_complete=ensure_complete,
            continue_prompt=continue_prompt,
            on_progress=on_progress,
            continue_delay=continue_delay,
            on_error=on_error,
            on_error_callback=on_error_callback,
            **params,
        )

    @staticmethod
    def _message_or_last_user(
        history: ChatHistory,
        message: str | dict | None,
    ) -> Any:
        """
        Pick the message to send for the ``*_with_history`` helpers.

        Returns ``message`` when one was given, otherwise the value of
        ``history.get_last_user_message()``.

        Raises:
            ValueError: If ``message`` is None and the history has no user
                message.
        """
        if message is not None:
            return message

        # Use last user message from history as the message
        last_user = history.get_last_user_message()
        if last_user is None:
            raise ValueError(
                "History has no user messages. Provide a message parameter."
            )
        # NOTE(review): this message is already part of ``history``, which is
        # passed along as well; whether it ends up in the request twice depends
        # on build_api_messages - confirm.
        return last_user

    def chat_with_history(
        self,
        history: ChatHistory,
        message: str | dict | None = None,
        **params: Any,
    ) -> ChatResult:
        r"""
        Make a chat completion request using history.

        This is a convenience method. You can also use:

        >>> chat(message, history=history, \*\*params)

        Args:
            history: ChatHistory instance to use.
            message: Optional new message to add. If None, uses history as-is.
            ``**params``: Additional parameters to pass to __call__.

        Returns:
            ChatResult from the API call.

        Raises:
            ValueError: If ``message`` is None and the history has no user
                message.

        Examples:
            >>> history = ChatHistory.from_messages("Hello")
            >>> result = chat.chat_with_history(history, temperature=0.7)
            >>> # Or with a new message:
            >>> result = chat.chat_with_history(history, "Continue", temperature=0.7)
        """
        outgoing = self._message_or_last_user(history, message)
        return self(outgoing, history=history, **params)

    def stream_with_history(
        self,
        history: ChatHistory,
        message: str | dict | None = None,
        **params: Any,
    ) -> StreamingIterator:
        r"""
        Make a streaming chat completion request using history.

        This is a convenience method. You can also use:

        >>> chat.stream(message, history=history, \*\*params)

        Args:
            history: ChatHistory instance to use.
            message: Optional new message to add. If None, uses history as-is.
            ``**params``: Additional parameters to pass to stream().

        Returns:
            StreamingIterator for the streaming response.

        Raises:
            ValueError: If ``message`` is None and the history has no user
                message.

        Examples:
            >>> history = ChatHistory.from_messages("Hello")
            >>> iterator = chat.stream_with_history(history, temperature=0.7)
            >>> # Or with a new message:
            >>> iterator = chat.stream_with_history(history, "Continue", temperature=0.7)
            >>> for chunk in iterator:
            ...     print(chunk.delta, end="")
        """
        outgoing = self._message_or_last_user(history, message)
        return self.stream(outgoing, history=history, **params)