Spaces:
Sleeping
Sleeping
File size: 7,410 Bytes
1904012 6a14fa9 1904012 6a14fa9 1904012 6a14fa9 1904012 6a14fa9 1904012 6a14fa9 1904012 ff5d801 1904012 ff5d801 1904012 6a14fa9 ff5d801 1904012 6a14fa9 1904012 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 | from pinecone import Pinecone, ServerlessSpec
from typing import List, Dict, Optional, Any
import logging
import time
from config.settings import get_settings
logger = logging.getLogger(__name__)


class PineconeService:
    """Singleton wrapper around one Pinecone index holding mentor vectors.

    The client and the index handle live on the class itself, so every
    ``PineconeService()`` call returns the same object and reuses the same
    connection. The first successful construction connects to the index named
    by ``settings.PINECONE_INDEX`` and creates it when it does not exist yet.
    """

    # Sole instance handed out by __new__.
    _instance = None
    # Pinecone client; published only after initialization fully succeeded.
    _client = None
    # Handle to the configured index; published together with _client.
    _index = None

    # Upper bound on ids sent in one delete request.
    # NOTE(review): Pinecone documents 1000 ids per delete call - confirm
    # against the limits page if the account uses a different plan.
    _MAX_DELETE_IDS = 1000

    def __new__(cls):
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        """Connect to Pinecone unless a usable connection already exists."""
        # Checking the index as well as the client means a previously failed
        # initialization is retried instead of leaving a half-built singleton.
        if PineconeService._client is None or PineconeService._index is None:
            self._initialize()

    def _initialize(self):
        """Create the client, ensure the index exists and open a handle to it.

        Raises:
            ValueError: If ``settings.PINECONE_API_KEY`` is empty.
            Exception: Any error raised by the Pinecone client is logged and
                re-raised unchanged.
        """
        settings = get_settings()
        if not settings.PINECONE_API_KEY:
            raise ValueError("PINECONE_API_KEY is required")

        try:
            # Work on locals so that class state is only touched on success.
            client = Pinecone(api_key=settings.PINECONE_API_KEY)
            index_name = settings.PINECONE_INDEX
            existing_indexes = {idx.name for idx in client.list_indexes()}
            if index_name not in existing_indexes:
                logger.info(f"Creating Pinecone index: {index_name}")
                client.create_index(
                    name=index_name,
                    dimension=settings.PINECONE_DIMENSION,
                    metric="cosine",
                    spec=ServerlessSpec(
                        cloud="aws",
                        region=settings.PINECONE_ENVIRONMENT,
                    ),
                )
                logger.info(f"Index {index_name} created successfully")
            index = client.Index(index_name)
        except Exception as e:
            logger.error(f"Failed to initialize Pinecone: {str(e)}")
            raise

        PineconeService._client = client
        PineconeService._index = index
        logger.info(f"Connected to Pinecone index: {index_name}")

    @classmethod
    def _require_index(cls):
        """Return the index handle or raise if initialization never completed."""
        if cls._index is None:
            raise RuntimeError("Pinecone index is not initialized")
        return cls._index

    @staticmethod
    def _log_match(rank: int, match: Any, include_metadata: bool) -> None:
        """Log one query match, including a metadata summary when present."""
        metadata = match.metadata if include_metadata else None
        if metadata:
            logger.info(
                f"[PINECONE] Result #{rank}: mentor_id={match.id}, "
                f"score={match.score:.4f}, "
                f"rating={metadata.get('rating', 'N/A')}, "
                f"total_ratings={metadata.get('total_ratings', 0)}, "
                f"session_count={metadata.get('session_count', 0)}, "
                f"status={metadata.get('status', 'N/A')}, "
                f"career_id={metadata.get('career_id', 'N/A')}, "
                f"skill_ids={metadata.get('skill_ids', [])}, "
                f"domain_ids={metadata.get('domain_ids', [])}, "
                f"has_mentor_text={'mentor_text' in metadata}"
            )
        else:
            logger.info(f"[PINECONE] Result #{rank}: mentor_id={match.id}, score={match.score:.4f}")

    def upsert_mentor(
        self,
        mentor_id: str,
        vector: List[float],
        metadata: Dict[str, Any]
    ) -> bool:
        """Insert or overwrite the vector stored for a single mentor.

        Args:
            mentor_id: Identifier of the mentor; stored as ``str(mentor_id)``.
            vector: Embedding whose length must equal
                ``settings.PINECONE_DIMENSION``.
            metadata: Metadata stored alongside the vector.

        Returns:
            ``True`` when the upsert call completed.

        Raises:
            ValueError: If ``vector`` has the wrong length.
            RuntimeError: If the index was never initialized.
            Exception: Any Pinecone error, logged and re-raised.
        """
        # Validate before the try block so a mismatch is logged exactly once.
        expected_dim = get_settings().PINECONE_DIMENSION
        if len(vector) != expected_dim:
            error_msg = f"Vector dimension mismatch: expected {expected_dim}, got {len(vector)}"
            logger.error(error_msg)
            raise ValueError(error_msg)

        try:
            self._require_index().upsert(
                vectors=[{
                    "id": str(mentor_id),
                    "values": vector,
                    "metadata": metadata,
                }]
            )
            logger.info(f"Mentor {mentor_id} upserted successfully")
            return True
        except Exception as e:
            logger.error(f"Failed to upsert mentor {mentor_id}: {str(e)}")
            raise

    def upsert_mentors_batch(
        self,
        vectors: List[Dict[str, Any]],
        *,
        batch_size: int = 100
    ) -> bool:
        """Upsert many mentor records, sending them in bounded chunks.

        Args:
            vectors: Records in the form accepted by the index ``upsert``
                call; they are forwarded unchanged (no dimension check).
            batch_size: Maximum number of records per request. Lists no longer
                than this are sent in a single request, as before.

        Returns:
            ``True`` when every chunk was sent, or when ``vectors`` is empty
            (in which case no request is made).

        Raises:
            ValueError: If ``batch_size`` is not positive.
            RuntimeError: If the index was never initialized.
            Exception: Any Pinecone error, logged and re-raised. Chunks sent
                before the failure are not rolled back.
        """
        if batch_size <= 0:
            raise ValueError("batch_size must be a positive integer")

        records = list(vectors)
        if not records:
            logger.info("No vectors supplied for batch upsert; skipping Pinecone call")
            return True

        try:
            index = self._require_index()
            for start in range(0, len(records), batch_size):
                index.upsert(vectors=records[start:start + batch_size])
            logger.info(f"Batch upserted {len(records)} mentors")
            return True
        except Exception as e:
            logger.error(f"Failed to batch upsert mentors: {str(e)}")
            raise

    def query_similar(
        self,
        query_vector: List[float],
        top_k: int = 30,
        filter: Optional[Dict[str, Any]] = None,
        include_metadata: bool = True
    ) -> List[Dict[str, Any]]:
        """Return the mentors whose vectors are closest to ``query_vector``.

        Args:
            query_vector: Embedding whose length must equal
                ``settings.PINECONE_DIMENSION``.
            top_k: Maximum number of matches requested from the index.
            filter: Optional metadata filter forwarded to the index query.
                (The name shadows the builtin but is part of the public
                signature, so it is kept.)
            include_metadata: Whether to request and return match metadata.

        Returns:
            One dict per match with the keys ``mentor_id``, ``score`` and
            ``metadata`` (``None`` when ``include_metadata`` is false), in the
            order returned by the index.

        Raises:
            ValueError: If ``query_vector`` has the wrong length.
            RuntimeError: If the index was never initialized.
            Exception: Any Pinecone error, logged with traceback and re-raised.
        """
        # Validate before the try block so a mismatch is logged exactly once.
        expected_dim = get_settings().PINECONE_DIMENSION
        if len(query_vector) != expected_dim:
            error_msg = f"Query vector dimension mismatch: expected {expected_dim}, got {len(query_vector)}"
            logger.error(f"[PINECONE] {error_msg}")
            raise ValueError(error_msg)

        try:
            index = self._require_index()
            logger.info(f"[PINECONE] Querying similar mentors: top_k={top_k}, filter={filter}")

            # Time only the remote call, not validation or result shaping.
            start_time = time.perf_counter()
            query_response = index.query(
                vector=query_vector,
                top_k=top_k,
                filter=filter,
                include_metadata=include_metadata
            )
            query_time = time.perf_counter() - start_time

            matches = list(query_response.matches)
            results = [
                {
                    "mentor_id": match.id,
                    "score": match.score,
                    "metadata": match.metadata if include_metadata else None,
                }
                for match in matches
            ]

            # Skip building the per-match messages when INFO is disabled.
            if logger.isEnabledFor(logging.INFO):
                for rank, match in enumerate(matches, 1):
                    self._log_match(rank, match, include_metadata)

            logger.info(f"[PINECONE] Query completed in {query_time:.3f}s: found {len(results)} results")
            if results:
                scores = [r["score"] for r in results]
                logger.info(
                    f"[PINECONE] Score statistics: min={min(scores):.4f}, "
                    f"max={max(scores):.4f}, avg={sum(scores)/len(scores):.4f}"
                )
            return results
        except Exception as e:
            logger.error(f"[PINECONE] Failed to query similar mentors: {str(e)}", exc_info=True)
            raise

    def delete_mentor(self, mentor_id: str) -> bool:
        """Delete the vector stored for one mentor.

        Args:
            mentor_id: Identifier of the mentor; sent as ``str(mentor_id)``.

        Returns:
            ``True`` when the delete call completed.

        Raises:
            RuntimeError: If the index was never initialized.
            Exception: Any Pinecone error, logged and re-raised.
        """
        try:
            self._require_index().delete(ids=[str(mentor_id)])
            logger.info(f"Mentor {mentor_id} deleted successfully")
            return True
        except Exception as e:
            logger.error(f"Failed to delete mentor {mentor_id}: {str(e)}")
            raise

    def delete_mentors_batch(self, mentor_ids: List[str]) -> bool:
        """Delete the vectors stored for several mentors.

        Args:
            mentor_ids: Identifiers to delete; each is converted with ``str``.

        Returns:
            ``True`` when every chunk was sent, or when ``mentor_ids`` is
            empty (in which case no request is made).

        Raises:
            RuntimeError: If the index was never initialized.
            Exception: Any Pinecone error, logged and re-raised. Chunks sent
                before the failure are not rolled back.
        """
        ids = [str(mentor_id) for mentor_id in mentor_ids]
        if not ids:
            logger.info("No mentor ids supplied for batch delete; skipping Pinecone call")
            return True

        try:
            index = self._require_index()
            chunk = self._MAX_DELETE_IDS
            for start in range(0, len(ids), chunk):
                index.delete(ids=ids[start:start + chunk])
            logger.info(f"Batch deleted {len(ids)} mentors")
            return True
        except Exception as e:
            logger.error(f"Failed to batch delete mentors: {str(e)}")
            raise

    def get_index_stats(self) -> Dict[str, Any]:
        """Return a small summary of the index statistics.

        Returns:
            A dict with ``total_vectors``, ``dimension`` and
            ``index_fullness`` (``None`` when the stats object has no such
            attribute).

        Raises:
            RuntimeError: If the index was never initialized.
            Exception: Any Pinecone error, logged and re-raised.
        """
        try:
            stats = self._require_index().describe_index_stats()
            return {
                "total_vectors": stats.total_vector_count,
                "dimension": stats.dimension,
                "index_fullness": getattr(stats, "index_fullness", None),
            }
        except Exception as e:
            logger.error(f"Failed to get index stats: {str(e)}")
            raise
|