# T0X1N's picture
# chore: codebase audit and fixes (ruff, mypy, pytest)
# 9659593
"""
MediGuard AI RAG-Helper - Evolution Engine
Outer Loop Director for SOP Evolution
"""
from collections.abc import Callable
from typing import Any, Literal
from pydantic import BaseModel, Field
from src.config import ExplanationSOP
from src.evaluation.evaluators import EvaluationResult
class SOPGenePool:
    """Append-only, versioned store of evaluated SOPs.

    Every entry is a dict with the keys ``version``, ``sop``, ``evaluation``,
    ``parent`` and ``description``. Versions are assigned by this class,
    starting at 1 and increasing by one on each ``add``.
    """

    # (printed label, attribute name on the evaluation object) for ``summary``.
    _SUMMARY_ROWS = (
        ("Clinical Accuracy", "clinical_accuracy"),
        ("Evidence Grounding", "evidence_grounding"),
        ("Actionability", "actionability"),
        ("Clarity", "clarity"),
        ("Safety & Completeness", "safety_completeness"),
    )

    def __init__(self) -> None:
        self.pool: list[dict[str, Any]] = []
        # Compatibility alias. It must be the *same* list object as ``pool``
        # from the start; two separate empty lists would silently diverge if
        # a caller touched ``gene_pool`` before the first ``add``.
        self.gene_pool: list[dict[str, Any]] = self.pool
        self.version_counter = 0

    def add(
        self,
        sop: ExplanationSOP,
        evaluation: EvaluationResult,
        parent_version: int | None = None,
        description: str = "",
    ) -> None:
        """Store ``sop`` with its evaluation under the next version number.

        ``parent_version`` is recorded as-is (``None`` marks a baseline);
        it is not checked against the versions already in the pool.
        """
        self.version_counter += 1
        entry = {
            "version": self.version_counter,
            "sop": sop,
            "evaluation": evaluation,
            "parent": parent_version,
            "description": description,
        }
        self.pool.append(entry)
        # Re-bind the alias in case a caller replaced ``pool`` with a new list.
        self.gene_pool = self.pool
        print(f"✓ Added SOP v{self.version_counter} to gene pool: {description}")

    def get_latest(self) -> dict[str, Any] | None:
        """Return the most recently added entry, or ``None`` if the pool is empty."""
        return self.pool[-1] if self.pool else None

    def get_by_version(self, version: int) -> dict[str, Any] | None:
        """Return the entry whose ``version`` equals ``version``, or ``None``."""
        return next((entry for entry in self.pool if entry["version"] == version), None)

    def get_best_by_metric(self, metric: str) -> dict[str, Any] | None:
        """Return the entry with the highest ``evaluation.<metric>.score``.

        Returns ``None`` for an empty pool. On ties the earliest entry wins
        (``max`` keeps the first maximal element). An unknown ``metric``
        raises ``AttributeError`` from the attribute lookup.
        """
        if not self.pool:
            return None
        return max(self.pool, key=lambda entry: getattr(entry["evaluation"], metric).score)

    def summary(self) -> None:
        """Print every entry with its five dimension scores to stdout."""
        rule = "=" * 80
        print("\n" + rule)
        print("SOP GENE POOL SUMMARY")
        print(rule)
        for entry in self.pool:
            v = entry["version"]
            p = entry["parent"]
            desc = entry["description"]
            e = entry["evaluation"]
            parent_str = "(Baseline)" if p is None else f"(Child of v{p})"
            print(f"\nSOP v{v} {parent_str}: {desc}")
            for label, attr in self._SUMMARY_ROWS:
                print(f" {label}: {getattr(e, attr).score:.2f}")
        print("\n" + rule)
class Diagnosis(BaseModel):
    """Result produced by the Performance Diagnostician.

    Names the evaluation dimension flagged as the primary weakness, together
    with a textual root-cause analysis and a recommendation. All three fields
    are required.
    """

    # Restricted to the five evaluation dimension names.
    primary_weakness: Literal[
        "clinical_accuracy",
        "evidence_grounding",
        "actionability",
        "clarity",
        "safety_completeness",
    ]
    root_cause_analysis: str = Field(
        ...,
        description="Detailed analysis of why weakness occurred",
    )
    recommendation: str = Field(
        ...,
        description="High-level recommendation to fix the problem",
    )
