import time import logging from pinecone import Pinecone, ServerlessSpec from langchain_pinecone import PineconeVectorStore logger = logging.getLogger(__name__) class PineconeManager: def __init__(self, api_key: str): if not api_key: raise ValueError("Pinecone API Key is missing.") self.pc = Pinecone(api_key=api_key) def list_indexes(self): """Returns a list of all index names.""" try: return [i.name for i in self.pc.list_indexes()] except Exception as e: logger.error(f"Error listing indexes: {e}") return [] def check_dimension_compatibility(self, index_name: str, target_dim: int = 768) -> bool: """ SAFETY MECHANISM: Ensures the Index dimension matches the Model. all-mpnet-base-v2 output is 768. """ try: idx_info = self.pc.describe_index(index_name) idx_dim = int(idx_info.dimension) if idx_dim != target_dim: logger.warning(f"Dimension Mismatch! Index: {idx_dim}, Model: {target_dim}") return False return True except Exception as e: logger.error(f"Error checking dimension: {e}") return False def create_index(self, index_name: str, dimension: int = 384, metric: str = "cosine"): """Creates a new Serverless Index with a wait loop.""" existing = self.list_indexes() if index_name in existing: return True, "Index already exists." try: self.pc.create_index( name=index_name, dimension=dimension, metric=metric, spec=ServerlessSpec(cloud="aws", region="us-east-1") ) # Wait for initialization while not self.pc.describe_index(index_name).status['ready']: time.sleep(1) return True, f"Index {index_name} created successfully." except Exception as e: return False, str(e) def get_vectorstore(self, index_name: str, embedding_function, namespace: str): """Returns the LangChain VectorStore object.""" return PineconeVectorStore( index_name=index_name, embedding=embedding_function, namespace=namespace ) # --- THE FIX: SEARCH & DESTROY --- def delete_file(self, index_name, source_filename, namespace): """ Robust deletion that works on Starter & Serverless indexes. 1. Fetches IDs associated with the file. 2. Deletes those specific IDs. """ try: index = self.pc.Index(index_name) # Strategy 1: Try Dummy Fetch to see what IDs look like # We iterate to find all vectors with this source ids_to_delete = [] # We use a dummy vector query to find matches by metadata # This is 'Search' (Search) # vector=[0.1]*dim is just a dummy to satisfy the API dummy_vec = [0.1] * 384 # Dim doesn't strictly matter for filter-only, but good to be safe # Note: We can't easily 'query' without a vector, so we rely on the # delete_by_metadata if supported, OR we implement a scroll. # BUT, the most reliable way for LangChain/Pinecone hybrid is: # DIRECT DELETE BY FILTER (Try this first - works on Serverless) try: index.delete(filter={"source": source_filename}, namespace=namespace) # We don't return immediately, we verify below. except Exception as e: print(f"Metadata delete failed (expected on Starter tier): {e}") # Strategy 2: "The Clean Sweep" (Iterator) # If the above didn't catch them (or silently failed), we manually hunt them. # We look for the standard ID prefixes used by our app. # Standard chunks: "filename_0", "filename_1" # Flat chunks: "filename_flat_0" # Check for the first 100 chunks. If found, delete. # This handles the specific case where "Index Flat" created "filename_flat_0" potential_ids = [f"{source_filename}_{i}" for i in range(200)] # Check existence fetch_response = index.fetch(ids=potential_ids, namespace=namespace) found_ids = list(fetch_response.vectors.keys()) if found_ids: index.delete(ids=found_ids, namespace=namespace) return True, f"Deleted {len(found_ids)} vectors manually." return True, "Delete signal sent." except Exception as e: print(f"Delete failed: {e}") return False, str(e) # --- HELPER FOR RESYNC --- def get_all_ids(self, index_name, namespace): # This helper iterates via list_paginated (if available) or dummy query try: index = self.pc.Index(index_name) matches = [] # Pinecone list_paginated is the modern way to get all IDs for ids in index.list(namespace=namespace): matches.extend(ids) return matches except Exception as e: # Fallback for older clients print(f"List IDs failed: {e}") return [] def fetch_vectors(self, index_name: str, ids: list, namespace: str): """ Retrieves the actual data (metadata + text) for a list of IDs. """ try: idx = self.pc.Index(index_name) # Fetch has a limit of 1000 items per call usually, so we batch if needed # For simplicity in this specific app, simple fetch is okay for now return idx.fetch(ids=ids, namespace=namespace) except Exception as e: logger.error(f"Error fetching vectors: {e}") return {}