Spaces:
Running
Running
| import time | |
| import logging | |
| from pinecone import Pinecone, ServerlessSpec | |
| from langchain_pinecone import PineconeVectorStore | |
| logger = logging.getLogger(__name__) | |
class PineconeManager:
    """Thin wrapper around the Pinecone client: index lifecycle, LangChain
    vector-store access, and targeted deletion of a file's vectors.

    NOTE(review): ``check_dimension_compatibility`` defaults to 768
    (all-mpnet-base-v2) while ``create_index`` defaults to 384 — these two
    defaults disagree; confirm which embedding model the app actually uses.
    """

    # How many sequential chunk IDs per naming scheme delete_file probes.
    _DELETE_SCAN_LIMIT = 200
    # Pinecone fetch() accepts at most 1000 IDs per call.
    _FETCH_BATCH = 1000
    # Upper bound (seconds) on waiting for a new index to become ready.
    _CREATE_TIMEOUT_S = 300

    def __init__(self, api_key: str):
        """Create the underlying Pinecone client.

        Raises:
            ValueError: if ``api_key`` is falsy (empty string or None).
        """
        if not api_key:
            raise ValueError("Pinecone API Key is missing.")
        self.pc = Pinecone(api_key=api_key)

    def list_indexes(self) -> list:
        """Return the names of all indexes, or [] on any API error."""
        try:
            return [i.name for i in self.pc.list_indexes()]
        except Exception as e:
            logger.error(f"Error listing indexes: {e}")
            return []

    def check_dimension_compatibility(self, index_name: str, target_dim: int = 768) -> bool:
        """SAFETY MECHANISM: ensure the index dimension matches the model.

        all-mpnet-base-v2 output is 768.

        Returns:
            True when the index dimension equals ``target_dim``; False on a
            mismatch or on any API error (fail-closed).
        """
        try:
            idx_info = self.pc.describe_index(index_name)
            idx_dim = int(idx_info.dimension)
            if idx_dim != target_dim:
                logger.warning(f"Dimension Mismatch! Index: {idx_dim}, Model: {target_dim}")
                return False
            return True
        except Exception as e:
            logger.error(f"Error checking dimension: {e}")
            return False

    def create_index(self, index_name: str, dimension: int = 384, metric: str = "cosine"):
        """Create a new Serverless index and wait (bounded) until it is ready.

        Returns:
            (ok: bool, message: str) — ``(True, ...)`` if the index exists or
            was created; ``(False, <error>)`` on failure or readiness timeout.
        """
        if index_name in self.list_indexes():
            return True, "Index already exists."
        try:
            self.pc.create_index(
                name=index_name,
                dimension=dimension,
                metric=metric,
                spec=ServerlessSpec(cloud="aws", region="us-east-1")
            )
            # Wait for initialization — bounded so a stuck index cannot
            # hang the caller forever (the original loop had no timeout).
            deadline = time.monotonic() + self._CREATE_TIMEOUT_S
            while not self.pc.describe_index(index_name).status['ready']:
                if time.monotonic() > deadline:
                    return False, f"Index {index_name} not ready after {self._CREATE_TIMEOUT_S}s."
                time.sleep(1)
            return True, f"Index {index_name} created successfully."
        except Exception as e:
            return False, str(e)

    def get_vectorstore(self, index_name: str, embedding_function, namespace: str):
        """Return the LangChain VectorStore object for ``index_name``.

        NOTE(review): no API key is passed here, so PineconeVectorStore
        presumably reads PINECONE_API_KEY from the environment — verify.
        """
        return PineconeVectorStore(
            index_name=index_name,
            embedding=embedding_function,
            namespace=namespace
        )

    # --- SEARCH & DESTROY ---
    def delete_file(self, index_name, source_filename, namespace):
        """Robust deletion that works on Starter & Serverless indexes.

        1. Tries a metadata-filtered delete (supported on Serverless).
        2. Falls back to probing the app's deterministic ID schemes
           ("<file>_<i>" for standard chunks, "<file>_flat_<i>" for flat
           chunks) and deleting whatever actually exists.

        Returns:
            (ok: bool, message: str).
        """
        try:
            index = self.pc.Index(index_name)

            # Strategy 1: direct delete-by-filter (works on Serverless).
            # We do not return immediately; the ID sweep below verifies.
            try:
                index.delete(filter={"source": source_filename}, namespace=namespace)
            except Exception as e:
                logger.warning(f"Metadata delete failed (expected on Starter tier): {e}")

            # Strategy 2: "The Clean Sweep". Probe both ID schemes — the
            # original only probed "<file>_<i>" even though its comments
            # promised handling of "<file>_flat_<i>" as well.
            candidates = [
                f"{source_filename}_{i}" for i in range(self._DELETE_SCAN_LIMIT)
            ] + [
                f"{source_filename}_flat_{i}" for i in range(self._DELETE_SCAN_LIMIT)
            ]

            # fetch() is capped at 1000 IDs per call, so batch the probe.
            found_ids = []
            for start in range(0, len(candidates), self._FETCH_BATCH):
                batch = candidates[start:start + self._FETCH_BATCH]
                resp = index.fetch(ids=batch, namespace=namespace)
                found_ids.extend(resp.vectors.keys())

            if found_ids:
                index.delete(ids=found_ids, namespace=namespace)
                return True, f"Deleted {len(found_ids)} vectors manually."
            return True, "Delete signal sent."
        except Exception as e:
            logger.error(f"Delete failed: {e}")
            return False, str(e)

    # --- HELPER FOR RESYNC ---
    def get_all_ids(self, index_name, namespace):
        """Return every vector ID in ``namespace`` via the paginated lister.

        ``index.list`` yields pages (lists) of IDs; they are flattened into
        one list. Returns [] on any error (e.g. older clients without list).
        """
        try:
            index = self.pc.Index(index_name)
            matches = []
            for page in index.list(namespace=namespace):
                matches.extend(page)
            return matches
        except Exception as e:
            logger.error(f"List IDs failed: {e}")
            return []

    def fetch_vectors(self, index_name: str, ids: list, namespace: str):
        """Retrieve the actual data (metadata + values) for a list of IDs.

        NOTE(review): fetch() is limited to ~1000 IDs per call; callers in
        this app stay well under that, so no batching is done here. On error
        this returns {} while success returns a FetchResponse — callers must
        tolerate both shapes.
        """
        try:
            idx = self.pc.Index(index_name)
            return idx.fetch(ids=ids, namespace=namespace)
        except Exception as e:
            logger.error(f"Error fetching vectors: {e}")
            return {}