import os import json from dataclasses import dataclass from typing import List, Optional, Dict, Tuple from bs4 import BeautifulSoup import requests import time import arxiv from dotenv import load_dotenv from openai import OpenAI from transformers import AutoTokenizer, AutoModelForSeq2SeqLM import torch from sentence_transformers import SentenceTransformer from sklearn.metrics.pairwise import cosine_similarity import numpy as np from datetime import datetime import streamlit as st # Load environment variables first load_dotenv() # Import Kaggle only when needed (lazy import to avoid auto-authentication) try: from kaggle.api.kaggle_api_extended import KaggleApi KAGGLE_AVAILABLE = True except Exception as e: print(f"โš ๏ธ Kaggle API not available: {str(e)}") KAGGLE_AVAILABLE = False @st.cache_resource(show_spinner="Loading Gap Analysis Engine (Flan-T5-Large)... This may take 1-2 minutes on first run") def load_gap_analyzer(): """Load and cache the Gap Analysis Engine""" print("๐Ÿ”„ Initializing Gap Analysis Engine...") try: analyzer = GapAnalysisEngine() print("โœ… Gap Analysis Engine loaded successfully!") return analyzer except Exception as e: print(f"โŒ Failed to load Gap Analysis Engine: {str(e)}") return None @st.cache_resource(show_spinner="Loading Hypothesis Generator...") def load_hypothesis_generator(): """Load and cache the Hypothesis Generator""" print("๐Ÿ”„ Initializing Hypothesis Generator...") try: generator = HypothesisGenerator() print("โœ… Hypothesis Generator loaded successfully!") return generator except Exception as e: print(f"โŒ Failed to load Hypothesis Generator: {str(e)}") return None @st.cache_resource(show_spinner="Loading Experiment Generator...") def load_experiment_generator(): """Load and cache the Experiment Generator""" print("๐Ÿ”„ Initializing Experiment Generator...") try: generator = ExperimentGenerator() print("โœ… Experiment Generator loaded successfully!") return generator except Exception as e: print(f"โŒ Failed to load Experiment Generator: {str(e)}") return None class DatasetFetcher: """Fetch real, verified datasets from Kaggle and HuggingFace""" def __init__(self): """Initialize Kaggle API and HuggingFace client""" self.kaggle_api = None self.hf_token = os.getenv('HF_TOKEN') # Initialize Kaggle API only if available and credentials exist if not KAGGLE_AVAILABLE: print("โš ๏ธ Kaggle library not available. Dataset verification will be limited.") return try: kaggle_username = os.getenv('KAGGLE_USERNAME') kaggle_key = os.getenv('KAGGLE_KEY') if kaggle_username and kaggle_key: # Create .kaggle directory if it doesn't exist kaggle_dir = os.path.expanduser('~/.kaggle') os.makedirs(kaggle_dir, exist_ok=True) # Create kaggle.json file kaggle_json_path = os.path.join(kaggle_dir, 'kaggle.json') kaggle_config = { "username": kaggle_username, "key": kaggle_key } with open(kaggle_json_path, 'w') as f: json.dump(kaggle_config, f) # Set proper permissions (read/write for owner only) try: os.chmod(kaggle_json_path, 0o600) except Exception: pass # Windows doesn't support chmod # Now initialize Kaggle API self.kaggle_api = KaggleApi() self.kaggle_api.authenticate() print("โœ… Kaggle API initialized successfully") else: print("โš ๏ธ Kaggle credentials not found in .env file. Add KAGGLE_USERNAME and KAGGLE_KEY to enable dataset verification.") except Exception as e: print(f"โš ๏ธ Kaggle API initialization failed: {str(e)}") print(" Dataset links will be AI-generated (may need manual verification)") self.kaggle_api = None def search_kaggle_datasets(self, query: str, max_results: int = 3) -> List[Dict]: """Search for real Kaggle datasets matching query""" if not self.kaggle_api: return [] try: print(f"๐Ÿ” Searching Kaggle for: {query}") # Fetch datasets using Kaggle API datasets = list(self.kaggle_api.dataset_list(search=query)) results = [] for dataset in datasets[:max_results]: # Limit to top results results.append({ "name": dataset.title, "description": dataset.subtitle or "No description available", "url": f"https://www.kaggle.com/datasets/{dataset.ref}", "type": "CSV/Tabular", "size": f"{dataset.totalBytes // (1024*1024)}MB" if dataset.totalBytes else "Unknown", "downloads": dataset.downloadCount, "usability": dataset.usabilityRating }) print(f"โœ… Found {len(results)} Kaggle datasets") return results except Exception as e: print(f"โŒ Kaggle search error: {str(e)}") return [] def search_huggingface_datasets(self, query: str, max_results: int = 3) -> List[Dict]: """Search for real HuggingFace datasets matching query""" try: print(f"๐Ÿ” Searching HuggingFace for: {query}") # Use HuggingFace API to search datasets url = "https://huggingface.co/api/datasets" params = { "search": query, "limit": max_results, "sort": "downloads" } headers = {} if self.hf_token: headers["Authorization"] = f"Bearer {self.hf_token}" response = requests.get(url, params=params, headers=headers, timeout=10) if response.status_code == 200: datasets = response.json() results = [] for dataset in datasets[:max_results]: dataset_id = dataset.get('id', '') results.append({ "name": dataset.get('id', 'Unknown'), "description": dataset.get('description', 'No description available')[:200], "url": f"https://huggingface.co/datasets/{dataset_id}", "type": "Text/Images/Audio", "size": dataset.get('size', 'Unknown'), "downloads": dataset.get('downloads', 0), "likes": dataset.get('likes', 0) }) print(f"โœ… Found {len(results)} HuggingFace datasets") return results else: print(f"โš ๏ธ HuggingFace API returned status {response.status_code}") return [] except Exception as e: print(f"โŒ HuggingFace search error: {str(e)}") return [] def get_datasets_for_topic(self, topic: str, hypothesis: str = "", dataset_keywords: List[str] = None) -> List[Dict]: """Get verified datasets from Kaggle (with AI fallback if none found)""" # Use provided keywords or construct from topic if dataset_keywords: search_query = " ".join(dataset_keywords[:2]) else: search_query = f"{topic} {hypothesis}"[:80] # Search Kaggle (primary source) kaggle_datasets = self.search_kaggle_datasets(search_query, max_results=2) # If Kaggle found datasets, return them if kaggle_datasets: return kaggle_datasets # Fallback: Generate AI-suggested datasets if Kaggle fails print("โš ๏ธ No datasets found from Kaggle API. Using AI-suggested datasets.") ai_datasets = [ { "name": f"Suggested: {topic.title()} Dataset", "description": f"Recommended dataset for {topic}. Search on Kaggle or academic repositories.", "url": f"https://www.kaggle.com/search?q={topic.replace(' ', '+')}", "type": "Dataset", "size": "Varies", "downloads": "N/A", "usability": None, "source": "AI Suggested" } ] return ai_datasets @dataclass class SearchResult: """Data class to store search results""" title: str link: str = "" snippet: str = "" body: str = "" authors: List[str] = None published: str = "" abstract: str = "" pdf_url: str = "" arxiv_url: str = "" # Clinical trials specific fields nct_id: str = "" status: str = "" study_type: str = "" conditions: List[str] = None interventions: List[str] = None phase: str = "" enrollment: int = 0 locations: List[str] = None class SearchTool: """Tool for performing Google, arXiv, and ClinicalTrials.gov searches""" def __init__(self): self.api_key = os.getenv('GOOGLE_API_KEY') self.search_engine_id = os.getenv('GOOGLE_SEARCH_ENGINE_ID') def google_search(self, query: str, num_results: int = 2, max_chars: int = 500) -> List[SearchResult]: """ Perform Google search and return enriched results """ if not self.api_key or not self.search_engine_id: print("Missing API credentials") return [] try: url = "https://www.googleapis.com/customsearch/v1" params = { "key": self.api_key, "cx": self.search_engine_id, "q": query, "num": num_results } response = requests.get(url, params=params) if response.status_code != 200: print(f"Google Search API error: {response.status_code}") print(f"Response content: {response.text}") return [] results = response.json().get("items", []) enriched_results = [] for item in results: body = self._get_page_content(item["link"], max_chars) result = SearchResult( title=item.get("title", "No title"), # Added .get() with default link=item.get("link", ""), snippet=item.get("snippet", ""), # FIX: Added .get() with default empty string body=body ) enriched_results.append(result) time.sleep(1) # Rate limiting return enriched_results except Exception as e: print(f"Error in Google search: {str(e)}") return [] def arxiv_search(self, query: str, max_results: int = 2) -> List[SearchResult]: """ Perform arXiv search and return results """ try: client = arxiv.Client() search = arxiv.Search( query=query, max_results=max_results, sort_by=arxiv.SortCriterion.Relevance ) results = [] for paper in client.results(search): paper_id = paper.entry_id.split('/')[-1] arxiv_url = f"https://arxiv.org/abs/{paper_id}" result = SearchResult( title=paper.title, authors=[author.name for author in paper.authors], published=paper.published.strftime("%Y-%m-%d"), abstract=paper.summary, pdf_url=paper.pdf_url, arxiv_url=arxiv_url ) results.append(result) time.sleep(0.5) # Gentle rate limiting return results except Exception as e: print(f"Error in Arxiv search: {str(e)}") return [] def clinicaltrials_search_beta_api(self, query: str, max_results: int = 2) -> List[SearchResult]: """ Perform ClinicalTrials.gov search using their current Beta API https://clinicaltrials.gov/data-api/ui """ try: # Base URL for the ClinicalTrials.gov Beta API base_url = "https://clinicaltrials.gov/api/v2/studies" # Parameters for the API request params = { "query.term": query, "pageSize": max_results, "format": "json" } # Make the API request response = requests.get(base_url, params=params, timeout=15) if response.status_code != 200: print(f"ClinicalTrials.gov API error: {response.status_code}") print(f"Response content: {response.text}") return [] # Parse the JSON response data = response.json() studies = data.get("studies", []) if not studies: print(f"No clinical trial studies found for query: {query}") return [] results = [] for study in studies: # Extract study details protocol_section = study.get("protocolSection", {}) identification = protocol_section.get("identificationModule", {}) status_module = protocol_section.get("statusModule", {}) design_module = protocol_section.get("designModule", {}) conditions_module = protocol_section.get("conditionsModule", {}) description_module = protocol_section.get("descriptionModule", {}) # Extract NCT ID nct_id = identification.get("nctId", "") if not nct_id: continue # Extract title title = identification.get("briefTitle", "") official_title = identification.get("officialTitle", "") # Extract study details study_type = design_module.get("studyType", "") phase_list = design_module.get("phases", []) phase = ", ".join(phase_list) if phase_list else "Not Specified" # Extract status status = status_module.get("overallStatus", "") # Extract conditions conditions = conditions_module.get("conditions", []) # Extract description/summary summary = description_module.get("briefSummary", "") detailed_desc = description_module.get("detailedDescription", "") # Create the study URL - using the stable URL format # Changed from /study/ to /ct2/show/ which is more stable study_url = f"https://clinicaltrials.gov/ct2/show/{nct_id}" result = SearchResult( title=title if title else official_title, link=study_url, snippet=summary[:200] + "..." if len(summary) > 200 else summary, abstract=summary if summary else detailed_desc, nct_id=nct_id, status=status, study_type=study_type, phase=phase, conditions=conditions ) results.append(result) # Get full study details if needed if not summary and not detailed_desc: detailed_study = self._get_clinical_trial_details_api(nct_id) if detailed_study: result.abstract = detailed_study.abstract result.conditions = detailed_study.conditions result.interventions = detailed_study.interventions time.sleep(2) # Increased rate limiting for API requests return results except Exception as e: print(f"Error in ClinicalTrials.gov Beta API search: {str(e)}") return [] def _get_clinical_trial_details_api(self, nct_id: str) -> Optional[SearchResult]: """ Fetch detailed information for a specific study using the Beta API """ # Validate NCT ID format if not nct_id or not nct_id.startswith("NCT"): print(f"Invalid NCT ID format: {nct_id}") return None try: # Base URL for the ClinicalTrials.gov Beta API - single study endpoint base_url = f"https://clinicaltrials.gov/api/v2/studies/{nct_id}" # Make the API request response = requests.get(base_url, params={"format": "json"}, timeout=15) if response.status_code != 200: print(f"ClinicalTrials.gov API error for study {nct_id}: {response.status_code}") return None # Parse the JSON response study = response.json() # Extract study details protocol_section = study.get("protocolSection", {}) identification = protocol_section.get("identificationModule", {}) description_module = protocol_section.get("descriptionModule", {}) conditions_module = protocol_section.get("conditionsModule", {}) intervention_module = protocol_section.get("armsInterventionsModule", {}) # Extract title title = identification.get("briefTitle", "") # Extract summary summary = description_module.get("briefSummary", "") detailed_desc = description_module.get("detailedDescription", "") # Extract conditions conditions = conditions_module.get("conditions", []) # Extract interventions interventions_list = [] for intervention in intervention_module.get("interventions", []): intervention_name = intervention.get("name", "") if intervention_name: interventions_list.append(intervention_name) # Create the study URL using stable format study_url = f"https://clinicaltrials.gov/ct2/show/{nct_id}" return SearchResult( title=title, link=study_url, nct_id=nct_id, abstract=summary if summary else detailed_desc, conditions=conditions, interventions=interventions_list ) except Exception as e: print(f"Error fetching details for study {nct_id}: {str(e)}") return None def clinicaltrials_search_scrape(self, query: str, max_results: int = 2) -> List[SearchResult]: """ Perform ClinicalTrials.gov search by scraping the website Use this as a fallback if the API method doesn't work """ try: # Construct the search URL - updated to use v2 search base_url = "https://clinicaltrials.gov/search" params = { "term": query, "draw": 1, "rank": 1 } headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.5", "Connection": "keep-alive", "Upgrade-Insecure-Requests": "1" } # Make the initial request to get the search results page response = requests.get(base_url, params=params, headers=headers, timeout=15) if response.status_code != 200: print(f"ClinicalTrials.gov search error: {response.status_code}") return [] soup = BeautifulSoup(response.content, "html.parser") study_links = soup.select(".ct-search-result a.ct-search-result__title-link")[:max_results] if not study_links: print("No study links found on the search results page") return [] results = [] for link in study_links: href = link.get('href', '') if not href: continue # Extract NCT ID from href and validate try: # Different ways the NCT ID might appear in the URL if '/study/' in href: nct_id = href.split('/')[-1] elif '/ct2/show/' in href: nct_id = href.split('/')[-1] else: # Try to find NCT pattern (NCTXXXXXXXX) import re nct_match = re.search(r'(NCT\d{8})', href) if nct_match: nct_id = nct_match.group(1) else: print(f"Could not extract NCT ID from href: {href}") continue except Exception as e: print(f"Error extracting NCT ID from {href}: {str(e)}") continue # Ensure proper URL format study_url = f"https://clinicaltrials.gov/ct2/show/{nct_id}" # Get detailed information from the study page study_info = self._get_clinical_trial_details_scrape(study_url) if study_info: results.append(study_info) time.sleep(2) # Increased rate limiting return results except Exception as e: print(f"Error in ClinicalTrials.gov scrape search: {str(e)}") return [] def _get_clinical_trial_details_scrape(self, url: str) -> Optional[SearchResult]: """ Extract detailed information from a ClinicalTrials.gov study page using web scraping """ try: headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.5" } response = requests.get(url, headers=headers, timeout=15) if response.status_code != 200: print(f"Error accessing {url}: {response.status_code}") return None soup = BeautifulSoup(response.content, "html.parser") # Extract NCT ID - handle different URL formats try: if '/ct2/show/' in url: nct_id = url.split('/')[-1] else: # Try to find NCT pattern import re nct_match = re.search(r'(NCT\d{8})', url) if nct_match: nct_id = nct_match.group(1) else: # Last resort - check the page content for NCT ID nct_elem = soup.find(string=re.compile(r'NCT\d{8}')) if nct_elem: nct_match = re.search(r'(NCT\d{8})', nct_elem) nct_id = nct_match.group(1) if nct_match else "Unknown" else: nct_id = "Unknown" except Exception: nct_id = "Unknown" # Extract title - handle different page structures title = "Unknown Title" title_selectors = ["h1.tr-h1", "h1.ct-title", ".headline-title"] for selector in title_selectors: title_elem = soup.select_one(selector) if title_elem: title = title_elem.get_text().strip() break # Extract status status = "Unknown" status_selectors = [".ct-recruitment-status div.ct-recruitment-status__label", ".statusLabel", "p:contains('Recruitment Status:')"] for selector in status_selectors: status_elem = None try: if ':contains' in selector: # Handle custom contains selector text = selector.split(':contains(')[1].strip("')") for p in soup.find_all('p'): if text in p.get_text(): status_elem = p break else: status_elem = soup.select_one(selector) except Exception: continue if status_elem: status_text = status_elem.get_text().strip() if "Status:" in status_text: status = status_text.split("Status:")[1].strip() else: status = status_text break # Extract summary - try different selectors summary = "" summary_selectors = ["#brief-summary div.tr-indent2", ".ct-body__section div.tr-indent1", "section#brief-summary"] for selector in summary_selectors: summary_elem = soup.select_one(selector) if summary_elem: summary = summary_elem.get_text().strip() break # Extract study type study_type = "" study_type_selectors = [ lambda s: s.find(string="Study Type:"), lambda s: s.find("th", string="Study Type") ] for selector_func in study_type_selectors: study_type_label = selector_func(soup) if study_type_label: if study_type_label.parent: value_elem = None if study_type_label.parent.name == "th": # Handle table format value_elem = study_type_label.parent.find_next("td") else: # Handle div format value_elem = study_type_label.parent.find_next("div", class_="ct-data-elem__value") if not value_elem: value_elem = study_type_label.parent.find_next_sibling("div") if value_elem: study_type = value_elem.get_text().strip() break # Extract phase phase = "" phase_selectors = [ lambda s: s.find(string="Phase:"), lambda s: s.find("th", string="Phase") ] for selector_func in phase_selectors: phase_label = selector_func(soup) if phase_label: if phase_label.parent: value_elem = None if phase_label.parent.name == "th": # Handle table format value_elem = phase_label.parent.find_next("td") else: # Handle div format value_elem = phase_label.parent.find_next("div", class_="ct-data-elem__value") if not value_elem: value_elem = phase_label.parent.find_next_sibling("div") if value_elem: phase = value_elem.get_text().strip() break # Extract conditions conditions = [] conditions_selectors = ["#conditions", "section#conditions", "section:contains('Condition')"] for selector in conditions_selectors: conditions_section = None try: if ':contains' in selector: # Handle custom contains selector text = selector.split(':contains(')[1].strip("')") for section in soup.find_all('section'): if text in section.get_text(): conditions_section = section break else: conditions_section = soup.select_one(selector) except Exception: continue if conditions_section: # Try to find conditions in list items condition_items = conditions_section.select("li") if condition_items: conditions = [item.get_text().strip() for item in condition_items] else: # If no list items, try to get text content conditions_text = conditions_section.get_text().strip() # Remove section title if present if ":" in conditions_text: conditions_text = conditions_text.split(":", 1)[1].strip() conditions = [cond.strip() for cond in conditions_text.split(",")] break return SearchResult( title=title, link=url, snippet=summary[:200] + "..." if summary and len(summary) > 200 else summary, abstract=summary, nct_id=nct_id, status=status, study_type=study_type, phase=phase, conditions=conditions ) except Exception as e: print(f"Error extracting details from {url}: {str(e)}") return None def _get_page_content(self, url: str, max_chars: int) -> str: """ Fetch and extract text content from a webpage """ try: headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.5" } response = requests.get(url, headers=headers, timeout=15) soup = BeautifulSoup(response.content, "html.parser") # Remove script and style elements for script in soup(["script", "style"]): script.extract() text = soup.get_text(separator=" ", strip=True) return text[:max_chars] except Exception as e: print(f"Error fetching {url}: {str(e)}") return "" # class GapAnalysisEngine: # """ # Research Gap Analysis Engine using Flan-T5-Large model # Identifies knowledge, methodological, dataset, and temporal gaps in research papers # """ # def __init__(self): # """Initialize the Flan-T5 model and tokenizer""" # print("Loading Flan-T5-Large model for gap analysis...") # self.model_name = "google/flan-t5-large" # try: # # Load tokenizer and model from Hugging Face # self.tokenizer = AutoTokenizer.from_pretrained(self.model_name) # self.model = AutoModelForSeq2SeqLM.from_pretrained(self.model_name) # # Set device (GPU if available, else CPU) # self.device = "cuda" if torch.cuda.is_available() else "cpu" # self.model.to(self.device) # print(f"Model loaded successfully on {self.device}") # except Exception as e: # print(f"Error loading model: {str(e)}") # self.model = None # self.tokenizer = None # def _create_few_shot_prompt(self, papers: List[SearchResult], topic: str) -> str: # """ # Create a few-shot prompt with examples to guide the model # Args: # papers: List of SearchResult objects containing paper information # topic: The research topic being analyzed # Returns: # Formatted prompt string with few-shot examples # """ # # Format the input papers with more detail # papers_text = "" # for i, paper in enumerate(papers[:12], 1): # Increased from 10 to 12 papers # papers_text += f"\n--- Paper {i} ---\n" # papers_text += f"Title: {paper.title}\n" # # Add authors if available # if paper.authors: # papers_text += f"Authors: {', '.join(paper.authors[:3])}\n" # First 3 authors # # Add publication date # if paper.published: # papers_text += f"Published: {paper.published}\n" # # Add abstract/summary with more characters # if paper.abstract: # papers_text += f"Abstract: {paper.abstract[:500]}...\n" # Increased from 300 # elif paper.snippet: # papers_text += f"Summary: {paper.snippet[:500]}...\n" # # Add study type for clinical trials # if paper.study_type: # papers_text += f"Study Type: {paper.study_type}\n" # if paper.phase: # papers_text += f"Phase: {paper.phase}\n" # # Enhanced few-shot prompt with detailed, insightful examples # prompt = f"""You are a PhD-level research analyst with expertise in identifying critical research gaps. Analyze the papers deeply and provide SPECIFIC, ACTIONABLE gaps that researchers can build upon. # EXAMPLE 1: # Topic: Federated Learning Privacy # Papers: # Paper 1: Differential Privacy in Federated Learning (2022) # Abstract: We apply differential privacy mechanisms to federated learning, achieving epsilon=0.5 privacy guarantees with 3% accuracy loss on MNIST... # Paper 2: Secure Aggregation Protocols (2021) # Abstract: Novel cryptographic protocols for secure parameter aggregation in federated settings, tested on 100 devices... # Paper 3: Privacy Budget Allocation (2020) # Abstract: Adaptive privacy budget allocation across federated learning rounds, optimizing utility-privacy tradeoff... # DEEP ANALYSIS: # {{ # "knowledge_gaps": [ # "CRITICAL: No empirical studies on privacy leakage through gradient updates in heterogeneous device environments (IoT + mobile + edge)", # "UNEXPLORED: Privacy guarantees degrade with model size - no theoretical bounds for models >1B parameters in federated settings", # "MISSING: Real-world privacy attack success rates beyond academic datasets - what about medical records, financial data?", # "UNKNOWN: Privacy-utility tradeoffs in non-IID data distributions with extreme class imbalance (e.g., rare disease detection)" # ], # "methodological_gaps": [ # "NO FRAMEWORK: Combining differential privacy + secure aggregation + homomorphic encryption in single unified system - existing work addresses only 1-2", # "MISSING EVALUATION: Longitudinal privacy analysis across 100+ training rounds - current studies stop at 10-20 rounds", # "ABSENT: Privacy-preserving techniques for vertical federated learning (different features per client) - all work focuses on horizontal FL", # "LACKING: Adaptive privacy mechanisms that adjust epsilon based on attack risk in real-time during training" # ], # "dataset_gaps": [ # "CRITICAL MISSING: Standardized privacy attack benchmark suite with diverse attack vectors (membership inference, model inversion, gradient leakage)", # "NO BENCHMARK: Real federated datasets with known privacy violations - current benchmarks use synthetic privacy labels", # "LACKING: Heterogeneous device capability datasets showing computation/communication/privacy tradeoffs across 1000+ devices", # "ABSENT: Privacy audit trails from production federated learning deployments" # ], # "temporal_gaps": [ # "OUTDATED (pre-2023): Privacy analysis doesn't account for LLM-scale models (GPT-4, Llama-2) in federated settings", # "OBSOLETE HARDWARE: Studies assume 2019-era mobile devices - modern edge TPUs and neural engines change privacy-performance dynamics", # "MISSING 2024 CONTEXT: New privacy regulations (EU AI Act, US state laws) not reflected in federated learning design" # ] # }} # EXAMPLE 2: # Topic: Explainable AI in Medical Diagnosis # Papers: # Paper 1: SHAP for Medical Imaging (2021) # Abstract: Applying SHAP values to explain CNN predictions in chest X-ray diagnosis, achieving correlation with radiologist attention... # Paper 2: LIME in Clinical Decision Support (2020) # Abstract: Local interpretable model-agnostic explanations for electronic health record-based predictions... # Paper 3: Attention Visualization in Diagnosis (2022) # Abstract: Visualizing transformer attention patterns for disease classification from medical images... # DEEP ANALYSIS: # {{ # "knowledge_gaps": [ # "CRITICAL: Zero studies on whether physicians ACTUALLY change treatment decisions based on AI explanations - all measure 'trust' not clinical outcomes", # "UNEXPLORED: Explanation quality for rare diseases (<1% prevalence) where models have insufficient training data - current work focuses on common conditions", # "MISSING: Conflicting explanation scenarios - what happens when SHAP suggests feature X but physician believes Y? No resolution frameworks exist", # "UNKNOWN: Cognitive load of processing AI explanations during emergency medicine - explanations may slow critical decisions" # ], # "methodological_gaps": [ # "NO GOLD STANDARD: Evaluating explanation correctness requires ground truth (which features TRULY matter) - current methods use proxy metrics like 'plausibility'", # "MISSING FRAMEWORK: Comparative explanation methods - should we use SHAP vs LIME vs attention? No decision tree exists for medical contexts", # "ABSENT: Real-time explanation generation for time-critical diagnoses - current methods take 30+ seconds, unacceptable in ER settings", # "LACKING: Multi-modal explanations combining imaging + EHR + genomics - existing work explains single modality only" # ], # "dataset_gaps": [ # "CRITICAL MISSING: Datasets with physician-annotated 'ground truth' explanations for 1000+ diagnoses - current datasets lack expert labels", # "NO BENCHMARK: Longitudinal patient data showing how explanations affected treatment outcomes over months/years", # "LACKING: Adversarial explanation datasets - cases where explanations are intentionally misleading yet appear valid", # "ABSENT: Cross-hospital explanation generalization datasets - do explanations transfer between institutions?" # ], # "temporal_gaps": [ # "OUTDATED (pre-2023): Explanation methods designed for CNNs, not foundation models (MedPaLM, GPT-4 for medicine) with emergent reasoning", # "OBSOLETE REGULATORY: FDA guidance on AI explainability from 2019 - new 2024 requirements for transparency not addressed", # "MISSING CURRENT CONTEXT: Post-COVID telehealth adoption means remote explanation delivery - no studies on explaining AI over video consultations" # ] # }} # NOW ANALYZE THIS RESEARCH AREA WITH THE SAME DEPTH AND SPECIFICITY: # Topic: {topic} # Papers:{papers_text} # REQUIREMENTS FOR YOUR ANALYSIS: # 1. Each gap must be SPECIFIC with technical details, numbers, or clear scenarios - NOT generic statements # 2. Identify CRITICAL gaps that block real-world deployment or advancement # 3. Point out CONTRADICTIONS or CONFLICTS in existing research approaches # 4. Consider PRACTICAL implications (cost, time, feasibility, scalability) # 5. Include WHY each gap matters - what problem does it cause? What opportunities does it create? # 6. Mention specific technologies, datasets, methods, or evaluation metrics that are missing # 7. Be ACTIONABLE - researchers should know exactly what to investigate next # 8. FIND AT LEAST 2-3 GAPS PER CATEGORY - there are ALWAYS gaps in research! # IMPORTANT: Even well-researched areas have gaps! Look for: # - Unstudied combinations of techniques # - Missing benchmarks or standardized evaluations # - Lack of real-world deployment studies # - Outdated assumptions from older papers # - Unexplored edge cases or failure modes # - Missing cross-domain applications # You MUST respond with ONLY valid JSON in this exact format (no extra text): # {{ # "knowledge_gaps": ["gap1", "gap2", "gap3"], # "methodological_gaps": ["gap1", "gap2", "gap3"], # "dataset_gaps": ["gap1", "gap2", "gap3"], # "temporal_gaps": ["gap1", "gap2"] # }} # Generate the JSON now:""" # return prompt # def analyze_gaps(self, papers: List[SearchResult], topic: str) -> Dict: # """ # Analyze research papers to identify gaps # Args: # papers: List of SearchResult objects from searches # topic: The research topic being analyzed # Returns: # Dictionary containing identified gaps in four categories # """ # if not self.model or not self.tokenizer: # return { # "error": "Model not loaded", # "knowledge_gaps": [], # "methodological_gaps": [], # "dataset_gaps": [], # "temporal_gaps": [] # } # print(f"Analyzing {len(papers)} papers for research gaps...") # # Create the few-shot prompt # prompt = self._create_few_shot_prompt(papers, topic) # try: # # Tokenize the input # inputs = self.tokenizer( # prompt, # return_tensors="pt", # max_length=1024, # truncation=True # ).to(self.device) # # Generate the analysis with better parameters for detailed output # outputs = self.model.generate( # **inputs, # max_length=1024, # Increased from 512 for more detailed gaps # min_length=200, # Ensure substantial output # num_beams=5, # Increased from 4 for better quality # temperature=0.8, # Slightly higher for more creative insights # do_sample=True, # top_p=0.92, # Slightly higher for more diversity # repetition_penalty=1.2, # Avoid repetitive gaps # length_penalty=1.0 # ) # # Decode the output # analysis_text = self.tokenizer.decode(outputs[0], skip_special_tokens=True) # print("Raw model output:", analysis_text[:200]) # # Try to parse as JSON # try: # gaps = json.loads(analysis_text) # except json.JSONDecodeError: # # If not valid JSON, try to extract structured information # gaps = self._parse_unstructured_output(analysis_text) # # Ensure all required fields exist # gaps.setdefault("knowledge_gaps", []) # gaps.setdefault("methodological_gaps", []) # gaps.setdefault("dataset_gaps", []) # gaps.setdefault("temporal_gaps", []) # return gaps # except Exception as e: # print(f"Error during gap analysis: {str(e)}") # return { # "error": str(e), # "knowledge_gaps": [], # "methodological_gaps": [], # "dataset_gaps": [], # "temporal_gaps": [] # } # def _parse_unstructured_output(self, text: str) -> Dict: # """ # Parse unstructured model output into structured format # Args: # text: Raw text output from the model # Returns: # Dictionary with gap categories # """ # gaps = { # "knowledge_gaps": [], # "methodological_gaps": [], # "dataset_gaps": [], # "temporal_gaps": [] # } # # Simple parsing logic - extract bullet points or numbered lists # lines = text.split('\n') # current_category = None # for line in lines: # line = line.strip() # if 'knowledge' in line.lower(): # current_category = 'knowledge_gaps' # elif 'methodological' in line.lower() or 'method' in line.lower(): # current_category = 'methodological_gaps' # elif 'dataset' in line.lower() or 'data' in line.lower(): # current_category = 'dataset_gaps' # elif 'temporal' in line.lower() or 'time' in line.lower(): # current_category = 'temporal_gaps' # elif current_category and (line.startswith('-') or line.startswith('โ€ข') or any(line.startswith(f"{i}.") for i in range(10))): # # Clean the line # cleaned = line.lstrip('-โ€ข0123456789. ').strip() # if cleaned and len(cleaned) > 10: # gaps[current_category].append(cleaned) # return gaps # def format_gaps_for_display(self, gaps: Dict) -> str: # """ # Format the gaps analysis for display in Streamlit # Args: # gaps: Dictionary of identified gaps # Returns: # Formatted markdown string # """ # output = "## ๐Ÿ” Research Gap Analysis\n\n" # if gaps.get("error"): # output += f"โš ๏ธ Error: {gaps['error']}\n\n" # # Knowledge Gaps # output += "### ๐Ÿ“š Knowledge Gaps\n" # output += "*Questions or areas that haven't been studied yet*\n\n" # if gaps.get("knowledge_gaps"): # for i, gap in enumerate(gaps["knowledge_gaps"], 1): # output += f"{i}. {gap}\n" # else: # output += "*No significant knowledge gaps identified*\n" # output += "\n" # # Methodological Gaps # output += "### ๐Ÿ”ฌ Methodological Gaps\n" # output += "*Research approaches or methods not tried*\n\n" # if gaps.get("methodological_gaps"): # for i, gap in enumerate(gaps["methodological_gaps"], 1): # output += f"{i}. {gap}\n" # else: # output += "*No significant methodological gaps identified*\n" # output += "\n" # # Dataset Gaps # output += "### ๐Ÿ’พ Dataset Gaps\n" # output += "*Missing or under-explored datasets*\n\n" # if gaps.get("dataset_gaps"): # for i, gap in enumerate(gaps["dataset_gaps"], 1): # output += f"{i}. {gap}\n" # else: # output += "*No significant dataset gaps identified*\n" # output += "\n" # # Temporal Gaps # output += "### โฐ Temporal Gaps\n" # output += "*Areas that are outdated or need updating*\n\n" # if gaps.get("temporal_gaps"): # for i, gap in enumerate(gaps["temporal_gaps"], 1): # output += f"{i}. {gap}\n" # else: # output += "*No significant temporal gaps identified*\n" # output += "\n" # return output class GapAnalysisEngine: """ Research Gap Analysis Engine using OpenAI GPT-4 Identifies knowledge, methodological, dataset, and temporal gaps in research papers """ def __init__(self, progress_callback=None): """Initialize the Gap Analysis Engine with OpenAI only Args: progress_callback: Optional callback function(message, progress) to report loading progress """ print("Initializing Gap Analysis Engine (OpenAI-powered)...") self.progress_callback = progress_callback # OpenAI client for gap analysis self.client = OpenAI(api_key=os.getenv('OPENAI_API_KEY')) # Comment out Flan-T5 model loading # self.model_name = "google/flan-t5-large" # self.tokenizer = None # self.model = None # self.device = "cuda" if torch.cuda.is_available() else "cpu" # self._models_loaded = False print("โœ… Gap Analysis Engine ready (OpenAI GPT-4 mode)") # Comment out the model loading method since we're not using Flan-T5 # def _ensure_models_loaded(self): # """Load models only when first needed""" # pass def analyze_gaps(self, papers: List[SearchResult], topic: str) -> Dict: """ Analyze research papers to identify gaps using OpenAI GPT-4 only Args: papers: List of research papers to analyze topic: Research topic Returns: Dictionary containing identified gaps by category """ try: print(f" โ†’ Analyzing {len(papers)} papers for research gaps...") # Skip Flan-T5 analysis entirely # print(" โ†’ Running Flan-T5-Large analysis...") # flan_gaps = self._analyze_with_flan_t5(papers, topic) # Use only OpenAI analysis print(" โ†’ Running GPT-4 gap analysis...") gpt_gaps = self._analyze_with_openai(papers, topic) print(f" โœ“ GPT-4 found {len(gpt_gaps.get('knowledge_gaps', []))} knowledge gaps") print(f" โœ“ Total unique gaps identified") return gpt_gaps except Exception as e: print(f" โŒ Error in gap analysis: {str(e)}") return { "knowledge_gaps": [], "methodological_gaps": [], "dataset_gaps": [], "temporal_gaps": [] } # Comment out the Flan-T5 analysis method # def _analyze_with_flan_t5(self, papers: List[SearchResult], topic: str) -> Dict: # """Analyze gaps using Flan-T5 model""" # # ... commented out ... # pass def _analyze_with_openai(self, papers: List[SearchResult], topic: str) -> Dict: """ Analyze gaps using OpenAI GPT-4 Args: papers: List of research papers topic: Research topic Returns: Dictionary with categorized gaps """ try: # Prepare paper summaries papers_text = self._prepare_papers_context(papers, max_papers=12) # Create comprehensive prompt prompt = f"""You are an expert research analyst. Analyze the following research papers on "{topic}" and identify research gaps. PAPERS TO ANALYZE: {papers_text} Identify gaps in these categories: 1. **Knowledge Gaps**: Unexplored research questions or phenomena 2. **Methodological Gaps**: Missing or inadequate research approaches 3. **Dataset Gaps**: Lack of suitable datasets or data limitations 4. **Temporal Gaps**: Outdated assumptions or need for updated research For each gap, provide: - Clear description - Why it's important - Potential impact if addressed Return your analysis in this JSON format: {{ "knowledge_gaps": [ {{"description": "...", "importance": "...", "impact": "..."}} ], "methodological_gaps": [ {{"description": "...", "importance": "...", "impact": "..."}} ], "dataset_gaps": [ {{"description": "...", "importance": "...", "impact": "..."}} ], "temporal_gaps": [ {{"description": "...", "importance": "...", "impact": "..."}} ] }} Provide 2-4 gaps per category. Be specific and actionable.""" # Call OpenAI API response = self.client.chat.completions.create( model="gpt-3.5-turbo", messages=[ { "role": "system", "content": "You are an expert research analyst specializing in identifying research gaps. Return only valid JSON." }, { "role": "user", "content": prompt } ], temperature=0.7, max_tokens=2000 ) # Parse response response_text = response.choices[0].message.content.strip() # Clean markdown if present if response_text.startswith("```json"): response_text = response_text.replace("```json", "").replace("```", "").strip() gaps = json.loads(response_text) return gaps except json.JSONDecodeError as e: print(f" โš ๏ธ JSON parsing error: {str(e)}") return self._fallback_gap_structure() except Exception as e: print(f" โš ๏ธ OpenAI API error: {str(e)}") return self._fallback_gap_structure() def _prepare_papers_context(self, papers: List[SearchResult], max_papers: int = 12) -> str: """Prepare papers context for the prompt""" papers_text = "" for i, paper in enumerate(papers[:max_papers], 1): papers_text += f"\n{'='*60}\n" papers_text += f"PAPER {i}:\n" papers_text += f"Title: {paper.title}\n" if paper.authors: papers_text += f"Authors: {', '.join(paper.authors[:3])}\n" if paper.published: papers_text += f"Published: {paper.published}\n" # Use abstract or snippet content = paper.abstract if paper.abstract else paper.snippet if content: papers_text += f"Abstract: {content[:500]}...\n" return papers_text def _fallback_gap_structure(self) -> Dict: """Return fallback gap structure when analysis fails""" return { "knowledge_gaps": [ { "description": "Limited understanding of underlying mechanisms", "importance": "Critical for theoretical advancement", "impact": "Could lead to breakthrough insights" } ], "methodological_gaps": [ { "description": "Need for novel experimental approaches", "importance": "Current methods have limitations", "impact": "Enable more robust research" } ], "dataset_gaps": [ { "description": "Insufficient diverse datasets", "importance": "Limits generalizability", "impact": "Improve model performance" } ], "temporal_gaps": [ { "description": "Research needs updating with recent developments", "importance": "Field has evolved significantly", "impact": "More relevant findings" } ] } # Keep the formatting method def _format_gaps_output(self, gaps: Dict, papers_analyzed: int) -> str: """Format gaps into readable markdown output""" output = f"## ๐Ÿ” Research Gap Analysis\n\n" output += f"**Papers Analyzed:** {papers_analyzed}\n\n" if gaps.get('knowledge_gaps'): output += "### ๐Ÿ“š Knowledge Gaps\n" for i, gap in enumerate(gaps['knowledge_gaps'], 1): output += f"{i}. **{gap.get('description', 'N/A')}**\n" output += f" - *Importance:* {gap.get('importance', 'N/A')}\n" output += f" - *Potential Impact:* {gap.get('impact', 'N/A')}\n\n" if gaps.get('methodological_gaps'): output += "### ๐Ÿ”ฌ Methodological Gaps\n" for i, gap in enumerate(gaps['methodological_gaps'], 1): output += f"{i}. **{gap.get('description', 'N/A')}**\n" output += f" - *Importance:* {gap.get('importance', 'N/A')}\n" output += f" - *Potential Impact:* {gap.get('impact', 'N/A')}\n\n" if gaps.get('dataset_gaps'): output += "### ๐Ÿ’พ Dataset Gaps\n" for i, gap in enumerate(gaps['dataset_gaps'], 1): output += f"{i}. **{gap.get('description', 'N/A')}**\n" output += f" - *Importance:* {gap.get('importance', 'N/A')}\n" output += f" - *Potential Impact:* {gap.get('impact', 'N/A')}\n\n" if gaps.get('temporal_gaps'): output += "### โฐ Temporal Gaps\n" for i, gap in enumerate(gaps['temporal_gaps'], 1): output += f"{i}. **{gap.get('description', 'N/A')}**\n" output += f" - *Importance:* {gap.get('importance', 'N/A')}\n" output += f" - *Potential Impact:* {gap.get('impact', 'N/A')}\n\n" return output class HypothesisGenerator: """ Hypothesis Generation and Scoring Engine Converts research gaps into actionable hypotheses with novelty, impact, and feasibility scores """ def __init__(self): """Initialize OpenAI client and embedding model""" print("Initializing Hypothesis Generator...") # OpenAI for hypothesis generation self.api_key = os.getenv('OPENAI_API_KEY') if not self.api_key: print("Warning: OPENAI_API_KEY not found. Hypothesis generation will be limited.") self.client = None else: self.client = OpenAI(api_key=self.api_key) # Load embedding model for novelty scoring try: print("Loading embedding model for novelty scoring...") self.embedding_model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2') print("Embedding model loaded successfully") except Exception as e: print(f"Warning: Could not load embedding model: {str(e)}") self.embedding_model = None def _create_hypothesis_prompt(self, gaps: Dict, topic: str) -> str: """ Create few-shot prompt for hypothesis generation Args: gaps: Dictionary of identified research gaps topic: Research topic Returns: Formatted prompt for GPT-4 """ # Format gaps for the prompt gaps_text = "IDENTIFIED RESEARCH GAPS:\n\n" if gaps.get("knowledge_gaps"): gaps_text += "๐Ÿ”ฌ Knowledge Gaps:\n" for i, gap in enumerate(gaps["knowledge_gaps"][:5], 1): gaps_text += f"{i}. {gap}\n" gaps_text += "\n" if gaps.get("methodological_gaps"): gaps_text += "๐Ÿ› ๏ธ Methodological Gaps:\n" for i, gap in enumerate(gaps["methodological_gaps"][:5], 1): gaps_text += f"{i}. {gap}\n" gaps_text += "\n" if gaps.get("dataset_gaps"): gaps_text += "๐Ÿ’พ Dataset Gaps:\n" for i, gap in enumerate(gaps["dataset_gaps"][:5], 1): gaps_text += f"{i}. {gap}\n" gaps_text += "\n" if gaps.get("temporal_gaps"): gaps_text += "โฐ Temporal Gaps:\n" for i, gap in enumerate(gaps["temporal_gaps"][:5], 1): gaps_text += f"{i}. {gap}\n" gaps_text += "\n" # Few-shot prompt with detailed examples prompt = f"""You are a world-class research strategist. Convert research gaps into actionable, fundable research hypotheses. EXAMPLE INPUT: Gap: "CRITICAL: No empirical studies on privacy leakage through gradient updates in heterogeneous device environments (IoT + mobile + edge)" EXAMPLE OUTPUT: {{ "hypothesis_id": 1, "title": "Privacy-Preserving Gradient Sharing in Heterogeneous Federated Learning", "gap_addressed": "No empirical studies on privacy leakage in heterogeneous federated environments", "problem_statement": "Current federated learning privacy studies assume homogeneous devices (all mobile or all edge), but real deployments mix IoT sensors, mobile phones, and edge servers. Privacy guarantees derived for homogeneous settings may not hold when devices have vastly different computational capabilities, leading to asymmetric privacy leakage where weaker devices reveal more information through gradient updates.", "proposed_solution": "Design and implement a heterogeneous federated learning testbed with 1000+ devices (IoT sensors, smartphones, edge servers) to empirically measure privacy leakage across device types. Develop adaptive differential privacy mechanisms that adjust noise levels based on device computational capacity. Create theoretical framework proving privacy bounds for heterogeneous settings.", "expected_impact": "First empirical evidence of device-specific privacy risks in real federated deployments. Will enable $50M+ industry deployments of federated learning in healthcare and smart cities by providing provable privacy guarantees for mixed-device environments. Expected to reduce privacy leakage by 40-60% compared to one-size-fits-all approaches.", "methodology": "1) Deploy federated learning across 1000 heterogeneous devices, 2) Conduct membership inference and gradient inversion attacks tailored to each device type, 3) Measure information leakage using mutual information and attack success rates, 4) Develop and evaluate adaptive DP mechanisms, 5) Prove theoretical privacy bounds", "required_resources": {{ "compute": "Access to 1000 heterogeneous devices (or simulation), 10 GPUs for attack models", "data": "Federated datasets (FEMNIST, Shakespeare, medical imaging if available)", "tools": "PyTorch, PySyft for federated learning, privacy attack libraries", "team": "2-3 researchers (1 privacy expert, 1 systems engineer, 1 ML researcher)", "timeline": "12-18 months", "estimated_cost": "$80K-120K (device access, compute, personnel)" }}, "novelty_score": 9, "impact_score": 9, "feasibility_score": 7, "priority": "HIGH", "target_venue": "USENIX Security, IEEE S&P, NeurIPS (privacy track)", "potential_collaborators": "Federated learning research groups at Google, Meta, academic labs working on privacy-preserving ML" }} EXAMPLE INPUT 2: Gap: "MISSING FRAMEWORK: Real-time carbon footprint tracking during training with <5% overhead" EXAMPLE OUTPUT 2: {{ "hypothesis_id": 2, "title": "ZeroCarbon: Real-Time Energy and Carbon Tracking for ML Training", "gap_addressed": "No real-time carbon tracking framework with <5% overhead during model training", "problem_statement": "Existing carbon tracking tools (CodeCarbon, experiment-impact-tracker) add 20-30% training overhead, making them impractical for production use. Researchers cannot monitor carbon emissions in real-time during training, leading to wasteful hyperparameter searches and missed optimization opportunities. Need lightweight system that tracks energy/carbon with <5% overhead while providing actionable insights.", "proposed_solution": "Develop ZeroCarbon: a GPU kernel-level energy monitoring system that hooks into CUDA/PyTorch at minimal overhead. Use hardware performance counters instead of external power meters. Implement predictive modeling to estimate full training carbon cost after first 100 iterations. Provide real-time dashboard showing carbon/hour and projected total emissions with recommendations to stop wasteful runs.", "expected_impact": "Enable carbon-aware ML training at scale. If adopted by 10% of ML researchers, could reduce global ML training carbon by 5-8% (estimated 50K-100K tons CO2/year). Will become standard tool in MLOps pipelines at major tech companies. Expected to identify and stop 30-40% of wasteful training runs early.", "methodology": "1) Develop CUDA kernel hooks for energy monitoring, 2) Benchmark overhead across 20+ model architectures (ResNet, BERT, GPT), 3) Build predictive models for carbon estimation, 4) Create real-time dashboard with FastAPI backend, 5) Conduct user studies with 50+ ML practitioners, 6) Integrate with popular frameworks (PyTorch Lightning, HuggingFace Trainer)", "required_resources": {{ "compute": "4-8 GPUs (A100/H100) for benchmarking, cloud credits for testing", "data": "Training logs from diverse ML workloads", "tools": "CUDA, PyTorch, FastAPI, React for dashboard", "team": "2-3 developers (1 systems engineer, 1 ML engineer, 1 full-stack dev)", "timeline": "6-9 months", "estimated_cost": "$40K-60K (compute, personnel)" }}, "novelty_score": 8, "impact_score": 9, "feasibility_score": 8, "priority": "HIGH", "target_venue": "MLSys, SysML, NeurIPS (datasets/benchmarks track)", "potential_collaborators": "Green AI labs, cloud providers (AWS, Google Cloud), MLOps startups" }} NOW GENERATE HYPOTHESES FOR THIS RESEARCH AREA: Topic: {topic} {gaps_text} Generate EXACTLY 5 HIGH-QUALITY, ACTIONABLE research hypotheses that: 1. Address the most CRITICAL gaps identified above 2. Are SPECIFIC with technical details, timelines, and resource estimates 3. Include realistic impact projections (numbers, percentages, cost savings) 4. Provide clear methodology and required resources 5. Can realistically be executed by a research team or PhD student 6. Have clear success metrics and target publication venues 7. Cover DIVERSE approaches - don't repeat similar ideas CRITICAL: Generate EXACTLY 5 hypotheses, not 3-4. Each should be unique and address different gaps. IMPORTANT: Your response must be ONLY valid JSON in this exact format: {{ "hypotheses": [ {{ "hypothesis_id": 1, "title": "...", "gap_addressed": "...", "problem_statement": "...", "proposed_solution": "...", "expected_impact": "...", "methodology": "...", "required_resources": {{...}}, "novelty_score": 8, "impact_score": 9, "feasibility_score": 7, "priority": "HIGH", "target_venue": "...", "potential_collaborators": "..." }}, {{ "hypothesis_id": 2, ... }}, {{ "hypothesis_id": 3, ... }}, {{ "hypothesis_id": 4, ... }}, {{ "hypothesis_id": 5, ... }} ] }} Respond with ONLY the JSON containing EXACTLY 5 hypotheses, no additional text.""" return prompt def _calculate_novelty_score(self, hypothesis_text: str, papers: List[SearchResult]) -> float: """ Calculate novelty score by comparing hypothesis to existing papers using embeddings Args: hypothesis_text: The hypothesis text to evaluate papers: List of existing papers Returns: Novelty score from 1-10 (10 = highly novel) """ if not self.embedding_model or not papers: return 7.0 # Default moderate novelty if can't calculate try: # Combine title and proposed solution for embedding hypothesis_embedding = self.embedding_model.encode([hypothesis_text]) # Get embeddings for existing paper abstracts paper_texts = [] for paper in papers[:15]: # Limit to first 15 papers if paper.abstract: paper_texts.append(paper.abstract[:500]) elif paper.snippet: paper_texts.append(paper.snippet[:500]) if not paper_texts: return 7.0 paper_embeddings = self.embedding_model.encode(paper_texts) # Calculate cosine similarities similarities = cosine_similarity(hypothesis_embedding, paper_embeddings)[0] # Novelty score: inverse of max similarity, scaled to 1-10 # High similarity = low novelty, low similarity = high novelty max_similarity = np.max(similarities) novelty_score = (1 - max_similarity) * 10 # Ensure score is between 1 and 10 novelty_score = max(1.0, min(10.0, novelty_score)) return round(novelty_score, 1) except Exception as e: print(f"Error calculating novelty score: {str(e)}") return 7.0 async def generate_hypotheses( self, gaps: Dict, topic: str, papers: List[SearchResult] = None ) -> Dict: """ Generate research hypotheses from identified gaps Args: gaps: Dictionary of research gaps from gap analysis topic: Research topic papers: Optional list of papers for novelty scoring Returns: Dictionary containing generated hypotheses with scores """ if not self.client: return { "error": "OpenAI API not configured", "hypotheses": [] } print(f"\n๐Ÿ’ก Generating research hypotheses for: {topic}") # Create the prompt prompt = self._create_hypothesis_prompt(gaps, topic) try: # Generate hypotheses using GPT-4 print("Calling GPT-4 to generate hypotheses...") response = self.client.chat.completions.create( model="gpt-4", # Use GPT-4 for highest quality and better scoring messages=[ {"role": "system", "content": "You are a world-class research strategist and grant proposal expert. Generate detailed, actionable research hypotheses with HIGH scores (7-10 range). Always respond with valid JSON only."}, {"role": "user", "content": prompt} ], temperature=0.8, # Higher for creativity max_tokens=4000 # Enough for 5 detailed hypotheses ) # Parse the response hypotheses_json = json.loads(response.choices[0].message.content) # Handle both single hypothesis and array formats if "hypotheses" in hypotheses_json: hypotheses_list = hypotheses_json["hypotheses"] elif isinstance(hypotheses_json, list): hypotheses_list = hypotheses_json else: # Wrap single hypothesis in array hypotheses_list = [hypotheses_json] # Calculate novelty scores using embeddings and overall scores if papers and self.embedding_model: print("Calculating novelty scores using embeddings...") for hyp in hypotheses_list: # Create text for novelty comparison hyp_text = f"{hyp.get('title', '')} {hyp.get('proposed_solution', '')}" novelty = self._calculate_novelty_score(hyp_text, papers) hyp['calculated_novelty_score'] = novelty # Use embedding-based novelty as primary score (more objective) hyp['novelty_score'] = novelty # Ensure all scores exist and are in valid range if 'impact_score' not in hyp or hyp['impact_score'] == 0: hyp['impact_score'] = 8 # Default high impact if 'feasibility_score' not in hyp or hyp['feasibility_score'] == 0: hyp['feasibility_score'] = 7 # Default good feasibility # Calculate overall score as weighted average # Novelty: 40%, Impact: 40%, Feasibility: 20% hyp['overall_score'] = round( (hyp['novelty_score'] * 0.4 + hyp['impact_score'] * 0.4 + hyp['feasibility_score'] * 0.2), 1 ) else: # No papers for novelty calculation, use GPT-4's scores and calculate overall for hyp in hypotheses_list: if 'impact_score' not in hyp or hyp['impact_score'] == 0: hyp['impact_score'] = 8 if 'feasibility_score' not in hyp or hyp['feasibility_score'] == 0: hyp['feasibility_score'] = 7 if 'novelty_score' not in hyp or hyp['novelty_score'] == 0: hyp['novelty_score'] = 7 hyp['overall_score'] = round( (hyp['novelty_score'] * 0.4 + hyp['impact_score'] * 0.4 + hyp['feasibility_score'] * 0.2), 1 ) # Sort hypotheses by overall score (highest first) hypotheses_list.sort(key=lambda h: h.get('overall_score', 0), reverse=True) print(f"โœ“ Generated {len(hypotheses_list)} hypotheses (sorted by overall score)") print(f" Top hypothesis overall score: {hypotheses_list[0].get('overall_score', 0)}/10") return { "hypotheses": hypotheses_list, "count": len(hypotheses_list), "topic": topic } except Exception as e: print(f"Error generating hypotheses: {str(e)}") return { "error": str(e), "hypotheses": [] } def format_hypotheses_for_display(self, hypotheses_data: Dict) -> str: """ Format hypotheses for display in Streamlit Args: hypotheses_data: Dictionary containing hypotheses Returns: Formatted markdown string """ if hypotheses_data.get("error"): return f"โš ๏ธ Error: {hypotheses_data['error']}" hypotheses = hypotheses_data.get("hypotheses", []) if not hypotheses: return "No hypotheses generated" output = "## ๐Ÿ’ก Generated Research Hypotheses\n\n" output += f"*Generated {len(hypotheses)} actionable research proposals*\n\n" output += "---\n\n" for i, hyp in enumerate(hypotheses, 1): # Header with title and priority priority = hyp.get('priority', 'MEDIUM') priority_emoji = "๐Ÿ”ด" if priority == "HIGH" else "๐ŸŸก" if priority == "MEDIUM" else "๐ŸŸข" output += f"### {priority_emoji} Hypothesis {i}: {hyp.get('title', 'Untitled')}\n\n" # Scores display novelty = hyp.get('novelty_score', 'N/A') impact = hyp.get('impact_score', 'N/A') feasibility = hyp.get('feasibility_score', 'N/A') output += f"**๐Ÿ“Š Scores:** Novelty: {novelty}/10 | Impact: {impact}/10 | Feasibility: {feasibility}/10\n\n" # Gap addressed if hyp.get('gap_addressed'): output += f"**๐ŸŽฏ Gap Addressed:** {hyp['gap_addressed']}\n\n" # Problem statement if hyp.get('problem_statement'): output += f"**โ“ Problem Statement:**\n{hyp['problem_statement']}\n\n" # Proposed solution if hyp.get('proposed_solution'): output += f"**๐Ÿ’ก Proposed Solution:**\n{hyp['proposed_solution']}\n\n" # Expected impact if hyp.get('expected_impact'): output += f"**๐Ÿš€ Expected Impact:**\n{hyp['expected_impact']}\n\n" # Methodology if hyp.get('methodology'): output += f"**๐Ÿ”ฌ Methodology:**\n{hyp['methodology']}\n\n" # Required resources if hyp.get('required_resources'): output += f"**๐Ÿ“‹ Required Resources:**\n" resources = hyp['required_resources'] if isinstance(resources, dict): for key, value in resources.items(): output += f"- **{key.title()}:** {value}\n" else: output += f"{resources}\n" output += "\n" # Target venue and collaborators if hyp.get('target_venue'): output += f"**๐Ÿ“ฐ Target Venues:** {hyp['target_venue']}\n\n" if hyp.get('potential_collaborators'): output += f"**๐Ÿค Potential Collaborators:** {hyp['potential_collaborators']}\n\n" output += "---\n\n" return output class ExperimentGenerator: """Generate experiments, datasets, and metrics for research hypotheses""" def __init__(self): """Initialize the experiment generator""" self.client = OpenAI(api_key=os.getenv('OPENAI_API_KEY')) self.dataset_fetcher = DatasetFetcher() print("โœ… Experiment Generator initialized") def generate_experiments_for_hypothesis(self, hypothesis: dict, topic: str = "") -> dict: """ Generate structured experiment plan for a given hypothesis Args: hypothesis (dict): Selected hypothesis object topic (str): Original research topic for context Returns: dict: Structured experiment plan with datasets, metrics, etc. """ try: print(f"๐Ÿงช Generating experiments for hypothesis: {hypothesis.get('title', 'Unknown')}") # Extract hypothesis text hypothesis_text = self._extract_hypothesis_text(hypothesis) # Create prompt prompt = self._create_experiment_prompt(hypothesis_text, topic) # Call OpenAI API with higher token limit for detailed experiments response = self.client.chat.completions.create( model="gpt-3.5-turbo-16k", # Use 16k model for longer responses messages=[ { "role": "system", "content": "You are an expert AI research experiment designer. Generate COMPLETE, PUBLICATION-READY experiments with ALL required details. Return ONLY valid JSON without any markdown formatting or code blocks." }, { "role": "user", "content": prompt } ], temperature=0.7, max_tokens=4000 # Max for gpt-3.5-turbo-16k output ) # Parse response response_text = response.choices[0].message.content.strip() print(f"๐Ÿ“ Raw API response length: {len(response_text)} characters") # Clean response (remove markdown if present) if response_text.startswith("```json"): response_text = response_text.replace("```json", "").replace("```", "").strip() elif response_text.startswith("```"): response_text = response_text.replace("```", "").strip() # Try to parse JSON try: experiments_data = json.loads(response_text) print(f"โœ… Successfully parsed JSON with {len(experiments_data.get('experiments', []))} experiments") except json.JSONDecodeError as json_err: print(f"โŒ JSON parsing failed: {str(json_err)}") print(f"๐Ÿ“„ First 500 chars of response: {response_text[:500]}") print(f"๐Ÿ“„ Last 200 chars of response: {response_text[-200:]}") raise # Re-raise to trigger fallback # Fetch REAL datasets from Kaggle API (with AI fallback) print("๐Ÿ” Fetching datasets from Kaggle...") try: # Use dataset keywords if provided, otherwise use topic dataset_keywords = experiments_data.get('dataset_keywords', []) print(f"๐Ÿ”Ž Keywords: {dataset_keywords}") real_datasets = self.dataset_fetcher.get_datasets_for_topic( topic=topic, hypothesis=hypothesis.get('title', ''), dataset_keywords=dataset_keywords ) print(f"๐Ÿ“Š Found {len(real_datasets)} datasets") # Store datasets as simple list experiments_data['datasets'] = real_datasets print(f"โœ… Added {len(real_datasets)} dataset(s)") except Exception as dataset_err: print(f"โš ๏ธ Dataset fetching failed: {str(dataset_err)}") experiments_data['datasets'] = [] # Add metadata experiments_data["metadata"] = { "hypothesis_id": hypothesis.get("hypothesis_id", "unknown"), "hypothesis_title": hypothesis.get("title", "Unknown Hypothesis"), "generated_at": datetime.now().isoformat(), "model_used": "gpt-3.5-turbo-16k", "datasets_verified": bool(real_datasets and len(real_datasets) > 0) if 'real_datasets' in locals() else False } print(f"โœ… Generated {len(experiments_data.get('experiments', []))} experiments") return experiments_data except json.JSONDecodeError as e: print(f"โŒ JSON parsing error: {str(e)}") print(" Returning fallback experiment structure") return self._get_fallback_experiment_structure() except Exception as e: print(f"โŒ Error generating experiments: {str(e)}") import traceback traceback.print_exc() print(" Returning fallback experiment structure") return self._get_fallback_experiment_structure() def _extract_hypothesis_text(self, hypothesis: dict) -> str: """Extract ALL relevant details from hypothesis object for complete context""" parts = [] # Core hypothesis information if hypothesis.get("title"): parts.append(f"Hypothesis Title: {hypothesis['title']}") if hypothesis.get("problem_statement"): parts.append(f"Problem Statement: {hypothesis['problem_statement']}") if hypothesis.get("proposed_solution"): parts.append(f"Proposed Solution: {hypothesis['proposed_solution']}") if hypothesis.get("gap_addressed"): parts.append(f"Research Gap Being Addressed: {hypothesis['gap_addressed']}") if hypothesis.get("expected_impact"): parts.append(f"Expected Impact: {hypothesis['expected_impact']}") if hypothesis.get("methodology"): parts.append(f"Suggested Methodology: {hypothesis['methodology']}") # Scores for context on feasibility scores = [] if hypothesis.get("novelty_score"): scores.append(f"Novelty: {hypothesis['novelty_score']}/10") if hypothesis.get("impact_score"): scores.append(f"Impact: {hypothesis['impact_score']}/10") if hypothesis.get("feasibility_score"): scores.append(f"Feasibility: {hypothesis['feasibility_score']}/10") if scores: parts.append(f"Scores: {', '.join(scores)}") return "\n\n".join(parts) def _create_experiment_prompt(self, hypothesis_text: str, topic: str) -> str: """Create the experiment generation prompt""" return f""" Generate 2-3 COMPLETE experiments to test this hypothesis. Make them SPECIFIC to the hypothesis, not generic. TOPIC: {topic} HYPOTHESIS: {hypothesis_text} For each experiment, provide: 1. **id**: Number (1, 2, 3) 2. **title**: Specific title reflecting the hypothesis 3. **introduction**: 3-4 sentences explaining how this tests the hypothesis 4. **description**: 4-5 sentences detailing the approach 5. **steps**: Array of 8-12 detailed methodology steps specific to this hypothesis 6. **difficulty**: "Easy", "Medium", or "Hard" 7. **estimated_time**: Realistic timeline (e.g., "2-3 months") 8. **required_resources**: Array of specific computing/storage/tool requirements Also provide SHARED resources: - **dataset_keywords**: Array of 3-5 keywords for finding relevant datasets - **metrics**: Array of 4-5 evaluation metrics with name, description, range - **architectures**: Array of 3-4 model architectures with name, description, parameters - **baselines**: Array of 3 baseline models with name, expected_performance, description - **challenges**: Array of 4-5 specific challenges with mitigation strategies - **expected_outcomes**: Object with accuracy_range, training_time, model_size, inference_speed IMPORTANT: - Design experiments SPECIFICALLY for THIS hypothesis (not generic ML experiments) - Steps must be detailed and hypothesis-specific - Use domain-appropriate datasets, metrics, and models that match the hypothesis topic - Tailor everything (data sources, evaluation methods, architectures) to the specific research domain Return ONLY valid JSON (no markdown, no code blocks). JSON format: {{ "experiments": [ {{ "id": 1, "title": "Specific experiment title", "introduction": "How this tests the hypothesis...", "description": "Detailed approach...", "steps": ["Step 1: ...", "Step 2: ...", ... 8-12 steps], "difficulty": "Medium", "estimated_time": "2-3 months", "required_resources": ["GPU requirements", "Storage", "Software"] }} ], "dataset_keywords": ["keyword1", "keyword2", "keyword3"], "metrics": [{{"name": "Accuracy", "description": "...", "range": "0-100%"}}], "architectures": [{{"name": "Model Name", "description": "...", "parameters": "100M"}}], "baselines": [{{"name": "Baseline", "expected_performance": "70%", "description": "..."}}], "challenges": ["Challenge 1 with mitigation", "Challenge 2..."], "expected_outcomes": {{"accuracy_range": "85-92%", "training_time": "6-8h", "model_size": "150MB", "inference_speed": "20ms"}} }} """ def _get_fallback_experiment_structure(self) -> dict: """Return fallback structure when API fails""" return { "experiments": [ { "id": 1, "title": "Basic Implementation", "description": "Implement a basic version of the proposed solution", "steps": [ "Collect and preprocess data", "Implement baseline model", "Train and validate", "Analyze results" ], "difficulty": "intermediate", "estimated_time": "2-4 weeks", "required_resources": ["Computing resources", "Dataset access"] } ], "datasets": { "kaggle": [], "huggingface": [] }, "metrics": [ {"name": "Accuracy", "description": "Classification accuracy", "range": "0-1"} ], "architectures": [], "baselines": [], "challenges": ["Limited information available"], "expected_outcomes": { "accuracy_range": "To be determined", "training_time": "Variable", "model_size": "Unknown", "inference_speed": "Unknown" }, "alternatives": [], "metadata": { "generated_at": datetime.now().isoformat(), "model_used": "fallback", "status": "limited_data" } } class Agent: """AI agent wrapper for OpenAI ChatGPT model""" def __init__(self, name: str, model: str = "gpt-3.5-turbo"): self.name = name self.api_key = os.getenv('OPENAI_API_KEY') if not self.api_key: raise ValueError("OPENAI_API_KEY environment variable not set") try: self.client = OpenAI(api_key=self.api_key) self.model = model self.chat_history = [] except Exception as e: error_msg = f"Error initializing {name}: {str(e)}" print(error_msg) raise ValueError(error_msg) async def process(self, message: str) -> str: """ Process a message using OpenAI ChatGPT model """ if not self.client: return "Error: Agent not properly initialized" try: # Add user message to history self.chat_history.append({ "role": "user", "content": message }) # Call OpenAI API response = self.client.chat.completions.create( model=self.model, messages=self.chat_history, temperature=0.7, max_tokens=2000 ) # Extract response text assistant_message = response.choices[0].message.content # Add assistant response to history self.chat_history.append({ "role": "assistant", "content": assistant_message }) return assistant_message except Exception as e: error_msg = f"OpenAI API Error: {str(e)}" print(error_msg) return error_msg class MultiAgentSystem: """ System coordinating multiple agents for literature review """ def __init__(self): # Initialize OpenAI client for GPT-4 gap analysis fallback self.api_key = os.getenv('OPENAI_API_KEY') if self.api_key: self.client = OpenAI(api_key=self.api_key) else: print("Warning: OPENAI_API_KEY not found. GPT-4 fallback unavailable.") self.client = None try: self.google_agent = Agent("Google_Search_Agent") except Exception as e: print(f"Failed to initialize Google agent: {str(e)}") self.google_agent = None try: self.arxiv_agent = Agent("Arxiv_Search_Agent") except Exception as e: print(f"Failed to initialize Arxiv agent: {str(e)}") self.arxiv_agent = None try: self.clinical_trials_agent = Agent("ClinicalTrials_Search_Agent") except Exception as e: print(f"Failed to initialize ClinicalTrials agent: {str(e)}") self.clinical_trials_agent = None try: self.report_agent = Agent("Report_Agent") except Exception as e: print(f"Failed to initialize Report agent: {str(e)}") self.report_agent = None self.search_tool = SearchTool() # ============================================================================ # MODIFIED: Don't initialize heavy models here - use lazy loading instead # ============================================================================ self._gap_analyzer = None self._hypothesis_generator = None self._experiment_generator = None print("โœ… MultiAgentSystem initialized (models will load on demand)") # ============================================================================ # LAZY LOADING PROPERTIES - ADD THESE NEW METHODS # ============================================================================ @property def gap_analyzer(self): """Lazy load gap analyzer only when needed""" if self._gap_analyzer is None: self._gap_analyzer = load_gap_analyzer() return self._gap_analyzer @property def hypothesis_generator(self): """Lazy load hypothesis generator only when needed""" if self._hypothesis_generator is None: self._hypothesis_generator = load_hypothesis_generator() return self._hypothesis_generator @property def experiment_generator(self): """Lazy load experiment generator only when needed""" if self._experiment_generator is None: self._experiment_generator = load_experiment_generator() return self._experiment_generator async def run_gap_analysis(self, topic: str) -> Dict: """ Run gap analysis on research papers for a given topic Collects papers and analyzes them for research gaps Args: topic: The research topic to analyze Returns: Dictionary containing gap analysis results and formatted output """ print(f"\n๐Ÿ” Starting Gap Analysis for: {topic}") # Collect papers all_papers = await self._collect_papers(topic) print(f"Collected {len(all_papers)} papers total for analysis") # Analyze gaps using the collected papers return await self._run_gap_analysis_with_papers(all_papers, topic) async def _openai_gap_analysis_fallback(self, papers: List[SearchResult], topic: str) -> Dict: """ Use OpenAI GPT-4 for comprehensive gap analysis Args: papers: List of research papers topic: Research topic Returns: Dictionary of research gaps """ if not self.client: print("OpenAI client not available") return { "knowledge_gaps": ["OpenAI API key not configured"], "methodological_gaps": [], "dataset_gaps": [], "temporal_gaps": [] } try: # Create detailed papers summary with MORE context papers_text = "" for i, paper in enumerate(papers[:15], 1): # Increased from 10 to 15 papers_text += f"\n{'='*60}\n" papers_text += f"Paper {i}: {paper.title}\n" if paper.authors: papers_text += f"Authors: {', '.join(paper.authors[:5])}\n" if paper.published: papers_text += f"Published: {paper.published}\n" # Use FULL abstract, not truncated if paper.abstract: papers_text += f"Abstract: {paper.abstract}\n" # Full abstract, no truncation elif paper.snippet: papers_text += f"Summary: {paper.snippet}\n" if paper.study_type: papers_text += f"Study Type: {paper.study_type}\n" if paper.phase: papers_text += f"Phase: {paper.phase}\n" prompt = f"""You are a PhD-level research analyst with 20+ years experience identifying critical research gaps. Your job is to DEEPLY analyze these papers and find SPECIFIC, ACTIONABLE gaps that researchers can build upon. RESEARCH TOPIC: {topic} PAPERS TO ANALYZE:{papers_text} {'='*80} CRITICAL INSTRUCTIONS: 1. READ EVERY PAPER CAREFULLY - Don't just skim 2. FIND AT LEAST 3-5 GAPS PER CATEGORY - Research ALWAYS has gaps! 3. Be SPECIFIC with technical details, numbers, methods, datasets 4. Each gap should be ACTIONABLE - what exactly is missing? 5. Look for: - Unstudied combinations of techniques - Missing benchmarks or evaluation standards - Lack of real-world deployment/application studies - Outdated assumptions from older papers - Unexplored edge cases or failure modes - Missing cross-domain applications - Contradictions between papers - Limited dataset sizes or diversity - Methods that haven't been tried together - Temporal aspects (what's changed since publication?) EXAMPLE OF GOOD GAPS (BE THIS SPECIFIC): โŒ BAD: "More research needed on privacy" โœ… GOOD: "CRITICAL: No empirical studies measuring privacy leakage in federated learning with >1000 heterogeneous devices (IoT + mobile + edge servers) - all existing work uses <100 homogeneous nodes" โŒ BAD: "Missing datasets" โœ… GOOD: "MISSING DATASET: No standardized benchmark for privacy attacks in federated learning with ground truth labels - researchers cannot compare attack success rates across studies" NOW ANALYZE AND FIND GAPS IN THESE 4 CATEGORIES: 1. KNOWLEDGE GAPS - What questions are NOT answered by these papers? - What aspects of {topic} have NOT been studied? - What combinations haven't been explored? - What assumptions are untested? - What edge cases are ignored? 2. METHODOLOGICAL GAPS - What approaches are MISSING? - What research methods haven't been tried? - What evaluation metrics are absent? - What experimental designs are lacking? - What theoretical frameworks don't exist? 3. DATASET GAPS - What data is MISSING or INSUFFICIENT? - What datasets don't exist but should? - What's the coverage (size, diversity, quality)? - What domains are underrepresented? - What benchmarks are needed? 4. TEMPORAL GAPS - What's OUTDATED? - What has changed since these papers were published? - What new technologies/methods aren't considered? - What recent events/regulations aren't reflected? YOU MUST FIND AT LEAST 2-3 SPECIFIC GAPS PER CATEGORY! Respond with ONLY valid JSON (no markdown, no extra text): {{ "knowledge_gaps": ["SPECIFIC gap 1 with details", "SPECIFIC gap 2 with details", "SPECIFIC gap 3 with details"], "methodological_gaps": ["SPECIFIC gap 1 with details", "SPECIFIC gap 2 with details", "SPECIFIC gap 3 with details"], "dataset_gaps": ["SPECIFIC gap 1 with details", "SPECIFIC gap 2 with details"], "temporal_gaps": ["SPECIFIC gap 1 with details", "SPECIFIC gap 2 with details"] }}""" response = self.client.chat.completions.create( model="gpt-3.5-turbo", messages=[ { "role": "system", "content": "You are a world-class research analyst. You ALWAYS find specific, actionable research gaps. You respond with ONLY valid JSON, no markdown formatting, no extra text." }, {"role": "user", "content": prompt} ], temperature=0.8, # Increased from 0.7 for more creativity max_tokens=2500 # Increased from 1500 for more detailed gaps ) # Extract JSON from response (handle markdown code blocks if present) response_text = response.choices[0].message.content.strip() # Remove markdown code blocks if present if response_text.startswith("```"): # Find the JSON content between ``` markers lines = response_text.split('\n') json_lines = [] in_code_block = False for line in lines: if line.strip().startswith("```"): in_code_block = not in_code_block continue if in_code_block or (not line.strip().startswith("```")): json_lines.append(line) response_text = '\n'.join(json_lines) gaps_json = json.loads(response_text) # Validate that we got gaps total_gaps = sum(len(gaps_json.get(k, [])) for k in ["knowledge_gaps", "methodological_gaps", "dataset_gaps", "temporal_gaps"]) print(f" โœ“ GPT-4 extracted {total_gaps} total gaps") return gaps_json except json.JSONDecodeError as e: print(f"JSON parsing error: {str(e)}") print(f"Response was: {response_text[:500]}...") return { "knowledge_gaps": ["JSON parsing failed - please try again"], "methodological_gaps": [], "dataset_gaps": [], "temporal_gaps": [] } except Exception as e: print(f"OpenAI fallback error: {str(e)}") return { "knowledge_gaps": [f"Error: {str(e)}"], "methodological_gaps": [], "dataset_gaps": [], "temporal_gaps": [] } async def run_hypothesis_generation( self, gaps: Dict, topic: str, papers: List[SearchResult] = None ) -> Dict: """ Generate research hypotheses from identified gaps Args: gaps: Dictionary of research gaps topic: Research topic papers: Optional list of papers for novelty scoring Returns: Dictionary containing generated hypotheses """ print(f"\n๐Ÿ’ก Starting Hypothesis Generation for: {topic}") if not self.hypothesis_generator: return { "error": "Hypothesis generator not available", "hypotheses": [] } # Generate hypotheses result = await self.hypothesis_generator.generate_hypotheses(gaps, topic, papers) # Format for display if result.get("hypotheses"): formatted_output = self.hypothesis_generator.format_hypotheses_for_display(result) result["formatted_output"] = formatted_output return result async def run_complete_analysis(self, topic: str) -> Dict: """ Run complete research pipeline: Paper Collection โ†’ Literature Review โ†’ Gap Analysis โ†’ Hypothesis Generation All phases use the SAME papers for consistency Args: topic: Research topic Returns: Dictionary containing all results (review, gaps, hypotheses) """ print(f"\n๐Ÿš€ Starting COMPLETE ANALYSIS for: {topic}") print("=" * 80) results = { "topic": topic, "literature_review": None, "gap_analysis": None, "hypotheses": None, "papers": [] } try: # Step 1: Collect papers ONCE for all phases print("\n๐Ÿ“š PHASE 1/4: Paper Collection") all_papers = await self._collect_papers(topic) results["papers"] = all_papers print(f"โœ“ Collected {len(all_papers)} papers") # Step 2: Generate Literature Review using collected papers print("\n๐Ÿ“– PHASE 2/4: Literature Review") literature_review = await self.run_literature_review(topic, papers=all_papers) results["literature_review"] = literature_review # Step 3: Run Gap Analysis using same papers print("\n๐Ÿ” PHASE 3/4: Gap Analysis") gap_results = await self._run_gap_analysis_with_papers(all_papers, topic) results["gap_analysis"] = gap_results # Step 4: Generate Hypotheses from gaps using same papers print("\n๐Ÿ’ก PHASE 4/4: Hypothesis Generation") if gap_results.get("gaps"): hypothesis_results = await self.run_hypothesis_generation( gaps=gap_results["gaps"], topic=topic, papers=all_papers ) results["hypotheses"] = hypothesis_results else: print("โš ๏ธ No gaps found, skipping hypothesis generation") results["hypotheses"] = { "error": "No gaps identified for hypothesis generation", "hypotheses": [] } print("\nโœ… COMPLETE ANALYSIS FINISHED") print("=" * 80) return results except Exception as e: print(f"โŒ Error in complete analysis: {str(e)}") results["error"] = str(e) return results async def _collect_papers(self, topic: str) -> List[SearchResult]: """ Collect papers from all sources (internal method for reuse) Args: topic: Research topic Returns: List of SearchResult objects """ all_papers = [] # ArXiv Search print(f" โ†’ Collecting from arXiv...") arxiv_results = self.search_tool.arxiv_search(topic, max_results=7) all_papers.extend(arxiv_results) # Google Search print(f" โ†’ Collecting from Google...") google_results = self.search_tool.google_search(topic, num_results=5) all_papers.extend(google_results) # Clinical Trials print(f" โ†’ Collecting from ClinicalTrials.gov...") clinical_results = self.search_tool.clinicaltrials_search_beta_api(topic, max_results=3) if not clinical_results: clinical_results = self.search_tool.clinicaltrials_search_scrape(topic, max_results=3) all_papers.extend(clinical_results) return all_papers async def generate_experiments_for_hypothesis(self, hypothesis: dict, topic: str = "") -> dict: """ Generate experiments for a selected hypothesis Args: hypothesis (dict): Selected hypothesis topic (str): Original research topic Returns: dict: Experiment plan with datasets, metrics, etc. """ print(f"\n๐Ÿงช GENERATING EXPERIMENTS FOR HYPOTHESIS") print("=" * 50) try: experiments = self.experiment_generator.generate_experiments_for_hypothesis( hypothesis, topic ) print(f"โœ… Experiment generation completed successfully") return experiments except Exception as e: print(f"โŒ Error in experiment generation: {str(e)}") return self.experiment_generator._get_fallback_experiment_structure() # async def _run_gap_analysis_with_papers(self, papers: List[SearchResult], topic: str) -> Dict: # """ # Run gap analysis with pre-collected papers (internal method) # Uses BOTH Flan-T5 and GPT-4, then combines unique gaps # Args: # papers: Pre-collected papers # topic: Research topic # Returns: # Gap analysis results # """ # print(f" โ†’ Analyzing {len(papers)} papers for research gaps...") # if not papers or len(papers) == 0: # return { # "gaps": { # "error": "No papers found", # "knowledge_gaps": [], # "methodological_gaps": [], # "dataset_gaps": [], # "temporal_gaps": [] # }, # "formatted_output": "โš ๏ธ No papers to analyze", # "papers_analyzed": 0, # "papers": [] # } # # Run BOTH models in parallel for redundancy # flan_t5_gaps = {} # gpt4_gaps = {} # # Method 1: Flan-T5-Large (local, free) # if self.gap_analyzer: # print(" โ†’ Running Flan-T5-Large analysis...") # try: # flan_t5_gaps = self.gap_analyzer.analyze_gaps(papers, topic) # total_flan = sum(len(flan_t5_gaps.get(k, [])) for k in ["knowledge_gaps", "methodological_gaps", "dataset_gaps", "temporal_gaps"]) # print(f" โœ“ Flan-T5 found {total_flan} gaps") # except Exception as e: # print(f" โš ๏ธ Flan-T5 error: {str(e)}") # flan_t5_gaps = { # "knowledge_gaps": [], # "methodological_gaps": [], # "dataset_gaps": [], # "temporal_gaps": [] # } # # Method 2: GPT-4 (always run for reliability) # print(" โ†’ Running GPT-4 analysis...") # try: # gpt4_gaps = await self._openai_gap_analysis_fallback(papers, topic) # total_gpt = sum(len(gpt4_gaps.get(k, [])) for k in ["knowledge_gaps", "methodological_gaps", "dataset_gaps", "temporal_gaps"]) # print(f" โœ“ GPT-4 found {total_gpt} gaps") # except Exception as e: # print(f" โš ๏ธ GPT-4 error: {str(e)}") # gpt4_gaps = { # "knowledge_gaps": [], # "methodological_gaps": [], # "dataset_gaps": [], # "temporal_gaps": [] # } # # Combine and deduplicate gaps from both models # combined_gaps = self._combine_and_deduplicate_gaps(flan_t5_gaps, gpt4_gaps) # # Determine which model(s) contributed # total_flan = sum(len(flan_t5_gaps.get(k, [])) for k in ["knowledge_gaps", "methodological_gaps", "dataset_gaps", "temporal_gaps"]) # total_gpt = sum(len(gpt4_gaps.get(k, [])) for k in ["knowledge_gaps", "methodological_gaps", "dataset_gaps", "temporal_gaps"]) # if total_flan > 0 and total_gpt > 0: # combined_gaps["source"] = "Flan-T5-Large + GPT-4 (Combined)" # elif total_gpt > 0: # combined_gaps["source"] = "GPT-4" # elif total_flan > 0: # combined_gaps["source"] = "Flan-T5-Large" # else: # combined_gaps["source"] = "No gaps found" # total_combined = sum(len(combined_gaps.get(k, [])) for k in ["knowledge_gaps", "methodological_gaps", "dataset_gaps", "temporal_gaps"]) # print(f" โœ“ Total unique gaps after combining: {total_combined}") # formatted_output = self.gap_analyzer.format_gaps_for_display(combined_gaps) if self.gap_analyzer else "No formatter available" # return { # "gaps": combined_gaps, # "formatted_output": formatted_output, # "papers_analyzed": len(papers), # "papers": papers # } async def _run_gap_analysis_with_papers(self, papers: List[SearchResult], topic: str) -> Dict: """ Run gap analysis using OpenAI GPT-4 only (Flan-T5 disabled for speed) Args: papers: List of research papers topic: Research topic Returns: Dictionary containing gap analysis results """ print(f"\n๐Ÿ” PHASE 3/4: Gap Analysis") try: # Use only OpenAI gap analysis (faster, no model loading) gaps = self.gap_analyzer.analyze_gaps(papers, topic) # Format output formatted_output = self.gap_analyzer._format_gaps_output(gaps, len(papers)) return { "gaps": gaps, "formatted_output": formatted_output, "papers_analyzed": len(papers), "method": "OpenAI GPT-4" } except Exception as e: print(f"โŒ Gap analysis error: {str(e)}") return { "gaps": self.gap_analyzer._fallback_gap_structure(), "formatted_output": "Gap analysis encountered an error.", "papers_analyzed": len(papers), "method": "fallback" } def _combine_and_deduplicate_gaps(self, gaps1: Dict, gaps2: Dict) -> Dict: """ Combine gaps from two sources and remove duplicates Args: gaps1: First set of gaps (e.g., from Flan-T5) gaps2: Second set of gaps (e.g., from GPT-4) Returns: Combined and deduplicated gaps """ combined = { "knowledge_gaps": [], "methodological_gaps": [], "dataset_gaps": [], "temporal_gaps": [] } for category in ["knowledge_gaps", "methodological_gaps", "dataset_gaps", "temporal_gaps"]: # Get gaps from both sources gaps_set1 = gaps1.get(category, []) gaps_set2 = gaps2.get(category, []) # Combine them all_gaps = gaps_set1 + gaps_set2 # Deduplicate using similarity (case-insensitive, normalized) unique_gaps = [] seen_normalized = set() for gap in all_gaps: # Normalize: lowercase, remove extra spaces, remove punctuation normalized = gap.lower().strip() normalized = ' '.join(normalized.split()) # Remove extra whitespace # Check if we've seen something very similar is_duplicate = False for seen in seen_normalized: # If 80% of words overlap, consider it duplicate gap_words = set(normalized.split()) seen_words = set(seen.split()) if gap_words and seen_words: overlap = len(gap_words & seen_words) / max(len(gap_words), len(seen_words)) if overlap > 0.8: # 80% similarity threshold is_duplicate = True break if not is_duplicate: unique_gaps.append(gap) seen_normalized.add(normalized) combined[category] = unique_gaps return combined async def run_complete_analysis(self, topic: str) -> Dict: """ Run complete end-to-end analysis: Paper Collection โ†’ Literature Review โ†’ Gap Analysis โ†’ Hypothesis Generation All phases use the same set of papers for consistency Args: topic: Research topic Returns: Dict containing all analysis results """ results = { "topic": topic, "papers": [], "literature_review": "", "gap_analysis": {}, "hypotheses": {} } try: print("=" * 80) print(f"๐Ÿš€ STARTING COMPLETE ANALYSIS FOR: {topic}") print("=" * 80) # PHASE 1: Collect papers once for all subsequent phases print("\n๐Ÿ“„ PHASE 1/4: Collecting Papers") papers = await self._collect_papers(topic) results["papers"] = papers print(f" โœ“ Collected {len(papers)} papers total") # PHASE 2: Literature Review using collected papers print("\n๐Ÿ“š PHASE 2/4: Literature Review") literature_review = await self.run_literature_review(topic, papers=papers) results["literature_review"] = literature_review # PHASE 3: Gap Analysis using same papers print("\n๐Ÿ” PHASE 3/4: Gap Analysis") gap_results = await self._run_gap_analysis_with_papers(papers, topic) results["gap_analysis"] = gap_results # PHASE 4: Hypothesis Generation from gaps print("\n๐Ÿ’ก PHASE 4/4: Hypothesis Generation") if gap_results.get("gaps"): hypothesis_results = await self.run_hypothesis_generation( gaps=gap_results["gaps"], topic=topic, papers=papers ) results["hypotheses"] = hypothesis_results else: print("โš ๏ธ No gaps found, skipping hypothesis generation") results["hypotheses"] = { "error": "No gaps identified for hypothesis generation", "hypotheses": [] } print("\nโœ… COMPLETE ANALYSIS FINISHED") print("=" * 80) return results except Exception as e: print(f"โŒ Error in complete analysis: {str(e)}") results["error"] = str(e) return results async def run_literature_review(self, topic: str, papers: List[SearchResult] = None) -> str: """ Run a comprehensive literature review on a given topic Args: topic: Research topic papers: Optional pre-collected papers. If None, will collect new papers Returns: Markdown-formatted literature review """ # If no papers provided, collect them if papers is None: print(f"Collecting papers for literature review on '{topic}'...") papers = await self._collect_papers(topic) print(f"Generating comprehensive literature review from {len(papers)} papers...") # Build context from papers context = f"Research Topic: {topic}\n\n" context += "=" * 80 + "\n" context += "PAPERS FOR ANALYSIS:\n" context += "=" * 80 + "\n\n" for i, paper in enumerate(papers, 1): context += f"Paper {i}: {paper.title}\n" if paper.authors: context += f"Authors: {', '.join(paper.authors)}\n" if paper.published: context += f"Published: {paper.published}\n" if paper.abstract: context += f"Abstract: {paper.abstract[:500]}...\n" elif paper.snippet: context += f"Content: {paper.snippet}\n" if paper.arxiv_url: context += f"URL: {paper.arxiv_url}\n" elif hasattr(paper, 'link') and paper.link: context += f"URL: {paper.link}\n" context += "\n" # Generate literature review using GPT print("Generating comprehensive literature review report...") if not self.report_agent: return "Error: Report agent not initialized. Please check your OpenAI API key." report_prompt = f""" Generate a comprehensive literature review on {topic} based on {len(papers)} research papers provided below. {context} Please provide a well-structured literature review that: 1. Synthesizes the main findings and themes across ALL papers 2. Discusses key research directions and trends 3. Includes proper citations with author names and years 4. Provides clickable links to the papers when available 5. Concludes with future research directions based on the analysis 6. Write in academic style but keep it accessible 7. Be comprehensive - discuss ALL major papers, not just 2-3 Format the review with clear sections using markdown headers (##). """ report = await self.report_agent.process(report_prompt) return report # Example usage if __name__ == "__main__": import asyncio async def run_literature_review(): system = MultiAgentSystem() topic = input("Enter a medical or scientific topic for literature review: ") print(f"\nRunning comprehensive literature review on: {topic}") print("This may take a few minutes...\n") report = await system.run_literature_review(topic) print("\n" + "="*80) print("LITERATURE REVIEW REPORT") print("="*80 + "\n") print(report) # Save report to file filename = f"literature_review_{topic.replace(' ', '_')}.md" with open(filename, "w", encoding="utf-8") as f: f.write(report) print(f"\nReport saved to {filename}") async def test_search(): search_tool = SearchTool() query = input("Enter a medical topic to test search: ") print("\nTesting ClinicalTrials.gov Beta API search...") api_results = search_tool.clinicaltrials_search_beta_api(query, max_results=2) for result in api_results: print(f"Title: {result.title}") print(f"NCT ID: {result.nct_id}") print(f"URL: {result.link}") print(f"Status: {result.status}") print(f"Phase: {result.phase}") print(f"Conditions: {result.conditions}") print("-" * 50) print("\nTesting ClinicalTrials.gov scrape search (fallback method)...") scrape_results = search_tool.clinicaltrials_search_scrape(query, max_results=2) for result in scrape_results: print(f"Title: {result.title}") print(f"NCT ID: {result.nct_id}") print(f"URL: {result.link}") print(f"Status: {result.status}") print("-" * 50) # Choose which function to run choice = input("Choose an option:\n1. Run full literature review\n2. Test search only\nYour choice (1/2): ") if choice == "1": asyncio.run(run_literature_review()) else: asyncio.run(test_search())