ResearchForge / multi_agent_system.py
Amna2568's picture
Upload 6 files
4d020d6 verified
import os
import json
from dataclasses import dataclass
from typing import List, Optional, Dict, Tuple
from bs4 import BeautifulSoup
import requests
import time
import arxiv
from dotenv import load_dotenv
from openai import OpenAI
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
import torch
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
from datetime import datetime
import streamlit as st
# Load environment variables first
load_dotenv()
# Import Kaggle only when needed (lazy import to avoid auto-authentication)
try:
from kaggle.api.kaggle_api_extended import KaggleApi
KAGGLE_AVAILABLE = True
except Exception as e:
print(f"⚠️ Kaggle API not available: {str(e)}")
KAGGLE_AVAILABLE = False
@st.cache_resource(show_spinner="Loading Gap Analysis Engine (Flan-T5-Large)... This may take 1-2 minutes on first run")
def load_gap_analyzer():
"""Load and cache the Gap Analysis Engine"""
print("🔄 Initializing Gap Analysis Engine...")
try:
analyzer = GapAnalysisEngine()
print("✅ Gap Analysis Engine loaded successfully!")
return analyzer
except Exception as e:
print(f"❌ Failed to load Gap Analysis Engine: {str(e)}")
return None
@st.cache_resource(show_spinner="Loading Hypothesis Generator...")
def load_hypothesis_generator():
"""Load and cache the Hypothesis Generator"""
print("🔄 Initializing Hypothesis Generator...")
try:
generator = HypothesisGenerator()
print("✅ Hypothesis Generator loaded successfully!")
return generator
except Exception as e:
print(f"❌ Failed to load Hypothesis Generator: {str(e)}")
return None
@st.cache_resource(show_spinner="Loading Experiment Generator...")
def load_experiment_generator():
"""Load and cache the Experiment Generator"""
print("🔄 Initializing Experiment Generator...")
try:
generator = ExperimentGenerator()
print("✅ Experiment Generator loaded successfully!")
return generator
except Exception as e:
print(f"❌ Failed to load Experiment Generator: {str(e)}")
return None
class DatasetFetcher:
"""Fetch real, verified datasets from Kaggle and HuggingFace"""
def __init__(self):
"""Initialize Kaggle API and HuggingFace client"""
self.kaggle_api = None
self.hf_token = os.getenv('HF_TOKEN')
# Initialize Kaggle API only if available and credentials exist
if not KAGGLE_AVAILABLE:
print("⚠️ Kaggle library not available. Dataset verification will be limited.")
return
try:
kaggle_username = os.getenv('KAGGLE_USERNAME')
kaggle_key = os.getenv('KAGGLE_KEY')
if kaggle_username and kaggle_key:
# Create .kaggle directory if it doesn't exist
kaggle_dir = os.path.expanduser('~/.kaggle')
os.makedirs(kaggle_dir, exist_ok=True)
# Create kaggle.json file
kaggle_json_path = os.path.join(kaggle_dir, 'kaggle.json')
kaggle_config = {
"username": kaggle_username,
"key": kaggle_key
}
with open(kaggle_json_path, 'w') as f:
json.dump(kaggle_config, f)
# Set proper permissions (read/write for owner only)
try:
os.chmod(kaggle_json_path, 0o600)
except Exception:
pass # Windows doesn't support chmod
# Now initialize Kaggle API
self.kaggle_api = KaggleApi()
self.kaggle_api.authenticate()
print("✅ Kaggle API initialized successfully")
else:
print("⚠️ Kaggle credentials not found in .env file. Add KAGGLE_USERNAME and KAGGLE_KEY to enable dataset verification.")
except Exception as e:
print(f"⚠️ Kaggle API initialization failed: {str(e)}")
print(" Dataset links will be AI-generated (may need manual verification)")
self.kaggle_api = None
def search_kaggle_datasets(self, query: str, max_results: int = 3) -> List[Dict]:
"""Search for real Kaggle datasets matching query"""
if not self.kaggle_api:
return []
try:
print(f"🔍 Searching Kaggle for: {query}")
# Fetch datasets using Kaggle API
datasets = list(self.kaggle_api.dataset_list(search=query))
results = []
for dataset in datasets[:max_results]: # Limit to top results
results.append({
"name": dataset.title,
"description": dataset.subtitle or "No description available",
"url": f"https://www.kaggle.com/datasets/{dataset.ref}",
"type": "CSV/Tabular",
"size": f"{dataset.totalBytes // (1024*1024)}MB" if dataset.totalBytes else "Unknown",
"downloads": dataset.downloadCount,
"usability": dataset.usabilityRating
})
print(f"✅ Found {len(results)} Kaggle datasets")
return results
except Exception as e:
print(f"❌ Kaggle search error: {str(e)}")
return []
def search_huggingface_datasets(self, query: str, max_results: int = 3) -> List[Dict]:
"""Search for real HuggingFace datasets matching query"""
try:
print(f"🔍 Searching HuggingFace for: {query}")
# Use HuggingFace API to search datasets
url = "https://huggingface.co/api/datasets"
params = {
"search": query,
"limit": max_results,
"sort": "downloads"
}
headers = {}
if self.hf_token:
headers["Authorization"] = f"Bearer {self.hf_token}"
response = requests.get(url, params=params, headers=headers, timeout=10)
if response.status_code == 200:
datasets = response.json()
results = []
for dataset in datasets[:max_results]:
dataset_id = dataset.get('id', '')
results.append({
"name": dataset.get('id', 'Unknown'),
"description": dataset.get('description', 'No description available')[:200],
"url": f"https://huggingface.co/datasets/{dataset_id}",
"type": "Text/Images/Audio",
"size": dataset.get('size', 'Unknown'),
"downloads": dataset.get('downloads', 0),
"likes": dataset.get('likes', 0)
})
print(f"✅ Found {len(results)} HuggingFace datasets")
return results
else:
print(f"⚠️ HuggingFace API returned status {response.status_code}")
return []
except Exception as e:
print(f"❌ HuggingFace search error: {str(e)}")
return []
def get_datasets_for_topic(self, topic: str, hypothesis: str = "", dataset_keywords: List[str] = None) -> List[Dict]:
"""Get verified datasets from Kaggle (with AI fallback if none found)"""
# Use provided keywords or construct from topic
if dataset_keywords:
search_query = " ".join(dataset_keywords[:2])
else:
search_query = f"{topic} {hypothesis}"[:80]
# Search Kaggle (primary source)
kaggle_datasets = self.search_kaggle_datasets(search_query, max_results=2)
# If Kaggle found datasets, return them
if kaggle_datasets:
return kaggle_datasets
# Fallback: Generate AI-suggested datasets if Kaggle fails
print("⚠️ No datasets found from Kaggle API. Using AI-suggested datasets.")
ai_datasets = [
{
"name": f"Suggested: {topic.title()} Dataset",
"description": f"Recommended dataset for {topic}. Search on Kaggle or academic repositories.",
"url": f"https://www.kaggle.com/search?q={topic.replace(' ', '+')}",
"type": "Dataset",
"size": "Varies",
"downloads": "N/A",
"usability": None,
"source": "AI Suggested"
}
]
return ai_datasets
@dataclass
class SearchResult:
"""Data class to store search results"""
title: str
link: str = ""
snippet: str = ""
body: str = ""
authors: List[str] = None
published: str = ""
abstract: str = ""
pdf_url: str = ""
arxiv_url: str = ""
# Clinical trials specific fields
nct_id: str = ""
status: str = ""
study_type: str = ""
conditions: List[str] = None
interventions: List[str] = None
phase: str = ""
enrollment: int = 0
locations: List[str] = None
class SearchTool:
"""Tool for performing Google, arXiv, and ClinicalTrials.gov searches"""
def __init__(self):
self.api_key = os.getenv('GOOGLE_API_KEY')
self.search_engine_id = os.getenv('GOOGLE_SEARCH_ENGINE_ID')
def google_search(self, query: str, num_results: int = 2, max_chars: int = 500) -> List[SearchResult]:
"""
Perform Google search and return enriched results
"""
if not self.api_key or not self.search_engine_id:
print("Missing API credentials")
return []
try:
url = "https://www.googleapis.com/customsearch/v1"
params = {
"key": self.api_key,
"cx": self.search_engine_id,
"q": query,
"num": num_results
}
response = requests.get(url, params=params)
if response.status_code != 200:
print(f"Google Search API error: {response.status_code}")
print(f"Response content: {response.text}")
return []
results = response.json().get("items", [])
enriched_results = []
for item in results:
body = self._get_page_content(item["link"], max_chars)
result = SearchResult(
title=item.get("title", "No title"), # Added .get() with default
link=item.get("link", ""),
snippet=item.get("snippet", ""), # FIX: Added .get() with default empty string
body=body
)
enriched_results.append(result)
time.sleep(1) # Rate limiting
return enriched_results
except Exception as e:
print(f"Error in Google search: {str(e)}")
return []
def arxiv_search(self, query: str, max_results: int = 2) -> List[SearchResult]:
"""
Perform arXiv search and return results
"""
try:
client = arxiv.Client()
search = arxiv.Search(
query=query,
max_results=max_results,
sort_by=arxiv.SortCriterion.Relevance
)
results = []
for paper in client.results(search):
paper_id = paper.entry_id.split('/')[-1]
arxiv_url = f"https://arxiv.org/abs/{paper_id}"
result = SearchResult(
title=paper.title,
authors=[author.name for author in paper.authors],
published=paper.published.strftime("%Y-%m-%d"),
abstract=paper.summary,
pdf_url=paper.pdf_url,
arxiv_url=arxiv_url
)
results.append(result)
time.sleep(0.5) # Gentle rate limiting
return results
except Exception as e:
print(f"Error in Arxiv search: {str(e)}")
return []
def clinicaltrials_search_beta_api(self, query: str, max_results: int = 2) -> List[SearchResult]:
"""
Perform ClinicalTrials.gov search using their current Beta API
https://clinicaltrials.gov/data-api/ui
"""
try:
# Base URL for the ClinicalTrials.gov Beta API
base_url = "https://clinicaltrials.gov/api/v2/studies"
# Parameters for the API request
params = {
"query.term": query,
"pageSize": max_results,
"format": "json"
}
# Make the API request
response = requests.get(base_url, params=params, timeout=15)
if response.status_code != 200:
print(f"ClinicalTrials.gov API error: {response.status_code}")
print(f"Response content: {response.text}")
return []
# Parse the JSON response
data = response.json()
studies = data.get("studies", [])
if not studies:
print(f"No clinical trial studies found for query: {query}")
return []
results = []
for study in studies:
# Extract study details
protocol_section = study.get("protocolSection", {})
identification = protocol_section.get("identificationModule", {})
status_module = protocol_section.get("statusModule", {})
design_module = protocol_section.get("designModule", {})
conditions_module = protocol_section.get("conditionsModule", {})
description_module = protocol_section.get("descriptionModule", {})
# Extract NCT ID
nct_id = identification.get("nctId", "")
if not nct_id:
continue
# Extract title
title = identification.get("briefTitle", "")
official_title = identification.get("officialTitle", "")
# Extract study details
study_type = design_module.get("studyType", "")
phase_list = design_module.get("phases", [])
phase = ", ".join(phase_list) if phase_list else "Not Specified"
# Extract status
status = status_module.get("overallStatus", "")
# Extract conditions
conditions = conditions_module.get("conditions", [])
# Extract description/summary
summary = description_module.get("briefSummary", "")
detailed_desc = description_module.get("detailedDescription", "")
# Create the study URL - using the stable URL format
# Changed from /study/ to /ct2/show/ which is more stable
study_url = f"https://clinicaltrials.gov/ct2/show/{nct_id}"
result = SearchResult(
title=title if title else official_title,
link=study_url,
snippet=summary[:200] + "..." if len(summary) > 200 else summary,
abstract=summary if summary else detailed_desc,
nct_id=nct_id,
status=status,
study_type=study_type,
phase=phase,
conditions=conditions
)
results.append(result)
# Get full study details if needed
if not summary and not detailed_desc:
detailed_study = self._get_clinical_trial_details_api(nct_id)
if detailed_study:
result.abstract = detailed_study.abstract
result.conditions = detailed_study.conditions
result.interventions = detailed_study.interventions
time.sleep(2) # Increased rate limiting for API requests
return results
except Exception as e:
print(f"Error in ClinicalTrials.gov Beta API search: {str(e)}")
return []
def _get_clinical_trial_details_api(self, nct_id: str) -> Optional[SearchResult]:
"""
Fetch detailed information for a specific study using the Beta API
"""
# Validate NCT ID format
if not nct_id or not nct_id.startswith("NCT"):
print(f"Invalid NCT ID format: {nct_id}")
return None
try:
# Base URL for the ClinicalTrials.gov Beta API - single study endpoint
base_url = f"https://clinicaltrials.gov/api/v2/studies/{nct_id}"
# Make the API request
response = requests.get(base_url, params={"format": "json"}, timeout=15)
if response.status_code != 200:
print(f"ClinicalTrials.gov API error for study {nct_id}: {response.status_code}")
return None
# Parse the JSON response
study = response.json()
# Extract study details
protocol_section = study.get("protocolSection", {})
identification = protocol_section.get("identificationModule", {})
description_module = protocol_section.get("descriptionModule", {})
conditions_module = protocol_section.get("conditionsModule", {})
intervention_module = protocol_section.get("armsInterventionsModule", {})
# Extract title
title = identification.get("briefTitle", "")
# Extract summary
summary = description_module.get("briefSummary", "")
detailed_desc = description_module.get("detailedDescription", "")
# Extract conditions
conditions = conditions_module.get("conditions", [])
# Extract interventions
interventions_list = []
for intervention in intervention_module.get("interventions", []):
intervention_name = intervention.get("name", "")
if intervention_name:
interventions_list.append(intervention_name)
# Create the study URL using stable format
study_url = f"https://clinicaltrials.gov/ct2/show/{nct_id}"
return SearchResult(
title=title,
link=study_url,
nct_id=nct_id,
abstract=summary if summary else detailed_desc,
conditions=conditions,
interventions=interventions_list
)
except Exception as e:
print(f"Error fetching details for study {nct_id}: {str(e)}")
return None
def clinicaltrials_search_scrape(self, query: str, max_results: int = 2) -> List[SearchResult]:
"""
Perform ClinicalTrials.gov search by scraping the website
Use this as a fallback if the API method doesn't work
"""
try:
# Construct the search URL - updated to use v2 search
base_url = "https://clinicaltrials.gov/search"
params = {
"term": query,
"draw": 1,
"rank": 1
}
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1"
}
# Make the initial request to get the search results page
response = requests.get(base_url, params=params, headers=headers, timeout=15)
if response.status_code != 200:
print(f"ClinicalTrials.gov search error: {response.status_code}")
return []
soup = BeautifulSoup(response.content, "html.parser")
study_links = soup.select(".ct-search-result a.ct-search-result__title-link")[:max_results]
if not study_links:
print("No study links found on the search results page")
return []
results = []
for link in study_links:
href = link.get('href', '')
if not href:
continue
# Extract NCT ID from href and validate
try:
# Different ways the NCT ID might appear in the URL
if '/study/' in href:
nct_id = href.split('/')[-1]
elif '/ct2/show/' in href:
nct_id = href.split('/')[-1]
else:
# Try to find NCT pattern (NCTXXXXXXXX)
import re
nct_match = re.search(r'(NCT\d{8})', href)
if nct_match:
nct_id = nct_match.group(1)
else:
print(f"Could not extract NCT ID from href: {href}")
continue
except Exception as e:
print(f"Error extracting NCT ID from {href}: {str(e)}")
continue
# Ensure proper URL format
study_url = f"https://clinicaltrials.gov/ct2/show/{nct_id}"
# Get detailed information from the study page
study_info = self._get_clinical_trial_details_scrape(study_url)
if study_info:
results.append(study_info)
time.sleep(2) # Increased rate limiting
return results
except Exception as e:
print(f"Error in ClinicalTrials.gov scrape search: {str(e)}")
return []
def _get_clinical_trial_details_scrape(self, url: str) -> Optional[SearchResult]:
"""
Extract detailed information from a ClinicalTrials.gov study page using web scraping
"""
try:
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5"
}
response = requests.get(url, headers=headers, timeout=15)
if response.status_code != 200:
print(f"Error accessing {url}: {response.status_code}")
return None
soup = BeautifulSoup(response.content, "html.parser")
# Extract NCT ID - handle different URL formats
try:
if '/ct2/show/' in url:
nct_id = url.split('/')[-1]
else:
# Try to find NCT pattern
import re
nct_match = re.search(r'(NCT\d{8})', url)
if nct_match:
nct_id = nct_match.group(1)
else:
# Last resort - check the page content for NCT ID
nct_elem = soup.find(string=re.compile(r'NCT\d{8}'))
if nct_elem:
nct_match = re.search(r'(NCT\d{8})', nct_elem)
nct_id = nct_match.group(1) if nct_match else "Unknown"
else:
nct_id = "Unknown"
except Exception:
nct_id = "Unknown"
# Extract title - handle different page structures
title = "Unknown Title"
title_selectors = ["h1.tr-h1", "h1.ct-title", ".headline-title"]
for selector in title_selectors:
title_elem = soup.select_one(selector)
if title_elem:
title = title_elem.get_text().strip()
break
# Extract status
status = "Unknown"
status_selectors = [".ct-recruitment-status div.ct-recruitment-status__label",
".statusLabel",
"p:contains('Recruitment Status:')"]
for selector in status_selectors:
status_elem = None
try:
if ':contains' in selector:
# Handle custom contains selector
text = selector.split(':contains(')[1].strip("')")
for p in soup.find_all('p'):
if text in p.get_text():
status_elem = p
break
else:
status_elem = soup.select_one(selector)
except Exception:
continue
if status_elem:
status_text = status_elem.get_text().strip()
if "Status:" in status_text:
status = status_text.split("Status:")[1].strip()
else:
status = status_text
break
# Extract summary - try different selectors
summary = ""
summary_selectors = ["#brief-summary div.tr-indent2",
".ct-body__section div.tr-indent1",
"section#brief-summary"]
for selector in summary_selectors:
summary_elem = soup.select_one(selector)
if summary_elem:
summary = summary_elem.get_text().strip()
break
# Extract study type
study_type = ""
study_type_selectors = [
lambda s: s.find(string="Study Type:"),
lambda s: s.find("th", string="Study Type")
]
for selector_func in study_type_selectors:
study_type_label = selector_func(soup)
if study_type_label:
if study_type_label.parent:
value_elem = None
if study_type_label.parent.name == "th":
# Handle table format
value_elem = study_type_label.parent.find_next("td")
else:
# Handle div format
value_elem = study_type_label.parent.find_next("div", class_="ct-data-elem__value")
if not value_elem:
value_elem = study_type_label.parent.find_next_sibling("div")
if value_elem:
study_type = value_elem.get_text().strip()
break
# Extract phase
phase = ""
phase_selectors = [
lambda s: s.find(string="Phase:"),
lambda s: s.find("th", string="Phase")
]
for selector_func in phase_selectors:
phase_label = selector_func(soup)
if phase_label:
if phase_label.parent:
value_elem = None
if phase_label.parent.name == "th":
# Handle table format
value_elem = phase_label.parent.find_next("td")
else:
# Handle div format
value_elem = phase_label.parent.find_next("div", class_="ct-data-elem__value")
if not value_elem:
value_elem = phase_label.parent.find_next_sibling("div")
if value_elem:
phase = value_elem.get_text().strip()
break
# Extract conditions
conditions = []
conditions_selectors = ["#conditions", "section#conditions", "section:contains('Condition')"]
for selector in conditions_selectors:
conditions_section = None
try:
if ':contains' in selector:
# Handle custom contains selector
text = selector.split(':contains(')[1].strip("')")
for section in soup.find_all('section'):
if text in section.get_text():
conditions_section = section
break
else:
conditions_section = soup.select_one(selector)
except Exception:
continue
if conditions_section:
# Try to find conditions in list items
condition_items = conditions_section.select("li")
if condition_items:
conditions = [item.get_text().strip() for item in condition_items]
else:
# If no list items, try to get text content
conditions_text = conditions_section.get_text().strip()
# Remove section title if present
if ":" in conditions_text:
conditions_text = conditions_text.split(":", 1)[1].strip()
conditions = [cond.strip() for cond in conditions_text.split(",")]
break
return SearchResult(
title=title,
link=url,
snippet=summary[:200] + "..." if summary and len(summary) > 200 else summary,
abstract=summary,
nct_id=nct_id,
status=status,
study_type=study_type,
phase=phase,
conditions=conditions
)
except Exception as e:
print(f"Error extracting details from {url}: {str(e)}")
return None
def _get_page_content(self, url: str, max_chars: int) -> str:
"""
Fetch and extract text content from a webpage
"""
try:
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5"
}
response = requests.get(url, headers=headers, timeout=15)
soup = BeautifulSoup(response.content, "html.parser")
# Remove script and style elements
for script in soup(["script", "style"]):
script.extract()
text = soup.get_text(separator=" ", strip=True)
return text[:max_chars]
except Exception as e:
print(f"Error fetching {url}: {str(e)}")
return ""
# class GapAnalysisEngine:
# """
# Research Gap Analysis Engine using Flan-T5-Large model
# Identifies knowledge, methodological, dataset, and temporal gaps in research papers
# """
# def __init__(self):
# """Initialize the Flan-T5 model and tokenizer"""
# print("Loading Flan-T5-Large model for gap analysis...")
# self.model_name = "google/flan-t5-large"
# try:
# # Load tokenizer and model from Hugging Face
# self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
# self.model = AutoModelForSeq2SeqLM.from_pretrained(self.model_name)
# # Set device (GPU if available, else CPU)
# self.device = "cuda" if torch.cuda.is_available() else "cpu"
# self.model.to(self.device)
# print(f"Model loaded successfully on {self.device}")
# except Exception as e:
# print(f"Error loading model: {str(e)}")
# self.model = None
# self.tokenizer = None
# def _create_few_shot_prompt(self, papers: List[SearchResult], topic: str) -> str:
# """
# Create a few-shot prompt with examples to guide the model
# Args:
# papers: List of SearchResult objects containing paper information
# topic: The research topic being analyzed
# Returns:
# Formatted prompt string with few-shot examples
# """
# # Format the input papers with more detail
# papers_text = ""
# for i, paper in enumerate(papers[:12], 1): # Increased from 10 to 12 papers
# papers_text += f"\n--- Paper {i} ---\n"
# papers_text += f"Title: {paper.title}\n"
# # Add authors if available
# if paper.authors:
# papers_text += f"Authors: {', '.join(paper.authors[:3])}\n" # First 3 authors
# # Add publication date
# if paper.published:
# papers_text += f"Published: {paper.published}\n"
# # Add abstract/summary with more characters
# if paper.abstract:
# papers_text += f"Abstract: {paper.abstract[:500]}...\n" # Increased from 300
# elif paper.snippet:
# papers_text += f"Summary: {paper.snippet[:500]}...\n"
# # Add study type for clinical trials
# if paper.study_type:
# papers_text += f"Study Type: {paper.study_type}\n"
# if paper.phase:
# papers_text += f"Phase: {paper.phase}\n"
# # Enhanced few-shot prompt with detailed, insightful examples
# prompt = f"""You are a PhD-level research analyst with expertise in identifying critical research gaps. Analyze the papers deeply and provide SPECIFIC, ACTIONABLE gaps that researchers can build upon.
# EXAMPLE 1:
# Topic: Federated Learning Privacy
# Papers:
# Paper 1: Differential Privacy in Federated Learning (2022)
# Abstract: We apply differential privacy mechanisms to federated learning, achieving epsilon=0.5 privacy guarantees with 3% accuracy loss on MNIST...
# Paper 2: Secure Aggregation Protocols (2021)
# Abstract: Novel cryptographic protocols for secure parameter aggregation in federated settings, tested on 100 devices...
# Paper 3: Privacy Budget Allocation (2020)
# Abstract: Adaptive privacy budget allocation across federated learning rounds, optimizing utility-privacy tradeoff...
# DEEP ANALYSIS:
# {{
# "knowledge_gaps": [
# "CRITICAL: No empirical studies on privacy leakage through gradient updates in heterogeneous device environments (IoT + mobile + edge)",
# "UNEXPLORED: Privacy guarantees degrade with model size - no theoretical bounds for models >1B parameters in federated settings",
# "MISSING: Real-world privacy attack success rates beyond academic datasets - what about medical records, financial data?",
# "UNKNOWN: Privacy-utility tradeoffs in non-IID data distributions with extreme class imbalance (e.g., rare disease detection)"
# ],
# "methodological_gaps": [
# "NO FRAMEWORK: Combining differential privacy + secure aggregation + homomorphic encryption in single unified system - existing work addresses only 1-2",
# "MISSING EVALUATION: Longitudinal privacy analysis across 100+ training rounds - current studies stop at 10-20 rounds",
# "ABSENT: Privacy-preserving techniques for vertical federated learning (different features per client) - all work focuses on horizontal FL",
# "LACKING: Adaptive privacy mechanisms that adjust epsilon based on attack risk in real-time during training"
# ],
# "dataset_gaps": [
# "CRITICAL MISSING: Standardized privacy attack benchmark suite with diverse attack vectors (membership inference, model inversion, gradient leakage)",
# "NO BENCHMARK: Real federated datasets with known privacy violations - current benchmarks use synthetic privacy labels",
# "LACKING: Heterogeneous device capability datasets showing computation/communication/privacy tradeoffs across 1000+ devices",
# "ABSENT: Privacy audit trails from production federated learning deployments"
# ],
# "temporal_gaps": [
# "OUTDATED (pre-2023): Privacy analysis doesn't account for LLM-scale models (GPT-4, Llama-2) in federated settings",
# "OBSOLETE HARDWARE: Studies assume 2019-era mobile devices - modern edge TPUs and neural engines change privacy-performance dynamics",
# "MISSING 2024 CONTEXT: New privacy regulations (EU AI Act, US state laws) not reflected in federated learning design"
# ]
# }}
# EXAMPLE 2:
# Topic: Explainable AI in Medical Diagnosis
# Papers:
# Paper 1: SHAP for Medical Imaging (2021)
# Abstract: Applying SHAP values to explain CNN predictions in chest X-ray diagnosis, achieving correlation with radiologist attention...
# Paper 2: LIME in Clinical Decision Support (2020)
# Abstract: Local interpretable model-agnostic explanations for electronic health record-based predictions...
# Paper 3: Attention Visualization in Diagnosis (2022)
# Abstract: Visualizing transformer attention patterns for disease classification from medical images...
# DEEP ANALYSIS:
# {{
# "knowledge_gaps": [
# "CRITICAL: Zero studies on whether physicians ACTUALLY change treatment decisions based on AI explanations - all measure 'trust' not clinical outcomes",
# "UNEXPLORED: Explanation quality for rare diseases (<1% prevalence) where models have insufficient training data - current work focuses on common conditions",
# "MISSING: Conflicting explanation scenarios - what happens when SHAP suggests feature X but physician believes Y? No resolution frameworks exist",
# "UNKNOWN: Cognitive load of processing AI explanations during emergency medicine - explanations may slow critical decisions"
# ],
# "methodological_gaps": [
# "NO GOLD STANDARD: Evaluating explanation correctness requires ground truth (which features TRULY matter) - current methods use proxy metrics like 'plausibility'",
# "MISSING FRAMEWORK: Comparative explanation methods - should we use SHAP vs LIME vs attention? No decision tree exists for medical contexts",
# "ABSENT: Real-time explanation generation for time-critical diagnoses - current methods take 30+ seconds, unacceptable in ER settings",
# "LACKING: Multi-modal explanations combining imaging + EHR + genomics - existing work explains single modality only"
# ],
# "dataset_gaps": [
# "CRITICAL MISSING: Datasets with physician-annotated 'ground truth' explanations for 1000+ diagnoses - current datasets lack expert labels",
# "NO BENCHMARK: Longitudinal patient data showing how explanations affected treatment outcomes over months/years",
# "LACKING: Adversarial explanation datasets - cases where explanations are intentionally misleading yet appear valid",
# "ABSENT: Cross-hospital explanation generalization datasets - do explanations transfer between institutions?"
# ],
# "temporal_gaps": [
# "OUTDATED (pre-2023): Explanation methods designed for CNNs, not foundation models (MedPaLM, GPT-4 for medicine) with emergent reasoning",
# "OBSOLETE REGULATORY: FDA guidance on AI explainability from 2019 - new 2024 requirements for transparency not addressed",
# "MISSING CURRENT CONTEXT: Post-COVID telehealth adoption means remote explanation delivery - no studies on explaining AI over video consultations"
# ]
# }}
# NOW ANALYZE THIS RESEARCH AREA WITH THE SAME DEPTH AND SPECIFICITY:
# Topic: {topic}
# Papers:{papers_text}
# REQUIREMENTS FOR YOUR ANALYSIS:
# 1. Each gap must be SPECIFIC with technical details, numbers, or clear scenarios - NOT generic statements
# 2. Identify CRITICAL gaps that block real-world deployment or advancement
# 3. Point out CONTRADICTIONS or CONFLICTS in existing research approaches
# 4. Consider PRACTICAL implications (cost, time, feasibility, scalability)
# 5. Include WHY each gap matters - what problem does it cause? What opportunities does it create?
# 6. Mention specific technologies, datasets, methods, or evaluation metrics that are missing
# 7. Be ACTIONABLE - researchers should know exactly what to investigate next
# 8. FIND AT LEAST 2-3 GAPS PER CATEGORY - there are ALWAYS gaps in research!
# IMPORTANT: Even well-researched areas have gaps! Look for:
# - Unstudied combinations of techniques
# - Missing benchmarks or standardized evaluations
# - Lack of real-world deployment studies
# - Outdated assumptions from older papers
# - Unexplored edge cases or failure modes
# - Missing cross-domain applications
# You MUST respond with ONLY valid JSON in this exact format (no extra text):
# {{
# "knowledge_gaps": ["gap1", "gap2", "gap3"],
# "methodological_gaps": ["gap1", "gap2", "gap3"],
# "dataset_gaps": ["gap1", "gap2", "gap3"],
# "temporal_gaps": ["gap1", "gap2"]
# }}
# Generate the JSON now:"""
# return prompt
# def analyze_gaps(self, papers: List[SearchResult], topic: str) -> Dict:
# """
# Analyze research papers to identify gaps
# Args:
# papers: List of SearchResult objects from searches
# topic: The research topic being analyzed
# Returns:
# Dictionary containing identified gaps in four categories
# """
# if not self.model or not self.tokenizer:
# return {
# "error": "Model not loaded",
# "knowledge_gaps": [],
# "methodological_gaps": [],
# "dataset_gaps": [],
# "temporal_gaps": []
# }
# print(f"Analyzing {len(papers)} papers for research gaps...")
# # Create the few-shot prompt
# prompt = self._create_few_shot_prompt(papers, topic)
# try:
# # Tokenize the input
# inputs = self.tokenizer(
# prompt,
# return_tensors="pt",
# max_length=1024,
# truncation=True
# ).to(self.device)
# # Generate the analysis with better parameters for detailed output
# outputs = self.model.generate(
# **inputs,
# max_length=1024, # Increased from 512 for more detailed gaps
# min_length=200, # Ensure substantial output
# num_beams=5, # Increased from 4 for better quality
# temperature=0.8, # Slightly higher for more creative insights
# do_sample=True,
# top_p=0.92, # Slightly higher for more diversity
# repetition_penalty=1.2, # Avoid repetitive gaps
# length_penalty=1.0
# )
# # Decode the output
# analysis_text = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
# print("Raw model output:", analysis_text[:200])
# # Try to parse as JSON
# try:
# gaps = json.loads(analysis_text)
# except json.JSONDecodeError:
# # If not valid JSON, try to extract structured information
# gaps = self._parse_unstructured_output(analysis_text)
# # Ensure all required fields exist
# gaps.setdefault("knowledge_gaps", [])
# gaps.setdefault("methodological_gaps", [])
# gaps.setdefault("dataset_gaps", [])
# gaps.setdefault("temporal_gaps", [])
# return gaps
# except Exception as e:
# print(f"Error during gap analysis: {str(e)}")
# return {
# "error": str(e),
# "knowledge_gaps": [],
# "methodological_gaps": [],
# "dataset_gaps": [],
# "temporal_gaps": []
# }
# def _parse_unstructured_output(self, text: str) -> Dict:
# """
# Parse unstructured model output into structured format
# Args:
# text: Raw text output from the model
# Returns:
# Dictionary with gap categories
# """
# gaps = {
# "knowledge_gaps": [],
# "methodological_gaps": [],
# "dataset_gaps": [],
# "temporal_gaps": []
# }
# # Simple parsing logic - extract bullet points or numbered lists
# lines = text.split('\n')
# current_category = None
# for line in lines:
# line = line.strip()
# if 'knowledge' in line.lower():
# current_category = 'knowledge_gaps'
# elif 'methodological' in line.lower() or 'method' in line.lower():
# current_category = 'methodological_gaps'
# elif 'dataset' in line.lower() or 'data' in line.lower():
# current_category = 'dataset_gaps'
# elif 'temporal' in line.lower() or 'time' in line.lower():
# current_category = 'temporal_gaps'
# elif current_category and (line.startswith('-') or line.startswith('•') or any(line.startswith(f"{i}.") for i in range(10))):
# # Clean the line
# cleaned = line.lstrip('-•0123456789. ').strip()
# if cleaned and len(cleaned) > 10:
# gaps[current_category].append(cleaned)
# return gaps
# def format_gaps_for_display(self, gaps: Dict) -> str:
# """
# Format the gaps analysis for display in Streamlit
# Args:
# gaps: Dictionary of identified gaps
# Returns:
# Formatted markdown string
# """
# output = "## 🔍 Research Gap Analysis\n\n"
# if gaps.get("error"):
# output += f"⚠️ Error: {gaps['error']}\n\n"
# # Knowledge Gaps
# output += "### 📚 Knowledge Gaps\n"
# output += "*Questions or areas that haven't been studied yet*\n\n"
# if gaps.get("knowledge_gaps"):
# for i, gap in enumerate(gaps["knowledge_gaps"], 1):
# output += f"{i}. {gap}\n"
# else:
# output += "*No significant knowledge gaps identified*\n"
# output += "\n"
# # Methodological Gaps
# output += "### 🔬 Methodological Gaps\n"
# output += "*Research approaches or methods not tried*\n\n"
# if gaps.get("methodological_gaps"):
# for i, gap in enumerate(gaps["methodological_gaps"], 1):
# output += f"{i}. {gap}\n"
# else:
# output += "*No significant methodological gaps identified*\n"
# output += "\n"
# # Dataset Gaps
# output += "### 💾 Dataset Gaps\n"
# output += "*Missing or under-explored datasets*\n\n"
# if gaps.get("dataset_gaps"):
# for i, gap in enumerate(gaps["dataset_gaps"], 1):
# output += f"{i}. {gap}\n"
# else:
# output += "*No significant dataset gaps identified*\n"
# output += "\n"
# # Temporal Gaps
# output += "### ⏰ Temporal Gaps\n"
# output += "*Areas that are outdated or need updating*\n\n"
# if gaps.get("temporal_gaps"):
# for i, gap in enumerate(gaps["temporal_gaps"], 1):
# output += f"{i}. {gap}\n"
# else:
# output += "*No significant temporal gaps identified*\n"
# output += "\n"
# return output
class GapAnalysisEngine:
"""
Research Gap Analysis Engine using OpenAI GPT-4
Identifies knowledge, methodological, dataset, and temporal gaps in research papers
"""
def __init__(self, progress_callback=None):
"""Initialize the Gap Analysis Engine with OpenAI only
Args:
progress_callback: Optional callback function(message, progress) to report loading progress
"""
print("Initializing Gap Analysis Engine (OpenAI-powered)...")
self.progress_callback = progress_callback
# OpenAI client for gap analysis
self.client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
# Comment out Flan-T5 model loading
# self.model_name = "google/flan-t5-large"
# self.tokenizer = None
# self.model = None
# self.device = "cuda" if torch.cuda.is_available() else "cpu"
# self._models_loaded = False
print("✅ Gap Analysis Engine ready (OpenAI GPT-4 mode)")
# Comment out the model loading method since we're not using Flan-T5
# def _ensure_models_loaded(self):
# """Load models only when first needed"""
# pass
def analyze_gaps(self, papers: List[SearchResult], topic: str) -> Dict:
"""
Analyze research papers to identify gaps using OpenAI GPT-4 only
Args:
papers: List of research papers to analyze
topic: Research topic
Returns:
Dictionary containing identified gaps by category
"""
try:
print(f" → Analyzing {len(papers)} papers for research gaps...")
# Skip Flan-T5 analysis entirely
# print(" → Running Flan-T5-Large analysis...")
# flan_gaps = self._analyze_with_flan_t5(papers, topic)
# Use only OpenAI analysis
print(" → Running GPT-4 gap analysis...")
gpt_gaps = self._analyze_with_openai(papers, topic)
print(f" ✓ GPT-4 found {len(gpt_gaps.get('knowledge_gaps', []))} knowledge gaps")
print(f" ✓ Total unique gaps identified")
return gpt_gaps
except Exception as e:
print(f" ❌ Error in gap analysis: {str(e)}")
return {
"knowledge_gaps": [],
"methodological_gaps": [],
"dataset_gaps": [],
"temporal_gaps": []
}
# Comment out the Flan-T5 analysis method
# def _analyze_with_flan_t5(self, papers: List[SearchResult], topic: str) -> Dict:
# """Analyze gaps using Flan-T5 model"""
# # ... commented out ...
# pass
def _analyze_with_openai(self, papers: List[SearchResult], topic: str) -> Dict:
"""
Analyze gaps using OpenAI GPT-4
Args:
papers: List of research papers
topic: Research topic
Returns:
Dictionary with categorized gaps
"""
try:
# Prepare paper summaries
papers_text = self._prepare_papers_context(papers, max_papers=12)
# Create comprehensive prompt
prompt = f"""You are an expert research analyst. Analyze the following research papers on "{topic}" and identify research gaps.
PAPERS TO ANALYZE:
{papers_text}
Identify gaps in these categories:
1. **Knowledge Gaps**: Unexplored research questions or phenomena
2. **Methodological Gaps**: Missing or inadequate research approaches
3. **Dataset Gaps**: Lack of suitable datasets or data limitations
4. **Temporal Gaps**: Outdated assumptions or need for updated research
For each gap, provide:
- Clear description
- Why it's important
- Potential impact if addressed
Return your analysis in this JSON format:
{{
"knowledge_gaps": [
{{"description": "...", "importance": "...", "impact": "..."}}
],
"methodological_gaps": [
{{"description": "...", "importance": "...", "impact": "..."}}
],
"dataset_gaps": [
{{"description": "...", "importance": "...", "impact": "..."}}
],
"temporal_gaps": [
{{"description": "...", "importance": "...", "impact": "..."}}
]
}}
Provide 2-4 gaps per category. Be specific and actionable."""
# Call OpenAI API
response = self.client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[
{
"role": "system",
"content": "You are an expert research analyst specializing in identifying research gaps. Return only valid JSON."
},
{
"role": "user",
"content": prompt
}
],
temperature=0.7,
max_tokens=2000
)
# Parse response
response_text = response.choices[0].message.content.strip()
# Clean markdown if present
if response_text.startswith("```json"):
response_text = response_text.replace("```json", "").replace("```", "").strip()
gaps = json.loads(response_text)
return gaps
except json.JSONDecodeError as e:
print(f" ⚠️ JSON parsing error: {str(e)}")
return self._fallback_gap_structure()
except Exception as e:
print(f" ⚠️ OpenAI API error: {str(e)}")
return self._fallback_gap_structure()
def _prepare_papers_context(self, papers: List[SearchResult], max_papers: int = 12) -> str:
"""Prepare papers context for the prompt"""
papers_text = ""
for i, paper in enumerate(papers[:max_papers], 1):
papers_text += f"\n{'='*60}\n"
papers_text += f"PAPER {i}:\n"
papers_text += f"Title: {paper.title}\n"
if paper.authors:
papers_text += f"Authors: {', '.join(paper.authors[:3])}\n"
if paper.published:
papers_text += f"Published: {paper.published}\n"
# Use abstract or snippet
content = paper.abstract if paper.abstract else paper.snippet
if content:
papers_text += f"Abstract: {content[:500]}...\n"
return papers_text
def _fallback_gap_structure(self) -> Dict:
"""Return fallback gap structure when analysis fails"""
return {
"knowledge_gaps": [
{
"description": "Limited understanding of underlying mechanisms",
"importance": "Critical for theoretical advancement",
"impact": "Could lead to breakthrough insights"
}
],
"methodological_gaps": [
{
"description": "Need for novel experimental approaches",
"importance": "Current methods have limitations",
"impact": "Enable more robust research"
}
],
"dataset_gaps": [
{
"description": "Insufficient diverse datasets",
"importance": "Limits generalizability",
"impact": "Improve model performance"
}
],
"temporal_gaps": [
{
"description": "Research needs updating with recent developments",
"importance": "Field has evolved significantly",
"impact": "More relevant findings"
}
]
}
# Keep the formatting method
def _format_gaps_output(self, gaps: Dict, papers_analyzed: int) -> str:
"""Format gaps into readable markdown output"""
output = f"## 🔍 Research Gap Analysis\n\n"
output += f"**Papers Analyzed:** {papers_analyzed}\n\n"
if gaps.get('knowledge_gaps'):
output += "### 📚 Knowledge Gaps\n"
for i, gap in enumerate(gaps['knowledge_gaps'], 1):
output += f"{i}. **{gap.get('description', 'N/A')}**\n"
output += f" - *Importance:* {gap.get('importance', 'N/A')}\n"
output += f" - *Potential Impact:* {gap.get('impact', 'N/A')}\n\n"
if gaps.get('methodological_gaps'):
output += "### 🔬 Methodological Gaps\n"
for i, gap in enumerate(gaps['methodological_gaps'], 1):
output += f"{i}. **{gap.get('description', 'N/A')}**\n"
output += f" - *Importance:* {gap.get('importance', 'N/A')}\n"
output += f" - *Potential Impact:* {gap.get('impact', 'N/A')}\n\n"
if gaps.get('dataset_gaps'):
output += "### 💾 Dataset Gaps\n"
for i, gap in enumerate(gaps['dataset_gaps'], 1):
output += f"{i}. **{gap.get('description', 'N/A')}**\n"
output += f" - *Importance:* {gap.get('importance', 'N/A')}\n"
output += f" - *Potential Impact:* {gap.get('impact', 'N/A')}\n\n"
if gaps.get('temporal_gaps'):
output += "### ⏰ Temporal Gaps\n"
for i, gap in enumerate(gaps['temporal_gaps'], 1):
output += f"{i}. **{gap.get('description', 'N/A')}**\n"
output += f" - *Importance:* {gap.get('importance', 'N/A')}\n"
output += f" - *Potential Impact:* {gap.get('impact', 'N/A')}\n\n"
return output
class HypothesisGenerator:
"""
Hypothesis Generation and Scoring Engine
Converts research gaps into actionable hypotheses with novelty, impact, and feasibility scores
"""
def __init__(self):
"""Initialize OpenAI client and embedding model"""
print("Initializing Hypothesis Generator...")
# OpenAI for hypothesis generation
self.api_key = os.getenv('OPENAI_API_KEY')
if not self.api_key:
print("Warning: OPENAI_API_KEY not found. Hypothesis generation will be limited.")
self.client = None
else:
self.client = OpenAI(api_key=self.api_key)
# Load embedding model for novelty scoring
try:
print("Loading embedding model for novelty scoring...")
self.embedding_model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
print("Embedding model loaded successfully")
except Exception as e:
print(f"Warning: Could not load embedding model: {str(e)}")
self.embedding_model = None
def _create_hypothesis_prompt(self, gaps: Dict, topic: str) -> str:
"""
Create few-shot prompt for hypothesis generation
Args:
gaps: Dictionary of identified research gaps
topic: Research topic
Returns:
Formatted prompt for GPT-4
"""
# Format gaps for the prompt
gaps_text = "IDENTIFIED RESEARCH GAPS:\n\n"
if gaps.get("knowledge_gaps"):
gaps_text += "🔬 Knowledge Gaps:\n"
for i, gap in enumerate(gaps["knowledge_gaps"][:5], 1):
gaps_text += f"{i}. {gap}\n"
gaps_text += "\n"
if gaps.get("methodological_gaps"):
gaps_text += "🛠️ Methodological Gaps:\n"
for i, gap in enumerate(gaps["methodological_gaps"][:5], 1):
gaps_text += f"{i}. {gap}\n"
gaps_text += "\n"
if gaps.get("dataset_gaps"):
gaps_text += "💾 Dataset Gaps:\n"
for i, gap in enumerate(gaps["dataset_gaps"][:5], 1):
gaps_text += f"{i}. {gap}\n"
gaps_text += "\n"
if gaps.get("temporal_gaps"):
gaps_text += "⏰ Temporal Gaps:\n"
for i, gap in enumerate(gaps["temporal_gaps"][:5], 1):
gaps_text += f"{i}. {gap}\n"
gaps_text += "\n"
# Few-shot prompt with detailed examples
prompt = f"""You are a world-class research strategist. Convert research gaps into actionable, fundable research hypotheses.
EXAMPLE INPUT:
Gap: "CRITICAL: No empirical studies on privacy leakage through gradient updates in heterogeneous device environments (IoT + mobile + edge)"
EXAMPLE OUTPUT:
{{
"hypothesis_id": 1,
"title": "Privacy-Preserving Gradient Sharing in Heterogeneous Federated Learning",
"gap_addressed": "No empirical studies on privacy leakage in heterogeneous federated environments",
"problem_statement": "Current federated learning privacy studies assume homogeneous devices (all mobile or all edge), but real deployments mix IoT sensors, mobile phones, and edge servers. Privacy guarantees derived for homogeneous settings may not hold when devices have vastly different computational capabilities, leading to asymmetric privacy leakage where weaker devices reveal more information through gradient updates.",
"proposed_solution": "Design and implement a heterogeneous federated learning testbed with 1000+ devices (IoT sensors, smartphones, edge servers) to empirically measure privacy leakage across device types. Develop adaptive differential privacy mechanisms that adjust noise levels based on device computational capacity. Create theoretical framework proving privacy bounds for heterogeneous settings.",
"expected_impact": "First empirical evidence of device-specific privacy risks in real federated deployments. Will enable $50M+ industry deployments of federated learning in healthcare and smart cities by providing provable privacy guarantees for mixed-device environments. Expected to reduce privacy leakage by 40-60% compared to one-size-fits-all approaches.",
"methodology": "1) Deploy federated learning across 1000 heterogeneous devices, 2) Conduct membership inference and gradient inversion attacks tailored to each device type, 3) Measure information leakage using mutual information and attack success rates, 4) Develop and evaluate adaptive DP mechanisms, 5) Prove theoretical privacy bounds",
"required_resources": {{
"compute": "Access to 1000 heterogeneous devices (or simulation), 10 GPUs for attack models",
"data": "Federated datasets (FEMNIST, Shakespeare, medical imaging if available)",
"tools": "PyTorch, PySyft for federated learning, privacy attack libraries",
"team": "2-3 researchers (1 privacy expert, 1 systems engineer, 1 ML researcher)",
"timeline": "12-18 months",
"estimated_cost": "$80K-120K (device access, compute, personnel)"
}},
"novelty_score": 9,
"impact_score": 9,
"feasibility_score": 7,
"priority": "HIGH",
"target_venue": "USENIX Security, IEEE S&P, NeurIPS (privacy track)",
"potential_collaborators": "Federated learning research groups at Google, Meta, academic labs working on privacy-preserving ML"
}}
EXAMPLE INPUT 2:
Gap: "MISSING FRAMEWORK: Real-time carbon footprint tracking during training with <5% overhead"
EXAMPLE OUTPUT 2:
{{
"hypothesis_id": 2,
"title": "ZeroCarbon: Real-Time Energy and Carbon Tracking for ML Training",
"gap_addressed": "No real-time carbon tracking framework with <5% overhead during model training",
"problem_statement": "Existing carbon tracking tools (CodeCarbon, experiment-impact-tracker) add 20-30% training overhead, making them impractical for production use. Researchers cannot monitor carbon emissions in real-time during training, leading to wasteful hyperparameter searches and missed optimization opportunities. Need lightweight system that tracks energy/carbon with <5% overhead while providing actionable insights.",
"proposed_solution": "Develop ZeroCarbon: a GPU kernel-level energy monitoring system that hooks into CUDA/PyTorch at minimal overhead. Use hardware performance counters instead of external power meters. Implement predictive modeling to estimate full training carbon cost after first 100 iterations. Provide real-time dashboard showing carbon/hour and projected total emissions with recommendations to stop wasteful runs.",
"expected_impact": "Enable carbon-aware ML training at scale. If adopted by 10% of ML researchers, could reduce global ML training carbon by 5-8% (estimated 50K-100K tons CO2/year). Will become standard tool in MLOps pipelines at major tech companies. Expected to identify and stop 30-40% of wasteful training runs early.",
"methodology": "1) Develop CUDA kernel hooks for energy monitoring, 2) Benchmark overhead across 20+ model architectures (ResNet, BERT, GPT), 3) Build predictive models for carbon estimation, 4) Create real-time dashboard with FastAPI backend, 5) Conduct user studies with 50+ ML practitioners, 6) Integrate with popular frameworks (PyTorch Lightning, HuggingFace Trainer)",
"required_resources": {{
"compute": "4-8 GPUs (A100/H100) for benchmarking, cloud credits for testing",
"data": "Training logs from diverse ML workloads",
"tools": "CUDA, PyTorch, FastAPI, React for dashboard",
"team": "2-3 developers (1 systems engineer, 1 ML engineer, 1 full-stack dev)",
"timeline": "6-9 months",
"estimated_cost": "$40K-60K (compute, personnel)"
}},
"novelty_score": 8,
"impact_score": 9,
"feasibility_score": 8,
"priority": "HIGH",
"target_venue": "MLSys, SysML, NeurIPS (datasets/benchmarks track)",
"potential_collaborators": "Green AI labs, cloud providers (AWS, Google Cloud), MLOps startups"
}}
NOW GENERATE HYPOTHESES FOR THIS RESEARCH AREA:
Topic: {topic}
{gaps_text}
Generate EXACTLY 5 HIGH-QUALITY, ACTIONABLE research hypotheses that:
1. Address the most CRITICAL gaps identified above
2. Are SPECIFIC with technical details, timelines, and resource estimates
3. Include realistic impact projections (numbers, percentages, cost savings)
4. Provide clear methodology and required resources
5. Can realistically be executed by a research team or PhD student
6. Have clear success metrics and target publication venues
7. Cover DIVERSE approaches - don't repeat similar ideas
CRITICAL: Generate EXACTLY 5 hypotheses, not 3-4. Each should be unique and address different gaps.
IMPORTANT: Your response must be ONLY valid JSON in this exact format:
{{
"hypotheses": [
{{
"hypothesis_id": 1,
"title": "...",
"gap_addressed": "...",
"problem_statement": "...",
"proposed_solution": "...",
"expected_impact": "...",
"methodology": "...",
"required_resources": {{...}},
"novelty_score": 8,
"impact_score": 9,
"feasibility_score": 7,
"priority": "HIGH",
"target_venue": "...",
"potential_collaborators": "..."
}},
{{
"hypothesis_id": 2,
...
}},
{{
"hypothesis_id": 3,
...
}},
{{
"hypothesis_id": 4,
...
}},
{{
"hypothesis_id": 5,
...
}}
]
}}
Respond with ONLY the JSON containing EXACTLY 5 hypotheses, no additional text."""
return prompt
def _calculate_novelty_score(self, hypothesis_text: str, papers: List[SearchResult]) -> float:
"""
Calculate novelty score by comparing hypothesis to existing papers using embeddings
Args:
hypothesis_text: The hypothesis text to evaluate
papers: List of existing papers
Returns:
Novelty score from 1-10 (10 = highly novel)
"""
if not self.embedding_model or not papers:
return 7.0 # Default moderate novelty if can't calculate
try:
# Combine title and proposed solution for embedding
hypothesis_embedding = self.embedding_model.encode([hypothesis_text])
# Get embeddings for existing paper abstracts
paper_texts = []
for paper in papers[:15]: # Limit to first 15 papers
if paper.abstract:
paper_texts.append(paper.abstract[:500])
elif paper.snippet:
paper_texts.append(paper.snippet[:500])
if not paper_texts:
return 7.0
paper_embeddings = self.embedding_model.encode(paper_texts)
# Calculate cosine similarities
similarities = cosine_similarity(hypothesis_embedding, paper_embeddings)[0]
# Novelty score: inverse of max similarity, scaled to 1-10
# High similarity = low novelty, low similarity = high novelty
max_similarity = np.max(similarities)
novelty_score = (1 - max_similarity) * 10
# Ensure score is between 1 and 10
novelty_score = max(1.0, min(10.0, novelty_score))
return round(novelty_score, 1)
except Exception as e:
print(f"Error calculating novelty score: {str(e)}")
return 7.0
async def generate_hypotheses(
self,
gaps: Dict,
topic: str,
papers: List[SearchResult] = None
) -> Dict:
"""
Generate research hypotheses from identified gaps
Args:
gaps: Dictionary of research gaps from gap analysis
topic: Research topic
papers: Optional list of papers for novelty scoring
Returns:
Dictionary containing generated hypotheses with scores
"""
if not self.client:
return {
"error": "OpenAI API not configured",
"hypotheses": []
}
print(f"\n💡 Generating research hypotheses for: {topic}")
# Create the prompt
prompt = self._create_hypothesis_prompt(gaps, topic)
try:
# Generate hypotheses using GPT-4
print("Calling GPT-4 to generate hypotheses...")
response = self.client.chat.completions.create(
model="gpt-4", # Use GPT-4 for highest quality and better scoring
messages=[
{"role": "system", "content": "You are a world-class research strategist and grant proposal expert. Generate detailed, actionable research hypotheses with HIGH scores (7-10 range). Always respond with valid JSON only."},
{"role": "user", "content": prompt}
],
temperature=0.8, # Higher for creativity
max_tokens=4000 # Enough for 5 detailed hypotheses
)
# Parse the response
hypotheses_json = json.loads(response.choices[0].message.content)
# Handle both single hypothesis and array formats
if "hypotheses" in hypotheses_json:
hypotheses_list = hypotheses_json["hypotheses"]
elif isinstance(hypotheses_json, list):
hypotheses_list = hypotheses_json
else:
# Wrap single hypothesis in array
hypotheses_list = [hypotheses_json]
# Calculate novelty scores using embeddings and overall scores
if papers and self.embedding_model:
print("Calculating novelty scores using embeddings...")
for hyp in hypotheses_list:
# Create text for novelty comparison
hyp_text = f"{hyp.get('title', '')} {hyp.get('proposed_solution', '')}"
novelty = self._calculate_novelty_score(hyp_text, papers)
hyp['calculated_novelty_score'] = novelty
# Use embedding-based novelty as primary score (more objective)
hyp['novelty_score'] = novelty
# Ensure all scores exist and are in valid range
if 'impact_score' not in hyp or hyp['impact_score'] == 0:
hyp['impact_score'] = 8 # Default high impact
if 'feasibility_score' not in hyp or hyp['feasibility_score'] == 0:
hyp['feasibility_score'] = 7 # Default good feasibility
# Calculate overall score as weighted average
# Novelty: 40%, Impact: 40%, Feasibility: 20%
hyp['overall_score'] = round(
(hyp['novelty_score'] * 0.4 +
hyp['impact_score'] * 0.4 +
hyp['feasibility_score'] * 0.2), 1
)
else:
# No papers for novelty calculation, use GPT-4's scores and calculate overall
for hyp in hypotheses_list:
if 'impact_score' not in hyp or hyp['impact_score'] == 0:
hyp['impact_score'] = 8
if 'feasibility_score' not in hyp or hyp['feasibility_score'] == 0:
hyp['feasibility_score'] = 7
if 'novelty_score' not in hyp or hyp['novelty_score'] == 0:
hyp['novelty_score'] = 7
hyp['overall_score'] = round(
(hyp['novelty_score'] * 0.4 +
hyp['impact_score'] * 0.4 +
hyp['feasibility_score'] * 0.2), 1
)
# Sort hypotheses by overall score (highest first)
hypotheses_list.sort(key=lambda h: h.get('overall_score', 0), reverse=True)
print(f"✓ Generated {len(hypotheses_list)} hypotheses (sorted by overall score)")
print(f" Top hypothesis overall score: {hypotheses_list[0].get('overall_score', 0)}/10")
return {
"hypotheses": hypotheses_list,
"count": len(hypotheses_list),
"topic": topic
}
except Exception as e:
print(f"Error generating hypotheses: {str(e)}")
return {
"error": str(e),
"hypotheses": []
}
def format_hypotheses_for_display(self, hypotheses_data: Dict) -> str:
"""
Format hypotheses for display in Streamlit
Args:
hypotheses_data: Dictionary containing hypotheses
Returns:
Formatted markdown string
"""
if hypotheses_data.get("error"):
return f"⚠️ Error: {hypotheses_data['error']}"
hypotheses = hypotheses_data.get("hypotheses", [])
if not hypotheses:
return "No hypotheses generated"
output = "## 💡 Generated Research Hypotheses\n\n"
output += f"*Generated {len(hypotheses)} actionable research proposals*\n\n"
output += "---\n\n"
for i, hyp in enumerate(hypotheses, 1):
# Header with title and priority
priority = hyp.get('priority', 'MEDIUM')
priority_emoji = "🔴" if priority == "HIGH" else "🟡" if priority == "MEDIUM" else "🟢"
output += f"### {priority_emoji} Hypothesis {i}: {hyp.get('title', 'Untitled')}\n\n"
# Scores display
novelty = hyp.get('novelty_score', 'N/A')
impact = hyp.get('impact_score', 'N/A')
feasibility = hyp.get('feasibility_score', 'N/A')
output += f"**📊 Scores:** Novelty: {novelty}/10 | Impact: {impact}/10 | Feasibility: {feasibility}/10\n\n"
# Gap addressed
if hyp.get('gap_addressed'):
output += f"**🎯 Gap Addressed:** {hyp['gap_addressed']}\n\n"
# Problem statement
if hyp.get('problem_statement'):
output += f"**❓ Problem Statement:**\n{hyp['problem_statement']}\n\n"
# Proposed solution
if hyp.get('proposed_solution'):
output += f"**💡 Proposed Solution:**\n{hyp['proposed_solution']}\n\n"
# Expected impact
if hyp.get('expected_impact'):
output += f"**🚀 Expected Impact:**\n{hyp['expected_impact']}\n\n"
# Methodology
if hyp.get('methodology'):
output += f"**🔬 Methodology:**\n{hyp['methodology']}\n\n"
# Required resources
if hyp.get('required_resources'):
output += f"**📋 Required Resources:**\n"
resources = hyp['required_resources']
if isinstance(resources, dict):
for key, value in resources.items():
output += f"- **{key.title()}:** {value}\n"
else:
output += f"{resources}\n"
output += "\n"
# Target venue and collaborators
if hyp.get('target_venue'):
output += f"**📰 Target Venues:** {hyp['target_venue']}\n\n"
if hyp.get('potential_collaborators'):
output += f"**🤝 Potential Collaborators:** {hyp['potential_collaborators']}\n\n"
output += "---\n\n"
return output
class ExperimentGenerator:
"""Generate experiments, datasets, and metrics for research hypotheses"""
def __init__(self):
"""Initialize the experiment generator"""
self.client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
self.dataset_fetcher = DatasetFetcher()
print("✅ Experiment Generator initialized")
def generate_experiments_for_hypothesis(self, hypothesis: dict, topic: str = "") -> dict:
"""
Generate structured experiment plan for a given hypothesis
Args:
hypothesis (dict): Selected hypothesis object
topic (str): Original research topic for context
Returns:
dict: Structured experiment plan with datasets, metrics, etc.
"""
try:
print(f"🧪 Generating experiments for hypothesis: {hypothesis.get('title', 'Unknown')}")
# Extract hypothesis text
hypothesis_text = self._extract_hypothesis_text(hypothesis)
# Create prompt
prompt = self._create_experiment_prompt(hypothesis_text, topic)
# Call OpenAI API with higher token limit for detailed experiments
response = self.client.chat.completions.create(
model="gpt-3.5-turbo-16k", # Use 16k model for longer responses
messages=[
{
"role": "system",
"content": "You are an expert AI research experiment designer. Generate COMPLETE, PUBLICATION-READY experiments with ALL required details. Return ONLY valid JSON without any markdown formatting or code blocks."
},
{
"role": "user",
"content": prompt
}
],
temperature=0.7,
max_tokens=4000 # Max for gpt-3.5-turbo-16k output
)
# Parse response
response_text = response.choices[0].message.content.strip()
print(f"📝 Raw API response length: {len(response_text)} characters")
# Clean response (remove markdown if present)
if response_text.startswith("```json"):
response_text = response_text.replace("```json", "").replace("```", "").strip()
elif response_text.startswith("```"):
response_text = response_text.replace("```", "").strip()
# Try to parse JSON
try:
experiments_data = json.loads(response_text)
print(f"✅ Successfully parsed JSON with {len(experiments_data.get('experiments', []))} experiments")
except json.JSONDecodeError as json_err:
print(f"❌ JSON parsing failed: {str(json_err)}")
print(f"📄 First 500 chars of response: {response_text[:500]}")
print(f"📄 Last 200 chars of response: {response_text[-200:]}")
raise # Re-raise to trigger fallback
# Fetch REAL datasets from Kaggle API (with AI fallback)
print("🔍 Fetching datasets from Kaggle...")
try:
# Use dataset keywords if provided, otherwise use topic
dataset_keywords = experiments_data.get('dataset_keywords', [])
print(f"🔎 Keywords: {dataset_keywords}")
real_datasets = self.dataset_fetcher.get_datasets_for_topic(
topic=topic,
hypothesis=hypothesis.get('title', ''),
dataset_keywords=dataset_keywords
)
print(f"📊 Found {len(real_datasets)} datasets")
# Store datasets as simple list
experiments_data['datasets'] = real_datasets
print(f"✅ Added {len(real_datasets)} dataset(s)")
except Exception as dataset_err:
print(f"⚠️ Dataset fetching failed: {str(dataset_err)}")
experiments_data['datasets'] = []
# Add metadata
experiments_data["metadata"] = {
"hypothesis_id": hypothesis.get("hypothesis_id", "unknown"),
"hypothesis_title": hypothesis.get("title", "Unknown Hypothesis"),
"generated_at": datetime.now().isoformat(),
"model_used": "gpt-3.5-turbo-16k",
"datasets_verified": bool(real_datasets and len(real_datasets) > 0) if 'real_datasets' in locals() else False
}
print(f"✅ Generated {len(experiments_data.get('experiments', []))} experiments")
return experiments_data
except json.JSONDecodeError as e:
print(f"❌ JSON parsing error: {str(e)}")
print(" Returning fallback experiment structure")
return self._get_fallback_experiment_structure()
except Exception as e:
print(f"❌ Error generating experiments: {str(e)}")
import traceback
traceback.print_exc()
print(" Returning fallback experiment structure")
return self._get_fallback_experiment_structure()
def _extract_hypothesis_text(self, hypothesis: dict) -> str:
"""Extract ALL relevant details from hypothesis object for complete context"""
parts = []
# Core hypothesis information
if hypothesis.get("title"):
parts.append(f"Hypothesis Title: {hypothesis['title']}")
if hypothesis.get("problem_statement"):
parts.append(f"Problem Statement: {hypothesis['problem_statement']}")
if hypothesis.get("proposed_solution"):
parts.append(f"Proposed Solution: {hypothesis['proposed_solution']}")
if hypothesis.get("gap_addressed"):
parts.append(f"Research Gap Being Addressed: {hypothesis['gap_addressed']}")
if hypothesis.get("expected_impact"):
parts.append(f"Expected Impact: {hypothesis['expected_impact']}")
if hypothesis.get("methodology"):
parts.append(f"Suggested Methodology: {hypothesis['methodology']}")
# Scores for context on feasibility
scores = []
if hypothesis.get("novelty_score"):
scores.append(f"Novelty: {hypothesis['novelty_score']}/10")
if hypothesis.get("impact_score"):
scores.append(f"Impact: {hypothesis['impact_score']}/10")
if hypothesis.get("feasibility_score"):
scores.append(f"Feasibility: {hypothesis['feasibility_score']}/10")
if scores:
parts.append(f"Scores: {', '.join(scores)}")
return "\n\n".join(parts)
def _create_experiment_prompt(self, hypothesis_text: str, topic: str) -> str:
"""Create the experiment generation prompt"""
return f"""
Generate 2-3 COMPLETE experiments to test this hypothesis. Make them SPECIFIC to the hypothesis, not generic.
TOPIC: {topic}
HYPOTHESIS:
{hypothesis_text}
For each experiment, provide:
1. **id**: Number (1, 2, 3)
2. **title**: Specific title reflecting the hypothesis
3. **introduction**: 3-4 sentences explaining how this tests the hypothesis
4. **description**: 4-5 sentences detailing the approach
5. **steps**: Array of 8-12 detailed methodology steps specific to this hypothesis
6. **difficulty**: "Easy", "Medium", or "Hard"
7. **estimated_time**: Realistic timeline (e.g., "2-3 months")
8. **required_resources**: Array of specific computing/storage/tool requirements
Also provide SHARED resources:
- **dataset_keywords**: Array of 3-5 keywords for finding relevant datasets
- **metrics**: Array of 4-5 evaluation metrics with name, description, range
- **architectures**: Array of 3-4 model architectures with name, description, parameters
- **baselines**: Array of 3 baseline models with name, expected_performance, description
- **challenges**: Array of 4-5 specific challenges with mitigation strategies
- **expected_outcomes**: Object with accuracy_range, training_time, model_size, inference_speed
IMPORTANT:
- Design experiments SPECIFICALLY for THIS hypothesis (not generic ML experiments)
- Steps must be detailed and hypothesis-specific
- Use domain-appropriate datasets, metrics, and models that match the hypothesis topic
- Tailor everything (data sources, evaluation methods, architectures) to the specific research domain
Return ONLY valid JSON (no markdown, no code blocks).
JSON format:
{{
"experiments": [
{{
"id": 1,
"title": "Specific experiment title",
"introduction": "How this tests the hypothesis...",
"description": "Detailed approach...",
"steps": ["Step 1: ...", "Step 2: ...", ... 8-12 steps],
"difficulty": "Medium",
"estimated_time": "2-3 months",
"required_resources": ["GPU requirements", "Storage", "Software"]
}}
],
"dataset_keywords": ["keyword1", "keyword2", "keyword3"],
"metrics": [{{"name": "Accuracy", "description": "...", "range": "0-100%"}}],
"architectures": [{{"name": "Model Name", "description": "...", "parameters": "100M"}}],
"baselines": [{{"name": "Baseline", "expected_performance": "70%", "description": "..."}}],
"challenges": ["Challenge 1 with mitigation", "Challenge 2..."],
"expected_outcomes": {{"accuracy_range": "85-92%", "training_time": "6-8h", "model_size": "150MB", "inference_speed": "20ms"}}
}}
"""
def _get_fallback_experiment_structure(self) -> dict:
"""Return fallback structure when API fails"""
return {
"experiments": [
{
"id": 1,
"title": "Basic Implementation",
"description": "Implement a basic version of the proposed solution",
"steps": [
"Collect and preprocess data",
"Implement baseline model",
"Train and validate",
"Analyze results"
],
"difficulty": "intermediate",
"estimated_time": "2-4 weeks",
"required_resources": ["Computing resources", "Dataset access"]
}
],
"datasets": {
"kaggle": [],
"huggingface": []
},
"metrics": [
{"name": "Accuracy", "description": "Classification accuracy", "range": "0-1"}
],
"architectures": [],
"baselines": [],
"challenges": ["Limited information available"],
"expected_outcomes": {
"accuracy_range": "To be determined",
"training_time": "Variable",
"model_size": "Unknown",
"inference_speed": "Unknown"
},
"alternatives": [],
"metadata": {
"generated_at": datetime.now().isoformat(),
"model_used": "fallback",
"status": "limited_data"
}
}
class Agent:
"""AI agent wrapper for OpenAI ChatGPT model"""
def __init__(self, name: str, model: str = "gpt-3.5-turbo"):
self.name = name
self.api_key = os.getenv('OPENAI_API_KEY')
if not self.api_key:
raise ValueError("OPENAI_API_KEY environment variable not set")
try:
self.client = OpenAI(api_key=self.api_key)
self.model = model
self.chat_history = []
except Exception as e:
error_msg = f"Error initializing {name}: {str(e)}"
print(error_msg)
raise ValueError(error_msg)
async def process(self, message: str) -> str:
"""
Process a message using OpenAI ChatGPT model
"""
if not self.client:
return "Error: Agent not properly initialized"
try:
# Add user message to history
self.chat_history.append({
"role": "user",
"content": message
})
# Call OpenAI API
response = self.client.chat.completions.create(
model=self.model,
messages=self.chat_history,
temperature=0.7,
max_tokens=2000
)
# Extract response text
assistant_message = response.choices[0].message.content
# Add assistant response to history
self.chat_history.append({
"role": "assistant",
"content": assistant_message
})
return assistant_message
except Exception as e:
error_msg = f"OpenAI API Error: {str(e)}"
print(error_msg)
return error_msg
class MultiAgentSystem:
"""
System coordinating multiple agents for literature review
"""
def __init__(self):
# Initialize OpenAI client for GPT-4 gap analysis fallback
self.api_key = os.getenv('OPENAI_API_KEY')
if self.api_key:
self.client = OpenAI(api_key=self.api_key)
else:
print("Warning: OPENAI_API_KEY not found. GPT-4 fallback unavailable.")
self.client = None
try:
self.google_agent = Agent("Google_Search_Agent")
except Exception as e:
print(f"Failed to initialize Google agent: {str(e)}")
self.google_agent = None
try:
self.arxiv_agent = Agent("Arxiv_Search_Agent")
except Exception as e:
print(f"Failed to initialize Arxiv agent: {str(e)}")
self.arxiv_agent = None
try:
self.clinical_trials_agent = Agent("ClinicalTrials_Search_Agent")
except Exception as e:
print(f"Failed to initialize ClinicalTrials agent: {str(e)}")
self.clinical_trials_agent = None
try:
self.report_agent = Agent("Report_Agent")
except Exception as e:
print(f"Failed to initialize Report agent: {str(e)}")
self.report_agent = None
self.search_tool = SearchTool()
# ============================================================================
# MODIFIED: Don't initialize heavy models here - use lazy loading instead
# ============================================================================
self._gap_analyzer = None
self._hypothesis_generator = None
self._experiment_generator = None
print("✅ MultiAgentSystem initialized (models will load on demand)")
# ============================================================================
# LAZY LOADING PROPERTIES - ADD THESE NEW METHODS
# ============================================================================
@property
def gap_analyzer(self):
"""Lazy load gap analyzer only when needed"""
if self._gap_analyzer is None:
self._gap_analyzer = load_gap_analyzer()
return self._gap_analyzer
@property
def hypothesis_generator(self):
"""Lazy load hypothesis generator only when needed"""
if self._hypothesis_generator is None:
self._hypothesis_generator = load_hypothesis_generator()
return self._hypothesis_generator
@property
def experiment_generator(self):
"""Lazy load experiment generator only when needed"""
if self._experiment_generator is None:
self._experiment_generator = load_experiment_generator()
return self._experiment_generator
async def run_gap_analysis(self, topic: str) -> Dict:
"""
Run gap analysis on research papers for a given topic
Collects papers and analyzes them for research gaps
Args:
topic: The research topic to analyze
Returns:
Dictionary containing gap analysis results and formatted output
"""
print(f"\n🔍 Starting Gap Analysis for: {topic}")
# Collect papers
all_papers = await self._collect_papers(topic)
print(f"Collected {len(all_papers)} papers total for analysis")
# Analyze gaps using the collected papers
return await self._run_gap_analysis_with_papers(all_papers, topic)
async def _openai_gap_analysis_fallback(self, papers: List[SearchResult], topic: str) -> Dict:
"""
Use OpenAI GPT-4 for comprehensive gap analysis
Args:
papers: List of research papers
topic: Research topic
Returns:
Dictionary of research gaps
"""
if not self.client:
print("OpenAI client not available")
return {
"knowledge_gaps": ["OpenAI API key not configured"],
"methodological_gaps": [],
"dataset_gaps": [],
"temporal_gaps": []
}
try:
# Create detailed papers summary with MORE context
papers_text = ""
for i, paper in enumerate(papers[:15], 1): # Increased from 10 to 15
papers_text += f"\n{'='*60}\n"
papers_text += f"Paper {i}: {paper.title}\n"
if paper.authors:
papers_text += f"Authors: {', '.join(paper.authors[:5])}\n"
if paper.published:
papers_text += f"Published: {paper.published}\n"
# Use FULL abstract, not truncated
if paper.abstract:
papers_text += f"Abstract: {paper.abstract}\n" # Full abstract, no truncation
elif paper.snippet:
papers_text += f"Summary: {paper.snippet}\n"
if paper.study_type:
papers_text += f"Study Type: {paper.study_type}\n"
if paper.phase:
papers_text += f"Phase: {paper.phase}\n"
prompt = f"""You are a PhD-level research analyst with 20+ years experience identifying critical research gaps. Your job is to DEEPLY analyze these papers and find SPECIFIC, ACTIONABLE gaps that researchers can build upon.
RESEARCH TOPIC: {topic}
PAPERS TO ANALYZE:{papers_text}
{'='*80}
CRITICAL INSTRUCTIONS:
1. READ EVERY PAPER CAREFULLY - Don't just skim
2. FIND AT LEAST 3-5 GAPS PER CATEGORY - Research ALWAYS has gaps!
3. Be SPECIFIC with technical details, numbers, methods, datasets
4. Each gap should be ACTIONABLE - what exactly is missing?
5. Look for:
- Unstudied combinations of techniques
- Missing benchmarks or evaluation standards
- Lack of real-world deployment/application studies
- Outdated assumptions from older papers
- Unexplored edge cases or failure modes
- Missing cross-domain applications
- Contradictions between papers
- Limited dataset sizes or diversity
- Methods that haven't been tried together
- Temporal aspects (what's changed since publication?)
EXAMPLE OF GOOD GAPS (BE THIS SPECIFIC):
❌ BAD: "More research needed on privacy"
✅ GOOD: "CRITICAL: No empirical studies measuring privacy leakage in federated learning with >1000 heterogeneous devices (IoT + mobile + edge servers) - all existing work uses <100 homogeneous nodes"
❌ BAD: "Missing datasets"
✅ GOOD: "MISSING DATASET: No standardized benchmark for privacy attacks in federated learning with ground truth labels - researchers cannot compare attack success rates across studies"
NOW ANALYZE AND FIND GAPS IN THESE 4 CATEGORIES:
1. KNOWLEDGE GAPS - What questions are NOT answered by these papers?
- What aspects of {topic} have NOT been studied?
- What combinations haven't been explored?
- What assumptions are untested?
- What edge cases are ignored?
2. METHODOLOGICAL GAPS - What approaches are MISSING?
- What research methods haven't been tried?
- What evaluation metrics are absent?
- What experimental designs are lacking?
- What theoretical frameworks don't exist?
3. DATASET GAPS - What data is MISSING or INSUFFICIENT?
- What datasets don't exist but should?
- What's the coverage (size, diversity, quality)?
- What domains are underrepresented?
- What benchmarks are needed?
4. TEMPORAL GAPS - What's OUTDATED?
- What has changed since these papers were published?
- What new technologies/methods aren't considered?
- What recent events/regulations aren't reflected?
YOU MUST FIND AT LEAST 2-3 SPECIFIC GAPS PER CATEGORY!
Respond with ONLY valid JSON (no markdown, no extra text):
{{
"knowledge_gaps": ["SPECIFIC gap 1 with details", "SPECIFIC gap 2 with details", "SPECIFIC gap 3 with details"],
"methodological_gaps": ["SPECIFIC gap 1 with details", "SPECIFIC gap 2 with details", "SPECIFIC gap 3 with details"],
"dataset_gaps": ["SPECIFIC gap 1 with details", "SPECIFIC gap 2 with details"],
"temporal_gaps": ["SPECIFIC gap 1 with details", "SPECIFIC gap 2 with details"]
}}"""
response = self.client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[
{
"role": "system",
"content": "You are a world-class research analyst. You ALWAYS find specific, actionable research gaps. You respond with ONLY valid JSON, no markdown formatting, no extra text."
},
{"role": "user", "content": prompt}
],
temperature=0.8, # Increased from 0.7 for more creativity
max_tokens=2500 # Increased from 1500 for more detailed gaps
)
# Extract JSON from response (handle markdown code blocks if present)
response_text = response.choices[0].message.content.strip()
# Remove markdown code blocks if present
if response_text.startswith("```"):
# Find the JSON content between ``` markers
lines = response_text.split('\n')
json_lines = []
in_code_block = False
for line in lines:
if line.strip().startswith("```"):
in_code_block = not in_code_block
continue
if in_code_block or (not line.strip().startswith("```")):
json_lines.append(line)
response_text = '\n'.join(json_lines)
gaps_json = json.loads(response_text)
# Validate that we got gaps
total_gaps = sum(len(gaps_json.get(k, [])) for k in ["knowledge_gaps", "methodological_gaps", "dataset_gaps", "temporal_gaps"])
print(f" ✓ GPT-4 extracted {total_gaps} total gaps")
return gaps_json
except json.JSONDecodeError as e:
print(f"JSON parsing error: {str(e)}")
print(f"Response was: {response_text[:500]}...")
return {
"knowledge_gaps": ["JSON parsing failed - please try again"],
"methodological_gaps": [],
"dataset_gaps": [],
"temporal_gaps": []
}
except Exception as e:
print(f"OpenAI fallback error: {str(e)}")
return {
"knowledge_gaps": [f"Error: {str(e)}"],
"methodological_gaps": [],
"dataset_gaps": [],
"temporal_gaps": []
}
async def run_hypothesis_generation(
self,
gaps: Dict,
topic: str,
papers: List[SearchResult] = None
) -> Dict:
"""
Generate research hypotheses from identified gaps
Args:
gaps: Dictionary of research gaps
topic: Research topic
papers: Optional list of papers for novelty scoring
Returns:
Dictionary containing generated hypotheses
"""
print(f"\n💡 Starting Hypothesis Generation for: {topic}")
if not self.hypothesis_generator:
return {
"error": "Hypothesis generator not available",
"hypotheses": []
}
# Generate hypotheses
result = await self.hypothesis_generator.generate_hypotheses(gaps, topic, papers)
# Format for display
if result.get("hypotheses"):
formatted_output = self.hypothesis_generator.format_hypotheses_for_display(result)
result["formatted_output"] = formatted_output
return result
async def run_complete_analysis(self, topic: str) -> Dict:
"""
Run complete research pipeline: Paper Collection → Literature Review → Gap Analysis → Hypothesis Generation
All phases use the SAME papers for consistency
Args:
topic: Research topic
Returns:
Dictionary containing all results (review, gaps, hypotheses)
"""
print(f"\n🚀 Starting COMPLETE ANALYSIS for: {topic}")
print("=" * 80)
results = {
"topic": topic,
"literature_review": None,
"gap_analysis": None,
"hypotheses": None,
"papers": []
}
try:
# Step 1: Collect papers ONCE for all phases
print("\n📚 PHASE 1/4: Paper Collection")
all_papers = await self._collect_papers(topic)
results["papers"] = all_papers
print(f"✓ Collected {len(all_papers)} papers")
# Step 2: Generate Literature Review using collected papers
print("\n📖 PHASE 2/4: Literature Review")
literature_review = await self.run_literature_review(topic, papers=all_papers)
results["literature_review"] = literature_review
# Step 3: Run Gap Analysis using same papers
print("\n🔍 PHASE 3/4: Gap Analysis")
gap_results = await self._run_gap_analysis_with_papers(all_papers, topic)
results["gap_analysis"] = gap_results
# Step 4: Generate Hypotheses from gaps using same papers
print("\n💡 PHASE 4/4: Hypothesis Generation")
if gap_results.get("gaps"):
hypothesis_results = await self.run_hypothesis_generation(
gaps=gap_results["gaps"],
topic=topic,
papers=all_papers
)
results["hypotheses"] = hypothesis_results
else:
print("⚠️ No gaps found, skipping hypothesis generation")
results["hypotheses"] = {
"error": "No gaps identified for hypothesis generation",
"hypotheses": []
}
print("\n✅ COMPLETE ANALYSIS FINISHED")
print("=" * 80)
return results
except Exception as e:
print(f"❌ Error in complete analysis: {str(e)}")
results["error"] = str(e)
return results
async def _collect_papers(self, topic: str) -> List[SearchResult]:
"""
Collect papers from all sources (internal method for reuse)
Args:
topic: Research topic
Returns:
List of SearchResult objects
"""
all_papers = []
# ArXiv Search
print(f" → Collecting from arXiv...")
arxiv_results = self.search_tool.arxiv_search(topic, max_results=7)
all_papers.extend(arxiv_results)
# Google Search
print(f" → Collecting from Google...")
google_results = self.search_tool.google_search(topic, num_results=5)
all_papers.extend(google_results)
# Clinical Trials
print(f" → Collecting from ClinicalTrials.gov...")
clinical_results = self.search_tool.clinicaltrials_search_beta_api(topic, max_results=3)
if not clinical_results:
clinical_results = self.search_tool.clinicaltrials_search_scrape(topic, max_results=3)
all_papers.extend(clinical_results)
return all_papers
async def generate_experiments_for_hypothesis(self, hypothesis: dict, topic: str = "") -> dict:
"""
Generate experiments for a selected hypothesis
Args:
hypothesis (dict): Selected hypothesis
topic (str): Original research topic
Returns:
dict: Experiment plan with datasets, metrics, etc.
"""
print(f"\n🧪 GENERATING EXPERIMENTS FOR HYPOTHESIS")
print("=" * 50)
try:
experiments = self.experiment_generator.generate_experiments_for_hypothesis(
hypothesis, topic
)
print(f"✅ Experiment generation completed successfully")
return experiments
except Exception as e:
print(f"❌ Error in experiment generation: {str(e)}")
return self.experiment_generator._get_fallback_experiment_structure()
# async def _run_gap_analysis_with_papers(self, papers: List[SearchResult], topic: str) -> Dict:
# """
# Run gap analysis with pre-collected papers (internal method)
# Uses BOTH Flan-T5 and GPT-4, then combines unique gaps
# Args:
# papers: Pre-collected papers
# topic: Research topic
# Returns:
# Gap analysis results
# """
# print(f" → Analyzing {len(papers)} papers for research gaps...")
# if not papers or len(papers) == 0:
# return {
# "gaps": {
# "error": "No papers found",
# "knowledge_gaps": [],
# "methodological_gaps": [],
# "dataset_gaps": [],
# "temporal_gaps": []
# },
# "formatted_output": "⚠️ No papers to analyze",
# "papers_analyzed": 0,
# "papers": []
# }
# # Run BOTH models in parallel for redundancy
# flan_t5_gaps = {}
# gpt4_gaps = {}
# # Method 1: Flan-T5-Large (local, free)
# if self.gap_analyzer:
# print(" → Running Flan-T5-Large analysis...")
# try:
# flan_t5_gaps = self.gap_analyzer.analyze_gaps(papers, topic)
# total_flan = sum(len(flan_t5_gaps.get(k, [])) for k in ["knowledge_gaps", "methodological_gaps", "dataset_gaps", "temporal_gaps"])
# print(f" ✓ Flan-T5 found {total_flan} gaps")
# except Exception as e:
# print(f" ⚠️ Flan-T5 error: {str(e)}")
# flan_t5_gaps = {
# "knowledge_gaps": [],
# "methodological_gaps": [],
# "dataset_gaps": [],
# "temporal_gaps": []
# }
# # Method 2: GPT-4 (always run for reliability)
# print(" → Running GPT-4 analysis...")
# try:
# gpt4_gaps = await self._openai_gap_analysis_fallback(papers, topic)
# total_gpt = sum(len(gpt4_gaps.get(k, [])) for k in ["knowledge_gaps", "methodological_gaps", "dataset_gaps", "temporal_gaps"])
# print(f" ✓ GPT-4 found {total_gpt} gaps")
# except Exception as e:
# print(f" ⚠️ GPT-4 error: {str(e)}")
# gpt4_gaps = {
# "knowledge_gaps": [],
# "methodological_gaps": [],
# "dataset_gaps": [],
# "temporal_gaps": []
# }
# # Combine and deduplicate gaps from both models
# combined_gaps = self._combine_and_deduplicate_gaps(flan_t5_gaps, gpt4_gaps)
# # Determine which model(s) contributed
# total_flan = sum(len(flan_t5_gaps.get(k, [])) for k in ["knowledge_gaps", "methodological_gaps", "dataset_gaps", "temporal_gaps"])
# total_gpt = sum(len(gpt4_gaps.get(k, [])) for k in ["knowledge_gaps", "methodological_gaps", "dataset_gaps", "temporal_gaps"])
# if total_flan > 0 and total_gpt > 0:
# combined_gaps["source"] = "Flan-T5-Large + GPT-4 (Combined)"
# elif total_gpt > 0:
# combined_gaps["source"] = "GPT-4"
# elif total_flan > 0:
# combined_gaps["source"] = "Flan-T5-Large"
# else:
# combined_gaps["source"] = "No gaps found"
# total_combined = sum(len(combined_gaps.get(k, [])) for k in ["knowledge_gaps", "methodological_gaps", "dataset_gaps", "temporal_gaps"])
# print(f" ✓ Total unique gaps after combining: {total_combined}")
# formatted_output = self.gap_analyzer.format_gaps_for_display(combined_gaps) if self.gap_analyzer else "No formatter available"
# return {
# "gaps": combined_gaps,
# "formatted_output": formatted_output,
# "papers_analyzed": len(papers),
# "papers": papers
# }
async def _run_gap_analysis_with_papers(self, papers: List[SearchResult], topic: str) -> Dict:
"""
Run gap analysis using OpenAI GPT-4 only (Flan-T5 disabled for speed)
Args:
papers: List of research papers
topic: Research topic
Returns:
Dictionary containing gap analysis results
"""
print(f"\n🔍 PHASE 3/4: Gap Analysis")
try:
# Use only OpenAI gap analysis (faster, no model loading)
gaps = self.gap_analyzer.analyze_gaps(papers, topic)
# Format output
formatted_output = self.gap_analyzer._format_gaps_output(gaps, len(papers))
return {
"gaps": gaps,
"formatted_output": formatted_output,
"papers_analyzed": len(papers),
"method": "OpenAI GPT-4"
}
except Exception as e:
print(f"❌ Gap analysis error: {str(e)}")
return {
"gaps": self.gap_analyzer._fallback_gap_structure(),
"formatted_output": "Gap analysis encountered an error.",
"papers_analyzed": len(papers),
"method": "fallback"
}
def _combine_and_deduplicate_gaps(self, gaps1: Dict, gaps2: Dict) -> Dict:
"""
Combine gaps from two sources and remove duplicates
Args:
gaps1: First set of gaps (e.g., from Flan-T5)
gaps2: Second set of gaps (e.g., from GPT-4)
Returns:
Combined and deduplicated gaps
"""
combined = {
"knowledge_gaps": [],
"methodological_gaps": [],
"dataset_gaps": [],
"temporal_gaps": []
}
for category in ["knowledge_gaps", "methodological_gaps", "dataset_gaps", "temporal_gaps"]:
# Get gaps from both sources
gaps_set1 = gaps1.get(category, [])
gaps_set2 = gaps2.get(category, [])
# Combine them
all_gaps = gaps_set1 + gaps_set2
# Deduplicate using similarity (case-insensitive, normalized)
unique_gaps = []
seen_normalized = set()
for gap in all_gaps:
# Normalize: lowercase, remove extra spaces, remove punctuation
normalized = gap.lower().strip()
normalized = ' '.join(normalized.split()) # Remove extra whitespace
# Check if we've seen something very similar
is_duplicate = False
for seen in seen_normalized:
# If 80% of words overlap, consider it duplicate
gap_words = set(normalized.split())
seen_words = set(seen.split())
if gap_words and seen_words:
overlap = len(gap_words & seen_words) / max(len(gap_words), len(seen_words))
if overlap > 0.8: # 80% similarity threshold
is_duplicate = True
break
if not is_duplicate:
unique_gaps.append(gap)
seen_normalized.add(normalized)
combined[category] = unique_gaps
return combined
async def run_complete_analysis(self, topic: str) -> Dict:
"""
Run complete end-to-end analysis: Paper Collection → Literature Review → Gap Analysis → Hypothesis Generation
All phases use the same set of papers for consistency
Args:
topic: Research topic
Returns:
Dict containing all analysis results
"""
results = {
"topic": topic,
"papers": [],
"literature_review": "",
"gap_analysis": {},
"hypotheses": {}
}
try:
print("=" * 80)
print(f"🚀 STARTING COMPLETE ANALYSIS FOR: {topic}")
print("=" * 80)
# PHASE 1: Collect papers once for all subsequent phases
print("\n📄 PHASE 1/4: Collecting Papers")
papers = await self._collect_papers(topic)
results["papers"] = papers
print(f" ✓ Collected {len(papers)} papers total")
# PHASE 2: Literature Review using collected papers
print("\n📚 PHASE 2/4: Literature Review")
literature_review = await self.run_literature_review(topic, papers=papers)
results["literature_review"] = literature_review
# PHASE 3: Gap Analysis using same papers
print("\n🔍 PHASE 3/4: Gap Analysis")
gap_results = await self._run_gap_analysis_with_papers(papers, topic)
results["gap_analysis"] = gap_results
# PHASE 4: Hypothesis Generation from gaps
print("\n💡 PHASE 4/4: Hypothesis Generation")
if gap_results.get("gaps"):
hypothesis_results = await self.run_hypothesis_generation(
gaps=gap_results["gaps"],
topic=topic,
papers=papers
)
results["hypotheses"] = hypothesis_results
else:
print("⚠️ No gaps found, skipping hypothesis generation")
results["hypotheses"] = {
"error": "No gaps identified for hypothesis generation",
"hypotheses": []
}
print("\n✅ COMPLETE ANALYSIS FINISHED")
print("=" * 80)
return results
except Exception as e:
print(f"❌ Error in complete analysis: {str(e)}")
results["error"] = str(e)
return results
async def run_literature_review(self, topic: str, papers: List[SearchResult] = None) -> str:
"""
Run a comprehensive literature review on a given topic
Args:
topic: Research topic
papers: Optional pre-collected papers. If None, will collect new papers
Returns:
Markdown-formatted literature review
"""
# If no papers provided, collect them
if papers is None:
print(f"Collecting papers for literature review on '{topic}'...")
papers = await self._collect_papers(topic)
print(f"Generating comprehensive literature review from {len(papers)} papers...")
# Build context from papers
context = f"Research Topic: {topic}\n\n"
context += "=" * 80 + "\n"
context += "PAPERS FOR ANALYSIS:\n"
context += "=" * 80 + "\n\n"
for i, paper in enumerate(papers, 1):
context += f"Paper {i}: {paper.title}\n"
if paper.authors:
context += f"Authors: {', '.join(paper.authors)}\n"
if paper.published:
context += f"Published: {paper.published}\n"
if paper.abstract:
context += f"Abstract: {paper.abstract[:500]}...\n"
elif paper.snippet:
context += f"Content: {paper.snippet}\n"
if paper.arxiv_url:
context += f"URL: {paper.arxiv_url}\n"
elif hasattr(paper, 'link') and paper.link:
context += f"URL: {paper.link}\n"
context += "\n"
# Generate literature review using GPT
print("Generating comprehensive literature review report...")
if not self.report_agent:
return "Error: Report agent not initialized. Please check your OpenAI API key."
report_prompt = f"""
Generate a comprehensive literature review on {topic} based on {len(papers)} research papers provided below.
{context}
Please provide a well-structured literature review that:
1. Synthesizes the main findings and themes across ALL papers
2. Discusses key research directions and trends
3. Includes proper citations with author names and years
4. Provides clickable links to the papers when available
5. Concludes with future research directions based on the analysis
6. Write in academic style but keep it accessible
7. Be comprehensive - discuss ALL major papers, not just 2-3
Format the review with clear sections using markdown headers (##).
"""
report = await self.report_agent.process(report_prompt)
return report
# Example usage
if __name__ == "__main__":
import asyncio
async def run_literature_review():
system = MultiAgentSystem()
topic = input("Enter a medical or scientific topic for literature review: ")
print(f"\nRunning comprehensive literature review on: {topic}")
print("This may take a few minutes...\n")
report = await system.run_literature_review(topic)
print("\n" + "="*80)
print("LITERATURE REVIEW REPORT")
print("="*80 + "\n")
print(report)
# Save report to file
filename = f"literature_review_{topic.replace(' ', '_')}.md"
with open(filename, "w", encoding="utf-8") as f:
f.write(report)
print(f"\nReport saved to {filename}")
async def test_search():
search_tool = SearchTool()
query = input("Enter a medical topic to test search: ")
print("\nTesting ClinicalTrials.gov Beta API search...")
api_results = search_tool.clinicaltrials_search_beta_api(query, max_results=2)
for result in api_results:
print(f"Title: {result.title}")
print(f"NCT ID: {result.nct_id}")
print(f"URL: {result.link}")
print(f"Status: {result.status}")
print(f"Phase: {result.phase}")
print(f"Conditions: {result.conditions}")
print("-" * 50)
print("\nTesting ClinicalTrials.gov scrape search (fallback method)...")
scrape_results = search_tool.clinicaltrials_search_scrape(query, max_results=2)
for result in scrape_results:
print(f"Title: {result.title}")
print(f"NCT ID: {result.nct_id}")
print(f"URL: {result.link}")
print(f"Status: {result.status}")
print("-" * 50)
# Choose which function to run
choice = input("Choose an option:\n1. Run full literature review\n2. Test search only\nYour choice (1/2): ")
if choice == "1":
asyncio.run(run_literature_review())
else:
asyncio.run(test_search())