class SOPMutation(BaseModel):
    """A single candidate SOP variant with a short label for the strategy.

    ``description`` is required. Every other field has a default and,
    per the note kept below, is meant to mirror a field of ``ExplanationSOP``.
    Field order is significant for ``model_dump()`` output and is kept stable.
    """

    description: str = Field(
        ...,
        description="Brief description of mutation strategy",
    )

    # SOP fields from ExplanationSOP
    # NOTE(review): defaults are duplicated here; confirm they match ExplanationSOP.

    # Analyzer threshold.
    biomarker_analyzer_threshold: float = 0.15

    # Retrieval sizes (names suggest top-k counts; confirm against ExplanationSOP).
    disease_explainer_k: int = 5
    linker_retrieval_k: int = 3
    guideline_retrieval_k: int = 3

    # Output verbosity.
    explainer_detail_level: Literal["concise", "detailed", "comprehensive"] = "detailed"

    # Feature toggles.
    use_guideline_agent: bool = True
    include_alternative_diagnoses: bool = True
    require_pdf_citations: bool = True
    use_confidence_assessor: bool = True

    # Alerting mode for critical values.
    critical_value_alert_mode: Literal["strict", "moderate", "permissive"] = "strict"
class EvolvedSOPs(BaseModel):
    """Batch of candidate mutations emitted by the SOP Architect."""

    # Required; no default, so an EvolvedSOPs cannot be built without mutations.
    mutations: list[SOPMutation] = Field(...)
def performance_diagnostician(evaluation: EvaluationResult) -> Diagnosis:
    """
    Pick the weakest of the five evaluation dimensions and describe it.
    The choice is made programmatically (no LLM call): the dimension with the
    lowest ``.score`` wins, ties going to the first one in the order listed
    below. The root-cause text embeds the score and the first 200 characters
    of that dimension's ``.reasoning``.
    Progress is printed to stdout.
    """
    rule = "=" * 70
    print("\n" + rule)
    print("EXECUTING: Performance Diagnostician")
    print(rule)

    dimensions = (
        "clinical_accuracy",
        "evidence_grounding",
        "actionability",
        "clarity",
        "safety_completeness",
    )
    # Read every dimension up front so a malformed evaluation fails early.
    score_by_dim = {dim: getattr(evaluation, dim).score for dim in dimensions}
    reasoning_by_dim = {dim: getattr(evaluation, dim).reasoning for dim in dimensions}

    # min() keeps the first minimal key, so ties resolve in `dimensions` order.
    weakest = min(score_by_dim, key=lambda dim: score_by_dim[dim])
    low_score = score_by_dim[weakest]
    excerpt = reasoning_by_dim[weakest][:200]

    # One template per dimension; {score} and {excerpt} are filled in below.
    root_cause_templates = {
        "clinical_accuracy": "Clinical accuracy score ({score:.2f}) indicates potential issues with medical interpretations. {excerpt}",
        "evidence_grounding": "Evidence grounding score ({score:.2f}) suggests insufficient citations. {excerpt}",
        "actionability": "Actionability score ({score:.2f}) indicates recommendations lack specificity. {excerpt}",
        "clarity": "Clarity score ({score:.2f}) suggests readability issues. {excerpt}",
        "safety_completeness": "Safety score ({score:.2f}) indicates missing risk discussions. {excerpt}",
    }
    recommendations = {
        "clinical_accuracy": "Increase RAG depth to access more authoritative medical sources.",
        "evidence_grounding": "Enforce strict citation requirements and increase RAG depth.",
        "actionability": "Make recommendations more specific with concrete action items.",
        "clarity": "Simplify language and reduce technical jargon for better readability.",
        "safety_completeness": "Add explicit safety warnings and ensure complete risk coverage.",
    }

    diagnosis = Diagnosis(
        primary_weakness=weakest,
        root_cause_analysis=root_cause_templates[weakest].format(score=low_score, excerpt=excerpt),
        recommendation=recommendations[weakest],
    )

    print("\n✓ Diagnosis complete")
    print(f" Primary weakness: {diagnosis.primary_weakness} ({low_score:.3f})")
    print(f" Recommendation: {diagnosis.recommendation}")
    return diagnosis
