# MedSearchPro / data_sources / real_time_searcher.py
# (repository header: paulhemb, "Initial Backend Deployment", commit 1367957)
# data_sources/real_time_searcher.py
"""
Enhanced Real-Time Searcher with Multiple Data Sources and Smart Fallback System
Now supports 9 data sources with priority-based searching
"""
import requests
import time
from typing import List, Dict, Optional
from .pubmed_client import PubMedClient
from .arxiv_client import ArXivClient
from .europe_pmc_client import EuropePMCClient
from .biorxiv_medrxiv_client import BiorxivMedrxivClient
from .semantic_scholar_client import SemanticScholarClient
from .crossref_client import CrossrefClient
from .biomed_central_client import BioMedCentralClient
from .core_client import CoreClient
from processing.paper_processor import PaperProcessor
class RealTimeSearcher:
    """
    Advanced real-time searcher with primary sources and intelligent fallback
    Handles 9 different academic APIs with rate limiting and error recovery
    """
    def __init__(self):
        # ==================== PRIMARY SOURCES ====================
        # High-quality, reliable sources (always try these first)
        # NOTE(review): 'medrxiv' and 'biorxiv' are two separate instances of the
        # same client class; the target server is selected per call via the
        # `server=` argument in _search_single_source_with_retry.
        self.primary_clients = {
            'pubmed': PubMedClient(),
            'europe_pmc': EuropePMCClient(),
            'biomed_central': BioMedCentralClient(),  # NEW - High quality medical content
            'arxiv': ArXivClient(),
            'medrxiv': BiorxivMedrxivClient(),
            'biorxiv': BiorxivMedrxivClient(),
            'semantic_scholar': SemanticScholarClient(),
            'crossref': CrossrefClient()
        }
        # ==================== FALLBACK SOURCES ====================
        # Use only when primary sources return insufficient results
        self.fallback_clients = {
            'core': CoreClient()  # Massive but variable quality
        }
        self.processor = PaperProcessor()
        # ==================== SOURCE CONFIGURATION ====================
        # Per-source tuning knobs:
        #   delay         - seconds slept after each call (rate limiting)
        #   priority      - lower number = searched earlier (_search_primary_sources)
        #   max_retries   - retry budget for _search_single_source_with_retry
        #   requires_key  - surfaced by get_system_status (API key needed)
        #   fallback_only - informational flag; actual routing is driven by
        #                   which client dict (primary/fallback) holds the source
        self.source_config = {
            # PRIMARY SOURCES - Tier 1: Most reliable and high-quality
            'pubmed': {
                'delay': 0.3,
                'priority': 1,
                'description': 'NIH biomedical database - High quality medical research',
                'max_retries': 2
            },
            'europe_pmc': {
                'delay': 0.3,
                'priority': 1,
                'description': 'European biomedical repository - Alternative to PubMed',
                'max_retries': 2
            },
            'biomed_central': {
                'delay': 0.4,
                'priority': 1,
                'description': 'Springer Nature open access - High quality medical journals',
                'max_retries': 2,
                'requires_key': True
            },
            # PRIMARY SOURCES - Tier 2: Specialized but reliable
            'arxiv': {
                'delay': 0.5,
                'priority': 2,
                'description': 'Preprint server - Latest research before publication',
                'max_retries': 1
            },
            'medrxiv': {
                'delay': 0.4,
                'priority': 2,
                'description': 'Medical preprints - Latest medical research',
                'max_retries': 1
            },
            'biorxiv': {
                'delay': 0.4,
                'priority': 2,
                'description': 'Biology preprints - Latest biology research',
                'max_retries': 1
            },
            # PRIMARY SOURCES - Tier 3: Supplementary sources
            'semantic_scholar': {
                'delay': 0.2,
                'priority': 3,
                'description': 'AI-powered search - Citation and recommendation engine',
                'max_retries': 2
            },
            'crossref': {
                'delay': 0.2,
                'priority': 3,
                'description': 'DOI metadata - Reference and citation data',
                'max_retries': 2
            },
            # FALLBACK SOURCES - Use only when needed
            'core': {
                'delay': 0.6,
                'priority': 99,
                'description': 'Massive repository - Comprehensive but variable quality',
                'max_retries': 1,
                'fallback_only': True,
                'requires_key': True
            }
        }
    def search_user_query(self, user_query: str, domain: str, max_results: int = 20, use_fallback: bool = False) -> \
            List[Dict]:
        """
        Main search method with intelligent source selection and fallback logic

        Args:
            user_query: User's search query
            domain: Medical domain (e.g., 'medical_imaging', 'genomics')
            max_results: Maximum number of papers to return
            use_fallback: Whether to use fallback sources if primary sources return few results

        Returns:
            List of relevant papers with source metadata (empty list on any error)
        """
        print(f"🔍 Real-time search for: '{user_query}'")
        print(f" Domain: {domain}")
        print(f" Max results: {max_results}")
        print(f" Fallback enabled: {use_fallback}")
        all_papers = []
        search_start_time = time.time()
        try:
            # Get domain configuration to determine which sources to use
            # (local import avoids a module-level import cycle with config)
            from config.domains import get_domain_config
            domain_config = get_domain_config(domain)
            sources_to_use = domain_config.get('sources', ['pubmed', 'arxiv'])
            print(f" Sources configured: {', '.join(sources_to_use)}")
            # PHASE 1: Search primary sources
            primary_papers = self._search_primary_sources(sources_to_use, user_query, domain, max_results)
            all_papers.extend(primary_papers)
            primary_search_time = time.time() - search_start_time
            # PHASE 2: Conditionally use fallback sources.
            # Fallback fires only when it is explicitly enabled, primary sources
            # yielded fewer than half the requested papers, AND the primary phase
            # was fast enough that the extra latency is acceptable.
            fallback_papers = []
            should_use_fallback = (
                use_fallback and
                len(primary_papers) < max_results // 2 and
                primary_search_time < 10  # Don't use fallback if primary search was too slow
            )
            if should_use_fallback:
                print(f" 🔄 Low results from primary sources ({len(primary_papers)}), activating fallback...")
                fallback_papers = self._search_fallback_sources(user_query, domain, max_results // 3)
                # Mark fallback papers for tracking
                for paper in fallback_papers:
                    paper['is_fallback'] = True
                    paper['fallback_reason'] = f"Primary sources returned only {len(primary_papers)} papers"
                all_papers.extend(fallback_papers)
                print(f" 📊 Fallback added {len(fallback_papers)} papers")
            # Process and rank results
            processed_papers = self.processor.process_papers(all_papers)
            relevant_papers = self._rank_by_relevance(processed_papers, user_query, domain)
            total_search_time = time.time() - search_start_time
            # Generate search statistics
            source_breakdown = self._generate_source_breakdown(processed_papers)
            print(f" ✅ Search completed in {total_search_time:.1f}s")
            print(f" 📈 Results: {len(relevant_papers)} relevant papers")
            print(f" 📚 Sources: {source_breakdown}")
            return relevant_papers[:max_results]
        except Exception as e:
            # Broad catch is deliberate: a failed search degrades to an empty
            # result list instead of crashing the caller.
            print(f"❌ Real-time search error: {e}")
            import traceback
            traceback.print_exc()
            return []
def _search_primary_sources(self, sources: List[str], query: str, domain: str, max_results: int) -> List[Dict]:
"""
Search primary sources with optimized query and error handling
"""
papers = []
successful_sources = 0
failed_sources = []
# Sort sources by priority
sorted_sources = sorted(sources, key=lambda s: self.source_config.get(s, {}).get('priority', 999))
for source in sorted_sources:
if source not in self.primary_clients:
continue
try:
source_config = self.source_config.get(source, {})
max_source_results = max_results // len([s for s in sources if s in self.primary_clients])
print(f" 🔍 Primary: {source} (priority {source_config.get('priority', 'N/A')})")
# Optimize query for this specific source
optimized_query = self._optimize_query_for_source(query, domain, source)
source_papers = self._search_single_source_with_retry(
source, optimized_query, domain, max_source_results
)
papers.extend(source_papers)
successful_sources += 1
print(f" ✅ {source}: found {len(source_papers)} papers")
# Respect rate limits
time.sleep(source_config.get('delay', 0.3))
except Exception as e:
error_msg = f"{source} failed: {e}"
print(f" ❌ {error_msg}")
failed_sources.append(error_msg)
continue
print(
f" 📊 Primary search: {successful_sources}/{len([s for s in sources if s in self.primary_clients])} sources successful")
if failed_sources:
print(f" ⚠️ Failed: {', '.join(failed_sources)}")
return papers
def _search_fallback_sources(self, query: str, domain: str, max_results: int) -> List[Dict]:
"""
Search fallback sources - used only when primary sources are insufficient
"""
papers = []
successful_fallbacks = 0
for source_name, client in self.fallback_clients.items():
try:
source_config = self.source_config.get(source_name, {})
print(f" 🛡️ Fallback: {source_name}")
# Use simpler query for fallback sources (they're less precise)
fallback_query = self._simplify_query_for_fallback(query)
source_papers = self._search_single_source_with_retry(
source_name, fallback_query, domain, max_results
)
# Add fallback metadata
for paper in source_papers:
paper['is_fallback'] = True
paper['fallback_source'] = source_name
papers.extend(source_papers)
successful_fallbacks += 1
print(f" ✅ {source_name}: found {len(source_papers)} papers")
# Extra conservative delay for fallback sources
time.sleep(source_config.get('delay', 0.6))
except Exception as e:
print(f" ⚠️ Fallback source {source_name} failed: {e}")
continue
print(f" 📊 Fallback search: {successful_fallbacks}/{len(self.fallback_clients)} sources successful")
return papers
def _search_single_source_with_retry(self, source: str, query: str, domain: str, max_results: int,
retry_count: int = 0) -> List[Dict]:
"""
Search a single source with retry logic and better error handling
"""
max_retries = self.source_config.get(source, {}).get('max_retries', 1)
try:
if source in self.primary_clients:
client = self.primary_clients[source]
elif source in self.fallback_clients:
client = self.fallback_clients[source]
else:
return []
# Handle special cases for bioRxiv/medRxiv
if source == 'medrxiv':
papers = client.search_papers(query, server="medrxiv", max_results=max_results)
elif source == 'biorxiv':
papers = client.search_papers(query, server="biorxiv", max_results=max_results)
else:
papers = client.search_papers(query, max_results=max_results)
# If we got papers, return them even if there were warnings
if papers:
return papers
else:
# No papers found, but no error - this is fine
return []
except Exception as e:
# Check if this is just an API key warning (not a real error)
error_msg = str(e).lower()
if 'api key' in error_msg or 'key' in error_msg:
print(f" ⚠️ {source}: API key needed for full access")
return [] # Return empty instead of failing completely
if retry_count < max_retries:
print(f" 🔄 Retrying {source} ({retry_count + 1}/{max_retries})...")
time.sleep(1)
return self._search_single_source_with_retry(source, query, domain, max_results, retry_count + 1)
else:
print(f" ❌ {source} failed after {max_retries} retries: {e}")
return []
def _optimize_query_for_source(self, user_query: str, domain: str, source: str) -> str:
"""
Optimize search query for specific data source
"""
from config.domains import get_domain_config
domain_config = get_domain_config(domain)
# Get source-specific queries if available
source_queries_key = f"{source}_queries"
if source_queries_key in domain_config and domain_config[source_queries_key]:
# Use the first source-specific query as base
base_query = domain_config[source_queries_key][0]
# Add user query terms
important_terms = self._extract_important_terms(user_query)
return f"({base_query}) AND ({important_terms})"
# Fallback to general optimization
if source == 'pubmed':
return self._optimize_for_pubmed(user_query, domain)
elif source == 'arxiv':
return self._optimize_for_arxiv(user_query, domain)
else:
return user_query # For other sources, use original query
def _simplify_query_for_fallback(self, user_query: str) -> str:
"""
Simplify query for fallback sources (they handle complex queries poorly)
"""
# Extract key terms and remove common stop words
words = user_query.lower().split()
important_words = [word for word in words if len(word) > 3 and word not in [
'latest', 'papers', 'about', 'what', 'how', 'are', 'the', 'with', 'for', 'this',
'research', 'study', 'studies', 'review', 'method', 'methods'
]]
return ' '.join(important_words[:5]) # Limit to 5 most important terms
def _extract_important_terms(self, user_query: str) -> str:
"""Extract important terms from user query"""
words = user_query.lower().split()
important_terms = [word for word in words if len(word) > 3 and word not in [
'latest', 'papers', 'about', 'what', 'how', 'are', 'the', 'with', 'for', 'this'
]]
return ' '.join(important_terms)
def _generate_source_breakdown(self, papers: List[Dict]) -> Dict[str, int]:
"""Generate breakdown of which sources contributed papers"""
source_count = {}
for paper in papers:
source = paper.get('source', 'unknown')
source_count[source] = source_count.get(source, 0) + 1
return source_count
def get_system_status(self) -> Dict:
"""Get comprehensive status of all data sources"""
primary_sources = list(self.primary_clients.keys())
fallback_sources = list(self.fallback_clients.keys())
sources_requiring_keys = [
source for source in primary_sources + fallback_sources
if self.source_config.get(source, {}).get('requires_key', False)
]
return {
'primary_sources_count': len(primary_sources),
'fallback_sources_count': len(fallback_sources),
'total_sources': len(primary_sources) + len(fallback_sources),
'primary_sources': primary_sources,
'fallback_sources': fallback_sources,
'sources_requiring_keys': sources_requiring_keys,
'source_descriptions': {
source: config.get('description', 'No description')
for source, config in self.source_config.items()
}
}
def test_source_connectivity(self) -> Dict[str, bool]:
"""
Test connectivity to all data sources
Useful for diagnostics and monitoring
"""
connectivity_results = {}
test_query = "machine learning" # Simple test query
# Test primary sources
for source_name in self.primary_clients.keys():
try:
print(f"Testing connectivity to {source_name}...")
test_papers = self._search_single_source_with_retry(
source_name, test_query, "medical_imaging", 2
)
connectivity_results[source_name] = len(test_papers) > 0
print(f" {source_name}: {'✅ OK' if connectivity_results[source_name] else '❌ FAILED'}")
time.sleep(0.5) # Be nice to APIs
except Exception as e:
connectivity_results[source_name] = False
print(f" {source_name}: ❌ FAILED - {e}")
# Test fallback sources
for source_name in self.fallback_clients.keys():
try:
print(f"Testing connectivity to {source_name}...")
test_papers = self._search_single_source_with_retry(
source_name, test_query, "medical_imaging", 2
)
connectivity_results[source_name] = len(test_papers) > 0
print(f" {source_name}: {'✅ OK' if connectivity_results[source_name] else '❌ FAILED'}")
time.sleep(1) # Extra delay for fallback sources
except Exception as e:
connectivity_results[source_name] = False
print(f" {source_name}: ❌ FAILED - {e}")
return connectivity_results
# ==================== EXISTING OPTIMIZATION METHODS ====================
def _optimize_for_pubmed(self, user_query: str, domain: str) -> str:
"""Convert natural user query to effective PubMed search syntax"""
words = user_query.lower().split()
# Keep important keywords
important_words = [word for word in words if len(word) > 3 and word not in [
'latest', 'papers', 'about', 'what', 'how', 'are', 'the', 'with', 'for', 'this'
]]
# Add domain-specific terms to improve relevance
domain_boosters = {
"medical_imaging": ["imaging", "radiology", "medical"],
"genomics": ["genomic", "sequencing", "DNA"],
"drug_discovery": ["drug", "compound", "molecular"],
"deep_learning_medicine": ["deep learning", "neural network", "AI"]
}
booster = domain_boosters.get(domain, [])
all_terms = important_words + booster[:2] # Add 2 boosters
# Create PubMed query - recent papers prioritized
query = " AND ".join([f'"{term}"' for term in all_terms if term])
# Add recency filter for "latest" queries
if "latest" in user_query.lower() or "recent" in user_query.lower():
query += " AND (\"2024\"[Date - Publication] : \"3000\"[Date - Publication])"
return query if query else user_query
def _optimize_for_arxiv(self, user_query: str, domain: str) -> str:
"""Convert natural user query to effective arXiv search"""
# For arXiv, we mainly use categories from domain mapping
# But we can also do text search within those categories
words = user_query.lower().split()
important_words = [word for word in words if len(word) > 3 and word not in [
'latest', 'papers', 'about', 'what', 'how', 'are', 'the', 'with', 'for'
]]
return " ".join(important_words)
def _rank_by_relevance(self, papers: List[Dict], user_query: str, domain: str) -> List[Dict]:
"""Rank papers by how relevant they are to the user's specific query"""
if not papers:
return []
query_terms = [term for term in user_query.lower().split() if len(term) > 3]
scored_papers = []
for paper in papers:
score = 0
title = paper.get('title', '').lower()
abstract = paper.get('abstract', '').lower()
content = f"{title} {abstract}"
# Score based on term matches
for term in query_terms:
if term in title:
score += 3 # Term in title is very important
if term in abstract:
score += 1 # Term in abstract is good
# Boost recent papers for "latest" queries
if "latest" in user_query.lower() and self._is_recent(paper):
score += 2
# Boost papers from the correct domain
if paper.get('domain') == domain:
score += 1
# Penalize fallback papers slightly (they're lower quality)
if paper.get('is_fallback', False):
score -= 1
scored_papers.append((paper, score))
# Sort by relevance score (highest first)
scored_papers.sort(key=lambda x: x[1], reverse=True)
return [paper for paper, score in scored_papers if score > 0]
def _is_recent(self, paper: Dict) -> bool:
"""Check if paper is recent (within last 6 months)"""
# This is simplified - you'd want more robust date parsing
pub_date = paper.get('publication_date', '')
return '2024' in pub_date or '2025' in pub_date # Basic check for current years