Spaces:
Running
Running
| # processing/pinecone_manager.py | |
| """ | |
| Pinecone cloud vector database implementation | |
| Scalable cloud-based vector search | |
| """ | |
| import pinecone | |
| from typing import List, Dict, Any, Optional | |
| import time | |
| from embeddings.embedding_models import EmbeddingManager | |
| from embeddings.text_chunking import ResearchPaperChunker | |
class PineconeManager:
    """Manage a Pinecone index holding embedded research-paper chunks.

    Papers are split into chunks by ``self.chunker``, embedded by
    ``self.embedding_manager`` and stored in ``self.index`` together with
    per-chunk metadata. When no API key is available the constructor returns
    early without creating the ``index`` attribute; every public method then
    reports failure (``False`` / ``[]`` / an error dict) instead of raising.
    """

    # Maximum number of seconds _create_index waits for a new index to
    # report ready before raising TimeoutError.
    INDEX_READY_TIMEOUT_SECONDS = 300

    # Number of vector IDs sent per delete request.
    # NOTE(review): Pinecone documents a per-request cap on delete IDs
    # (presumably 1000) -- confirm against the current service limits.
    DELETE_BATCH_SIZE = 1000

    def __init__(self,
                 api_key: Optional[str] = None,
                 environment: str = "us-west1-gcp",
                 index_name: str = "medical-research-papers",
                 embedding_model: str = "all-MiniLM-L6-v2",
                 chunk_strategy: str = "semantic"):
        """Connect to Pinecone and open (or create) the target index.

        Args:
            api_key: Pinecone API key. When empty or ``None`` the
                ``PINECONE_API_KEY`` environment variable is used instead.
            environment: Value passed to ``pinecone.init`` as ``environment``.
            index_name: Name of the index to connect to; created if it is
                not listed by ``pinecone.list_indexes()``.
            embedding_model: Model name handed to ``EmbeddingManager``.
            chunk_strategy: Strategy name handed to ``ResearchPaperChunker``.

        Raises:
            Exception: Any error raised while initialising Pinecone or
                creating/opening the index is logged and re-raised.
        """
        # Function-scope import so the module's import block stays untouched.
        import os

        if not api_key:
            # The warning below tells users to set PINECONE_API_KEY, so the
            # variable is actually honoured here.
            api_key = os.getenv("PINECONE_API_KEY")

        self.api_key = api_key
        self.environment = environment
        self.index_name = index_name
        self.embedding_manager = EmbeddingManager(embedding_model)
        self.chunker = ResearchPaperChunker(chunk_strategy)

        if not api_key:
            print("β οΈ Pinecone API key not provided. Please set PINECONE_API_KEY environment variable.")
            # Deliberately leave ``self.index`` unset: the other methods use
            # its absence to detect an uninitialised manager.
            return

        # Initialize Pinecone
        try:
            pinecone.init(api_key=api_key, environment=environment)
            print(f"β Pinecone initialized: {environment}")

            # Create or connect to index
            if index_name not in pinecone.list_indexes():
                print(f"π Creating new Pinecone index: {index_name}")
                self._create_index()
            else:
                print(f"π Connecting to existing index: {index_name}")

            self.index = pinecone.Index(index_name)
            print("β Pinecone index ready")
        except Exception as e:
            print(f"β Pinecone initialization error: {e}")
            raise

    def _is_ready(self) -> bool:
        """Return True when the constructor managed to open an index."""
        return getattr(self, "index", None) is not None

    @staticmethod
    def _as_list(embedding: Any) -> List[float]:
        """Convert one embedding (array-like or sequence) to a plain list."""
        to_list = getattr(embedding, "tolist", None)
        return to_list() if callable(to_list) else list(embedding)

    @staticmethod
    def _match_field(match: Any, key: str, default: Any = None) -> Any:
        """Return ``match[key]``, or *default* if the field is missing or None."""
        try:
            value = match[key]
        except (KeyError, AttributeError, TypeError, IndexError):
            return default
        return default if value is None else value

    def _create_index(self):
        """Create the Pinecone index named ``self.index_name`` and wait for it.

        The index uses the embedding manager's dimension, the cosine metric
        and indexes the ``domain``, ``source`` and ``publication_date``
        metadata fields.

        Raises:
            TimeoutError: The index did not report ready within
                ``INDEX_READY_TIMEOUT_SECONDS``.
            Exception: Any other creation error is logged and re-raised.
        """
        try:
            dimension = self.embedding_manager.get_embedding_dimensions()
            pinecone.create_index(
                name=self.index_name,
                dimension=dimension,
                metric="cosine",
                metadata_config={
                    "indexed": ["domain", "source", "publication_date"]
                }
            )

            # Wait for index to be ready, but never forever: an index that
            # fails to come up must not hang the caller.
            deadline = time.monotonic() + self.INDEX_READY_TIMEOUT_SECONDS
            while not pinecone.describe_index(self.index_name).status['ready']:
                if time.monotonic() >= deadline:
                    raise TimeoutError(
                        f"index {self.index_name} not ready after "
                        f"{self.INDEX_READY_TIMEOUT_SECONDS} seconds"
                    )
                time.sleep(1)

            print(f"β Pinecone index created: {self.index_name} (dimension: {dimension})")
        except Exception as e:
            print(f"β Error creating Pinecone index: {e}")
            raise

    def _build_vectors(self, chunks: List[Dict[str, Any]], embeddings: Any) -> List[tuple]:
        """Pair each chunk with its embedding as ``(id, values, metadata)``.

        Chunk numbering restarts at 0 for every ``paper_id`` so that a
        paper's vector IDs do not depend on which other papers happen to be
        in the same batch.
        """
        next_index: Dict[Any, int] = {}
        vectors = []
        for position, chunk in enumerate(chunks):
            paper_id = chunk['paper_id']
            chunk_index = next_index.get(paper_id, 0)
            next_index[paper_id] = chunk_index + 1

            metadata = {
                'paper_id': paper_id,
                'paper_title': chunk['paper_title'],
                'text': chunk['text'],
                'source': chunk['source'],
                'domain': chunk['domain'],
                'publication_date': chunk.get('publication_date', ''),
                'chunk_strategy': chunk.get('chunk_strategy', 'semantic'),
                'chunk_index': chunk_index,
                'start_char': chunk.get('start_char', 0),
                'end_char': chunk.get('end_char', 0)
            }
            # Add authors if available
            if chunk.get('authors'):
                metadata['authors'] = ','.join(chunk['authors'][:3])  # Limit author list

            vectors.append((
                f"{paper_id}_chunk_{chunk_index}",
                self._as_list(embeddings[position]),
                metadata,
            ))
        return vectors

    def add_papers(self, papers: List[Dict[str, Any]], batch_size: int = 100) -> bool:
        """Chunk, embed and upsert *papers* into the index.

        Vector IDs have the form ``"<paper_id>_chunk_<k>"`` where ``k``
        counts the chunks of that paper only, so upserting the same paper
        again overwrites its earlier vectors. (IDs were previously numbered
        across the whole batch; vectors stored under that scheme keep their
        old IDs until the paper is deleted.)

        Args:
            papers: Paper dicts in the format ``self.chunker`` expects.
            batch_size: Number of vectors per upsert call; must be >= 1.

        Returns:
            True when every batch was uploaded; False when the manager is
            uninitialised, ``batch_size`` is invalid, no chunks were
            produced, or any error occurred (errors are logged, not raised).
        """
        try:
            if not self._is_ready():
                print("β Pinecone not initialized properly")
                return False

            # A non-positive step would either raise inside range() or skip
            # the upload loop entirely while still reporting success.
            if batch_size < 1:
                print(f"β Error adding papers to Pinecone: batch_size must be >= 1, got {batch_size}")
                return False

            # Chunk all papers
            all_chunks = self.chunker.batch_chunk_papers(papers)
            if not all_chunks:
                print("β οΈ No chunks generated from papers")
                return False

            # Prepare vectors for Pinecone
            chunk_texts = [chunk['text'] for chunk in all_chunks]
            embeddings = self.embedding_manager.encode(chunk_texts)
            vectors = self._build_vectors(all_chunks, embeddings)

            # Upload in batches
            total_vectors = len(vectors)
            for start in range(0, total_vectors, batch_size):
                batch_end = min(start + batch_size, total_vectors)
                self.index.upsert(vectors=vectors[start:batch_end])
                print(f"π¦ Uploaded batch {start // batch_size + 1}: {start}-{batch_end - 1} vectors")
                # Small delay to avoid rate limits; pointless after the
                # final batch, so it is skipped there.
                if batch_end < total_vectors:
                    time.sleep(0.1)

            print(f"β Successfully uploaded {total_vectors} vectors from {len(papers)} papers")
            return True
        except Exception as e:
            print(f"β Error adding papers to Pinecone: {e}")
            return False

    def search(self,
               query: str,
               domain: Optional[str] = None,
               n_results: int = 10,
               include_metadata: bool = True,
               include_values: bool = False) -> List[Dict[str, Any]]:
        """Search for paper chunks similar to *query*.

        Args:
            query: Free-text query; embedded with ``self.embedding_manager``.
            domain: When given, restricts matches to this ``domain`` value.
            n_results: Passed to the index query as ``top_k``.
            include_metadata: Ask Pinecone to return stored metadata.
            include_values: Ask Pinecone to return the stored vectors.

        Returns:
            One dict per match with keys ``text``, ``metadata``, ``distance``,
            ``score`` and ``id`` (plus ``values`` when *include_values* is
            true). ``distance`` holds the raw Pinecone score and is kept for
            backward compatibility; ``score`` carries the same number.
            ``text``/``metadata`` are empty when metadata was not returned.
            Returns ``[]`` if the manager is uninitialised or on any error.
        """
        try:
            if not self._is_ready():
                print("β Pinecone not initialized properly")
                return []

            # Encode query
            query_embedding = self._as_list(self.embedding_manager.encode([query])[0])

            # Build filter
            filter_dict = {}
            if domain:
                filter_dict['domain'] = {'$eq': domain}

            # Perform search
            results = self.index.query(
                vector=query_embedding,
                top_k=n_results,
                filter=filter_dict if filter_dict else None,
                include_metadata=include_metadata,
                include_values=include_values
            )

            # Format results
            formatted_results = []
            for match in results['matches']:
                # Metadata is absent when include_metadata is False; that
                # must not turn a successful query into an error.
                metadata = self._match_field(match, 'metadata', {})
                entry = {
                    'text': self._match_field(metadata, 'text', ''),
                    'metadata': metadata,
                    'distance': match['score'],  # Pinecone score, legacy key name
                    'score': match['score'],
                    'id': match['id']
                }
                if include_values:
                    entry['values'] = self._match_field(match, 'values', [])
                formatted_results.append(entry)
            return formatted_results
        except Exception as e:
            print(f"β Pinecone search error: {e}")
            return []

    def get_collection_stats(self) -> Dict[str, Any]:
        """Return summary statistics for the index.

        Returns:
            A dict with ``total_vectors``, ``dimension``, ``index_fullness``,
            ``namespaces`` and ``embedding_model``;
            ``{"error": "Pinecone not initialized"}`` when no index is open;
            ``{}`` when fetching the statistics fails (the error is logged).
        """
        try:
            if not self._is_ready():
                return {"error": "Pinecone not initialized"}

            stats = self.index.describe_index_stats()
            return {
                "total_vectors": stats['total_vector_count'],
                "dimension": stats['dimension'],
                "index_fullness": stats.get('index_fullness', 0),
                "namespaces": stats.get('namespaces', {}),
                "embedding_model": self.embedding_manager.model_name
            }
        except Exception as e:
            print(f"β Error getting Pinecone stats: {e}")
            return {}

    def delete_paper(self, paper_id: str) -> bool:
        """Delete all vectors whose ``paper_id`` metadata equals *paper_id*.

        Returns:
            True when at least one vector was found and deleted; False when
            none were found, the manager is uninitialised, or an error
            occurred (errors are logged, not raised).
        """
        try:
            if not self._is_ready():
                print("β Pinecone not initialized properly")
                return False

            # Find all vectors for this paper. The query vector is a dummy;
            # only the metadata filter matters here.
            # NOTE(review): top_k=10000 caps how many chunks one call can
            # discover; a paper with more chunks would need repeated calls.
            results = self.index.query(
                vector=[0] * self.embedding_manager.get_embedding_dimensions(),  # Dummy vector
                filter={'paper_id': {'$eq': paper_id}},
                top_k=10000,  # Large number to get all matches
                include_metadata=False
            )
            vector_ids = [match['id'] for match in results['matches']]

            if not vector_ids:
                print(f"β οΈ No vectors found for paper {paper_id}")
                return False

            # Delete in bounded batches rather than one arbitrarily large call.
            for start in range(0, len(vector_ids), self.DELETE_BATCH_SIZE):
                self.index.delete(ids=vector_ids[start:start + self.DELETE_BATCH_SIZE])
            print(f"β Deleted {len(vector_ids)} vectors for paper {paper_id}")
            return True
        except Exception as e:
            print(f"β Error deleting paper {paper_id}: {e}")
            return False

    def update_paper(self, paper_id: str, paper_data: Dict[str, Any]) -> bool:
        """Replace a paper's vectors with ones built from *paper_data*.

        Existing vectors for *paper_id* are deleted first (a failed or empty
        delete does not stop the update), then *paper_data* is added.

        Returns:
            The result of ``add_papers([paper_data])``; False on any error.
        """
        try:
            # First delete existing vectors
            self.delete_paper(paper_id)
            # Then add updated paper
            return self.add_papers([paper_data])
        except Exception as e:
            print(f"β Error updating paper {paper_id}: {e}")
            return False
