# Saleh
# Revert Gradio 6.x config: move theme/css back to Blocks for 5.x compat
# a92da2f
"""
AI Research Paper Analyst β€” Main Application
CrewAI Hierarchical Pipeline + Gradio UI
Reference: system_design.md β€” Complete System Architecture
Reference: engineering_guardrails.md β€” Β§3 Error Handling, Β§5 Observability
Pipeline Flow:
Gate 1: Safety Guardian β†’ blocks if unsafe
Step 1: Paper Extractor
Step 2: Methodology Critic + Relevance Researcher (parallel concept, sequential in CrewAI)
Step 3: Review Synthesizer
Step 4: Rubric Evaluator
Step 5: Enhancer
Gate 2: Quality Check (programmatic)
"""
import os
import sys
import io
import time
import json
import re
import traceback
import threading
from datetime import datetime
from typing import Optional
import gradio as gr
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# ============================================================
# SQLite Fix for Hugging Face Spaces (ChromaDB requirement)
# ============================================================
if sys.platform.startswith("linux"):
    # ChromaDB requires a newer SQLite than many Linux base images ship
    # (see banner above). If the pysqlite3 package is installed, register it
    # under the stdlib name so any later `import sqlite3` gets the bundled
    # build. Must run before anything imports sqlite3/ChromaDB. No-op when
    # pysqlite3 is not installed.
    try:
        __import__("pysqlite3")
        sys.modules["sqlite3"] = sys.modules.pop("pysqlite3")
    except ImportError:
        pass
# ============================================================
# Pipeline Logger β€” engineering_guardrails.md Β§5.1
# ============================================================
class PipelineLogger:
    """Logs every agent step + tool result for observability.

    SECURITY: Never logs raw PII, API keys, or secrets.
    Free-text fields are sanitized (API-key patterns redacted); `details`
    is additionally truncated to bound accidental PII exposure in logs.
    Reference: engineering_guardrails.md Β§5.4 Logging Security Rules
    """

    def __init__(self):
        # Chronological list of log entries; each entry is a flat dict.
        self.logs: list[dict] = []
        # Run start time; per-entry elapsed_seconds is measured from here.
        self.start_time = time.time()

    def log_step(
        self,
        agent_name: str,
        status: str,
        details: Optional[str] = None,
        error: Optional[str] = None,
        tool_name: Optional[str] = None,
        tokens_used: Optional[int] = None,
    ) -> None:
        """Append one structured entry for an agent/tool lifecycle event.

        Args:
            agent_name: Logical agent or phase name, e.g. "safety_guardian".
            status: Lifecycle status ("started"/"completed"/"failed"/
                "retrying"/"passed"; unknown values render with a generic
                marker in get_logs_for_display).
            details: Optional human-readable context (sanitized + truncated).
            error: Optional error text (sanitized; kept full-length so
                tracebacks remain debuggable).
            tool_name: Tool invoked during this step, if any.
            tokens_used: Token count; recorded only when truthy (non-zero).
        """
        entry = {
            "timestamp": datetime.now().isoformat(),
            "elapsed_seconds": round(time.time() - self.start_time, 2),
            "agent": agent_name,
            "status": status,
        }
        if details:
            # FIX: details were previously stored verbatim, contradicting the
            # class docstring ("_truncate" was dead code). Sanitize secrets and
            # cap the length before keeping them.
            entry["details"] = self._truncate(self._sanitize(details))
        if error:
            # Sanitized but NOT truncated: full tracebacks are kept so
            # failures stay debuggable.
            entry["error"] = self._sanitize(error)
        if tool_name:
            entry["tool"] = tool_name
        if tokens_used:
            entry["tokens_used"] = tokens_used
        self.logs.append(entry)

    def _sanitize(self, text: str) -> str:
        """Redact API-key-shaped substrings before the text is stored."""
        text = re.sub(r'sk-[a-zA-Z0-9]{20,}', '[REDACTED_API_KEY]', text)
        text = re.sub(r'key["\s:=]+["\']?[a-zA-Z0-9]{20,}', '[REDACTED_KEY]', text)
        return text

    def _truncate(self, text: str, max_chars: int = 200) -> str:
        """Cap text at max_chars, noting the original length when cut."""
        if len(text) <= max_chars:
            return text
        return text[:max_chars] + f"... [truncated, {len(text)} chars total]"

    def get_summary(self) -> dict:
        """Aggregate per-agent status/timing plus run totals.

        The last entry per agent wins for status/elapsed, so an agent that
        logs started-then-completed counts as completed.
        """
        total_time = round(time.time() - self.start_time, 2)
        agent_times = {}
        for log in self.logs:
            agent = log["agent"]
            if agent not in agent_times:
                agent_times[agent] = {"status": "unknown", "elapsed": 0}
            agent_times[agent]["status"] = log["status"]
            agent_times[agent]["elapsed"] = log["elapsed_seconds"]
        failed_agents = [
            name for name, info in agent_times.items()
            if info["status"] == "failed"
        ]
        return {
            "total_time_seconds": total_time,
            "total_steps": len(self.logs),
            "agents_completed": len([
                a for a in agent_times.values() if a["status"] == "completed"
            ]),
            "agents_failed": failed_agents,
            "per_agent": agent_times,
        }

    def get_logs_for_display(self) -> str:
        """Render the log entries plus summary as plain text for the UI."""
        lines = []
        lines.append("=" * 60)
        lines.append("PIPELINE EXECUTION LOG")
        lines.append("=" * 60)
        for entry in self.logs:
            timestamp = entry["elapsed_seconds"]
            agent = entry["agent"]
            status = entry["status"]
            emoji = {
                "started": "πŸ”„",
                "completed": "βœ…",
                "failed": "❌",
                "retrying": "πŸ”",
                "passed": "βœ…",
            }.get(status, "πŸ“Œ")
            line = f"[{timestamp:>6.1f}s] {emoji} {agent}: {status}"
            if entry.get("tool"):
                line += f" (tool: {entry['tool']})"
            if entry.get("details"):
                line += f" β€” {entry['details']}"
            if entry.get("error"):
                line += f"\n ⚠️ Error: {entry['error']}"
            if entry.get("tokens_used"):
                line += f" [{entry['tokens_used']} tokens]"
            lines.append(line)
        summary = self.get_summary()
        lines.append("")
        lines.append("-" * 60)
        lines.append("SUMMARY")
        lines.append(f" Total time: {summary['total_time_seconds']}s")
        lines.append(f" Steps logged: {summary['total_steps']}")
        lines.append(f" Agents completed: {summary['agents_completed']}")
        if summary['agents_failed']:
            lines.append(f" Agents failed: {', '.join(summary['agents_failed'])}")
        lines.append("=" * 60)
        return "\n".join(lines)
