import time
import logging
from pinecone import Pinecone, ServerlessSpec
from langchain_pinecone import PineconeVectorStore
logger = logging.getLogger(__name__)
class PineconeManager:
    """
    Thin wrapper around the Pinecone client: index lifecycle management,
    LangChain vector-store access, and robust file-level vector deletion.
    """

    def __init__(self, api_key: str):
        """
        Args:
            api_key: Pinecone API key (must be non-empty).

        Raises:
            ValueError: If the API key is missing or empty.
        """
        if not api_key:
            raise ValueError("Pinecone API Key is missing.")
        self.pc = Pinecone(api_key=api_key)

    def list_indexes(self) -> list:
        """Returns a list of all index names (empty list on API failure)."""
        try:
            return [i.name for i in self.pc.list_indexes()]
        except Exception as e:
            logger.error(f"Error listing indexes: {e}")
            return []

    def check_dimension_compatibility(self, index_name: str, target_dim: int = 768) -> bool:
        """
        SAFETY MECHANISM: Ensures the Index dimension matches the Model.
        all-mpnet-base-v2 output is 768.

        Returns:
            True if the index dimension equals ``target_dim``; False on
            mismatch or if the index cannot be described.
        """
        try:
            idx_info = self.pc.describe_index(index_name)
            idx_dim = int(idx_info.dimension)
            if idx_dim != target_dim:
                logger.warning(f"Dimension Mismatch! Index: {idx_dim}, Model: {target_dim}")
                return False
            return True
        except Exception as e:
            logger.error(f"Error checking dimension: {e}")
            return False

    def create_index(self, index_name: str, dimension: int = 384, metric: str = "cosine"):
        """
        Creates a new Serverless Index with a wait loop.

        NOTE(review): the default ``dimension=384`` disagrees with
        ``check_dimension_compatibility``'s default of 768 (all-mpnet-base-v2).
        Kept as-is for backward compatibility — confirm which model each
        caller uses and pass ``dimension`` explicitly.

        Returns:
            Tuple of (success: bool, message: str).
        """
        existing = self.list_indexes()
        if index_name in existing:
            return True, "Index already exists."
        try:
            self.pc.create_index(
                name=index_name,
                dimension=dimension,
                metric=metric,
                spec=ServerlessSpec(cloud="aws", region="us-east-1")
            )
            # Wait for initialization
            while not self.pc.describe_index(index_name).status['ready']:
                time.sleep(1)
            return True, f"Index {index_name} created successfully."
        except Exception as e:
            return False, str(e)

    def get_vectorstore(self, index_name: str, embedding_function, namespace: str):
        """Returns the LangChain VectorStore object for the given index/namespace."""
        return PineconeVectorStore(
            index_name=index_name,
            embedding=embedding_function,
            namespace=namespace
        )

    # --- THE FIX: SEARCH & DESTROY ---
    def delete_file(self, index_name, source_filename, namespace):
        """
        Robust deletion that works on Starter & Serverless indexes.
        1. Tries a metadata-filter delete (works on Serverless).
        2. Falls back to probing the app's deterministic ID schemes and
           deleting the IDs that actually exist.

        Returns:
            Tuple of (success: bool, message: str).
        """
        try:
            index = self.pc.Index(index_name)
            # Strategy 1: DIRECT DELETE BY FILTER (works on Serverless).
            # Starter-tier indexes reject metadata-filter deletes, so this
            # is best-effort and we verify with Strategy 2 below.
            try:
                index.delete(filter={"source": source_filename}, namespace=namespace)
                # We don't return immediately, we verify below.
            except Exception as e:
                logger.warning(f"Metadata delete failed (expected on Starter tier): {e}")
            # Strategy 2: "The Clean Sweep" (Iterator).
            # If the above didn't catch them (or silently failed), we manually
            # hunt the deterministic ID prefixes used by our app:
            #   Standard chunks: "filename_0", "filename_1", ...
            #   Flat chunks:     "filename_flat_0", "filename_flat_1", ...
            # Probe the first 200 chunks of each scheme; fetch() simply omits
            # IDs that don't exist, so over-probing is harmless.
            potential_ids = [f"{source_filename}_{i}" for i in range(200)]
            # BUG FIX: the original sweep never generated the "_flat_" IDs it
            # claimed to handle, so "Index Flat" vectors were left behind.
            potential_ids += [f"{source_filename}_flat_{i}" for i in range(200)]
            # Check existence (400 IDs is well under fetch's per-call limit).
            fetch_response = index.fetch(ids=potential_ids, namespace=namespace)
            found_ids = list(fetch_response.vectors.keys())
            if found_ids:
                index.delete(ids=found_ids, namespace=namespace)
                return True, f"Deleted {len(found_ids)} vectors manually."
            return True, "Delete signal sent."
        except Exception as e:
            logger.error(f"Delete failed: {e}")
            return False, str(e)

    # --- HELPER FOR RESYNC ---
    def get_all_ids(self, index_name, namespace):
        """
        Returns every vector ID in the namespace via the paginated
        ``index.list`` generator (modern client), or [] on failure.
        """
        try:
            index = self.pc.Index(index_name)
            matches = []
            # index.list yields pages (lists) of IDs.
            for ids in index.list(namespace=namespace):
                matches.extend(ids)
            return matches
        except Exception as e:
            # Fallback for older clients that lack list()/list_paginated().
            logger.error(f"List IDs failed: {e}")
            return []

    def fetch_vectors(self, index_name: str, ids: list, namespace: str):
        """
        Retrieves the actual data (metadata + text) for a list of IDs.

        Returns:
            The raw fetch response, or {} for empty input / on failure.
        """
        if not ids:
            # Avoid a pointless (and possibly rejected) empty-fetch round trip.
            return {}
        try:
            idx = self.pc.Index(index_name)
            # Fetch has a per-call limit (~1000 IDs); callers in this app stay
            # well under it, so no batching yet — TODO if usage grows.
            return idx.fetch(ids=ids, namespace=namespace)
        except Exception as e:
            logger.error(f"Error fetching vectors: {e}")
            return {}