"""
AI Research Paper Analyst — Main Application
CrewAI Hierarchical Pipeline + Gradio UI

Reference: system_design.md — Complete System Architecture
Reference: engineering_guardrails.md — §3 Error Handling, §5 Observability

Pipeline Flow:
    Gate 1: Safety Guardian → blocks if unsafe
    Step 1: Paper Extractor
    Step 2: Methodology Critic + Relevance Researcher (parallel concept, sequential in CrewAI)
    Step 3: Review Synthesizer
    Step 4: Rubric Evaluator
    Step 5: Enhancer
    Gate 2: Quality Check (programmatic)
"""

import os
import sys
import io
import time
import json
import re
import traceback
import threading
from datetime import datetime
from typing import Optional

# ============================================================
# SQLite Fix for Hugging Face Spaces (ChromaDB requirement)
# ============================================================
# Runs before the third-party imports below so that any module importing
# `sqlite3` afterwards (e.g. ChromaDB) receives pysqlite3 instead of the
# system build.
if sys.platform.startswith("linux"):
    try:
        __import__("pysqlite3")
        sys.modules["sqlite3"] = sys.modules.pop("pysqlite3")
    except ImportError:
        pass

import gradio as gr
from dotenv import load_dotenv

# Load environment variables (e.g. OPENAI_API_KEY) from a local .env file.
load_dotenv()


# ============================================================
# Pipeline Logger — engineering_guardrails.md §5.1
# ============================================================

class PipelineLogger:
    """Logs every agent step + tool result for observability.

    SECURITY: Never logs raw PII, API keys, or secrets.
    Free-text fields (``details`` and ``error``) are scrubbed of API-key-like
    tokens and truncated before being stored, to limit accidental PII in logs.

    Reference: engineering_guardrails.md §5.4 Logging Security Rules
    """

    # Maximum stored length of free-text fields. Errors get more room because
    # the pipeline logs full tracebacks on critical failures.
    MAX_DETAIL_CHARS = 200
    MAX_ERROR_CHARS = 2000

    # Statuses that mark an agent as failed in the summary.
    FAILED_STATUSES = frozenset({"failed", "critical_failure"})

    # OpenAI-style secret keys, including "sk-proj-..." keys that contain
    # '-' and '_'. The \b avoids redacting words such as "task-...".
    _OPENAI_KEY_RE = re.compile(r"\bsk-[A-Za-z0-9_-]{20,}")
    # Generic `key=...` / `"api_key": "..."` assignments, in any letter case.
    _GENERIC_KEY_RE = re.compile(r'key["\s:=]+["\']?[A-Za-z0-9_-]{20,}', re.IGNORECASE)

    _STATUS_EMOJI = {
        "started": "🔄",
        "completed": "✅",
        "failed": "❌",
        "retrying": "🔁",
        "passed": "✅",
        "blocked": "🛡️",
        "critical_failure": "💥",
    }

    def __init__(self):
        self.logs: list[dict] = []
        # Monotonic clock: elapsed times are immune to wall-clock adjustments.
        self.start_time = time.monotonic()

    def _elapsed(self) -> float:
        """Seconds since this logger was created, rounded to 0.01 s."""
        return round(time.monotonic() - self.start_time, 2)

    def log_step(
        self,
        agent_name: str,
        status: str,
        details: Optional[str] = None,
        error: Optional[str] = None,
        tool_name: Optional[str] = None,
        tokens_used: Optional[int] = None,
    ) -> None:
        """Append one log entry.

        ``details`` and ``error`` are sanitized, then truncated; optional
        fields are only stored when provided.
        """
        entry = {
            "timestamp": datetime.now().isoformat(),
            "elapsed_seconds": self._elapsed(),
            "agent": agent_name,
            "status": status,
        }
        # Sanitize before truncating so a key cannot be cut short enough to
        # slip under the redaction patterns' minimum length.
        if details:
            entry["details"] = self._truncate(self._sanitize(str(details)), self.MAX_DETAIL_CHARS)
        if error:
            entry["error"] = self._truncate(self._sanitize(str(error)), self.MAX_ERROR_CHARS)
        if tool_name:
            entry["tool"] = tool_name
        if tokens_used is not None:
            entry["tokens_used"] = tokens_used
        self.logs.append(entry)

    def _sanitize(self, text: str) -> str:
        """Replace API-key-like substrings with redaction markers."""
        text = self._OPENAI_KEY_RE.sub("[REDACTED_API_KEY]", text)
        text = self._GENERIC_KEY_RE.sub("[REDACTED_KEY]", text)
        return text

    def _truncate(self, text: str, max_chars: int = 200) -> str:
        """Cut *text* to *max_chars*, noting the original length when cut."""
        if len(text) <= max_chars:
            return text
        return text[:max_chars] + f"... [truncated, {len(text)} chars total]"

    def get_summary(self) -> dict:
        """Summarize the run: per-agent last status/elapsed, counts, failures."""
        per_agent: dict[str, dict] = {}
        for entry in self.logs:
            # The last entry for an agent wins; dict order follows first appearance.
            per_agent[entry["agent"]] = {
                "status": entry["status"],
                "elapsed": entry["elapsed_seconds"],
            }

        return {
            "total_time_seconds": self._elapsed(),
            "total_steps": len(self.logs),
            "agents_completed": sum(
                1 for info in per_agent.values() if info["status"] == "completed"
            ),
            "agents_failed": [
                name for name, info in per_agent.items()
                if info["status"] in self.FAILED_STATUSES
            ],
            "per_agent": per_agent,
        }

    def get_logs_for_display(self) -> str:
        """Render all entries plus a summary as a plain-text report."""
        rule = "=" * 60
        lines = [rule, "PIPELINE EXECUTION LOG", rule]

        for entry in self.logs:
            status = entry["status"]
            emoji = self._STATUS_EMOJI.get(status, "📌")
            line = f"[{entry['elapsed_seconds']:>6.1f}s] {emoji} {entry['agent']}: {status}"
            if entry.get("tool"):
                line += f" (tool: {entry['tool']})"
            if entry.get("details"):
                line += f" — {entry['details']}"
            if "tokens_used" in entry:
                line += f" [{entry['tokens_used']} tokens]"
            if entry.get("error"):
                line += f"\n    ⚠️ Error: {entry['error']}"
            lines.append(line)

        summary = self.get_summary()
        lines.append("")
        lines.append("-" * 60)
        lines.append("SUMMARY")
        lines.append(f"  Total time: {summary['total_time_seconds']}s")
        lines.append(f"  Steps logged: {summary['total_steps']}")
        lines.append(f"  Agents completed: {summary['agents_completed']}")
        if summary["agents_failed"]:
            lines.append(f"  Agents failed: {', '.join(summary['agents_failed'])}")
        lines.append(rule)

        return "\n".join(lines)


