Ragora-Server / app /services /embedder_adapter.py
Peterase's picture
feat: async document processing with BackgroundTasks + thread pool executors
8b37702
"""Embedder adapter using sentence-transformers β€” fully non-blocking."""
from app.ports.embedder import EmbedderPort
from sentence_transformers import SentenceTransformer
from typing import List
from app.config import get_settings
import asyncio
import logging
logger = logging.getLogger(__name__)
settings = get_settings()
# Shared thread-pool executor for CPU-bound work
_executor = None
def _get_executor():
"""Lazy-init a dedicated thread pool for embedding work."""
global _executor
if _executor is None:
from concurrent.futures import ThreadPoolExecutor
# Keep 1 worker β€” the model isn't thread-safe for parallel calls
_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="embedder")
return _executor
class SentenceTransformerAdapter(EmbedderPort):
"""Sentence-transformers implementation β€” runs in executor to avoid blocking."""
def __init__(self):
logger.info(f"Loading embedding model: {settings.EMBEDDING_MODEL}")
self.model = SentenceTransformer(settings.EMBEDDING_MODEL)
self.dimension = settings.EMBEDDING_DIMENSION
async def embed_text(self, text: str) -> List[float]:
"""Embed a single text without blocking the event loop."""
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
_get_executor(),
lambda: self.model.encode(text, convert_to_numpy=True).tolist()
)
return result
async def embed_batch(self, texts: List[str]) -> List[List[float]]:
"""Embed a batch of texts without blocking the event loop."""
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
_get_executor(),
lambda: self.model.encode(
texts,
convert_to_numpy=True,
show_progress_bar=True,
batch_size=8, # smaller batches = more responsive
).tolist()
)
return result
def get_dimension(self) -> int:
return self.dimension