# MedSearchPro — processing/pinecone_manager.py
# Provenance: uploaded by paulhemb ("Initial Backend Deployment", rev 1367957)
# processing/pinecone_manager.py
"""
Pinecone cloud vector database implementation
Scalable cloud-based vector search
"""
import pinecone
from typing import List, Dict, Any, Optional
import time
from embeddings.embedding_models import EmbeddingManager
from embeddings.text_chunking import ResearchPaperChunker
class PineconeManager:
    """Pinecone cloud vector database manager.

    Stores embedded chunks of research papers in a single Pinecone index
    and supports similarity search with optional metadata filtering.
    Chunking and embedding are delegated to ResearchPaperChunker and
    EmbeddingManager respectively.
    """

    def __init__(self,
                 api_key: Optional[str] = None,
                 environment: str = "us-west1-gcp",
                 index_name: str = "medical-research-papers",
                 embedding_model: str = "all-MiniLM-L6-v2",
                 chunk_strategy: str = "semantic"):
        """Connect to (or create) the Pinecone index.

        Args:
            api_key: Pinecone API key. If omitted, the manager stays
                uninitialized and every operation becomes a no-op.
            environment: Pinecone environment/region name.
            index_name: Index to connect to, created on first use.
            embedding_model: Model name forwarded to EmbeddingManager.
            chunk_strategy: Strategy forwarded to ResearchPaperChunker.

        Raises:
            Exception: re-raises any Pinecone initialization failure.
        """
        self.api_key = api_key
        self.environment = environment
        self.index_name = index_name
        self.embedding_manager = EmbeddingManager(embedding_model)
        self.chunker = ResearchPaperChunker(chunk_strategy)
        # `index` stays None until Pinecone is fully initialized; every
        # public method checks this before touching the index.
        self.index = None
        if not api_key:
            print("⚠️ Pinecone API key not provided. Please set PINECONE_API_KEY environment variable.")
            return
        # Initialize Pinecone
        try:
            pinecone.init(api_key=api_key, environment=environment)
            print(f"βœ… Pinecone initialized: {environment}")
            # Create the index on first use, otherwise just connect.
            if index_name not in pinecone.list_indexes():
                print(f"πŸ†• Creating new Pinecone index: {index_name}")
                self._create_index()
            else:
                print(f"πŸ“‚ Connecting to existing index: {index_name}")
            self.index = pinecone.Index(index_name)
            print("βœ… Pinecone index ready")
        except Exception as e:
            print(f"❌ Pinecone initialization error: {e}")
            raise

    def _create_index(self):
        """Create a new Pinecone index sized to the embedding model.

        Raises:
            Exception: re-raises any Pinecone creation failure.
        """
        try:
            dimension = self.embedding_manager.get_embedding_dimensions()
            pinecone.create_index(
                name=self.index_name,
                dimension=dimension,
                metric="cosine",
                # Only index the metadata fields we actually filter on.
                metadata_config={
                    "indexed": ["domain", "source", "publication_date"]
                }
            )
            # Index creation is asynchronous; poll until it is ready.
            while not pinecone.describe_index(self.index_name).status['ready']:
                time.sleep(1)
            print(f"βœ… Pinecone index created: {self.index_name} (dimension: {dimension})")
        except Exception as e:
            print(f"❌ Error creating Pinecone index: {e}")
            raise

    def add_papers(self, papers: List[Dict[str, Any]], batch_size: int = 100) -> bool:
        """Chunk, embed and upsert a list of papers into Pinecone.

        Args:
            papers: Paper dicts as produced by the ingestion pipeline.
            batch_size: Number of vectors per upsert request.

        Returns:
            True if all vectors uploaded; False if uninitialized, no
            chunks were produced, or an upload error occurred.
        """
        try:
            if self.index is None:
                print("❌ Pinecone not initialized properly")
                return False
            all_chunks = self.chunker.batch_chunk_papers(papers)
            if not all_chunks:
                print("⚠️ No chunks generated from papers")
                return False
            # Embed all chunk texts in a single batched call.
            chunk_texts = [chunk['text'] for chunk in all_chunks]
            embeddings = self.embedding_manager.encode(chunk_texts)
            # BUGFIX: chunk indices (and the vector-id suffix) were
            # previously numbered across the whole batch, so the first
            # chunk of the second paper could be "<id>_chunk_7".
            # Number chunks per paper instead.
            chunk_counts: Dict[str, int] = {}
            vectors = []
            for i, chunk in enumerate(all_chunks):
                paper_id = chunk['paper_id']
                chunk_index = chunk_counts.get(paper_id, 0)
                chunk_counts[paper_id] = chunk_index + 1
                vector_id = f"{paper_id}_chunk_{chunk_index}"
                metadata = {
                    'paper_id': paper_id,
                    'paper_title': chunk['paper_title'],
                    'text': chunk['text'],
                    'source': chunk['source'],
                    'domain': chunk['domain'],
                    'publication_date': chunk.get('publication_date', ''),
                    'chunk_strategy': chunk.get('chunk_strategy', 'semantic'),
                    'chunk_index': chunk_index,
                    'start_char': chunk.get('start_char', 0),
                    'end_char': chunk.get('end_char', 0)
                }
                # Flatten the author list (capped at three) into one string.
                if chunk.get('authors'):
                    metadata['authors'] = ','.join(chunk['authors'][:3])
                vectors.append((vector_id, embeddings[i].tolist(), metadata))
            # Upsert in batches; brief pause between batches to avoid
            # rate limits (no pause needed after the final batch).
            total_vectors = len(vectors)
            for start in range(0, total_vectors, batch_size):
                batch_end = min(start + batch_size, total_vectors)
                self.index.upsert(vectors=vectors[start:batch_end])
                print(f"πŸ“¦ Uploaded batch {start // batch_size + 1}: {start}-{batch_end - 1} vectors")
                if batch_end < total_vectors:
                    time.sleep(0.1)
            print(f"βœ… Successfully uploaded {total_vectors} vectors from {len(papers)} papers")
            return True
        except Exception as e:
            print(f"❌ Error adding papers to Pinecone: {e}")
            return False

    def search(self,
               query: str,
               domain: Optional[str] = None,
               n_results: int = 10,
               include_metadata: bool = True,
               include_values: bool = False) -> List[Dict[str, Any]]:
        """Search for paper chunks similar to a free-text query.

        Args:
            query: Natural-language query text.
            domain: Optional exact-match filter on the 'domain' metadata.
            n_results: Maximum number of matches to return.
            include_metadata: Whether Pinecone returns stored metadata.
            include_values: Whether Pinecone returns the raw vectors.

        Returns:
            List of dicts with 'text', 'metadata', 'distance' and 'id'.
            'distance' actually carries Pinecone's similarity score
            (higher is better); the key name is kept for caller
            compatibility. Empty list on error or when uninitialized.
        """
        try:
            if self.index is None:
                print("❌ Pinecone not initialized properly")
                return []
            # Encode the query with the same model used for indexing.
            query_embedding = self.embedding_manager.encode([query])[0].tolist()
            filter_dict = {}
            if domain:
                filter_dict['domain'] = {'$eq': domain}
            results = self.index.query(
                vector=query_embedding,
                top_k=n_results,
                filter=filter_dict if filter_dict else None,
                include_metadata=include_metadata,
                include_values=include_values
            )
            formatted_results = []
            for match in results['matches']:
                formatted_results.append({
                    'text': match['metadata']['text'],
                    'metadata': match['metadata'],
                    'distance': match['score'],  # cosine similarity, not a distance
                    'id': match['id']
                })
            return formatted_results
        except Exception as e:
            print(f"❌ Pinecone search error: {e}")
            return []

    def get_collection_stats(self) -> Dict[str, Any]:
        """Return index statistics plus the embedding model name.

        Returns:
            Stats dict on success; a dict with an "error" key on any
            failure (previously returned {} on exceptions, inconsistent
            with the uninitialized branch).
        """
        try:
            if self.index is None:
                return {"error": "Pinecone not initialized"}
            stats = self.index.describe_index_stats()
            return {
                "total_vectors": stats['total_vector_count'],
                "dimension": stats['dimension'],
                "index_fullness": stats.get('index_fullness', 0),
                "namespaces": stats.get('namespaces', {}),
                "embedding_model": self.embedding_manager.model_name
            }
        except Exception as e:
            print(f"❌ Error getting Pinecone stats: {e}")
            return {"error": str(e)}

    def delete_paper(self, paper_id: str) -> bool:
        """Delete every vector belonging to one paper.

        Args:
            paper_id: The 'paper_id' metadata value to match.

        Returns:
            True if at least one vector was found and deleted.
        """
        try:
            if self.index is None:
                print("❌ Pinecone not initialized properly")
                return False
            # Discover the paper's vector ids via a filtered dummy query.
            # NOTE(review): Pinecone may reject an all-zero query vector
            # under the cosine metric, and top_k caps discovery at 10000
            # matches per paper — consider index.delete(filter=...)
            # instead; verify against the deployed plan/client version.
            results = self.index.query(
                vector=[0] * self.embedding_manager.get_embedding_dimensions(),
                filter={'paper_id': {'$eq': paper_id}},
                top_k=10000,
                include_metadata=False
            )
            vector_ids = [match['id'] for match in results['matches']]
            if not vector_ids:
                print(f"⚠️ No vectors found for paper {paper_id}")
                return False
            self.index.delete(ids=vector_ids)
            print(f"βœ… Deleted {len(vector_ids)} vectors for paper {paper_id}")
            return True
        except Exception as e:
            print(f"❌ Error deleting paper {paper_id}: {e}")
            return False

    def update_paper(self, paper_id: str, paper_data: Dict[str, Any]) -> bool:
        """Replace a paper's vectors with freshly chunked/embedded ones.

        The delete result is deliberately ignored: the paper may not
        exist yet, in which case this degrades to a plain insert.

        Returns:
            True if the re-insert succeeded.
        """
        try:
            self.delete_paper(paper_id)
            return self.add_papers([paper_data])
        except Exception as e:
            print(f"❌ Error updating paper {paper_id}: {e}")
            return False
