""" MediGuard AI RAG-Helper - Evolution Engine Outer Loop Director for SOP Evolution """ from collections.abc import Callable from typing import Any, Literal from pydantic import BaseModel, Field from src.config import ExplanationSOP from src.evaluation.evaluators import EvaluationResult class SOPGenePool: """Manages version control for evolving SOPs""" def __init__(self): self.pool: list[dict[str, Any]] = [] self.gene_pool: list[dict[str, Any]] = [] # Alias for compatibility self.version_counter = 0 def add( self, sop: ExplanationSOP, evaluation: EvaluationResult, parent_version: int | None = None, description: str = "", ): """Add a new SOP to the gene pool""" self.version_counter += 1 entry = { "version": self.version_counter, "sop": sop, "evaluation": evaluation, "parent": parent_version, "description": description, } self.pool.append(entry) self.gene_pool = self.pool # Keep in sync print(f"āœ“ Added SOP v{self.version_counter} to gene pool: {description}") def get_latest(self) -> dict[str, Any] | None: """Get the most recent SOP""" return self.pool[-1] if self.pool else None def get_by_version(self, version: int) -> dict[str, Any] | None: """Retrieve specific SOP version""" for entry in self.pool: if entry["version"] == version: return entry return None def get_best_by_metric(self, metric: str) -> dict[str, Any] | None: """Get SOP with highest score on specific metric""" if not self.pool: return None best = max(self.pool, key=lambda x: getattr(x["evaluation"], metric).score) return best def summary(self): """Print summary of all SOPs in pool""" print("\n" + "=" * 80) print("SOP GENE POOL SUMMARY") print("=" * 80) for entry in self.pool: v = entry["version"] p = entry["parent"] desc = entry["description"] e = entry["evaluation"] parent_str = "(Baseline)" if p is None else f"(Child of v{p})" print(f"\nSOP v{v} {parent_str}: {desc}") print(f" Clinical Accuracy: {e.clinical_accuracy.score:.2f}") print(f" Evidence Grounding: {e.evidence_grounding.score:.2f}") print(f" Actionability: {e.actionability.score:.2f}") print(f" Clarity: {e.clarity.score:.2f}") print(f" Safety & Completeness: {e.safety_completeness.score:.2f}") print("\n" + "=" * 80) class Diagnosis(BaseModel): """Structured diagnosis from Performance Diagnostician""" primary_weakness: Literal[ "clinical_accuracy", "evidence_grounding", "actionability", "clarity", "safety_completeness" ] root_cause_analysis: str = Field(description="Detailed analysis of why weakness occurred") recommendation: str = Field(description="High-level recommendation to fix the problem") class SOPMutation(BaseModel): """Single mutated SOP with description""" description: str = Field(description="Brief description of mutation strategy") # SOP fields from ExplanationSOP biomarker_analyzer_threshold: float = 0.15 disease_explainer_k: int = 5 linker_retrieval_k: int = 3 guideline_retrieval_k: int = 3 explainer_detail_level: Literal["concise", "detailed", "comprehensive"] = "detailed" use_guideline_agent: bool = True include_alternative_diagnoses: bool = True require_pdf_citations: bool = True use_confidence_assessor: bool = True critical_value_alert_mode: Literal["strict", "moderate", "permissive"] = "strict" class EvolvedSOPs(BaseModel): """Container for mutated SOPs from Architect""" mutations: list[SOPMutation] def performance_diagnostician(evaluation: EvaluationResult) -> Diagnosis: """ Analyzes 5D scores to identify primary weakness. Uses programmatic analysis for reliability and speed. """ print("\n" + "=" * 70) print("EXECUTING: Performance Diagnostician") print("=" * 70) # Find lowest score programmatically (no LLM needed) scores = { "clinical_accuracy": evaluation.clinical_accuracy.score, "evidence_grounding": evaluation.evidence_grounding.score, "actionability": evaluation.actionability.score, "clarity": evaluation.clarity.score, "safety_completeness": evaluation.safety_completeness.score, } reasonings = { "clinical_accuracy": evaluation.clinical_accuracy.reasoning, "evidence_grounding": evaluation.evidence_grounding.reasoning, "actionability": evaluation.actionability.reasoning, "clarity": evaluation.clarity.reasoning, "safety_completeness": evaluation.safety_completeness.reasoning, } primary_weakness = min(scores, key=scores.get) weakness_score = scores[primary_weakness] weakness_reasoning = reasonings[primary_weakness] # Generate detailed root cause analysis root_cause_map = { "clinical_accuracy": f"Clinical accuracy score ({weakness_score:.2f}) indicates potential issues with medical interpretations. {weakness_reasoning[:200]}", "evidence_grounding": f"Evidence grounding score ({weakness_score:.2f}) suggests insufficient citations. {weakness_reasoning[:200]}", "actionability": f"Actionability score ({weakness_score:.2f}) indicates recommendations lack specificity. {weakness_reasoning[:200]}", "clarity": f"Clarity score ({weakness_score:.2f}) suggests readability issues. {weakness_reasoning[:200]}", "safety_completeness": f"Safety score ({weakness_score:.2f}) indicates missing risk discussions. {weakness_reasoning[:200]}", } recommendation_map = { "clinical_accuracy": "Increase RAG depth to access more authoritative medical sources.", "evidence_grounding": "Enforce strict citation requirements and increase RAG depth.", "actionability": "Make recommendations more specific with concrete action items.", "clarity": "Simplify language and reduce technical jargon for better readability.", "safety_completeness": "Add explicit safety warnings and ensure complete risk coverage.", } diagnosis = Diagnosis( primary_weakness=primary_weakness, root_cause_analysis=root_cause_map[primary_weakness], recommendation=recommendation_map[primary_weakness], ) print("\nāœ“ Diagnosis complete") print(f" Primary weakness: {diagnosis.primary_weakness} ({weakness_score:.3f})") print(f" Recommendation: {diagnosis.recommendation}") return diagnosis def sop_architect(diagnosis: Diagnosis, current_sop: ExplanationSOP) -> EvolvedSOPs: """ Generates targeted SOP mutations to address diagnosed weakness. Uses programmatic generation for reliability. """ print("\n" + "=" * 70) print("EXECUTING: SOP Architect") print("=" * 70) print(f"Target weakness: {diagnosis.primary_weakness}") weakness = diagnosis.primary_weakness # Generate mutations based on weakness type if weakness == "clarity": mut1 = SOPMutation( disease_explainer_k=max(3, current_sop.disease_explainer_k - 1), linker_retrieval_k=max(2, current_sop.linker_retrieval_k - 1), guideline_retrieval_k=max(2, current_sop.guideline_retrieval_k - 1), explainer_detail_level="concise", biomarker_analyzer_threshold=current_sop.biomarker_analyzer_threshold, use_guideline_agent=current_sop.use_guideline_agent, include_alternative_diagnoses=False, require_pdf_citations=current_sop.require_pdf_citations, use_confidence_assessor=current_sop.use_confidence_assessor, critical_value_alert_mode=current_sop.critical_value_alert_mode, description="Reduce retrieval depth and use concise style for clarity", ) mut2 = SOPMutation( disease_explainer_k=current_sop.disease_explainer_k, linker_retrieval_k=current_sop.linker_retrieval_k, guideline_retrieval_k=current_sop.guideline_retrieval_k, explainer_detail_level="detailed", biomarker_analyzer_threshold=current_sop.biomarker_analyzer_threshold, use_guideline_agent=current_sop.use_guideline_agent, include_alternative_diagnoses=True, require_pdf_citations=False, use_confidence_assessor=current_sop.use_confidence_assessor, critical_value_alert_mode=current_sop.critical_value_alert_mode, description="Balanced detail with fewer citations for readability", ) elif weakness == "evidence_grounding": mut1 = SOPMutation( disease_explainer_k=min(10, current_sop.disease_explainer_k + 2), linker_retrieval_k=min(5, current_sop.linker_retrieval_k + 1), guideline_retrieval_k=min(5, current_sop.guideline_retrieval_k + 1), explainer_detail_level="comprehensive", biomarker_analyzer_threshold=current_sop.biomarker_analyzer_threshold, use_guideline_agent=True, include_alternative_diagnoses=current_sop.include_alternative_diagnoses, require_pdf_citations=True, use_confidence_assessor=current_sop.use_confidence_assessor, critical_value_alert_mode=current_sop.critical_value_alert_mode, description="Maximum RAG depth with strict citation requirements", ) mut2 = SOPMutation( disease_explainer_k=min(10, current_sop.disease_explainer_k + 1), linker_retrieval_k=current_sop.linker_retrieval_k, guideline_retrieval_k=current_sop.guideline_retrieval_k, explainer_detail_level="detailed", biomarker_analyzer_threshold=current_sop.biomarker_analyzer_threshold, use_guideline_agent=True, include_alternative_diagnoses=current_sop.include_alternative_diagnoses, require_pdf_citations=True, use_confidence_assessor=current_sop.use_confidence_assessor, critical_value_alert_mode=current_sop.critical_value_alert_mode, description="Moderate RAG increase with citation enforcement", ) elif weakness == "actionability": mut1 = SOPMutation( disease_explainer_k=current_sop.disease_explainer_k, linker_retrieval_k=current_sop.linker_retrieval_k, guideline_retrieval_k=min(5, current_sop.guideline_retrieval_k + 2), explainer_detail_level="comprehensive", biomarker_analyzer_threshold=current_sop.biomarker_analyzer_threshold, use_guideline_agent=True, include_alternative_diagnoses=current_sop.include_alternative_diagnoses, require_pdf_citations=True, use_confidence_assessor=current_sop.use_confidence_assessor, critical_value_alert_mode="strict", description="Increase guideline retrieval for actionable recommendations", ) mut2 = SOPMutation( disease_explainer_k=min(10, current_sop.disease_explainer_k + 1), linker_retrieval_k=min(5, current_sop.linker_retrieval_k + 1), guideline_retrieval_k=min(5, current_sop.guideline_retrieval_k + 1), explainer_detail_level="detailed", biomarker_analyzer_threshold=current_sop.biomarker_analyzer_threshold, use_guideline_agent=True, include_alternative_diagnoses=True, require_pdf_citations=True, use_confidence_assessor=True, critical_value_alert_mode="strict", description="Comprehensive approach with all agents enabled", ) elif weakness == "clinical_accuracy": mut1 = SOPMutation( disease_explainer_k=10, linker_retrieval_k=5, guideline_retrieval_k=5, explainer_detail_level="comprehensive", biomarker_analyzer_threshold=max(0.10, current_sop.biomarker_analyzer_threshold - 0.05), use_guideline_agent=True, include_alternative_diagnoses=True, require_pdf_citations=True, use_confidence_assessor=True, critical_value_alert_mode="strict", description="Maximum RAG depth with strict thresholds for accuracy", ) mut2 = SOPMutation( disease_explainer_k=min(10, current_sop.disease_explainer_k + 2), linker_retrieval_k=min(5, current_sop.linker_retrieval_k + 1), guideline_retrieval_k=min(5, current_sop.guideline_retrieval_k + 1), explainer_detail_level="comprehensive", biomarker_analyzer_threshold=current_sop.biomarker_analyzer_threshold, use_guideline_agent=True, include_alternative_diagnoses=True, require_pdf_citations=True, use_confidence_assessor=True, critical_value_alert_mode="strict", description="High RAG depth with comprehensive detail", ) else: # safety_completeness mut1 = SOPMutation( disease_explainer_k=min(10, current_sop.disease_explainer_k + 1), linker_retrieval_k=current_sop.linker_retrieval_k, guideline_retrieval_k=min(5, current_sop.guideline_retrieval_k + 2), explainer_detail_level="comprehensive", biomarker_analyzer_threshold=max(0.10, current_sop.biomarker_analyzer_threshold - 0.03), use_guideline_agent=True, include_alternative_diagnoses=True, require_pdf_citations=True, use_confidence_assessor=True, critical_value_alert_mode="strict", description="Strict safety mode with enhanced guidelines", ) mut2 = SOPMutation( disease_explainer_k=min(10, current_sop.disease_explainer_k + 2), linker_retrieval_k=min(5, current_sop.linker_retrieval_k + 1), guideline_retrieval_k=min(5, current_sop.guideline_retrieval_k + 1), explainer_detail_level="comprehensive", biomarker_analyzer_threshold=current_sop.biomarker_analyzer_threshold, use_guideline_agent=True, include_alternative_diagnoses=True, require_pdf_citations=True, use_confidence_assessor=True, critical_value_alert_mode="strict", description="Maximum coverage with all safety features", ) evolved = EvolvedSOPs(mutations=[mut1, mut2]) print(f"\nāœ“ Generated {len(evolved.mutations)} mutations") for i, mut in enumerate(evolved.mutations, 1): print(f" {i}. {mut.description}") print(f" Disease K: {mut.disease_explainer_k}, Detail: {mut.explainer_detail_level}") return evolved def run_evolution_cycle( gene_pool: SOPGenePool, patient_input: Any, workflow_graph: Any, evaluation_func: Callable ) -> list[dict[str, Any]]: """ Executes one complete evolution cycle: 1. Diagnose current best SOP 2. Generate mutations 3. Test each mutation 4. Add to gene pool Returns: List of new entries added to pool """ print("\n" + "=" * 80) print("STARTING EVOLUTION CYCLE") print("=" * 80) # Get current best (for simplicity, use latest) current_best = gene_pool.get_latest() if not current_best: raise ValueError("Gene pool is empty. Add baseline SOP first.") parent_sop = current_best["sop"] parent_eval = current_best["evaluation"] parent_version = current_best["version"] print(f"\nImproving upon SOP v{parent_version}") # Step 1: Diagnose diagnosis = performance_diagnostician(parent_eval) # Step 2: Generate mutations evolved_sops = sop_architect(diagnosis, parent_sop) # Step 3: Test each mutation new_entries = [] for i, mutant_sop_model in enumerate(evolved_sops.mutations, 1): print(f"\n{'=' * 70}") print(f"TESTING MUTATION {i}/{len(evolved_sops.mutations)}: {mutant_sop_model.description}") print("=" * 70) # Convert SOPMutation to ExplanationSOP mutant_sop_dict = mutant_sop_model.model_dump() description = mutant_sop_dict.pop("description") mutant_sop = ExplanationSOP(**mutant_sop_dict) # Run workflow with mutated SOP from datetime import datetime graph_input = { "patient_biomarkers": patient_input.biomarkers, "model_prediction": patient_input.model_prediction, "patient_context": patient_input.patient_context, "plan": None, "sop": mutant_sop, "agent_outputs": [], "biomarker_flags": [], "safety_alerts": [], "biomarker_analysis": None, "final_response": None, "processing_timestamp": datetime.now().isoformat(), "sop_version": description, } try: final_state = workflow_graph.invoke(graph_input) # Evaluate output evaluation = evaluation_func( final_response=final_state["final_response"], agent_outputs=final_state["agent_outputs"], biomarkers=patient_input.biomarkers, ) # Add to gene pool gene_pool.add(sop=mutant_sop, evaluation=evaluation, parent_version=parent_version, description=description) new_entries.append({"sop": mutant_sop, "evaluation": evaluation, "description": description}) except Exception as e: print(f"āŒ Mutation {i} failed: {e}") continue print("\n" + "=" * 80) print("EVOLUTION CYCLE COMPLETE") print("=" * 80) return new_entries