| | """ |
| | AI Research Paper Analyst β Main Application |
| | |
| | CrewAI Hierarchical Pipeline + Gradio UI |
| | |
| | Reference: system_design.md β Complete System Architecture |
| | Reference: engineering_guardrails.md β Β§3 Error Handling, Β§5 Observability |
| | |
| | Pipeline Flow: |
| | Gate 1: Safety Guardian β blocks if unsafe |
| | Step 1: Paper Extractor |
| | Step 2: Methodology Critic + Relevance Researcher (parallel concept, sequential in CrewAI) |
| | Step 3: Review Synthesizer |
| | Step 4: Rubric Evaluator |
| | Step 5: Enhancer |
| | Gate 2: Quality Check (programmatic) |
| | """ |
| |
|
import io
import json
import os
import re
import sys
import tempfile
import threading
import time
import traceback
from datetime import datetime
from typing import Optional
| |
|
| | import gradio as gr |
| | from dotenv import load_dotenv |
| |
|
| | |
# Load environment variables from .env before any library reads its config.
load_dotenv()


# On Linux deployments, swap the stdlib sqlite3 module for the pysqlite3
# binary wheel when it is installed — presumably because a downstream
# dependency needs a newer SQLite than the base image ships (TODO confirm).
# Silently a no-op on other platforms or when pysqlite3 is absent.
if sys.platform.startswith("linux"):
    try:
        __import__("pysqlite3")
        sys.modules["sqlite3"] = sys.modules.pop("pysqlite3")
    except ImportError:
        pass
| |
|
| |
|
| | |
| | |
| | |
| |
|
class PipelineLogger:
    """Collects timestamped per-agent events for pipeline observability.

    SECURITY: never logs raw PII, API keys, or secrets.  Error text is
    sanitized and free-text details are sanitized + truncated before being
    stored, so secrets cannot leak into log output.
    Reference: engineering_guardrails.md §5.4 Logging Security Rules
    """

    def __init__(self):
        # Chronological structured entries; each is a plain dict.
        self.logs: list[dict] = []
        # Wall-clock reference point for elapsed-time reporting.
        self.start_time = time.time()

    def log_step(
        self,
        agent_name: str,
        status: str,
        details: Optional[str] = None,
        error: Optional[str] = None,
        tool_name: Optional[str] = None,
        tokens_used: Optional[int] = None,
    ) -> None:
        """Record one event for *agent_name* with the given *status*.

        Optional fields are stored only when truthy.  ``error`` is
        sanitized; ``details`` is sanitized and truncated (FIX: details
        were previously stored verbatim, contradicting the class's
        logging-security contract and leaving _truncate unused).
        """
        entry = {
            "timestamp": datetime.now().isoformat(),
            "elapsed_seconds": round(time.time() - self.start_time, 2),
            "agent": agent_name,
            "status": status,
        }

        if details:
            entry["details"] = self._truncate(self._sanitize(details))
        if error:
            entry["error"] = self._sanitize(error)
        if tool_name:
            entry["tool"] = tool_name
        if tokens_used:
            entry["tokens_used"] = tokens_used

        self.logs.append(entry)

    def _sanitize(self, text: str) -> str:
        """Redact API-key-shaped substrings before text is stored."""
        text = re.sub(r'sk-[a-zA-Z0-9]{20,}', '[REDACTED_API_KEY]', text)
        text = re.sub(r'key["\s:=]+["\']?[a-zA-Z0-9]{20,}', '[REDACTED_KEY]', text)
        return text

    def _truncate(self, text: str, max_chars: int = 200) -> str:
        """Cap *text* at *max_chars*, appending a note with the full length."""
        if len(text) <= max_chars:
            return text
        return text[:max_chars] + f"... [truncated, {len(text)} chars total]"

    def get_summary(self) -> dict:
        """Aggregate per-agent status and timing; the latest entry per
        agent wins, so an agent that retried then completed reads 'completed'."""
        total_time = round(time.time() - self.start_time, 2)
        agent_times = {}
        for log in self.logs:
            agent = log["agent"]
            if agent not in agent_times:
                agent_times[agent] = {"status": "unknown", "elapsed": 0}
            agent_times[agent]["status"] = log["status"]
            agent_times[agent]["elapsed"] = log["elapsed_seconds"]

        failed_agents = [
            name for name, info in agent_times.items()
            if info["status"] == "failed"
        ]

        return {
            "total_time_seconds": total_time,
            "total_steps": len(self.logs),
            "agents_completed": len([
                a for a in agent_times.values() if a["status"] == "completed"
            ]),
            "agents_failed": failed_agents,
            "per_agent": agent_times,
        }

    def get_logs_for_display(self) -> str:
        """Render the full log plus a summary as a plain-text block for the UI."""
        lines = []
        lines.append("=" * 60)
        lines.append("PIPELINE EXECUTION LOG")
        lines.append("=" * 60)

        for entry in self.logs:
            timestamp = entry["elapsed_seconds"]
            agent = entry["agent"]
            status = entry["status"]

            # Status glyphs (mojibake in the original source repaired).
            emoji = {
                "started": "🚀",
                "completed": "✅",
                "failed": "❌",
                "retrying": "🔄",
                "passed": "✅",
            }.get(status, "📋")

            line = f"[{timestamp:>6.1f}s] {emoji} {agent}: {status}"

            if entry.get("tool"):
                line += f" (tool: {entry['tool']})"
            if entry.get("details"):
                line += f" — {entry['details']}"
            if entry.get("error"):
                line += f"\n    ⚠️ Error: {entry['error']}"
            if entry.get("tokens_used"):
                line += f" [{entry['tokens_used']} tokens]"

            lines.append(line)

        summary = self.get_summary()
        lines.append("")
        lines.append("-" * 60)
        lines.append("SUMMARY")
        lines.append(f"  Total time: {summary['total_time_seconds']}s")
        lines.append(f"  Steps logged: {summary['total_steps']}")
        lines.append(f"  Agents completed: {summary['agents_completed']}")
        if summary['agents_failed']:
            lines.append(f"  Agents failed: {', '.join(summary['agents_failed'])}")
        lines.append("=" * 60)

        return "\n".join(lines)