# ============================================================
# Stdout Capture β€” streams CrewAI verbose output to UI
# ============================================================
class StreamCapture:
    """Tees stdout/stderr into an in-memory buffer for live UI display.

    While active as a context manager, both sys.stdout and sys.stderr point
    at this object; every chunk written is mirrored to the real terminal
    stdout and appended to an internal StringIO buffer the UI can poll.
    """

    def __init__(self):
        self.buffer = io.StringIO()
        self._real_stdout = sys.stdout
        self._real_stderr = sys.stderr
        self.lock = threading.Lock()

    def __enter__(self):
        # Route BOTH streams through this object. Note stderr text is
        # mirrored to the real *stdout* in write() β€” intentional tee target.
        sys.stdout = self
        sys.stderr = self
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Always restore the originals, even when the body raised.
        sys.stdout = self._real_stdout
        sys.stderr = self._real_stderr

    def write(self, text):
        # Buffer and terminal are updated under one lock so concurrent
        # writers cannot interleave a chunk between the two destinations.
        with self.lock:
            self.buffer.write(text)
            self._real_stdout.write(text)

    def flush(self):
        # The StringIO side needs no flushing; only the terminal does.
        self._real_stdout.flush()

    def get_output(self) -> str:
        """Return everything captured so far."""
        with self.lock:
            return self.buffer.getvalue()

    def isatty(self):
        """Report non-interactive so libraries skip terminal control codes."""
        return False

    def fileno(self):
        """Expose the real stdout's file descriptor for callers that need one."""
        return self._real_stdout.fileno()

    @property
    def encoding(self):
        # Some libraries inspect sys.stdout.encoding; delegate to the terminal.
        return self._real_stdout.encoding
# ============================================================
# Pipeline Functions
# ============================================================
def run_safety_check(file_path: str, pipeline_log: PipelineLogger) -> dict:
    """Run safety checks programmatically β€” no LLM needed.

    Calls the 3 safety tools directly as Python functions for speed and accuracy.
    This avoids LLM hallucinations and takes <1 second instead of ~480s.

    Args:
        file_path: Path to the uploaded PDF on disk.
        pipeline_log: Logger that records each tool invocation.

    Returns:
        dict with keys:
            success: False only when PDF text extraction fails
                (an "error" key is included in that case).
            safety_report: SafetyReport model, or None on extraction failure.
            raw_text: Unredacted extracted text, or None on failure.
    """
    import json as _json
    from tools.pdf_parser import pdf_parser_tool
    from tools.pii_detector import pii_detector_tool
    from tools.injection_scanner import prompt_injection_scanner_tool
    from tools.url_validator import url_validator_tool
    from schemas.models import SafetyReport

    pipeline_log.log_step("safety_guardian", "started")

    # Step 1: Extract text from PDF
    pipeline_log.log_step("safety_guardian", "started", tool_name="pdf_parser",
                          details="Extracting raw text from PDF")
    raw_text = pdf_parser_tool.run(file_path)
    if raw_text.startswith("ERROR:"):
        # Tool signals failure in-band via an "ERROR:" prefix on the text.
        pipeline_log.log_step("safety_guardian", "failed",
                              error=f"PDF extraction failed: {raw_text}")
        return {
            "success": False,
            "error": raw_text,
            "safety_report": None,
            "raw_text": None,
        }
    pipeline_log.log_step("safety_guardian", "completed", tool_name="pdf_parser",
                          details=f"{len(raw_text)} chars extracted")

    # Step 2: Run safety tools PROGRAMMATICALLY (no LLM)
    # Scan first 15k chars for safety (enough to catch injections, much faster)
    scan_text = raw_text[:15000]

    # PII Detection
    pipeline_log.log_step("safety_guardian", "started", tool_name="pii_detector",
                          details="Scanning for PII")
    pii_result = _json.loads(pii_detector_tool.run(scan_text))
    pii_found = pii_result.get("findings", [])
    # FIX: fall back to scan_text, not raw_text. The previous fallback used
    # the FULL raw_text and then re-appended raw_text[15000:] below, which
    # duplicated the document tail whenever the tool returned no
    # "redacted_text" key.
    redacted_text = pii_result.get("redacted_text", scan_text)
    # If we only scanned a portion, append the (unscanned) remainder.
    if len(raw_text) > 15000:
        redacted_text = redacted_text + raw_text[15000:]
    pipeline_log.log_step("safety_guardian", "completed", tool_name="pii_detector",
                          details=f"{len(pii_found)} PII types found")

    # Injection Scanning
    pipeline_log.log_step("safety_guardian", "started", tool_name="injection_scanner",
                          details="Checking for prompt injections")
    injection_result = _json.loads(prompt_injection_scanner_tool.run(scan_text))
    injection_detected = not injection_result.get("is_safe", True)
    pipeline_log.log_step("safety_guardian", "completed", tool_name="injection_scanner",
                          details=f"injection_detected={injection_detected}")

    # URL Validation
    pipeline_log.log_step("safety_guardian", "started", tool_name="url_validator",
                          details="Validating URLs")
    url_result = _json.loads(url_validator_tool.run(scan_text))
    malicious_urls = url_result.get("malicious_urls", [])
    pipeline_log.log_step("safety_guardian", "completed", tool_name="url_validator",
                          details=f"{len(malicious_urls)} malicious URLs found")

    # Step 3: Build SafetyReport programmatically.
    # PII alone does not block (it is redacted); injections / malicious URLs do.
    is_safe = (not injection_detected) and (len(malicious_urls) == 0)
    if injection_detected or len(malicious_urls) > 0:
        risk_level = "high"
    elif len(pii_found) > 0:
        risk_level = "medium"
    else:
        risk_level = "low"
    safety_report = SafetyReport(
        is_safe=is_safe,
        pii_found=pii_found,
        injection_detected=injection_detected,
        malicious_urls=malicious_urls,
        sanitized_text=redacted_text,
        risk_level=risk_level,
    )
    pipeline_log.log_step("safety_guardian", "completed",
                          details=f"is_safe={is_safe}, risk_level={risk_level}")
    return {
        "success": True,
        "safety_report": safety_report,
        "raw_text": raw_text,
    }