# ============================================================
# Stdout Capture — streams CrewAI verbose output to UI
# ============================================================

class StreamCapture:
    """Captures stdout/stderr to show live agent process in the UI.

    Tees every write to the real stdout and to an internal buffer. While the
    context is active both sys.stdout and sys.stderr point here, so stderr
    text is echoed to the real *stdout*.
    """

    def __init__(self):
        self.buffer = io.StringIO()
        self._real_stdout = sys.stdout
        self._real_stderr = sys.stderr
        self.lock = threading.Lock()

    def write(self, text):
        with self.lock:
            self.buffer.write(text)
            self._real_stdout.write(text)
        # File-like contract: report the number of characters written.
        return len(text)

    def flush(self):
        self._real_stdout.flush()

    def writable(self):
        return True

    def get_output(self) -> str:
        with self.lock:
            return self.buffer.getvalue()

    def __enter__(self):
        # Re-read the streams at entry so __exit__ restores whatever was active
        # now, not whatever was active when this object was constructed.
        self._real_stdout = sys.stdout
        self._real_stderr = sys.stderr
        sys.stdout = self
        sys.stderr = self
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        sys.stdout = self._real_stdout
        sys.stderr = self._real_stderr

    def isatty(self):
        """Mock isatty check to prevent AttributeError."""
        return False

    def fileno(self):
        """Delegate fileno to real stdout if needed."""
        return self._real_stdout.fileno()

    @property
    def encoding(self):
        return self._real_stdout.encoding


# ============================================================
# Pipeline Functions
# ============================================================

# Each safety-tool call sees at most this many characters.
_SAFETY_CHUNK_CHARS = 15000
# Overlap between detection windows so an injection phrase or URL that
# straddles a chunk boundary is seen whole by at least one window.
_SAFETY_CHUNK_OVERLAP = 1000


def _iter_chunks(text: str, size: int, overlap: int = 0):
    """Yield consecutive slices of *text*, each at most *size* long.

    Each slice starts ``size - overlap`` characters after the previous one,
    and the last slice ends at the end of *text*. Yields nothing for "".
    """
    if size <= 0:
        raise ValueError("size must be positive")
    if not 0 <= overlap < size:
        raise ValueError("overlap must satisfy 0 <= overlap < size")
    step = size - overlap
    for start in range(0, len(text), step):
        yield text[start:start + size]
        if start + size >= len(text):
            break


def _run_json_tool(tool, text: str) -> dict:
    """Call a tool's ``run`` directly and return its output as a dict.

    Raises:
        ValueError: if the output is neither a dict nor a JSON object string.
            The raw output is deliberately not included (it may contain PII).
    """
    output = tool.run(text)
    if isinstance(output, dict):
        return output
    try:
        parsed = json.loads(output)
    except (TypeError, json.JSONDecodeError) as exc:
        raise ValueError(f"{getattr(tool, 'name', 'safety tool')} returned non-JSON output") from exc
    if not isinstance(parsed, dict):
        raise ValueError(f"{getattr(tool, 'name', 'safety tool')} returned a non-object JSON value")
    return parsed


def _merge_unique(target: list, items) -> None:
    """Append each element of *items* to *target* unless already present.

    A non-list value is treated as a single item; falsy values add nothing.
    Equality (not hashing) is used, so dict findings are supported.
    """
    if not isinstance(items, (list, tuple)):
        items = [items] if items else []
    for item in items:
        if item not in target:
            target.append(item)