| |
|
| |
|
| | |
| | |
| | |
| |
|
class StreamCapture:
    """File-like tee for stdout/stderr.

    Mirrors everything written to the real terminal while keeping a copy
    in an in-memory buffer so the UI can show live agent output.
    """

    def __init__(self):
        # Remember the original streams so __exit__ can restore them.
        self._real_stdout, self._real_stderr = sys.stdout, sys.stderr
        self.buffer = io.StringIO()
        self.lock = threading.Lock()

    def __enter__(self):
        # Route both standard streams through this object.
        sys.stdout = sys.stderr = self
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        sys.stdout, sys.stderr = self._real_stdout, self._real_stderr

    def write(self, text):
        # Locked: agent frameworks may print from several threads at once.
        with self.lock:
            self.buffer.write(text)
            self._real_stdout.write(text)

    def flush(self):
        self._real_stdout.flush()

    def get_output(self) -> str:
        """Return everything captured so far."""
        with self.lock:
            return self.buffer.getvalue()

    def isatty(self):
        # Not a terminal; libraries probing for TTY features get False.
        return False

    def fileno(self):
        # Some libraries require a real descriptor; delegate to the terminal.
        return self._real_stdout.fileno()

    @property
    def encoding(self):
        # Mirror the terminal's encoding for callers that inspect it.
        return self._real_stdout.encoding
| |
|
| |
|
| | |
| | |
| | |
| |
|
def run_safety_check(file_path: str, pipeline_log: PipelineLogger) -> dict:
    """Run the Gate-1 safety checks programmatically — no LLM needed.

    Calls the three safety tools directly as Python functions for speed and
    accuracy.  This avoids LLM hallucinations and takes <1 second instead
    of ~480s.

    Args:
        file_path: Path to the uploaded PDF.
        pipeline_log: Logger recording each tool invocation.

    Returns:
        dict with keys:
            success (bool): False only when PDF text extraction failed.
            safety_report: SafetyReport instance, or None on failure.
            raw_text (str | None): full extracted text.
            error (str): present only when success is False.
    """
    # Project tools are imported lazily so importing this module stays cheap.
    from tools.pdf_parser import pdf_parser_tool
    from tools.pii_detector import pii_detector_tool
    from tools.injection_scanner import prompt_injection_scanner_tool
    from tools.url_validator import url_validator_tool
    from schemas.models import SafetyReport

    pipeline_log.log_step("safety_guardian", "started")

    # --- Tool 1: PDF text extraction -----------------------------------
    pipeline_log.log_step("safety_guardian", "started", tool_name="pdf_parser",
                          details="Extracting raw text from PDF")
    raw_text = pdf_parser_tool.run(file_path)

    if raw_text.startswith("ERROR:"):
        # The parser signals failure in-band with an "ERROR:" prefix.
        pipeline_log.log_step("safety_guardian", "failed",
                              error=f"PDF extraction failed: {raw_text}")
        return {
            "success": False,
            "error": raw_text,
            "safety_report": None,
            "raw_text": None,
        }

    pipeline_log.log_step("safety_guardian", "completed", tool_name="pdf_parser",
                          details=f"{len(raw_text)} chars extracted")

    # Only the first 15k characters are scanned, keeping the gate fast.
    # NOTE(review): PII past the 15k mark is NOT scanned or redacted —
    # confirm this trade-off is acceptable.
    scan_text = raw_text[:15000]

    # --- Tool 2: PII detection + redaction -----------------------------
    pipeline_log.log_step("safety_guardian", "started", tool_name="pii_detector",
                          details="Scanning for PII")
    # FIX: use the module-level json (the local `import json as _json`
    # alias was redundant).
    pii_result = json.loads(pii_detector_tool.run(scan_text))
    pii_found = pii_result.get("findings", [])
    redacted_text = pii_result.get("redacted_text", raw_text)

    # Re-attach the unscanned tail so downstream agents see the full paper.
    if len(raw_text) > 15000:
        redacted_text = redacted_text + raw_text[15000:]
    pipeline_log.log_step("safety_guardian", "completed", tool_name="pii_detector",
                          details=f"{len(pii_found)} PII types found")

    # --- Tool 3: prompt-injection scan ---------------------------------
    pipeline_log.log_step("safety_guardian", "started", tool_name="injection_scanner",
                          details="Checking for prompt injections")
    injection_result = json.loads(prompt_injection_scanner_tool.run(scan_text))
    injection_detected = not injection_result.get("is_safe", True)
    pipeline_log.log_step("safety_guardian", "completed", tool_name="injection_scanner",
                          details=f"injection_detected={injection_detected}")

    # --- Tool 4: URL validation ----------------------------------------
    pipeline_log.log_step("safety_guardian", "started", tool_name="url_validator",
                          details="Validating URLs")
    url_result = json.loads(url_validator_tool.run(scan_text))
    malicious_urls = url_result.get("malicious_urls", [])
    pipeline_log.log_step("safety_guardian", "completed", tool_name="url_validator",
                          details=f"{len(malicious_urls)} malicious URLs found")

    # Verdict: PII alone does not block (it is redacted); injections and
    # malicious URLs do.
    is_safe = (not injection_detected) and (len(malicious_urls) == 0)

    if injection_detected or len(malicious_urls) > 0:
        risk_level = "high"
    elif len(pii_found) > 0:
        risk_level = "medium"
    else:
        risk_level = "low"

    safety_report = SafetyReport(
        is_safe=is_safe,
        pii_found=pii_found,
        injection_detected=injection_detected,
        malicious_urls=malicious_urls,
        sanitized_text=redacted_text,
        risk_level=risk_level,
    )

    pipeline_log.log_step("safety_guardian", "completed",
                          details=f"is_safe={is_safe}, risk_level={risk_level}")

    return {
        "success": True,
        "safety_report": safety_report,
        "raw_text": raw_text,
    }
