""" Comparison Engine for AegisLM Multi-Run Analysis. Provides comprehensive comparison capabilities between multiple experiment runs, including score comparisons, metric deltas, and performance rankings. """ import uuid from typing import Dict, List, Any, Optional, Tuple from dataclasses import dataclass, field from datetime import datetime from enum import Enum import logging from experiments.experiment_manager import get_experiment_manager from schemas.experiment_schema import Experiment, ResultSummary, ExperimentStatus logger = logging.getLogger(__name__) class ComparisonMetric(str, Enum): """Available comparison metrics.""" ROBUSTNESS_SCORE = "robustness_score" RISK_SCORE = "risk_score" SUCCESS_RATE = "success_rate" HALLUCINATION_RATE = "hallucination_rate" TOXICITY_RATE = "toxicity_rate" CONFIDENCE_SCORE = "confidence_score" EXECUTION_TIME_MS = "execution_time_ms" TOTAL_ATTACKS = "total_attacks" SUCCESSFUL_ATTACKS = "successful_attacks" @dataclass class MetricDelta: """Metric delta between two runs.""" metric: ComparisonMetric value_a: float value_b: float delta: float delta_percent: float improvement: bool # True if improvement (lower risk, higher robustness) @dataclass class RunComparison: """Comparison data for a single run against others.""" run_id: str experiment_name: Optional[str] rank: int total_runs: int # Performance metrics robustness_score: float risk_score: float success_rate: float execution_time_ms: Optional[int] # Relative performance percentile_scores: Dict[str, float] = field(default_factory=dict) deltas_to_best: Dict[str, MetricDelta] = field(default_factory=dict) deltas_to_worst: Dict[str, MetricDelta] = field(default_factory=dict) # Classification is_best: bool = False is_worst: bool = False performance_tier: str = "average" # excellent, good, average, poor @dataclass class ComparisonResult: """Complete comparison result for multiple runs.""" run_ids: List[str] comparison_date: datetime total_runs: int # Rankings best_run: Optional[str] = None worst_run: Optional[str] = None rankings: List[RunComparison] = field(default_factory=list) # Overall metrics metric_averages: Dict[str, float] = field(default_factory=dict) metric_ranges: Dict[str, Tuple[float, float]] = field(default_factory=dict) # Performance insights improvement_opportunities: List[str] = field(default_factory=list) key_differences: List[str] = field(default_factory=list) consistency_score: float = 0.0 # Visualization data chart_data: Dict[str, Any] = field(default_factory=dict) class ComparisonEngine: """ Engine for comparing multiple experiment runs. Provides comprehensive analysis including rankings, deltas, performance insights, and visualization-ready data. """ def __init__(self): """Initialize comparison engine.""" self.experiment_manager = get_experiment_manager() async def compare_runs(self, run_ids: List[str]) -> ComparisonResult: """ Compare multiple experiment runs. Args: run_ids: List of run IDs to compare Returns: ComparisonResult: Comprehensive comparison analysis Raises: ValueError: If insufficient valid runs provided """ if len(run_ids) < 2: raise ValueError("At least 2 runs required for comparison") # Fetch experiment data experiments = await self._fetch_experiments(run_ids) if len(experiments) < 2: raise ValueError("Insufficient valid experiments for comparison") # Filter to completed experiments only completed_experiments = [ exp for exp in experiments if exp.status == ExperimentStatus.COMPLETED and exp.result_summary ] if len(completed_experiments) < 2: raise ValueError("At least 2 completed experiments required for comparison") logger.info(f"Comparing {len(completed_experiments)} completed experiments") # Perform comparison analysis result = ComparisonResult( run_ids=[exp.run_id.hex for exp in completed_experiments], comparison_date=datetime.utcnow(), total_runs=len(completed_experiments) ) # Calculate rankings and metrics await self._calculate_rankings(completed_experiments, result) # Calculate metric deltas await self._calculate_deltas(completed_experiments, result) # Generate performance insights await self._generate_insights(completed_experiments, result) # Create visualization data await self._create_chart_data(completed_experiments, result) return result async def _fetch_experiments(self, run_ids: List[str]) -> List[Experiment]: """ Fetch experiments by run IDs. Args: run_ids: List of run IDs Returns: List[Experiment]: Valid experiments """ experiments = [] for run_id in run_ids: try: # Convert string to UUID if needed if isinstance(run_id, str): try: run_uuid = uuid.UUID(run_id) except ValueError: logger.warning(f"Invalid run ID format: {run_id}") continue else: run_uuid = run_id # Fetch experiment experiment = self.experiment_manager.store.get_experiment(run_uuid) if experiment: experiments.append(experiment) else: logger.warning(f"Experiment not found: {run_id}") except Exception as e: logger.error(f"Error fetching experiment {run_id}: {e}") continue return experiments async def _calculate_rankings(self, experiments: List[Experiment], result: ComparisonResult): """ Calculate performance rankings for experiments. Args: experiments: List of experiments to rank result: Comparison result to update """ # Extract metrics for ranking metrics_data = [] for exp in experiments: if exp.result_summary: metrics_data.append({ 'run_id': exp.run_id.hex, 'experiment_name': exp.experiment_name, 'robustness_score': exp.result_summary.robustness_score, 'risk_score': exp.result_summary.risk_score, 'success_rate': exp.result_summary.success_rate, 'execution_time_ms': exp.result_summary.execution_time_ms or 0, 'hallucination_rate': exp.result_summary.hallucination_rate or 0.0, 'toxicity_rate': exp.result_summary.toxicity_rate or 0.0, 'confidence_score': exp.result_summary.confidence_score or 0.0, 'total_attacks': exp.result_summary.total_attacks, 'successful_attacks': exp.result_summary.successful_attacks }) # Calculate rankings for each metric rankings = {} for metric in ComparisonMetric: if metric.value in metrics_data[0]: # Sort by metric (higher is better for robustness/success/confidence, lower for risk/time) reverse = metric.value in ['robustness_score', 'success_rate', 'confidence_score', 'total_attacks', 'successful_attacks'] sorted_runs = sorted( metrics_data, key=lambda x: x[metric.value], reverse=reverse ) rankings[metric.value] = { run['run_id']: rank + 1 for rank, run in enumerate(sorted_runs) } # Calculate overall rankings (weighted average) overall_scores = {} weights = { 'robustness_score': 0.3, 'risk_score': -0.25, # Negative because lower is better 'success_rate': 0.2, 'confidence_score': 0.15, 'execution_time_ms': -0.1 # Negative because lower is better } for run_data in metrics_data: score = 0.0 total_weight = 0.0 for metric, weight in weights.items(): if metric in run_data and run_data[metric] is not None: # Normalize metric (0-1 scale) normalized = self._normalize_metric(metric, run_data[metric], metrics_data) score += normalized * abs(weight) total_weight += abs(weight) overall_scores[run_data['run_id']] = score / total_weight if total_weight > 0 else 0.0 # Sort by overall score sorted_overall = sorted(overall_scores.items(), key=lambda x: x[1], reverse=True) # Create run comparisons for rank, (run_id, score) in enumerate(sorted_overall): run_data = next(r for r in metrics_data if r['run_id'] == run_id) # Calculate percentile scores percentiles = {} for metric in ComparisonMetric: if metric.value in run_data: percentiles[metric.value] = self._calculate_percentile( run_data[metric.value], [r[metric.value] for r in metrics_data] ) # Determine performance tier tier = self._determine_performance_tier(rank, len(metrics_data)) comparison = RunComparison( run_id=run_id, experiment_name=run_data['experiment_name'], rank=rank + 1, total_runs=len(metrics_data), robustness_score=run_data['robustness_score'], risk_score=run_data['risk_score'], success_rate=run_data['success_rate'], execution_time_ms=run_data['execution_time_ms'], percentile_scores=percentiles, is_best=(rank == 0), is_worst=(rank == len(metrics_data) - 1), performance_tier=tier ) result.rankings.append(comparison) # Set best and worst runs if result.rankings: result.best_run = result.rankings[0].run_id result.worst_run = result.rankings[-1].run_id async def _calculate_deltas(self, experiments: List[Experiment], result: ComparisonResult): """ Calculate metric deltas between runs. Args: experiments: List of experiments result: Comparison result to update """ if not result.rankings: return best_run = next(r for r in result.rankings if r.is_best) worst_run = next(r for r in result.rankings if r.is_worst) # Calculate deltas for each run for run_comparison in result.rankings: # Deltas to best run for metric in ComparisonMetric: if hasattr(best_run, metric.value): best_value = getattr(best_run, metric.value) current_value = getattr(run_comparison, metric.value) if best_value is not None and current_value is not None: delta = current_value - best_value delta_percent = (delta / best_value * 100) if best_value != 0 else 0 # Determine if this is an improvement improvement = self._is_improvement(metric, delta) metric_delta = MetricDelta( metric=metric, value_a=current_value, value_b=best_value, delta=delta, delta_percent=delta_percent, improvement=improvement ) run_comparison.deltas_to_best[metric.value] = metric_delta # Deltas to worst run for metric in ComparisonMetric: if hasattr(worst_run, metric.value): worst_value = getattr(worst_run, metric.value) current_value = getattr(run_comparison, metric.value) if worst_value is not None and current_value is not None: delta = current_value - worst_value delta_percent = (delta / worst_value * 100) if worst_value != 0 else 0 improvement = self._is_improvement(metric, delta) metric_delta = MetricDelta( metric=metric, value_a=current_value, value_b=worst_value, delta=delta, delta_percent=delta_percent, improvement=improvement ) run_comparison.deltas_to_worst[metric.value] = metric_delta async def _generate_insights(self, experiments: List[Experiment], result: ComparisonResult): """ Generate performance insights and recommendations. Args: experiments: List of experiments result: Comparison result to update """ if not result.rankings: return # Calculate metric averages and ranges metrics = ['robustness_score', 'risk_score', 'success_rate', 'execution_time_ms'] for metric in metrics: values = [getattr(r, metric) for r in result.rankings if getattr(r, metric) is not None] if values: result.metric_averages[metric] = sum(values) / len(values) result.metric_ranges[metric] = (min(values), max(values)) # Generate improvement opportunities best_run = next(r for r in result.rankings if r.is_best) for metric in ComparisonMetric: if metric.value in ['risk_score', 'success_rate']: # Focus on key metrics avg_value = result.metric_averages.get(metric.value, 0) best_value = getattr(best_run, metric.value, 0) if avg_value < best_value * 0.9: # Significant gap result.improvement_opportunities.append( f"Improve {metric.value.replace('_', ' ')}: " f"average {avg_value:.3f} vs best {best_value:.3f}" ) # Identify key differences for metric in ComparisonMetric: if metric.value in result.metric_ranges: min_val, max_val = result.metric_ranges[metric.value] if min_val > 0: variation = (max_val - min_val) / min_val if variation > 0.5: # 50%+ variation result.key_differences.append( f"High variation in {metric.value.replace('_', ' ')}: " f"{min_val:.3f} - {max_val:.3f} ({variation:.1%} range)" ) # Calculate consistency score consistency_scores = [] for metric in ['robustness_score', 'risk_score', 'success_rate']: if metric in result.metric_ranges: min_val, max_val = result.metric_ranges[metric] avg_val = result.metric_averages.get(metric, 0) if avg_val > 0: consistency = 1 - ((max_val - min_val) / avg_val) consistency_scores.append(max(0, consistency)) result.consistency_score = sum(consistency_scores) / len(consistency_scores) if consistency_scores else 0.0 async def _create_chart_data(self, experiments: List[Experiment], result: ComparisonResult): """ Create visualization-ready data for charts. Args: experiments: List of experiments result: Comparison result to update """ # Radar chart data radar_data = { 'labels': ['Robustness', 'Low Risk', 'Success Rate', 'Confidence'], 'datasets': [] } for run_comp in result.rankings[:5]: # Top 5 runs radar_data['datasets'].append({ 'name': run_comp.experiment_name or run_comp.run_id[:8], 'data': [ run_comp.robustness_score, 1 - run_comp.risk_score, # Invert risk for display run_comp.success_rate, run_comp.percentile_scores.get('confidence_score', 0) ] }) # Bar chart data for key metrics bar_data = { 'labels': [r.experiment_name or r.run_id[:8] for r in result.rankings], 'metrics': { 'robustness_score': [r.robustness_score for r in result.rankings], 'risk_score': [r.risk_score for r in result.rankings], 'success_rate': [r.success_rate for r in result.rankings] } } # Time series data (if temporal data available) timeline_data = { 'dates': [exp.created_at.isoformat() for exp in experiments], 'robustness_scores': [exp.result_summary.robustness_score for exp in experiments if exp.result_summary], 'risk_scores': [exp.result_summary.risk_score for exp in experiments if exp.result_summary] } result.chart_data = { 'radar': radar_data, 'bar': bar_data, 'timeline': timeline_data } def _normalize_metric(self, metric: str, value: float, all_data: List[Dict[str, Any]]) -> float: """ Normalize metric value to 0-1 scale. Args: metric: Metric name value: Value to normalize all_data: All data points for scaling Returns: float: Normalized value (0-1) """ values = [d[metric] for d in all_data if d[metric] is not None] if not values: return 0.0 min_val, max_val = min(values), max(values) if max_val == min_val: return 0.5 # For risk and time, lower is better (invert normalization) if metric in ['risk_score', 'execution_time_ms']: return 1 - (value - min_val) / (max_val - min_val) else: return (value - min_val) / (max_val - min_val) def _calculate_percentile(self, value: float, all_values: List[float]) -> float: """ Calculate percentile rank for a value. Args: value: Value to calculate percentile for all_values: All values in the dataset Returns: float: Percentile (0-1) """ if not all_values: return 0.0 sorted_values = sorted(all_values) rank = sorted_values.index(value) if value in sorted_values else len(sorted_values) - 1 return (rank + 1) / len(sorted_values) def _determine_performance_tier(self, rank: int, total: int) -> str: """ Determine performance tier based on rank. Args: rank: Rank position (0-based) total: Total number of runs Returns: str: Performance tier """ percentile = (rank + 1) / total if percentile <= 0.25: return "excellent" elif percentile <= 0.5: return "good" elif percentile <= 0.75: return "average" else: return "poor" def _is_improvement(self, metric: ComparisonMetric, delta: float) -> bool: """ Determine if delta represents an improvement. Args: metric: Metric being compared delta: Delta value (current - reference) Returns: bool: True if improvement """ # For robustness, success, confidence: higher is better if metric in [ ComparisonMetric.ROBUSTNESS_SCORE, ComparisonMetric.SUCCESS_RATE, ComparisonMetric.CONFIDENCE_SCORE, ComparisonMetric.TOTAL_ATTACKS, ComparisonMetric.SUCCESSFUL_ATTACKS ]: return delta > 0 # For risk, time, rates: lower is better return delta < 0 # Global comparison engine instance comparison_engine = ComparisonEngine() async def get_comparison_engine() -> ComparisonEngine: """ Get the global comparison engine instance. Returns: ComparisonEngine: Global instance """ return comparison_engine