def run_safety_check(file_path: str, pipeline_log: PipelineLogger) -> dict:
    """Run safety checks programmatically — no LLM needed.

    Calls the safety tools directly as Python functions for speed and accuracy.
    This avoids LLM hallucinations and takes <1 second instead of ~480s.

    The whole document is scanned, in windows of ``_SAFETY_CHUNK_CHARS``
    characters, because all of it is forwarded to the analysis agents. The
    check fails closed: an empty document or unparseable tool output yields
    ``success=False`` instead of an unchecked pass.

    Returns:
        dict with ``success``. On success, also ``safety_report``
        (SafetyReport) and ``raw_text``. On failure, ``error`` is set and
        ``safety_report``/``raw_text`` are None.
    """
    from tools.pdf_parser import pdf_parser_tool
    from tools.pii_detector import pii_detector_tool
    from tools.injection_scanner import prompt_injection_scanner_tool
    from tools.url_validator import url_validator_tool
    from schemas.models import SafetyReport

    def _fail(error: str, log_message: Optional[str] = None) -> dict:
        pipeline_log.log_step("safety_guardian", "failed", error=log_message or error)
        return {"success": False, "error": error, "safety_report": None, "raw_text": None}

    pipeline_log.log_step("safety_guardian", "started")

    # Step 1: Extract text from PDF
    pipeline_log.log_step("safety_guardian", "started", tool_name="pdf_parser",
                          details="Extracting raw text from PDF")
    raw_text = pdf_parser_tool.run(file_path)
    if raw_text.startswith("ERROR:"):
        return _fail(raw_text, log_message=f"PDF extraction failed: {raw_text}")
    if not raw_text.strip():
        # Nothing to scan or analyze; letting it through would make the agents
        # review an empty document.
        return _fail("ERROR: No extractable text found in the PDF (it may contain only images).")
    pipeline_log.log_step("safety_guardian", "completed", tool_name="pdf_parser",
                          details=f"{len(raw_text)} chars extracted")

    # Step 2: Run safety tools PROGRAMMATICALLY (no LLM) over the full text.
    try:
        # PII detection: non-overlapping chunks, so the redacted pieces re-join
        # into exactly one copy of the document.
        pipeline_log.log_step("safety_guardian", "started", tool_name="pii_detector",
                              details="Scanning for PII")
        pii_found: list = []
        redacted_parts: list[str] = []
        for chunk in _iter_chunks(raw_text, _SAFETY_CHUNK_CHARS):
            result = _run_json_tool(pii_detector_tool, chunk)
            _merge_unique(pii_found, result.get("findings", []))
            redacted = result.get("redacted_text")
            # If the tool returned no redacted text, fall back to this chunk
            # only (never the whole document, which would duplicate content).
            redacted_parts.append(redacted if isinstance(redacted, str) else chunk)
        redacted_text = "".join(redacted_parts)
        pipeline_log.log_step("safety_guardian", "completed", tool_name="pii_detector",
                              details=f"{len(pii_found)} PII types found")

        # Injection scanning: overlapping windows; stop at the first hit.
        pipeline_log.log_step("safety_guardian", "started", tool_name="injection_scanner",
                              details="Checking for prompt injections")
        injection_detected = False
        for chunk in _iter_chunks(raw_text, _SAFETY_CHUNK_CHARS, _SAFETY_CHUNK_OVERLAP):
            result = _run_json_tool(prompt_injection_scanner_tool, chunk)
            if not result.get("is_safe", True):
                injection_detected = True
                break
        pipeline_log.log_step("safety_guardian", "completed", tool_name="injection_scanner",
                              details=f"injection_detected={injection_detected}")

        # URL validation: overlapping windows; duplicates from the overlap are merged.
        pipeline_log.log_step("safety_guardian", "started", tool_name="url_validator",
                              details="Validating URLs")
        malicious_urls: list = []
        for chunk in _iter_chunks(raw_text, _SAFETY_CHUNK_CHARS, _SAFETY_CHUNK_OVERLAP):
            result = _run_json_tool(url_validator_tool, chunk)
            _merge_unique(malicious_urls, result.get("malicious_urls", []))
        pipeline_log.log_step("safety_guardian", "completed", tool_name="url_validator",
                              details=f"{len(malicious_urls)} malicious URLs found")
    except ValueError as exc:
        # Fail closed: an unreadable scan result must never count as "safe".
        return _fail(f"ERROR: Safety scan failed: {exc}")

    # Step 3: Build SafetyReport programmatically.
    # PII alone does not block (it is redacted); injections or bad URLs do.
    is_safe = not injection_detected and not malicious_urls
    if injection_detected or malicious_urls:
        risk_level = "high"
    elif pii_found:
        risk_level = "medium"
    else:
        risk_level = "low"

    safety_report = SafetyReport(
        is_safe=is_safe,
        pii_found=pii_found,
        injection_detected=injection_detected,
        malicious_urls=malicious_urls,
        sanitized_text=redacted_text,
        risk_level=risk_level,
    )

    pipeline_log.log_step("safety_guardian", "completed",
                          details=f"is_safe={is_safe}, risk_level={risk_level}")

    return {
        "success": True,
        "safety_report": safety_report,
        "raw_text": raw_text,
    }


def _crew_output_to_json(result) -> str:
    """Serialize a Crew kickoff result.

    Prefers the validated Pydantic model (as indented JSON), then the ``raw``
    output, then ``str(result)``.
    """
    model = getattr(result, "pydantic", None)
    if model:
        return model.model_dump_json(indent=2)
    raw = getattr(result, "raw", None)
    return str(raw) if raw is not None else str(result)


def _run_agent_step(
    pipeline_log: PipelineLogger,
    agent_name: str,
    agent,
    make_task,
    success_details: str,
    failure_label: str,
    fallback: Optional[dict] = None,
) -> str:
    """Run one agent as a single-task sequential Crew and return its output.

    *make_task* is a zero-argument callable that builds the Task; it is
    called inside the error handler so task-construction errors are caught
    as well. Failures are logged and returned as
    ``{"error": "<failure_label> failed: ...", **fallback}`` JSON so later
    steps still run (graceful degradation).
    """
    from crewai import Crew, Process

    pipeline_log.log_step(agent_name, "started")
    try:
        crew = Crew(
            agents=[agent],
            tasks=[make_task()],
            process=Process.sequential,
            verbose=True,
        )
        output = _crew_output_to_json(crew.kickoff())
    except Exception as exc:  # one agent failing must not abort the whole run
        pipeline_log.log_step(agent_name, "failed", error=str(exc))
        payload = {"error": f"{failure_label} failed: {exc}"}
        payload.update(fallback or {})
        return json.dumps(payload)
    pipeline_log.log_step(agent_name, "completed", details=success_details)
    return output