def run_analysis_crew(sanitized_text: str, pipeline_log: PipelineLogger) -> dict:
    """Run the main analysis crew (Steps 1-5).

    Each step runs a single-agent sequential Crew. A failure in any step is
    logged and replaced by an {"error": ...} JSON payload so the remaining
    steps can still run with partial context.

    Reference: system_design.md β€” Manager Delegation Order (Lines 59-64)
    Reference: engineering_guardrails.md β€” Β§3 Error Handling + Β§5.3 Callbacks

    Args:
        sanitized_text: PII-redacted paper text from the safety gate.
        pipeline_log: Logger that records each agent's lifecycle.

    Returns:
        dict mapping output names (paper_extraction, methodology_critique,
        relevance_report, review_draft, rubric_evaluation, final_review)
        to JSON strings.
    """
    from crewai import Crew, Process
    from agents.paper_extractor import paper_extractor, create_extraction_task
    from agents.methodology_critic import methodology_critic, create_critique_task
    from agents.relevance_researcher import relevance_researcher, create_research_task
    from agents.review_synthesizer import review_synthesizer, create_synthesis_task
    from agents.rubric_evaluator import rubric_evaluator, create_evaluation_task
    from agents.enhancer import enhancer, create_enhancement_task
    from tools.citation_search import _reset_call_count

    _reset_call_count()  # Reset API call counter for this run

    def _result_to_json(result) -> str:
        # Prefer the schema-validated pydantic payload; fall back to raw text.
        if hasattr(result, 'pydantic') and result.pydantic:
            return result.pydantic.model_dump_json(indent=2)
        return str(result.raw) if hasattr(result, 'raw') else str(result)

    def _run_step(agent_name, agent, make_task, success_details,
                  failure_prefix, fallback_extra=None):
        # Run one single-agent crew with uniform logging and error fallback.
        # make_task is a zero-arg callable so task-construction errors are
        # caught here too (matching the original per-step try blocks).
        pipeline_log.log_step(agent_name, "started")
        try:
            crew = Crew(
                agents=[agent],
                tasks=[make_task()],
                process=Process.sequential,
                verbose=True,
            )
            output_json = _result_to_json(crew.kickoff())
            pipeline_log.log_step(agent_name, "completed", details=success_details)
            return output_json
        except Exception as e:
            pipeline_log.log_step(agent_name, "failed", error=str(e))
            payload = {"error": f"{failure_prefix}: {str(e)}"}
            if fallback_extra:
                payload.update(fallback_extra)
            return json.dumps(payload)

    # --- Step 1: Paper Extraction ---
    paper_json = _run_step(
        "paper_extractor", paper_extractor,
        lambda: create_extraction_task(sanitized_text),
        "Extracted paper data", "Extraction failed",
        # Downstream agents can still work from a raw-text excerpt.
        fallback_extra={"raw_text": sanitized_text[:5000]},
    )
    # --- Step 2a: Methodology Critique ---
    critique_json = _run_step(
        "methodology_critic", methodology_critic,
        lambda: create_critique_task(paper_json),
        "Critique completed", "Critique failed",
    )
    # --- Step 2b: Relevance Research (parallel concept, sequential in CrewAI) ---
    research_json = _run_step(
        "relevance_researcher", relevance_researcher,
        lambda: create_research_task(paper_json),
        "Research completed", "Research failed",
    )
    # --- Step 3: Review Synthesis ---
    draft_json = _run_step(
        "review_synthesizer", review_synthesizer,
        lambda: create_synthesis_task(paper_json, critique_json, research_json),
        "Draft review synthesized", "Synthesis failed",
    )
    # --- Step 4: Rubric Evaluation ---
    rubric_json = _run_step(
        "rubric_evaluator", rubric_evaluator,
        lambda: create_evaluation_task(draft_json, paper_json, critique_json, research_json),
        "Rubric evaluation completed", "Evaluation failed",
    )
    # --- Step 5: Enhancement ---
    final_json = _run_step(
        "enhancer", enhancer,
        lambda: create_enhancement_task(draft_json, rubric_json, paper_json),
        "Final review produced", "Enhancement failed",
    )

    return {
        "paper_extraction": paper_json,
        "methodology_critique": critique_json,
        "relevance_report": research_json,
        "review_draft": draft_json,
        "rubric_evaluation": rubric_json,
        "final_review": final_json,
    }
