import logging
import os
from typing import Dict, Any, List

from src.core.config import settings
from src.core.ports.embedder_port import EmbedderPort

logger = logging.getLogger(__name__)

# On Windows, Hugging Face Hub symlink creation needs admin rights or
# developer mode; disabling symlinks avoids permission errors on download.
if os.name == 'nt':
    os.environ["HF_HUB_DISABLE_SYMLINKS"] = "1"

try:
    # Workaround: some FlagEmbedding versions reference
    # transformers.utils.import_utils.is_torch_fx_available, which newer
    # transformers releases removed. Stub it out before importing.
    import transformers.utils.import_utils
    if not hasattr(transformers.utils.import_utils, 'is_torch_fx_available'):
        transformers.utils.import_utils.is_torch_fx_available = lambda: False
    from FlagEmbedding import BGEM3FlagModel
    HAS_FLAG_EMBEDDING = True
except ImportError as e:
    HAS_FLAG_EMBEDDING = False
    logger.warning(f"FlagEmbedding not installed: {e}. Using dummy embeddings.")


class BgeEmbedderAdapter(EmbedderPort):
    """EmbedderPort implementation backed by BGE-M3 hybrid embeddings.

    Produces dense vectors and/or sparse (lexical-weight) vectors. The model
    is loaded lazily on first use; when FlagEmbedding is unavailable, dummy
    values are returned so the rest of the pipeline can still run.
    """

    def __init__(self):
        # Lazy-loaded on first encode call to keep construction cheap.
        self.model = None
        self.model_name = settings.EMBEDDING_MODEL

    def _load_model(self):
        """Load the BGE-M3 model once; no-op if already loaded or unavailable.

        Raises:
            Exception: re-raises any model-loading failure after logging it.
        """
        if self.model is not None:
            return
        if not HAS_FLAG_EMBEDDING:
            logger.warning("FlagEmbedding not installed. Using dummy embeddings.")
            return
        logger.info(f"Loading embedding model: {self.model_name}")
        try:
            # use_fp16=True halves memory and speeds inference on supported HW.
            self.model = BGEM3FlagModel(self.model_name, use_fp16=True)
            logger.info(f"Successfully loaded {self.model_name} (Hybrid Mode)")
        except Exception as e:
            logger.error(f"Failed to load embedding model: {e}", exc_info=True)
            raise  # bare raise preserves the original traceback

    @staticmethod
    def _to_sparse(lexical_dict) -> Dict[str, List]:
        """Convert a BGE-M3 lexical-weights dict (token-id -> weight) into
        parallel ``{"indices": [...], "values": [...]}`` lists."""
        return {
            "indices": [int(k) for k in lexical_dict.keys()],
            "values": [float(v) for v in lexical_dict.values()],
        }

    def encode_query(self, text: str) -> Dict[str, Any]:
        """Encodes a query string into dense and sparse vectors.

        Returns a dict with keys ``dense`` (list[float]) and ``sparse``
        (indices/values dict, or None when the model is unavailable).
        """
        if self.model is None:
            self._load_model()
        if not HAS_FLAG_EMBEDDING or self.model is None:
            # Fallback keeps callers functional without the model installed.
            return {
                "dense": [0.1] * settings.VECTOR_SIZE,
                "sparse": None
            }
        embeddings = self.model.encode(
            sentences=[text],
            batch_size=1,
            max_length=512,
            return_dense=True,
            return_sparse=True,
            return_colbert_vecs=False
        )
        dense_vec = embeddings['dense_vecs'][0].tolist()
        sparse_vec = self._to_sparse(embeddings['lexical_weights'][0])
        return {
            "dense": dense_vec,
            "sparse": sparse_vec
        }

    def encode_sparse_only(self, text: str) -> Dict[str, Any]:
        """Encodes only the sparse (BM25/lexical) vector for a single query.

        Skips dense computation — ~2x faster than encode_query. Used for
        per-language sparse queries when the dense vector is already
        available from the English query.
        """
        if self.model is None:
            self._load_model()
        if not HAS_FLAG_EMBEDDING or self.model is None:
            return {"sparse": None}
        embeddings = self.model.encode(
            sentences=[text],
            batch_size=1,
            max_length=512,
            return_dense=False,  # skip dense — saves ~60% compute
            return_sparse=True,
            return_colbert_vecs=False
        )
        return {"sparse": self._to_sparse(embeddings['lexical_weights'][0])}

    def encode_sparse_batch(self, texts: List[str]) -> List[Dict[str, Any]]:
        """Encode multiple texts into sparse vectors in a SINGLE model forward pass.

        Why this matters: BGE-M3 holds the Python GIL during inference —
        ThreadPoolExecutor gives zero benefit for CPU-bound model calls.
        Calling encode_sparse_only() 6 times in a thread pool still runs
        sequentially. This method batches all 6 language queries into one
        model.encode() call, which is ~5x faster than 6 sequential calls
        because:
          - One tokenization pass for all texts
          - One forward pass through the transformer
          - GPU/CPU utilisation is much higher with batch_size=6 vs batch_size=1

        Returns a list of sparse dicts in the same order as `texts`.
        Falls back to empty sparse vectors on failure.
        """
        if not texts:
            return []
        if self.model is None:
            self._load_model()
        if not HAS_FLAG_EMBEDDING or self.model is None:
            return [{"sparse": None} for _ in texts]
        try:
            embeddings = self.model.encode(
                sentences=texts,
                batch_size=len(texts),  # all in one shot
                max_length=512,
                return_dense=False,  # skip dense — not needed here
                return_sparse=True,
                return_colbert_vecs=False
            )
            return [
                {"sparse": self._to_sparse(lexical_dict)}
                for lexical_dict in embeddings['lexical_weights']
            ]
        except Exception as e:
            # Deliberate best-effort: callers can proceed without sparse vectors.
            logger.error(f"encode_sparse_batch failed: {e} — returning empty sparse vectors")
            return [{"sparse": None} for _ in texts]