def run_analysis_crew(sanitized_text: str, pipeline_log: PipelineLogger) -> dict:
    """Run the main analysis crew (Steps 1-5).

    Each agent runs as its own single-task Crew, in order; a failing agent is
    replaced by an error JSON string so later agents still run.

    Reference: system_design.md — Manager Delegation Order (Lines 59-64)
    Reference: engineering_guardrails.md — §3 Error Handling + §5.3 Callbacks

    Returns:
        dict with string values under ``paper_extraction``,
        ``methodology_critique``, ``relevance_report``, ``review_draft``,
        ``rubric_evaluation`` and ``final_review``.
    """
    from agents.paper_extractor import paper_extractor, create_extraction_task
    from agents.methodology_critic import methodology_critic, create_critique_task
    from agents.relevance_researcher import relevance_researcher, create_research_task
    from agents.review_synthesizer import review_synthesizer, create_synthesis_task
    from agents.rubric_evaluator import rubric_evaluator, create_evaluation_task
    from agents.enhancer import enhancer, create_enhancement_task
    from tools.citation_search import _reset_call_count

    _reset_call_count()  # Reset API call counter for this run

    # --- Step 1: Paper Extraction ---
    paper_json = _run_agent_step(
        pipeline_log, "paper_extractor", paper_extractor,
        lambda: create_extraction_task(sanitized_text),
        success_details="Extracted paper data",
        failure_label="Extraction",
        # Give downstream agents some source text if extraction fails.
        fallback={"raw_text": sanitized_text[:5000]},
    )

    # --- Step 2a: Methodology Critique ---
    critique_json = _run_agent_step(
        pipeline_log, "methodology_critic", methodology_critic,
        lambda: create_critique_task(paper_json),
        success_details="Critique completed",
        failure_label="Critique",
    )

    # --- Step 2b: Relevance Research ---
    research_json = _run_agent_step(
        pipeline_log, "relevance_researcher", relevance_researcher,
        lambda: create_research_task(paper_json),
        success_details="Research completed",
        failure_label="Research",
    )

    # --- Step 3: Review Synthesis ---
    draft_json = _run_agent_step(
        pipeline_log, "review_synthesizer", review_synthesizer,
        lambda: create_synthesis_task(paper_json, critique_json, research_json),
        success_details="Draft review synthesized",
        failure_label="Synthesis",
    )

    # --- Step 4: Rubric Evaluation ---
    rubric_json = _run_agent_step(
        pipeline_log, "rubric_evaluator", rubric_evaluator,
        lambda: create_evaluation_task(draft_json, paper_json, critique_json, research_json),
        success_details="Rubric evaluation completed",
        failure_label="Evaluation",
    )

    # --- Step 5: Enhancement ---
    final_json = _run_agent_step(
        pipeline_log, "enhancer", enhancer,
        lambda: create_enhancement_task(draft_json, rubric_json, paper_json),
        success_details="Final review produced",
        failure_label="Enhancement",
    )

    return {
        "paper_extraction": paper_json,
        "methodology_critique": critique_json,
        "relevance_report": research_json,
        "review_draft": draft_json,
        "rubric_evaluation": rubric_json,
        "final_review": final_json,
    }
# ============================================================
# Output Formatting
# ============================================================

# Rubric criteria grouped by category, in display order.
_RUBRIC_CATEGORIES = (
    ("📋 Content Completeness", (
        "title_authors_correct",
        "abstract_summarized",
        "methodology_described",
        "strengths_sufficient",
        "weaknesses_sufficient",
        "limitations_acknowledged",
        "related_work_present",
    )),
    ("🔬 Analytical Depth", (
        "novelty_assessed",
        "reproducibility_discussed",
        "evidence_quality_evaluated",
        "contribution_stated",
    )),
    ("📝 Review Quality", (
        "recommendation_justified",
        "actionable_questions",
        "no_hallucinated_citations",
        "professional_coherent",
    )),
)
# Each criterion is scored 0 or 1, so the maximum equals the criterion count (15).
_RUBRIC_MAX_SCORE = sum(len(criteria) for _, criteria in _RUBRIC_CATEGORIES)

_RECOMMENDATION_EMOJI = {"Accept": "✅", "Revise": "🔄", "Reject": "❌"}

# (section title, key in analysis_results) for the Agent Outputs tab.
_AGENT_OUTPUT_SECTIONS = (
    ("📄 Agent 2: Paper Extractor", "paper_extraction"),
    ("🔬 Agent 3: Methodology Critic", "methodology_critique"),
    ("🔍 Agent 4: Relevance Researcher", "relevance_report"),
    ("✍️ Agent 5: Review Synthesizer", "review_draft"),
    ("📏 Agent 6: Rubric Evaluator", "rubric_evaluation"),
    ("✨ Agent 7: Enhancer (Final)", "final_review"),
)


def _load_json_dict(text) -> Optional[dict]:
    """Parse *text* as JSON; return it only if it is an object, else None."""
    try:
        data = json.loads(text)
    except (TypeError, ValueError):  # JSONDecodeError is a ValueError
        return None
    return data if isinstance(data, dict) else None


