# processing/pinecone_manager.py
"""
Pinecone cloud vector database implementation
Scalable cloud-based vector search
"""
import time
from collections import defaultdict
from typing import List, Dict, Any, Optional

import pinecone

from embeddings.embedding_models import EmbeddingManager
from embeddings.text_chunking import ResearchPaperChunker


class PineconeManager:
    """Pinecone cloud vector database manager.

    Wraps a single Pinecone index: creation/connection, chunked paper
    ingestion, filtered similarity search, and per-paper delete/update.
    If no API key is supplied, the instance is constructed in a disabled
    state (``self.index is None``) and every operation fails gracefully.
    """

    def __init__(self, api_key: str = None, environment: str = "us-west1-gcp",
                 index_name: str = "medical-research-papers",
                 embedding_model: str = "all-MiniLM-L6-v2",
                 chunk_strategy: str = "semantic"):
        """Connect to (or create) the named Pinecone index.

        Args:
            api_key: Pinecone API key; if falsy, the manager stays disabled.
            environment: Pinecone environment/region string.
            index_name: Name of the index to create or connect to.
            embedding_model: Model name passed to EmbeddingManager.
            chunk_strategy: Strategy name passed to ResearchPaperChunker.

        Raises:
            Exception: re-raised from the Pinecone client on init failure.
        """
        self.api_key = api_key
        self.environment = environment
        self.index_name = index_name
        self.embedding_manager = EmbeddingManager(embedding_model)
        self.chunker = ResearchPaperChunker(chunk_strategy)
        # Explicitly mark the disabled state so later methods can test
        # `self.index is None` instead of probing with hasattr().
        self.index = None

        if not api_key:
            print("⚠️ Pinecone API key not provided. Please set PINECONE_API_KEY environment variable.")
            return

        # Initialize Pinecone
        try:
            pinecone.init(api_key=api_key, environment=environment)
            print(f"✅ Pinecone initialized: {environment}")

            # Create or connect to index
            if index_name not in pinecone.list_indexes():
                print(f"🆕 Creating new Pinecone index: {index_name}")
                self._create_index()
            else:
                print(f"📂 Connecting to existing index: {index_name}")

            self.index = pinecone.Index(index_name)
            print("✅ Pinecone index ready")
        except Exception as e:
            print(f"❌ Pinecone initialization error: {e}")
            raise

    def _create_index(self):
        """Create a new Pinecone index sized to the embedding model.

        Blocks until Pinecone reports the index as ready.

        Raises:
            Exception: re-raised from the Pinecone client on failure.
        """
        try:
            dimension = self.embedding_manager.get_embedding_dimensions()
            pinecone.create_index(
                name=self.index_name,
                dimension=dimension,
                metric="cosine",
                # Only these metadata fields are indexed for filtering.
                metadata_config={
                    "indexed": ["domain", "source", "publication_date"]
                }
            )
            # Wait for index to be ready before first use.
            while not pinecone.describe_index(self.index_name).status['ready']:
                time.sleep(1)
            print(f"✅ Pinecone index created: {self.index_name} (dimension: {dimension})")
        except Exception as e:
            print(f"❌ Error creating Pinecone index: {e}")
            raise

    def add_papers(self, papers: List[Dict[str, Any]],
                   batch_size: int = 100) -> bool:
        """Chunk, embed, and upsert papers into Pinecone.

        Vector IDs are ``{paper_id}_chunk_{k}`` where *k* is the chunk's
        index WITHIN its paper, so re-adding the same paper overwrites its
        existing vectors instead of creating batch-dependent duplicates.

        Args:
            papers: Paper dicts as produced upstream (id/title/abstract/...).
            batch_size: Number of vectors per upsert call.

        Returns:
            True on success, False on any failure (logged, not raised).
        """
        try:
            if self.index is None:
                print("❌ Pinecone not initialized properly")
                return False

            # Chunk all papers
            all_chunks = self.chunker.batch_chunk_papers(papers)
            if not all_chunks:
                print("⚠️ No chunks generated from papers")
                return False

            # Embed every chunk in one batched call.
            chunk_texts = [chunk['text'] for chunk in all_chunks]
            embeddings = self.embedding_manager.encode(chunk_texts)

            # Prepare vectors for Pinecone. FIX: use a per-paper chunk
            # counter (not the global enumeration index) so vector IDs and
            # chunk_index metadata are deterministic per paper regardless
            # of how papers are batched together.
            vectors = []
            per_paper_counts = defaultdict(int)
            for i, chunk in enumerate(all_chunks):
                paper_id = chunk['paper_id']
                chunk_idx = per_paper_counts[paper_id]
                per_paper_counts[paper_id] += 1

                vector_id = f"{paper_id}_chunk_{chunk_idx}"
                metadata = {
                    'paper_id': paper_id,
                    'paper_title': chunk['paper_title'],
                    'text': chunk['text'],
                    'source': chunk['source'],
                    'domain': chunk['domain'],
                    'publication_date': chunk.get('publication_date', ''),
                    'chunk_strategy': chunk.get('chunk_strategy', 'semantic'),
                    'chunk_index': chunk_idx,
                    'start_char': chunk.get('start_char', 0),
                    'end_char': chunk.get('end_char', 0)
                }
                # Add authors if available
                if chunk.get('authors'):
                    metadata['authors'] = ','.join(chunk['authors'][:3])  # Limit author list

                vectors.append((vector_id, embeddings[i].tolist(), metadata))

            # Upload in batches
            total_vectors = len(vectors)
            for i in range(0, total_vectors, batch_size):
                batch_end = min(i + batch_size, total_vectors)
                batch_vectors = vectors[i:batch_end]
                self.index.upsert(vectors=batch_vectors)
                print(f"📦 Uploaded batch {i // batch_size + 1}: {i}-{batch_end - 1} vectors")
                # Small delay to avoid rate limits
                time.sleep(0.1)

            print(f"✅ Successfully uploaded {total_vectors} vectors from {len(papers)} papers")
            return True
        except Exception as e:
            print(f"❌ Error adding papers to Pinecone: {e}")
            return False

    def search(self, query: str, domain: str = None, n_results: int = 10,
               include_metadata: bool = True,
               include_values: bool = False) -> List[Dict[str, Any]]:
        """Search for similar paper chunks in Pinecone.

        Args:
            query: Free-text query, embedded with the same model as ingest.
            domain: Optional exact-match filter on the ``domain`` metadata.
            n_results: Maximum number of matches to return.
            include_metadata: Forwarded to Pinecone query.
            include_values: Forwarded to Pinecone query.

        Returns:
            List of dicts with text/metadata/distance/id; empty on failure.
        """
        try:
            if self.index is None:
                print("❌ Pinecone not initialized properly")
                return []

            # Encode query
            query_embedding = self.embedding_manager.encode([query])[0].tolist()

            # Build filter
            filter_dict = {}
            if domain:
                filter_dict['domain'] = {'$eq': domain}

            # Perform search
            results = self.index.query(
                vector=query_embedding,
                top_k=n_results,
                filter=filter_dict if filter_dict else None,
                include_metadata=include_metadata,
                include_values=include_values
            )

            # Format results
            formatted_results = []
            for match in results['matches']:
                formatted_results.append({
                    'text': match['metadata']['text'],
                    'metadata': match['metadata'],
                    'distance': match['score'],  # Pinecone uses score (cosine similarity)
                    'id': match['id']
                })
            return formatted_results
        except Exception as e:
            print(f"❌ Pinecone search error: {e}")
            return []

    def get_collection_stats(self) -> Dict[str, Any]:
        """Get statistics about the Pinecone index.

        Returns:
            Dict of vector count/dimension/fullness/namespaces plus the
            embedding model name; an ``error`` dict when uninitialized;
            an empty dict on failure.
        """
        try:
            if self.index is None:
                return {"error": "Pinecone not initialized"}

            stats = self.index.describe_index_stats()
            return {
                "total_vectors": stats['total_vector_count'],
                "dimension": stats['dimension'],
                "index_fullness": stats.get('index_fullness', 0),
                "namespaces": stats.get('namespaces', {}),
                "embedding_model": self.embedding_manager.model_name
            }
        except Exception as e:
            print(f"❌ Error getting Pinecone stats: {e}")
            return {}

    def delete_paper(self, paper_id: str) -> bool:
        """Delete all vectors for a specific paper.

        Finds the paper's vectors via a metadata-filtered query (Pinecone
        has no "query by filter only", so a dummy zero vector is used) and
        deletes them by ID.

        NOTE(review): top_k is capped by Pinecone at 10000, so a paper
        with more chunks than that would be only partially deleted.

        Args:
            paper_id: The ``paper_id`` metadata value to remove.

        Returns:
            True if any vectors were deleted, False otherwise.
        """
        try:
            if self.index is None:
                print("❌ Pinecone not initialized properly")
                return False

            # Find all vectors for this paper
            results = self.index.query(
                vector=[0] * self.embedding_manager.get_embedding_dimensions(),  # Dummy vector
                filter={'paper_id': {'$eq': paper_id}},
                top_k=10000,  # Large number to get all matches
                include_metadata=False
            )

            vector_ids = [match['id'] for match in results['matches']]
            if vector_ids:
                self.index.delete(ids=vector_ids)
                print(f"✅ Deleted {len(vector_ids)} vectors for paper {paper_id}")
                return True
            else:
                print(f"⚠️ No vectors found for paper {paper_id}")
                return False
        except Exception as e:
            print(f"❌ Error deleting paper {paper_id}: {e}")
            return False

    def update_paper(self, paper_id: str, paper_data: Dict[str, Any]) -> bool:
        """Replace a paper's vectors with freshly chunked/embedded ones.

        Delete failure is intentionally ignored (the paper may simply not
        exist yet); success is determined by the re-add.

        Args:
            paper_id: ID of the paper being replaced.
            paper_data: New paper dict to ingest.

        Returns:
            True if the updated paper was added successfully.
        """
        try:
            # First delete existing vectors
            self.delete_paper(paper_id)
            # Then add updated paper
            return self.add_papers([paper_data])
        except Exception as e:
            print(f"❌ Error updating paper {paper_id}: {e}")
            return False