# Quick test (requires actual Pinecone API key)
def test_pinecone_manager():
    """Smoke-test PineconeManager against a live Pinecone account.

    Reads the API key from the ``PINECONE_API_KEY`` environment variable and
    returns immediately (after printing a hint) when it is not set.
    Otherwise it uploads one sample paper to the ``test-medical-papers``
    index, runs a search and prints the index statistics. Failures are
    printed rather than raised.
    """
    import os

    pinecone_key = os.getenv('PINECONE_API_KEY')
    if not pinecone_key:
        print("β Pinecone API key not found in environment variables")
        print(" Set PINECONE_API_KEY to test Pinecone functionality")
        return

    sample_paper = {
        'id': 'test_001',
        'title': 'AI in Medical Imaging',
        'abstract': 'Deep learning transforms medical image analysis with improved accuracy.',
        'source': 'test',
        'domain': 'medical_imaging',
        'authors': ['John Doe', 'Jane Smith'],
    }
    test_papers = [sample_paper]

    print("π§ͺ Testing Pinecone Manager")
    print("=" * 50)

    try:
        manager = PineconeManager(
            api_key=pinecone_key,
            index_name="test-medical-papers",
            embedding_model="all-MiniLM-L6-v2",
        )

        # Add test papers; nothing else is worth checking if the upload failed.
        if not manager.add_papers(test_papers):
            print("β Failed to add papers")
            return
        print("β Papers added successfully")

        # Test search and show the two best hits.
        hits = manager.search("medical image analysis", n_results=5)
        print(f"π Search results: {len(hits)} chunks found")
        for hit in hits[:2]:
            title = hit['metadata']['paper_title']
            similarity = hit['distance']
            print(f" - {title} (score: {similarity:.3f})")

        # Get stats
        index_stats = manager.get_collection_stats()
        print(f"π Collection stats: {index_stats}")
    except Exception as e:
        print(f"β Pinecone test failed: {e}")
# Script entry point: run the live Pinecone smoke test only when this file is
# executed directly, not when it is imported as a module.
if __name__ == "__main__":
    test_pinecone_manager()