def _as_list(value) -> list:
    """Normalize a should-be-list field: None → [], scalar → [scalar]."""
    if value is None:
        return []
    if isinstance(value, (list, tuple)):
        return list(value)
    return [value]


def _md_table_cell(value) -> str:
    """Make *value* safe inside a Markdown table cell (escape pipes, drop newlines)."""
    return str(value).replace("|", "\\|").replace("\r", " ").replace("\n", " ")


def _code_span(value) -> str:
    """Wrap *value* in a Markdown code span (backticks removed) so URLs in it
    render as inert text rather than clickable links."""
    return "`" + str(value).replace("`", "") + "`"


def format_executive_summary(final_json: str) -> str:
    """Format the executive summary tab from the final-review JSON.

    Non-object input is shown raw; an ``error`` key is shown as an error banner.
    """
    data = _load_json_dict(final_json)
    if data is None:
        return f"## Raw Output\n\n{final_json}"
    if "error" in data:
        return f"## ⚠️ Error\n\n{data['error']}"

    recommendation = data.get("recommendation", "N/A")
    if isinstance(recommendation, str):
        emoji = _RECOMMENDATION_EMOJI.get(recommendation, "❓")
    else:
        emoji = "❓"

    md = f"# {emoji} Recommendation: **{recommendation}**\n\n"
    md += f"**Confidence:** {data.get('confidence_score', 'N/A')}/5\n\n"
    md += f"**Rubric Score:** {data.get('rubric_total', 'N/A')}/{_RUBRIC_MAX_SCORE}\n\n"
    md += "---\n\n"
    md += f"## Executive Summary\n\n{data.get('executive_summary', 'N/A')}\n\n"

    metadata = data.get("paper_metadata")
    if isinstance(metadata, dict) and metadata:
        authors = metadata.get("authors", "N/A")
        if isinstance(authors, (list, tuple)):
            authors = ", ".join(str(author) for author in authors)
        md += "---\n\n## Paper Information\n\n"
        md += f"- **Title:** {metadata.get('title', 'N/A')}\n"
        md += f"- **Authors:** {authors}\n"
        md += f"- **Type:** {metadata.get('paper_type', 'N/A')}\n"

    return md


def format_full_review(final_json: str) -> str:
    """Format the full review tab (also the content of the downloadable report)."""
    data = _load_json_dict(final_json)
    if data is None:
        return f"## Raw Output\n\n{final_json}"
    if "error" in data:
        return f"## ⚠️ Error\n\n{data['error']}"

    md = "# Full Peer Review Report\n\n"

    md += "## Strengths\n\n"
    for strength in _as_list(data.get("strengths")):
        md += f"- {strength}\n"

    md += "\n## Weaknesses\n\n"
    for weakness in _as_list(data.get("weaknesses")):
        md += f"- {weakness}\n"

    md += f"\n## Methodology Assessment\n\n{data.get('methodology_assessment', 'N/A')}\n"
    md += f"\n## Novelty Assessment\n\n{data.get('novelty_assessment', 'N/A')}\n"
    md += f"\n## Related Work Context\n\n{data.get('related_work_context', 'N/A')}\n"

    md += "\n## Questions for Authors\n\n"
    # Explicit numbering keeps the downloaded .md readable as plain text.
    for number, question in enumerate(_as_list(data.get("questions_for_authors")), start=1):
        md += f"{number}. {question}\n"

    improvement_log = _as_list(data.get("improvement_log"))
    if improvement_log:
        md += "\n## Enhancement Log\n\n"
        for item in improvement_log:
            md += f"- {item}\n"

    return md


def format_rubric_scorecard(rubric_json: str) -> str:
    """Format the rubric scorecard tab as one Markdown table per category."""
    data = _load_json_dict(rubric_json)
    if data is None:
        return f"## Raw Output\n\n{rubric_json}"
    if "error" in data:
        return f"## ⚠️ Error\n\n{data['error']}"

    total = data.get("total_score", "?")
    status_emoji = "✅ PASSED" if data.get("passed", False) else "❌ NEEDS IMPROVEMENT"
    scores = data.get("scores")
    feedback = data.get("feedback_per_criterion")
    scores = scores if isinstance(scores, dict) else {}
    feedback = feedback if isinstance(feedback, dict) else {}

    md = f"# Rubric Scorecard — {total}/{_RUBRIC_MAX_SCORE} {status_emoji}\n\n"

    number = 0
    for category_name, criteria in _RUBRIC_CATEGORIES:
        md += f"### {category_name}\n\n"
        md += "| # | Criterion | Score | Feedback |\n"
        md += "|---|-----------|-------|----------|\n"
        for name in criteria:
            number += 1
            score = scores.get(name, "?")
            score_emoji = "✅" if score == 1 else "❌" if score == 0 else "❓"
            display_name = name.replace("_", " ").title()
            cell = _md_table_cell(feedback.get(name, "N/A"))
            md += f"| {number} | {display_name} | {score_emoji} {_md_table_cell(score)} | {cell} |\n"
        md += "\n"

    return md