# Quick test (requires actual Pinecone API key)
def test_pinecone_manager():
    """Live smoke test for PineconeManager (needs PINECONE_API_KEY set)."""
    import os

    key = os.getenv('PINECONE_API_KEY')
    if not key:
        # No credentials: explain how to enable the test and bail out.
        print("❌ Pinecone API key not found in environment variables")
        print(" Set PINECONE_API_KEY to test Pinecone functionality")
        return

    sample_paper = {
        'id': 'test_001',
        'title': 'AI in Medical Imaging',
        'abstract': 'Deep learning transforms medical image analysis with improved accuracy.',
        'source': 'test',
        'domain': 'medical_imaging',
        'authors': ['John Doe', 'Jane Smith'],
    }

    print("πŸ§ͺ Testing Pinecone Manager")
    print("=" * 50)
    try:
        manager = PineconeManager(
            api_key=key,
            index_name="test-medical-papers",
            embedding_model="all-MiniLM-L6-v2"
        )
        # Guard clause: stop early if ingestion fails.
        if not manager.add_papers([sample_paper]):
            print("❌ Failed to add papers")
            return
        print("βœ… Papers added successfully")

        # Exercise search and report the top two hits.
        hits = manager.search("medical image analysis", n_results=5)
        print(f"πŸ” Search results: {len(hits)} chunks found")
        for hit in hits[:2]:
            print(f" - {hit['metadata']['paper_title']} (score: {hit['distance']:.3f})")

        # Finally dump the index-level statistics.
        print(f"πŸ“Š Collection stats: {manager.get_collection_stats()}")
    except Exception as e:
        print(f"❌ Pinecone test failed: {e}")
# Run the live smoke test only when executed directly as a script.
if __name__ == "__main__":
    test_pinecone_manager()