| """ |
| Aggregation Utilities for AegisLM Analytics. |
| |
| Provides comprehensive metrics aggregation capabilities including |
| statistical analysis, percentile calculations, and data summarization. |
| """ |
|
|
import logging
import statistics
from collections import Counter, defaultdict
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple, Union

from schemas.experiment_schema import Experiment, ResultSummary
|
|
# Module-scoped logger; its name follows this module's import path.
logger = logging.getLogger(__name__)
|
|
|
|
class AggregationMethod(str, Enum):
    """Identifiers for the supported aggregation methods.

    Members also subclass ``str``, so each one compares equal to its
    string value (e.g. ``AggregationMethod.MEAN == "mean"``).
    """

    # Central tendency
    MEAN = "mean"
    MEDIAN = "median"

    # Extremes and totals
    MIN = "min"
    MAX = "max"
    SUM = "sum"

    # Spread
    STD_DEV = "std_dev"
    VARIANCE = "variance"

    # Sample size
    COUNT = "count"

    # Percentiles
    PERCENTILE_25 = "percentile_25"
    PERCENTILE_75 = "percentile_75"
    PERCENTILE_90 = "percentile_90"
    PERCENTILE_95 = "percentile_95"
|
|
|
|
@dataclass
class MetricStats:
    """Statistical summary of the values observed for one metric."""

    # Identification and basic descriptive statistics.
    metric_name: str
    count: int
    mean: float
    median: float
    min_value: float
    max_value: float
    std_deviation: float
    variance: float
    sum: float

    # Percentiles (25th, 75th, 90th and 95th).
    p25: float
    p75: float
    p90: float
    p95: float

    # Derived spread measures.
    range_size: float
    coefficient_of_variation: float

    # Outlier information: how many values were flagged, and which ones.
    outliers_count: int
    outliers_values: List[float]

    # Overall quality indicator for the underlying data.
    data_quality_score: float
|
|
|
|
@dataclass
class AggregatedMetrics:
    """Container for aggregated experiment metrics.

    Only the four experiment counters are required. Every other field has a
    default and is filled in by the aggregation routines when the data for
    it is available; fields that were never filled in stay ``None`` (or
    ``0`` / ``0.0`` for the numeric ones).
    """

    # Experiment counts and the completed / total ratio.
    total_experiments: int
    completed_experiments: int
    failed_experiments: int
    success_rate: float

    # Per-metric statistical summaries; None when no values were available.
    robustness_stats: Optional[MetricStats] = None
    risk_stats: Optional[MetricStats] = None
    success_rate_stats: Optional[MetricStats] = None
    confidence_stats: Optional[MetricStats] = None
    hallucination_stats: Optional[MetricStats] = None
    toxicity_stats: Optional[MetricStats] = None
    execution_time_stats: Optional[MetricStats] = None

    # Time-based aggregation.
    time_period_days: int = 0
    avg_experiments_per_day: float = 0.0

    # Occurrence counts keyed by model / dataset / attack type.
    # Annotated Optional because the default really is None, not a dict.
    model_distribution: Optional[Dict[str, int]] = None
    dataset_distribution: Optional[Dict[str, int]] = None
    attack_type_distribution: Optional[Dict[str, int]] = None

    # Composite health score and number of experiments per performance tier.
    overall_health_score: float = 0.0
    performance_tiers: Optional[Dict[str, int]] = None