# ============================================================
# Output Formatting
# ============================================================
def format_executive_summary(final_json: str) -> str:
    """Render the final-review JSON string as the Executive Summary tab.

    Falls back to echoing the raw payload when it is not valid JSON, and to
    an error banner when the payload carries an "error" key.
    """
    try:
        data = json.loads(final_json)
        if "error" in data:
            return f"## ⚠️ Error\n\n{data['error']}"
        rec = data.get("recommendation", "N/A")
        rec_emoji = {"Accept": "βœ…", "Revise": "πŸ”„", "Reject": "❌"}.get(rec, "❓")
        parts = [
            f"# {rec_emoji} Recommendation: **{rec}**\n\n",
            f"**Confidence:** {data.get('confidence_score', 'N/A')}/5\n\n",
            f"**Rubric Score:** {data.get('rubric_total', 'N/A')}/15\n\n",
            "---\n\n",
            f"## Executive Summary\n\n{data.get('executive_summary', 'N/A')}\n\n",
        ]
        meta = data.get("paper_metadata", {})
        if meta:
            parts.append("---\n\n## Paper Information\n\n")
            parts.append(f"- **Title:** {meta.get('title', 'N/A')}\n")
            parts.append(f"- **Authors:** {meta.get('authors', 'N/A')}\n")
            parts.append(f"- **Type:** {meta.get('paper_type', 'N/A')}\n")
        return "".join(parts)
    except (json.JSONDecodeError, KeyError):
        return f"## Raw Output\n\n{final_json}"
def format_full_review(final_json: str) -> str:
    """Render the final-review JSON string as the Full Review tab."""
    try:
        data = json.loads(final_json)
        if "error" in data:
            return f"## ⚠️ Error\n\n{data['error']}"
        chunks = ["# Full Peer Review Report\n\n", "## Strengths\n\n"]
        chunks.extend(f"- {item}\n" for item in data.get("strengths", []))
        chunks.append("\n## Weaknesses\n\n")
        chunks.extend(f"- {item}\n" for item in data.get("weaknesses", []))
        for heading, key in (
            ("Methodology Assessment", "methodology_assessment"),
            ("Novelty Assessment", "novelty_assessment"),
            ("Related Work Context", "related_work_context"),
        ):
            chunks.append(f"\n## {heading}\n\n{data.get(key, 'N/A')}\n")
        chunks.append("\n## Questions for Authors\n\n")
        # "1." on every item is deliberate: markdown renderers auto-number
        # ordered lists.
        chunks.extend(f"1. {q}\n" for q in data.get("questions_for_authors", []))
        if data.get("improvement_log"):
            chunks.append("\n## Enhancement Log\n\n")
            chunks.extend(f"- {entry}\n" for entry in data["improvement_log"])
        return "".join(chunks)
    except (json.JSONDecodeError, KeyError):
        return f"## Raw Output\n\n{final_json}"
def format_rubric_scorecard(rubric_json: str) -> str:
    """Render the rubric-evaluation JSON string as the Rubric Scorecard tab.

    Produces one markdown table per category with a running criterion
    number, a pass/fail emoji per criterion, and per-criterion feedback.
    Falls back to echoing the raw payload when it is not valid JSON.
    """
    try:
        data = json.loads(rubric_json)
        if "error" in data:
            return f"## ⚠️ Error\n\n{data['error']}"
        total = data.get("total_score", "?")
        passed = data.get("passed", False)
        status_emoji = "βœ… PASSED" if passed else "❌ NEEDS IMPROVEMENT"
        md = f"# Rubric Scorecard β€” {total}/15 {status_emoji}\n\n"
        scores = data.get("scores", {})
        feedback = data.get("feedback_per_criterion", {})
        categories = [
            ("πŸ“‹ Content Completeness", [
                "title_authors_correct", "abstract_summarized",
                "methodology_described", "strengths_sufficient",
                "weaknesses_sufficient", "limitations_acknowledged",
                "related_work_present",
            ]),
            ("πŸ”¬ Analytical Depth", [
                "novelty_assessed", "reproducibility_discussed",
                "evidence_quality_evaluated", "contribution_stated",
            ]),
            ("πŸ“ Review Quality", [
                "recommendation_justified", "actionable_questions",
                "no_hallucinated_citations", "professional_coherent",
            ]),
        ]
        criterion_no = 0  # FIX: the "#" column was previously always blank
        for cat_name, criteria in categories:
            md += f"### {cat_name}\n\n"
            md += "| # | Criterion | Score | Feedback |\n"
            md += "|---|-----------|-------|----------|\n"
            for name in criteria:
                criterion_no += 1
                score = scores.get(name, "?")
                # 1 β†’ pass, 0 β†’ fail, anything else (missing) β†’ unknown
                score_emoji = "βœ…" if score == 1 else "❌" if score == 0 else "❓"
                fb = feedback.get(name, "N/A")
                display_name = name.replace("_", " ").title()
                md += f"| {criterion_no} | {display_name} | {score_emoji} {score} | {fb} |\n"
            md += "\n"
        return md
    except (json.JSONDecodeError, KeyError):
        return f"## Raw Output\n\n{rubric_json}"
