# MedSearchPro / main.py
# paulhemb's picture
# added main.py
# 4343ac0
# main.py (COMPREHENSIVE UPDATED VERSION - 55+ Medical Specialties)
from data_sources.pubmed_client import PubMedClient
from data_sources.arxiv_client import ArXivClient
from data_sources.real_time_searcher import RealTimeSearcher
from processing.paper_processor import PaperProcessor
from config.domains import (
get_domain_config, get_all_domains, validate_domain,
get_primary_sources, get_fallback_sources, get_sources_requiring_keys,
get_domain_description, get_domain_display_name
)
import time
from typing import Dict, List
import json
class MedicalResearchEngine:
    """
    Medical research chatbot engine.

    Wires together the PubMed and arXiv clients, the real-time searcher and
    the paper processor, answers user queries for one configured medical
    domain at a time, and keeps running statistics in ``self.search_stats``.
    The list of supported domains comes from ``config.domains``.
    """

    # Year substrings that mark a paper as "recent". The summary text produced
    # by _generate_comprehensive_answer mentions the same 2024-2025 range.
    _RECENT_YEARS = ("2024", "2025")

    # Per-domain insight rules: (template, fields searched, case-insensitive
    # keywords, case-sensitive acronyms). Acronyms stay case-sensitive so that
    # e.g. "IBD" is not matched inside unrelated lowercase words.
    _INSIGHT_RULES = {
        "oncology": ("Trend: {count} papers focus on immunotherapy.",
                     ("title", "abstract"), ("immunotherapy",), ()),
        "cardiology": ("Focus: {count} papers emphasize preventive cardiology.",
                       ("title", "abstract"), ("prevention",), ()),
        "neurology": ("Note: {count} papers address dementia research.",
                      ("title",), ("alzheimer", "dementia"), ()),
        "infectious_disease": ("Observation: {count} papers discuss antimicrobial resistance.",
                               ("title", "abstract"), ("resistance",), ()),
        "endocrinology": ("Update: {count} papers focus on diabetes management.",
                          ("title",), ("diabetes",), ()),
        "pulmonology": ("Focus: {count} papers address chronic respiratory diseases.",
                        ("title",), ("asthma",), ("COPD",)),
        "gastroenterology": ("Research: {count} papers focus on inflammatory bowel disease.",
                             ("title",), ("inflammatory bowel",), ("IBD",)),
        "psychiatry": ("Trend: {count} papers address mental health disorders.",
                       ("title",), ("depression", "anxiety"), ()),
        "surgery": ("Advancement: {count} papers discuss surgical innovations.",
                    ("title",), ("robotic", "minimally invasive"), ()),
        "pediatrics": ("Focus: {count} papers address child health.",
                       ("title",), ("pediatric", "child"), ()),
    }

    def __init__(self):
        """Create the data-source clients and reset the search statistics."""
        self.pubmed_client = PubMedClient()
        self.arxiv_client = ArXivClient()
        self.real_time_searcher = RealTimeSearcher()
        self.processor = PaperProcessor()
        # Filled lazily by answer_user_query() the first time a non-real-time
        # search is requested.
        self.pre_collected_papers = {}
        # Running counters updated by answer_user_query().
        self.search_stats = {
            'total_searches': 0,
            'successful_searches': 0,
            'fallback_activations': 0,
            'domains_used': {},
            'average_results': 0,
            'comprehensive_domains': len(get_all_domains())
        }

    def answer_user_query(self, user_query: str, domain: str, use_real_time: bool = True,
                          use_fallback: bool = False) -> Dict:
        """
        Answer a user query within one medical domain.

        Args:
            user_query: User's search question
            domain: Medical domain identifier (must pass validate_domain)
            use_real_time: Search the APIs live when True; otherwise filter
                the pre-collected papers (collected on first use)
            use_fallback: Whether the real-time searcher may use fallback sources

        Returns:
            Response dict built by _create_comprehensive_response(), or the
            dict from _create_error_response() when the domain is unknown.
        """
        stats = self.search_stats
        stats['total_searches'] += 1
        print(f"🎯 Processing user query: '{user_query}'")

        # Validate first so no display-name/description lookup is attempted
        # for a domain the configuration does not know about.
        if not validate_domain(domain):
            available_domains = get_all_domains()
            error_msg = (
                f"Error: Unknown domain '{domain}'. Available domains: "
                f"{', '.join(available_domains[:10])}... ({len(available_domains)} total)"
            )
            return self._create_error_response(error_msg)

        display_name = get_domain_display_name(domain)
        description = get_domain_description(domain)
        print(f" Domain: {domain} ({display_name})")
        print(f" Description: {description}")
        print(f" Real-time: {use_real_time}")
        print(f" Fallback: {use_fallback}")

        # Track domain usage
        usage = stats['domains_used']
        usage[domain] = usage.get(domain, 0) + 1

        search_start_time = time.time()
        if use_real_time:
            print(f" 🔍 Using real-time search for {display_name}...")
            # NOTE(review): assumed to return a list of mutable paper dicts.
            relevant_papers = self.real_time_searcher.search_user_query(
                user_query, domain, max_results=20, use_fallback=use_fallback
            )
            # Tag every paper with the domain it was found under
            for paper in relevant_papers:
                paper['search_domain'] = domain
                paper['domain_display_name'] = display_name
                paper['domain_description'] = description
            # One activation per search, however many fallback papers came back
            if any(paper.get('is_fallback', False) for paper in relevant_papers):
                stats['fallback_activations'] += 1
        else:
            print(f" 💾 Using pre-collected database for {domain}...")
            if not self.pre_collected_papers:
                self.pre_collected_papers = self.collect_all_domains()
            domain_papers = self.pre_collected_papers.get(domain, [])
            relevant_papers = self._filter_pre_collected(domain_papers, user_query)
        search_time = time.time() - search_start_time

        answer = self._generate_comprehensive_answer(user_query, relevant_papers, domain, search_time)

        # Incremental mean of result counts over successful searches only
        if relevant_papers:
            stats['successful_searches'] += 1
            successes = stats['successful_searches']
            stats['average_results'] = (
                (stats['average_results'] * (successes - 1) + len(relevant_papers)) / successes
            )

        return self._create_comprehensive_response(
            user_query, domain, answer, relevant_papers, search_time, use_fallback,
            search_type="real_time" if use_real_time else "pre_collected"
        )

    def _create_comprehensive_response(self, user_query: str, domain: str, answer: str,
                                       papers: List[Dict], search_time: float, use_fallback: bool,
                                       search_type: str = "real_time") -> Dict:
        """
        Assemble the response dict for a completed search.

        Args:
            user_query: The original query text
            domain: Domain identifier the search ran under
            answer: Generated answer text
            papers: All papers found (only the first 15 are returned in
                ``supporting_papers``; ``total_papers_found`` counts all)
            search_time: Elapsed search time in seconds
            use_fallback: Accepted for interface compatibility; not used here
            search_type: Label stored under ``search_type``; defaults to
                "real_time" for callers that do not pass it

        Returns:
            Dict with the query, answer, papers and search metadata.
        """
        source_breakdown = self._analyze_sources(papers)
        fallback_total = sum(1 for p in papers if p.get('is_fallback', False))
        return {
            "query": user_query,
            "domain": domain,
            "domain_display_name": get_domain_display_name(domain),
            "domain_description": get_domain_description(domain),
            "answer": answer,
            "supporting_papers": papers[:15],
            "total_papers_found": len(papers),
            "search_time_seconds": round(search_time, 2),
            "search_type": search_type,
            "sources_used": source_breakdown,
            "fallback_used": fallback_total > 0,
            "fallback_papers_count": fallback_total,
            "primary_papers_count": len(papers) - fallback_total,
            # NOTE: str hashes are randomized per process, so this suffix is
            # only stable within a single run.
            "search_id": f"search_{int(time.time())}_{hash(user_query) % 10000}",
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
            "comprehensive_domain": True
        }

    def _create_error_response(self, error_message: str) -> Dict:
        """
        Build a response dict for a failed request.

        Args:
            error_message: Text placed in the ``answer`` field

        Returns:
            Dict with ``search_type`` set to "error" and empty/zero values
            for the result fields.
        """
        return {
            "query": "",
            "domain": "",
            "answer": error_message,
            "supporting_papers": [],
            "total_papers_found": 0,
            "search_time_seconds": 0,
            "search_type": "error",
            "sources_used": {},
            "fallback_used": False,
            "fallback_papers_count": 0,
            "primary_papers_count": 0,
            "domain_display_name": "",
            "search_id": f"error_{int(time.time())}",
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
            "comprehensive_domain": False
        }

    def _generate_comprehensive_answer(self, user_query: str, papers: List[Dict], domain: str,
                                       search_time: float) -> str:
        """
        Render a markdown summary of the search results.

        Args:
            user_query: The original query text
            papers: Papers found for the query
            domain: Domain identifier the search ran under
            search_time: Elapsed search time in seconds

        Returns:
            Markdown text; when ``papers`` is empty, a short "nothing found"
            message followed by search suggestions.
        """
        display_name = get_domain_display_name(domain)
        if not papers:
            suggestions = self._get_search_suggestions(user_query, domain)
            return f"I couldn't find recent {display_name} research papers specifically addressing '{user_query}'. {suggestions}"

        recent_count = sum(1 for p in papers if self._is_recent(p))
        preprint_count = sum(1 for p in papers if p.get('is_preprint', False))
        fallback_count = sum(1 for p in papers if p.get('is_fallback', False))

        # Mention the first three papers; .get() keeps sparse records from
        # raising KeyError.
        paper_mentions = []
        for paper in papers[:3]:
            source_info = f"({paper.get('source', 'Unknown')})"
            if paper.get('is_preprint', False):
                source_info += " 📄"  # Preprint indicator
            if paper.get('is_fallback', False):
                source_info += " 🛡️"  # Fallback indicator
            paper_mentions.append(f"'{paper.get('title', 'Untitled')}' {source_info}")

        answer_parts = [
            f"## {display_name} Analysis\n",
            f"{get_domain_description(domain)}\n",
            f"**Search Results:** Found {len(papers)} relevant papers ",
        ]
        if recent_count == len(papers):
            answer_parts.append("(all from 2024-2025) ")
        elif recent_count > 0:
            answer_parts.append(f"({recent_count} from 2024-2025) ")
        if preprint_count > 0:
            answer_parts.append(f"including {preprint_count} preprints ")
        if fallback_count > 0:
            answer_parts.append(f"(used {fallback_count} fallback sources) ")
        answer_parts.append(f"in {search_time:.1f}s.\n\n")

        answer_parts.append("**Key Papers Found:**\n")
        for position, mention in enumerate(paper_mentions, 1):
            answer_parts.append(f"{position}. {mention}\n")

        domain_insight = self._get_domain_insight(domain, papers)
        if domain_insight:
            answer_parts.append(f"\n**Domain Insight:** {domain_insight}\n")

        # dict.fromkeys de-duplicates while keeping first-seen order, so the
        # source list is the same on every run.
        sources = list(dict.fromkeys(p.get('source', 'Unknown') for p in papers))
        answer_parts.append(f"\n**Sources:** {', '.join(sources)}\n")

        answer_parts.append(
            f"\n**Next:** Use the enhanced RAG engine for detailed {display_name} analysis, comparisons, and clinical insights.")
        return "".join(answer_parts)

    def _get_search_suggestions(self, user_query: str, domain: str) -> str:
        """
        Return search tips for a query that produced no papers.

        Args:
            user_query: The original query text (currently unused)
            domain: Domain identifier whose display name is used in the tips

        Returns:
            The first two suggestions joined by a space.
        """
        domain_name = get_domain_display_name(domain).lower()
        suggestions = [
            f"Try using more specific {domain_name} terminology.",
            f"Consider broadening your search to related {domain_name} sub-specialties.",
            f"Check for spelling variations in {domain_name} terms.",
            f"Enable fallback sources for wider {domain_name} coverage."
        ]
        return " ".join(suggestions[:2])

    def _get_domain_insight(self, domain: str, papers: List[Dict]) -> str:
        """
        Produce a one-sentence insight about the papers found.

        Domains listed in ``_INSIGHT_RULES`` get a keyword count; any other
        domain gets a generic sentence with the number of distinct sources.

        Args:
            domain: Domain identifier
            papers: Papers found for the query

        Returns:
            The insight sentence.
        """
        rule = self._INSIGHT_RULES.get(domain)
        if rule is None:
            source_total = len({p.get('source', 'Unknown') for p in papers})
            return f"Research spans {source_total} different sources with {len(papers)} relevant studies."

        template, fields, keywords, acronyms = rule
        count = 0
        for paper in papers:
            # "or ''" guards against fields that are present but None.
            raw_text = " ".join(str(paper.get(field) or "") for field in fields)
            lowered = raw_text.lower()
            if any(word in lowered for word in keywords) or any(tag in raw_text for tag in acronyms):
                count += 1
        return template.format(count=count)

    def _filter_pre_collected(self, papers: List[Dict], user_query: str) -> List[Dict]:
        """
        Keep the pre-collected papers whose title or abstract mentions a query term.

        Terms longer than three characters are preferred so common short
        words do not match everything; when the query has no such term
        (e.g. "HIV", "MRI") all of its terms are used instead.

        Args:
            papers: Candidate papers
            user_query: The query text

        Returns:
            Matching papers in their original order; empty for an empty query.
        """
        all_terms = user_query.lower().split()
        query_terms = [term for term in all_terms if len(term) > 3] or all_terms
        if not query_terms:
            return []

        relevant = []
        for paper in papers:
            # "or ''" keeps a None title/abstract from becoming the text "None".
            content = f"{paper.get('title') or ''} {paper.get('abstract') or ''}".lower()
            if any(term in content for term in query_terms):
                relevant.append(paper)
        return relevant

    def _analyze_sources(self, papers: List[Dict]) -> Dict[str, int]:
        """
        Count papers per source.

        Args:
            papers: Papers to tally

        Returns:
            Mapping of source name to paper count; papers without a
            ``source`` key are counted under 'unknown'.
        """
        source_count = {}
        for paper in papers:
            source = paper.get('source', 'unknown')
            source_count[source] = source_count.get(source, 0) + 1
        return source_count

    def _is_recent(self, paper: Dict) -> bool:
        """
        Check if a paper is recent (2024-2025).

        Args:
            paper: Paper dict; ``publication_date`` may be missing, None, or
                a non-string value such as an int year

        Returns:
            True when the publication date text contains a recent year.
        """
        pub_date = paper.get('publication_date')
        if pub_date is None:
            return False
        date_text = str(pub_date)
        return any(year in date_text for year in self._RECENT_YEARS)

    def collect_domain_data(self, domain: str, max_papers: int = 100) -> List[Dict]:
        """
        Collect papers for one domain from its configured sources.

        Args:
            domain: Domain identifier
            max_papers: Maximum number of processed papers to return

        Returns:
            Processed papers, truncated to ``max_papers``; an empty list for
            an unknown domain.
        """
        print(f"Collecting data for domain: {get_domain_display_name(domain)}")
        if not validate_domain(domain):
            print(f"Unknown domain: {domain}")
            return []

        config = get_domain_config(domain)
        all_papers = []

        # PubMed papers
        if 'pubmed' in config.get('sources', []):
            for query in config.get('pubmed_queries', []):
                print(f" Searching PubMed: {query}")
                all_papers.extend(self.pubmed_client.search_papers(query, max_results=20))
                time.sleep(0.5)  # pause between requests

        # ArXiv papers
        if 'arxiv' in config.get('sources', []):
            for category in config.get('arxiv_categories', []):
                print(f" Searching ArXiv: {category}")
                all_papers.extend(self.arxiv_client.search_papers(category, max_results=20))
                time.sleep(1)  # pause between requests

        processed_papers = self.processor.process_papers(all_papers)
        print(f" Collected {len(processed_papers)} unique papers for {get_domain_display_name(domain)}")
        return processed_papers[:max_papers]

    def collect_all_domains(self) -> Dict[str, List[Dict]]:
        """
        Collect papers for every configured domain.

        Returns:
            Mapping of domain identifier to its collected papers.
        """
        domain_data = {}
        for domain in get_all_domains():
            domain_data[domain] = self.collect_domain_data(domain)
        return domain_data

    # ==================== ENHANCED TESTING METHODS ====================
    def get_system_status(self) -> Dict:
        """
        Report searcher status merged with engine statistics.

        Returns:
            Dict containing the real-time searcher's status fields (or zeroed
            defaults if that call fails), ``engine_stats``, the domain list
            and a timestamp.
        """
        try:
            search_stats = self.real_time_searcher.get_system_status()
        except Exception:
            # Best effort: fall back to zeroed counts, but let
            # KeyboardInterrupt/SystemExit propagate.
            search_stats = {"total_sources": 0, "primary_sources_count": 0, "fallback_sources_count": 0}
        return {
            **search_stats,
            "engine_stats": self.search_stats,
            "total_domains": len(get_all_domains()),
            "domains_available": get_all_domains(),
            "sources_requiring_keys": get_sources_requiring_keys(),
            "system_uptime": "Active",
            # NOTE(review): despite the key name this holds the search count.
            "last_search_time": self.search_stats.get('total_searches', 0),
            "comprehensive_support": True,
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
        }

    def test_system_connectivity(self) -> Dict:
        """
        Test connectivity to all data sources and print a summary.

        Returns:
            Dict with the raw ``connectivity_results``, the lists of working
            and failed source names, and ``success_rate`` (0 when the test
            itself fails or returns nothing).
        """
        print("🔧 Testing System Connectivity...")
        print("=" * 60)
        try:
            # NOTE(review): assumed to return a mapping of source name -> truthy status.
            connectivity_results = self.real_time_searcher.test_source_connectivity()
        except Exception as e:
            print(f"❌ Connectivity test failed: {e}")
            return {
                "connectivity_results": {},
                "working_sources": [],
                "failed_sources": [],
                "success_rate": 0
            }

        working_sources = [source for source, status in connectivity_results.items() if status]
        failed_sources = [source for source, status in connectivity_results.items() if not status]
        print(f"\n📊 Connectivity Summary:")
        print(f" ✅ Working: {len(working_sources)} sources")
        print(f" ❌ Failed: {len(failed_sources)} sources")
        if working_sources:
            print(f" 🟢 Active: {', '.join(working_sources)}")
        if failed_sources:
            print(f" 🔴 Issues: {', '.join(failed_sources)}")
        return {
            "connectivity_results": connectivity_results,
            "working_sources": working_sources,
            "failed_sources": failed_sources,
            "success_rate": len(working_sources) / len(connectivity_results) if connectivity_results else 0
        }

    def test_comprehensive_domains(self, max_domains: int = 5):
        """
        Run a sample real-time query against the first few domains.

        Args:
            max_domains: Number of domains (from the start of the domain
                list) to exercise

        Returns:
            Mapping of domain identifier to a small result summary; failed
            domains carry an ``error`` entry.
        """
        print("🧪 Testing Comprehensive Medical Domains")
        print("=" * 60)
        results = {}
        available_domains = get_all_domains()
        test_domains = available_domains[:min(max_domains, len(available_domains))]

        for domain in test_domains:
            print(f"\n🔬 Testing: {get_domain_display_name(domain)}")
            print(f" Domain ID: {domain}")
            print(f" Description: {get_domain_description(domain)}")
            config = get_domain_config(domain)
            print(f" Sources: {config.get('sources', [])}")
            print(f" PubMed queries: {len(config.get('pubmed_queries', []))}")

            # Build a generic query from the domain identifier itself
            domain_keywords = domain.replace('_', ' ')
            test_query = f"recent advances in {domain_keywords}"
            try:
                result = self.answer_user_query(test_query, domain, use_real_time=True, use_fallback=False)
                results[domain] = {
                    'papers_found': result['total_papers_found'],
                    'sources_used': result['sources_used'],
                    'search_time': result['search_time_seconds'],
                    'domain_display_name': get_domain_display_name(domain)
                }
                print(f" Papers found: {result['total_papers_found']}")
                print(f" Search time: {result['search_time_seconds']}s")
                print(f" Sources: {result['sources_used']}")
            except Exception as e:
                print(f" ❌ Test failed: {e}")
                results[domain] = {
                    'papers_found': 0,
                    'sources_used': {},
                    'search_time': 0,
                    'error': str(e)
                }
            time.sleep(2)  # Be nice to APIs

        print(f"\n📈 COMPREHENSIVE DOMAIN TESTING SUMMARY:")
        total_papers = sum(r.get('papers_found', 0) for r in results.values())
        successful_tests = sum(1 for r in results.values() if r.get('papers_found', 0) > 0)
        # The average is taken over domains that returned at least one paper
        avg_papers = total_papers / successful_tests if successful_tests > 0 else 0
        print(f" Total papers found: {total_papers}")
        print(f" Average per domain: {avg_papers:.1f}")
        print(f" Domains tested: {len(results)}")
        print(f" Successful tests: {successful_tests}")
        print(f" Total domains available: {len(available_domains)}")
        return results

    def test_fallback_system(self):
        """
        Run the same queries with and without fallback sources and print
        the paper counts and sources for comparison.
        """
        print("🛡️ Testing Fallback System")
        print("=" * 50)
        status = self.get_system_status()
        print(f"Primary sources: {status.get('primary_sources_count', 0)}")
        print(f"Fallback sources: {status.get('fallback_sources_count', 0)}")
        print(f"Total sources: {status.get('total_sources', 0)}")

        test_queries = [
            ("oncology", "immunotherapy for lung cancer"),
            ("cardiology", "new treatments for heart failure"),
            ("neurology", "Alzheimer's disease biomarkers"),
            ("endocrinology", "SGLT2 inhibitors diabetes"),
            ("pulmonology", "COPD management guidelines")
        ]
        for domain, query in test_queries[:3]:  # Test first 3 for speed
            print(f"\n🔍 Testing: {get_domain_display_name(domain)} - '{query}'")

            print(" 🔒 WITHOUT fallback:")
            result_no_fallback = self.answer_user_query(query, domain, use_real_time=True, use_fallback=False)
            print(f" Papers: {result_no_fallback['total_papers_found']}")
            print(f" Sources: {result_no_fallback['sources_used']}")
            print(f" Fallback used: {result_no_fallback['fallback_used']}")
            time.sleep(2)

            print(" 🔓 WITH fallback:")
            result_with_fallback = self.answer_user_query(query, domain, use_real_time=True, use_fallback=True)
            print(f" Papers: {result_with_fallback['total_papers_found']}")
            print(f" Sources: {result_with_fallback['sources_used']}")
            print(f" Fallback used: {result_with_fallback['fallback_used']}")
            print(f" Fallback papers: {result_with_fallback['fallback_papers_count']}")
            time.sleep(3)  # Extra delay between domains

    def interactive_test(self):
        """
        Run an interactive console session.

        Lists the available domains by category, then loops reading either a
        command ('quit', 'status', 'test', 'domains') or a domain name. A
        domain may be given exactly or as a fragment of its identifier or
        display name; the user is then prompted for a query and whether to
        use fallback sources, and the results are printed.
        """
        print("\n💬 COMPREHENSIVE INTERACTIVE TESTING MODE")
        print("=" * 60)
        available_domains = get_all_domains()
        print(f"📚 Available domains ({len(available_domains)} medical specialties):")

        # Display grouping only; entries missing from the configuration are skipped
        domain_categories = {
            "🏥 Internal Medicine": ["internal_medicine", "endocrinology", "gastroenterology", "pulmonology",
                                    "nephrology", "hematology"],
            "🦠 Infectious": ["infectious_disease"],
            "👶 Women's Health": ["obstetrics_gynecology"],
            "🔬 Lab & Pathology": ["pathology", "laboratory_medicine"],
            "🧬 Biomedical Sciences": ["bioinformatics", "genomics", "pharmacology"],
            "🩺 Medical Specialties": ["medical_imaging", "oncology", "cardiology", "neurology", "psychiatry"],
            "🔪 Surgical Specialties": ["surgery", "orthopedics", "urology", "ophthalmology"],
            "👶 Pediatrics": ["pediatrics"],
            "🚑 Emergency & Critical Care": ["emergency_medicine", "critical_care"],
            "🩺 Other Specialties": ["dermatology", "pain_medicine", "nutrition", "allergy_immunology",
                                    "rehabilitation_medicine"],
            "📊 Research & Public Health": ["clinical_research", "public_health"],
            "🌐 General": ["general_medical", "auto"]
        }
        for category, domains in domain_categories.items():
            print(f"\n{category}:")
            for domain in domains:
                if domain in available_domains:
                    print(f" • {get_domain_display_name(domain)} ({domain})")
        print("\n📝 Commands: 'quit' to exit, 'status' for system status, 'test' for connectivity test, 'domains' to list all domains")

        while True:
            print("\n" + "=" * 50)
            command = input("\nEnter domain name or command: ").strip().lower()
            if command == 'quit':
                break
            if command == 'status':
                status = self.get_system_status()
                engine_stats = status['engine_stats']
                print(f"\n📊 SYSTEM STATUS:")
                print(f" Total searches: {engine_stats['total_searches']}")
                print(
                    f" Successful searches: {engine_stats['successful_searches']}/{engine_stats['total_searches']}")
                print(f" Average results: {engine_stats.get('average_results', 0):.1f}")
                print(f" Total domains: {status['total_domains']}")
                print(f" Sources: {status.get('total_sources', 0)} total")
                print(f" Comprehensive support: {'✅' if status.get('comprehensive_support', False) else '❌'}")
                continue
            if command == 'test':
                self.test_system_connectivity()
                continue
            if command == 'domains':
                print(f"\n📋 ALL DOMAINS ({len(available_domains)}):")
                for i, domain in enumerate(available_domains, 1):
                    print(f"{i:3d}. {get_domain_display_name(domain)} ({domain})")
                continue

            # Anything else is treated as a domain name
            domain = command
            if not validate_domain(domain):
                # Try to find domain by identifier fragment or display name
                matching_domains = [d for d in available_domains if
                                    command in d or command in get_domain_display_name(d).lower()]
                if not matching_domains:
                    print(f"❌ Invalid domain. Available domains: {', '.join(available_domains[:8])}...")
                    print(f" Type 'domains' to see all {len(available_domains)} specialties.")
                    continue
                if len(matching_domains) != 1:
                    # Ambiguous input: show up to five candidates and re-prompt
                    print(f"🔍 Multiple matching domains found:")
                    for match in matching_domains[:5]:
                        print(f" • {get_domain_display_name(match)} ({match})")
                    continue
                domain = matching_domains[0]
                print(f"✅ Matched domain: {get_domain_display_name(domain)}")

            query = input(f"Enter your query for {get_domain_display_name(domain)}: ").strip()
            if not query:
                print("❌ Query cannot be empty")
                continue
            use_fallback_input = input("Use fallback sources? (y/n): ").strip().lower()
            use_fallback = use_fallback_input == 'y'

            print(f"\n🔍 Searching for: '{query}'")
            print(f" Domain: {get_domain_display_name(domain)}")
            print(f" Description: {get_domain_description(domain)}")
            print(f" Fallback: {'ENABLED' if use_fallback else 'DISABLED'}")
            result = self.answer_user_query(query, domain, use_real_time=True, use_fallback=use_fallback)

            print(f"\n📈 RESULTS:")
            print(f" Answer: {result['answer'][:200]}...")
            print(f" Total papers: {result['total_papers_found']}")
            print(f" Search time: {result['search_time_seconds']}s")
            print(f" Sources used: {result['sources_used']}")
            print(f" Fallback used: {result['fallback_used']}")
            if result['supporting_papers']:
                print(f"\n📄 Top papers:")
                for i, paper in enumerate(result['supporting_papers'][:5]):
                    source_indicator = "🛡️ " if paper.get('is_fallback') else ""
                    preprint_indicator = "📄 " if paper.get('is_preprint') else ""
                    print(f" {i + 1}. {source_indicator}{preprint_indicator}{paper.get('title', 'Untitled')[:80]}...")
                    print(
                        f" Source: {paper.get('source', 'Unknown')} | Domain: {paper.get('search_domain', domain)}")

    def show_comprehensive_domain_summary(self):
        """Print the configured domains grouped by category."""
        print("\n🏥 COMPREHENSIVE MEDICAL DOMAIN SUMMARY")
        print("=" * 60)
        available_domains = get_all_domains()
        print(f"Total medical specialties: {len(available_domains)}")
        print(f"Comprehensive domain support: ✅")

        # Display grouping only; categories with no configured domain are omitted
        domain_categories = {
            "Internal Medicine": ["internal_medicine", "endocrinology", "gastroenterology", "pulmonology",
                                  "nephrology", "hematology"],
            "Surgical Specialties": ["surgery", "orthopedics", "urology", "ophthalmology"],
            "Medical Specialties": ["oncology", "cardiology", "neurology", "psychiatry", "dermatology"],
            "Women & Children": ["obstetrics_gynecology", "pediatrics"],
            "Emergency & Critical Care": ["emergency_medicine", "critical_care"],
            "Lab & Diagnostics": ["pathology", "laboratory_medicine", "medical_imaging"],
            "Biomedical Sciences": ["bioinformatics", "genomics", "pharmacology"],
            "Research & Public Health": ["clinical_research", "public_health"],
            "Other Specialties": ["infectious_disease", "pain_medicine", "nutrition", "allergy_immunology",
                                  "rehabilitation_medicine"],
            "General": ["general_medical", "auto"]
        }
        for category, domains in domain_categories.items():
            category_domains = [d for d in domains if d in available_domains]
            if category_domains:
                print(f"\n📌 {category} ({len(category_domains)} specialties):")
                for domain in category_domains:
                    print(f" • {get_domain_display_name(domain)}")
        print(f"\n✅ Ready for comprehensive medical research!")