# Quick test (requires actual Pinecone API key)
def test_pinecone_manager():
    """Smoke-test PineconeManager end to end (requires PINECONE_API_KEY)."""
    import os

    api_key = os.getenv('PINECONE_API_KEY')
    if not api_key:
        print("❌ Pinecone API key not found in environment variables")
        print("   Set PINECONE_API_KEY to test Pinecone functionality")
        return

    test_papers = [
        {
            'id': 'test_001',
            'title': 'AI in Medical Imaging',
            'abstract': 'Deep learning transforms medical image analysis with improved accuracy.',
            'source': 'test',
            'domain': 'medical_imaging',
            'authors': ['John Doe', 'Jane Smith']
        }
    ]

    print("🧪 Testing Pinecone Manager")
    print("=" * 50)

    try:
        manager = PineconeManager(
            api_key=api_key,
            index_name="test-medical-papers",
            embedding_model="all-MiniLM-L6-v2"
        )

        # Add test papers
        success = manager.add_papers(test_papers)
        if success:
            print("✅ Papers added successfully")

            # Test search
            results = manager.search("medical image analysis", n_results=5)
            print(f"🔍 Search results: {len(results)} chunks found")
            for result in results[:2]:
                print(f"   - {result['metadata']['paper_title']} (score: {result['distance']:.3f})")

            # Get stats
            stats = manager.get_collection_stats()
            print(f"📊 Collection stats: {stats}")
        else:
            print("❌ Failed to add papers")
    except Exception as e:
        print(f"❌ Pinecone test failed: {e}")


if __name__ == "__main__":
    test_pinecone_manager()