def format_safety_report(safety_report) -> str:
    """Render the safety verdict as the Safety Report tab.

    Accepts a pydantic model (via model_dump) or any object whose str() is
    valid JSON; anything else falls through to a raw dump.
    """
    if safety_report is None:
        return "## ⚠️ Safety check was not completed."
    try:
        if hasattr(safety_report, 'model_dump'):
            data = safety_report.model_dump()
        else:
            data = json.loads(str(safety_report))
        safe = data.get("is_safe", False)
        header = (
            "βœ… SAFE β€” Document passed all safety checks"
            if safe else
            "❌ UNSAFE β€” Issues detected"
        )
        parts = [
            f"# Safety Report β€” {header}\n\n",
            f"**Risk Level:** {data.get('risk_level', 'N/A')}\n\n",
        ]
        if data.get("pii_found"):
            parts.append("## PII Detected\n\n")
            parts.extend(f"- πŸ”’ {item}\n" for item in data["pii_found"])
            parts.append("\n*All PII has been redacted before analysis.*\n\n")
        else:
            parts.append("## PII: βœ… None detected\n\n")
        injection = data.get('injection_detected')
        parts.append(f"## Prompt Injection: {'❌ DETECTED' if injection else 'βœ… None detected'}\n\n")
        if data.get("malicious_urls"):
            parts.append("## Malicious URLs\n\n")
            parts.extend(f"- ⚠️ {url}\n" for url in data["malicious_urls"])
        else:
            parts.append("## URLs: βœ… No malicious URLs found\n\n")
        return "".join(parts)
    except Exception:
        # Last resort: show whatever representation we were handed.
        return f"## Raw Safety Data\n\n{str(safety_report)}"
def format_agent_outputs(analysis_results: dict, safety_report=None) -> str:
    """Render each agent's structured output as one markdown section per agent."""

    def _render(raw):
        # Pretty-print a JSON payload as markdown bullets; fall back to a
        # fenced code block (capped at 2000 chars) for non-dict or non-JSON.
        try:
            payload = json.loads(raw)
            if not isinstance(payload, dict):
                return f"```\n{raw[:2000]}\n```\n"
            out = []
            for field, value in payload.items():
                if isinstance(value, list):
                    out.append(f"**{field}:**\n")
                    for elem in value:
                        if isinstance(elem, dict):
                            out.append(f" - {json.dumps(elem)}\n")
                        else:
                            out.append(f" - {elem}\n")
                elif isinstance(value, dict):
                    out.append(f"**{field}:**\n")
                    out.extend(f" - {sub_k}: {sub_v}\n" for sub_k, sub_v in value.items())
                else:
                    out.append(f"**{field}:** {value}\n")
            return "".join(out)
        except (json.JSONDecodeError, TypeError):
            return f"```\n{str(raw)[:2000]}\n```\n"

    sections = [
        "# πŸ“Š Individual Agent Outputs\n\n",
        "Each agent's structured output is shown below.\n\n---\n\n",
    ]
    # Safety Guardian (pydantic model, rendered attribute-by-attribute)
    if safety_report:
        sections.append("## πŸ›‘οΈ Agent 1: Safety Guardian\n\n")
        for field in ("is_safe", "risk_level", "injection_detected",
                      "pii_found", "malicious_urls"):
            sections.append(f"- **{field}:** {getattr(safety_report, field)}\n")
        sections.append("\n---\n\n")
    for title, key in (
        ("πŸ“„ Agent 2: Paper Extractor", "paper_extraction"),
        ("πŸ”¬ Agent 3: Methodology Critic", "methodology_critique"),
        ("πŸ” Agent 4: Relevance Researcher", "relevance_report"),
        ("✍️ Agent 5: Review Synthesizer", "review_draft"),
        ("πŸ“ Agent 6: Rubric Evaluator", "rubric_evaluation"),
        ("✨ Agent 7: Enhancer (Final)", "final_review"),
    ):
        raw = analysis_results.get(key, "{}")
        sections.append(f"## {title}\n\n")
        sections.append(_render(raw))
        sections.append("\n---\n\n")
    return "".join(sections)
def create_download_file(full_review_md: str) -> Optional[str]:
    """Write the full-review markdown to a timestamped file for download.

    Args:
        full_review_md: Rendered markdown of the full review; may be empty.

    Returns:
        Path to the written .md file, or None when there is nothing to save.
        (FIX: the previous annotation said ``str`` but None was returned for
        empty input.)
    """
    if not full_review_md:
        return None
    import tempfile  # local import: only needed when a report is actually saved
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    # FIX: write into the system temp dir instead of the CWD β€” the working
    # directory may be read-only on hosted platforms (e.g. HF Spaces, which
    # this file already special-cases for SQLite).
    path = os.path.join(tempfile.gettempdir(), f"Paper_Review_{timestamp}.md")
    with open(path, "w", encoding="utf-8") as f:
        f.write(full_review_md)
    return path