| |
|
| |
|
def run_analysis_crew(sanitized_text: str, pipeline_log: PipelineLogger) -> dict:
    """Run the main analysis crew (Steps 1-5) as five single-agent stages.

    Each stage runs its own sequential Crew; a failure in one stage is
    logged and replaced by an error-JSON payload so later stages still run.

    Reference: system_design.md — Manager Delegation Order (Lines 59-64)
    Reference: engineering_guardrails.md — §3 Error Handling + §5.3 Callbacks

    Args:
        sanitized_text: PII-redacted paper text from the safety gate.
        pipeline_log: Logger recording each stage's lifecycle.

    Returns:
        dict mapping stage name -> JSON string output of that stage.
    """
    from crewai import Crew, Process

    from agents.paper_extractor import paper_extractor, create_extraction_task
    from agents.methodology_critic import methodology_critic, create_critique_task
    from agents.relevance_researcher import relevance_researcher, create_research_task
    from agents.review_synthesizer import review_synthesizer, create_synthesis_task
    from agents.rubric_evaluator import rubric_evaluator, create_evaluation_task
    from agents.enhancer import enhancer, create_enhancement_task

    # Reset the citation-search rate counter once per pipeline run.
    from tools.citation_search import _reset_call_count
    _reset_call_count()

    def _run_step(agent_name, agent, make_task, success_details, error_prefix,
                  error_extra=None):
        """Run one single-agent crew stage and return its JSON output.

        `make_task` is a zero-arg callable so task-construction errors are
        caught by the same handler as crew execution (matching the
        original per-stage try/except scope).  On failure, returns a JSON
        error payload, optionally merged with `error_extra`.
        """
        pipeline_log.log_step(agent_name, "started")
        try:
            crew = Crew(
                agents=[agent],
                tasks=[make_task()],
                process=Process.sequential,
                verbose=True,
            )
            result = crew.kickoff()

            # Prefer the validated pydantic payload; fall back to raw text.
            if hasattr(result, 'pydantic') and result.pydantic:
                output = result.pydantic.model_dump_json(indent=2)
            else:
                output = str(result.raw) if hasattr(result, 'raw') else str(result)

            pipeline_log.log_step(agent_name, "completed", details=success_details)
            return output
        except Exception as e:
            pipeline_log.log_step(agent_name, "failed", error=str(e))
            payload = {"error": f"{error_prefix}: {str(e)}"}
            if error_extra:
                payload.update(error_extra)
            return json.dumps(payload)

    # Step 1: structured extraction.  On failure, pass a raw-text excerpt
    # downstream so later stages have something to work with.
    paper_json = _run_step(
        "paper_extractor", paper_extractor,
        lambda: create_extraction_task(sanitized_text),
        "Extracted paper data", "Extraction failed",
        error_extra={"raw_text": sanitized_text[:5000]},
    )

    # Step 2a: methodology critique.
    critique_json = _run_step(
        "methodology_critic", methodology_critic,
        lambda: create_critique_task(paper_json),
        "Critique completed", "Critique failed",
    )

    # Step 2b: related-work research (sequential in CrewAI).
    research_json = _run_step(
        "relevance_researcher", relevance_researcher,
        lambda: create_research_task(paper_json),
        "Research completed", "Research failed",
    )

    # Step 3: synthesize the draft review from all prior outputs.
    draft_json = _run_step(
        "review_synthesizer", review_synthesizer,
        lambda: create_synthesis_task(paper_json, critique_json, research_json),
        "Draft review synthesized", "Synthesis failed",
    )

    # Step 4: score the draft against the rubric.
    rubric_json = _run_step(
        "rubric_evaluator", rubric_evaluator,
        lambda: create_evaluation_task(draft_json, paper_json, critique_json, research_json),
        "Rubric evaluation completed", "Evaluation failed",
    )

    # Step 5: enhance the draft using the rubric feedback.
    final_json = _run_step(
        "enhancer", enhancer,
        lambda: create_enhancement_task(draft_json, rubric_json, paper_json),
        "Final review produced", "Enhancement failed",
    )

    return {
        "paper_extraction": paper_json,
        "methodology_critique": critique_json,
        "relevance_report": research_json,
        "review_draft": draft_json,
        "rubric_evaluation": rubric_json,
        "final_review": final_json,
    }