def sop_architect(diagnosis: Diagnosis, current_sop: ExplanationSOP) -> EvolvedSOPs:
    """
    Derive two candidate SOP mutations aimed at the diagnosed weakness.
    Generation is rule-based: each weakness maps to a fixed pair of
    adjustments layered on top of ``current_sop``'s settings. Retrieval sizes
    are capped at 10 (disease) and 5 (linker, guideline) when raised, and
    floored at 3 / 2 / 2 when lowered; the analyzer threshold is floored at
    0.10 when lowered. Any weakness other than clarity, evidence_grounding,
    actionability or clinical_accuracy uses the safety_completeness rules.
    Returns an ``EvolvedSOPs`` holding exactly two mutations; progress is
    printed to stdout.
    """
    rule = "=" * 70
    print("\n" + rule)
    print("EXECUTING: SOP Architect")
    print(rule)
    print(f"Target weakness: {diagnosis.primary_weakness}")

    target = diagnosis.primary_weakness

    # Settings carried over from the current SOP unless a mutation overrides
    # them. explainer_detail_level is absent: every mutation sets it itself.
    inherited: dict[str, Any] = {
        "disease_explainer_k": current_sop.disease_explainer_k,
        "linker_retrieval_k": current_sop.linker_retrieval_k,
        "guideline_retrieval_k": current_sop.guideline_retrieval_k,
        "biomarker_analyzer_threshold": current_sop.biomarker_analyzer_threshold,
        "use_guideline_agent": current_sop.use_guideline_agent,
        "include_alternative_diagnoses": current_sop.include_alternative_diagnoses,
        "require_pdf_citations": current_sop.require_pdf_citations,
        "use_confidence_assessor": current_sop.use_confidence_assessor,
        "critical_value_alert_mode": current_sop.critical_value_alert_mode,
    }
    disease_k = inherited["disease_explainer_k"]
    linker_k = inherited["linker_retrieval_k"]
    guideline_k = inherited["guideline_retrieval_k"]
    threshold = inherited["biomarker_analyzer_threshold"]

    # Switch block shared by the "everything enabled, strict alerts" mutations.
    all_on: dict[str, Any] = {
        "use_guideline_agent": True,
        "include_alternative_diagnoses": True,
        "require_pdf_citations": True,
        "use_confidence_assessor": True,
        "critical_value_alert_mode": "strict",
    }

    first: dict[str, Any]
    second: dict[str, Any]
    if target == "clarity":
        first = {
            "disease_explainer_k": max(3, disease_k - 1),
            "linker_retrieval_k": max(2, linker_k - 1),
            "guideline_retrieval_k": max(2, guideline_k - 1),
            "explainer_detail_level": "concise",
            "include_alternative_diagnoses": False,
            "description": "Reduce retrieval depth and use concise style for clarity",
        }
        second = {
            "explainer_detail_level": "detailed",
            "include_alternative_diagnoses": True,
            "require_pdf_citations": False,
            "description": "Balanced detail with fewer citations for readability",
        }
    elif target == "evidence_grounding":
        citations_on = {"use_guideline_agent": True, "require_pdf_citations": True}
        first = {
            **citations_on,
            "disease_explainer_k": min(10, disease_k + 2),
            "linker_retrieval_k": min(5, linker_k + 1),
            "guideline_retrieval_k": min(5, guideline_k + 1),
            "explainer_detail_level": "comprehensive",
            "description": "Maximum RAG depth with strict citation requirements",
        }
        second = {
            **citations_on,
            "disease_explainer_k": min(10, disease_k + 1),
            "explainer_detail_level": "detailed",
            "description": "Moderate RAG increase with citation enforcement",
        }
    elif target == "actionability":
        first = {
            "guideline_retrieval_k": min(5, guideline_k + 2),
            "explainer_detail_level": "comprehensive",
            "use_guideline_agent": True,
            "require_pdf_citations": True,
            "critical_value_alert_mode": "strict",
            "description": "Increase guideline retrieval for actionable recommendations",
        }
        second = {
            **all_on,
            "disease_explainer_k": min(10, disease_k + 1),
            "linker_retrieval_k": min(5, linker_k + 1),
            "guideline_retrieval_k": min(5, guideline_k + 1),
            "explainer_detail_level": "detailed",
            "description": "Comprehensive approach with all agents enabled",
        }
    elif target == "clinical_accuracy":
        first = {
            **all_on,
            "disease_explainer_k": 10,
            "linker_retrieval_k": 5,
            "guideline_retrieval_k": 5,
            "explainer_detail_level": "comprehensive",
            "biomarker_analyzer_threshold": max(0.10, threshold - 0.05),
            "description": "Maximum RAG depth with strict thresholds for accuracy",
        }
        second = {
            **all_on,
            "disease_explainer_k": min(10, disease_k + 2),
            "linker_retrieval_k": min(5, linker_k + 1),
            "guideline_retrieval_k": min(5, guideline_k + 1),
            "explainer_detail_level": "comprehensive",
            "description": "High RAG depth with comprehensive detail",
        }
    else:  # safety_completeness
        first = {
            **all_on,
            "disease_explainer_k": min(10, disease_k + 1),
            "guideline_retrieval_k": min(5, guideline_k + 2),
            "explainer_detail_level": "comprehensive",
            "biomarker_analyzer_threshold": max(0.10, threshold - 0.03),
            "description": "Strict safety mode with enhanced guidelines",
        }
        second = {
            **all_on,
            "disease_explainer_k": min(10, disease_k + 2),
            "linker_retrieval_k": min(5, linker_k + 1),
            "guideline_retrieval_k": min(5, guideline_k + 1),
            "explainer_detail_level": "comprehensive",
            "description": "Maximum coverage with all safety features",
        }

    # Overrides win over inherited values; anything not overridden is inherited.
    evolved = EvolvedSOPs(
        mutations=[SOPMutation(**{**inherited, **overrides}) for overrides in (first, second)]
    )

    print(f"\n✓ Generated {len(evolved.mutations)} mutations")
    for position, candidate in enumerate(evolved.mutations, 1):
        print(f" {position}. {candidate.description}")
        print(
            f" Disease K: {candidate.disease_explainer_k}, Detail: {candidate.explainer_detail_level}"
        )
    return evolved
