# mentorme/services/recommendation_service.py
# Doanh Van Vu
# Refactor mentor availability handling across services
# 69934b0
from typing import List, Dict, Any, Optional
import logging
import time
from services.embedding_service import EmbeddingService
from services.pinecone_service import PineconeService
from services.reranker_service import RerankerService
from utils.text_builder import build_mentor_text, build_mentee_query_text
from utils.scoring import rerank_mentors
from config.settings import get_settings
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class RecommendationService:
    """Index mentors in Pinecone and recommend mentors for a mentee.

    The service embeds text with ``EmbeddingService``, stores and queries
    vectors through ``PineconeService``, reranks candidates with
    ``RerankerService`` and applies the final scoring from
    ``utils.scoring.rerank_mentors``.
    """

    def __init__(self):
        """Create the embedding and Pinecone services and load the settings."""
        self.embedding_service = EmbeddingService()
        self.pinecone_service = PineconeService()
        self.settings = get_settings()

    @staticmethod
    def _safe_float(value, default=0.0):
        """Return ``float(value)``, or ``default`` if value is None or unparseable."""
        if value is None:
            return default
        try:
            return float(value)
        except (ValueError, TypeError):
            return default

    @staticmethod
    def _safe_int(value, default=0):
        """Return ``int(value)``, or ``default`` if value is None or unparseable."""
        if value is None:
            return default
        try:
            return int(value)
        except (ValueError, TypeError):
            return default

    @staticmethod
    def _build_mentor_metadata(
        mentor_data: Dict[str, Any],
        mentor_text: str
    ) -> Dict[str, Any]:
        """Build the metadata payload stored next to a mentor's vector.

        Args:
            mentor_data: Raw mentor record; ``mentor_id`` is required, every
                other key read here is optional.
            mentor_text: Text representation of the mentor, stored so it can
                be read back from query results.

        Returns:
            A dict without any ``None`` values. ``career_id``, ``skill_ids``
            and ``domain_ids`` are only present when the mentor record
            provides a truthy value for them.

        Raises:
            KeyError: If ``mentor_data`` has no ``mentor_id``.
            ValueError, TypeError: If a skill or domain id cannot be
                converted with ``int``.
        """
        safe_float = RecommendationService._safe_float
        safe_int = RecommendationService._safe_int

        metadata: Dict[str, Any] = {
            "mentor_id": str(mentor_data["mentor_id"]),
            "rating": safe_float(mentor_data.get("rating"), 0.0),
            "total_ratings": safe_int(mentor_data.get("total_ratings"), 0),
            "session_count": safe_int(mentor_data.get("session_count"), 0),
            "status": str(mentor_data.get("status") or "ACTIVATED"),
            "mentor_text": mentor_text,
        }

        # Pinecone metadata does not support null values (see the Pinecone
        # metadata documentation), so a missing career is expressed by
        # leaving the key out instead of storing None.
        career_id = mentor_data.get("career_id")
        if career_id:
            metadata["career_id"] = safe_int(career_id)

        # Ids are normalised through int() and stored as strings.
        if mentor_data.get("skill_ids"):
            metadata["skill_ids"] = [
                str(int(skill_id)) for skill_id in mentor_data["skill_ids"]
            ]
        if mentor_data.get("domain_ids"):
            metadata["domain_ids"] = [
                str(int(domain_id)) for domain_id in mentor_data["domain_ids"]
            ]
        return metadata

    def upsert_mentor(
        self,
        mentor_data: Dict[str, Any]
    ) -> bool:
        """Embed a mentor and upsert its vector and metadata into Pinecone.

        Args:
            mentor_data: Mentor record; must contain ``mentor_id``.

        Returns:
            The value returned by ``PineconeService.upsert_mentor``.

        Raises:
            Exception: Any error raised while building the text, encoding,
                building metadata or upserting is logged and re-raised.
        """
        try:
            mentor_text = build_mentor_text(mentor_data)
            embedding = self.embedding_service.encode(mentor_text, is_query=False)
            metadata = self._build_mentor_metadata(mentor_data, mentor_text)
            return self.pinecone_service.upsert_mentor(
                mentor_id=metadata["mentor_id"],
                vector=embedding,
                metadata=metadata
            )
        except Exception as e:
            logger.error(f"Failed to upsert mentor: {str(e)}")
            raise

    def recommend_mentors(
        self,
        mentee_data: Dict[str, Any],
        top_k: Optional[int] = None,
        final_count: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        """Run the recommendation pipeline for one mentee.

        Steps: build the query text, embed it, fetch ``top_k`` similar
        mentors from Pinecone (no filter), rerank them, then apply the final
        scoring. Each stage is timed and logged.

        Args:
            mentee_data: Mentee record used to build the query text and
                passed on to the final scoring.
            top_k: Number of Pinecone candidates; a falsy value (None or 0)
                selects ``settings.RECOMMENDATION_TOP_K``.
            final_count: Number of results requested from the final scoring;
                a falsy value selects ``settings.RECOMMENDATION_FINAL_COUNT``.

        Returns:
            The result of ``rerank_mentors``, or an empty list when Pinecone
            returns no candidates.

        Raises:
            Exception: Any failure outside the reranker call is logged with
                its traceback and re-raised.
        """
        pipeline_start = time.perf_counter()
        logger.info("[RECOMMEND] Starting recommendation pipeline")
        try:
            top_k = top_k or self.settings.RECOMMENDATION_TOP_K
            final_count = final_count or self.settings.RECOMMENDATION_FINAL_COUNT
            logger.info(f"[RECOMMEND] Configuration: top_k={top_k}, rerank_k={self.settings.RECOMMENDATION_RERANK_K}, final_count={final_count}")

            query_build_start = time.perf_counter()
            query_text = build_mentee_query_text(mentee_data)
            query_build_time = time.perf_counter() - query_build_start
            logger.info(f"[RECOMMEND] Query text built in {query_build_time:.3f}s: {query_text[:100]}...")

            embedding_start = time.perf_counter()
            query_embedding = self.embedding_service.encode(query_text, is_query=True)
            embedding_time = time.perf_counter() - embedding_start
            logger.info(f"[RECOMMEND] Query embedding generated in {embedding_time:.3f}s")

            pinecone_start = time.perf_counter()
            similar_mentors = self.pinecone_service.query_similar(
                query_vector=query_embedding,
                top_k=top_k,
                filter=None,
                include_metadata=True
            )
            pinecone_time = time.perf_counter() - pinecone_start
            logger.info(f"[RECOMMEND] Pinecone query completed in {pinecone_time:.3f}s: found {len(similar_mentors)} candidates (no filter applied)")

            if not similar_mentors:
                logger.warning("[RECOMMEND] No similar mentors found, returning empty list")
                return []

            # Copy the stored mentor text onto each candidate so the reranker
            # can read it from the candidate itself.
            metadata_start = time.perf_counter()
            for mentor in similar_mentors:
                # `or {}` also covers a "metadata" key that is present but None.
                metadata = mentor.get("metadata") or {}
                mentor["mentor_text"] = metadata.get("mentor_text", "")
                if not mentor["mentor_text"]:
                    logger.warning(f"[RECOMMEND] Mentor {mentor.get('mentor_id', 'unknown')} missing mentor_text in metadata")
            metadata_time = time.perf_counter() - metadata_start
            logger.info(f"[RECOMMEND] Metadata extraction completed in {metadata_time:.3f}s")

            reranker_start = time.perf_counter()
            # NOTE(review): a new RerankerService is created on every call and
            # outside the fallback below, so a constructor failure aborts the
            # pipeline; confirm whether construction is cheap / cached.
            reranker = RerankerService()
            rerank_k = self.settings.RECOMMENDATION_RERANK_K
            try:
                reranked_mentors = reranker.rerank(
                    query_text=query_text,
                    candidates=similar_mentors,
                    top_k=rerank_k
                )
                reranker_time = time.perf_counter() - reranker_start
                logger.info(f"[RECOMMEND] Reranking completed in {reranker_time:.3f}s: {len(reranked_mentors)} mentors reranked")
            except Exception as e:
                # Best-effort fallback: reuse the Pinecone similarity score as
                # the reranker score and keep the top rerank_k candidates.
                reranker_time = time.perf_counter() - reranker_start
                logger.error(f"[RECOMMEND] Reranker failed after {reranker_time:.3f}s: {e}. Falling back to cosine similarity.", exc_info=True)
                for mentor in similar_mentors:
                    mentor["reranker_score"] = mentor.get("score", 0.0)
                reranked_mentors = sorted(similar_mentors, key=lambda x: x.get("reranker_score", 0.0), reverse=True)[:rerank_k]

            scoring_start = time.perf_counter()
            reranked = rerank_mentors(
                reranked_mentors,
                mentee_data,
                final_count=final_count
            )
            scoring_time = time.perf_counter() - scoring_start
            logger.info(f"[RECOMMEND] Final scoring completed in {scoring_time:.3f}s: {len(reranked)} final recommendations")

            total_time = time.perf_counter() - pipeline_start
            logger.info(f"[RECOMMEND] Recommendation pipeline completed in {total_time:.3f}s")
            logger.info(f"[RECOMMEND] Time breakdown - Query: {query_build_time:.3f}s, Embedding: {embedding_time:.3f}s, "
                        f"Pinecone: {pinecone_time:.3f}s, Metadata: {metadata_time:.3f}s, "
                        f"Reranker: {reranker_time:.3f}s, Scoring: {scoring_time:.3f}s")
            return reranked
        except Exception as e:
            total_time = time.perf_counter() - pipeline_start
            logger.error(f"[RECOMMEND] Failed to recommend mentors after {total_time:.3f}s: {str(e)}", exc_info=True)
            raise

    def delete_mentor(self, mentor_id: str) -> bool:
        """Delete a mentor's vector from Pinecone.

        Args:
            mentor_id: Identifier of the mentor to remove.

        Returns:
            The value returned by ``PineconeService.delete_mentor``.

        Raises:
            Exception: Any error from the Pinecone service is logged and
                re-raised.
        """
        try:
            return self.pinecone_service.delete_mentor(mentor_id)
        except Exception as e:
            logger.error(f"Failed to delete mentor: {str(e)}")
            raise