# Spaces: Sleeping
# main.py (COMPREHENSIVE UPDATED VERSION - 55+ Medical Specialties)
import json
import time
from typing import Dict, List, Optional

from data_sources.pubmed_client import PubMedClient
from data_sources.arxiv_client import ArXivClient
from data_sources.real_time_searcher import RealTimeSearcher
from processing.paper_processor import PaperProcessor
from config.domains import (
    get_domain_config, get_all_domains, validate_domain,
    get_primary_sources, get_fallback_sources, get_sources_requiring_keys,
    get_domain_description, get_domain_display_name
)
class MedicalResearchEngine:
    """
    Medical research chatbot engine.

    Routes a user query for one medical domain either to the real-time
    searcher or to a per-domain cache of collected papers, then builds a
    markdown answer plus a response dict carrying source and fallback
    metadata. Running counters are kept in ``self.search_stats``.
    """

    # Rules for ``_get_domain_insight``:
    #   domain -> (template, case-insensitive terms, case-sensitive terms, fields)
    # Acronyms (COPD, IBD) stay case-sensitive so they do not match inside
    # unrelated lowercase words; ordinary words are matched case-insensitively.
    _DOMAIN_INSIGHT_RULES = {
        "oncology": ("Trend: {count} papers focus on immunotherapy.",
                     ("immunotherapy",), (), ("title", "abstract")),
        "cardiology": ("Focus: {count} papers emphasize preventive cardiology.",
                       ("prevention",), (), ("title", "abstract")),
        "neurology": ("Note: {count} papers address dementia research.",
                      ("alzheimer", "dementia"), (), ("title",)),
        "infectious_disease": ("Observation: {count} papers discuss antimicrobial resistance.",
                               ("resistance",), (), ("title", "abstract")),
        "endocrinology": ("Update: {count} papers focus on diabetes management.",
                          ("diabetes",), (), ("title",)),
        "pulmonology": ("Focus: {count} papers address chronic respiratory diseases.",
                        ("asthma",), ("COPD",), ("title",)),
        "gastroenterology": ("Research: {count} papers focus on inflammatory bowel disease.",
                             ("inflammatory bowel",), ("IBD",), ("title",)),
        "psychiatry": ("Trend: {count} papers address mental health disorders.",
                       ("depression", "anxiety"), (), ("title",)),
        "surgery": ("Advancement: {count} papers discuss surgical innovations.",
                    ("robotic", "minimally invasive"), (), ("title",)),
        "pediatrics": ("Focus: {count} papers address child health.",
                       ("pediatric", "child"), (), ("title",)),
    }

    def __init__(self):
        """Create the data-source clients, the paper processor and empty statistics."""
        self.pubmed_client = PubMedClient()
        self.arxiv_client = ArXivClient()
        self.real_time_searcher = RealTimeSearcher()
        self.processor = PaperProcessor()
        # domain id -> list of collected papers (filled on demand)
        self.pre_collected_papers = {}
        # Running counters updated by answer_user_query()
        self.search_stats = {
            'total_searches': 0,
            'successful_searches': 0,
            'fallback_activations': 0,
            'domains_used': {},
            'average_results': 0,
            'comprehensive_domains': len(get_all_domains())
        }

    def answer_user_query(self, user_query: str, domain: str, use_real_time: bool = True,
                          use_fallback: bool = False) -> Dict:
        """
        Answer a user query within one medical domain.

        Args:
            user_query: User's search question
            domain: Medical domain id to search in
            use_real_time: Search the APIs in real time when True; otherwise
                filter the collected papers cached for this domain
            use_fallback: Whether the real-time searcher may use fallback sources

        Returns:
            Response dict with the answer text, supporting papers and metadata.
            An unknown domain yields the error response instead of raising.
        """
        self.search_stats['total_searches'] += 1
        print(f"π― Processing user query: '{user_query}'")

        # Validate before touching any other domain helper: SOURCE does not
        # show whether the display-name/description helpers tolerate unknown ids.
        if not validate_domain(domain):
            available_domains = get_all_domains()
            error_msg = f"Error: Unknown domain '{domain}'. Available domains: {', '.join(available_domains[:10])}... ({len(available_domains)} total)"
            return self._create_error_response(error_msg)

        display_name = get_domain_display_name(domain)
        description = get_domain_description(domain)
        print(f" Domain: {domain} ({display_name})")
        print(f" Description: {description}")
        print(f" Real-time: {use_real_time}")
        print(f" Fallback: {use_fallback}")

        # Track domain usage
        domains_used = self.search_stats['domains_used']
        domains_used[domain] = domains_used.get(domain, 0) + 1

        search_start_time = time.time()
        if use_real_time:
            search_type = "real_time"
            print(f" π Using real-time search for {display_name}...")
            # ``or []`` keeps the rest of the method safe if the searcher
            # returns None instead of an empty list.
            relevant_papers = self.real_time_searcher.search_user_query(
                user_query, domain, max_results=20, use_fallback=use_fallback
            ) or []
            # Tag papers with the domain they were searched under
            for paper in relevant_papers:
                paper['search_domain'] = domain
                paper['domain_display_name'] = display_name
                paper['domain_description'] = description
            # Count the search once if any returned paper came from a fallback source
            if any(paper.get('is_fallback', False) for paper in relevant_papers):
                self.search_stats['fallback_activations'] += 1
        else:
            search_type = "pre_collected"
            print(f" πΎ Using pre-collected database for {domain}...")
            # Collect only the requested domain, once, rather than every
            # domain just to answer a single-domain query.
            if domain not in self.pre_collected_papers:
                self.pre_collected_papers[domain] = self.collect_domain_data(domain)
            relevant_papers = self._filter_pre_collected(
                self.pre_collected_papers[domain], user_query
            )
        search_time = time.time() - search_start_time

        answer = self._generate_comprehensive_answer(user_query, relevant_papers, domain, search_time)

        # Update success statistics (incremental running mean of result counts)
        if relevant_papers:
            stats = self.search_stats
            stats['successful_searches'] += 1
            stats['average_results'] += (
                (len(relevant_papers) - stats['average_results']) / stats['successful_searches']
            )

        return self._create_comprehensive_response(
            user_query, domain, answer, relevant_papers, search_time, use_fallback,
            search_type=search_type
        )

    def _create_comprehensive_response(self, user_query: str, domain: str, answer: str,
                                       papers: List[Dict], search_time: float, use_fallback: bool,
                                       search_type: str = "real_time") -> Dict:
        """
        Build the success response dict.

        Args:
            user_query: The query that was searched
            domain: Domain id the search ran under
            answer: Generated answer text
            papers: All papers found (only the first 15 are returned as supporting papers)
            search_time: Elapsed search time in seconds
            use_fallback: Accepted for interface compatibility; the reported
                fallback fields are derived from the papers themselves
            search_type: How the papers were obtained ("real_time" by default)
        """
        source_breakdown = self._analyze_sources(papers)
        fallback_count = sum(1 for p in papers if p.get('is_fallback', False))
        return {
            "query": user_query,
            "domain": domain,
            "domain_display_name": get_domain_display_name(domain),
            "domain_description": get_domain_description(domain),
            "answer": answer,
            "supporting_papers": papers[:15],
            "total_papers_found": len(papers),
            "search_time_seconds": round(search_time, 2),
            "search_type": search_type,
            "sources_used": source_breakdown,
            "fallback_used": fallback_count > 0,
            "fallback_papers_count": fallback_count,
            "primary_papers_count": len(papers) - fallback_count,
            # NOTE: str hashes are salted per process, so the numeric suffix is
            # not reproducible across runs.
            "search_id": f"search_{int(time.time())}_{hash(user_query) % 10000}",
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
            "comprehensive_domain": True
        }

    def _create_error_response(self, error_message: str) -> Dict:
        """Build an error response with the same keys as the success response; the message goes in "answer"."""
        return {
            "query": "",
            "domain": "",
            "domain_display_name": "",
            "domain_description": "",
            "answer": error_message,
            "supporting_papers": [],
            "total_papers_found": 0,
            "search_time_seconds": 0,
            "search_type": "error",
            "sources_used": {},
            "fallback_used": False,
            "fallback_papers_count": 0,
            "primary_papers_count": 0,
            "search_id": f"error_{int(time.time())}",
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
            "comprehensive_domain": False
        }

    def _generate_comprehensive_answer(self, user_query: str, papers: List[Dict], domain: str,
                                       search_time: float) -> str:
        """
        Build the markdown answer text for a search.

        With no papers, returns a "couldn't find" sentence followed by search
        suggestions. Otherwise returns a domain header, a result summary, up
        to three highlighted papers, a domain insight, the sources and a
        closing pointer.
        """
        display_name = get_domain_display_name(domain)
        if not papers:
            suggestions = self._get_search_suggestions(user_query, domain)
            return f"I couldn't find recent {display_name} research papers specifically addressing '{user_query}'. {suggestions}"

        recent_count = sum(1 for p in papers if self._is_recent(p))
        preprint_count = sum(1 for p in papers if p.get('is_preprint', False))
        fallback_count = sum(1 for p in papers if p.get('is_fallback', False))

        # Highlight the first three papers; missing keys fall back to the
        # same placeholders used elsewhere in this class.
        paper_mentions = []
        for paper in papers[:3]:
            source_info = f"({paper.get('source', 'Unknown')})"
            if paper.get('is_preprint', False):
                source_info += " π"  # Preprint indicator
            if paper.get('is_fallback', False):
                source_info += " π‘οΈ"  # Fallback indicator
            paper_mentions.append(f"'{paper.get('title', 'Untitled')}' {source_info}")

        answer_parts = [
            f"## {display_name} Analysis\n",
            f"{get_domain_description(domain)}\n",
            f"**Search Results:** Found {len(papers)} relevant papers ",
        ]
        if recent_count == len(papers):
            answer_parts.append("(all from 2024-2025) ")
        elif recent_count > 0:
            answer_parts.append(f"({recent_count} from 2024-2025) ")
        if preprint_count > 0:
            answer_parts.append(f"including {preprint_count} preprints ")
        if fallback_count > 0:
            answer_parts.append(f"(used {fallback_count} fallback sources) ")
        answer_parts.append(f"in {search_time:.1f}s.\n\n")

        answer_parts.append("**Key Papers Found:**\n")
        for position, mention in enumerate(paper_mentions, 1):
            answer_parts.append(f"{position}. {mention}\n")

        domain_insight = self._get_domain_insight(domain, papers)
        if domain_insight:
            answer_parts.append(f"\n**Domain Insight:** {domain_insight}\n")

        # dict.fromkeys de-duplicates while keeping first-seen order, so the
        # source list is stable between runs (a set is not).
        sources = list(dict.fromkeys(str(p.get('source', 'Unknown')) for p in papers))
        answer_parts.append(f"\n**Sources:** {', '.join(sources)}\n")

        answer_parts.append(
            f"\n**Next:** Use the enhanced RAG engine for detailed {display_name} analysis, comparisons, and clinical insights.")
        return "".join(answer_parts)

    def _get_search_suggestions(self, user_query: str, domain: str) -> str:
        """Return the first two canned search suggestions for the domain (user_query is not used)."""
        specialty = get_domain_display_name(domain).lower()
        suggestions = [
            f"Try using more specific {specialty} terminology.",
            f"Consider broadening your search to related {specialty} sub-specialties.",
            f"Check for spelling variations in {specialty} terms.",
            f"Enable fallback sources for wider {specialty} coverage."
        ]
        return " ".join(suggestions[:2])

    def _get_domain_insight(self, domain: str, papers: List[Dict]) -> str:
        """
        Return a one-sentence insight for the domain.

        Domains listed in ``_DOMAIN_INSIGHT_RULES`` get a count of papers whose
        listed fields mention one of the rule's terms. Any other domain gets a
        generic sentence with the number of distinct sources and papers.
        """
        rule = self._DOMAIN_INSIGHT_RULES.get(domain)
        if rule is None:
            distinct_sources = {p.get('source', 'Unknown') for p in papers}
            return (f"Research spans {len(distinct_sources)} different sources "
                    f"with {len(papers)} relevant studies.")

        template, folded_terms, exact_terms, fields = rule

        def mentions_topic(paper: Dict) -> bool:
            for field in fields:
                # ``or ''`` tolerates a key that is present but None
                text = str(paper.get(field) or '')
                lowered = text.lower()
                if any(term in lowered for term in folded_terms):
                    return True
                if any(term in text for term in exact_terms):
                    return True
            return False

        return template.format(count=sum(1 for p in papers if mentions_topic(p)))

    def _filter_pre_collected(self, papers: List[Dict], user_query: str) -> List[Dict]:
        """
        Keep papers whose title or abstract contains at least one query term.

        Terms longer than three characters are preferred. If the query has none
        (e.g. a bare acronym such as "HIV"), all of its terms are used instead,
        so short medical acronyms still return results. Matching is a
        case-insensitive substring test.
        """
        words = user_query.lower().split()
        query_terms = [term for term in words if len(term) > 3] or words
        if not query_terms:
            return []

        relevant = []
        for paper in papers:
            content = f"{paper.get('title') or ''} {paper.get('abstract') or ''}".lower()
            if any(term in content for term in query_terms):
                relevant.append(paper)
        return relevant

    def _analyze_sources(self, papers: List[Dict]) -> Dict[str, int]:
        """Count papers per 'source' value; papers without one are counted under 'unknown'."""
        source_count = {}
        for paper in papers:
            source = paper.get('source', 'unknown')
            source_count[source] = source_count.get(source, 0) + 1
        return source_count

    def _is_recent(self, paper: Dict) -> bool:
        """Return True when the paper's 'publication_date' mentions 2024 or 2025."""
        # Coerce to str so a missing, None or integer-year date cannot raise.
        raw_date = paper.get('publication_date')
        pub_date = '' if raw_date is None else str(raw_date)
        return '2024' in pub_date or '2025' in pub_date

    def collect_domain_data(self, domain: str, max_papers: int = 100) -> List[Dict]:
        """
        Collect and process papers for one domain from its configured sources.

        Args:
            domain: Domain id; an unknown id prints a notice and returns []
            max_papers: Maximum number of processed papers to return

        Returns:
            The processed papers, truncated to ``max_papers``.
        """
        if not validate_domain(domain):
            print(f"Unknown domain: {domain}")
            return []

        display_name = get_domain_display_name(domain)
        print(f"Collecting data for domain: {display_name}")
        config = get_domain_config(domain)
        enabled_sources = config.get('sources', [])
        all_papers = []

        if 'pubmed' in enabled_sources:
            for query in config.get('pubmed_queries', []):
                print(f" Searching PubMed: {query}")
                all_papers.extend(self.pubmed_client.search_papers(query, max_results=20))
                time.sleep(0.5)  # pause between PubMed requests

        if 'arxiv' in enabled_sources:
            for category in config.get('arxiv_categories', []):
                print(f" Searching ArXiv: {category}")
                all_papers.extend(self.arxiv_client.search_papers(category, max_results=20))
                time.sleep(1)  # pause between ArXiv requests

        processed_papers = self.processor.process_papers(all_papers)
        print(f" Collected {len(processed_papers)} unique papers for {display_name}")
        return processed_papers[:max_papers]

    def collect_all_domains(self) -> Dict[str, List[Dict]]:
        """Collect papers for every available domain; returns domain id -> papers."""
        return {domain: self.collect_domain_data(domain) for domain in get_all_domains()}

    # ==================== ENHANCED TESTING METHODS ====================

    def get_system_status(self) -> Dict:
        """
        Return the searcher's status merged with engine statistics and domain info.

        If the searcher's status call fails, zeroed source counts are used so
        the rest of the status is still reported.
        """
        try:
            search_stats = self.real_time_searcher.get_system_status()
        except Exception as exc:  # best effort: report and fall back, but let Ctrl-C through
            print(f"Warning: source status unavailable: {exc}")
            search_stats = {"total_sources": 0, "primary_sources_count": 0, "fallback_sources_count": 0}
        return {
            **search_stats,
            "engine_stats": self.search_stats,
            "total_domains": len(get_all_domains()),
            "domains_available": get_all_domains(),
            "sources_requiring_keys": get_sources_requiring_keys(),
            "system_uptime": "Active",
            # NOTE(review): despite the key name this is the search counter,
            # not a time; kept as-is because callers may read it.
            "last_search_time": self.search_stats.get('total_searches', 0),
            "comprehensive_support": True,
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
        }

    def test_system_connectivity(self) -> Dict:
        """
        Run the searcher's connectivity test and print a summary.

        Returns:
            Dict with the raw per-source results, the working and failed source
            names, and the fraction of sources that worked (0 when the test
            itself fails or reports no sources).
        """
        print("π§ Testing System Connectivity...")
        print("=" * 60)
        try:
            connectivity_results = self.real_time_searcher.test_source_connectivity()
        except Exception as e:
            print(f"β Connectivity test failed: {e}")
            return {
                "connectivity_results": {},
                "working_sources": [],
                "failed_sources": [],
                "success_rate": 0
            }

        # Split sources by truthiness of their reported status
        working_sources = [source for source, status in connectivity_results.items() if status]
        failed_sources = [source for source, status in connectivity_results.items() if not status]
        print("\nπ Connectivity Summary:")
        print(f" β Working: {len(working_sources)} sources")
        print(f" β Failed: {len(failed_sources)} sources")
        if working_sources:
            print(f" π’ Active: {', '.join(working_sources)}")
        if failed_sources:
            print(f" π΄ Issues: {', '.join(failed_sources)}")
        return {
            "connectivity_results": connectivity_results,
            "working_sources": working_sources,
            "failed_sources": failed_sources,
            "success_rate": len(working_sources) / len(connectivity_results) if connectivity_results else 0
        }

    def test_comprehensive_domains(self, max_domains: int = 5):
        """
        Run one real-time test query against the first ``max_domains`` domains.

        Returns:
            Dict of domain id -> papers found, sources used, search time and
            display name (plus 'error' when the query raised).
        """
        print("π§ͺ Testing Comprehensive Medical Domains")
        print("=" * 60)
        results = {}
        available_domains = get_all_domains()
        for domain in available_domains[:max_domains]:
            display_name = get_domain_display_name(domain)
            print(f"\n㪠Testing: {display_name}")
            print(f" Domain ID: {domain}")
            print(f" Description: {get_domain_description(domain)}")
            config = get_domain_config(domain)
            print(f" Sources: {config.get('sources', [])}")
            print(f" PubMed queries: {len(config.get('pubmed_queries', []))}")

            # Build a simple query from the domain id itself
            test_query = f"recent advances in {domain.replace('_', ' ')}"
            try:
                result = self.answer_user_query(test_query, domain, use_real_time=True, use_fallback=False)
                results[domain] = {
                    'papers_found': result['total_papers_found'],
                    'sources_used': result['sources_used'],
                    'search_time': result['search_time_seconds'],
                    'domain_display_name': display_name
                }
                print(f" Papers found: {result['total_papers_found']}")
                print(f" Search time: {result['search_time_seconds']}s")
                print(f" Sources: {result['sources_used']}")
            except Exception as e:
                print(f" β Test failed: {e}")
                # Same keys as a successful entry, plus the error text
                results[domain] = {
                    'papers_found': 0,
                    'sources_used': {},
                    'search_time': 0,
                    'domain_display_name': display_name,
                    'error': str(e)
                }
            time.sleep(2)  # Be nice to APIs

        print("\nπ COMPREHENSIVE DOMAIN TESTING SUMMARY:")
        total_papers = sum(r.get('papers_found', 0) for r in results.values())
        successful_tests = sum(1 for r in results.values() if r.get('papers_found', 0) > 0)
        # Average is taken over the domains that returned at least one paper
        avg_papers = total_papers / successful_tests if successful_tests > 0 else 0
        print(f" Total papers found: {total_papers}")
        print(f" Average per domain: {avg_papers:.1f}")
        print(f" Domains tested: {len(results)}")
        print(f" Successful tests: {successful_tests}")
        print(f" Total domains available: {len(available_domains)}")
        return results

    def test_fallback_system(self):
        """Run the first three sample queries with and without fallback sources and print the results."""
        print("π‘οΈ Testing Fallback System")
        print("=" * 50)
        status = self.get_system_status()
        print(f"Primary sources: {status.get('primary_sources_count', 0)}")
        print(f"Fallback sources: {status.get('fallback_sources_count', 0)}")
        print(f"Total sources: {status.get('total_sources', 0)}")
        test_queries = [
            ("oncology", "immunotherapy for lung cancer"),
            ("cardiology", "new treatments for heart failure"),
            ("neurology", "Alzheimer's disease biomarkers"),
            ("endocrinology", "SGLT2 inhibitors diabetes"),
            ("pulmonology", "COPD management guidelines")
        ]
        for domain, query in test_queries[:3]:  # Test first 3 for speed
            print(f"\nπ Testing: {get_domain_display_name(domain)} - '{query}'")

            print(" π WITHOUT fallback:")
            result_no_fallback = self.answer_user_query(query, domain, use_real_time=True, use_fallback=False)
            print(f" Papers: {result_no_fallback['total_papers_found']}")
            print(f" Sources: {result_no_fallback['sources_used']}")
            print(f" Fallback used: {result_no_fallback['fallback_used']}")
            time.sleep(2)

            print(" π WITH fallback:")
            result_with_fallback = self.answer_user_query(query, domain, use_real_time=True, use_fallback=True)
            print(f" Papers: {result_with_fallback['total_papers_found']}")
            print(f" Sources: {result_with_fallback['sources_used']}")
            print(f" Fallback used: {result_with_fallback['fallback_used']}")
            print(f" Fallback papers: {result_with_fallback['fallback_papers_count']}")
            time.sleep(3)  # Extra delay between domains

    @staticmethod
    def _read_input(prompt: str) -> Optional[str]:
        """Read one line from stdin; return None when input is closed or interrupted."""
        try:
            return input(prompt)
        except (EOFError, KeyboardInterrupt):
            return None

    def _resolve_domain(self, command: str, available_domains: List[str]) -> Optional[str]:
        """
        Map typed text to a domain id.

        Accepts an exact domain id, or text that appears in exactly one
        domain id or lower-cased display name. Prints a message and returns
        None when the text matches several domains or none.
        """
        if validate_domain(command):
            return command

        matching_domains = [d for d in available_domains if
                            command in d or command in get_domain_display_name(d).lower()]
        if len(matching_domains) == 1:
            print(f"β Matched domain: {get_domain_display_name(matching_domains[0])}")
            return matching_domains[0]
        if matching_domains:
            print("π Multiple matching domains found:")
            for match in matching_domains[:5]:
                print(f" β’ {get_domain_display_name(match)} ({match})")
            return None

        print(f"β Invalid domain. Available domains: {', '.join(available_domains[:8])}...")
        print(f" Type 'domains' to see all {len(available_domains)} specialties.")
        return None

    def interactive_test(self):
        """
        Interactive console loop.

        Commands: 'quit', 'status', 'test', 'domains'. Anything else is treated
        as a domain name (exact or unique partial match), followed by prompts
        for a query and whether to use fallback sources. The loop also ends
        cleanly when stdin is closed or the user interrupts a prompt.
        """
        print("\n㪠COMPREHENSIVE INTERACTIVE TESTING MODE")
        print("=" * 60)
        available_domains = get_all_domains()
        print(f"π Available domains ({len(available_domains)} medical specialties):")
        # Display grouping only; ids missing from available_domains are skipped
        domain_categories = {
            "π₯ Internal Medicine": ["internal_medicine", "endocrinology", "gastroenterology", "pulmonology",
                                   "nephrology", "hematology"],
            "π¦ Infectious": ["infectious_disease"],
            "πΆ Women's Health": ["obstetrics_gynecology"],
            "π¬ Lab & Pathology": ["pathology", "laboratory_medicine"],
            "𧬠Biomedical Sciences": ["bioinformatics", "genomics", "pharmacology"],
            "π©Ί Medical Specialties": ["medical_imaging", "oncology", "cardiology", "neurology", "psychiatry"],
            "πͺ Surgical Specialties": ["surgery", "orthopedics", "urology", "ophthalmology"],
            "πΆ Pediatrics": ["pediatrics"],
            "π Emergency & Critical Care": ["emergency_medicine", "critical_care"],
            "π©Ί Other Specialties": ["dermatology", "pain_medicine", "nutrition", "allergy_immunology",
                                    "rehabilitation_medicine"],
            "π Research & Public Health": ["clinical_research", "public_health"],
            "π General": ["general_medical", "auto"]
        }
        for category, domains in domain_categories.items():
            print(f"\n{category}:")
            for domain in domains:
                if domain in available_domains:
                    print(f" β’ {get_domain_display_name(domain)} ({domain})")
        print("\nπ Commands: 'quit' to exit, 'status' for system status, 'test' for connectivity test, 'domains' to list all domains")

        while True:
            print("\n" + "=" * 50)
            raw_command = self._read_input("\nEnter domain name or command: ")
            if raw_command is None:
                break
            command = raw_command.strip().lower()
            if not command:
                # An empty string is a substring of every domain id, so it
                # would otherwise be reported as "multiple matches".
                continue
            if command == 'quit':
                break
            if command == 'status':
                status = self.get_system_status()
                engine_stats = status['engine_stats']
                print("\nπ SYSTEM STATUS:")
                print(f" Total searches: {engine_stats['total_searches']}")
                print(
                    f" Successful searches: {engine_stats['successful_searches']}/{engine_stats['total_searches']}")
                print(f" Average results: {engine_stats.get('average_results', 0):.1f}")
                print(f" Total domains: {status['total_domains']}")
                print(f" Sources: {status.get('total_sources', 0)} total")
                print(f" Comprehensive support: {'β ' if status.get('comprehensive_support', False) else 'β'}")
                continue
            if command == 'test':
                self.test_system_connectivity()
                continue
            if command == 'domains':
                print(f"\nπ ALL DOMAINS ({len(available_domains)}):")
                for i, domain in enumerate(available_domains, 1):
                    print(f"{i:3d}. {get_domain_display_name(domain)} ({domain})")
                continue

            domain = self._resolve_domain(command, available_domains)
            if domain is None:
                continue

            raw_query = self._read_input(f"Enter your query for {get_domain_display_name(domain)}: ")
            if raw_query is None:
                break
            query = raw_query.strip()
            if not query:
                print("β Query cannot be empty")
                continue

            raw_fallback = self._read_input("Use fallback sources? (y/n): ")
            if raw_fallback is None:
                break
            use_fallback = raw_fallback.strip().lower() == 'y'

            print(f"\nπ Searching for: '{query}'")
            print(f" Domain: {get_domain_display_name(domain)}")
            print(f" Description: {get_domain_description(domain)}")
            print(f" Fallback: {'ENABLED' if use_fallback else 'DISABLED'}")
            result = self.answer_user_query(query, domain, use_real_time=True, use_fallback=use_fallback)

            print("\nπ RESULTS:")
            print(f" Answer: {result['answer'][:200]}...")
            print(f" Total papers: {result['total_papers_found']}")
            print(f" Search time: {result['search_time_seconds']}s")
            print(f" Sources used: {result['sources_used']}")
            print(f" Fallback used: {result['fallback_used']}")
            if result['supporting_papers']:
                print("\nπ Top papers:")
                for i, paper in enumerate(result['supporting_papers'][:5]):
                    source_indicator = "π‘οΈ " if paper.get('is_fallback') else ""
                    preprint_indicator = "π " if paper.get('is_preprint') else ""
                    title = paper.get('title') or 'Untitled'
                    print(f" {i + 1}. {source_indicator}{preprint_indicator}{title[:80]}...")
                    print(
                        f" Source: {paper.get('source', 'Unknown')} | Domain: {paper.get('search_domain', domain)}")

    def show_comprehensive_domain_summary(self):
        """Print the available domains grouped by category, skipping categories with no available domain."""
        print("\nπ₯ COMPREHENSIVE MEDICAL DOMAIN SUMMARY")
        print("=" * 60)
        available_domains = get_all_domains()
        print(f"Total medical specialties: {len(available_domains)}")
        print("Comprehensive domain support: β ")
        # Display grouping only; ids missing from available_domains are skipped
        domain_categories = {
            "Internal Medicine": ["internal_medicine", "endocrinology", "gastroenterology", "pulmonology",
                                  "nephrology", "hematology"],
            "Surgical Specialties": ["surgery", "orthopedics", "urology", "ophthalmology"],
            "Medical Specialties": ["oncology", "cardiology", "neurology", "psychiatry", "dermatology"],
            "Women & Children": ["obstetrics_gynecology", "pediatrics"],
            "Emergency & Critical Care": ["emergency_medicine", "critical_care"],
            "Lab & Diagnostics": ["pathology", "laboratory_medicine", "medical_imaging"],
            "Biomedical Sciences": ["bioinformatics", "genomics", "pharmacology"],
            "Research & Public Health": ["clinical_research", "public_health"],
            "Other Specialties": ["infectious_disease", "pain_medicine", "nutrition", "allergy_immunology",
                                  "rehabilitation_medicine"],
            "General": ["general_medical", "auto"]
        }
        for category, domains in domain_categories.items():
            category_domains = [d for d in domains if d in available_domains]
            if category_domains:
                print(f"\nπ {category} ({len(category_domains)} specialties):")
                for domain in category_domains:
                    print(f" β’ {get_domain_display_name(domain)}")
        print("\nβ Ready for comprehensive medical research!")