|
|
|
|
class AggregationUtils:
    """
    Utilities for aggregating experiment metrics.

    Provides per-metric statistical summaries (mean, median, spread,
    percentiles, IQR-based outliers), time and distribution roll-ups, and a
    composite health score for collections of experiments. The class keeps
    no instance state; every method works only on its arguments.
    """

    # AggregatedMetrics attribute -> result-summary attribute it summarises.
    _METRIC_ATTRIBUTES = {
        'robustness_stats': 'robustness_score',
        'risk_stats': 'risk_score',
        'success_rate_stats': 'success_rate',
        'confidence_stats': 'confidence_score',
        'hallucination_stats': 'hallucination_rate',
        'toxicity_stats': 'toxicity_rate',
        'execution_time_stats': 'execution_time_ms',
    }

    # Summary key prefix -> AggregatedMetrics attribute holding the stats.
    _SUMMARY_SECTIONS = {
        'robustness': 'robustness_stats',
        'risk': 'risk_stats',
        'success_rate': 'success_rate_stats',
        'confidence': 'confidence_stats',
        'execution_time': 'execution_time_stats',
        'hallucination': 'hallucination_stats',
        'toxicity': 'toxicity_stats',
    }

    # Metrics where a smaller value ranks as the better result.
    _LOWER_IS_BETTER_METRICS = frozenset({
        'risk_score',
        'execution_time_ms',
        'hallucination_rate',
        'toxicity_rate',
    })

    # (tier, minimum robustness, maximum risk), checked from best to worst.
    _PERFORMANCE_TIERS = (
        ('excellent', 0.8, 0.1),
        ('good', 0.6, 0.2),
        ('average', 0.4, 0.3),
    )

    def __init__(self):
        """Initialize aggregation utilities (there is no state to set up)."""

    async def aggregate_metrics(self, experiments: List[Experiment]) -> AggregatedMetrics:
        """
        Aggregate metrics across multiple experiments.

        An experiment counts as completed when its status value is
        ``'completed'`` and it has a truthy ``result_summary``; it counts as
        failed when its status value is ``'failed'``.

        Args:
            experiments: List of experiments to aggregate

        Returns:
            AggregatedMetrics: Comprehensive aggregation results. For an
            empty input all counters are zero and nothing else is filled in.
        """
        if not experiments:
            return AggregatedMetrics(
                total_experiments=0,
                completed_experiments=0,
                failed_experiments=0,
                success_rate=0.0,
            )

        total = len(experiments)
        logger.info("Aggregating metrics for %d experiments", total)

        completed = sum(
            1 for exp in experiments
            if exp.status.value == 'completed' and exp.result_summary
        )
        failed = sum(1 for exp in experiments if exp.status.value == 'failed')

        aggregated = AggregatedMetrics(
            total_experiments=total,
            completed_experiments=completed,
            failed_experiments=failed,
            # total is non-zero here because the empty case returned above.
            success_rate=completed / total,
        )

        # The performance summary reads the per-metric stats, so the metric
        # statistics must be computed before it.
        await self._calculate_metric_stats(experiments, aggregated)
        await self._calculate_time_aggregations(experiments, aggregated)
        await self._calculate_distribution_aggregations(experiments, aggregated)
        await self._calculate_performance_summary(experiments, aggregated)

        return aggregated

    @staticmethod
    def _collect_metric_values(experiments: List[Experiment], metric_name: str) -> List[float]:
        """Return the non-None values of ``metric_name`` from all result summaries, as floats."""
        values = []
        for exp in experiments:
            summary = exp.result_summary
            if not summary:
                continue
            value = getattr(summary, metric_name, None)
            if value is not None:
                values.append(float(value))
        return values

    async def _calculate_metric_stats(self, experiments: List[Experiment], aggregated: AggregatedMetrics):
        """
        Calculate statistical summaries for each metric.

        A stats attribute is only set when at least one experiment provides
        a value for the corresponding metric; otherwise it is left untouched.

        Args:
            experiments: List of experiments
            aggregated: AggregatedMetrics to update
        """
        for stats_attr, metric_name in self._METRIC_ATTRIBUTES.items():
            values = self._collect_metric_values(experiments, metric_name)
            if values:
                stats = await self._calculate_metric_statistics(metric_name, values)
                setattr(aggregated, stats_attr, stats)

    async def _calculate_metric_statistics(self, metric_name: str, values: List[float]) -> MetricStats:
        """
        Calculate comprehensive statistics for a metric.

        Args:
            metric_name: Name of the metric
            values: List of metric values

        Returns:
            MetricStats: Statistical summary

        Raises:
            ValueError: If ``values`` is empty
        """
        if not values:
            raise ValueError(f"No values provided for metric {metric_name}")

        count = len(values)
        mean_val = statistics.mean(values)
        median_val = statistics.median(values)
        min_val = min(values)
        max_val = max(values)
        sum_val = sum(values)

        # Sample standard deviation / variance need at least two points.
        if count > 1:
            std_dev = statistics.stdev(values)
            variance = statistics.variance(values)
        else:
            std_dev = 0.0
            variance = 0.0

        ordered = sorted(values)
        p25, p75, p90, p95 = (
            self._calculate_percentile(ordered, pct) for pct in (25, 75, 90, 95)
        )

        range_size = max_val - min_val
        coefficient_of_variation = std_dev / mean_val if mean_val != 0 else 0.0

        # Outliers: values outside the fences at 1.5 * IQR beyond p25 / p75.
        iqr = p75 - p25
        lower_fence = p25 - 1.5 * iqr
        upper_fence = p75 + 1.5 * iqr
        outliers = [v for v in values if v < lower_fence or v > upper_fence]

        # Completeness is currently a constant. Consistency uses the
        # magnitude of the coefficient of variation so that a negative mean
        # cannot push the quality score above 1.0.
        completeness_score = 1.0
        consistency_score = max(0.0, 1.0 - abs(coefficient_of_variation))
        data_quality_score = (completeness_score + consistency_score) / 2.0

        return MetricStats(
            metric_name=metric_name,
            count=count,
            mean=mean_val,
            median=median_val,
            min_value=min_val,
            max_value=max_val,
            std_deviation=std_dev,
            variance=variance,
            sum=sum_val,
            p25=p25,
            p75=p75,
            p90=p90,
            p95=p95,
            range_size=range_size,
            coefficient_of_variation=coefficient_of_variation,
            outliers_count=len(outliers),
            outliers_values=outliers,
            data_quality_score=data_quality_score,
        )

    def _calculate_percentile(self, sorted_values: List[float], percentile: int) -> float:
        """
        Calculate percentile value from sorted values.

        Uses linear interpolation between the two nearest ranks.

        Args:
            sorted_values: Sorted list of values
            percentile: Percentile to calculate (0-100)

        Returns:
            float: Percentile value, or 0.0 when ``sorted_values`` is empty

        Raises:
            ValueError: If ``percentile`` is outside the 0-100 range
        """
        if not sorted_values:
            return 0.0
        if not 0 <= percentile <= 100:
            raise ValueError(f"percentile must be between 0 and 100, got {percentile}")

        position = (percentile / 100) * (len(sorted_values) - 1)
        lower_index = int(position)
        weight = position - lower_index

        if weight == 0:
            return sorted_values[lower_index]
        # A fractional position is always strictly below the last index,
        # so lower_index + 1 is a valid index.
        return sorted_values[lower_index] * (1 - weight) + sorted_values[lower_index + 1] * weight

    async def _calculate_time_aggregations(self, experiments: List[Experiment], aggregated: AggregatedMetrics):
        """
        Calculate time-based aggregations.

        The period is the whole-day span between the earliest and latest
        ``created_at`` values, with a minimum of one day. Experiments whose
        ``created_at`` is None are ignored when measuring the span but still
        count towards the per-day average.

        Args:
            experiments: List of experiments
            aggregated: AggregatedMetrics to update
        """
        timestamps = [exp.created_at for exp in experiments if exp.created_at is not None]
        if not timestamps:
            return

        span = max(timestamps) - min(timestamps)
        # Clamp to one day so the average never divides by zero.
        period_days = max(1, span.days)

        aggregated.time_period_days = period_days
        aggregated.avg_experiments_per_day = len(experiments) / period_days

    async def _calculate_distribution_aggregations(self, experiments: List[Experiment], aggregated: AggregatedMetrics):
        """
        Calculate distribution aggregations for models, datasets, and attack types.

        Experiments with a falsy ``dataset_name`` are left out of the dataset
        distribution, and a missing (None) ``attack_types`` is treated as
        having no attacks.

        Args:
            experiments: List of experiments
            aggregated: AggregatedMetrics to update
        """
        # Converted back to plain dicts so callers keep receiving ``dict``.
        aggregated.model_distribution = dict(
            Counter(exp.model_name for exp in experiments)
        )
        aggregated.dataset_distribution = dict(
            Counter(exp.dataset_name for exp in experiments if exp.dataset_name)
        )
        aggregated.attack_type_distribution = dict(
            Counter(
                attack_type
                for exp in experiments
                for attack_type in (exp.attack_types or ())
            )
        )

    @classmethod
    def _classify_tier(cls, robustness: float, risk: float) -> str:
        """Return the first tier whose robustness and risk thresholds both hold, else 'poor'."""
        for tier, min_robustness, max_risk in cls._PERFORMANCE_TIERS:
            if robustness >= min_robustness and risk <= max_risk:
                return tier
        return 'poor'

    async def _calculate_performance_summary(self, experiments: List[Experiment], aggregated: AggregatedMetrics):
        """
        Calculate performance summary and health score.

        Does nothing unless both robustness and risk statistics are present.
        The health score is the plain average of: a robustness factor, a risk
        factor, the mean success rate (when available) and the average data
        quality score of the contributing metrics.

        Experiments without a result summary, or whose robustness or risk
        score is None, cannot be classified and are left out of the tiers.

        Args:
            experiments: List of experiments
            aggregated: AggregatedMetrics to update
        """
        robustness_stats = aggregated.robustness_stats
        risk_stats = aggregated.risk_stats
        if not robustness_stats or not risk_stats:
            return

        success_stats = aggregated.success_rate_stats

        health_factors = [
            # A mean robustness of 0.8 or more earns the full factor.
            min(1.0, robustness_stats.mean / 0.8),
            # A mean risk of 0.5 or more reduces the factor to zero.
            max(0.0, 1.0 - (risk_stats.mean / 0.5)),
        ]
        quality_factors = [
            robustness_stats.data_quality_score,
            risk_stats.data_quality_score,
        ]
        if success_stats:
            health_factors.append(success_stats.mean)
            quality_factors.append(success_stats.data_quality_score)

        health_factors.append(sum(quality_factors) / len(quality_factors))
        aggregated.overall_health_score = sum(health_factors) / len(health_factors)

        performance_tiers = {"excellent": 0, "good": 0, "average": 0, "poor": 0}
        for exp in experiments:
            summary = exp.result_summary
            if not summary:
                continue
            robustness = getattr(summary, 'robustness_score', None)
            risk = getattr(summary, 'risk_score', None)
            # Comparing None against the thresholds would raise TypeError.
            if robustness is None or risk is None:
                continue
            performance_tiers[self._classify_tier(robustness, risk)] += 1

        aggregated.performance_tiers = performance_tiers

    async def _aggregate_groups(self, groups: Dict[Any, List[Experiment]]) -> Dict[Any, AggregatedMetrics]:
        """Run ``aggregate_metrics`` on every group, keeping the group order."""
        results = {}
        for key, members in groups.items():
            results[key] = await self.aggregate_metrics(members)
        return results

    async def aggregate_by_time_window(
        self,
        experiments: List[Experiment],
        window_days: int = 7
    ) -> Dict[str, AggregatedMetrics]:
        """
        Aggregate metrics by time windows.

        Windows are measured from the earliest ``created_at``. Keys have the
        form ``"week_<index>"`` regardless of ``window_days``; the prefix is
        kept for compatibility with existing callers.

        Args:
            experiments: List of experiments
            window_days: Size of time window in days (must be at least 1)

        Returns:
            Dict[str, AggregatedMetrics]: Aggregations by time window, in
            chronological order; empty when there are no experiments

        Raises:
            ValueError: If ``window_days`` is less than 1
        """
        if not experiments:
            return {}
        if window_days < 1:
            raise ValueError(f"window_days must be at least 1, got {window_days}")

        chronological = sorted(experiments, key=lambda exp: exp.created_at)
        start = chronological[0].created_at

        windows = defaultdict(list)
        for exp in chronological:
            window_index = (exp.created_at - start).days // window_days
            windows[f"week_{window_index}"].append(exp)

        return await self._aggregate_groups(windows)

    async def aggregate_by_model(self, experiments: List[Experiment]) -> Dict[str, AggregatedMetrics]:
        """
        Aggregate metrics by model.

        Args:
            experiments: List of experiments

        Returns:
            Dict[str, AggregatedMetrics]: Aggregations by model, keyed by
            ``model_name`` in first-seen order
        """
        by_model = defaultdict(list)
        for exp in experiments:
            by_model[exp.model_name].append(exp)

        return await self._aggregate_groups(by_model)

    async def aggregate_by_dataset(self, experiments: List[Experiment]) -> Dict[str, AggregatedMetrics]:
        """
        Aggregate metrics by dataset.

        Experiments with a falsy ``dataset_name`` are grouped under
        ``"unknown"``.

        Args:
            experiments: List of experiments

        Returns:
            Dict[str, AggregatedMetrics]: Aggregations by dataset, in
            first-seen order
        """
        by_dataset = defaultdict(list)
        for exp in experiments:
            by_dataset[exp.dataset_name or "unknown"].append(exp)

        return await self._aggregate_groups(by_dataset)

    async def get_top_performers(
        self,
        experiments: List[Experiment],
        metric: str = 'robustness_score',
        top_n: int = 5
    ) -> List[Tuple[str, float, str]]:
        """
        Get top performing experiments by metric.

        Metrics listed in ``_LOWER_IS_BETTER_METRICS`` (risk score, execution
        time, hallucination rate, toxicity rate) are ranked ascending; every
        other metric is ranked descending. Experiments without a result
        summary, or with no value for the metric, are skipped.

        Args:
            experiments: List of experiments
            metric: Metric to rank by
            top_n: Number of top performers to return; zero or a negative
                number yields an empty list

        Returns:
            List[Tuple[str, float, str]]: (run_id, metric_value, experiment_name)
        """
        if top_n <= 0:
            return []

        candidates = []
        for exp in experiments:
            summary = exp.result_summary
            if not summary:
                continue
            value = getattr(summary, metric, None)
            if value is not None:
                candidates.append((exp.run_id.hex, float(value), exp.experiment_name))

        higher_is_better = metric not in self._LOWER_IS_BETTER_METRICS
        ranked = sorted(candidates, key=lambda entry: entry[1], reverse=higher_is_better)

        return ranked[:top_n]

    async def get_summary_statistics(self, aggregated: AggregatedMetrics) -> Dict[str, Any]:
        """
        Get summary statistics from aggregated metrics.

        The result always holds ``experiment_summary``, ``time_summary``,
        ``health_score`` and ``performance_tiers``. A ``<metric>_summary``
        entry is added for each metric whose statistics are present.

        Args:
            aggregated: Aggregated metrics

        Returns:
            Dict[str, Any]: Summary statistics
        """
        summary = {
            'experiment_summary': {
                'total': aggregated.total_experiments,
                'completed': aggregated.completed_experiments,
                'failed': aggregated.failed_experiments,
                'success_rate': aggregated.success_rate,
            },
            'time_summary': {
                'period_days': aggregated.time_period_days,
                'avg_per_day': aggregated.avg_experiments_per_day,
            },
            'health_score': aggregated.overall_health_score,
            'performance_tiers': aggregated.performance_tiers or {},
        }

        for section, stats_attr in self._SUMMARY_SECTIONS.items():
            stats = getattr(aggregated, stats_attr)
            if stats:
                summary[f'{section}_summary'] = {
                    'mean': stats.mean,
                    'median': stats.median,
                    'std_dev': stats.std_deviation,
                    'min': stats.min_value,
                    'max': stats.max_value,
                    'quality_score': stats.data_quality_score,
                }

        return summary
|
|
|
|
| |
# Shared module-level instance, created once when the module is imported.
aggregation_utils = AggregationUtils()
|
|
|
|
async def get_aggregation_utils() -> AggregationUtils:
    """
    Return the module-level ``aggregation_utils`` instance.

    Every call yields the same object; no new instance is created here.

    Returns:
        AggregationUtils: Global instance
    """
    shared_instance = aggregation_utils
    return shared_instance
|
|