| """ |
| Comparison Engine for AegisLM Multi-Run Analysis. |
| |
| Provides comprehensive comparison capabilities between multiple experiment runs, |
| including score comparisons, metric deltas, and performance rankings. |
| """ |
|
|
| import uuid |
| from typing import Dict, List, Any, Optional, Tuple |
| from dataclasses import dataclass, field |
| from datetime import datetime |
| from enum import Enum |
| import logging |
|
|
| from experiments.experiment_manager import get_experiment_manager |
| from schemas.experiment_schema import Experiment, ResultSummary, ExperimentStatus |
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
class ComparisonMetric(str, Enum):
    """
    Names of the metrics that can be compared across runs.

    Members are also ``str`` instances; each value is the attribute name
    under which the metric is looked up elsewhere in this module.
    """

    # Score-like metrics.
    ROBUSTNESS_SCORE = "robustness_score"
    RISK_SCORE = "risk_score"
    SUCCESS_RATE = "success_rate"

    # Rate / confidence metrics.
    HALLUCINATION_RATE = "hallucination_rate"
    TOXICITY_RATE = "toxicity_rate"
    CONFIDENCE_SCORE = "confidence_score"

    # Timing and attack counters.
    EXECUTION_TIME_MS = "execution_time_ms"
    TOTAL_ATTACKS = "total_attacks"
    SUCCESSFUL_ATTACKS = "successful_attacks"
|
|
|
|
@dataclass
class MetricDelta:
    """Difference in a single metric between a run and a reference run."""
    # Which metric the delta describes.
    metric: ComparisonMetric
    # Value of the run being described.
    value_a: float
    # Value of the reference run it is compared against.
    value_b: float
    # Absolute difference: value_a - value_b.
    delta: float
    # Difference relative to value_b, expressed in percent.
    delta_percent: float
    # Whether the delta counts as a change for the better for this metric.
    improvement: bool
|
|
|
|
@dataclass
class RunComparison:
    """Position and headline metrics of one run within a multi-run comparison."""
    # --- identity and placement ---
    run_id: str
    experiment_name: Optional[str]
    rank: int          # 1-based position in the overall ranking
    total_runs: int    # number of runs that were ranked together

    # --- headline metrics copied from the run's result summary ---
    robustness_score: float
    risk_score: float
    success_rate: float
    execution_time_ms: Optional[int]

    # --- relative standing, keyed by metric name ---
    percentile_scores: Dict[str, float] = field(default_factory=dict)
    deltas_to_best: Dict[str, MetricDelta] = field(default_factory=dict)
    deltas_to_worst: Dict[str, MetricDelta] = field(default_factory=dict)

    # --- classification flags ---
    is_best: bool = False
    is_worst: bool = False
    performance_tier: str = "average"
|
|
|
|
@dataclass
class ComparisonResult:
    """Aggregate outcome of comparing several runs with each other."""
    # --- what was compared, and when ---
    run_ids: List[str]
    comparison_date: datetime
    total_runs: int

    # --- ranking outcome ---
    best_run: Optional[str] = None     # run ID of the top-ranked run
    worst_run: Optional[str] = None    # run ID of the bottom-ranked run
    rankings: List[RunComparison] = field(default_factory=list)

    # --- per-metric summary statistics, keyed by metric name ---
    metric_averages: Dict[str, float] = field(default_factory=dict)
    metric_ranges: Dict[str, Tuple[float, float]] = field(default_factory=dict)  # (min, max)

    # --- human-readable insights ---
    improvement_opportunities: List[str] = field(default_factory=list)
    key_differences: List[str] = field(default_factory=list)
    consistency_score: float = 0.0

    # --- visualization payload ---
    chart_data: Dict[str, Any] = field(default_factory=dict)
