# Source code for lexilux.rerank

"""
Rerank API client.

Provides a simple, function-like API for document reranking with unified usage tracking.
Supports multiple provider modes: OpenAI-compatible and DashScope.
Supports both sync and async operations with connection pooling.
"""

from __future__ import annotations

from abc import ABC, abstractmethod
from collections.abc import Sequence
from typing import TYPE_CHECKING
from urllib.parse import urlsplit

import requests

from lexilux._async_client import AsyncClientMixin
from lexilux.chat.utils import parse_usage
from lexilux.usage import Json, ResultBase, Usage

if TYPE_CHECKING:
    import httpx

# Type aliases for the two shapes a rerank result list can take.
# Each entry is a tuple; "index" is the integer reported by the API for the document.
Ranked = list[tuple[int, float]]  # (index, score)
RankedWithDoc = list[tuple[int, float, str]]  # (index, score, doc)


class RerankResult(ResultBase):
    """
    Outcome of a rerank call.

    ``results`` holds one tuple per ranked document:

    - ``Ranked`` (``list[tuple[int, float]]``) when ``include_docs=False``
    - ``RankedWithDoc`` (``list[tuple[int, float, str]]``) when ``include_docs=True``

    Entries are ordered by score, highest first.

    Attributes:
        results: Ranked results (with or without documents).
        usage: Usage statistics (forwarded to ``ResultBase``).
        raw: Raw API response (forwarded to ``ResultBase``).

    Examples:
        >>> result = rerank("python http", ["urllib", "requests", "httpx"])
        >>> ranked = result.results  # List[Tuple[int, float]]
        >>> print(ranked[0])  # (1, 0.95) - (index, score)

        >>> result = rerank("python http", ["urllib", "requests"], include_docs=True)
        >>> ranked = result.results  # List[Tuple[int, float, str]]
        >>> print(ranked[0])  # (1, 0.95, "requests") - (index, score, doc)
    """

    def __init__(
        self,
        *,
        results: Ranked | RankedWithDoc,
        usage: Usage,
        raw: Json | None = None,
    ):
        """
        Build a result object.

        Args:
            results: Ranked results.
            usage: Usage statistics.
            raw: Raw API response.
        """
        # The base class takes care of the usage/raw bookkeeping.
        super().__init__(usage=usage, raw=raw)
        self.results = results

    def __repr__(self) -> str:
        """Return a compact representation showing the item count and usage."""
        item_count = len(self.results)
        return f"RerankResult(results=[{item_count} items], usage={self.usage!r})"