| |
|
| |
|
| | |
| | |
| | |
| |
|
def format_executive_summary(final_json: str) -> str:
    """Render the Executive Summary tab as markdown from the enhancer's JSON.

    Falls back to showing the raw payload when it is not valid JSON.
    """
    try:
        data = json.loads(final_json)
        if "error" in data:
            return f"## ⚠️ Error\n\n{data['error']}"

        recommendation = data.get("recommendation", "N/A")
        # Status glyph per recommendation (mojibake in original repaired).
        emoji = {"Accept": "✅", "Revise": "🔄", "Reject": "❌"}.get(recommendation, "❓")

        md = f"# {emoji} Recommendation: **{recommendation}**\n\n"
        md += f"**Confidence:** {data.get('confidence_score', 'N/A')}/5\n\n"
        md += f"**Rubric Score:** {data.get('rubric_total', 'N/A')}/15\n\n"
        md += "---\n\n"
        md += f"## Executive Summary\n\n{data.get('executive_summary', 'N/A')}\n\n"

        metadata = data.get("paper_metadata", {})
        if metadata:
            md += "---\n\n## Paper Information\n\n"
            md += f"- **Title:** {metadata.get('title', 'N/A')}\n"
            md += f"- **Authors:** {metadata.get('authors', 'N/A')}\n"
            md += f"- **Type:** {metadata.get('paper_type', 'N/A')}\n"

        return md
    except (json.JSONDecodeError, KeyError):
        return f"## Raw Output\n\n{final_json}"
| |
|
| |
|
def format_full_review(final_json: str) -> str:
    """Render the Full Review tab as markdown from the enhancer's JSON.

    Falls back to showing the raw payload when it is not valid JSON.
    """
    try:
        data = json.loads(final_json)
        if "error" in data:
            return f"## ⚠️ Error\n\n{data['error']}"

        md = "# Full Peer Review Report\n\n"

        md += "## Strengths\n\n"
        for s in data.get("strengths", []):
            md += f"- {s}\n"

        md += "\n## Weaknesses\n\n"
        for w in data.get("weaknesses", []):
            md += f"- {w}\n"

        md += f"\n## Methodology Assessment\n\n{data.get('methodology_assessment', 'N/A')}\n"
        md += f"\n## Novelty Assessment\n\n{data.get('novelty_assessment', 'N/A')}\n"
        md += f"\n## Related Work Context\n\n{data.get('related_work_context', 'N/A')}\n"

        md += "\n## Questions for Authors\n\n"
        # FIX: every item was emitted as a literal "1." — number properly.
        for i, q in enumerate(data.get("questions_for_authors", []), start=1):
            md += f"{i}. {q}\n"

        if data.get("improvement_log"):
            md += "\n## Enhancement Log\n\n"
            for item in data["improvement_log"]:
                md += f"- {item}\n"

        return md
    except (json.JSONDecodeError, KeyError):
        return f"## Raw Output\n\n{final_json}"
| |
|
| |
|
def format_rubric_scorecard(rubric_json: str) -> str:
    """Render the Rubric Scorecard tab as grouped markdown tables.

    Falls back to showing the raw payload when it is not valid JSON.
    """
    try:
        data = json.loads(rubric_json)
        if "error" in data:
            return f"## ⚠️ Error\n\n{data['error']}"

        total = data.get("total_score", "?")
        passed = data.get("passed", False)
        # Mojibake in the original literals repaired.
        status_emoji = "✅ PASSED" if passed else "❌ NEEDS IMPROVEMENT"

        md = f"# Rubric Scorecard — {total}/15 {status_emoji}\n\n"

        scores = data.get("scores", {})
        feedback = data.get("feedback_per_criterion", {})

        # Criterion names grouped by rubric category, in display order.
        categories = [
            ("📋 Content Completeness", [
                "title_authors_correct", "abstract_summarized",
                "methodology_described", "strengths_sufficient",
                "weaknesses_sufficient", "limitations_acknowledged",
                "related_work_present",
            ]),
            ("🔬 Analytical Depth", [
                "novelty_assessed", "reproducibility_discussed",
                "evidence_quality_evaluated", "contribution_stated",
            ]),
            ("📝 Review Quality", [
                "recommendation_justified", "actionable_questions",
                "no_hallucinated_citations", "professional_coherent",
            ]),
        ]

        for cat_name, criteria in categories:
            md += f"### {cat_name}\n\n"
            md += "| # | Criterion | Score | Feedback |\n"
            md += "|---|-----------|-------|----------|\n"
            for name in criteria:
                score = scores.get(name, "?")
                # 1 → pass, 0 → fail, anything else (missing) → unknown.
                score_emoji = "✅" if score == 1 else "❌" if score == 0 else "❓"
                fb = feedback.get(name, "N/A")
                display_name = name.replace("_", " ").title()
                md += f"| | {display_name} | {score_emoji} {score} | {fb} |\n"
            md += "\n"

        return md
    except (json.JSONDecodeError, KeyError):
        return f"## Raw Output\n\n{rubric_json}"
