Spaces:
Sleeping
Sleeping
| # data_sources/real_time_searcher.py | |
| """ | |
| Enhanced Real-Time Searcher with Multiple Data Sources and Smart Fallback System | |
| Now supports 9 data sources with priority-based searching | |
| """ | |
| import requests | |
| import time | |
| from typing import List, Dict, Optional | |
| from .pubmed_client import PubMedClient | |
| from .arxiv_client import ArXivClient | |
| from .europe_pmc_client import EuropePMCClient | |
| from .biorxiv_medrxiv_client import BiorxivMedrxivClient | |
| from .semantic_scholar_client import SemanticScholarClient | |
| from .crossref_client import CrossrefClient | |
| from .biomed_central_client import BioMedCentralClient | |
| from .core_client import CoreClient | |
| from processing.paper_processor import PaperProcessor | |
class RealTimeSearcher:
    """
    Advanced real-time searcher with primary sources and intelligent fallback.

    Queries up to 9 academic APIs with per-source rate limiting, retry logic
    and error recovery.  Primary sources are always tried (in priority order);
    fallback sources are consulted only when the caller enables them AND the
    primary pass returned too few papers.
    """

    # Stop words stripped from user queries before building API queries.
    _BASE_STOP_WORDS = frozenset([
        'latest', 'papers', 'about', 'what', 'how', 'are', 'the', 'with',
        'for', 'this',
    ])
    # Fallback sources handle complex queries poorly, so filter harder.
    _FALLBACK_STOP_WORDS = _BASE_STOP_WORDS | frozenset([
        'research', 'study', 'studies', 'review', 'method', 'methods',
    ])

    def __init__(self):
        # ==================== PRIMARY SOURCES ====================
        # High-quality, reliable sources (always try these first)
        self.primary_clients = {
            'pubmed': PubMedClient(),
            'europe_pmc': EuropePMCClient(),
            'biomed_central': BioMedCentralClient(),  # High quality medical content
            'arxiv': ArXivClient(),
            'medrxiv': BiorxivMedrxivClient(),
            'biorxiv': BiorxivMedrxivClient(),
            'semantic_scholar': SemanticScholarClient(),
            'crossref': CrossrefClient()
        }
        # ==================== FALLBACK SOURCES ====================
        # Use only when primary sources return insufficient results
        self.fallback_clients = {
            'core': CoreClient()  # Massive but variable quality
        }
        self.processor = PaperProcessor()
        # ==================== SOURCE CONFIGURATION ====================
        # delay: inter-request sleep in seconds (rate limiting)
        # priority: lower number = searched earlier
        # max_retries: extra attempts on transient failures
        # requires_key: source needs an API key for full access
        # fallback_only: never searched during the primary phase
        self.source_config = {
            # PRIMARY SOURCES - Tier 1: Most reliable and high-quality
            'pubmed': {
                'delay': 0.3,
                'priority': 1,
                'description': 'NIH biomedical database - High quality medical research',
                'max_retries': 2
            },
            'europe_pmc': {
                'delay': 0.3,
                'priority': 1,
                'description': 'European biomedical repository - Alternative to PubMed',
                'max_retries': 2
            },
            'biomed_central': {
                'delay': 0.4,
                'priority': 1,
                'description': 'Springer Nature open access - High quality medical journals',
                'max_retries': 2,
                'requires_key': True
            },
            # PRIMARY SOURCES - Tier 2: Specialized but reliable
            'arxiv': {
                'delay': 0.5,
                'priority': 2,
                'description': 'Preprint server - Latest research before publication',
                'max_retries': 1
            },
            'medrxiv': {
                'delay': 0.4,
                'priority': 2,
                'description': 'Medical preprints - Latest medical research',
                'max_retries': 1
            },
            'biorxiv': {
                'delay': 0.4,
                'priority': 2,
                'description': 'Biology preprints - Latest biology research',
                'max_retries': 1
            },
            # PRIMARY SOURCES - Tier 3: Supplementary sources
            'semantic_scholar': {
                'delay': 0.2,
                'priority': 3,
                'description': 'AI-powered search - Citation and recommendation engine',
                'max_retries': 2
            },
            'crossref': {
                'delay': 0.2,
                'priority': 3,
                'description': 'DOI metadata - Reference and citation data',
                'max_retries': 2
            },
            # FALLBACK SOURCES - Use only when needed
            'core': {
                'delay': 0.6,
                'priority': 99,
                'description': 'Massive repository - Comprehensive but variable quality',
                'max_retries': 1,
                'fallback_only': True,
                'requires_key': True
            }
        }

    def search_user_query(self, user_query: str, domain: str, max_results: int = 20,
                          use_fallback: bool = False) -> List[Dict]:
        """
        Main search method with intelligent source selection and fallback logic.

        Args:
            user_query: User's search query.
            domain: Medical domain (e.g., 'medical_imaging', 'genomics').
            max_results: Maximum number of papers to return.
            use_fallback: Whether to use fallback sources when primary
                sources return few results.

        Returns:
            List of relevant papers with source metadata, best-ranked first.
            Returns an empty list on any unexpected error (errors are printed,
            never raised to the caller).
        """
        print(f"🔍 Real-time search for: '{user_query}'")
        print(f"   Domain: {domain}")
        print(f"   Max results: {max_results}")
        print(f"   Fallback enabled: {use_fallback}")
        all_papers = []
        search_start_time = time.time()
        try:
            # Domain configuration decides which sources to query.
            from config.domains import get_domain_config
            domain_config = get_domain_config(domain)
            sources_to_use = domain_config.get('sources', ['pubmed', 'arxiv'])
            print(f"   Sources configured: {', '.join(sources_to_use)}")
            # PHASE 1: Search primary sources
            primary_papers = self._search_primary_sources(sources_to_use, user_query, domain, max_results)
            all_papers.extend(primary_papers)
            primary_search_time = time.time() - search_start_time
            # PHASE 2: Conditionally use fallback sources
            fallback_papers = []
            should_use_fallback = (
                use_fallback and
                len(primary_papers) < max_results // 2 and
                primary_search_time < 10  # Don't use fallback if primary search was too slow
            )
            if should_use_fallback:
                print(f"   🔄 Low results from primary sources ({len(primary_papers)}), activating fallback...")
                fallback_papers = self._search_fallback_sources(user_query, domain, max_results // 3)
                # Mark fallback papers for tracking
                for paper in fallback_papers:
                    paper['is_fallback'] = True
                    paper['fallback_reason'] = f"Primary sources returned only {len(primary_papers)} papers"
                all_papers.extend(fallback_papers)
                print(f"   📊 Fallback added {len(fallback_papers)} papers")
            # Process and rank results
            processed_papers = self.processor.process_papers(all_papers)
            relevant_papers = self._rank_by_relevance(processed_papers, user_query, domain)
            total_search_time = time.time() - search_start_time
            # Generate search statistics
            source_breakdown = self._generate_source_breakdown(processed_papers)
            print(f"   ✅ Search completed in {total_search_time:.1f}s")
            print(f"   📈 Results: {len(relevant_papers)} relevant papers")
            print(f"   📚 Sources: {source_breakdown}")
            return relevant_papers[:max_results]
        except Exception as e:
            print(f"❌ Real-time search error: {e}")
            import traceback
            traceback.print_exc()
            return []

    def _search_primary_sources(self, sources: List[str], query: str, domain: str,
                                max_results: int) -> List[Dict]:
        """
        Search primary sources in priority order with per-source query
        optimization, retries and rate limiting.
        """
        papers = []
        successful_sources = 0
        failed_sources = []
        # Only sources we actually have a client for (hoisted out of the loop;
        # the original recomputed this list on every iteration).
        usable_sources = [s for s in sources if s in self.primary_clients]
        if not usable_sources:
            print("   📊 Primary search: 0/0 sources successful")
            return papers
        # Split the result budget evenly; clamp to at least 1 so a long source
        # list can never request 0 results from each API.
        max_source_results = max(1, max_results // len(usable_sources))
        # Sort sources by priority (lower number = higher priority)
        sorted_sources = sorted(usable_sources,
                                key=lambda s: self.source_config.get(s, {}).get('priority', 999))
        for source in sorted_sources:
            try:
                source_config = self.source_config.get(source, {})
                print(f"   🔍 Primary: {source} (priority {source_config.get('priority', 'N/A')})")
                # Optimize query for this specific source
                optimized_query = self._optimize_query_for_source(query, domain, source)
                source_papers = self._search_single_source_with_retry(
                    source, optimized_query, domain, max_source_results
                )
                papers.extend(source_papers)
                successful_sources += 1
                print(f"   ✅ {source}: found {len(source_papers)} papers")
                # Respect rate limits
                time.sleep(source_config.get('delay', 0.3))
            except Exception as e:
                error_msg = f"{source} failed: {e}"
                print(f"   ❌ {error_msg}")
                failed_sources.append(error_msg)
                continue
        print(f"   📊 Primary search: {successful_sources}/{len(usable_sources)} sources successful")
        if failed_sources:
            print(f"   ⚠️ Failed: {', '.join(failed_sources)}")
        return papers

    def _search_fallback_sources(self, query: str, domain: str, max_results: int) -> List[Dict]:
        """
        Search fallback sources - used only when primary sources are insufficient.

        Papers returned are tagged with 'is_fallback' / 'fallback_source' so
        downstream ranking can penalize them.
        """
        papers = []
        successful_fallbacks = 0
        for source_name, client in self.fallback_clients.items():
            try:
                source_config = self.source_config.get(source_name, {})
                print(f"   🛡️ Fallback: {source_name}")
                # Use simpler query for fallback sources (they're less precise)
                fallback_query = self._simplify_query_for_fallback(query)
                source_papers = self._search_single_source_with_retry(
                    source_name, fallback_query, domain, max_results
                )
                # Add fallback metadata
                for paper in source_papers:
                    paper['is_fallback'] = True
                    paper['fallback_source'] = source_name
                papers.extend(source_papers)
                successful_fallbacks += 1
                print(f"   ✅ {source_name}: found {len(source_papers)} papers")
                # Extra conservative delay for fallback sources
                time.sleep(source_config.get('delay', 0.6))
            except Exception as e:
                print(f"   ⚠️ Fallback source {source_name} failed: {e}")
                continue
        print(f"   📊 Fallback search: {successful_fallbacks}/{len(self.fallback_clients)} sources successful")
        return papers

    def _search_single_source_with_retry(self, source: str, query: str, domain: str,
                                         max_results: int, retry_count: int = 0) -> List[Dict]:
        """
        Search a single source with retry logic and tolerant error handling.

        Unknown sources and API-key failures yield an empty list instead of
        raising; transient errors are retried up to the source's configured
        'max_retries'.
        """
        max_retries = self.source_config.get(source, {}).get('max_retries', 1)
        try:
            if source in self.primary_clients:
                client = self.primary_clients[source]
            elif source in self.fallback_clients:
                client = self.fallback_clients[source]
            else:
                return []
            # Handle special cases for bioRxiv/medRxiv (shared client, two servers)
            if source == 'medrxiv':
                papers = client.search_papers(query, server="medrxiv", max_results=max_results)
            elif source == 'biorxiv':
                papers = client.search_papers(query, server="biorxiv", max_results=max_results)
            else:
                papers = client.search_papers(query, max_results=max_results)
            # Empty result with no exception is a valid "nothing found".
            return papers if papers else []
        except Exception as e:
            # An API-key complaint is treated as "no access", not a hard error.
            error_msg = str(e).lower()
            if 'api key' in error_msg or 'key' in error_msg:
                print(f"   ⚠️ {source}: API key needed for full access")
                return []
            if retry_count < max_retries:
                print(f"   🔄 Retrying {source} ({retry_count + 1}/{max_retries})...")
                time.sleep(1)
                return self._search_single_source_with_retry(source, query, domain,
                                                             max_results, retry_count + 1)
            else:
                print(f"   ❌ {source} failed after {max_retries} retries: {e}")
                return []

    def _optimize_query_for_source(self, user_query: str, domain: str, source: str) -> str:
        """
        Optimize the search query for a specific data source.

        Prefers a domain-configured, source-specific base query when one
        exists; otherwise falls back to per-source heuristics.
        """
        from config.domains import get_domain_config
        domain_config = get_domain_config(domain)
        # Get source-specific queries if available
        source_queries_key = f"{source}_queries"
        if source_queries_key in domain_config and domain_config[source_queries_key]:
            # Use the first source-specific query as base, ANDed with the
            # user's important terms.
            base_query = domain_config[source_queries_key][0]
            important_terms = self._extract_important_terms(user_query)
            return f"({base_query}) AND ({important_terms})"
        # Fallback to general optimization
        if source == 'pubmed':
            return self._optimize_for_pubmed(user_query, domain)
        elif source == 'arxiv':
            return self._optimize_for_arxiv(user_query, domain)
        else:
            return user_query  # For other sources, use original query

    def _simplify_query_for_fallback(self, user_query: str) -> str:
        """
        Simplify a query for fallback sources (they handle complex queries poorly).

        Keeps at most the first 5 non-stop-word terms longer than 3 characters.
        """
        words = user_query.lower().split()
        important_words = [word for word in words
                           if len(word) > 3 and word not in self._FALLBACK_STOP_WORDS]
        return ' '.join(important_words[:5])

    def _extract_important_terms(self, user_query: str) -> str:
        """Extract important (non-stop-word, length > 3) terms from a user query."""
        words = user_query.lower().split()
        important_terms = [word for word in words
                           if len(word) > 3 and word not in self._BASE_STOP_WORDS]
        return ' '.join(important_terms)

    def _generate_source_breakdown(self, papers: List[Dict]) -> Dict[str, int]:
        """Count how many papers each source contributed ('unknown' if untagged)."""
        source_count = {}
        for paper in papers:
            source = paper.get('source', 'unknown')
            source_count[source] = source_count.get(source, 0) + 1
        return source_count

    def get_system_status(self) -> Dict:
        """Get a comprehensive status summary of all configured data sources."""
        primary_sources = list(self.primary_clients.keys())
        fallback_sources = list(self.fallback_clients.keys())
        sources_requiring_keys = [
            source for source in primary_sources + fallback_sources
            if self.source_config.get(source, {}).get('requires_key', False)
        ]
        return {
            'primary_sources_count': len(primary_sources),
            'fallback_sources_count': len(fallback_sources),
            'total_sources': len(primary_sources) + len(fallback_sources),
            'primary_sources': primary_sources,
            'fallback_sources': fallback_sources,
            'sources_requiring_keys': sources_requiring_keys,
            'source_descriptions': {
                source: config.get('description', 'No description')
                for source, config in self.source_config.items()
            }
        }

    def test_source_connectivity(self) -> Dict[str, bool]:
        """
        Test connectivity to all data sources.

        Useful for diagnostics and monitoring.  Returns a mapping of source
        name -> True when a trivial test query returned at least one paper.
        """
        connectivity_results = {}
        test_query = "machine learning"  # Simple test query
        # Fallback sources get a longer post-test delay than primary sources.
        for clients, delay in ((self.primary_clients, 0.5), (self.fallback_clients, 1)):
            for source_name in clients:
                try:
                    print(f"Testing connectivity to {source_name}...")
                    test_papers = self._search_single_source_with_retry(
                        source_name, test_query, "medical_imaging", 2
                    )
                    connectivity_results[source_name] = len(test_papers) > 0
                    print(f"  {source_name}: {'✅ OK' if connectivity_results[source_name] else '❌ FAILED'}")
                    time.sleep(delay)  # Be nice to APIs
                except Exception as e:
                    connectivity_results[source_name] = False
                    print(f"  {source_name}: ❌ FAILED - {e}")
        return connectivity_results

    # ==================== QUERY OPTIMIZATION METHODS ====================
    def _optimize_for_pubmed(self, user_query: str, domain: str) -> str:
        """Convert a natural-language user query to effective PubMed search syntax."""
        words = user_query.lower().split()
        # Keep important keywords
        important_words = [word for word in words
                           if len(word) > 3 and word not in self._BASE_STOP_WORDS]
        # Add domain-specific terms to improve relevance
        domain_boosters = {
            "medical_imaging": ["imaging", "radiology", "medical"],
            "genomics": ["genomic", "sequencing", "DNA"],
            "drug_discovery": ["drug", "compound", "molecular"],
            "deep_learning_medicine": ["deep learning", "neural network", "AI"]
        }
        booster = domain_boosters.get(domain, [])
        all_terms = important_words + booster[:2]  # Add up to 2 boosters
        # Create PubMed query - every term required
        query = " AND ".join(f'"{term}"' for term in all_terms if term)
        # Add recency filter for "latest"/"recent" queries.  The start year is
        # computed at call time (a hard-coded year would go stale).
        if "latest" in user_query.lower() or "recent" in user_query.lower():
            from datetime import date
            start_year = date.today().year - 1  # include late-last-year papers
            query += f" AND (\"{start_year}\"[Date - Publication] : \"3000\"[Date - Publication])"
        return query if query else user_query

    def _optimize_for_arxiv(self, user_query: str, domain: str) -> str:
        """
        Convert a natural-language user query to an effective arXiv text search.

        arXiv category filtering comes from the domain mapping elsewhere; here
        we only strip stop words for the free-text part.
        """
        words = user_query.lower().split()
        important_words = [word for word in words
                           if len(word) > 3 and word not in self._BASE_STOP_WORDS]
        return " ".join(important_words)

    def _rank_by_relevance(self, papers: List[Dict], user_query: str, domain: str) -> List[Dict]:
        """
        Rank papers by relevance to the user's query.

        Scoring: +3 per query term in the title, +1 per term in the abstract,
        +2 recency boost for "latest" queries, +1 matching domain, -1 for
        fallback-sourced papers.  Papers scoring 0 or less are dropped.
        """
        if not papers:
            return []
        query_terms = [term for term in user_query.lower().split() if len(term) > 3]
        scored_papers = []
        for paper in papers:
            score = 0
            title = paper.get('title', '').lower()
            abstract = paper.get('abstract', '').lower()
            # Score based on term matches
            for term in query_terms:
                if term in title:
                    score += 3  # Term in title is very important
                if term in abstract:
                    score += 1  # Term in abstract is good
            # Boost recent papers for "latest" queries
            if "latest" in user_query.lower() and self._is_recent(paper):
                score += 2
            # Boost papers from the correct domain
            if paper.get('domain') == domain:
                score += 1
            # Penalize fallback papers slightly (they're lower quality)
            if paper.get('is_fallback', False):
                score -= 1
            scored_papers.append((paper, score))
        # Sort by relevance score (highest first)
        scored_papers.sort(key=lambda x: x[1], reverse=True)
        return [paper for paper, score in scored_papers if score > 0]

    def _is_recent(self, paper: Dict) -> bool:
        """
        Check whether a paper looks recent (current or previous calendar year).

        NOTE: this is a coarse substring check on the raw date string, not a
        real date parse — good enough for the small ranking boost it feeds.
        The year is computed at call time (the previous implementation
        hard-coded '2024'/'2025').
        """
        from datetime import date
        pub_date = str(paper.get('publication_date', ''))
        this_year = date.today().year
        return any(str(year) in pub_date for year in (this_year, this_year - 1))