class RerankModeHandler(ABC):
    """
    Abstract base class for rerank mode handlers.

    Each handler implements provider-specific request/response format
    conversion while maintaining a unified interface. When a
    ``requests.Session`` is supplied, synchronous requests go through it so
    that connections can be pooled.
    """

    def __init__(
        self,
        base_url: str,
        api_key: str | None,
        headers: dict[str, str],
        timeout_s: float,
        proxies: dict[str, str] | None = None,
        session: requests.Session | None = None,
    ):
        """
        Initialize mode handler.

        Args:
            base_url: Base URL for the API (a trailing "/" is stripped).
            api_key: API key for authentication.
            headers: HTTP headers sent with every synchronous request.
            timeout_s: Request timeout in seconds.
            proxies: Optional proxy configuration dict.
            session: Optional requests.Session for connection pooling.
        """
        self.base_url = base_url.rstrip("/")
        self.api_key = api_key
        self.headers = headers
        self.timeout_s = timeout_s
        self.proxies = proxies
        self._session = session

    @abstractmethod
    def build_request(
        self,
        query: str,
        docs: Sequence[str],
        model: str,
        top_k: int | None,
        include_docs: bool,
        extra: Json | None,
    ) -> tuple[str, Json]:
        """
        Build HTTP request for this mode.

        Args:
            query: Query string.
            docs: Document strings to rerank.
            model: Model identifier.
            top_k: Number of top results to return.
            include_docs: Whether to include documents in response.
            extra: Additional parameters.

        Returns:
            Tuple of (url, payload).
        """
        pass

    @abstractmethod
    def parse_response(
        self,
        response_data: Json,
        docs: Sequence[str],
        include_docs: bool,
        top_k: int | None,
    ) -> tuple[list[tuple[int, float, str | None]], Usage]:
        """
        Parse API response for this mode.

        Args:
            response_data: Raw API response JSON.
            docs: Original document list (for index mapping).
            include_docs: Whether documents were requested.
            top_k: Requested top_k limit.

        Returns:
            Tuple of (parsed_results, usage), where parsed_results is a list
            of (index, score, optional_doc) tuples.
        """
        pass

    @staticmethod
    def _parse_common_results(
        results_data: list[Json], include_docs: bool
    ) -> list[tuple[int, float, str | None]]:
        """
        Parse the result-item format shared by the supported providers.

        Each item is expected to be a dict with ``"index"``,
        ``"relevance_score"`` and, optionally, ``"document"`` (either a plain
        string or a dict carrying ``"text"`` or ``"content"``).

        Args:
            results_data: List of result items from the API response.
            include_docs: Whether to extract the document text.

        Returns:
            List of (index, score, optional_doc) tuples in input order.
            ``index`` defaults to 0 and ``score`` to 0.0 when missing.

        Raises:
            ValueError: If an item is not a dict.
        """
        parsed_results: list[tuple[int, float, str | None]] = []
        for item in results_data:
            if not isinstance(item, dict):
                raise ValueError(
                    f"Unexpected result format: {item} (type: {type(item)})"
                )

            doc = None
            if include_docs:
                doc_obj = item.get("document")
                if isinstance(doc_obj, dict):
                    # Compare against None rather than truthiness so that an
                    # empty-string document is kept instead of turning into
                    # None (which would later drop the whole entry).
                    text = doc_obj.get("text")
                    doc = text if text is not None else doc_obj.get("content")
                elif isinstance(doc_obj, str):
                    doc = doc_obj

            parsed_results.append(
                (item.get("index", 0), item.get("relevance_score", 0.0), doc)
            )
        return parsed_results

    def make_request(self, url: str, payload: Json) -> Json:
        """
        Make a synchronous HTTP POST and return the decoded JSON body.

        Args:
            url: Request URL.
            payload: Request payload (sent as JSON).

        Returns:
            Response JSON data.

        Raises:
            requests.RequestException: On network or HTTP errors.
        """
        # Use the shared session if available, otherwise a one-off request.
        post = requests.post if self._session is None else self._session.post
        response = post(
            url,
            json=payload,
            headers=self.headers,
            timeout=self.timeout_s,
            proxies=self.proxies,
        )
        response.raise_for_status()
        return response.json()

    async def amake_request(
        self, url: str, payload: Json, async_client: "httpx.AsyncClient"
    ) -> Json:
        """
        Make async HTTP request.

        Args:
            url: Request URL.
            payload: Request payload (sent as JSON).
            async_client: httpx.AsyncClient instance.

        Returns:
            Response JSON data.

        Raises:
            httpx.HTTPError: On network or HTTP errors.
        """
        # NOTE(review): headers/timeout/proxies stored on this handler are not
        # passed here; presumably the async client is already configured with
        # them -- confirm against the code that creates the client.
        response = await async_client.post(url, json=payload)
        response.raise_for_status()
        return response.json()

    @staticmethod
    def _normalize_results(
        parsed_results: list[tuple[int, float, str | None]],
        include_docs: bool,
        top_k: int | None,
    ) -> Ranked | RankedWithDoc:
        """
        Normalize parsed results to the unified output format.

        The input list is left untouched; a sorted copy is produced.

        Args:
            parsed_results: List of (index, score, optional_doc) tuples.
            include_docs: Whether to include documents.
            top_k: Limit results to top_k (applied after sorting).

        Returns:
            ``Ranked`` when ``include_docs`` is false, otherwise
            ``RankedWithDoc``. With ``include_docs`` true, entries whose
            document is ``None`` are omitted.
        """
        # Sort by score (descending - higher is better). sorted() is stable,
        # so ties keep their original relative order.
        ordered = sorted(parsed_results, key=lambda entry: entry[1], reverse=True)

        if top_k is not None:
            ordered = ordered[:top_k]

        if include_docs:
            return [(idx, score, doc) for idx, score, doc in ordered if doc is not None]
        return [(idx, score) for idx, score, _ in ordered]