def format_safety_report(safety_report) -> str:
    """Format the safety report tab from a SafetyReport (or its JSON text)."""
    if safety_report is None:
        return "## ⚠️ Safety check was not completed."

    try:
        if hasattr(safety_report, "model_dump"):
            data = safety_report.model_dump()
        else:
            data = json.loads(str(safety_report))

        is_safe = data.get("is_safe", False)
        status = "✅ SAFE — Document passed all safety checks" if is_safe else "❌ UNSAFE — Issues detected"

        md = f"# Safety Report — {status}\n\n"
        md += f"**Risk Level:** {data.get('risk_level', 'N/A')}\n\n"

        if data.get("pii_found"):
            md += "## PII Detected\n\n"
            for pii in data["pii_found"]:
                md += f"- 🔒 {pii}\n"
            md += "\n*All PII has been redacted before analysis.*\n\n"
        else:
            md += "## PII: ✅ None detected\n\n"

        injection = "❌ DETECTED" if data.get("injection_detected") else "✅ None detected"
        md += f"## Prompt Injection: {injection}\n\n"

        if data.get("malicious_urls"):
            md += "## Malicious URLs\n\n"
            for url in data["malicious_urls"]:
                # Code span: known-malicious URLs must not become clickable links.
                md += f"- ⚠️ {_code_span(url)}\n"
        else:
            md += "## URLs: ✅ No malicious URLs found\n\n"

        return md
    except Exception:  # display fallback: never let formatting sink the run
        return f"## Raw Safety Data\n\n{str(safety_report)}"


def _render_agent_payload(raw) -> str:
    """Render one agent's output: JSON objects as key/value Markdown, anything
    else as a code block truncated to 2000 characters."""
    try:
        data = json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        return f"```\n{str(raw)[:2000]}\n```\n"
    if not isinstance(data, dict):
        return f"```\n{str(raw)[:2000]}\n```\n"

    md = ""
    for key, value in data.items():
        if isinstance(value, list):
            md += f"**{key}:**\n"
            for item in value:
                if isinstance(item, dict):
                    md += f"  - {json.dumps(item)}\n"
                else:
                    md += f"  - {item}\n"
        elif isinstance(value, dict):
            md += f"**{key}:**\n"
            for sub_key, sub_value in value.items():
                md += f"  - {sub_key}: {sub_value}\n"
        else:
            md += f"**{key}:** {value}\n"
    return md


def format_agent_outputs(analysis_results: dict, safety_report=None) -> str:
    """Format individual agent outputs into a readable markdown view."""
    md = "# 📊 Individual Agent Outputs\n\n"
    md += "Each agent's structured output is shown below.\n\n---\n\n"

    # Safety Guardian
    if safety_report is not None:
        md += "## 🛡️ Agent 1: Safety Guardian\n\n"
        md += f"- **is_safe:** {safety_report.is_safe}\n"
        md += f"- **risk_level:** {safety_report.risk_level}\n"
        md += f"- **injection_detected:** {safety_report.injection_detected}\n"
        md += f"- **pii_found:** {safety_report.pii_found}\n"
        # Code span: known-malicious URLs must not become clickable links.
        md += f"- **malicious_urls:** {_code_span(safety_report.malicious_urls)}\n"
        md += "\n---\n\n"

    for title, key in _AGENT_OUTPUT_SECTIONS:
        md += f"## {title}\n\n"
        md += _render_agent_payload(analysis_results.get(key, "{}"))
        md += "\n---\n\n"

    return md


def create_download_file(full_review_md: str) -> Optional[str]:
    """Write the review Markdown to a fresh temporary directory.

    Returns the file path, or None when there is nothing to write. Each call
    gets its own directory, so concurrent sessions finishing within the same
    second cannot overwrite each other's report, and nothing accumulates in
    the working directory.

    Raises:
        OSError: if the directory or file cannot be created/written.
    """
    import tempfile

    if not full_review_md:
        return None

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    out_dir = tempfile.mkdtemp(prefix="paper_review_")
    path = os.path.join(out_dir, f"Paper_Review_{timestamp}.md")
    with open(path, "w", encoding="utf-8") as fh:
        fh.write(full_review_md)
    return path


# ============================================================
# Main Pipeline — engineering_guardrails.md §3.4
# ============================================================

import gradio as gr  # Import gradio for gr.Progress and gr.update


def _serialize_crew_result(result) -> str:
    """Return a kickoff result as text: the Pydantic model as indented JSON
    when present, else the ``raw`` output, else ``str(result)``."""
    model = getattr(result, "pydantic", None)
    if model:
        return model.model_dump_json(indent=2)
    raw = getattr(result, "raw", None)
    return str(raw) if raw is not None else str(result)


def _run_pipeline_agent(pipeline_log, agent_name, agent, make_task, failure_label, fallback=None) -> str:
    """Run one agent as a single-task sequential Crew and return its output text.

    *make_task* is a zero-argument callable building the Task (called inside
    the error handler). Agent/task errors never propagate: they are logged
    and returned as ``{"error": "<failure_label> failed: ...", **fallback}``
    JSON so downstream agents and the formatters still get input.
    """
    from crewai import Crew, Process

    pipeline_log.log_step(agent_name, "started")
    try:
        crew = Crew(agents=[agent], tasks=[make_task()], process=Process.sequential, verbose=True)
        output = _serialize_crew_result(crew.kickoff())
    except Exception as exc:  # graceful degradation: one agent must not sink the run
        pipeline_log.log_step(agent_name, "failed", error=str(exc))
        payload = {"error": f"{failure_label} failed: {exc}"}
        payload.update(fallback or {})
        return json.dumps(payload)
    pipeline_log.log_step(agent_name, "completed")
    return output


