import os
import asyncio
import logging
import threading
from typing import List, Optional, Tuple

from app.models.schemas import Candidate

logger = logging.getLogger(__name__)


class MatchService:
    """
    Stage 2: Embedding-based semantic matching using Pinecone + SentenceTransformers.

    Stores candidate embeddings, queries with JD embedding, returns top-K candidates.
    The Pinecone index handle and the embedding model are created lazily on first use.
    """

    # Vectors per upsert request; keeps each request comfortably below Pinecone's
    # per-request size limit for high-dimensional embeddings.
    UPSERT_BATCH_SIZE = 100

    # Pinecone caps the number of values accepted by one ``$in`` metadata filter,
    # so the candidate-id filter is split into chunks of at most this many ids.
    FILTER_CHUNK_SIZE = 10_000

    def __init__(self):
        self._model = None
        self._index = None
        self._initialized = False
        # _lazy_init runs in executor threads; the lock prevents two concurrent
        # first calls from connecting / loading the model twice.
        self._init_lock = threading.Lock()

    def _lazy_init(self):
        """Defer heavy imports until first use to keep startup fast.

        Reads PINECONE_API_KEY, PINECONE_INDEX (default "recruitment-index") and
        EMBEDDING_MODEL (default "BAAI/bge-m3") from the environment.

        Raises:
            ValueError: if PINECONE_API_KEY is not set.
            Exception: any error from the Pinecone client or model loading is
                logged and re-raised; the service stays uninitialized so a later
                call retries.
        """
        if self._initialized:
            return
        with self._init_lock:
            if self._initialized:  # another thread finished init while we waited
                return
            try:
                from pinecone import Pinecone
                from sentence_transformers import SentenceTransformer

                api_key = os.getenv("PINECONE_API_KEY", "")
                index_name = os.getenv("PINECONE_INDEX", "recruitment-index")
                model_name = os.getenv("EMBEDDING_MODEL", "BAAI/bge-m3")

                if not api_key:
                    raise ValueError("PINECONE_API_KEY not set in environment.")

                logger.info(f"[MatchService] Connecting to Pinecone index: {index_name}")
                pc = Pinecone(api_key=api_key)
                self._index = pc.Index(index_name)

                logger.info(f"[MatchService] Loading embedding model: {model_name}")
                self._model = SentenceTransformer(model_name)

                self._initialized = True
                logger.info("[MatchService] Ready.")
            except Exception as e:
                logger.error(f"[MatchService] Initialization failed: {e}")
                raise

    def get_embedding(self, text: str) -> List[float]:
        """Return the L2-normalized embedding of *text* as a plain list of floats."""
        self._lazy_init()
        return self._model.encode(text, normalize_embeddings=True).tolist()

    def _get_embeddings(self, texts: List[str]) -> List[List[float]]:
        """Embed *texts* with a single batched ``encode`` call (normalized, as lists)."""
        self._lazy_init()
        if not texts:
            return []
        return self._model.encode(texts, normalize_embeddings=True).tolist()

    def _build_search_text(self, c: Candidate) -> str:
        """Concatenate the candidate's non-blank text fields into one search string."""
        parts = [
            c.name or "",
            c.skills or "",
            c.experience or "",
            c.projects or "",
            c.education or "",
            c.resume_text or "",
        ]
        return " ".join(p for p in parts if p.strip())

    def _upsert_vectors(self, vectors: List[dict]) -> None:
        """Upsert *vectors* in batches of UPSERT_BATCH_SIZE (blocking network I/O)."""
        size = self.UPSERT_BATCH_SIZE
        for start in range(0, len(vectors), size):
            self._index.upsert(vectors=vectors[start:start + size])

    def _query_candidates(self, vector: List[float], ids: List[str], top_k: int) -> list:
        """Return up to *top_k* Pinecone matches restricted to *ids* (blocking I/O).

        The ``candidate_id`` metadata filter keeps vectors left over from earlier
        runs (cleanup is optional) from occupying the top-k slots.
        """
        step = self.FILTER_CHUNK_SIZE
        matches = []
        for start in range(0, len(ids), step):
            chunk = ids[start:start + step]
            result = self._index.query(
                vector=vector,
                top_k=min(top_k, len(chunk)),
                filter={"candidate_id": {"$in": chunk}},
                include_metadata=True,
            )
            matches.extend(result.matches)
        if len(ids) > step:
            # Merge per-chunk rankings. NOTE(review): assumes higher score means
            # more similar (cosine/dotproduct index) — confirm the index metric.
            matches.sort(key=lambda m: m.score, reverse=True)
        return matches[:top_k]

    async def get_top_candidates(
        self, jd: str, candidates: List[Candidate], top_k: Optional[int] = None
    ) -> List[Candidate]:
        """
        1. Embed all candidates and upsert to Pinecone.
        2. Embed JD and query Pinecone, restricted to this call's candidates.
        3. Return top_k candidates sorted by similarity.

        Args:
            jd: Job-description text used as the query.
            candidates: Candidates to rank; ``str(c.id)`` is the Pinecone vector id.
            top_k: Maximum number of candidates to return; defaults to the
                STAGE2_TOP_K environment variable (20 if unset).

        Returns:
            Up to ``top_k`` candidates, most similar first; empty when
            ``candidates`` is empty or ``top_k`` is not positive.
        """
        if top_k is None:
            top_k = int(os.getenv("STAGE2_TOP_K", "20"))
        if not candidates or top_k <= 0:
            return []

        # get_running_loop() is the supported call from inside a coroutine.
        loop = asyncio.get_running_loop()
        # Connecting and loading the model are slow and synchronous: keep them
        # (and every Pinecone call below) off the event loop thread.
        await loop.run_in_executor(None, self._lazy_init)

        # Pinecone ids are strings; key the map the same way so match.id lookups hit.
        candidate_map = {str(c.id): c for c in candidates}
        ids = list(candidate_map)
        texts = [self._build_search_text(candidate_map[cid]) for cid in ids]

        logger.info(f"[MatchService] Embedding {len(candidates)} candidates...")
        embeddings = await loop.run_in_executor(None, self._get_embeddings, texts)

        vectors = [
            {
                "id": cid,
                "values": embedding,
                "metadata": {
                    # Pinecone rejects null metadata values; coerce missing fields to "".
                    "name": candidate_map[cid].name or "",
                    "email": candidate_map[cid].email or "",
                    # Used by _query_candidates to scope the search to this call.
                    "candidate_id": cid,
                },
            }
            for cid, embedding in zip(ids, embeddings)
        ]
        await loop.run_in_executor(None, self._upsert_vectors, vectors)

        # NOTE(review): Pinecone is eventually consistent; freshly upserted
        # vectors may take a moment to become queryable.
        logger.info("[MatchService] Querying Pinecone with JD embedding...")
        jd_embedding = await loop.run_in_executor(None, self.get_embedding, jd)
        matches = await loop.run_in_executor(
            None, self._query_candidates, jd_embedding, ids, top_k
        )

        top_candidates: List[Candidate] = [
            candidate_map[match.id] for match in matches if match.id in candidate_map
        ]
        logger.info(f"[MatchService] Retrieved {len(top_candidates)} top candidates.")
        return top_candidates

    async def cleanup_index(self, candidate_ids: List[str]):
        """Optional: remove candidate vectors after evaluation to keep index clean.

        Best-effort: failures are logged as warnings and never raised.
        """
        if not candidate_ids:
            return
        ids = [str(cid) for cid in candidate_ids]  # match the string ids used on upsert
        try:
            if self._index is None:
                raise RuntimeError("index not initialized")
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, lambda: self._index.delete(ids=ids))
            logger.info(f"[MatchService] Cleaned up {len(candidate_ids)} vectors from index.")
        except Exception as e:
            logger.warning(f"[MatchService] Cleanup failed: {e}")


match_service = MatchService()