"""Embedder adapter using sentence-transformers — fully non-blocking.""" from app.ports.embedder import EmbedderPort from sentence_transformers import SentenceTransformer from typing import List from app.config import get_settings import asyncio import logging logger = logging.getLogger(__name__) settings = get_settings() # Shared thread-pool executor for CPU-bound work _executor = None def _get_executor(): """Lazy-init a dedicated thread pool for embedding work.""" global _executor if _executor is None: from concurrent.futures import ThreadPoolExecutor # Keep 1 worker — the model isn't thread-safe for parallel calls _executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="embedder") return _executor class SentenceTransformerAdapter(EmbedderPort): """Sentence-transformers implementation — runs in executor to avoid blocking.""" def __init__(self): logger.info(f"Loading embedding model: {settings.EMBEDDING_MODEL}") self.model = SentenceTransformer(settings.EMBEDDING_MODEL) self.dimension = settings.EMBEDDING_DIMENSION async def embed_text(self, text: str) -> List[float]: """Embed a single text without blocking the event loop.""" loop = asyncio.get_event_loop() result = await loop.run_in_executor( _get_executor(), lambda: self.model.encode(text, convert_to_numpy=True).tolist() ) return result async def embed_batch(self, texts: List[str]) -> List[List[float]]: """Embed a batch of texts without blocking the event loop.""" loop = asyncio.get_event_loop() result = await loop.run_in_executor( _get_executor(), lambda: self.model.encode( texts, convert_to_numpy=True, show_progress_bar=True, batch_size=8, # smaller batches = more responsive ).tolist() ) return result def get_dimension(self) -> int: return self.dimension