Source code for lexilux.embed

"""
Embedding API client.

Provides a simple, function-like API for text embeddings with unified usage tracking.
Supports both sync and async operations with connection pooling.
"""

from __future__ import annotations

from collections.abc import Sequence
from typing import TYPE_CHECKING, Literal

import requests

from lexilux._async_client import AsyncClientMixin
from lexilux.embed_params import EmbedParams
from lexilux.usage import Json, ResultBase, Usage
from lexilux.chat.utils import parse_usage

if TYPE_CHECKING:
    pass

# Type alias
Vector = list[float]


class EmbedResult(ResultBase):
    """
    Embedding result.

    The ``vectors`` field contains:

    - a single ``Vector`` (``list[float]``) when the input was a single string
    - a ``list[Vector]`` (``list[list[float]]``) when the input was a sequence
      of strings

    Attributes:
        vectors: Embedding vector(s).
        usage: Usage statistics.
        raw: Raw API response.

    Examples:
        >>> result = embed("Hello")
        >>> vector = result.vectors  # List[float]

        >>> result = embed(["Hello", "World"])
        >>> vectors = result.vectors  # List[List[float]]
    """

    def __init__(
        self,
        *,
        vectors: Vector | list[Vector],
        usage: Usage,
        raw: Json | None = None,
    ):
        """
        Initialize EmbedResult.

        Args:
            vectors: Embedding vector(s).
            usage: Usage statistics (forwarded to ``ResultBase.__init__``).
            raw: Raw API response (forwarded to ``ResultBase.__init__``).
        """
        super().__init__(usage=usage, raw=raw)
        self.vectors = vectors

    def __repr__(self) -> str:
        """
        Return a compact summary instead of dumping every float.

        A list of vectors is reported as ``[N vectors]``; a single vector is
        reported as ``[N dims]``. An empty container is reported as
        ``[0 dims]`` rather than raising.
        """
        vectors = self.vectors
        # Check for emptiness before peeking at the first element; indexing
        # an empty list here would make repr() itself raise IndexError.
        if vectors and isinstance(vectors[0], list):
            summary = f"{len(vectors)} vectors"
        else:
            summary = f"{len(vectors)} dims"
        return f"EmbedResult(vectors=[{summary}], usage={self.usage!r})"