| |
|
| |
|
def format_safety_report(safety_report) -> str:
    """Render the Safety Report tab.

    Accepts a pydantic SafetyReport (anything with ``model_dump``) or a
    JSON string; returns a raw dump for anything else, and a warning when
    the check never ran (None).
    """
    if safety_report is None:
        return "## ⚠️ Safety check was not completed."

    try:
        if hasattr(safety_report, 'model_dump'):
            data = safety_report.model_dump()
        else:
            data = json.loads(str(safety_report))

        is_safe = data.get("is_safe", False)
        # Mojibake in the original literals repaired.
        status = "✅ SAFE — Document passed all safety checks" if is_safe else "❌ UNSAFE — Issues detected"

        md = f"# Safety Report — {status}\n\n"
        md += f"**Risk Level:** {data.get('risk_level', 'N/A')}\n\n"

        if data.get("pii_found"):
            md += "## PII Detected\n\n"
            for pii in data["pii_found"]:
                md += f"- 🔒 {pii}\n"
            md += "\n*All PII has been redacted before analysis.*\n\n"
        else:
            md += "## PII: ✅ None detected\n\n"

        md += f"## Prompt Injection: {'❌ DETECTED' if data.get('injection_detected') else '✅ None detected'}\n\n"

        if data.get("malicious_urls"):
            md += "## Malicious URLs\n\n"
            for url in data["malicious_urls"]:
                md += f"- ⚠️ {url}\n"
        else:
            md += "## URLs: ✅ No malicious URLs found\n\n"

        return md
    except Exception:
        # Last resort: show whatever we were handed.
        return f"## Raw Safety Data\n\n{str(safety_report)}"
| |
|
| |
|
def format_agent_outputs(analysis_results: dict, safety_report=None) -> str:
    """Render each agent's structured output as a readable markdown view.

    Args:
        analysis_results: stage name -> JSON string (as produced by the
            analysis pipeline); missing stages are skipped gracefully.
        safety_report: optional SafetyReport shown as Agent 1.
    """
    md = "# 📋 Individual Agent Outputs\n\n"
    md += "Each agent's structured output is shown below.\n\n---\n\n"

    # Agent 1 is the safety gate; it has attributes, not a JSON string.
    if safety_report:
        md += "## 🛡️ Agent 1: Safety Guardian\n\n"
        md += f"- **is_safe:** {safety_report.is_safe}\n"
        md += f"- **risk_level:** {safety_report.risk_level}\n"
        md += f"- **injection_detected:** {safety_report.injection_detected}\n"
        md += f"- **pii_found:** {safety_report.pii_found}\n"
        md += f"- **malicious_urls:** {safety_report.malicious_urls}\n"
        md += "\n---\n\n"

    # Section headers (mojibake emoji in the original repaired).
    agents = [
        ("📄 Agent 2: Paper Extractor", "paper_extraction"),
        ("🔬 Agent 3: Methodology Critic", "methodology_critique"),
        ("🌐 Agent 4: Relevance Researcher", "relevance_report"),
        ("✍️ Agent 5: Review Synthesizer", "review_draft"),
        ("📊 Agent 6: Rubric Evaluator", "rubric_evaluation"),
        ("✨ Agent 7: Enhancer (Final)", "final_review"),
    ]

    for title, key in agents:
        raw = analysis_results.get(key, "{}")
        md += f"## {title}\n\n"
        try:
            data = json.loads(raw)
            if isinstance(data, dict):
                # Pretty-print one level of structure; deeper values are
                # rendered inline.
                for k, v in data.items():
                    if isinstance(v, list):
                        md += f"**{k}:**\n"
                        for item in v:
                            if isinstance(item, dict):
                                md += f"  - {json.dumps(item)}\n"
                            else:
                                md += f"  - {item}\n"
                    elif isinstance(v, dict):
                        md += f"**{k}:**\n"
                        for dk, dv in v.items():
                            md += f"  - {dk}: {dv}\n"
                    else:
                        md += f"**{k}:** {v}\n"
            else:
                md += f"```\n{raw[:2000]}\n```\n"
        except (json.JSONDecodeError, TypeError):
            # Non-JSON stage output (e.g. an LLM's raw text) — show as-is,
            # truncated to keep the tab readable.
            md += f"```\n{str(raw)[:2000]}\n```\n"
        md += "\n---\n\n"

    return md