# ==================== MAIN EXECUTION ====================
def main():
    """
    Run the full console walkthrough.

    Prints the domain summary, runs the connectivity check and status report,
    tests a handful of domains, exercises the fallback system, enters the
    interactive loop, and finally prints the accumulated engine statistics.
    """
    engine = MedicalResearchEngine()
    print("π COMPREHENSIVE MEDICAL RESEARCH CHATBOT")
    print("=" * 70)
    print("π Features: 35+ medical specialties, 9+ data sources, intelligent domain support")
    print("π― Purpose: Real-time medical research across all medical domains")
    print("=" * 70)

    # Show comprehensive domain summary
    engine.show_comprehensive_domain_summary()

    # Phase 1: System Diagnostics
    print("\nπ§ PHASE 1: SYSTEM DIAGNOSTICS")
    print("-" * 40)
    # The method prints its own summary; its return value is not needed here.
    engine.test_system_connectivity()
    status = engine.get_system_status()
    print("\nπ SYSTEM STATUS:")
    print(f" Domains: {status['total_domains']}")
    print(
        f" Data sources: {status.get('total_sources', 0)} ({status.get('primary_sources_count', 0)} primary, {status.get('fallback_sources_count', 0)} fallback)")
    print(f" Sources needing API keys: {len(status.get('sources_requiring_keys', []))}")

    # Phase 2: Comprehensive Domain Testing
    print("\nπ§ͺ PHASE 2: COMPREHENSIVE DOMAIN TESTING")
    print("-" * 40)
    engine.test_comprehensive_domains(max_domains=5)

    # Phase 3: Fallback System Test
    print("\nπ‘οΈ PHASE 3: FALLBACK SYSTEM TEST")
    print("-" * 40)
    engine.test_fallback_system()

    # Phase 4: Interactive Mode
    print("\n㪠PHASE 4: INTERACTIVE MODE")
    print("-" * 40)
    print("Starting interactive mode...")
    engine.interactive_test()

    # Final Summary
    print("\nπ COMPREHENSIVE SYSTEM TESTING COMPLETE!")
    print("=" * 70)
    final_status = engine.get_system_status()
    engine_stats = final_status['engine_stats']
    print("π Final Statistics:")
    print(f" Total searches performed: {engine_stats['total_searches']}")
    print(f" Successful searches: {engine_stats['successful_searches']}")
    print(f" Average papers per search: {engine_stats.get('average_results', 0):.1f}")
    print(f" Fallback activations: {engine_stats['fallback_activations']}")
    print(f" Domains used: {len(engine_stats['domains_used'])}")
    print(f" Total domains available: {final_status['total_domains']}")
    print(" Comprehensive domain support: β ACTIVE")
    print("\nβ Comprehensive medical research system is fully operational!")
    print(f"π Ready for Phase 2 (Enhanced RAG implementation) with {final_status['total_domains']} medical specialties!")


if __name__ == "__main__":
    main()