# Captured from a Hugging Face Space (build status at capture time: Runtime error).
import logging
from math import ceil

import numpy as np
from fastapi import HTTPException, status
from fastapi.responses import JSONResponse
from llama_index.core import StorageContext, VectorStoreIndex
# from llama_index.core import Settings
from llama_index.vector_stores.pinecone import PineconeVectorStore
from pinecone import Pinecone, ServerlessSpec

from config import PINECONE_CONFIG
class IndexManager:
    """Manage a Pinecone-backed vector index for LlamaIndex.

    Wraps creation of the serverless index, building a ``VectorStoreIndex``
    from nodes, and per-title bulk query/delete/update of stored vectors.
    """

    # Dimension used when the index has to be created. Must match the
    # embedding model producing the vectors (3072, per _create_pinecone_index).
    EMBEDDING_DIMENSION = 3072

    def __init__(self, index_name: str = "multimodal-index"):
        """Connect to Pinecone and ensure the named index exists.

        Args:
            index_name: Name of the Pinecone index to use (created if absent).
        """
        self.vector_index = None
        self.index_name = index_name
        self.client = self._get_pinecone_client()
        self.pinecone_index = self._create_pinecone_index()

    def _get_pinecone_client(self):
        """Initialize and return the Pinecone client.

        Raises:
            ValueError: If the API key is missing from configuration.
        """
        api_key = PINECONE_CONFIG.PINECONE_API_KEY
        if not api_key:
            raise ValueError(
                "Pinecone API key is missing. Please set it in environment variables."
            )
        return Pinecone(api_key=api_key)

    def _create_pinecone_index(self):
        """Create the Pinecone index if it doesn't already exist; return a handle to it."""
        if self.index_name not in self.client.list_indexes().names():
            self.client.create_index(
                name=self.index_name,
                dimension=self.EMBEDDING_DIMENSION,
                metric="cosine",
                spec=ServerlessSpec(cloud="aws", region="us-east-1"),
            )
        return self.client.Index(self.index_name)

    def _initialize_vector_store(self) -> StorageContext:
        """Initialize and return a storage context backed by the Pinecone index."""
        vector_store = PineconeVectorStore(pinecone_index=self.pinecone_index)
        return StorageContext.from_defaults(vector_store=vector_store)

    def build_indexes(self, nodes):
        """Build the vector index from ``nodes`` and store it on ``self.vector_index``.

        Raises:
            HTTPException: Re-raised as-is if one occurred, otherwise a 500
                wrapping any unexpected failure so FastAPI can render it.
        """
        try:
            storage_context = self._initialize_vector_store()
            self.vector_index = VectorStoreIndex(nodes, storage_context=storage_context)
        except HTTPException:
            raise  # Re-raise so FastAPI's exception handling applies unchanged.
        except Exception as e:
            # BUG FIX: the original did `raise JSONResponse(...)`; JSONResponse
            # is not an exception, so the error path itself crashed with a
            # TypeError. Raise a proper HTTPException instead.
            logging.exception("Error building index")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Error loading existing indexes: {str(e)}",
            )

    def get_ids_from_query(self, input_vector, title):
        """Collect ids of all vectors whose metadata ``title`` equals *title*.

        Repeatedly queries with the probe vector and stops once a query
        returns no id that was not already seen.
        """
        logging.info("Searching Pinecone for title: %s", title)
        collected_ids = set()
        while True:
            results = self.pinecone_index.query(
                vector=input_vector,
                top_k=10000,
                filter={
                    "title": {"$eq": f"{title}"},
                },
            )
            batch = {match['id'] for match in results['matches']}
            # No new ids means we have seen everything this probe can reach.
            if batch.issubset(collected_ids):
                break
            collected_ids.update(batch)
        return collected_ids

    def get_all_ids_from_index(self, title):
        """Return the set of ids of every vector stored under *title*.

        BUG FIX: the original hard-coded a 1536-dimensional probe vector,
        which disagrees with the 3072-dimensional index created by
        ``_create_pinecone_index`` — Pinecone rejects mismatched query
        vectors. The dimension is now read from the index stats so it
        always matches the live index configuration.
        """
        stats = self.pinecone_index.describe_index_stats()
        if stats["total_vector_count"] == 0:
            return set()  # Nothing stored; skip the probe query entirely.
        num_dimensions = stats["dimension"]
        input_vector = np.random.rand(num_dimensions).tolist()
        return self.get_ids_from_query(input_vector, title)

    def delete_vector_database(self, title):
        """Delete every vector stored under *title*, in batches of 1000.

        Returns:
            None on success; a 500 JSONResponse on failure (preserved from
            the original best-effort contract).
        """
        try:
            batch_size = 1000
            all_ids = list(self.get_all_ids_from_index(title))
            num_batches = ceil(len(all_ids) / batch_size)
            for i in range(num_batches):
                batch_ids = all_ids[i * batch_size: (i + 1) * batch_size]
                self.pinecone_index.delete(ids=batch_ids)
                logging.info(f"delete from id {i * batch_size} to {(i + 1) * batch_size} successful")
        except Exception:
            # Log the actual failure before returning the generic response
            # (the original swallowed it silently).
            logging.exception("Error deleting vectors for title: %s", title)
            return JSONResponse(status_code=500, content="An error occurred while delete metadata")

    def update_vector_database(self, current_reference, new_reference):
        """Replace the metadata of every vector belonging to ``current_reference['title']``.

        Args:
            current_reference: Mapping holding at least the current ``title``.
            new_reference: Metadata mapping written onto each matching vector.
        """
        for vector_id in self.get_all_ids_from_index(current_reference['title']):
            self.pinecone_index.update(
                id=vector_id,
                set_metadata=new_reference,
            )

    def load_existing_indexes(self):
        """Load a ``VectorStoreIndex`` from the existing Pinecone index.

        Returns:
            VectorStoreIndex on success, or a 500 JSONResponse describing
            the failure (preserved from the original contract).
        """
        try:
            # Reuse the already-authenticated client instead of creating a new one.
            pinecone_index = self.client.Index(self.index_name)
            vector_store = PineconeVectorStore(pinecone_index=pinecone_index)
            return VectorStoreIndex.from_vector_store(vector_store)
        except Exception as e:
            return JSONResponse(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                content=f"Error loading existing indexes: {str(e)}",
            )