class OpenAICompatibleHandler(RerankModeHandler):
    """
    Handler for OpenAI-compatible rerank API.

    Standard OpenAI rerank format:
    - Endpoint: POST {base_url}/rerank
    - Request: {"model": "...", "query": "...", "documents": [...],
      "top_n": ..., "return_documents": ...}
    - Response: {"results": [{"index": 0, "relevance_score": 0.95,
      "document": {"text": "..."}}, ...], "usage": {...}}
    """

    def build_request(
        self,
        query: str,
        docs: Sequence[str],
        model: str,
        top_k: int | None,
        include_docs: bool,
        extra: Json | None,
    ) -> tuple[str, Json]:
        """
        Build OpenAI-compatible request.

        Args:
            query: Query string.
            docs: Document strings to rerank.
            model: Model identifier.
            top_k: Sent as ``top_n`` when not None.
            include_docs: Sends ``return_documents: True`` when set.
            extra: Additional payload keys; these override the keys above.

        Returns:
            Tuple of (url, payload). ``/rerank`` is appended to the base URL
            unless the URL path already contains a ``rerank`` segment.
        """
        payload: Json = {
            "model": model,
            "query": query,
            "documents": list(docs),
        }
        if top_k is not None:
            payload["top_n"] = top_k
        if include_docs:
            payload["return_documents"] = True
        if extra:
            payload.update(extra)

        # Look for an exact "rerank" segment in the URL *path* only. A plain
        # substring test on the whole URL also matches host names such as
        # "https://rerank.example.com/v1" (via "//rerank") and paths such as
        # "/reranker", and would then skip appending the endpoint.
        path_segments = urlsplit(self.base_url).path.split("/")
        if "rerank" in path_segments:
            url = self.base_url
        else:
            url = f"{self.base_url}/rerank"

        return url, payload

    def parse_response(
        self,
        response_data: Json,
        docs: Sequence[str],
        include_docs: bool,
        top_k: int | None,
    ) -> tuple[list[tuple[int, float, str | None]], Usage]:
        """
        Parse OpenAI-compatible response.

        Args:
            response_data: Raw API response JSON; results are read from the
                top-level ``"results"`` key.
            docs: Original document list (not used by this handler).
            include_docs: Whether documents were requested.
            top_k: Requested top_k limit (not used by this handler).

        Returns:
            Tuple of (parsed_results, usage).

        Raises:
            ValueError: If the response has no (or empty) ``"results"``.
        """
        results_data = response_data.get("results", [])
        if not results_data:
            raise ValueError("No results in API response")

        parsed_results = self._parse_common_results(results_data, include_docs)
        usage = parse_usage(response_data)
        return parsed_results, usage