class Embed(AsyncClientMixin):
    """
    Embedding API client with connection pooling.

    Provides a simple, function-like API for text embeddings.
    Uses connection pooling for improved performance in high-throughput
    scenarios.

    Examples:
        >>> embed = Embed(
        ...     base_url="https://api.example.com/v1",
        ...     api_key="key",
        ...     model="text-embedding-ada-002",
        ... )
        >>> result = embed("Hello, world!")
        >>> vector = result.vectors  # List[float]

        >>> result = embed(["text1", "text2"])
        >>> vectors = result.vectors  # List[List[float]]

        >>> # Context manager for proper resource cleanup
        >>> with Embed(base_url="...", api_key="key") as embed:
        ...     result = embed("Hello")
    """

    def __init__(
        self,
        *,
        base_url: str,
        api_key: str | None = None,
        model: str | None = None,
        timeout_s: float = 60.0,
        headers: dict[str, str] | None = None,
        proxies: dict[str, str] | None = None,
        pool_size: int = 10,
    ):
        """
        Initialize Embed client.

        Args:
            base_url: Base URL for the API (e.g., "https://api.openai.com/v1").
                A trailing slash is stripped.
            api_key: API key for authentication (optional if provided in headers).
            model: Default model to use (can be overridden in __call__).
            timeout_s: Request timeout in seconds.
            headers: Additional headers to include in requests. The mapping is
                copied; the caller's dict is never modified.
            proxies: Optional proxy configuration dict
                (e.g., {"http": "http://proxy:port"}).
                If None, uses environment variables (HTTP_PROXY, HTTPS_PROXY).
                To disable proxies, pass {}.
            pool_size: Connection pool size for HTTP adapter
                (default: 10, max: 100).

        Raises:
            ValueError: If pool_size is not in range [1, 100].
        """
        if pool_size < 1:
            raise ValueError(f"pool_size must be at least 1, got {pool_size}")
        if pool_size > 100:
            raise ValueError(f"pool_size must be at most 100, got {pool_size}")

        self.base_url = base_url.rstrip("/")
        self.api_key = api_key
        self.model = model
        self.timeout_s = timeout_s
        # Take a private copy: the setdefault() calls below would otherwise
        # write the Authorization/Content-Type entries into the caller's dict.
        self.headers = dict(headers) if headers else {}
        self.proxies = proxies  # None means use environment variables

        # Set default headers (explicitly supplied values win over defaults)
        if self.api_key:
            self.headers.setdefault("Authorization", f"Bearer {self.api_key}")
        self.headers.setdefault("Content-Type", "application/json")

        # Create Session with connection pooling for sync requests
        self._session = requests.Session()
        adapter = requests.adapters.HTTPAdapter(
            pool_connections=pool_size,
            pool_maxsize=pool_size,
        )
        self._session.mount("http://", adapter)
        self._session.mount("https://", adapter)

        # Initialize async client (lazy) - required by AsyncClientMixin
        self._async_client = None

    def _build_payload(
        self,
        input: str | Sequence[str],
        model: str | None,
        dimensions: int | None,
        encoding_format: Literal["float", "base64"] | None,
        user: str | None,
        params: EmbedParams | None,
        extra: Json | None,
    ) -> tuple[Json, bool]:
        """
        Build the payload for the embeddings request and identify input type.

        Precedence, lowest to highest: values from ``params``, then the
        individual ``dimensions`` / ``encoding_format`` / ``user`` arguments,
        then ``extra`` (which may also replace ``model`` or ``input``).

        Returns:
            ``(payload, is_single)`` where ``is_single`` is True when ``input``
            was a single string. The payload always carries ``input`` as a list.

        Raises:
            ValueError: If the input sequence is empty, or if no model is given
                here and none was set on the client.
        """
        is_single = isinstance(input, str)
        input_list = [input] if is_single else list(input)

        if not input_list:
            raise ValueError("Input cannot be empty")

        final_model = model or self.model
        if not final_model:
            raise ValueError("Model must be specified (either in __init__ or in call)")

        # Start with params object if provided
        param_dict = params.to_dict(exclude_none=True) if params else {}

        # Individual arguments override what's in params
        if dimensions is not None:
            param_dict["dimensions"] = dimensions
        if encoding_format is not None:
            param_dict["encoding_format"] = encoding_format
        if user is not None:
            param_dict["user"] = user

        # Build final payload
        payload: Json = {"model": final_model, "input": input_list, **param_dict}

        # Merge extra parameters, which have the highest precedence
        if extra:
            payload.update(extra)

        return payload, is_single

    def _process_response(
        self, response_data: Json, is_single: bool, return_raw: bool
    ) -> EmbedResult:
        """
        Process the raw API response into an EmbedResult.

        Reads the ``"embedding"`` entry of every item under ``"data"``, in the
        order the response lists them. When ``is_single`` is True only the
        first vector is returned. ``raw`` is the full response when
        ``return_raw`` is True and an empty dict otherwise.

        Raises:
            ValueError: If the response has no (or an empty) ``"data"`` entry.
        """
        data_list = response_data.get("data", [])
        if not data_list:
            raise ValueError("No data in API response")

        vectors: list[Vector] = [item["embedding"] for item in data_list]
        result_vectors: Vector | list[Vector] = vectors[0] if is_single else vectors

        usage = parse_usage(response_data)

        return EmbedResult(
            vectors=result_vectors,
            usage=usage,
            raw=response_data if return_raw else {},
        )

    def __call__(
        self,
        input: str | Sequence[str],
        *,
        model: str | None = None,
        dimensions: int | None = None,
        encoding_format: Literal["float", "base64"] | None = None,
        user: str | None = None,
        params: EmbedParams | None = None,
        extra: Json | None = None,
        return_raw: bool = False,
    ) -> EmbedResult:
        """
        Make an embedding request.

        Supports both direct parameter passing (backward compatible) and
        EmbedParams dataclass for structured configuration.

        Args:
            input: Single text string or sequence of text strings.
            model: Model to use (overrides default).
            dimensions: Number of dimensions for output embeddings.
                Only supported in some models (e.g., ``text-embedding-3-*``).
                Default: None (use model default)
            encoding_format: Format to return embeddings.
                "float" (default) or "base64".
                Some providers may support additional formats.
            user: Unique identifier for end-user (for monitoring/rate limiting).
            params: EmbedParams dataclass instance supplying base values.
                Any of ``dimensions``, ``encoding_format`` or ``user`` passed
                explicitly (not None) takes precedence over the value in
                ``params``.
            extra: Additional custom parameters for non-standard providers.
                Merged last, so its keys override everything else in the
                request payload.
            return_raw: Whether to include full raw response.

        Returns:
            EmbedResult with vectors and usage.

        Raises:
            requests.RequestException: On network or HTTP errors.
            ValueError: On invalid input or response format.

        Examples:
            Basic usage (backward compatible):

            >>> result = embed("Hello", dimensions=512)

            Using EmbedParams:

            >>> from lexilux import EmbedParams
            >>> params = EmbedParams(dimensions=512, encoding_format="float")
            >>> result = embed("Hello", params=params)

            Combining params and extra:

            >>> result = embed("Hello", params=params, extra={"custom": "value"})
        """
        payload, is_single = self._build_payload(
            input, model, dimensions, encoding_format, user, params, extra
        )
        url = f"{self.base_url}/embeddings"

        # Use session with connection pooling
        response = self._session.post(
            url,
            json=payload,
            headers=self.headers,
            timeout=self.timeout_s,
            proxies=self.proxies,
        )
        response.raise_for_status()

        response_data = response.json()
        return self._process_response(response_data, is_single, return_raw)

    # =========================================================================
    # Async Methods
    # =========================================================================

    async def acall(
        self,
        input: str | Sequence[str],
        *,
        model: str | None = None,
        dimensions: int | None = None,
        encoding_format: Literal["float", "base64"] | None = None,
        user: str | None = None,
        params: EmbedParams | None = None,
        extra: Json | None = None,
        return_raw: bool = False,
    ) -> EmbedResult:
        """
        Make an async embedding request.

        This is the async version of ``__call__()``. Payload construction and
        response processing go through the same helpers as the sync version.

        NOTE(review): unlike the sync path, headers, timeout and proxies are
        not passed per request here; they are presumably configured on the
        client returned by ``_get_async_client()`` (provided by
        AsyncClientMixin) - confirm there.

        Args:
            input: Single text string or sequence of text strings.
            model: Model to use (overrides default).
            dimensions: Number of dimensions for output embeddings.
            encoding_format: Format to return embeddings.
            user: Unique identifier for end-user.
            params: EmbedParams dataclass instance.
            extra: Additional custom parameters.
            return_raw: Whether to include full raw response.

        Returns:
            EmbedResult with vectors and usage.

        Examples:
            >>> result = await embed.acall("Hello")
            >>> vector = result.vectors
        """
        payload, is_single = self._build_payload(
            input, model, dimensions, encoding_format, user, params, extra
        )
        url = f"{self.base_url}/embeddings"

        client = self._get_async_client()
        response = await client.post(url, json=payload)
        response.raise_for_status()

        response_data = response.json()
        return self._process_response(response_data, is_single, return_raw)

    def close(self) -> None:
        """
        Close the sync session and release resources.

        Should be called when done with the client, or use context manager.
        Calling it again after the session has been closed is a no-op.
        """
        # hasattr guard: __init__ may have raised before _session was assigned
        if hasattr(self, "_session") and self._session is not None:
            self._session.close()
            self._session = None