# Coderound_Comeback / matching_service.py
# cloud450's picture
# Upload 11 files
# b2efd24 verified
import asyncio
import logging
import os
import threading
from typing import List, Optional, Tuple

from app.models.schemas import Candidate
logger = logging.getLogger(__name__)


class MatchService:
    """
    Stage 2: Embedding-based semantic matching using Pinecone + SentenceTransformers.

    Stores candidate embeddings, queries with the JD embedding, and returns the
    top-K candidates. The Pinecone client and the embedding model are created
    lazily on first use; configuration is read from the environment variables
    PINECONE_API_KEY, PINECONE_INDEX, EMBEDDING_MODEL and STAGE2_TOP_K.
    """

    # Number of vectors sent per upsert request. Kept at the value the service
    # has always used; it is meant to stay within Pinecone's per-request limits.
    _UPSERT_BATCH_SIZE = 100

    def __init__(self):
        self._model = None
        self._index = None
        self._initialized = False
        # Initialization may be triggered from executor threads, so it is
        # serialized to avoid loading the model twice on concurrent first use.
        self._init_lock = threading.Lock()

    def _lazy_init(self):
        """
        Create the Pinecone index handle and load the embedding model once.

        Heavy imports are deferred until first use to keep startup fast.

        Raises:
            ValueError: if PINECONE_API_KEY is unset or empty.
            Exception: anything raised by the Pinecone client or the model
                loader is logged and re-raised unchanged.
        """
        if self._initialized:
            return
        with self._init_lock:
            # Re-check: another thread may have finished while we waited.
            if self._initialized:
                return
            try:
                from pinecone import Pinecone
                from sentence_transformers import SentenceTransformer

                api_key = os.getenv("PINECONE_API_KEY", "")
                index_name = os.getenv("PINECONE_INDEX", "recruitment-index")
                model_name = os.getenv("EMBEDDING_MODEL", "BAAI/bge-m3")
                if not api_key:
                    raise ValueError("PINECONE_API_KEY not set in environment.")

                logger.info("[MatchService] Connecting to Pinecone index: %s", index_name)
                index = Pinecone(api_key=api_key).Index(index_name)

                logger.info("[MatchService] Loading embedding model: %s", model_name)
                model = SentenceTransformer(model_name)

                # Publish both handles before flipping the flag so a reader
                # on the fast path never sees a half-initialized service.
                self._index = index
                self._model = model
                self._initialized = True
                logger.info("[MatchService] Ready.")
            except Exception as e:
                logger.error("[MatchService] Initialization failed: %s", e)
                raise

    def get_embedding(self, text: str) -> List[float]:
        """Return the normalized embedding of *text* as a plain list of floats."""
        self._lazy_init()
        return self._model.encode(text, normalize_embeddings=True).tolist()

    def _build_search_text(self, c: "Candidate") -> str:
        """Join the candidate's non-blank text fields into one space-separated string."""
        fields = (
            c.name,
            c.skills,
            c.experience,
            c.projects,
            c.education,
            c.resume_text,
        )
        # Missing (None/empty) and whitespace-only fields are skipped.
        return " ".join(value for value in fields if value and value.strip())

    def _build_vectors(self, candidates: "List[Candidate]") -> List[dict]:
        """Embed every candidate and return the records to upsert (blocking)."""
        return [
            {
                "id": c.id,
                "values": self.get_embedding(self._build_search_text(c)),
                "metadata": {
                    # Pinecone does not accept null metadata values, so
                    # missing fields are stored as empty strings.
                    "name": c.name or "",
                    "email": c.email or "",
                },
            }
            for c in candidates
        ]

    def _upsert_in_batches(self, vectors: List[dict]) -> None:
        """Upsert *vectors* into the index in fixed-size batches (blocking)."""
        step = self._UPSERT_BATCH_SIZE
        for start in range(0, len(vectors), step):
            self._index.upsert(vectors=vectors[start:start + step])

    def _query_index(self, vector: List[float], k: int):
        """Run a top-*k* similarity query and return the raw response (blocking)."""
        return self._index.query(vector=vector, top_k=k, include_metadata=True)

    def _delete_ids(self, ids: List[str]) -> None:
        """Delete the given vector ids from the index (blocking)."""
        self._index.delete(ids=ids)

    async def get_top_candidates(
        self, jd: str, candidates: "List[Candidate]", top_k: Optional[int] = None
    ) -> "List[Candidate]":
        """
        Rank *candidates* against the job description *jd*.

        1. Embed all candidates and upsert to Pinecone.
        2. Embed JD and query Pinecone.
        3. Return the matched candidates in the order Pinecone returned them.

        Args:
            jd: Job description text.
            candidates: Candidates to index and rank.
            top_k: Maximum number of candidates to return. Defaults to the
                STAGE2_TOP_K environment variable (20 if unset).

        Returns:
            At most ``min(top_k, len(candidates))`` candidates. Matches whose
            id is not among *candidates* are dropped, so fewer may be returned.
            An empty list is returned without contacting Pinecone when there
            is nothing to rank.

        Raises:
            Whatever initialization, embedding or the Pinecone calls raise.
        """
        if top_k is None:
            top_k = int(os.getenv("STAGE2_TOP_K", "20"))

        effective_k = min(top_k, len(candidates))
        if effective_k < 1:
            # A query with top_k < 1 is pointless; skip all remote work.
            logger.info("[MatchService] Retrieved 0 top candidates.")
            return []

        # All model and Pinecone calls are blocking, so every one of them is
        # pushed to the default executor to keep the event loop responsive.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, self._lazy_init)

        candidate_map = {c.id: c for c in candidates}

        logger.info("[MatchService] Embedding %d candidates...", len(candidates))
        vectors = await loop.run_in_executor(None, self._build_vectors, candidates)
        await loop.run_in_executor(None, self._upsert_in_batches, vectors)

        logger.info("[MatchService] Querying Pinecone with JD embedding...")
        jd_embedding = await loop.run_in_executor(None, self.get_embedding, jd)
        query_results = await loop.run_in_executor(
            None, self._query_index, jd_embedding, effective_k
        )

        # NOTE(review): the query is not scoped to this call's candidates; if
        # the index holds vectors from other runs they can occupy result slots
        # and are filtered out here — confirm whether a namespace is intended.
        top_candidates = [
            candidate_map[match.id]
            for match in query_results.matches
            if match.id in candidate_map
        ]
        logger.info("[MatchService] Retrieved %d top candidates.", len(top_candidates))
        return top_candidates

    async def cleanup_index(self, candidate_ids: List[str]):
        """
        Optional: remove candidate vectors after evaluation to keep index clean.

        Best-effort: any failure (including initialization failure) is logged
        as a warning and swallowed. Does nothing for an empty id list.
        """
        if not candidate_ids:
            return
        try:
            loop = asyncio.get_running_loop()
            # Cleanup may be the first call on this instance, so make sure
            # the index handle exists before using it.
            await loop.run_in_executor(None, self._lazy_init)
            await loop.run_in_executor(None, self._delete_ids, candidate_ids)
            logger.info(
                "[MatchService] Cleaned up %d vectors from index.", len(candidate_ids)
            )
        except Exception as e:
            logger.warning("[MatchService] Cleanup failed: %s", e)
# Module-level shared instance. Constructing it is cheap: __init__ only sets
# placeholders, and the Pinecone client and embedding model are created on
# first use.
match_service = MatchService()