def run_analysis_pipeline(pdf_file, progress=gr.Progress()) -> tuple:
    """Main pipeline with full error handling.

    Reference: engineering_guardrails.md §3.4 Pipeline-Level Error Handling

    Returns:
        (summary, review, score, safety, logs, summary_json, agents,
         download_file, download_visible) — one value per output component
        wired in build_ui().
    """
    pipeline_log = PipelineLogger()
    empty_agents = "*Upload a PDF and click 'Analyze Paper' to see agent outputs.*"

    def summary_json() -> str:
        return json.dumps(pipeline_log.get_summary(), indent=2)

    def failure(message, *, safety_md="", run_summary="{}", agents_md=empty_agents) -> tuple:
        # Same message in the Summary and Review tabs; download button hidden.
        return (
            message, message, "", safety_md,
            pipeline_log.get_logs_for_display(),
            run_summary, agents_md, None, gr.update(visible=False),
        )

    try:
        # === Step 0: File validation ===
        pipeline_log.log_step("file_validation", "started")
        progress(0.05, desc="Validating file...")

        if pdf_file is None:
            pipeline_log.log_step("file_validation", "failed", error="No file uploaded")
            return failure("## ⚠️ No file uploaded\n\nPlease upload a PDF file.")

        # Handle Gradio file object (older versions) or a plain filepath string.
        file_path = pdf_file.name if hasattr(pdf_file, "name") else str(pdf_file)
        if not file_path.lower().endswith(".pdf"):
            pipeline_log.log_step("file_validation", "failed", error="Not a PDF file")
            return failure("## ⚠️ Invalid file type\n\nPlease upload a PDF file.")

        pipeline_log.log_step("file_validation", "passed")

        # === Step 1: Safety Gate ===
        progress(0.1, desc="Running Safety Gate...")
        safety_result = run_safety_check(file_path, pipeline_log)

        if not safety_result["success"]:
            error_msg = f"## ⚠️ Safety check failed\n\n{safety_result.get('error', 'Unknown error')}"
            return failure(
                error_msg,
                safety_md=format_safety_report(safety_result.get("safety_report")),
                run_summary=summary_json(),
            )

        safety_report = safety_result["safety_report"]

        if not safety_report.is_safe:
            pipeline_log.log_step("pipeline", "blocked", details="Safety gate blocked the document")
            block_msg = (
                "## 🛡️ Document Blocked by Safety Guardian\n\n"
                "This document was flagged as potentially unsafe and cannot be analyzed.\n\n"
                f"**Risk Level:** {safety_report.risk_level}\n\n"
                "Please review the Safety Report tab for details."
            )
            return failure(
                block_msg,
                safety_md=format_safety_report(safety_report),
                run_summary=summary_json(),
                agents_md=format_agent_outputs({}, safety_report),
            )

        # Only PII-redacted text may reach the LLM agents — never fall back to
        # the raw (unredacted) text.
        sanitized_text = safety_report.sanitized_text or ""
        if not sanitized_text.strip():
            pipeline_log.log_step("pipeline", "failed", error="No sanitized text available for analysis")
            return failure(
                "## ⚠️ No analyzable text\n\nNo text could be extracted from this PDF.",
                safety_md=format_safety_report(safety_report),
                run_summary=summary_json(),
            )

        # === Step 2: Main Analysis (agents run one by one for progress updates) ===
        pipeline_log.log_step("analysis_crew", "started")

        from agents.paper_extractor import paper_extractor, create_extraction_task
        from agents.methodology_critic import methodology_critic, create_critique_task
        from agents.relevance_researcher import relevance_researcher, create_research_task
        from agents.review_synthesizer import review_synthesizer, create_synthesis_task
        from agents.rubric_evaluator import rubric_evaluator, create_evaluation_task
        from agents.enhancer import enhancer, create_enhancement_task
        from tools.citation_search import _reset_call_count

        _reset_call_count()  # Reset API call counter for this run

        analysis_results = {}

        # 1. Extraction
        progress(0.2, desc="Agent 1/6: Formatting & Extracting...")
        paper_json = _run_pipeline_agent(
            pipeline_log, "paper_extractor", paper_extractor,
            lambda: create_extraction_task(sanitized_text),
            "Extraction",
            # Give downstream agents some source text if extraction fails.
            fallback={"raw_text": sanitized_text[:5000]},
        )
        analysis_results["paper_extraction"] = paper_json

        # 2a. Critique
        progress(0.35, desc="Agent 2/6: Critiquing Methodology...")
        critique_json = _run_pipeline_agent(
            pipeline_log, "methodology_critic", methodology_critic,
            lambda: create_critique_task(paper_json),
            "Critique",
        )
        analysis_results["methodology_critique"] = critique_json

        # 2b. Research
        progress(0.5, desc="Agent 3/6: Searching Related Work...")
        research_json = _run_pipeline_agent(
            pipeline_log, "relevance_researcher", relevance_researcher,
            lambda: create_research_task(paper_json),
            "Research",
        )
        analysis_results["relevance_report"] = research_json

        # 3. Synthesis
        progress(0.65, desc="Agent 4/6: Synthesizing Draft...")
        draft_json = _run_pipeline_agent(
            pipeline_log, "review_synthesizer", review_synthesizer,
            lambda: create_synthesis_task(paper_json, critique_json, research_json),
            "Synthesis",
        )
        analysis_results["review_draft"] = draft_json

        # 4. Rubric
        progress(0.8, desc="Agent 5/6: Scoring against Rubric...")
        rubric_json = _run_pipeline_agent(
            pipeline_log, "rubric_evaluator", rubric_evaluator,
            lambda: create_evaluation_task(draft_json, paper_json, critique_json, research_json),
            "Evaluation",
        )
        analysis_results["rubric_evaluation"] = rubric_json

        # 5. Enhancer
        progress(0.9, desc="Agent 6/6: Final Polish...")
        final_json = _run_pipeline_agent(
            pipeline_log, "enhancer", enhancer,
            lambda: create_enhancement_task(draft_json, rubric_json, paper_json),
            "Enhancement",
        )
        analysis_results["final_review"] = final_json

        pipeline_log.log_step("analysis_crew", "completed")

        # === Step 3: Format outputs ===
        progress(0.95, desc="Formatting Report...")

        executive_summary = format_executive_summary(final_json)
        full_review = format_full_review(final_json)
        rubric_scorecard = format_rubric_scorecard(rubric_json)
        safety_md = format_safety_report(safety_report)
        agent_outputs_md = format_agent_outputs(analysis_results, safety_report)

        try:
            download_file = create_download_file(full_review)
        except OSError as exc:
            # The review is still shown on screen; only the download is lost.
            pipeline_log.log_step("download_file", "failed", error=str(exc))
            download_file = None

        progress(1.0, desc="Done!")

        return (
            executive_summary,
            full_review,
            rubric_scorecard,
            safety_md,
            pipeline_log.get_logs_for_display(),
            summary_json(),
            agent_outputs_md,
            download_file,
            gr.update(visible=download_file is not None),  # Show download button
        )

    except Exception as e:
        pipeline_log.log_step("pipeline", "critical_failure", error=traceback.format_exc())
        error_msg = (
            "## ⚠️ Unexpected Error\n\n"
            "An unexpected error occurred. Please try again.\n\n"
            f"**Error:** {type(e).__name__}: {str(e)}"
        )
        return failure(error_msg, run_summary=summary_json())