|
|
|
|
class ComparisonEngine:
    """
    Engine for comparing multiple experiment runs.

    Ranks completed runs by a weighted composite score, computes per-metric
    deltas against the best and worst run, derives summary insights, and
    assembles visualization-ready data.
    """

    # Metrics for which a larger value is the better outcome. Every other
    # metric is treated as "lower is better". Normalization, delta
    # classification and insight generation all read this one set so they
    # cannot disagree about a metric's direction.
    _HIGHER_IS_BETTER = frozenset({
        'robustness_score',
        'success_rate',
        'confidence_score',
        'total_attacks',
        'successful_attacks',
    })

    # Weights of the composite ranking score. The sign only documents the
    # direction (negative = lower is better); the magnitude is what gets
    # applied, because _normalize_metric already inverts lower-is-better
    # metrics so that 1.0 is always the best normalized value.
    _OVERALL_WEIGHTS = {
        'robustness_score': 0.3,
        'risk_score': -0.25,
        'success_rate': 0.2,
        'confidence_score': 0.15,
        'execution_time_ms': -0.1,
    }

    # Headline metrics that exist as attributes on RunComparison and are
    # summarized (average / range) in the insights.
    _SUMMARY_METRICS = ('robustness_score', 'risk_score', 'success_rate', 'execution_time_ms')

    def __init__(self):
        """Initialize comparison engine."""
        self.experiment_manager = get_experiment_manager()

    async def compare_runs(self, run_ids: List[str]) -> "ComparisonResult":
        """
        Compare multiple experiment runs.

        Only experiments that are COMPLETED and carry a result summary take
        part in the comparison.

        Args:
            run_ids: List of run IDs to compare

        Returns:
            ComparisonResult: Comprehensive comparison analysis

        Raises:
            ValueError: If fewer than 2 run IDs are given, fewer than 2 of
                them resolve to distinct experiments, or fewer than 2 of
                those are completed with a result summary
        """
        if len(run_ids) < 2:
            raise ValueError("At least 2 runs required for comparison")

        experiments = await self._fetch_experiments(run_ids)

        if len(experiments) < 2:
            raise ValueError("Insufficient valid experiments for comparison")

        completed_experiments = [
            exp for exp in experiments
            if exp.status == ExperimentStatus.COMPLETED and exp.result_summary
        ]

        if len(completed_experiments) < 2:
            raise ValueError("At least 2 completed experiments required for comparison")

        logger.info("Comparing %d completed experiments", len(completed_experiments))

        result = ComparisonResult(
            run_ids=[exp.run_id.hex for exp in completed_experiments],
            # NOTE(review): utcnow() yields a naive datetime and is deprecated
            # since Python 3.12. Kept as-is so consumers that compare against
            # naive timestamps keep working.
            comparison_date=datetime.utcnow(),
            total_runs=len(completed_experiments)
        )

        # Order matters: deltas and insights read the rankings built first.
        await self._calculate_rankings(completed_experiments, result)
        await self._calculate_deltas(completed_experiments, result)
        await self._generate_insights(completed_experiments, result)
        await self._create_chart_data(completed_experiments, result)

        return result

    async def _fetch_experiments(self, run_ids: List[str]) -> List["Experiment"]:
        """
        Fetch experiments by run IDs.

        Invalid, unknown and failing IDs are logged and skipped. An ID that
        resolves to an already-requested UUID is skipped silently, so each
        experiment appears at most once in the result.

        Args:
            run_ids: List of run IDs (strings are parsed as UUIDs; other
                values are passed to the store unchanged)

        Returns:
            List[Experiment]: Valid experiments, in first-seen order
        """
        experiments = []
        seen = set()

        for run_id in run_ids:
            try:
                if isinstance(run_id, str):
                    try:
                        run_uuid = uuid.UUID(run_id)
                    except ValueError:
                        logger.warning("Invalid run ID format: %s", run_id)
                        continue
                else:
                    run_uuid = run_id

                # The same run listed twice would be ranked against itself
                # and corrupt the best/worst bookkeeping downstream.
                if run_uuid in seen:
                    continue
                seen.add(run_uuid)

                experiment = self.experiment_manager.store.get_experiment(run_uuid)
                if experiment:
                    experiments.append(experiment)
                else:
                    logger.warning("Experiment not found: %s", run_id)

            except Exception as e:
                # Best effort: one failing lookup must not abort the others.
                logger.error("Error fetching experiment %s: %s", run_id, e)
                continue

        return experiments

    async def _calculate_rankings(self, experiments: List["Experiment"], result: "ComparisonResult"):
        """
        Calculate performance rankings for experiments.

        Appends one RunComparison per experiment to ``result.rankings``,
        ordered from best to worst composite score, and sets
        ``result.best_run`` / ``result.worst_run``.

        Args:
            experiments: List of experiments to rank (those without a result
                summary are ignored)
            result: Comparison result to update
        """
        metrics_data = []
        for exp in experiments:
            summary = exp.result_summary
            if not summary:
                continue
            metrics_data.append({
                'run_id': exp.run_id.hex,
                'experiment_name': exp.experiment_name,
                'robustness_score': summary.robustness_score,
                'risk_score': summary.risk_score,
                'success_rate': summary.success_rate,
                # Optional metrics stay None when missing. Coercing them to 0
                # would score a run without timing data as the fastest one.
                'execution_time_ms': summary.execution_time_ms,
                'hallucination_rate': summary.hallucination_rate,
                'toxicity_rate': summary.toxicity_rate,
                'confidence_score': summary.confidence_score,
                'total_attacks': summary.total_attacks,
                'successful_attacks': summary.successful_attacks
            })

        if not metrics_data:
            return

        # Composite score: weighted mean of the normalized metrics that are
        # present for the run. Dividing by the weight actually used keeps
        # runs with missing metrics comparable.
        scored = []
        for run_data in metrics_data:
            weighted_sum = 0.0
            weight_used = 0.0

            for metric, weight in self._OVERALL_WEIGHTS.items():
                value = run_data.get(metric)
                if value is None:
                    continue
                magnitude = abs(weight)
                weighted_sum += self._normalize_metric(metric, value, metrics_data) * magnitude
                weight_used += magnitude

            overall = weighted_sum / weight_used if weight_used > 0 else 0.0
            scored.append((overall, run_data))

        # Stable sort: runs with equal scores keep their input order.
        scored.sort(key=lambda item: item[0], reverse=True)

        # Non-missing values per metric, computed once for the percentiles.
        metric_names = [metric.value for metric in ComparisonMetric]
        values_by_metric = {
            name: [r[name] for r in metrics_data if r.get(name) is not None]
            for name in metric_names
        }

        total = len(metrics_data)
        for rank, (_, run_data) in enumerate(scored):
            percentiles = {}
            for name in metric_names:
                value = run_data.get(name)
                if value is not None:
                    percentiles[name] = self._calculate_percentile(value, values_by_metric[name])

            result.rankings.append(RunComparison(
                run_id=run_data['run_id'],
                experiment_name=run_data['experiment_name'],
                rank=rank + 1,
                total_runs=total,
                robustness_score=run_data['robustness_score'],
                risk_score=run_data['risk_score'],
                success_rate=run_data['success_rate'],
                execution_time_ms=run_data['execution_time_ms'],
                percentile_scores=percentiles,
                is_best=(rank == 0),
                is_worst=(rank == total - 1),
                performance_tier=self._determine_performance_tier(rank, total)
            ))

        if result.rankings:
            result.best_run = result.rankings[0].run_id
            result.worst_run = result.rankings[-1].run_id

    async def _calculate_deltas(self, experiments: List["Experiment"], result: "ComparisonResult"):
        """
        Calculate metric deltas between runs.

        Fills ``deltas_to_best`` and ``deltas_to_worst`` of every ranking
        with the difference to the run flagged ``is_best`` / ``is_worst``.

        Args:
            experiments: List of experiments (unused; kept for a uniform
                signature across the analysis steps)
            result: Comparison result to update
        """
        if not result.rankings:
            return

        # Defaults avoid StopIteration, which would surface as RuntimeError
        # inside a coroutine if no ranking carries the flag.
        best_run = next((r for r in result.rankings if r.is_best), None)
        worst_run = next((r for r in result.rankings if r.is_worst), None)

        for run_comparison in result.rankings:
            if best_run is not None:
                run_comparison.deltas_to_best.update(
                    self._metric_deltas(run_comparison, best_run)
                )
            if worst_run is not None:
                run_comparison.deltas_to_worst.update(
                    self._metric_deltas(run_comparison, worst_run)
                )

    def _metric_deltas(self, current: "RunComparison", reference: "RunComparison") -> Dict[str, "MetricDelta"]:
        """Build per-metric deltas of ``current`` relative to ``reference``.

        Only metrics present as non-None attributes on both runs are included.
        """
        deltas = {}
        for metric in ComparisonMetric:
            reference_value = getattr(reference, metric.value, None)
            current_value = getattr(current, metric.value, None)
            if reference_value is None or current_value is None:
                continue

            delta = current_value - reference_value
            # A zero reference has no meaningful relative change; report 0.
            delta_percent = (delta / reference_value * 100) if reference_value != 0 else 0.0

            deltas[metric.value] = MetricDelta(
                metric=metric,
                value_a=current_value,
                value_b=reference_value,
                delta=delta,
                delta_percent=delta_percent,
                improvement=self._is_improvement(metric, delta)
            )
        return deltas

    async def _generate_insights(self, experiments: List["Experiment"], result: "ComparisonResult"):
        """
        Generate performance insights and recommendations.

        Populates ``metric_averages``, ``metric_ranges``,
        ``improvement_opportunities``, ``key_differences`` and
        ``consistency_score`` on ``result`` from its rankings.

        Args:
            experiments: List of experiments (unused; kept for a uniform
                signature across the analysis steps)
            result: Comparison result to update
        """
        if not result.rankings:
            return

        # Averages and (min, max) ranges over the runs that have the metric.
        for metric in self._SUMMARY_METRICS:
            values = [
                getattr(r, metric) for r in result.rankings
                if getattr(r, metric) is not None
            ]
            if values:
                result.metric_averages[metric] = sum(values) / len(values)
                result.metric_ranges[metric] = (min(values), max(values))

        # Improvement opportunities: the average trails the best run by more
        # than 10%, where "trails" depends on the metric's direction.
        best_run = next((r for r in result.rankings if r.is_best), None)
        if best_run is not None:
            for metric in ('risk_score', 'success_rate'):
                avg_value = result.metric_averages.get(metric, 0)
                best_value = getattr(best_run, metric, 0)

                if metric in self._HIGHER_IS_BETTER:
                    trailing = avg_value < best_value * 0.9
                else:
                    trailing = avg_value > best_value * 1.1

                if trailing:
                    result.improvement_opportunities.append(
                        f"Improve {metric.replace('_', ' ')}: "
                        f"average {avg_value:.3f} vs best {best_value:.3f}"
                    )

        # Key differences: spread above 50% of the minimum. A minimum of 0
        # or below is skipped because the ratio is undefined there.
        for metric in self._SUMMARY_METRICS:
            if metric not in result.metric_ranges:
                continue
            min_val, max_val = result.metric_ranges[metric]
            if min_val > 0:
                variation = (max_val - min_val) / min_val
                if variation > 0.5:
                    result.key_differences.append(
                        f"High variation in {metric.replace('_', ' ')}: "
                        f"{min_val:.3f} - {max_val:.3f} ({variation:.1%} range)"
                    )

        # Consistency: 1 - (range / average) per metric, floored at 0, then
        # averaged over the metrics that could be evaluated.
        consistency_scores = []
        for metric in ('robustness_score', 'risk_score', 'success_rate'):
            if metric in result.metric_ranges:
                min_val, max_val = result.metric_ranges[metric]
                avg_val = result.metric_averages.get(metric, 0)
                if avg_val > 0:
                    consistency = 1 - ((max_val - min_val) / avg_val)
                    consistency_scores.append(max(0, consistency))

        result.consistency_score = (
            sum(consistency_scores) / len(consistency_scores) if consistency_scores else 0.0
        )

    async def _create_chart_data(self, experiments: List["Experiment"], result: "ComparisonResult"):
        """
        Create visualization-ready data for charts.

        Stores a dict with the keys ``radar``, ``bar`` and ``timeline`` in
        ``result.chart_data``.

        Args:
            experiments: List of experiments
            result: Comparison result to update
        """
        radar_data = {
            'labels': ['Robustness', 'Low Risk', 'Success Rate', 'Confidence'],
            'datasets': []
        }

        # Radar is limited to the five top-ranked runs.
        for run_comp in result.rankings[:5]:
            radar_data['datasets'].append({
                'name': run_comp.experiment_name or run_comp.run_id[:8],
                'data': [
                    run_comp.robustness_score,
                    1 - run_comp.risk_score,  # inverted so larger is better
                    run_comp.success_rate,
                    # NOTE(review): this axis is the run's percentile rank for
                    # confidence, not the raw score; RunComparison does not
                    # carry the raw confidence value.
                    run_comp.percentile_scores.get('confidence_score', 0)
                ]
            })

        bar_data = {
            'labels': [r.experiment_name or r.run_id[:8] for r in result.rankings],
            'metrics': {
                'robustness_score': [r.robustness_score for r in result.rankings],
                'risk_score': [r.risk_score for r in result.rankings],
                'success_rate': [r.success_rate for r in result.rankings]
            }
        }

        # Use one filtered list for all three series so that index i refers
        # to the same experiment in each of them.
        with_summary = [exp for exp in experiments if exp.result_summary]
        timeline_data = {
            'dates': [exp.created_at.isoformat() for exp in with_summary],
            'robustness_scores': [exp.result_summary.robustness_score for exp in with_summary],
            'risk_scores': [exp.result_summary.risk_score for exp in with_summary]
        }

        result.chart_data = {
            'radar': radar_data,
            'bar': bar_data,
            'timeline': timeline_data
        }

    def _normalize_metric(self, metric: str, value: float, all_data: List[Dict[str, Any]]) -> float:
        """
        Normalize metric value to 0-1 scale.

        Min-max scaling over the non-None values of ``metric`` in
        ``all_data``. Lower-is-better metrics are inverted so that 1.0 is
        always the best value.

        Args:
            metric: Metric name
            value: Value to normalize
            all_data: All data points for scaling

        Returns:
            float: Normalized value (0-1); 0.0 when no data point has the
                metric, 0.5 when all data points share the same value
        """
        values = [d[metric] for d in all_data if d[metric] is not None]
        if not values:
            return 0.0

        min_val, max_val = min(values), max(values)
        if max_val == min_val:
            return 0.5

        scaled = (value - min_val) / (max_val - min_val)
        return scaled if metric in self._HIGHER_IS_BETTER else 1 - scaled

    def _calculate_percentile(self, value: float, all_values: List[float]) -> float:
        """
        Calculate percentile rank for a value.

        The rank is the 1-based position of the first occurrence of
        ``value`` in the ascending-sorted values, divided by their count.
        A value that is not in the list is treated as ranking last.

        Args:
            value: Value to calculate percentile for
            all_values: All values in the dataset

        Returns:
            float: Percentile (0-1); 0.0 for an empty dataset
        """
        if not all_values:
            return 0.0

        ordered = sorted(all_values)
        try:
            position = ordered.index(value)
        except ValueError:
            position = len(ordered) - 1
        return (position + 1) / len(ordered)

    def _determine_performance_tier(self, rank: int, total: int) -> str:
        """
        Determine performance tier based on rank.

        The tier is derived from ``(rank + 1) / total``, so with fewer than
        four runs even the top run does not reach "excellent".

        Args:
            rank: Rank position (0-based)
            total: Total number of runs (must be non-zero)

        Returns:
            str: "excellent", "good", "average" or "poor"
        """
        percentile = (rank + 1) / total

        if percentile <= 0.25:
            return "excellent"
        elif percentile <= 0.5:
            return "good"
        elif percentile <= 0.75:
            return "average"
        else:
            return "poor"

    def _is_improvement(self, metric: "ComparisonMetric", delta: float) -> bool:
        """
        Determine if delta represents an improvement.

        Args:
            metric: Metric being compared
            delta: Delta value (current - reference)

        Returns:
            bool: True if improvement; a delta of exactly 0 never is
        """
        # Accept both enum members and plain metric names.
        name = getattr(metric, 'value', metric)
        if name in self._HIGHER_IS_BETTER:
            return delta > 0

        return delta < 0
|
|
|
|
| |
# Shared engine instance, created once when this module is imported.
# Constructing it runs ComparisonEngine.__init__, which calls
# get_experiment_manager(), so importing this module has that side effect.
comparison_engine = ComparisonEngine()
|
|
|
|
async def get_comparison_engine() -> ComparisonEngine:
    """
    Return the shared, module-level comparison engine.

    No new engine is created; every call yields the same object.

    Returns:
        ComparisonEngine: The instance bound to ``comparison_engine``
    """
    engine = comparison_engine
    return engine
|
|