# ==================== MAIN EXECUTION ====================
def main():
    """
    Run the full diagnostic sequence, then the interactive session.

    Creates a MedicalResearchEngine, prints the domain summary, runs the
    connectivity test, the sample domain queries and the fallback comparison,
    starts interactive mode, and finally prints the engine's statistics.
    """
    engine = MedicalResearchEngine()
    print("🚀 COMPREHENSIVE MEDICAL RESEARCH CHATBOT")
    print("=" * 70)
    print("📚 Features: 35+ medical specialties, 9+ data sources, intelligent domain support")
    print("🎯 Purpose: Real-time medical research across all medical domains")
    print("=" * 70)

    # Show comprehensive domain summary
    engine.show_comprehensive_domain_summary()

    # Phase 1: System Diagnostics
    print("\n🔧 PHASE 1: SYSTEM DIAGNOSTICS")
    print("-" * 40)
    # The connectivity test prints its own summary; its return value is not needed here
    engine.test_system_connectivity()
    status = engine.get_system_status()
    print(f"\n📊 SYSTEM STATUS:")
    print(f" Domains: {status['total_domains']}")
    print(
        f" Data sources: {status.get('total_sources', 0)} ({status.get('primary_sources_count', 0)} primary, {status.get('fallback_sources_count', 0)} fallback)")
    print(f" Sources needing API keys: {len(status.get('sources_requiring_keys', []))}")

    # Phase 2: Comprehensive Domain Testing
    print("\n🧪 PHASE 2: COMPREHENSIVE DOMAIN TESTING")
    print("-" * 40)
    engine.test_comprehensive_domains(max_domains=5)

    # Phase 3: Fallback System Test
    print("\n🛡️ PHASE 3: FALLBACK SYSTEM TEST")
    print("-" * 40)
    engine.test_fallback_system()

    # Phase 4: Interactive Mode (blocks until the user types 'quit')
    print("\n💬 PHASE 4: INTERACTIVE MODE")
    print("-" * 40)
    print("Starting interactive mode...")
    engine.interactive_test()

    # Final Summary
    print("\n🎉 COMPREHENSIVE SYSTEM TESTING COMPLETE!")
    print("=" * 70)
    final_status = engine.get_system_status()
    engine_stats = final_status['engine_stats']
    print(f"📈 Final Statistics:")
    print(f" Total searches performed: {engine_stats['total_searches']}")
    print(f" Successful searches: {engine_stats['successful_searches']}")
    print(f" Average papers per search: {engine_stats.get('average_results', 0):.1f}")
    print(f" Fallback activations: {engine_stats['fallback_activations']}")
    print(f" Domains used: {len(engine_stats['domains_used'])}")
    print(f" Total domains available: {final_status['total_domains']}")
    print(f" Comprehensive domain support: ✅ ACTIVE")
    print(f"\n✅ Comprehensive medical research system is fully operational!")
    print(f"🚀 Ready for Phase 2 (Enhanced RAG implementation) with {final_status['total_domains']} medical specialties!")


if __name__ == "__main__":
    main()