"""
Rerank API client.
Provides a simple, function-like API for document reranking with unified usage tracking.
Supports multiple provider modes: OpenAI-compatible and DashScope.
Supports both sync and async operations with connection pooling.
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from collections.abc import Sequence
from typing import TYPE_CHECKING
import requests
from lexilux._async_client import AsyncClientMixin
from lexilux.usage import Json, ResultBase, Usage
from lexilux.chat.utils import parse_usage
if TYPE_CHECKING:
import httpx
# Type aliases for the two shapes that RerankResult.results can take.
# Ranked is used when documents are not requested, RankedWithDoc when they are.
Ranked = list[tuple[int, float]]  # (index, score)
RankedWithDoc = list[tuple[int, float, str]]  # (index, score, doc)
[docs]
class RerankResult(ResultBase):
    """
    Outcome of a single rerank call.

    ``results`` takes one of two shapes, depending on how the call was made:

    - ``Ranked`` (``list[tuple[int, float]]``) when ``include_docs=False``
    - ``RankedWithDoc`` (``list[tuple[int, float, str]]``) when
      ``include_docs=True``

    Entries are ordered by score, highest first.

    Attributes:
        results: Ranked results (with or without documents).
        usage: Usage statistics.
        raw: Raw API response.

    Examples:
        >>> result = rerank("python http", ["urllib", "requests", "httpx"])
        >>> ranked = result.results  # List[Tuple[int, float]]
        >>> print(ranked[0])  # (1, 0.95) - (index, score)

        >>> result = rerank("python http", ["urllib", "requests"], include_docs=True)
        >>> ranked = result.results  # List[Tuple[int, float, str]]
        >>> print(ranked[0])  # (1, 0.95, "requests") - (index, score, doc)
    """

    def __init__(
        self,
        *,
        results: Ranked | RankedWithDoc,
        usage: Usage,
        raw: Json | None = None,
    ):
        """
        Store the ranked entries alongside the base-class usage/raw data.

        Args:
            results: Ranked results.
            usage: Usage statistics (forwarded to the base class).
            raw: Raw API response (forwarded to the base class).
        """
        super().__init__(usage=usage, raw=raw)
        self.results = results

    def __repr__(self) -> str:
        """Return a short summary: the number of entries plus the usage repr."""
        item_count = len(self.results)
        return f"RerankResult(results=[{item_count} items], usage={self.usage!r})"
[docs]
class RerankModeHandler(ABC):
    """
    Abstract base class for rerank mode handlers.

    Each handler implements provider-specific request/response format
    conversion (``build_request`` / ``parse_response``) while the HTTP
    transport and the result normalization are shared here. When a
    ``requests.Session`` is supplied, sync requests go through it so its
    connection pool is reused.
    """

    def __init__(
        self,
        base_url: str,
        api_key: str | None,
        headers: dict[str, str],
        timeout_s: float,
        proxies: dict[str, str] | None = None,
        session: requests.Session | None = None,
    ):
        """
        Initialize mode handler.

        Args:
            base_url: Base URL for the API; trailing slashes are stripped.
            api_key: API key for authentication.
            headers: HTTP headers sent with every sync request.
            timeout_s: Request timeout in seconds.
            proxies: Optional proxy configuration dict.
            session: Optional requests.Session for connection pooling.
        """
        self.base_url = base_url.rstrip("/")
        self.api_key = api_key
        self.headers = headers
        self.timeout_s = timeout_s
        self.proxies = proxies
        self._session = session

    @abstractmethod
    def build_request(
        self,
        query: str,
        docs: Sequence[str],
        model: str,
        top_k: int | None,
        include_docs: bool,
        extra: Json | None,
    ) -> tuple[str, Json]:
        """
        Build HTTP request for this mode.

        Args:
            query: Query string.
            docs: Document strings to rerank.
            model: Model identifier.
            top_k: Number of top results to return.
            include_docs: Whether to include documents in response.
            extra: Additional parameters.

        Returns:
            Tuple of (url, payload).
        """

    @abstractmethod
    def parse_response(
        self,
        response_data: Json,
        docs: Sequence[str],
        include_docs: bool,
        top_k: int | None,
    ) -> tuple[list[tuple[int, float, str | None]], Usage]:
        """
        Parse API response for this mode.

        Args:
            response_data: Raw API response JSON.
            docs: Original document list (for index mapping).
            include_docs: Whether documents were requested.
            top_k: Requested top_k limit.

        Returns:
            Tuple of (parsed_results, usage), where parsed_results is a list
            of (index, score, optional_doc) tuples.
        """

    @staticmethod
    def _parse_common_results(
        results_data: list[Json], include_docs: bool
    ) -> list[tuple[int, float, str | None]]:
        """
        Parse the result-item format shared by the supported providers.

        Each item is expected to be a dict carrying ``index``,
        ``relevance_score`` and, optionally, ``document`` (either a plain
        string or a dict with a ``text`` or ``content`` key).

        Args:
            results_data: Result items taken from the API response.
            include_docs: Whether to extract the document text.

        Returns:
            List of (index, score, doc) tuples in response order; ``doc`` is
            None when documents were not requested or not present.

        Raises:
            ValueError: If an item is not a dict.
        """
        parsed_results: list[tuple[int, float, str | None]] = []
        for item in results_data:
            if not isinstance(item, dict):
                raise ValueError(
                    f"Unexpected result format: {item} (type: {type(item)})"
                )
            # Missing fields fall back to index 0 / score 0.0.
            index = item.get("index", 0)
            score = item.get("relevance_score", 0.0)
            doc = None
            if include_docs:
                doc_obj = item.get("document")
                if isinstance(doc_obj, dict):
                    doc = doc_obj.get("text") or doc_obj.get("content")
                elif isinstance(doc_obj, str):
                    doc = doc_obj
            parsed_results.append((index, score, doc))
        return parsed_results

    def make_request(self, url: str, payload: Json) -> Json:
        """
        Make a sync HTTP POST and return the decoded JSON body.

        Args:
            url: Request URL.
            payload: Request payload (sent as JSON).

        Returns:
            Response JSON data.

        Raises:
            requests.RequestException: On network or HTTP errors.
        """
        # Go through the shared session when one was supplied; otherwise fall
        # back to a one-off module-level request.
        post = self._session.post if self._session is not None else requests.post
        response = post(
            url,
            json=payload,
            headers=self.headers,
            timeout=self.timeout_s,
            proxies=self.proxies,
        )
        response.raise_for_status()
        return response.json()

    async def amake_request(
        self, url: str, payload: Json, async_client: "httpx.AsyncClient"
    ) -> Json:
        """
        Make an async HTTP POST and return the decoded JSON body.

        Args:
            url: Request URL.
            payload: Request payload (sent as JSON).
            async_client: httpx.AsyncClient instance.

        Returns:
            Response JSON data.

        Raises:
            httpx.HTTPError: On network or HTTP errors.
        """
        # NOTE(review): unlike make_request, no headers/timeout are passed
        # here; presumably they are configured on async_client by whoever
        # creates it -- confirm.
        response = await async_client.post(url, json=payload)
        response.raise_for_status()
        return response.json()

    @staticmethod
    def _normalize_results(
        parsed_results: list[tuple[int, float, str | None]],
        include_docs: bool,
        top_k: int | None,
    ) -> Ranked | RankedWithDoc:
        """
        Normalize parsed results to the unified output format.

        The input list is not modified.

        Args:
            parsed_results: List of (index, score, optional_doc) tuples.
            include_docs: Whether to include documents.
            top_k: Limit results to top_k (applied after sorting).

        Returns:
            ``(index, score)`` tuples, or ``(index, score, doc)`` tuples when
            ``include_docs`` is true, sorted by score in descending order.
            With ``include_docs``, entries whose document is None are dropped
            after the top_k cut.
        """
        # sorted() returns a new list, so the caller's list keeps its order.
        ordered = sorted(parsed_results, key=lambda entry: entry[1], reverse=True)
        if top_k is not None:
            ordered = ordered[:top_k]
        if include_docs:
            return [(idx, score, doc) for idx, score, doc in ordered if doc is not None]
        return [(idx, score) for idx, score, _ in ordered]
[docs]
class OpenAICompatibleHandler(RerankModeHandler):
    """
    Handler for OpenAI-compatible rerank API.

    Standard OpenAI rerank format:
    - Endpoint: POST {base_url}/rerank
    - Request: {"model": "...", "query": "...", "documents": [...], "top_n": ..., "return_documents": ...}
    - Response: {"results": [{"index": 0, "relevance_score": 0.95, "document": {"text": "..."}}, ...], "usage": {...}}
    """

    def build_request(
        self,
        query: str,
        docs: Sequence[str],
        model: str,
        top_k: int | None,
        include_docs: bool,
        extra: Json | None,
    ) -> tuple[str, Json]:
        """
        Build OpenAI-compatible request.

        Args:
            query: Query string.
            docs: Document strings to rerank.
            model: Model identifier.
            top_k: Sent as ``top_n`` when not None.
            include_docs: Sends ``return_documents: true`` when set.
            extra: Additional payload entries; merged last, so they override
                the keys built here.

        Returns:
            Tuple of (url, payload).
        """
        # Local import keeps this block self-contained.
        from urllib.parse import urlsplit

        payload: Json = {
            "model": model,
            "query": query,
            "documents": list(docs),
        }
        if top_k is not None:
            payload["top_n"] = top_k
        if include_docs:
            payload["return_documents"] = True
        if extra:
            payload.update(extra)

        # Append "/rerank" unless the URL *path* already names the endpoint.
        # Only the path is inspected: testing the whole URL would also match
        # a host such as "rerank.example.com" and skip the suffix by mistake.
        if "/rerank" in urlsplit(self.base_url).path:
            url = self.base_url
        else:
            url = f"{self.base_url}/rerank"
        return url, payload

    def parse_response(
        self,
        response_data: Json,
        docs: Sequence[str],
        include_docs: bool,
        top_k: int | None,
    ) -> tuple[list[tuple[int, float, str | None]], Usage]:
        """
        Parse OpenAI-compatible response.

        Reads the top-level ``results`` list; ``docs`` and ``top_k`` are
        accepted for interface compatibility but not used here.

        Returns:
            Tuple of (parsed_results, usage).

        Raises:
            ValueError: If the response has no (or empty) ``results``.
        """
        results_data = response_data.get("results", [])
        if not results_data:
            raise ValueError("No results in API response")
        parsed_results = self._parse_common_results(results_data, include_docs)
        usage = parse_usage(response_data)
        return parsed_results, usage
[docs]
class DashScopeHandler(RerankModeHandler):
    """
    Handler for Alibaba Cloud DashScope rerank API.

    DashScope rerank format:
    - Endpoint: POST {base_url}/text-rerank/text-rerank (full path in base_url)
    - Request: {"model": "...", "input": {"query": "...", "documents": [...]}, "parameters": {...}}
    - Response: {"output": {"results": [...]}, "usage": {...}}
    """

    def build_request(
        self,
        query: str,
        docs: Sequence[str],
        model: str,
        top_k: int | None,
        include_docs: bool,
        extra: Json | None,
    ) -> tuple[str, Json]:
        """
        Build DashScope request.

        Args:
            query: Query string.
            docs: Document strings to rerank.
            model: Model identifier.
            top_k: Sent as ``parameters.top_n`` when not None.
            include_docs: Sends ``parameters.return_documents: true`` when set.
            extra: Additional entries merged into ``parameters`` last.

        Returns:
            Tuple of (url, payload); the URL is ``base_url`` unchanged.
        """
        payload: Json = {
            "model": model,
            "input": {
                "query": query,
                "documents": list(docs),
            },
        }
        # DashScope carries the optional settings under "parameters"; the key
        # is omitted entirely when there is nothing to send.
        parameters: Json = {}
        if top_k is not None:
            parameters["top_n"] = top_k
        if include_docs:
            parameters["return_documents"] = True
        if extra:
            parameters.update(extra)
        if parameters:
            payload["parameters"] = parameters
        # The configured base_url is used as the full endpoint path.
        return self.base_url, payload

    def parse_response(
        self,
        response_data: Json,
        docs: Sequence[str],
        include_docs: bool,
        top_k: int | None,
    ) -> tuple[list[tuple[int, float, str | None]], Usage]:
        """
        Parse DashScope response.

        Reads ``output.results``; ``docs`` and ``top_k`` are accepted for
        interface compatibility but not used here.

        Returns:
            Tuple of (parsed_results, usage).

        Raises:
            ValueError: If ``output`` or ``output.results`` is missing, null
                or empty.
        """
        # "or {}" / "or []" also cover an explicit null, which would otherwise
        # surface as AttributeError instead of the ValueError below.
        output = response_data.get("output") or {}
        results_data = output.get("results") or []
        if not results_data:
            raise ValueError("No results in API response")
        parsed_results = self._parse_common_results(results_data, include_docs)
        usage = parse_usage(response_data)
        return parsed_results, usage
[docs]
class Rerank(AsyncClientMixin):
"""
Rerank API client with connection pooling.
Provides a simple, function-like API for document reranking.
Supports two modes:
- "openai": OpenAI-compatible standard API (default)
- "dashscope": Alibaba Cloud DashScope API
Uses connection pooling for improved performance in high-throughput scenarios.
Examples:
>>> # OpenAI-compatible mode (default)
>>> rerank = Rerank(
... base_url="https://api.example.com/v1",
... api_key="key",
... model="rerank-model",
... mode="openai"
... )
>>> result = rerank("python http", ["urllib", "requests", "httpx"])
>>> ranked = result.results # List[Tuple[int, float]]
>>> # DashScope mode
>>> rerank = Rerank(
... base_url="https://dashscope.aliyuncs.com/api/v1/services/rerank/text-rerank/text-rerank",
... api_key="key",
... model="qwen3-rerank",
... mode="dashscope"
... )
>>> result = rerank("python http", ["urllib", "requests", "httpx"])
>>> # Context manager for proper resource cleanup
>>> with Rerank(base_url="...", api_key="key") as rerank:
... result = rerank("query", ["doc1", "doc2"])
"""
# Mode handler registry: maps each supported mode string to the handler class
# instantiated for it. The keys are what a requested mode is validated against.
_HANDLERS: dict[str, type[RerankModeHandler]] = {
"openai": OpenAICompatibleHandler,
"dashscope": DashScopeHandler,
}
[docs]
def __init__(
    self,
    *,
    base_url: str,
    api_key: str | None = None,
    model: str | None = None,
    mode: str = "openai",
    timeout_s: float = 60.0,
    headers: dict[str, str] | None = None,
    proxies: dict[str, str] | None = None,
    pool_size: int = 10,
):
    """
    Initialize Rerank client.

    Args:
        base_url: Base URL for the API (e.g., "https://api.example.com/v1").
        api_key: API key for authentication (optional if provided in headers).
        model: Default model to use (can be overridden in __call__).
        mode: Rerank mode. "openai" for OpenAI-compatible, "dashscope" for DashScope.
            Default is "openai".
        timeout_s: Request timeout in seconds.
        headers: Additional headers to include in requests. The dict is
            copied; the caller's object is never modified.
        proxies: Optional proxy configuration dict (e.g., {"http": "http://proxy:port"}).
            If None, uses environment variables (HTTP_PROXY, HTTPS_PROXY).
            To disable proxies, pass {}.
        pool_size: Connection pool size for HTTP adapter (default: 10, max: 100).

    Raises:
        ValueError: If mode is not supported or pool_size is out of range.
    """
    if pool_size < 1:
        raise ValueError(f"pool_size must be at least 1, got {pool_size}")
    if pool_size > 100:
        raise ValueError(f"pool_size must be at most 100, got {pool_size}")
    if mode not in self._HANDLERS:
        available = ", ".join(f'"{m}"' for m in self._HANDLERS.keys())
        raise ValueError(f'Mode must be one of {available}, got "{mode}"')

    self.base_url = base_url.rstrip("/")
    self.api_key = api_key
    self.model = model
    self.mode = mode
    self.timeout_s = timeout_s
    # Copy the headers: the defaults added below must not leak into the
    # dict the caller passed in (which may be shared with other clients).
    self.headers = dict(headers) if headers else {}
    self.proxies = proxies  # None means use environment variables

    # Set default headers without overriding anything the caller supplied.
    if self.api_key:
        self.headers.setdefault("Authorization", f"Bearer {self.api_key}")
    self.headers.setdefault("Content-Type", "application/json")

    # Create Session with connection pooling for sync requests; the same
    # adapter (and therefore pool settings) serves both URL schemes.
    self._session = requests.Session()
    adapter = requests.adapters.HTTPAdapter(
        pool_connections=pool_size,
        pool_maxsize=pool_size,
    )
    self._session.mount("http://", adapter)
    self._session.mount("https://", adapter)

    # Initialize mode handler with shared session
    handler_class = self._HANDLERS[mode]
    self._handler = handler_class(
        base_url=self.base_url,
        api_key=self.api_key,
        headers=self.headers,
        timeout_s=self.timeout_s,
        proxies=self.proxies,
        session=self._session,
    )

    # Initialize async client (lazy) - required by AsyncClientMixin
    self._async_client = None
def _get_handler_for_call(self, mode: str | None) -> RerankModeHandler:
    """
    Resolve the handler to use for one call.

    Args:
        mode: Per-call mode override; a falsy value selects the client's
            configured mode.

    Returns:
        The client's own handler when the effective mode matches the
        configured one, otherwise a freshly built handler for the requested
        mode that reuses the client's settings and session.

    Raises:
        ValueError: If the effective mode is not registered.
    """
    selected = mode or self.mode
    registry = self._HANDLERS
    if selected not in registry:
        available = ", ".join(f'"{m}"' for m in registry.keys())
        raise ValueError(f'Mode must be one of {available}, got "{selected}"')

    if selected != self.mode:
        # Mode override: build a temporary handler on the shared session.
        return registry[selected](
            base_url=self.base_url,
            api_key=self.api_key,
            headers=self.headers,
            timeout_s=self.timeout_s,
            proxies=self.proxies,
            session=self._session,
        )
    return self._handler
def _prepare_rerank(
    self,
    query: str,
    docs: Sequence[str],
    model: str | None,
    top_k: int | None,
    include_docs: bool,
    extra: Json | None,
    mode: str | None,
) -> tuple[RerankModeHandler, str, Json]:
    """
    Validate the call arguments and build the provider request.

    Returns:
        Tuple of (handler, url, payload) for the effective mode.

    Raises:
        ValueError: If ``docs`` is empty, or if no model is given either per
            call or on the client.
    """
    # Validation order matters for which error is reported first:
    # documents, then model, then (inside handler lookup) the mode.
    if not docs:
        raise ValueError("Docs cannot be empty")

    chosen_model = model or self.model
    if not chosen_model:
        raise ValueError("Model must be specified (either in __init__ or in call)")

    handler = self._get_handler_for_call(mode)
    request_url, request_body = handler.build_request(
        query=query,
        docs=docs,
        model=chosen_model,
        top_k=top_k,
        include_docs=include_docs,
        extra=extra,
    )
    return handler, request_url, request_body
def _process_response(
    self,
    handler: RerankModeHandler,
    response_data: Json,
    docs: Sequence[str],
    include_docs: bool,
    top_k: int | None,
    return_raw: bool,
) -> RerankResult:
    """
    Turn a decoded API response into a RerankResult.

    The handler parses the provider-specific payload and then normalizes the
    parsed entries. The raw response is attached only when ``return_raw`` is
    set; otherwise an empty dict is stored.
    """
    parsed, usage = handler.parse_response(
        response_data=response_data,
        docs=docs,
        include_docs=include_docs,
        top_k=top_k,
    )
    ranked = handler._normalize_results(parsed, include_docs, top_k)
    raw_payload = response_data if return_raw else {}
    return RerankResult(results=ranked, usage=usage, raw=raw_payload)
[docs]
def __call__(
    self,
    query: str,
    docs: Sequence[str],
    *,
    model: str | None = None,
    top_k: int | None = None,
    include_docs: bool = False,
    extra: Json | None = None,
    return_raw: bool = False,
    mode: str | None = None,
) -> RerankResult:
    """
    Make a rerank request.

    Args:
        query: Query string.
        docs: Sequence of document strings to rerank.
        model: Model to use (overrides default).
        top_k: Number of top results to return (optional).
        include_docs: Whether to include documents in results.
        extra: Additional parameters to include in the request.
        return_raw: Whether to include full raw response.
        mode: Override mode for this call ("openai" or "dashscope").

    Returns:
        RerankResult with ranked results and usage.

    Raises:
        requests.RequestException: On network or HTTP errors.
        ValueError: On invalid input or response format.
    """
    # Three steps: build the request, send it, convert the response.
    prepared = self._prepare_rerank(
        query, docs, model, top_k, include_docs, extra, mode
    )
    handler, url, payload = prepared
    raw_json = handler.make_request(url, payload)
    return self._process_response(
        handler, raw_json, docs, include_docs, top_k, return_raw
    )
# =========================================================================
# Async Methods
# =========================================================================
[docs]
async def acall(
    self,
    query: str,
    docs: Sequence[str],
    *,
    model: str | None = None,
    top_k: int | None = None,
    include_docs: bool = False,
    extra: Json | None = None,
    return_raw: bool = False,
    mode: str | None = None,
) -> RerankResult:
    """
    Make an async rerank request.

    Async counterpart of ``__call__()``: it takes the same parameters and
    runs the same prepare / send / process steps, sending the request through
    the client returned by ``_get_async_client()``.

    Args:
        query: Query string.
        docs: Sequence of document strings to rerank.
        model: Model to use (overrides default).
        top_k: Number of top results to return (optional).
        include_docs: Whether to include documents in results.
        extra: Additional parameters to include in the request.
        return_raw: Whether to include full raw response.
        mode: Override mode for this call.

    Returns:
        RerankResult with ranked results and usage.

    Examples:
        >>> result = await rerank.acall("python http", ["urllib", "requests"])
        >>> ranked = result.results
    """
    prepared = self._prepare_rerank(
        query, docs, model, top_k, include_docs, extra, mode
    )
    handler, url, payload = prepared
    # The async client is fetched only after the arguments were validated.
    async_client = self._get_async_client()
    raw_json = await handler.amake_request(url, payload, async_client)
    return self._process_response(
        handler, raw_json, docs, include_docs, top_k, return_raw
    )
[docs]
def close(self) -> None:
    """
    Close the sync session and release its resources.

    Safe to call more than once, and on an instance that never got a
    session: in both cases it does nothing. After a successful close the
    ``_session`` attribute is reset to None.
    """
    session = getattr(self, "_session", None)
    if session is None:
        return
    session.close()
    self._session = None