class DashScopeHandler(RerankModeHandler):
    """
    Handler for Alibaba Cloud DashScope rerank API.

    DashScope rerank format:
    - Endpoint: POST {base_url}/text-rerank/text-rerank (full path in base_url)
    - Request: {"model": "...", "input": {"query": "...", "documents": [...]},
      "parameters": {...}}
    - Response: {"output": {"results": [...]}, "usage": {...}}
    """

    def build_request(
        self,
        query: str,
        docs: Sequence[str],
        model: str,
        top_k: int | None,
        include_docs: bool,
        extra: Json | None,
    ) -> tuple[str, Json]:
        """
        Build DashScope request.

        Query and documents go under ``"input"``; every option (``top_n``,
        ``return_documents`` and anything in ``extra``) goes under
        ``"parameters"``, which is omitted entirely when there are none.

        Returns:
            Tuple of (url, payload); the URL is the base URL as configured.
        """
        request_body: Json = {
            "model": model,
            "input": {"query": query, "documents": list(docs)},
        }

        # Collect the optional settings first, then attach them only if any
        # were actually provided.
        options: Json = {}
        if top_k is not None:
            options["top_n"] = top_k
        if include_docs:
            options["return_documents"] = True
        if extra:
            options.update(extra)
        if options:
            request_body["parameters"] = options

        # DashScope endpoint is typically the full path, so nothing is appended.
        return self.base_url, request_body

    def parse_response(
        self,
        response_data: Json,
        docs: Sequence[str],
        include_docs: bool,
        top_k: int | None,
    ) -> tuple[list[tuple[int, float, str | None]], Usage]:
        """
        Parse DashScope response.

        Results are read from ``response_data["output"]["results"]``.

        Returns:
            Tuple of (parsed_results, usage).

        Raises:
            ValueError: If the response carries no results.
        """
        items = response_data.get("output", {}).get("results", [])
        if not items:
            raise ValueError("No results in API response")

        return (
            self._parse_common_results(items, include_docs),
            parse_usage(response_data),
        )