# ============================================================
# Main Pipeline β€” engineering_guardrails.md Β§3.4
# ============================================================
import gradio as gr # Import gradio for gr.Progress and gr.update
def run_analysis_pipeline(pdf_file, progress=gr.Progress()) -> tuple:
    """Main pipeline with full error handling.
    Reference: engineering_guardrails.md Β§3.4 Pipeline-Level Error Handling

    Args:
        pdf_file: Gradio upload payload β€” an object with a ``.name`` path
            attribute, a plain path string, or None when nothing was uploaded.
        progress: Gradio progress tracker (the default instance is replaced
            per-request by Gradio).

    Returns:
        (summary, review, score, safety, logs, summary_json, agents, download_file, download_visible)
        β€” 9 values, matched positionally to the outputs list in build_ui.
    """
    pipeline_log = PipelineLogger()
    empty_agents = "*Upload a PDF and click 'Analyze Paper' to see agent outputs.*"
    try:
        # === Step 0: File validation ===
        pipeline_log.log_step("file_validation", "started")
        progress(0.05, desc="Validating file...")
        if pdf_file is None:
            pipeline_log.log_step("file_validation", "failed", error="No file uploaded")
            error_msg = "## ⚠️ No file uploaded\n\nPlease upload a PDF file."
            return error_msg, error_msg, "", "", pipeline_log.get_logs_for_display(), "{}", empty_agents, None, gr.update(visible=False)
        # Handle Gradio file object
        file_path = pdf_file.name if hasattr(pdf_file, 'name') else str(pdf_file)
        # NOTE(review): extension check is case-sensitive, so ".PDF" uploads
        # are rejected β€” confirm whether that is intended.
        if not file_path.endswith(".pdf"):
            pipeline_log.log_step("file_validation", "failed", error="Not a PDF file")
            error_msg = "## ⚠️ Invalid file type\n\nPlease upload a PDF file."
            return error_msg, error_msg, "", "", pipeline_log.get_logs_for_display(), "{}", empty_agents, None, gr.update(visible=False)
        pipeline_log.log_step("file_validation", "passed")
        # === Step 1: Safety Gate ===
        # Gate 1: an unsafe document never reaches the LLM agents.
        progress(0.1, desc="Running Safety Gate...")
        safety_result = run_safety_check(file_path, pipeline_log)
        if not safety_result["success"]:
            error_msg = f"## ⚠️ Safety check failed\n\n{safety_result.get('error', 'Unknown error')}"
            safety_md = format_safety_report(safety_result.get("safety_report"))
            return error_msg, error_msg, "", safety_md, pipeline_log.get_logs_for_display(), json.dumps(pipeline_log.get_summary(), indent=2), empty_agents, None, gr.update(visible=False)
        safety_report = safety_result["safety_report"]
        if not safety_report.is_safe:
            pipeline_log.log_step("pipeline", "blocked", details="Safety gate blocked the document")
            safety_md = format_safety_report(safety_report)
            block_msg = (
                "## πŸ›‘οΈ Document Blocked by Safety Guardian\n\n"
                "This document was flagged as potentially unsafe and cannot be analyzed.\n\n"
                f"**Risk Level:** {safety_report.risk_level}\n\n"
                "Please review the Safety Report tab for details."
            )
            return block_msg, block_msg, "", safety_md, pipeline_log.get_logs_for_display(), json.dumps(pipeline_log.get_summary(), indent=2), format_agent_outputs({}, safety_report), None, gr.update(visible=False)
        # Get sanitized text for analysis (fall back to raw text if the
        # report carries no sanitized version)
        sanitized_text = safety_report.sanitized_text or safety_result.get("raw_text", "")
        # === Step 2: Main Analysis Crew ===
        pipeline_log.log_step("analysis_crew", "started")
        # We pass progress directly to the crew function if we want granular update,
        # but here we'll update it based on steps returning.
        # Actually simplified: we run the crew function which does sequential steps.
        # To get progress updates, we'd need to emit them from inside run_analysis_crew
        # or split run_analysis_crew into individual calls here.
        # Refactoring run_analysis_crew to be inline here for progress updates!
        # --- Import Logic ---
        from crewai import Crew, Process
        from agents.paper_extractor import paper_extractor, create_extraction_task
        from agents.methodology_critic import methodology_critic, create_critique_task
        from agents.relevance_researcher import relevance_researcher, create_research_task
        from agents.review_synthesizer import review_synthesizer, create_synthesis_task
        from agents.rubric_evaluator import rubric_evaluator, create_evaluation_task
        from agents.enhancer import enhancer, create_enhancement_task
        from tools.citation_search import _reset_call_count
        _reset_call_count()
        # NOTE(review): on a step failure the error JSON is only bound to the
        # local *_json variable and is NOT added to analysis_results, so the
        # Agent Outputs tab shows "{}" for that agent β€” confirm intended.
        analysis_results = {}
        # 1. Extraction
        progress(0.2, desc="Agent 1/6: Formatting & Extracting...")
        pipeline_log.log_step("paper_extractor", "started")
        try:
            extraction_task = create_extraction_task(sanitized_text)
            extraction_crew = Crew(agents=[paper_extractor], tasks=[extraction_task], process=Process.sequential, verbose=True)
            res = extraction_crew.kickoff()
            # Prefer the schema-validated pydantic payload; fall back to raw text.
            paper_json = res.pydantic.model_dump_json(indent=2) if hasattr(res, 'pydantic') and res.pydantic else str(res.raw)
            analysis_results["paper_extraction"] = paper_json
            pipeline_log.log_step("paper_extractor", "completed")
        except Exception as e:
            pipeline_log.log_step("paper_extractor", "failed", error=str(e))
            paper_json = json.dumps({"error": str(e)})
        # 2a. Critique
        progress(0.35, desc="Agent 2/6: Critiquing Methodology...")
        pipeline_log.log_step("methodology_critic", "started")
        try:
            critique_task = create_critique_task(paper_json)
            critique_crew = Crew(agents=[methodology_critic], tasks=[critique_task], process=Process.sequential, verbose=True)
            res = critique_crew.kickoff()
            critique_json = res.pydantic.model_dump_json(indent=2) if hasattr(res, 'pydantic') and res.pydantic else str(res.raw)
            analysis_results["methodology_critique"] = critique_json
            pipeline_log.log_step("methodology_critic", "completed")
        except Exception as e:
            pipeline_log.log_step("methodology_critic", "failed", error=str(e))
            critique_json = json.dumps({"error": str(e)})
        # 2b. Research
        progress(0.5, desc="Agent 3/6: Searching Related Work...")
        pipeline_log.log_step("relevance_researcher", "started")
        try:
            research_task = create_research_task(paper_json)
            research_crew = Crew(agents=[relevance_researcher], tasks=[research_task], process=Process.sequential, verbose=True)
            res = research_crew.kickoff()
            research_json = res.pydantic.model_dump_json(indent=2) if hasattr(res, 'pydantic') and res.pydantic else str(res.raw)
            analysis_results["relevance_report"] = research_json
            pipeline_log.log_step("relevance_researcher", "completed")
        except Exception as e:
            pipeline_log.log_step("relevance_researcher", "failed", error=str(e))
            research_json = json.dumps({"error": str(e)})
        # 3. Synthesis
        progress(0.65, desc="Agent 4/6: Synthesizing Draft...")
        pipeline_log.log_step("review_synthesizer", "started")
        try:
            synthesis_task = create_synthesis_task(paper_json, critique_json, research_json)
            synthesis_crew = Crew(agents=[review_synthesizer], tasks=[synthesis_task], process=Process.sequential, verbose=True)
            res = synthesis_crew.kickoff()
            draft_json = res.pydantic.model_dump_json(indent=2) if hasattr(res, 'pydantic') and res.pydantic else str(res.raw)
            analysis_results["review_draft"] = draft_json
            pipeline_log.log_step("review_synthesizer", "completed")
        except Exception as e:
            pipeline_log.log_step("review_synthesizer", "failed", error=str(e))
            draft_json = json.dumps({"error": str(e)})
        # 4. Rubric
        progress(0.8, desc="Agent 5/6: Scoring against Rubric...")
        pipeline_log.log_step("rubric_evaluator", "started")
        try:
            eval_task = create_evaluation_task(draft_json, paper_json, critique_json, research_json)
            eval_crew = Crew(agents=[rubric_evaluator], tasks=[eval_task], process=Process.sequential, verbose=True)
            res = eval_crew.kickoff()
            rubric_json = res.pydantic.model_dump_json(indent=2) if hasattr(res, 'pydantic') and res.pydantic else str(res.raw)
            analysis_results["rubric_evaluation"] = rubric_json
            pipeline_log.log_step("rubric_evaluator", "completed")
        except Exception as e:
            pipeline_log.log_step("rubric_evaluator", "failed", error=str(e))
            rubric_json = json.dumps({"error": str(e)})
        # 5. Enhancer
        progress(0.9, desc="Agent 6/6: Final Polish...")
        pipeline_log.log_step("enhancer", "started")
        try:
            enhance_task = create_enhancement_task(draft_json, rubric_json, paper_json)
            enhance_crew = Crew(agents=[enhancer], tasks=[enhance_task], process=Process.sequential, verbose=True)
            res = enhance_crew.kickoff()
            final_json = res.pydantic.model_dump_json(indent=2) if hasattr(res, 'pydantic') and res.pydantic else str(res.raw)
            analysis_results["final_review"] = final_json
            pipeline_log.log_step("enhancer", "completed")
        except Exception as e:
            pipeline_log.log_step("enhancer", "failed", error=str(e))
            final_json = json.dumps({"error": str(e)})
        pipeline_log.log_step("analysis_crew", "completed")
        # === Step 3: Format outputs ===
        progress(0.95, desc="Formatting Report...")
        executive_summary = format_executive_summary(final_json)
        full_review = format_full_review(final_json)
        rubric_scorecard = format_rubric_scorecard(rubric_json)
        safety_md = format_safety_report(safety_report)
        agent_outputs_md = format_agent_outputs(analysis_results, safety_report)
        # Create download file
        download_file = create_download_file(full_review)
        progress(1.0, desc="Done!")
        # NOTE(review): the last two values both target download_btn in
        # build_ui's outputs list (value + visibility) β€” confirm the installed
        # Gradio version accepts a component listed twice in outputs.
        return (
            executive_summary,
            full_review,
            rubric_scorecard,
            safety_md,
            pipeline_log.get_logs_for_display(),
            json.dumps(pipeline_log.get_summary(), indent=2),
            agent_outputs_md,
            download_file,
            gr.update(visible=True)  # Show download button
        )
    except Exception as e:
        # Top-level boundary: log the full traceback, surface a short
        # user-facing message, keep the UI in a consistent (hidden-download)
        # state.
        pipeline_log.log_step("pipeline", "critical_failure", error=traceback.format_exc())
        error_msg = (
            "## ⚠️ Unexpected Error\n\n"
            "An unexpected error occurred. Please try again.\n\n"
            f"**Error:** {type(e).__name__}: {str(e)}"
        )
        return (
            error_msg, error_msg, "", "",
            pipeline_log.get_logs_for_display(),
            json.dumps(pipeline_log.get_summary(), indent=2),
            empty_agents,
            None,
            gr.update(visible=False)
        )
