import logging
import time
from typing import List, Dict, Optional, Any

from pinecone import Pinecone, ServerlessSpec

from config.settings import get_settings

logger = logging.getLogger(__name__)


class PineconeService:
    """Singleton wrapper around a Pinecone index that stores mentor vectors.

    The Pinecone client and the index handle are kept as class attributes, so
    every ``PineconeService()`` call returns the same instance and reuses the
    same connection. Connection parameters are read from ``get_settings()``.
    """

    _instance = None
    _client = None
    _index = None

    def __new__(cls):
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super(PineconeService, cls).__new__(cls)
        return cls._instance

    def __init__(self):
        """Connect to Pinecone unless a usable index handle already exists."""
        # Guard on the index handle (not the client): if an earlier attempt
        # failed part-way, the next construction must retry instead of leaving
        # the service with ``_index is None``.
        if PineconeService._index is None:
            self._initialize()

    def _initialize(self):
        """Create the client, ensure the configured index exists, and open it.

        Raises:
            ValueError: If ``PINECONE_API_KEY`` is not configured.
            Exception: Any error raised by the Pinecone client is logged and
                re-raised unchanged.
        """
        settings = get_settings()
        if not settings.PINECONE_API_KEY:
            raise ValueError("PINECONE_API_KEY is required")

        try:
            client = Pinecone(api_key=settings.PINECONE_API_KEY)
            index_name = settings.PINECONE_INDEX

            existing_indexes = [idx.name for idx in client.list_indexes()]
            if index_name not in existing_indexes:
                logger.info(f"Creating Pinecone index: {index_name}")
                client.create_index(
                    name=index_name,
                    dimension=settings.PINECONE_DIMENSION,
                    metric="cosine",
                    spec=ServerlessSpec(
                        cloud="aws",
                        region=settings.PINECONE_ENVIRONMENT,
                    ),
                )
                logger.info(f"Index {index_name} created successfully")

            index = client.Index(index_name)

            # Publish the shared state only after every step succeeded, so a
            # failure above never leaves a client without an index behind.
            PineconeService._client = client
            PineconeService._index = index
            logger.info(f"Connected to Pinecone index: {index_name}")
        except Exception as e:
            logger.error(f"Failed to initialize Pinecone: {e}")
            raise

    def upsert_mentor(
        self,
        mentor_id: str,
        vector: List[float],
        metadata: Dict[str, Any]
    ) -> bool:
        """Insert or update a single mentor vector.

        Args:
            mentor_id: Identifier of the mentor; stored as a string id.
            vector: Embedding whose length must equal ``PINECONE_DIMENSION``.
            metadata: Metadata stored alongside the vector.

        Returns:
            ``True`` when the upsert call completed.

        Raises:
            ValueError: If ``vector`` has the wrong length.
            Exception: Any error raised by the Pinecone client (logged first).
        """
        try:
            expected_dim = get_settings().PINECONE_DIMENSION
            if len(vector) != expected_dim:
                error_msg = (
                    f"Vector dimension mismatch: expected {expected_dim}, "
                    f"got {len(vector)}"
                )
                logger.error(error_msg)
                raise ValueError(error_msg)

            PineconeService._index.upsert(
                vectors=[{
                    "id": str(mentor_id),
                    "values": vector,
                    "metadata": metadata,
                }]
            )
            logger.info(f"Mentor {mentor_id} upserted successfully")
            return True
        except Exception as e:
            logger.error(f"Failed to upsert mentor {mentor_id}: {e}")
            raise

    def upsert_mentors_batch(
        self,
        vectors: List[Dict[str, Any]]
    ) -> bool:
        """Insert or update several mentor vectors in one request.

        Args:
            vectors: Records passed through to the index ``upsert`` call
                unchanged. An empty list is a no-op and sends no request.

        Returns:
            ``True`` when the batch was processed (or was empty).

        Raises:
            Exception: Any error raised by the Pinecone client (logged first).
        """
        if not vectors:
            logger.debug("Batch upsert skipped: no vectors provided")
            return True

        try:
            PineconeService._index.upsert(vectors=vectors)
            logger.info(f"Batch upserted {len(vectors)} mentors")
            return True
        except Exception as e:
            logger.error(f"Failed to batch upsert mentors: {e}")
            raise

    def query_similar(
        self,
        query_vector: List[float],
        top_k: int = 30,
        filter: Optional[Dict[str, Any]] = None,
        include_metadata: bool = True
    ) -> List[Dict[str, Any]]:
        """Return the mentors whose vectors are most similar to ``query_vector``.

        Args:
            query_vector: Embedding whose length must equal
                ``PINECONE_DIMENSION``.
            top_k: Maximum number of matches requested from the index.
            filter: Optional metadata filter forwarded to the index query.
            include_metadata: Whether match metadata is requested and returned.

        Returns:
            One dict per match with the keys ``mentor_id``, ``score`` and
            ``metadata`` (``None`` when ``include_metadata`` is false), in the
            order the index returned them.

        Raises:
            ValueError: If ``query_vector`` has the wrong length.
            Exception: Any error raised by the Pinecone client (logged first).
        """
        try:
            start_time = time.perf_counter()

            expected_dim = get_settings().PINECONE_DIMENSION
            if len(query_vector) != expected_dim:
                error_msg = (
                    f"Query vector dimension mismatch: expected {expected_dim}, "
                    f"got {len(query_vector)}"
                )
                logger.error(f"[PINECONE] {error_msg}")
                raise ValueError(error_msg)

            logger.info(
                f"[PINECONE] Querying similar mentors: top_k={top_k}, filter={filter}"
            )

            query_response = PineconeService._index.query(
                vector=query_vector,
                top_k=top_k,
                filter=filter,
                include_metadata=include_metadata,
            )

            # Timing covers settings lookup, validation and the query itself,
            # but not the result post-processing below.
            query_time = time.perf_counter() - start_time

            results = []
            for idx, match in enumerate(query_response.matches, 1):
                results.append({
                    "mentor_id": match.id,
                    "score": match.score,
                    "metadata": match.metadata if include_metadata else None,
                })

                if include_metadata and match.metadata:
                    metadata = match.metadata
                    logger.info(
                        f"[PINECONE] Result #{idx}: mentor_id={match.id}, "
                        f"score={match.score:.4f}, "
                        f"rating={metadata.get('rating', 'N/A')}, "
                        f"total_ratings={metadata.get('total_ratings', 0)}, "
                        f"session_count={metadata.get('session_count', 0)}, "
                        f"status={metadata.get('status', 'N/A')}, "
                        f"career_id={metadata.get('career_id', 'N/A')}, "
                        f"skill_ids={metadata.get('skill_ids', [])}, "
                        f"domain_ids={metadata.get('domain_ids', [])}, "
                        f"has_mentor_text={'mentor_text' in metadata}"
                    )
                else:
                    logger.info(
                        f"[PINECONE] Result #{idx}: mentor_id={match.id}, "
                        f"score={match.score:.4f}"
                    )

            logger.info(
                f"[PINECONE] Query completed in {query_time:.3f}s: "
                f"found {len(results)} results"
            )

            if results:
                scores = [r["score"] for r in results]
                logger.info(
                    f"[PINECONE] Score statistics: min={min(scores):.4f}, "
                    f"max={max(scores):.4f}, avg={sum(scores)/len(scores):.4f}"
                )

            return results
        except Exception as e:
            logger.error(
                f"[PINECONE] Failed to query similar mentors: {e}",
                exc_info=True,
            )
            raise

    def delete_mentor(self, mentor_id: str) -> bool:
        """Delete a single mentor vector by id.

        Returns:
            ``True`` when the delete call completed.

        Raises:
            Exception: Any error raised by the Pinecone client (logged first).
        """
        try:
            PineconeService._index.delete(ids=[str(mentor_id)])
            logger.info(f"Mentor {mentor_id} deleted successfully")
            return True
        except Exception as e:
            logger.error(f"Failed to delete mentor {mentor_id}: {e}")
            raise

    def delete_mentors_batch(self, mentor_ids: List[str]) -> bool:
        """Delete several mentor vectors by id in one request.

        Args:
            mentor_ids: Ids to delete; each is converted to ``str``. An empty
                list is a no-op and sends no request.

        Returns:
            ``True`` when the batch was processed (or was empty).

        Raises:
            Exception: Any error raised by the Pinecone client (logged first).
        """
        if not mentor_ids:
            logger.debug("Batch delete skipped: no mentor ids provided")
            return True

        try:
            PineconeService._index.delete(
                ids=[str(mentor_id) for mentor_id in mentor_ids]
            )
            logger.info(f"Batch deleted {len(mentor_ids)} mentors")
            return True
        except Exception as e:
            logger.error(f"Failed to batch delete mentors: {e}")
            raise

    def get_index_stats(self) -> Dict[str, Any]:
        """Return summary statistics for the connected index.

        Returns:
            A dict with ``total_vectors``, ``dimension`` and ``index_fullness``
            (``None`` when the stats object has no such attribute).

        Raises:
            Exception: Any error raised by the Pinecone client (logged first).
        """
        try:
            stats = PineconeService._index.describe_index_stats()
            return {
                "total_vectors": stats.total_vector_count,
                "dimension": stats.dimension,
                "index_fullness": getattr(stats, "index_fullness", None),
            }
        except Exception as e:
            logger.error(f"Failed to get index stats: {e}")
            raise