class Rerank(AsyncClientMixin):
    """
    Rerank API client with connection pooling.

    Provides a simple, function-like API for document reranking.
    Supports two modes:
    - "openai": OpenAI-compatible standard API (default)
    - "dashscope": Alibaba Cloud DashScope API

    Synchronous requests share one ``requests.Session`` whose HTTP adapter
    pool is sized by ``pool_size``.

    Examples:
        >>> # OpenAI-compatible mode (default)
        >>> rerank = Rerank(
        ...     base_url="https://api.example.com/v1",
        ...     api_key="key",
        ...     model="rerank-model",
        ...     mode="openai"
        ... )
        >>> result = rerank("python http", ["urllib", "requests", "httpx"])
        >>> ranked = result.results  # List[Tuple[int, float]]

        >>> # DashScope mode
        >>> rerank = Rerank(
        ...     base_url="https://dashscope.aliyuncs.com/api/v1/services/rerank/text-rerank/text-rerank",
        ...     api_key="key",
        ...     model="qwen3-rerank",
        ...     mode="dashscope"
        ... )
        >>> result = rerank("python http", ["urllib", "requests", "httpx"])

        >>> # Context manager for proper resource cleanup
        >>> with Rerank(base_url="...", api_key="key") as rerank:
        ...     result = rerank("query", ["doc1", "doc2"])
    """

    # Mode handler registry
    _HANDLERS: dict[str, type[RerankModeHandler]] = {
        "openai": OpenAICompatibleHandler,
        "dashscope": DashScopeHandler,
    }

    def __init__(
        self,
        *,
        base_url: str,
        api_key: str | None = None,
        model: str | None = None,
        mode: str = "openai",
        timeout_s: float = 60.0,
        headers: dict[str, str] | None = None,
        proxies: dict[str, str] | None = None,
        pool_size: int = 10,
    ):
        """
        Initialize Rerank client.

        Args:
            base_url: Base URL for the API (e.g., "https://api.example.com/v1").
            api_key: API key for authentication (optional if provided in headers).
            model: Default model to use (can be overridden in __call__).
            mode: Rerank mode. "openai" for OpenAI-compatible, "dashscope" for
                DashScope. Default is "openai".
            timeout_s: Request timeout in seconds.
            headers: Additional headers to include in requests. The dict is
                copied; the caller's object is never modified.
            proxies: Optional proxy configuration dict
                (e.g., {"http": "http://proxy:port"}). If None, uses
                environment variables (HTTP_PROXY, HTTPS_PROXY). To disable
                proxies, pass {}.
            pool_size: Connection pool size for HTTP adapter
                (default: 10, max: 100).

        Raises:
            ValueError: If mode is not supported or pool_size is out of range.
        """
        if pool_size < 1:
            raise ValueError(f"pool_size must be at least 1, got {pool_size}")
        if pool_size > 100:
            raise ValueError(f"pool_size must be at most 100, got {pool_size}")
        # Validate before any session is created so a bad mode leaks nothing.
        self._ensure_supported_mode(mode)

        self.base_url = base_url.rstrip("/")
        self.api_key = api_key
        self.model = model
        self.mode = mode
        self.timeout_s = timeout_s
        # Copy so the setdefault() calls below do not write the Authorization
        # header into a dict owned by the caller.
        self.headers = dict(headers) if headers else {}
        self.proxies = proxies  # None means use environment variables

        # Set default headers (explicitly supplied values win)
        if self.api_key:
            self.headers.setdefault("Authorization", f"Bearer {self.api_key}")
        self.headers.setdefault("Content-Type", "application/json")

        # Create Session with connection pooling for sync requests
        self._session = requests.Session()
        adapter = requests.adapters.HTTPAdapter(
            pool_connections=pool_size,
            pool_maxsize=pool_size,
        )
        self._session.mount("http://", adapter)
        self._session.mount("https://", adapter)

        # Initialize mode handler with shared session
        self._handler = self._build_handler(mode)

        # Initialize async client (lazy) - required by AsyncClientMixin
        self._async_client = None

    @classmethod
    def _ensure_supported_mode(cls, mode: str) -> None:
        """Raise ValueError unless ``mode`` is a key of the handler registry."""
        if mode not in cls._HANDLERS:
            available = ", ".join(f'"{m}"' for m in cls._HANDLERS)
            raise ValueError(f'Mode must be one of {available}, got "{mode}"')

    def _build_handler(self, mode: str) -> RerankModeHandler:
        """Instantiate the handler for ``mode`` with this client's settings and session."""
        handler_class = self._HANDLERS[mode]
        return handler_class(
            base_url=self.base_url,
            api_key=self.api_key,
            headers=self.headers,
            timeout_s=self.timeout_s,
            proxies=self.proxies,
            session=self._session,
        )

    def _get_handler_for_call(self, mode: str | None) -> RerankModeHandler:
        """
        Get or create handler for a specific call.

        Args:
            mode: Per-call mode override; None means the client's own mode.

        Returns:
            The client's handler, or a temporary handler (sharing the same
            session) when a different mode is requested.

        Raises:
            ValueError: If the requested mode is not supported.
        """
        use_mode = mode or self.mode
        self._ensure_supported_mode(use_mode)
        if use_mode == self.mode:
            return self._handler
        return self._build_handler(use_mode)

    def _prepare_rerank(
        self,
        query: str,
        docs: Sequence[str],
        model: str | None,
        top_k: int | None,
        include_docs: bool,
        extra: Json | None,
        mode: str | None,
    ) -> tuple[RerankModeHandler, str, Json]:
        """
        Prepare for a rerank request, returning handler, URL, and payload.

        Raises:
            ValueError: If ``docs`` is empty, no model is configured, or the
                mode is not supported.
        """
        if not docs:
            raise ValueError("Docs cannot be empty")

        final_model = model or self.model
        if not final_model:
            raise ValueError("Model must be specified (either in __init__ or in call)")

        handler = self._get_handler_for_call(mode)
        url, payload = handler.build_request(
            query=query,
            docs=docs,
            model=final_model,
            top_k=top_k,
            include_docs=include_docs,
            extra=extra,
        )
        return handler, url, payload

    def _process_response(
        self,
        handler: RerankModeHandler,
        response_data: Json,
        docs: Sequence[str],
        include_docs: bool,
        top_k: int | None,
        return_raw: bool,
    ) -> RerankResult:
        """
        Process the raw response data into a RerankResult.

        When documents were requested but the response did not echo one for
        an entry, the text is taken from ``docs`` by the entry's index so the
        entry is kept rather than silently dropped during normalization.
        """
        parsed_results, usage = handler.parse_response(
            response_data=response_data,
            docs=docs,
            include_docs=include_docs,
            top_k=top_k,
        )

        if include_docs:
            doc_count = len(docs)
            parsed_results = [
                (
                    idx,
                    score,
                    # Backfill only when the index is usable; otherwise leave
                    # the entry as parsed.
                    docs[idx]
                    if doc is None and isinstance(idx, int) and 0 <= idx < doc_count
                    else doc,
                )
                for idx, score, doc in parsed_results
            ]

        results = handler._normalize_results(parsed_results, include_docs, top_k)
        return RerankResult(
            results=results,
            usage=usage,
            raw=response_data if return_raw else {},
        )

    def __call__(
        self,
        query: str,
        docs: Sequence[str],
        *,
        model: str | None = None,
        top_k: int | None = None,
        include_docs: bool = False,
        extra: Json | None = None,
        return_raw: bool = False,
        mode: str | None = None,
    ) -> RerankResult:
        """
        Make a rerank request.

        Args:
            query: Query string.
            docs: Sequence of document strings to rerank.
            model: Model to use (overrides default).
            top_k: Number of top results to return (optional).
            include_docs: Whether to include documents in results.
            extra: Additional parameters to include in the request.
            return_raw: Whether to include full raw response.
            mode: Override mode for this call ("openai" or "dashscope").

        Returns:
            RerankResult with ranked results and usage.

        Raises:
            requests.RequestException: On network or HTTP errors.
            ValueError: On invalid input or response format.
        """
        handler, url, payload = self._prepare_rerank(
            query, docs, model, top_k, include_docs, extra, mode
        )
        response_data = handler.make_request(url, payload)
        return self._process_response(
            handler, response_data, docs, include_docs, top_k, return_raw
        )

    # =========================================================================
    # Async Methods
    # =========================================================================

    async def acall(
        self,
        query: str,
        docs: Sequence[str],
        *,
        model: str | None = None,
        top_k: int | None = None,
        include_docs: bool = False,
        extra: Json | None = None,
        return_raw: bool = False,
        mode: str | None = None,
    ) -> RerankResult:
        """
        Make an async rerank request.

        This is the async version of ``__call__()``: request preparation and
        response processing are shared with the sync path; only the transport
        differs (the async client obtained from ``_get_async_client()``).

        Args:
            query: Query string.
            docs: Sequence of document strings to rerank.
            model: Model to use (overrides default).
            top_k: Number of top results to return (optional).
            include_docs: Whether to include documents in results.
            extra: Additional parameters to include in the request.
            return_raw: Whether to include full raw response.
            mode: Override mode for this call.

        Returns:
            RerankResult with ranked results and usage.

        Examples:
            >>> result = await rerank.acall("python http", ["urllib", "requests"])
            >>> ranked = result.results
        """
        handler, url, payload = self._prepare_rerank(
            query, docs, model, top_k, include_docs, extra, mode
        )
        client = self._get_async_client()
        response_data = await handler.amake_request(url, payload, client)
        return self._process_response(
            handler, response_data, docs, include_docs, top_k, return_raw
        )

    def close(self) -> None:
        """
        Close the sync session and release resources.

        Safe to call more than once, and on a partially initialized instance.
        Should be called when done with the client, or use context manager.
        """
        # hasattr guard: __init__ may have raised before the session existed.
        if hasattr(self, "_session") and self._session is not None:
            self._session.close()
            self._session = None