| """ | |
| Pareto Frontier Analysis | |
| Identifies optimal trade-offs in multi-objective optimization | |
| """ | |
| from typing import Any | |
| import matplotlib | |
| import numpy as np | |
| matplotlib.use("Agg") # Use non-interactive backend | |
| import matplotlib.pyplot as plt | |
def identify_pareto_front(gene_pool_entries: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """
    Identifies non-dominated solutions (the Pareto frontier).

    A solution is dominated if another solution is:
      - better or equal on ALL metrics, and
      - strictly better on AT LEAST ONE metric.
    """
    pareto_front = []
    for i, candidate in enumerate(gene_pool_entries):
        is_dominated = False
        # Get the candidate's 5D score vector
        cand_scores = np.array(candidate["evaluation"].to_vector())
        for j, other in enumerate(gene_pool_entries):
            if i == j:
                continue
            # Get the other solution's 5D score vector
            other_scores = np.array(other["evaluation"].to_vector())
            # Domination check: other >= candidate on ALL metrics, > on SOME
            if np.all(other_scores >= cand_scores) and np.any(other_scores > cand_scores):
                is_dominated = True
                break
        if not is_dominated:
            pareto_front.append(candidate)
    return pareto_front
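
# Worked example of the domination rule above (illustrative values only, using
# shortened 3-metric vectors): with a = (0.9, 0.8, 0.7) and b = (0.8, 0.8, 0.7),
# a is >= b on every metric and strictly > b on the first, so b is dominated
# and dropped. By contrast, a = (0.9, 0.6, 0.7) and c = (0.8, 0.8, 0.7) each
# win on a different metric, so neither dominates and both stay on the front.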
def visualize_pareto_frontier(pareto_front: list[dict[str, Any]]) -> None:
    """
    Creates two visualizations:
      1. Grouped bar chart (5D comparison; stands in for a parallel
         coordinates plot, which would require pandas)
      2. Radar chart (5D profile)
    """
    if not pareto_front:
        print("No solutions on Pareto front to visualize")
        return
    fig = plt.figure(figsize=(18, 7))

    # --- Plot 1: Bar chart (since pandas might not be available) ---
    ax1 = plt.subplot(1, 2, 1)
    metrics = ["Clinical\nAccuracy", "Evidence\nGrounding", "Actionability", "Clarity", "Safety"]
    x = np.arange(len(metrics))
    width = 0.8 / len(pareto_front)
    for idx, entry in enumerate(pareto_front):
        e = entry["evaluation"]
        scores = [
            e.clinical_accuracy.score,
            e.evidence_grounding.score,
            e.actionability.score,
            e.clarity.score,
            e.safety_completeness.score,
        ]
        # Center each solution's group of bars around the metric tick
        offset = (idx - len(pareto_front) / 2) * width + width / 2
        label = f"SOP v{entry['version']}"
        ax1.bar(x + offset, scores, width, label=label, alpha=0.8)
    ax1.set_xlabel("Metrics", fontsize=12)
    ax1.set_ylabel("Score", fontsize=12)
    ax1.set_title("5D Performance Comparison (Bar Chart)", fontsize=14)
    ax1.set_xticks(x)
    ax1.set_xticklabels(metrics, fontsize=10)
    ax1.set_ylim(0, 1.0)
    ax1.legend(loc="upper left")
    ax1.grid(True, alpha=0.3, axis="y")

    # --- Plot 2: Radar chart ---
    ax2 = plt.subplot(1, 2, 2, projection="polar")
    categories = metrics  # same five metrics as the bar chart
    num_vars = len(categories)
    angles = np.linspace(0, 2 * np.pi, num_vars, endpoint=False).tolist()
    angles += angles[:1]  # repeat the first angle to close the polygon
    for entry in pareto_front:
        e = entry["evaluation"]
        values = [
            e.clinical_accuracy.score,
            e.evidence_grounding.score,
            e.actionability.score,
            e.clarity.score,
            e.safety_completeness.score,
        ]
        values += values[:1]  # close the outline
        desc = entry.get("description", "")[:30]
        label = f"SOP v{entry['version']}: {desc}"
        ax2.plot(angles, values, "o-", linewidth=2, label=label)
        ax2.fill(angles, values, alpha=0.15)
    ax2.set_xticks(angles[:-1])
    ax2.set_xticklabels(categories, size=10)
    ax2.set_ylim(0, 1)
    ax2.set_title("5D Performance Profiles (Radar Chart)", size=14, y=1.08)
    ax2.legend(loc="upper left", bbox_to_anchor=(1.2, 1.0), fontsize=9)
    ax2.grid(True)
    plt.tight_layout()

    # Create the output directory if it doesn't exist
    data_dir = Path("data")
    data_dir.mkdir(exist_ok=True)
    output_path = data_dir / "pareto_frontier_analysis.png"
    plt.savefig(output_path, dpi=300, bbox_inches="tight")
    plt.close(fig)
    print(f"\n✓ Visualization saved to: {output_path}")
def print_pareto_summary(pareto_front: list[dict[str, Any]]) -> None:
    """Print a human-readable summary of the Pareto frontier."""
    print("\n" + "=" * 80)
    print("PARETO FRONTIER ANALYSIS")
    print("=" * 80)
    print(f"\nFound {len(pareto_front)} optimal (non-dominated) solutions:\n")
    for entry in pareto_front:
        v = entry["version"]
        p = entry.get("parent")
        desc = entry.get("description", "Baseline")
        e = entry["evaluation"]
        lineage = f"(Child of v{p})" if p else "(Baseline)"
        print(f"SOP v{v} {lineage}")
        print(f"  Description: {desc}")
        print(f"  Clinical Accuracy: {e.clinical_accuracy.score:.3f}")
        print(f"  Evidence Grounding: {e.evidence_grounding.score:.3f}")
        print(f"  Actionability: {e.actionability.score:.3f}")
        print(f"  Clarity: {e.clarity.score:.3f}")
        print(f"  Safety & Completeness: {e.safety_completeness.score:.3f}")
        # Average across the five metrics
        avg_score = np.mean(e.to_vector())
        print(f"  Average Score: {avg_score:.3f}")
        print()
    print("=" * 80)
    print("\nRECOMMENDATION:")
    print("Review the visualizations and choose the SOP that best matches")
    print("your strategic priorities (e.g., maximum accuracy vs. clarity).")
    print("=" * 80)
def analyze_improvements(gene_pool_entries: list[dict[str, Any]]) -> None:
    """Analyze improvements over the baseline SOP."""
    if len(gene_pool_entries) < 2:
        print("\n⚠️ Not enough SOPs to analyze improvements")
        return
    baseline = gene_pool_entries[0]
    baseline_scores = np.array(baseline["evaluation"].to_vector())
    baseline_avg = np.mean(baseline_scores)
    print("\n" + "=" * 80)
    print("IMPROVEMENT ANALYSIS")
    print("=" * 80)
    print(f"\nBaseline (v{baseline['version']}): {baseline.get('description', 'Initial')}")
    print(f"  Average Score: {baseline_avg:.3f}")
    improvements_found = False
    for entry in gene_pool_entries[1:]:
        scores = np.array(entry["evaluation"].to_vector())
        avg_score = np.mean(scores)
        if avg_score > baseline_avg:
            improvements_found = True
            improvement_pct = ((avg_score - baseline_avg) / baseline_avg) * 100
            print(f"\n✓ SOP v{entry['version']}: {entry.get('description', '')}")
            print(f"  Average Score: {avg_score:.3f} (+{improvement_pct:.1f}% vs baseline)")
            # Show per-metric improvements
            metric_names = [
                "Clinical Accuracy",
                "Evidence Grounding",
                "Actionability",
                "Clarity",
                "Safety & Completeness",
            ]
            for name, score, baseline_score in zip(metric_names, scores, baseline_scores):
                diff = score - baseline_score
                if abs(diff) > 0.01:  # only show meaningful changes
                    symbol = "↑" if diff > 0 else "↓"
                    print(f"  {name}: {score:.3f} {symbol} ({diff:+.3f})")
    if not improvements_found:
        print("\n⚠️ No improvements found over baseline yet")
        print("  Consider running more evolution cycles or adjusting mutation strategies")
    print("\n" + "=" * 80)