# ============================================================
# Gradio UI — system_design.md Lines 279-287
# ============================================================

import gradio as gr


def build_ui():
    """Build the Gradio interface.

    Layout: an upload column plus six output tabs — Executive Summary (with
    the download button), Full Review, Rubric Scorecard, Safety Report,
    Agent Outputs and Pipeline Logs. The Analyze button runs
    run_analysis_pipeline, whose 9-tuple maps onto ``outputs`` below.
    """
    with gr.Blocks(
        title="AI Research Paper Analyst",
        theme=gr.themes.Default(primary_hue="blue", secondary_hue="slate"),
        css=".main-header {text-align: center; margin-bottom: 2rem;} .upload-area {min-height: 150px;}"
    ) as app:

        gr.Markdown(
            """
            # 🔬 AI Research Paper Analyst
            ### Automated Peer-Review System powered by Multi-Agent AI

            Upload a research paper (PDF) and receive a comprehensive peer-review analysis
            with methodology critique, novelty assessment, and a structured recommendation.

            ---
            """,
            elem_classes="main-header"
        )

        with gr.Row():
            with gr.Column(scale=1):
                pdf_input = gr.File(
                    label="📄 Upload Research Paper (PDF)",
                    file_types=[".pdf"],
                    type="filepath",
                    elem_classes="upload-area",
                )
                analyze_btn = gr.Button(
                    "🔍 Analyze Paper",
                    variant="primary",
                    size="lg",
                )
                gr.Markdown(
                    "*Analysis typically takes 1-3 minutes depending on paper length.*"
                )

        # Output Components
        with gr.Tabs():
            with gr.Tab("📋 Executive Summary"):
                summary_output = gr.Markdown(
                    label="Executive Summary",
                    value="*Upload a PDF and click 'Analyze Paper' to begin.*"
                )
                # Hidden until a run produces a report file.
                download_btn = gr.DownloadButton(
                    label="💾 Download Full Report (Markdown)",
                    visible=False
                )

            with gr.Tab("📝 Full Review"):
                review_output = gr.Markdown(
                    label="Full Review Report",
                    value="*Detailed review will appear here after analysis.*"
                )

            with gr.Tab("📊 Rubric Scorecard"):
                rubric_output = gr.Markdown(
                    label="Rubric Scorecard",
                    value="*Rubric evaluation will appear here after analysis.*"
                )

            with gr.Tab("🛡️ Safety Report"):
                safety_output = gr.Markdown(
                    label="Safety Report",
                    value="*Safety scan results will appear here after analysis.*"
                )

            with gr.Tab("🤖 Agent Outputs"):
                agent_outputs = gr.Markdown(
                    label="Per-Agent Structured Outputs",
                    value="*Upload a PDF and click 'Analyze Paper' to see agent outputs.*"
                )

            with gr.Tab("⚙️ Pipeline Logs"):
                logs_output = gr.Textbox(
                    label="Execution Log",
                    lines=25,
                    interactive=False,
                )
                summary_json = gr.Code(
                    label="Run Summary (JSON)",
                    language="json"
                )

        analyze_btn.click(
            fn=run_analysis_pipeline,
            inputs=[pdf_input],
            outputs=[
                summary_output,
                review_output,
                rubric_output,
                safety_output,
                logs_output,
                summary_json,
                agent_outputs,
                download_btn,  # File value
                download_btn,  # Visibility update
            ],
        )

    return app


# ============================================================
# Entry Point
# ============================================================

if __name__ == "__main__":
    # Verify API key is set
    if not os.getenv("OPENAI_API_KEY"):
        print("⚠️  WARNING: OPENAI_API_KEY not found in environment.")
        print("   Create a .env file with: OPENAI_API_KEY=your-key-here")
        print("   Or export it: export OPENAI_API_KEY=your-key-here")
        print()

    # Launch with environment variables handling port/host
    ui = build_ui()
    ui.launch()