# File: AI_Toolkit/src/core/PineconeManager.py
# Page header captured with the file (not part of the program):
#   author: NavyDevilDoc
#   last commit: "Update src/core/PineconeManager.py" (83af5a3, verified)
import time
import logging
from pinecone import Pinecone, ServerlessSpec
from langchain_pinecone import PineconeVectorStore
logger = logging.getLogger(__name__)
class PineconeManager:
    """Convenience wrapper around a Pinecone client.

    Groups the index-level operations used by the app: listing and creating
    indexes, checking embedding dimensions, building a LangChain vector store,
    and deleting / listing / fetching vectors.

    Most methods catch exceptions raised by the client and report failure
    through their return value instead of raising; see each method for the
    exact contract.
    """

    # How many candidate IDs delete_file sends per fetch/delete round-trip.
    # Kept small so a long filename cannot blow up a single request.
    _ID_BATCH_SIZE = 100

    def __init__(self, api_key: str):
        """Create the underlying Pinecone client.

        Args:
            api_key: Pinecone API key.

        Raises:
            ValueError: if ``api_key`` is empty or None.
        """
        if not api_key:
            raise ValueError("Pinecone API Key is missing.")
        self.pc = Pinecone(api_key=api_key)

    def list_indexes(self):
        """Return the names of all indexes, or ``[]`` if the lookup fails."""
        try:
            return [i.name for i in self.pc.list_indexes()]
        except Exception as e:
            logger.error("Error listing indexes: %s", e)
            return []

    def check_dimension_compatibility(self, index_name: str, target_dim: int = 768) -> bool:
        """Check that an index's dimension matches the embedding model's.

        SAFETY MECHANISM: the default of 768 is the output size of
        all-mpnet-base-v2. Note that ``create_index`` defaults to 384, so pass
        ``target_dim`` / ``dimension`` explicitly when the two are used together.

        Args:
            index_name: Name of the index to inspect.
            target_dim: Dimension produced by the embedding model.

        Returns:
            True when the dimensions match; False on a mismatch or when the
            index could not be described (the error is logged).
        """
        try:
            idx_info = self.pc.describe_index(index_name)
            idx_dim = int(idx_info.dimension)
            if idx_dim != target_dim:
                logger.warning(
                    "Dimension Mismatch! Index: %s, Model: %s", idx_dim, target_dim
                )
                return False
            return True
        except Exception as e:
            logger.error("Error checking dimension: %s", e)
            return False

    def create_index(self, index_name: str, dimension: int = 384, metric: str = "cosine",
                     timeout: float = 300.0):
        """Create a serverless index (aws / us-east-1) and wait until it is ready.

        Args:
            index_name: Name of the index to create.
            dimension: Vector dimension of the new index.
            metric: Similarity metric passed to Pinecone.
            timeout: Maximum number of seconds to wait for the index to report
                ready. Bounds the polling loop so a stuck index cannot hang
                the caller forever.

        Returns:
            A ``(success, message)`` tuple. ``success`` is True when the index
            already exists or was created and became ready; False when the
            client raised or the index was not ready within ``timeout``.
        """
        if index_name in self.list_indexes():
            return True, "Index already exists."
        try:
            self.pc.create_index(
                name=index_name,
                dimension=dimension,
                metric=metric,
                spec=ServerlessSpec(cloud="aws", region="us-east-1"),
            )
            # Poll once per second until the index is ready or we run out of time.
            deadline = time.monotonic() + timeout
            while not self.pc.describe_index(index_name).status['ready']:
                if time.monotonic() >= deadline:
                    return False, (
                        f"Index {index_name} was created but was not ready "
                        f"after {timeout:g} seconds."
                    )
                time.sleep(1)
            return True, f"Index {index_name} created successfully."
        except Exception as e:
            logger.error("Error creating index: %s", e)
            return False, str(e)

    def get_vectorstore(self, index_name: str, embedding_function, namespace: str):
        """Return a LangChain ``PineconeVectorStore`` for the index/namespace.

        NOTE(review): only the index name is handed over, not ``self.pc`` or
        the API key given to this manager, so the vector store presumably
        resolves credentials on its own (e.g. from the environment) - confirm.
        """
        return PineconeVectorStore(
            index_name=index_name,
            embedding=embedding_function,
            namespace=namespace,
        )

    def delete_file(self, index_name, source_filename, namespace, max_chunks: int = 200):
        """Delete every vector that belongs to one source file.

        Two strategies are applied in order:

        1. Delete by metadata filter (``source == source_filename``). A failure
           here is logged and otherwise ignored, since not every index type is
           expected to support filtered deletes.
        2. Probe for the ID patterns this app uses for chunks -
           ``<file>_<n>`` (standard) and ``<file>_flat_<n>`` (flat) - and
           delete by ID whatever still exists.

        Args:
            index_name: Index holding the vectors.
            source_filename: Value of the ``source`` metadata field / ID prefix.
            namespace: Namespace to operate in.
            max_chunks: How many chunk numbers (0..max_chunks-1) to probe for
                each ID pattern.

        Returns:
            A ``(success, message)`` tuple; ``success`` is False only when an
            unexpected error aborted the operation.
        """
        try:
            index = self.pc.Index(index_name)

            # Strategy 1: direct delete by metadata filter (best effort).
            try:
                index.delete(filter={"source": source_filename}, namespace=namespace)
            except Exception as e:
                logger.warning(
                    "Metadata delete failed (expected on Starter tier): %s", e
                )

            # Strategy 2: sweep the known ID patterns in case the filter
            # delete was unsupported or missed vectors.
            standard_ids = [f"{source_filename}_{i}" for i in range(max_chunks)]
            flat_ids = [f"{source_filename}_flat_{i}" for i in range(max_chunks)]
            candidate_ids = standard_ids + flat_ids

            deleted = 0
            step = self._ID_BATCH_SIZE
            for start in range(0, len(candidate_ids), step):
                batch = candidate_ids[start:start + step]
                # fetch only returns the IDs that actually exist.
                response = index.fetch(ids=batch, namespace=namespace)
                found_ids = list(response.vectors.keys())
                if found_ids:
                    index.delete(ids=found_ids, namespace=namespace)
                    deleted += len(found_ids)

            if deleted:
                return True, f"Deleted {deleted} vectors manually."
            return True, "Delete signal sent."
        except Exception as e:
            logger.error("Delete failed: %s", e)
            return False, str(e)

    def get_all_ids(self, index_name, namespace):
        """Return every vector ID in a namespace (helper for resync).

        Iterates ``index.list``, which yields the IDs page by page.

        Returns:
            A flat list of IDs, or ``[]`` if listing fails (for example on a
            client that does not provide ``list``); the error is logged.
        """
        try:
            index = self.pc.Index(index_name)
            matches = []
            for ids in index.list(namespace=namespace):
                matches.extend(ids)
            return matches
        except Exception as e:
            logger.error("List IDs failed: %s", e)
            return []

    def fetch_vectors(self, index_name: str, ids: list, namespace: str):
        """Retrieve the stored data (metadata + text) for a list of IDs.

        All IDs are sent in a single fetch call; no batching is done, so very
        large ID lists may exceed the service's per-request limit.

        Args:
            index_name: Index to read from.
            ids: Vector IDs to fetch.
            namespace: Namespace to read from.

        Returns:
            The client's fetch response, or ``{}`` when ``ids`` is empty or
            the fetch fails (the error is logged).
        """
        if not ids:
            # Nothing to look up; skip the remote call.
            return {}
        try:
            idx = self.pc.Index(index_name)
            return idx.fetch(ids=ids, namespace=namespace)
        except Exception as e:
            logger.error("Error fetching vectors: %s", e)
            return {}