# ============================================================
# Gradio UI β€” system_design.md Lines 279-287
# ============================================================
import gradio as gr
def build_ui():
    """Build the Gradio interface.

    Tabs: Executive Summary, Full Review, Rubric Scorecard, Safety Report,
    Agent Outputs, Pipeline Logs. The Analyze button drives
    run_analysis_pipeline, whose 9 return values map positionally onto the
    outputs list below.

    Returns:
        The assembled gr.Blocks app (caller invokes .launch()).
    """
    with gr.Blocks(
        title="AI Research Paper Analyst",
        # theme/css on Blocks directly for Gradio 5.x compatibility
        theme=gr.themes.Default(primary_hue="blue", secondary_hue="slate"),
        css=".main-header {text-align: center; margin-bottom: 2rem;} .upload-area {min-height: 150px;}"
    ) as app:
        gr.Markdown(
            """
# πŸ”¬ AI Research Paper Analyst
### Automated Peer-Review System powered by Multi-Agent AI
Upload a research paper (PDF) and receive a comprehensive peer-review analysis
with methodology critique, novelty assessment, and a structured recommendation.
---
""",
            elem_classes="main-header"
        )
        with gr.Row():
            with gr.Column(scale=1):
                pdf_input = gr.File(
                    label="πŸ“„ Upload Research Paper (PDF)",
                    file_types=[".pdf"],
                    type="filepath",
                    elem_classes="upload-area",
                )
                analyze_btn = gr.Button(
                    "πŸ” Analyze Paper",
                    variant="primary",
                    size="lg",
                )
                gr.Markdown(
                    "*Analysis typically takes 1-3 minutes depending on paper length.*"
                )
        # Output Components
        with gr.Tabs():
            with gr.Tab("πŸ“‹ Executive Summary"):
                summary_output = gr.Markdown(
                    label="Executive Summary",
                    value="*Upload a PDF and click 'Analyze Paper' to begin.*"
                )
                # Download button hidden until a report exists
                download_btn = gr.DownloadButton(
                    label="πŸ’Ύ Download Full Report (Markdown)",
                    visible=False
                )
            with gr.Tab("πŸ“ Full Review"):
                review_output = gr.Markdown(
                    label="Full Review Report",
                    value="*Detailed review will appear here after analysis.*"
                )
            with gr.Tab("πŸ“Š Rubric Scorecard"):
                rubric_output = gr.Markdown(
                    label="Rubric Scorecard",
                    value="*Rubric evaluation will appear here after analysis.*"
                )
            with gr.Tab("πŸ›‘οΈ Safety Report"):
                safety_output = gr.Markdown(
                    label="Safety Report",
                    value="*Safety scan results will appear here after analysis.*"
                )
            # FIX: this tab label contained a mojibake replacement character
            # ("οΏ½"); restored the intended πŸ“Š emoji (matching the tab's
            # "πŸ“Š Individual Agent Outputs" content header).
            with gr.Tab("πŸ“Š Agent Outputs"):
                agent_outputs = gr.Markdown(
                    label="Per-Agent Structured Outputs",
                    value="*Upload a PDF and click 'Analyze Paper' to see agent outputs.*"
                )
            with gr.Tab("βš™οΈ Pipeline Logs"):
                logs_output = gr.Textbox(
                    label="Execution Log",
                    lines=25,
                    interactive=False,
                )
                summary_json = gr.Code(
                    label="Run Summary (JSON)",
                    language="json"
                )
        analyze_btn.click(
            fn=run_analysis_pipeline,
            inputs=[pdf_input],
            outputs=[
                summary_output,
                review_output,
                rubric_output,
                safety_output,
                logs_output,
                summary_json,
                agent_outputs,
                # NOTE(review): download_btn is listed twice to receive both
                # the file path and the visibility update from the pipeline's
                # 9-tuple. Confirm the installed Gradio version accepts a
                # repeated output component; otherwise collapse the pipeline's
                # last two return values into a single
                # gr.DownloadButton(value=..., visible=True) update.
                download_btn,  # File component
                download_btn,  # For visibility update
            ],
        )
    return app
# ============================================================
# Entry Point
# ============================================================
if __name__ == "__main__":
    # Verify API key is set. Warn but do not exit: the UI still loads, and
    # the key may arrive by other means (e.g. platform secrets) at runtime.
    if not os.getenv("OPENAI_API_KEY"):
        print("⚠️ WARNING: OPENAI_API_KEY not found in environment.")
        print(" Create a .env file with: OPENAI_API_KEY=your-key-here")
        print(" Or export it: export OPENAI_API_KEY=your-key-here")
        print()
    # Launch with environment variables handling port/host
    ui = build_ui()
    ui.launch()