| |
|
| |
|
def create_download_file(full_review_md: str) -> Optional[str]:
    """Write the review markdown to a timestamped file and return its path.

    Returns None when there is nothing to download.  The file is written
    to the system temp directory (FIX: previously the current working
    directory, which polluted the app folder and fails on read-only
    deployments); the descriptive basename is preserved so the browser
    download gets a meaningful filename.
    """
    if not full_review_md:
        return None

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    file_path = os.path.join(tempfile.gettempdir(), f"Paper_Review_{timestamp}.md")

    with open(file_path, "w", encoding="utf-8") as f:
        f.write(full_review_md)

    return file_path
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | import gradio as gr |
| |
|
def run_analysis_pipeline(pdf_file, progress=gr.Progress()) -> tuple:
    """Top-level Gradio handler: validate file → safety gate → 6 agents → format.

    Reference: engineering_guardrails.md §3.4 Pipeline-Level Error Handling

    Args:
        pdf_file: Gradio upload object (has ``.name``) or a bare path string.
        progress: Gradio progress tracker; the default-arg instance is the
            documented Gradio idiom for progress injection.

    Returns:
        9-tuple consumed by the UI, in this exact order:
        (summary_md, review_md, scorecard_md, safety_md, logs_text,
         summary_json, agents_md, download_file_path, download_visibility_update)
    """
    pipeline_log = PipelineLogger()
    # Placeholder shown in the agent-outputs tab on early exits.
    empty_agents = "*Upload a PDF and click 'Analyze Paper' to see agent outputs.*"

    try:
        # ---- Stage 0: input validation --------------------------------
        pipeline_log.log_step("file_validation", "started")
        progress(0.05, desc="Validating file...")

        if pdf_file is None:
            pipeline_log.log_step("file_validation", "failed", error="No file uploaded")
            error_msg = "## β οΈ No file uploaded\n\nPlease upload a PDF file."
            return error_msg, error_msg, "", "", pipeline_log.get_logs_for_display(), "{}", empty_agents, None, gr.update(visible=False)

        # Gradio may hand us a tempfile-like object or a plain path.
        file_path = pdf_file.name if hasattr(pdf_file, 'name') else str(pdf_file)

        # NOTE(review): case-sensitive extension check — ".PDF" uploads
        # are rejected; confirm whether that is intended.
        if not file_path.endswith(".pdf"):
            pipeline_log.log_step("file_validation", "failed", error="Not a PDF file")
            error_msg = "## β οΈ Invalid file type\n\nPlease upload a PDF file."
            return error_msg, error_msg, "", "", pipeline_log.get_logs_for_display(), "{}", empty_agents, None, gr.update(visible=False)

        pipeline_log.log_step("file_validation", "passed")

        # ---- Gate 1: programmatic safety checks -----------------------
        progress(0.1, desc="Running Safety Gate...")
        safety_result = run_safety_check(file_path, pipeline_log)

        if not safety_result["success"]:
            # PDF could not even be parsed — surface the tool error.
            error_msg = f"## β οΈ Safety check failed\n\n{safety_result.get('error', 'Unknown error')}"
            safety_md = format_safety_report(safety_result.get("safety_report"))

            return error_msg, error_msg, "", safety_md, pipeline_log.get_logs_for_display(), json.dumps(pipeline_log.get_summary(), indent=2), empty_agents, None, gr.update(visible=False)

        safety_report = safety_result["safety_report"]

        if not safety_report.is_safe:
            # Hard block: unsafe documents never reach the LLM agents.
            pipeline_log.log_step("pipeline", "blocked", details="Safety gate blocked the document")
            safety_md = format_safety_report(safety_report)
            block_msg = (
                "## π‘οΈ Document Blocked by Safety Guardian\n\n"
                "This document was flagged as potentially unsafe and cannot be analyzed.\n\n"
                f"**Risk Level:** {safety_report.risk_level}\n\n"
                "Please review the Safety Report tab for details."
            )

            return block_msg, block_msg, "", safety_md, pipeline_log.get_logs_for_display(), json.dumps(pipeline_log.get_summary(), indent=2), format_agent_outputs({}, safety_report), None, gr.update(visible=False)

        # Prefer the PII-redacted text; fall back to the raw extraction.
        sanitized_text = safety_report.sanitized_text or safety_result.get("raw_text", "")

        # ---- Steps 1-6: the analysis agents ---------------------------
        pipeline_log.log_step("analysis_crew", "started")

        # NOTE(review): this inlines the same six-stage pipeline as
        # run_analysis_crew() — presumably duplicated so per-stage
        # progress() updates can be emitted; keep the two in sync.
        from crewai import Crew, Process
        from agents.paper_extractor import paper_extractor, create_extraction_task
        from agents.methodology_critic import methodology_critic, create_critique_task
        from agents.relevance_researcher import relevance_researcher, create_research_task
        from agents.review_synthesizer import review_synthesizer, create_synthesis_task
        from agents.rubric_evaluator import rubric_evaluator, create_evaluation_task
        from agents.enhancer import enhancer, create_enhancement_task
        from tools.citation_search import _reset_call_count

        # Reset the citation-search rate counter once per run.
        _reset_call_count()
        analysis_results = {}

        # Step 1: structured extraction of the paper.
        progress(0.2, desc="Agent 1/6: Formatting & Extracting...")
        pipeline_log.log_step("paper_extractor", "started")
        try:
            extraction_task = create_extraction_task(sanitized_text)
            extraction_crew = Crew(agents=[paper_extractor], tasks=[extraction_task], process=Process.sequential, verbose=True)
            res = extraction_crew.kickoff()
            # Prefer the validated pydantic payload; fall back to raw text.
            paper_json = res.pydantic.model_dump_json(indent=2) if hasattr(res, 'pydantic') and res.pydantic else str(res.raw)
            analysis_results["paper_extraction"] = paper_json
            pipeline_log.log_step("paper_extractor", "completed")
        except Exception as e:
            pipeline_log.log_step("paper_extractor", "failed", error=str(e))
            paper_json = json.dumps({"error": str(e)})

        # Step 2: methodology critique of the extracted paper.
        progress(0.35, desc="Agent 2/6: Critiquing Methodology...")
        pipeline_log.log_step("methodology_critic", "started")
        try:
            critique_task = create_critique_task(paper_json)
            critique_crew = Crew(agents=[methodology_critic], tasks=[critique_task], process=Process.sequential, verbose=True)
            res = critique_crew.kickoff()
            critique_json = res.pydantic.model_dump_json(indent=2) if hasattr(res, 'pydantic') and res.pydantic else str(res.raw)
            analysis_results["methodology_critique"] = critique_json
            pipeline_log.log_step("methodology_critic", "completed")
        except Exception as e:
            pipeline_log.log_step("methodology_critic", "failed", error=str(e))
            critique_json = json.dumps({"error": str(e)})

        # Step 3: related-work research.
        progress(0.5, desc="Agent 3/6: Searching Related Work...")
        pipeline_log.log_step("relevance_researcher", "started")
        try:
            research_task = create_research_task(paper_json)
            research_crew = Crew(agents=[relevance_researcher], tasks=[research_task], process=Process.sequential, verbose=True)
            res = research_crew.kickoff()
            research_json = res.pydantic.model_dump_json(indent=2) if hasattr(res, 'pydantic') and res.pydantic else str(res.raw)
            analysis_results["relevance_report"] = research_json
            pipeline_log.log_step("relevance_researcher", "completed")
        except Exception as e:
            pipeline_log.log_step("relevance_researcher", "failed", error=str(e))
            research_json = json.dumps({"error": str(e)})

        # Step 4: synthesize the draft review from all prior outputs.
        progress(0.65, desc="Agent 4/6: Synthesizing Draft...")
        pipeline_log.log_step("review_synthesizer", "started")
        try:
            synthesis_task = create_synthesis_task(paper_json, critique_json, research_json)
            synthesis_crew = Crew(agents=[review_synthesizer], tasks=[synthesis_task], process=Process.sequential, verbose=True)
            res = synthesis_crew.kickoff()
            draft_json = res.pydantic.model_dump_json(indent=2) if hasattr(res, 'pydantic') and res.pydantic else str(res.raw)
            analysis_results["review_draft"] = draft_json
            pipeline_log.log_step("review_synthesizer", "completed")
        except Exception as e:
            pipeline_log.log_step("review_synthesizer", "failed", error=str(e))
            draft_json = json.dumps({"error": str(e)})

        # Step 5: score the draft against the rubric.
        progress(0.8, desc="Agent 5/6: Scoring against Rubric...")
        pipeline_log.log_step("rubric_evaluator", "started")
        try:
            eval_task = create_evaluation_task(draft_json, paper_json, critique_json, research_json)
            eval_crew = Crew(agents=[rubric_evaluator], tasks=[eval_task], process=Process.sequential, verbose=True)
            res = eval_crew.kickoff()
            rubric_json = res.pydantic.model_dump_json(indent=2) if hasattr(res, 'pydantic') and res.pydantic else str(res.raw)
            analysis_results["rubric_evaluation"] = rubric_json
            pipeline_log.log_step("rubric_evaluator", "completed")
        except Exception as e:
            pipeline_log.log_step("rubric_evaluator", "failed", error=str(e))
            rubric_json = json.dumps({"error": str(e)})

        # Step 6: final enhancement guided by the rubric feedback.
        progress(0.9, desc="Agent 6/6: Final Polish...")
        pipeline_log.log_step("enhancer", "started")
        try:
            enhance_task = create_enhancement_task(draft_json, rubric_json, paper_json)
            enhance_crew = Crew(agents=[enhancer], tasks=[enhance_task], process=Process.sequential, verbose=True)
            res = enhance_crew.kickoff()
            final_json = res.pydantic.model_dump_json(indent=2) if hasattr(res, 'pydantic') and res.pydantic else str(res.raw)
            analysis_results["final_review"] = final_json
            pipeline_log.log_step("enhancer", "completed")
        except Exception as e:
            pipeline_log.log_step("enhancer", "failed", error=str(e))
            final_json = json.dumps({"error": str(e)})

        pipeline_log.log_step("analysis_crew", "completed")

        # ---- Formatting: build every UI tab from the stage outputs -----
        progress(0.95, desc="Formatting Report...")

        executive_summary = format_executive_summary(final_json)
        full_review = format_full_review(final_json)
        rubric_scorecard = format_rubric_scorecard(rubric_json)
        safety_md = format_safety_report(safety_report)
        agent_outputs_md = format_agent_outputs(analysis_results, safety_report)

        # Materialize the downloadable markdown report.
        download_file = create_download_file(full_review)

        progress(1.0, desc="Done!")

        return (
            executive_summary,
            full_review,
            rubric_scorecard,
            safety_md,
            pipeline_log.get_logs_for_display(),
            json.dumps(pipeline_log.get_summary(), indent=2),
            agent_outputs_md,
            download_file,
            gr.update(visible=True)
        )

    except Exception as e:
        # Last-resort handler: log the full traceback internally but show
        # the user only the exception type and message.
        pipeline_log.log_step("pipeline", "critical_failure", error=traceback.format_exc())
        error_msg = (
            "## β οΈ Unexpected Error\n\n"
            "An unexpected error occurred. Please try again.\n\n"
            f"**Error:** {type(e).__name__}: {str(e)}"
        )
        return (
            error_msg, error_msg, "", "",
            pipeline_log.get_logs_for_display(),
            json.dumps(pipeline_log.get_summary(), indent=2),
            empty_agents,
            None,
            gr.update(visible=False)
        )
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | import gradio as gr |
| |
|
def build_ui():
    """Build the Gradio interface.

    Layout: a centered header, an upload/launch column, and six result
    tabs (Executive Summary, Full Review, Rubric Scorecard, Safety
    Report, Agent Outputs, Pipeline Logs).

    Returns:
        gr.Blocks: the assembled application; the caller is responsible
        for calling ``.launch()`` on it.
    """
    with gr.Blocks(
        title="AI Research Paper Analyst",
        theme=gr.themes.Default(primary_hue="blue", secondary_hue="slate"),
        css=".main-header {text-align: center; margin-bottom: 2rem;} .upload-area {min-height: 150px;}"
    ) as app:

        # ---- Header ----------------------------------------------------
        gr.Markdown(
            """
            # 🔬 AI Research Paper Analyst
            ### Automated Peer-Review System powered by Multi-Agent AI

            Upload a research paper (PDF) and receive a comprehensive peer-review analysis
            with methodology critique, novelty assessment, and a structured recommendation.

            ---
            """,
            elem_classes="main-header"
        )

        # ---- Input controls --------------------------------------------
        with gr.Row():
            with gr.Column(scale=1):
                pdf_input = gr.File(
                    label="📄 Upload Research Paper (PDF)",
                    file_types=[".pdf"],
                    type="filepath",
                    elem_classes="upload-area",
                )
                analyze_btn = gr.Button(
                    "🚀 Analyze Paper",
                    variant="primary",
                    size="lg",
                )
                gr.Markdown(
                    "*Analysis typically takes 1-3 minutes depending on paper length.*"
                )

        # ---- Result tabs -----------------------------------------------
        with gr.Tabs():
            with gr.Tab("📋 Executive Summary"):
                summary_output = gr.Markdown(
                    label="Executive Summary",
                    value="*Upload a PDF and click 'Analyze Paper' to begin.*"
                )
                # Hidden until a run completes; the pipeline returns
                # gr.update(visible=True) in its last tuple slot to reveal it.
                download_btn = gr.DownloadButton(
                    label="💾 Download Full Report (Markdown)",
                    visible=False
                )

            with gr.Tab("📝 Full Review"):
                review_output = gr.Markdown(
                    label="Full Review Report",
                    value="*Detailed review will appear here after analysis.*"
                )

            with gr.Tab("📊 Rubric Scorecard"):
                rubric_output = gr.Markdown(
                    label="Rubric Scorecard",
                    value="*Rubric evaluation will appear here after analysis.*"
                )

            with gr.Tab("🛡️ Safety Report"):
                safety_output = gr.Markdown(
                    label="Safety Report",
                    value="*Safety scan results will appear here after analysis.*"
                )

            # FIX: this tab label previously contained U+FFFD (a lost
            # character, likely a mangled emoji); restored a readable one.
            with gr.Tab("🤖 Agent Outputs"):
                agent_outputs = gr.Markdown(
                    label="Per-Agent Structured Outputs",
                    value="*Upload a PDF and click 'Analyze Paper' to see agent outputs.*"
                )

            with gr.Tab("⚙️ Pipeline Logs"):
                logs_output = gr.Textbox(
                    label="Execution Log",
                    lines=25,
                    interactive=False,
                )
                summary_json = gr.Code(
                    label="Run Summary (JSON)",
                    language="json"
                )

        # Wire the button to the 9-tuple returned by run_analysis_pipeline.
        # NOTE(review): download_btn is listed twice deliberately — slot 8
        # receives the file path and slot 9 the gr.update(visible=True).
        # Some Gradio versions reject duplicate output components; if this
        # raises, consolidate the pipeline's last two return values into a
        # single gr.DownloadButton update (value=..., visible=True) and
        # keep only one download_btn entry here. Confirm against the
        # installed Gradio version.
        analyze_btn.click(
            fn=run_analysis_pipeline,
            inputs=[pdf_input],
            outputs=[
                summary_output,
                review_output,
                rubric_output,
                safety_output,
                logs_output,
                summary_json,
                agent_outputs,
                download_btn,
                download_btn,
            ],
        )

    return app
| |
|
| |
|
| | |
| | |
| | |
| |
|
if __name__ == "__main__":
    # Fail soft on a missing API key: warn but still launch the UI so the
    # user can read the setup instructions; the pipeline itself surfaces
    # the auth error on first use.
    # FIX: warnings now go to stderr so they remain visible when stdout
    # is redirected (e.g. when piping server logs to a file).
    if not os.getenv("OPENAI_API_KEY"):
        print("⚠️ WARNING: OPENAI_API_KEY not found in environment.", file=sys.stderr)
        print("   Create a .env file with: OPENAI_API_KEY=your-key-here", file=sys.stderr)
        print("   Or export it: export OPENAI_API_KEY=your-key-here", file=sys.stderr)
        print(file=sys.stderr)

    # Build the Gradio app and block on its server loop.
    ui = build_ui()
    ui.launch()
| |
|