mentorme / services /pinecone_service.py
Doanh Van Vu
Refactor mentor availability handling across services
69934b0
from pinecone import Pinecone, ServerlessSpec
from typing import List, Dict, Optional, Any
import logging
import time
from config.settings import get_settings
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class PineconeService:
    """Singleton wrapper around a single Pinecone index holding mentor vectors.

    Every ``PineconeService()`` call returns the same instance. The Pinecone
    client and the index handle are stored on the class itself, so they are
    created once and shared. If the configured index does not exist yet, it
    is created as a serverless (cloud ``"aws"``) index using the cosine metric.
    """

    # The one shared instance handed out by __new__.
    _instance = None
    # Pinecone client; published only after initialization fully succeeds.
    _client = None
    # Handle to the configured index; stays None until initialization succeeds.
    _index = None

    def __new__(cls):
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        """Connect to Pinecone unless a usable index handle already exists."""
        # __init__ runs on every PineconeService() call. Gate on the index
        # handle rather than the client, so that a previous initialization
        # that failed part-way is retried instead of leaving every method
        # to fail on a None index.
        if PineconeService._index is None:
            self._initialize()

    def _initialize(self):
        """Create the client, ensure the index exists, and connect to it.

        Raises:
            ValueError: If ``PINECONE_API_KEY`` is empty or unset.
            Exception: Anything raised while talking to Pinecone is logged
                and re-raised unchanged.
        """
        settings = get_settings()
        if not settings.PINECONE_API_KEY:
            raise ValueError("PINECONE_API_KEY is required")

        try:
            # Work on locals so the class attributes are never left in a
            # half-initialized state if one of these calls raises.
            client = Pinecone(api_key=settings.PINECONE_API_KEY)
            index_name = settings.PINECONE_INDEX
            existing_indexes = [idx.name for idx in client.list_indexes()]

            if index_name not in existing_indexes:
                logger.info(f"Creating Pinecone index: {index_name}")
                client.create_index(
                    name=index_name,
                    dimension=settings.PINECONE_DIMENSION,
                    metric="cosine",
                    spec=ServerlessSpec(
                        cloud="aws",
                        region=settings.PINECONE_ENVIRONMENT,
                    ),
                )
                logger.info(f"Index {index_name} created successfully")

            index = client.Index(index_name)
        except Exception as e:
            logger.error(f"Failed to initialize Pinecone: {e}")
            raise

        # Publish both handles together, only after everything succeeded.
        PineconeService._client = client
        PineconeService._index = index
        logger.info(f"Connected to Pinecone index: {index_name}")

    def upsert_mentor(
        self,
        mentor_id: str,
        vector: List[float],
        metadata: Dict[str, Any]
    ) -> bool:
        """Insert or overwrite a single mentor vector.

        Args:
            mentor_id: Identifier of the mentor; converted to ``str`` for use
                as the vector id.
            vector: Embedding values; its length must equal
                ``PINECONE_DIMENSION`` from settings.
            metadata: Metadata stored alongside the vector.

        Returns:
            ``True`` when the upsert call completes without raising.

        Raises:
            ValueError: If ``vector`` has the wrong length.
            Exception: Errors from the upsert call are logged and re-raised.
        """
        # Validate before the try block so a mismatch is logged exactly once.
        expected_dim = get_settings().PINECONE_DIMENSION
        if len(vector) != expected_dim:
            error_msg = f"Vector dimension mismatch: expected {expected_dim}, got {len(vector)}"
            logger.error(error_msg)
            raise ValueError(error_msg)

        try:
            PineconeService._index.upsert(
                vectors=[{
                    "id": str(mentor_id),
                    "values": vector,
                    "metadata": metadata,
                }]
            )
        except Exception as e:
            logger.error(f"Failed to upsert mentor {mentor_id}: {e}")
            raise

        logger.info(f"Mentor {mentor_id} upserted successfully")
        return True

    def upsert_mentors_batch(
        self,
        vectors: List[Dict[str, Any]]
    ) -> bool:
        """Insert or overwrite several mentor vectors in one call.

        Args:
            vectors: Vector records passed to the index ``upsert`` call as-is.

        Returns:
            ``True`` when the upsert completes, or when ``vectors`` is empty.

        Raises:
            Exception: Errors from the upsert call are logged and re-raised.
        """
        if not vectors:
            # Nothing to send; skip the network round-trip.
            logger.info("Batch upserted 0 mentors")
            return True

        try:
            PineconeService._index.upsert(vectors=vectors)
        except Exception as e:
            logger.error(f"Failed to batch upsert mentors: {e}")
            raise

        logger.info(f"Batch upserted {len(vectors)} mentors")
        return True

    def query_similar(
        self,
        query_vector: List[float],
        top_k: int = 30,
        filter: Optional[Dict[str, Any]] = None,
        include_metadata: bool = True
    ) -> List[Dict[str, Any]]:
        """Return the mentors whose vectors are most similar to ``query_vector``.

        Args:
            query_vector: Embedding to search with; its length must equal
                ``PINECONE_DIMENSION`` from settings.
            top_k: Maximum number of matches to request.
            filter: Optional metadata filter forwarded to the index query.
            include_metadata: Whether to request and return match metadata.

        Returns:
            One dict per match with keys ``"mentor_id"``, ``"score"`` and
            ``"metadata"`` (``None`` when ``include_metadata`` is false), in
            the order returned by the index.

        Raises:
            ValueError: If ``query_vector`` has the wrong length.
            Exception: Errors from the query are logged with a traceback and
                re-raised.
        """
        # Validate before the try block so a mismatch is logged exactly once.
        expected_dim = get_settings().PINECONE_DIMENSION
        if len(query_vector) != expected_dim:
            error_msg = f"Query vector dimension mismatch: expected {expected_dim}, got {len(query_vector)}"
            logger.error(f"[PINECONE] {error_msg}")
            raise ValueError(error_msg)

        try:
            logger.info(f"[PINECONE] Querying similar mentors: top_k={top_k}, filter={filter}")

            # Time only the remote call, so the reported duration reflects
            # the query itself.
            start_time = time.perf_counter()
            query_response = PineconeService._index.query(
                vector=query_vector,
                top_k=top_k,
                filter=filter,
                include_metadata=include_metadata
            )
            query_time = time.perf_counter() - start_time

            results = []
            for rank, match in enumerate(query_response.matches, 1):
                results.append({
                    "mentor_id": match.id,
                    "score": match.score,
                    "metadata": match.metadata if include_metadata else None,
                })
                self._log_match(rank, match, include_metadata)

            logger.info(f"[PINECONE] Query completed in {query_time:.3f}s: found {len(results)} results")

            if results:
                scores = [r["score"] for r in results]
                logger.info(
                    f"[PINECONE] Score statistics: min={min(scores):.4f}, "
                    f"max={max(scores):.4f}, avg={sum(scores)/len(scores):.4f}"
                )

            return results
        except Exception as e:
            logger.error(f"[PINECONE] Failed to query similar mentors: {e}", exc_info=True)
            raise

    @staticmethod
    def _log_match(rank: int, match: Any, include_metadata: bool) -> None:
        """Log one query match, adding selected metadata fields when present."""
        metadata = match.metadata if include_metadata else None
        if not metadata:
            logger.info(f"[PINECONE] Result #{rank}: mentor_id={match.id}, score={match.score:.4f}")
            return

        logger.info(
            f"[PINECONE] Result #{rank}: mentor_id={match.id}, "
            f"score={match.score:.4f}, "
            f"rating={metadata.get('rating', 'N/A')}, "
            f"total_ratings={metadata.get('total_ratings', 0)}, "
            f"session_count={metadata.get('session_count', 0)}, "
            f"status={metadata.get('status', 'N/A')}, "
            f"career_id={metadata.get('career_id', 'N/A')}, "
            f"skill_ids={metadata.get('skill_ids', [])}, "
            f"domain_ids={metadata.get('domain_ids', [])}, "
            f"has_mentor_text={'mentor_text' in metadata}"
        )

    def delete_mentor(self, mentor_id: str) -> bool:
        """Delete one mentor vector by id.

        Args:
            mentor_id: Identifier of the mentor; converted to ``str``.

        Returns:
            ``True`` when the delete call completes without raising.

        Raises:
            Exception: Errors from the delete call are logged and re-raised.
        """
        try:
            PineconeService._index.delete(ids=[str(mentor_id)])
        except Exception as e:
            logger.error(f"Failed to delete mentor {mentor_id}: {e}")
            raise

        logger.info(f"Mentor {mentor_id} deleted successfully")
        return True

    def delete_mentors_batch(self, mentor_ids: List[str]) -> bool:
        """Delete several mentor vectors by id in one call.

        Args:
            mentor_ids: Identifiers to delete; each is converted to ``str``.

        Returns:
            ``True`` when the delete completes, or when ``mentor_ids`` is empty.

        Raises:
            Exception: Errors from the delete call are logged and re-raised.
        """
        if not mentor_ids:
            # Nothing to delete; skip the network round-trip.
            logger.info("Batch deleted 0 mentors")
            return True

        try:
            PineconeService._index.delete(
                ids=[str(mentor_id) for mentor_id in mentor_ids]
            )
        except Exception as e:
            logger.error(f"Failed to batch delete mentors: {e}")
            raise

        logger.info(f"Batch deleted {len(mentor_ids)} mentors")
        return True

    def get_index_stats(self) -> Dict[str, Any]:
        """Return summary statistics for the connected index.

        Returns:
            A dict with ``"total_vectors"``, ``"dimension"`` and
            ``"index_fullness"`` (``None`` when the stats object has no
            ``index_fullness`` attribute).

        Raises:
            Exception: Errors from the stats call are logged and re-raised.
        """
        try:
            stats = PineconeService._index.describe_index_stats()
            return {
                "total_vectors": stats.total_vector_count,
                "dimension": stats.dimension,
                "index_fullness": getattr(stats, "index_fullness", None),
            }
        except Exception as e:
            logger.error(f"Failed to get index stats: {e}")
            raise