def run_evolution_cycle(
    gene_pool: SOPGenePool,
    patient_input: Any,
    workflow_graph: Any,
    evaluation_func: Callable[..., Any],
) -> list[dict[str, Any]]:
    """
    Executes one complete evolution cycle:
    1. Diagnose the latest SOP in the pool
    2. Generate mutations
    3. Run and evaluate each mutation
    4. Add each successful mutation to the gene pool

    Args:
        gene_pool: Pool holding at least one (baseline) entry; new entries are
            appended to it with the latest entry's version as parent.
        patient_input: Object exposing ``biomarkers``, ``model_prediction``
            and ``patient_context`` attributes.
        workflow_graph: Object whose ``invoke(state)`` returns a mapping with
            ``"final_response"`` and ``"agent_outputs"`` keys.
        evaluation_func: Called with keyword arguments ``final_response``,
            ``agent_outputs`` and ``biomarkers``; its result is stored as the
            mutation's evaluation.

    Returns:
        One ``{"sop", "evaluation", "description"}`` dict per mutation that
        ran, evaluated and was stored successfully.

    Raises:
        ValueError: If the gene pool is empty.
    """
    # Function-scope import, executed once per call rather than per mutation.
    from datetime import datetime

    print("\n" + "=" * 80)
    print("STARTING EVOLUTION CYCLE")
    print("=" * 80)

    # "Current best" is, for simplicity, just the most recently added entry.
    current_best = gene_pool.get_latest()
    if not current_best:
        raise ValueError("Gene pool is empty. Add baseline SOP first.")
    parent_sop = current_best["sop"]
    parent_eval = current_best["evaluation"]
    parent_version = current_best["version"]
    print(f"\nImproving upon SOP v{parent_version}")

    # Step 1: Diagnose
    diagnosis = performance_diagnostician(parent_eval)

    # Step 2: Generate mutations
    evolved_sops = sop_architect(diagnosis, parent_sop)
    total = len(evolved_sops.mutations)

    # Step 3: Test each mutation
    new_entries: list[dict[str, Any]] = []
    for i, mutant_sop_model in enumerate(evolved_sops.mutations, 1):
        print(f"\n{'=' * 70}")
        print(f"TESTING MUTATION {i}/{total}: {mutant_sop_model.description}")
        print("=" * 70)

        # Convert SOPMutation to ExplanationSOP: every field except the
        # description is passed through as a constructor keyword.
        mutant_sop_dict = mutant_sop_model.model_dump()
        description = mutant_sop_dict.pop("description")
        mutant_sop = ExplanationSOP(**mutant_sop_dict)

        # Initial workflow state for this mutated SOP.
        graph_input = {
            "patient_biomarkers": patient_input.biomarkers,
            "model_prediction": patient_input.model_prediction,
            "patient_context": patient_input.patient_context,
            "plan": None,
            "sop": mutant_sop,
            "agent_outputs": [],
            "biomarker_flags": [],
            "safety_alerts": [],
            "biomarker_analysis": None,
            "final_response": None,
            "processing_timestamp": datetime.now().isoformat(),
            # The mutation description doubles as the version label here.
            "sop_version": description,
        }

        # Best effort: a failing mutation is reported and skipped so the
        # remaining mutations still get tested.
        try:
            final_state = workflow_graph.invoke(graph_input)
            # Evaluate output
            evaluation = evaluation_func(
                final_response=final_state["final_response"],
                agent_outputs=final_state["agent_outputs"],
                biomarkers=patient_input.biomarkers,
            )
            # Add to gene pool
            gene_pool.add(
                sop=mutant_sop,
                evaluation=evaluation,
                parent_version=parent_version,
                description=description,
            )
            new_entries.append(
                {"sop": mutant_sop, "evaluation": evaluation, "description": description}
            )
        except Exception as e:
            print(f"❌ Mutation {i} failed: {e}")
            continue

    print("\n" + "=" * 80)
    print("EVOLUTION CYCLE COMPLETE")
    print("=" * 80)
    return new_entries