"""Pinecone vector-store service (ai-service/services/pinecone_service.py).

Change log — commit b1f36a0 (Doanh Van Vu): update embedding model to
Vietnamese_Embedding and adjust related configurations; replace FlagEmbedding
with SentenceTransformer in the embedding service; ensure dimension checks for
vectors in the Pinecone service; update requirements for the new dependencies.
"""
from pinecone import Pinecone, ServerlessSpec
from typing import List, Dict, Optional, Any
import logging
from config.settings import get_settings
# Module-level logger named after this module (standard logging convention).
logger = logging.getLogger(__name__)
class PineconeService:
    """Singleton wrapper around a Pinecone serverless index of mentor vectors.

    The Pinecone client and index handle are stored on the *class* so that
    every instantiation shares one connection; `__init__` only connects on
    the first construction.
    """

    _instance = None  # the sole PineconeService instance
    _client = None    # shared Pinecone client (class-wide)
    _index = None     # shared handle to the mentor index (class-wide)

    def __new__(cls):
        # Classic singleton: every call site gets the same instance.
        if cls._instance is None:
            cls._instance = super(PineconeService, cls).__new__(cls)
        return cls._instance

    def __init__(self):
        # Connect only once; re-instantiation after that is a no-op.
        if PineconeService._client is None:
            self._initialize()

    def _initialize(self):
        """Connect to Pinecone, creating the configured index if absent.

        Raises:
            ValueError: if PINECONE_API_KEY is not configured.
            Exception: re-raised from the Pinecone client on any failure
                (logged first).
        """
        settings = get_settings()
        if not settings.PINECONE_API_KEY:
            raise ValueError("PINECONE_API_KEY is required")
        try:
            PineconeService._client = Pinecone(api_key=settings.PINECONE_API_KEY)
            index_name = settings.PINECONE_INDEX
            existing_indexes = [idx.name for idx in PineconeService._client.list_indexes()]
            if index_name not in existing_indexes:
                logger.info(f"Creating Pinecone index: {index_name}")
                PineconeService._client.create_index(
                    name=index_name,
                    dimension=settings.PINECONE_DIMENSION,
                    metric="cosine",
                    spec=ServerlessSpec(
                        cloud="aws",
                        # NOTE(review): PINECONE_ENVIRONMENT is used as the AWS
                        # region here — confirm the setting holds a region name.
                        region=settings.PINECONE_ENVIRONMENT
                    )
                )
                logger.info(f"Index {index_name} created successfully")
            PineconeService._index = PineconeService._client.Index(index_name)
            logger.info(f"Connected to Pinecone index: {index_name}")
        except Exception as e:
            logger.error(f"Failed to initialize Pinecone: {str(e)}")
            raise

    @staticmethod
    def _check_dimension(vector: List[float], label: str = "Vector") -> None:
        """Raise ValueError if *vector* length != settings.PINECONE_DIMENSION.

        Shared by all upsert/query paths so the validation (and its error
        message) lives in exactly one place.
        """
        expected_dim = get_settings().PINECONE_DIMENSION
        if len(vector) != expected_dim:
            error_msg = f"{label} dimension mismatch: expected {expected_dim}, got {len(vector)}"
            logger.error(error_msg)
            raise ValueError(error_msg)

    def upsert_mentor(
        self,
        mentor_id: str,
        vector: List[float],
        metadata: Dict[str, Any]
    ) -> bool:
        """Insert or update a single mentor embedding.

        Args:
            mentor_id: unique mentor identifier (stringified for Pinecone).
            vector: embedding; must match the configured index dimension.
            metadata: arbitrary metadata stored alongside the vector.

        Returns:
            True on success.

        Raises:
            ValueError: on dimension mismatch.
            Exception: re-raised from the Pinecone client (logged first).
        """
        try:
            self._check_dimension(vector)
            PineconeService._index.upsert(
                vectors=[{
                    "id": str(mentor_id),
                    "values": vector,
                    "metadata": metadata
                }]
            )
            logger.info(f"Mentor {mentor_id} upserted successfully")
            return True
        except Exception as e:
            logger.error(f"Failed to upsert mentor {mentor_id}: {str(e)}")
            raise

    def upsert_mentors_batch(
        self,
        vectors: List[Dict[str, Any]]
    ) -> bool:
        """Batch-upsert pre-built vector records.

        Each record must follow the Pinecone upsert format: "id", "values",
        and optionally "metadata" keys.

        Raises:
            ValueError: if any record's "values" has the wrong dimension.
            Exception: re-raised from the Pinecone client (logged first).
        """
        try:
            # Consistency with upsert_mentor/query_similar: validate every
            # vector locally before sending the whole batch to Pinecone.
            for record in vectors:
                self._check_dimension(record["values"])
            PineconeService._index.upsert(vectors=vectors)
            logger.info(f"Batch upserted {len(vectors)} mentors")
            return True
        except Exception as e:
            logger.error(f"Failed to batch upsert mentors: {str(e)}")
            raise

    def query_similar(
        self,
        query_vector: List[float],
        top_k: int = 30,
        filter: Optional[Dict[str, Any]] = None,
        include_metadata: bool = True
    ) -> List[Dict[str, Any]]:
        """Return up to *top_k* mentors nearest to *query_vector*.

        Args:
            query_vector: embedding to search with; must match index dimension.
            top_k: maximum number of matches to return.
            filter: optional Pinecone metadata filter.
            include_metadata: whether to return each match's metadata.

        Returns:
            A list of dicts with "mentor_id", "score", and "metadata"
            (None when include_metadata is False).

        Raises:
            ValueError: on dimension mismatch.
            Exception: re-raised from the Pinecone client (logged first).
        """
        try:
            self._check_dimension(query_vector, "Query vector")
            query_response = PineconeService._index.query(
                vector=query_vector,
                top_k=top_k,
                filter=filter,
                include_metadata=include_metadata
            )
            return [
                {
                    "mentor_id": match.id,
                    "score": match.score,
                    "metadata": match.metadata if include_metadata else None
                }
                for match in query_response.matches
            ]
        except Exception as e:
            logger.error(f"Failed to query similar mentors: {str(e)}")
            raise

    def delete_mentor(self, mentor_id: str) -> bool:
        """Delete a single mentor vector by id. Returns True on success."""
        try:
            PineconeService._index.delete(ids=[str(mentor_id)])
            logger.info(f"Mentor {mentor_id} deleted successfully")
            return True
        except Exception as e:
            logger.error(f"Failed to delete mentor {mentor_id}: {str(e)}")
            raise

    def delete_mentors_batch(self, mentor_ids: List[str]) -> bool:
        """Delete many mentor vectors in one call. Returns True on success."""
        try:
            PineconeService._index.delete(ids=[str(id) for id in mentor_ids])
            logger.info(f"Batch deleted {len(mentor_ids)} mentors")
            return True
        except Exception as e:
            logger.error(f"Failed to batch delete mentors: {str(e)}")
            raise

    def get_index_stats(self) -> Dict[str, Any]:
        """Return basic index statistics (vector count, dimension, fullness).

        "index_fullness" is None when the stats object does not expose it.
        """
        try:
            stats = PineconeService._index.describe_index_stats()
            return {
                "total_vectors": stats.total_vector_count,
                "dimension": stats.dimension,
                "index_fullness": stats.index_fullness if hasattr(stats, 'index_fullness') else None
            }
        except Exception as e:
            logger.error(f"Failed to get index stats: {str(e)}")
            raise