Spaces:
Sleeping
Sleeping
| from pinecone import Pinecone, ServerlessSpec | |
| from typing import List, Dict, Optional, Any | |
| import logging | |
| import time | |
| from config.settings import get_settings | |
logger = logging.getLogger(__name__)


class PineconeService:
    """Singleton wrapper around one Pinecone index holding mentor vectors.

    ``__new__`` always hands back the same instance, and the Pinecone client
    and index handle are stored on the class itself, so every caller shares
    a single connection. The index named by ``settings.PINECONE_INDEX`` is
    created on first use if it does not exist yet.
    """

    _instance = None
    _client = None
    _index = None

    def __new__(cls):
        """Return the shared instance, creating it on the first call."""
        if cls._instance is None:
            cls._instance = super(PineconeService, cls).__new__(cls)
        return cls._instance

    def __init__(self):
        """Connect to Pinecone unless a usable connection already exists.

        ``__init__`` runs on every ``PineconeService()`` call even though the
        instance is shared, so the connection is only (re)built while either
        the client or the index handle is still missing. Checking both lets a
        later call retry after an earlier initialization attempt failed.
        """
        if PineconeService._client is None or PineconeService._index is None:
            self._initialize()

    def _initialize(self):
        """Create the client, ensure the index exists, and publish both.

        Raises:
            ValueError: If ``settings.PINECONE_API_KEY`` is empty.
            Exception: Any error raised by the Pinecone client is logged and
                re-raised unchanged.
        """
        settings = get_settings()
        if not settings.PINECONE_API_KEY:
            raise ValueError("PINECONE_API_KEY is required")

        try:
            # Work on locals first: the class attributes must never hold a
            # client without a matching index, otherwise a failure below
            # would leave the singleton half-initialized with no retry.
            client = Pinecone(api_key=settings.PINECONE_API_KEY)
            index_name = settings.PINECONE_INDEX

            existing_indexes = [idx.name for idx in client.list_indexes()]
            if index_name not in existing_indexes:
                logger.info(f"Creating Pinecone index: {index_name}")
                client.create_index(
                    name=index_name,
                    dimension=settings.PINECONE_DIMENSION,
                    metric="cosine",
                    spec=ServerlessSpec(
                        cloud="aws",
                        # The PINECONE_ENVIRONMENT setting is used as the
                        # serverless region.
                        region=settings.PINECONE_ENVIRONMENT,
                    ),
                )
                logger.info(f"Index {index_name} created successfully")

            index = client.Index(index_name)
        except Exception as e:
            logger.error(f"Failed to initialize Pinecone: {str(e)}")
            raise

        # Publish both handles together, only after everything succeeded.
        PineconeService._client = client
        PineconeService._index = index
        logger.info(f"Connected to Pinecone index: {index_name}")

    @staticmethod
    def _require_index():
        """Return the shared index handle or fail with a clear error.

        Raises:
            RuntimeError: If no index handle has been stored yet.
        """
        index = PineconeService._index
        if index is None:
            raise RuntimeError("Pinecone index is not initialized")
        return index

    def upsert_mentor(
        self,
        mentor_id: str,
        vector: List[float],
        metadata: Dict[str, Any]
    ) -> bool:
        """Insert or overwrite the vector stored for one mentor.

        Args:
            mentor_id: Identifier of the mentor; stored as ``str(mentor_id)``.
            vector: Embedding whose length must equal
                ``settings.PINECONE_DIMENSION``.
            metadata: Metadata stored alongside the vector.

        Returns:
            ``True`` when the upsert call completed.

        Raises:
            ValueError: If ``vector`` has the wrong length.
            Exception: Any other failure is logged and re-raised.
        """
        try:
            settings = get_settings()
            expected_dim = settings.PINECONE_DIMENSION
            if len(vector) != expected_dim:
                error_msg = f"Vector dimension mismatch: expected {expected_dim}, got {len(vector)}"
                logger.error(error_msg)
                raise ValueError(error_msg)

            self._require_index().upsert(
                vectors=[{
                    "id": str(mentor_id),
                    "values": vector,
                    "metadata": metadata,
                }]
            )
            logger.info(f"Mentor {mentor_id} upserted successfully")
            return True
        except Exception as e:
            logger.error(f"Failed to upsert mentor {mentor_id}: {str(e)}")
            raise

    def upsert_mentors_batch(
        self,
        vectors: List[Dict[str, Any]],
        batch_size: int = 100
    ) -> bool:
        """Upsert many pre-built vector records, ``batch_size`` per request.

        Args:
            vectors: Records in the form the index's ``upsert`` accepts
                (they are passed through unchanged).
            batch_size: Maximum number of records sent per request. The
                default is a conservative value; tune it against Pinecone's
                documented upsert limits.

        Returns:
            ``True`` when every chunk was sent. An empty ``vectors`` list
            sends nothing and also returns ``True``.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1.
            Exception: Any failure is logged and re-raised. Chunks sent
                before the failing one are not rolled back.
        """
        try:
            if batch_size < 1:
                raise ValueError(f"batch_size must be >= 1, got {batch_size}")

            index = self._require_index()
            for start in range(0, len(vectors), batch_size):
                index.upsert(vectors=vectors[start:start + batch_size])

            logger.info(f"Batch upserted {len(vectors)} mentors")
            return True
        except Exception as e:
            logger.error(f"Failed to batch upsert mentors: {str(e)}")
            raise

    def query_similar(
        self,
        query_vector: List[float],
        top_k: int = 30,
        filter: Optional[Dict[str, Any]] = None,
        include_metadata: bool = True
    ) -> List[Dict[str, Any]]:
        """Return the mentors whose vectors are closest to ``query_vector``.

        Args:
            query_vector: Embedding whose length must equal
                ``settings.PINECONE_DIMENSION``.
            top_k: Number of matches requested from the index.
            filter: Optional metadata filter forwarded to the index query.
            include_metadata: Whether metadata is requested and returned.

        Returns:
            One dict per match, in the order the index returned them, with
            the keys ``mentor_id``, ``score`` and ``metadata`` (``None`` when
            ``include_metadata`` is false).

        Raises:
            ValueError: If ``query_vector`` has the wrong length.
            Exception: Any other failure is logged with traceback and
                re-raised.
        """
        try:
            start_time = time.perf_counter()

            settings = get_settings()
            expected_dim = settings.PINECONE_DIMENSION
            if len(query_vector) != expected_dim:
                error_msg = f"Query vector dimension mismatch: expected {expected_dim}, got {len(query_vector)}"
                logger.error(f"[PINECONE] {error_msg}")
                raise ValueError(error_msg)

            logger.info(f"[PINECONE] Querying similar mentors: top_k={top_k}, filter={filter}")
            query_response = self._require_index().query(
                vector=query_vector,
                top_k=top_k,
                filter=filter,
                include_metadata=include_metadata,
            )
            # Measured before result processing, so logging is not included.
            query_time = time.perf_counter() - start_time

            results = []
            for rank, match in enumerate(query_response.matches, 1):
                results.append({
                    "mentor_id": match.id,
                    "score": match.score,
                    "metadata": match.metadata if include_metadata else None,
                })
                self._log_match(rank, match, include_metadata)

            logger.info(f"[PINECONE] Query completed in {query_time:.3f}s: found {len(results)} results")
            if results:
                scores = [r["score"] for r in results]
                logger.info(
                    f"[PINECONE] Score statistics: min={min(scores):.4f}, "
                    f"max={max(scores):.4f}, avg={sum(scores)/len(scores):.4f}"
                )
            return results
        except Exception as e:
            logger.error(f"[PINECONE] Failed to query similar mentors: {str(e)}", exc_info=True)
            raise

    @staticmethod
    def _log_match(rank: int, match: Any, include_metadata: bool) -> None:
        """Log one query match, with metadata details when available."""
        if include_metadata and match.metadata:
            metadata = match.metadata
            logger.info(
                f"[PINECONE] Result #{rank}: mentor_id={match.id}, "
                f"score={match.score:.4f}, "
                f"rating={metadata.get('rating', 'N/A')}, "
                f"total_ratings={metadata.get('total_ratings', 0)}, "
                f"session_count={metadata.get('session_count', 0)}, "
                f"status={metadata.get('status', 'N/A')}, "
                f"career_id={metadata.get('career_id', 'N/A')}, "
                f"skill_ids={metadata.get('skill_ids', [])}, "
                f"domain_ids={metadata.get('domain_ids', [])}, "
                f"has_mentor_text={'mentor_text' in metadata}"
            )
        else:
            logger.info(f"[PINECONE] Result #{rank}: mentor_id={match.id}, score={match.score:.4f}")

    def delete_mentor(self, mentor_id: str) -> bool:
        """Delete the vector stored for one mentor.

        Returns:
            ``True`` when the delete call completed.

        Raises:
            Exception: Any failure is logged and re-raised.
        """
        try:
            self._require_index().delete(ids=[str(mentor_id)])
            logger.info(f"Mentor {mentor_id} deleted successfully")
            return True
        except Exception as e:
            logger.error(f"Failed to delete mentor {mentor_id}: {str(e)}")
            raise

    def delete_mentors_batch(self, mentor_ids: List[str]) -> bool:
        """Delete the vectors of several mentors in one request.

        Args:
            mentor_ids: Identifiers to delete; each is converted with ``str``.

        Returns:
            ``True`` when the delete call completed, or immediately when
            ``mentor_ids`` is empty (no request is sent in that case).

        Raises:
            Exception: Any failure is logged and re-raised.
        """
        try:
            ids = [str(mentor_id) for mentor_id in mentor_ids]
            if not ids:
                # Nothing to do; avoid sending a delete with no ids at all.
                logger.info("No mentor ids supplied; nothing to delete")
                return True

            self._require_index().delete(ids=ids)
            logger.info(f"Batch deleted {len(mentor_ids)} mentors")
            return True
        except Exception as e:
            logger.error(f"Failed to batch delete mentors: {str(e)}")
            raise

    def get_index_stats(self) -> Dict[str, Any]:
        """Return a small summary of the index statistics.

        Returns:
            A dict with ``total_vectors``, ``dimension`` and
            ``index_fullness`` (``None`` when the stats object does not
            expose that attribute).

        Raises:
            Exception: Any failure is logged and re-raised.
        """
        try:
            stats = self._require_index().describe_index_stats()
            return {
                "total_vectors": stats.total_vector_count,
                "dimension": stats.dimension,
                "index_fullness": getattr(stats, "index_fullness", None),
            }
        except Exception as e:
            logger.error(f"Failed to get index stats: {str(e)}")
            raise