| """ |
| embeddings.py — Embedding interface backed by BAAI/bge-m3 (local or API). |
| |
| Design: |
| - EmbeddingModel (local) and APIEmbeddingModel (API) share a duck-typed interface. |
| - Local: runs fully offline via FlagEmbedding — no API calls. |
| - API: OpenAI-compatible endpoint (EPFL RCP /v1/embeddings). |
| - Query vs. document encoding are separate methods. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import logging |
| from typing import Sequence |
|
|
| import httpx |
| import numpy as np |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
class EmbeddingModel:
    """
    Local embedding model using BAAI/bge-m3 via FlagEmbedding.

    Parameters
    ----------
    model_name : str
        HuggingFace model name or local path. Default: "BAAI/bge-m3".
    batch_size : int
        Batch size for encoding.
    dimension : int
        Dense embedding dimension (BGE-M3 = 1024). Stored as ``.dimension``
        and used as the column count of the empty-input result; it is not
        checked against the vectors the model actually produces.
    use_fp16 : bool
        Use fp16 precision for faster inference on GPU/CPU.
    """

    def __init__(
        self,
        model_name: str = "BAAI/bge-m3",
        batch_size: int = 32,
        dimension: int = 1024,
        use_fp16: bool = True,
    ) -> None:
        self.model_name = model_name
        self.batch_size = batch_size
        self.dimension = dimension

        logger.info("Loading BGE-M3 model: %s (dimension=%d)", model_name, dimension)
        # Imported here rather than at module level -- presumably so the module
        # can be imported (e.g. for the API backend) without FlagEmbedding.
        from FlagEmbedding import BGEM3FlagModel

        self._model = BGEM3FlagModel(model_name, use_fp16=use_fp16)
        logger.info("BGE-M3 model loaded successfully.")

    def encode_texts(self, texts: Sequence[str]) -> np.ndarray:
        """
        Encode a batch of texts into a (N, D) float32 numpy array using BGE-M3.

        Parameters
        ----------
        texts : Sequence[str]
            Texts to embed. Any iterable of strings is accepted; it is
            materialised into a list before encoding.

        Returns
        -------
        np.ndarray
            float32 array built from the model's "dense_vecs" output. For
            empty input an array of shape (0, dimension) is returned without
            calling the model, matching APIEmbeddingModel.encode_texts.
        """
        texts = list(texts)
        if not texts:
            # Nothing to encode: skip the model call and return a well-shaped
            # empty result so callers can stack/concatenate unconditionally.
            return np.empty((0, self.dimension), dtype=np.float32)

        output = self._model.encode(
            texts,
            batch_size=self.batch_size,
            max_length=8192,
            # Only the dense vectors are consumed below.
            return_dense=True,
            return_sparse=False,
            return_colbert_vecs=False,
        )
        return np.asarray(output["dense_vecs"], dtype=np.float32)

    def encode_query(self, query: str) -> np.ndarray:
        """Encode a single query string → (D,) vector."""
        return self.encode_texts([query])[0]
|
|
|
|
class APIEmbeddingModel:
    """
    OpenAI-compatible embedding via HTTP API (e.g. EPFL RCP /v1/embeddings).

    Instances own an HTTP client; call ``close()`` when done, or use the
    instance as a context manager (``with APIEmbeddingModel(...) as emb:``).

    Parameters
    ----------
    base_url : str
        API base URL, e.g. "https://inference.rcp.epfl.ch/v1". A trailing
        slash is tolerated.
    api_key : str
        Bearer token for Authorization header.
    model : str
        Model name sent in the request body, e.g. "BAAI/bge-m3".
    dimension : int
        Expected embedding dimension; stored as .dimension property. It is
        used for the shape of the empty-input result and is not checked
        against the vectors returned by the API.
    batch_size : int
        Max texts per API call.
    timeout : int
        HTTP request timeout in seconds.
    """

    def __init__(
        self,
        base_url: str,
        api_key: str,
        model: str = "BAAI/bge-m3",
        dimension: int = 1024,
        batch_size: int = 32,
        timeout: int = 120,
    ) -> None:
        self.model = model
        self.batch_size = batch_size
        self._dimension = dimension
        self._endpoint = self._build_endpoint(base_url)
        self._client = httpx.Client(
            headers={
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json",
            },
            timeout=timeout,
        )
        logger.info(
            "APIEmbeddingModel initialized: endpoint=%s, model=%s, dimension=%d",
            self._endpoint,
            model,
            dimension,
        )

    @staticmethod
    def _build_endpoint(base_url: str) -> str:
        """Return the embeddings URL for *base_url*, ignoring trailing slashes."""
        # Without the rstrip a base URL such as ".../v1/" yields ".../v1//embeddings".
        return f"{base_url.rstrip('/')}/embeddings"

    @staticmethod
    def _extract_embeddings(payload: dict, expected: int) -> list:
        """Return the embeddings of one response body, ordered by their "index"."""
        items = sorted(payload.get("data", []), key=lambda item: item.get("index", 0))
        if len(items) != expected:
            # A short or long response would silently misalign rows with texts.
            raise RuntimeError(
                f"Embedding API returned {len(items)} embeddings for {expected} inputs"
            )
        return [item["embedding"] for item in items]

    @property
    def dimension(self) -> int:
        """Embedding dimension given at construction time."""
        return self._dimension

    def encode_texts(self, texts: Sequence[str]) -> np.ndarray:
        """
        Encode texts using OpenAI-compatible /embeddings endpoint.

        Batches large inputs automatically per batch_size.

        Parameters
        ----------
        texts : Sequence[str]
            Texts to embed; materialised into a list before batching.

        Returns
        -------
        np.ndarray
            float32 array of shape (N, D); (0, dimension) for empty input,
            in which case no request is made.

        Raises
        ------
        RuntimeError
            On request timeout, on an HTTP error status, or when a response
            does not contain exactly one embedding per input text.
        """
        texts = list(texts)
        if not texts:
            return np.empty((0, self._dimension), dtype=np.float32)

        embeddings = []
        for start in range(0, len(texts), self.batch_size):
            batch = texts[start : start + self.batch_size]
            try:
                response = self._client.post(
                    self._endpoint,
                    json={"model": self.model, "input": batch},
                )
                response.raise_for_status()
            except httpx.TimeoutException as e:
                raise RuntimeError(f"Embedding API timeout: {e}") from e
            except httpx.HTTPStatusError as e:
                raise RuntimeError(
                    f"Embedding API error {e.response.status_code}: {e.response.text}"
                ) from e

            # Items are re-sorted by "index" so rows line up with the inputs
            # regardless of the order in which the server lists them.
            embeddings.extend(self._extract_embeddings(response.json(), len(batch)))

        return np.asarray(embeddings, dtype=np.float32)

    def encode_query(self, query: str) -> np.ndarray:
        """Encode a single query string → (D,) vector."""
        return self.encode_texts([query])[0]

    def close(self) -> None:
        """Close the HTTP client."""
        self._client.close()

    def __enter__(self) -> "APIEmbeddingModel":
        """Return self so the instance can be used in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Close the HTTP client on leaving the ``with`` block; never suppresses errors."""
        self.close()
|
|
|
|
| |
| |
| |
|
|
def build_embedder(cfg: dict) -> EmbeddingModel | APIEmbeddingModel:
    """
    Build an EmbeddingModel (local or API) from the 'embeddings' section of config.

    Parameters
    ----------
    cfg : dict
        Configuration mapping with an "embeddings" entry. Keys read from it:
        "backend" ("local" or "api"; defaults to "local"), "model_name",
        "dimension", "batch_size" (both backends), "use_fp16" (local only),
        and "base_url", "api_key", "timeout" (API only). "base_url" and
        "api_key" are required for the API backend; all others have defaults.

    Returns
    -------
    EmbeddingModel | APIEmbeddingModel
        The embedder for the selected backend.

    Raises
    ------
    KeyError
        If "embeddings" is missing, or "base_url"/"api_key" is missing for
        the API backend.
    ValueError
        If "backend" is neither "local" nor "api".
    """
    section = cfg["embeddings"]
    backend = section.get("backend", "local")

    if backend == "api":
        return APIEmbeddingModel(
            base_url=section["base_url"],
            api_key=section["api_key"],
            model=section.get("model_name", "BAAI/bge-m3"),
            dimension=section.get("dimension", 1024),
            batch_size=section.get("batch_size", 32),
            timeout=section.get("timeout", 120),
        )

    if backend == "local":
        return EmbeddingModel(
            model_name=section.get("model_name", "BAAI/bge-m3"),
            batch_size=section.get("batch_size", 32),
            dimension=section.get("dimension", 1024),
            use_fp16=section.get("use_fp16", True),
        )

    # Fail loudly on a misspelled backend instead of silently loading the
    # local model, which is what an unconditional fall-through would do.
    raise ValueError(
        f"Unknown embeddings backend {backend!r}; expected 'local' or 'api'"
    )
|
|