Spaces:
Sleeping
Sleeping
| from typing import List, Dict, Any, Optional | |
| import logging | |
| import time | |
| from services.embedding_service import EmbeddingService | |
| from services.pinecone_service import PineconeService | |
| from services.reranker_service import RerankerService | |
| from utils.text_builder import build_mentor_text, build_mentee_query_text | |
| from utils.scoring import rerank_mentors | |
| from config.settings import get_settings | |
| logger = logging.getLogger(__name__) | |
class RecommendationService:
    """Index mentors in Pinecone and recommend mentors for a mentee.

    The recommendation pipeline is: build a query text from the mentee,
    embed it, fetch the nearest mentor vectors from Pinecone, rerank the
    candidates with ``RerankerService`` (falling back to the vector
    similarity score when the reranker fails), then apply the final
    scoring from ``utils.scoring.rerank_mentors``.
    """

    def __init__(self):
        """Create the embedding and Pinecone clients and load the settings."""
        self.embedding_service = EmbeddingService()
        self.pinecone_service = PineconeService()
        self.settings = get_settings()

    @staticmethod
    def _safe_float(value, default=0.0):
        """Return ``float(value)``, or ``default`` if it is None or unparseable."""
        if value is None:
            return default
        try:
            return float(value)
        except (ValueError, TypeError):
            return default

    @staticmethod
    def _safe_int(value, default=0):
        """Return ``int(value)``, or ``default`` if it is None or unparseable."""
        if value is None:
            return default
        try:
            return int(value)
        except (ValueError, TypeError, OverflowError):
            # OverflowError covers int(float("inf")).
            return default

    @staticmethod
    def _build_mentor_metadata(
        mentor_data: Dict[str, Any],
        mentor_text: str
    ) -> Dict[str, Any]:
        """Build the metadata payload stored next to a mentor's vector.

        Optional fields (``career_id``, ``skill_ids``, ``domain_ids``) are
        left out entirely when they are absent: Pinecone's metadata
        documentation states that null values are not supported and that
        the key should be omitted instead.

        Args:
            mentor_data: Raw mentor record; must contain ``mentor_id``.
            mentor_text: Text that was embedded for this mentor.

        Returns:
            A dict that contains no ``None`` values.

        Raises:
            KeyError: If ``mentor_id`` is missing.
            ValueError, TypeError: If a skill or domain id is not integer-like.
        """
        safe_float = RecommendationService._safe_float
        safe_int = RecommendationService._safe_int

        metadata: Dict[str, Any] = {
            "mentor_id": str(mentor_data["mentor_id"]),
            "rating": safe_float(mentor_data.get("rating"), 0.0),
            "total_ratings": safe_int(mentor_data.get("total_ratings"), 0),
            "session_count": safe_int(mentor_data.get("session_count"), 0),
            "status": str(mentor_data.get("status") or "ACTIVATED"),
            "mentor_text": mentor_text,
        }

        # Only store career_id when a usable value exists; never store None.
        raw_career_id = mentor_data.get("career_id")
        if raw_career_id:
            career_id = safe_int(raw_career_id, default=None)
            if career_id is not None:
                metadata["career_id"] = career_id

        # Ids are normalised through int() and stored as strings.
        if mentor_data.get("skill_ids"):
            metadata["skill_ids"] = [
                str(int(skill_id)) for skill_id in mentor_data["skill_ids"]
            ]
        if mentor_data.get("domain_ids"):
            metadata["domain_ids"] = [
                str(int(domain_id)) for domain_id in mentor_data["domain_ids"]
            ]
        return metadata

    def upsert_mentor(
        self,
        mentor_data: Dict[str, Any]
    ) -> bool:
        """Embed a mentor profile and upsert it into Pinecone.

        Args:
            mentor_data: Raw mentor record; must contain ``mentor_id``.

        Returns:
            Whatever ``PineconeService.upsert_mentor`` returns.

        Raises:
            Exception: Any failure is logged with its traceback and re-raised.
        """
        try:
            mentor_text = build_mentor_text(mentor_data)
            embedding = self.embedding_service.encode(mentor_text, is_query=False)
            metadata = self._build_mentor_metadata(mentor_data, mentor_text)
            return self.pinecone_service.upsert_mentor(
                mentor_id=metadata["mentor_id"],
                vector=embedding,
                metadata=metadata
            )
        except Exception as e:
            logger.error(f"Failed to upsert mentor: {str(e)}", exc_info=True)
            raise

    def recommend_mentors(
        self,
        mentee_data: Dict[str, Any],
        top_k: Optional[int] = None,
        final_count: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        """Run the full recommendation pipeline for one mentee.

        Args:
            mentee_data: Raw mentee record used to build the query text and
                passed on to the final scoring step.
            top_k: Number of candidates to fetch from Pinecone. ``None`` or
                ``0`` falls back to ``settings.RECOMMENDATION_TOP_K``.
            final_count: Number of recommendations to return. ``None`` or
                ``0`` falls back to ``settings.RECOMMENDATION_FINAL_COUNT``.

        Returns:
            The list produced by ``rerank_mentors``; an empty list when
            Pinecone returns no candidates.

        Raises:
            Exception: Any failure outside the reranking step is logged with
                its traceback and re-raised.
        """
        pipeline_start = time.perf_counter()
        logger.info("[RECOMMEND] Starting recommendation pipeline")
        try:
            top_k = top_k or self.settings.RECOMMENDATION_TOP_K
            final_count = final_count or self.settings.RECOMMENDATION_FINAL_COUNT
            logger.info(f"[RECOMMEND] Configuration: top_k={top_k}, rerank_k={self.settings.RECOMMENDATION_RERANK_K}, final_count={final_count}")

            # Step 1: build the textual query from the mentee record.
            query_build_start = time.perf_counter()
            query_text = build_mentee_query_text(mentee_data)
            query_build_time = time.perf_counter() - query_build_start
            logger.info(f"[RECOMMEND] Query text built in {query_build_time:.3f}s: {query_text[:100]}...")

            # Step 2: embed the query.
            embedding_start = time.perf_counter()
            query_embedding = self.embedding_service.encode(query_text, is_query=True)
            embedding_time = time.perf_counter() - embedding_start
            logger.info(f"[RECOMMEND] Query embedding generated in {embedding_time:.3f}s")

            # Step 3: nearest-neighbour lookup, deliberately unfiltered.
            pinecone_start = time.perf_counter()
            similar_mentors = self.pinecone_service.query_similar(
                query_vector=query_embedding,
                top_k=top_k,
                filter=None,
                include_metadata=True
            )
            pinecone_time = time.perf_counter() - pinecone_start
            logger.info(f"[RECOMMEND] Pinecone query completed in {pinecone_time:.3f}s: found {len(similar_mentors)} candidates (no filter applied)")
            if not similar_mentors:
                logger.warning("[RECOMMEND] No similar mentors found, returning empty list")
                return []

            # Step 4: lift the stored mentor text to the top level of each
            # candidate, where it is passed on to the reranker.
            metadata_start = time.perf_counter()
            for mentor in similar_mentors:
                # "or {}" also covers a metadata key that is present but None.
                metadata = mentor.get("metadata") or {}
                mentor["mentor_text"] = metadata.get("mentor_text", "")
                if not mentor["mentor_text"]:
                    logger.warning(f"[RECOMMEND] Mentor {mentor.get('mentor_id', 'unknown')} missing mentor_text in metadata")
            metadata_time = time.perf_counter() - metadata_start
            logger.info(f"[RECOMMEND] Metadata extraction completed in {metadata_time:.3f}s")

            # Step 5: rerank, degrading to the vector similarity score on failure.
            reranker_start = time.perf_counter()
            rerank_k = self.settings.RECOMMENDATION_RERANK_K
            try:
                # Constructed inside the try so that a failure to create the
                # reranker takes the same fallback path as a failed rerank call.
                reranker = RerankerService()
                reranked_mentors = reranker.rerank(
                    query_text=query_text,
                    candidates=similar_mentors,
                    top_k=rerank_k
                )
                reranker_time = time.perf_counter() - reranker_start
                logger.info(f"[RECOMMEND] Reranking completed in {reranker_time:.3f}s: {len(reranked_mentors)} mentors reranked")
            except Exception as e:
                reranker_time = time.perf_counter() - reranker_start
                logger.error(f"[RECOMMEND] Reranker failed after {reranker_time:.3f}s: {e}. Falling back to cosine similarity.", exc_info=True)
                for mentor in similar_mentors:
                    mentor["reranker_score"] = mentor.get("score", 0.0)
                reranked_mentors = sorted(similar_mentors, key=lambda x: x.get("reranker_score", 0.0), reverse=True)[:rerank_k]

            # Step 6: final scoring against the mentee record.
            scoring_start = time.perf_counter()
            reranked = rerank_mentors(
                reranked_mentors,
                mentee_data,
                final_count=final_count
            )
            scoring_time = time.perf_counter() - scoring_start
            logger.info(f"[RECOMMEND] Final scoring completed in {scoring_time:.3f}s: {len(reranked)} final recommendations")

            total_time = time.perf_counter() - pipeline_start
            logger.info(f"[RECOMMEND] Recommendation pipeline completed in {total_time:.3f}s")
            logger.info(f"[RECOMMEND] Time breakdown - Query: {query_build_time:.3f}s, Embedding: {embedding_time:.3f}s, "
                        f"Pinecone: {pinecone_time:.3f}s, Metadata: {metadata_time:.3f}s, "
                        f"Reranker: {reranker_time:.3f}s, Scoring: {scoring_time:.3f}s")
            return reranked
        except Exception as e:
            total_time = time.perf_counter() - pipeline_start
            logger.error(f"[RECOMMEND] Failed to recommend mentors after {total_time:.3f}s: {str(e)}", exc_info=True)
            raise

    def delete_mentor(self, mentor_id: str) -> bool:
        """Delete a mentor from Pinecone.

        Args:
            mentor_id: Identifier passed to ``PineconeService.delete_mentor``.

        Returns:
            Whatever ``PineconeService.delete_mentor`` returns.

        Raises:
            Exception: Any failure is logged with its traceback and re-raised.
        """
        try:
            return self.pinecone_service.delete_mentor(mentor_id)
        except Exception as e:
            logger.error(f"Failed to delete mentor: {str(e)}", exc_info=True)
            raise