# Repository page header captured with the file (not part of the module):
# ALM-2 / backend /analytics /aggregation_utils.py
# ACA050's picture
# Upload 520 files
# 2ed8996 verified
"""
Aggregation Utilities for AegisLM Analytics.
Provides comprehensive metrics aggregation capabilities including
statistical analysis, percentile calculations, and data summarization.
"""
import statistics
from typing import Dict, List, Any, Optional, Tuple, Union
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
import logging
from schemas.experiment_schema import Experiment, ResultSummary
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class AggregationMethod(str, Enum):
    """Identifiers for the supported aggregation methods.

    The ``str`` mixin makes every member compare equal to its plain string
    value (for example ``AggregationMethod.MEAN == "mean"``).
    """

    # Central tendency
    MEAN = "mean"
    MEDIAN = "median"

    # Extremes and totals
    MIN = "min"
    MAX = "max"
    SUM = "sum"

    # Spread
    STD_DEV = "std_dev"
    VARIANCE = "variance"

    # Sample size
    COUNT = "count"

    # Percentiles
    PERCENTILE_25 = "percentile_25"
    PERCENTILE_75 = "percentile_75"
    PERCENTILE_90 = "percentile_90"
    PERCENTILE_95 = "percentile_95"
@dataclass
class MetricStats:
    """Statistical summary of the values collected for one metric.

    Attributes:
        metric_name: Name of the metric the summary belongs to.
        count: Number of values summarized.
        mean: Arithmetic mean of the values.
        median: Median of the values.
        min_value: Smallest value.
        max_value: Largest value.
        std_deviation: Standard deviation of the values.
        variance: Variance of the values.
        sum: Total of all values.
        p25: 25th percentile.
        p75: 75th percentile.
        p90: 90th percentile.
        p95: 95th percentile.
        range_size: Spread between the largest and smallest value.
        coefficient_of_variation: Standard deviation relative to the mean.
        outliers_count: Number of values flagged as outliers.
        outliers_values: The flagged outlier values themselves.
        data_quality_score: Quality indicator in the 0-1 range, based on
            completeness and consistency.
    """

    # Identity and basic moments
    metric_name: str
    count: int
    mean: float
    median: float
    min_value: float
    max_value: float
    std_deviation: float
    variance: float
    sum: float

    # Percentile cut points
    p25: float
    p75: float
    p90: float
    p95: float

    # Derived spread measures
    range_size: float
    coefficient_of_variation: float

    # Outlier information
    outliers_count: int
    outliers_values: List[float]

    # Quality indicator (0-1)
    data_quality_score: float
@dataclass
class AggregatedMetrics:
    """Container for metrics aggregated over a set of experiments.

    Only the four headline counters are required. Every other field has a
    default and is filled in afterwards by the aggregation helpers. The
    per-metric statistics and the distribution/tier dictionaries default to
    ``None`` (meaning "not computed"), so consumers must handle ``None``.
    """

    # Headline counters (required)
    total_experiments: int
    completed_experiments: int
    failed_experiments: int
    success_rate: float

    # Per-metric statistics; None when no values were available for the metric
    robustness_stats: Optional[MetricStats] = None
    risk_stats: Optional[MetricStats] = None
    success_rate_stats: Optional[MetricStats] = None
    confidence_stats: Optional[MetricStats] = None
    hallucination_stats: Optional[MetricStats] = None
    toxicity_stats: Optional[MetricStats] = None
    execution_time_stats: Optional[MetricStats] = None

    # Time-based aggregations
    time_period_days: int = 0
    avg_experiments_per_day: float = 0.0

    # Model and dataset aggregations (name -> number of experiments).
    # Annotated Optional because the default is None rather than an empty dict.
    model_distribution: Optional[Dict[str, int]] = None
    dataset_distribution: Optional[Dict[str, int]] = None
    attack_type_distribution: Optional[Dict[str, int]] = None

    # Performance summary
    overall_health_score: float = 0.0
    performance_tiers: Optional[Dict[str, int]] = None  # excellent, good, average, poor
class AggregationUtils:
    """
    Utilities for aggregating experiment metrics.

    Provides per-metric statistical summaries (moments, percentiles, IQR
    outliers), time and distribution roll-ups, grouped aggregations and
    ranking helpers. The class keeps no per-instance state.
    """

    # AggregatedMetrics attribute -> result-summary attribute it is built from.
    _METRIC_ATTRIBUTES = {
        'robustness_stats': 'robustness_score',
        'risk_stats': 'risk_score',
        'success_rate_stats': 'success_rate',
        'confidence_stats': 'confidence_score',
        'hallucination_stats': 'hallucination_rate',
        'toxicity_stats': 'toxicity_rate',
        'execution_time_stats': 'execution_time_ms',
    }

    # Metrics where a smaller value is the better result; "top performers"
    # are ranked in ascending order for these.
    _LOWER_IS_BETTER = frozenset({
        'risk_score',
        'execution_time_ms',
        'hallucination_rate',
        'toxicity_rate',
    })

    def __init__(self):
        """Initialize aggregation utilities (nothing to set up)."""

    async def aggregate_metrics(self, experiments: List[Experiment]) -> AggregatedMetrics:
        """
        Aggregate metrics across multiple experiments.

        An experiment counts as completed only when its status value is
        'completed' AND it carries a result summary.

        Args:
            experiments: List of experiments to aggregate

        Returns:
            AggregatedMetrics: Comprehensive aggregation results. For an empty
            input, all counters are zero and nothing else is computed.
        """
        if not experiments:
            return AggregatedMetrics(
                total_experiments=0,
                completed_experiments=0,
                failed_experiments=0,
                success_rate=0.0
            )

        logger.info("Aggregating metrics for %d experiments", len(experiments))

        # Basic counts
        total_experiments = len(experiments)
        completed_experiments = sum(
            1 for exp in experiments
            if exp.status.value == 'completed' and exp.result_summary
        )
        failed_experiments = sum(
            1 for exp in experiments
            if exp.status.value == 'failed'
        )
        # total_experiments is at least 1 here because of the guard above.
        success_rate = completed_experiments / total_experiments

        aggregated = AggregatedMetrics(
            total_experiments=total_experiments,
            completed_experiments=completed_experiments,
            failed_experiments=failed_experiments,
            success_rate=success_rate
        )

        # Order matters: the performance summary reads the metric statistics.
        await self._calculate_metric_stats(experiments, aggregated)
        await self._calculate_time_aggregations(experiments, aggregated)
        await self._calculate_distribution_aggregations(experiments, aggregated)
        await self._calculate_performance_summary(experiments, aggregated)

        return aggregated

    async def _calculate_metric_stats(self, experiments: List[Experiment], aggregated: AggregatedMetrics):
        """
        Calculate statistical summaries for each metric.

        Experiments without a result summary, and metric values that are
        missing or None, are ignored. A stats attribute is only set when at
        least one value was collected for its metric.

        Args:
            experiments: List of experiments
            aggregated: AggregatedMetrics to update
        """
        for stats_attr, metric_name in self._METRIC_ATTRIBUTES.items():
            values = []
            for exp in experiments:
                if not exp.result_summary:
                    continue
                value = getattr(exp.result_summary, metric_name, None)
                if value is not None:
                    values.append(float(value))

            if values:
                stats = await self._calculate_metric_statistics(metric_name, values)
                setattr(aggregated, stats_attr, stats)

    async def _calculate_metric_statistics(self, metric_name: str, values: List[float]) -> MetricStats:
        """
        Calculate comprehensive statistics for a metric.

        Args:
            metric_name: Name of the metric
            values: List of metric values

        Returns:
            MetricStats: Statistical summary

        Raises:
            ValueError: If ``values`` is empty.
        """
        if not values:
            raise ValueError(f"No values provided for metric {metric_name}")

        # Basic statistics
        count = len(values)
        mean_val = statistics.mean(values)
        median_val = statistics.median(values)
        min_val = min(values)
        max_val = max(values)
        sum_val = sum(values)

        # Sample standard deviation / variance need at least two data points.
        if count > 1:
            std_dev = statistics.stdev(values)
            variance = statistics.variance(values)
        else:
            std_dev = 0.0
            variance = 0.0

        # Percentiles
        sorted_values = sorted(values)
        p25 = self._calculate_percentile(sorted_values, 25)
        p75 = self._calculate_percentile(sorted_values, 75)
        p90 = self._calculate_percentile(sorted_values, 90)
        p95 = self._calculate_percentile(sorted_values, 95)

        # Additional statistics
        range_size = max_val - min_val
        coefficient_of_variation = std_dev / mean_val if mean_val != 0 else 0.0

        # Outlier detection using the IQR method (1.5 * IQR fences).
        iqr = p75 - p25
        lower_bound = p25 - 1.5 * iqr
        upper_bound = p75 + 1.5 * iqr
        outliers = [v for v in values if v < lower_bound or v > upper_bound]

        # Data quality score based on completeness and consistency.
        completeness_score = 1.0  # None values were filtered out by the caller
        # Use the magnitude of the variation: a negative mean yields a negative
        # coefficient, which would otherwise push the score above 1.0.
        consistency_score = max(0.0, 1.0 - abs(coefficient_of_variation))
        data_quality_score = (completeness_score + consistency_score) / 2.0

        return MetricStats(
            metric_name=metric_name,
            count=count,
            mean=mean_val,
            median=median_val,
            min_value=min_val,
            max_value=max_val,
            std_deviation=std_dev,
            variance=variance,
            sum=sum_val,
            p25=p25,
            p75=p75,
            p90=p90,
            p95=p95,
            range_size=range_size,
            coefficient_of_variation=coefficient_of_variation,
            outliers_count=len(outliers),
            outliers_values=outliers,
            data_quality_score=data_quality_score
        )

    def _calculate_percentile(self, sorted_values: List[float], percentile: int) -> float:
        """
        Calculate percentile value from sorted values.

        Uses linear interpolation between the two closest ranks, where the
        rank is ``percentile / 100 * (n - 1)``.

        Args:
            sorted_values: Values sorted in ascending order
            percentile: Percentile to calculate (0-100)

        Returns:
            float: Percentile value, or 0.0 when ``sorted_values`` is empty

        Raises:
            ValueError: If ``percentile`` is outside the 0-100 range.
        """
        # Out-of-range percentiles would silently wrap around (negative index)
        # or fail with an IndexError, so reject them explicitly.
        if not 0 <= percentile <= 100:
            raise ValueError(f"percentile must be between 0 and 100, got {percentile}")

        if not sorted_values:
            return 0.0

        index = (percentile / 100) * (len(sorted_values) - 1)
        if index.is_integer():
            return sorted_values[int(index)]

        lower_index = int(index)
        upper_index = lower_index + 1
        weight = index - lower_index
        return sorted_values[lower_index] * (1 - weight) + sorted_values[upper_index] * weight

    async def _calculate_time_aggregations(self, experiments: List[Experiment], aggregated: AggregatedMetrics):
        """
        Calculate time-based aggregations.

        The period is the whole number of days between the earliest and the
        latest ``created_at``, with a minimum of one day so the per-day
        average never divides by zero.

        Args:
            experiments: List of experiments
            aggregated: AggregatedMetrics to update
        """
        if not experiments:
            return

        timestamps = [exp.created_at for exp in experiments]
        time_period = max(timestamps) - min(timestamps)
        time_period_days = max(1, time_period.days)

        aggregated.time_period_days = time_period_days
        aggregated.avg_experiments_per_day = len(experiments) / time_period_days

    async def _calculate_distribution_aggregations(self, experiments: List[Experiment], aggregated: AggregatedMetrics):
        """
        Calculate distribution aggregations for models, datasets, and attack types.

        Experiments with a falsy ``dataset_name`` are left out of the dataset
        distribution, and a missing (None) ``attack_types`` is treated as empty.

        Args:
            experiments: List of experiments
            aggregated: AggregatedMetrics to update
        """
        model_counts = {}
        dataset_counts = {}
        attack_counts = {}

        for exp in experiments:
            model_counts[exp.model_name] = model_counts.get(exp.model_name, 0) + 1

            if exp.dataset_name:
                dataset_counts[exp.dataset_name] = dataset_counts.get(exp.dataset_name, 0) + 1

            # Guard against attack_types being None, mirroring the dataset guard.
            for attack_type in exp.attack_types or ():
                attack_counts[attack_type] = attack_counts.get(attack_type, 0) + 1

        aggregated.model_distribution = model_counts
        aggregated.dataset_distribution = dataset_counts
        aggregated.attack_type_distribution = attack_counts

    async def _calculate_performance_summary(self, experiments: List[Experiment], aggregated: AggregatedMetrics):
        """
        Calculate performance summary and health score.

        Does nothing unless both robustness and risk statistics are present.
        The health score is the plain average of the normalized robustness,
        normalized risk, mean success rate (when available) and the averaged
        data-quality scores.

        Args:
            experiments: List of experiments
            aggregated: AggregatedMetrics to update
        """
        if not aggregated.robustness_stats or not aggregated.risk_stats:
            return

        health_factors = []

        # Robustness (higher is better); a mean of 0.8 or more counts as full marks.
        health_factors.append(min(1.0, aggregated.robustness_stats.mean / 0.8))

        # Risk (lower is better); a mean of 0.5 or more scores zero.
        health_factors.append(max(0.0, 1.0 - (aggregated.risk_stats.mean / 0.5)))

        # Success rate (higher is better)
        if aggregated.success_rate_stats:
            health_factors.append(aggregated.success_rate_stats.mean)

        # Data quality, averaged over the metrics that contribute above.
        quality_factors = [
            aggregated.robustness_stats.data_quality_score,
            aggregated.risk_stats.data_quality_score
        ]
        if aggregated.success_rate_stats:
            quality_factors.append(aggregated.success_rate_stats.data_quality_score)
        health_factors.append(sum(quality_factors) / len(quality_factors))

        aggregated.overall_health_score = sum(health_factors) / len(health_factors)

        # Performance tiers
        performance_tiers = {"excellent": 0, "good": 0, "average": 0, "poor": 0}
        for exp in experiments:
            if not exp.result_summary:
                continue

            robustness = exp.result_summary.robustness_score
            risk = exp.result_summary.risk_score
            # An experiment with a missing score cannot be classified; comparing
            # None with a float would raise TypeError, so skip it.
            if robustness is None or risk is None:
                continue

            if robustness >= 0.8 and risk <= 0.1:
                performance_tiers["excellent"] += 1
            elif robustness >= 0.6 and risk <= 0.2:
                performance_tiers["good"] += 1
            elif robustness >= 0.4 and risk <= 0.3:
                performance_tiers["average"] += 1
            else:
                performance_tiers["poor"] += 1

        aggregated.performance_tiers = performance_tiers

    async def _aggregate_groups(
        self,
        groups: Dict[str, List[Experiment]]
    ) -> Dict[str, AggregatedMetrics]:
        """Run ``aggregate_metrics`` on every group, keeping the group order."""
        aggregations = {}
        for group_key, group_experiments in groups.items():
            aggregations[group_key] = await self.aggregate_metrics(group_experiments)
        return aggregations

    async def aggregate_by_time_window(
        self,
        experiments: List[Experiment],
        window_days: int = 7
    ) -> Dict[str, AggregatedMetrics]:
        """
        Aggregate metrics by time windows.

        Windows are counted from the earliest experiment's ``created_at``.
        Keys have the form ``week_<index>`` regardless of ``window_days``
        (the prefix is kept for compatibility with existing consumers).

        Args:
            experiments: List of experiments
            window_days: Size of time window in days (must be positive)

        Returns:
            Dict[str, AggregatedMetrics]: Aggregations by time window

        Raises:
            ValueError: If ``window_days`` is not positive.
        """
        # Zero would divide by zero below; negative sizes yield meaningless buckets.
        if window_days <= 0:
            raise ValueError(f"window_days must be positive, got {window_days}")

        if not experiments:
            return {}

        sorted_experiments = sorted(experiments, key=lambda x: x.created_at)
        start = sorted_experiments[0].created_at

        windowed_experiments = {}
        for exp in sorted_experiments:
            days_since_start = (exp.created_at - start).days
            window_key = f"week_{days_since_start // window_days}"
            windowed_experiments.setdefault(window_key, []).append(exp)

        return await self._aggregate_groups(windowed_experiments)

    async def aggregate_by_model(self, experiments: List[Experiment]) -> Dict[str, AggregatedMetrics]:
        """
        Aggregate metrics by model.

        Args:
            experiments: List of experiments

        Returns:
            Dict[str, AggregatedMetrics]: Aggregations keyed by ``model_name``
        """
        model_experiments = {}
        for exp in experiments:
            model_experiments.setdefault(exp.model_name, []).append(exp)

        return await self._aggregate_groups(model_experiments)

    async def aggregate_by_dataset(self, experiments: List[Experiment]) -> Dict[str, AggregatedMetrics]:
        """
        Aggregate metrics by dataset.

        Experiments with a falsy ``dataset_name`` are grouped under "unknown".

        Args:
            experiments: List of experiments

        Returns:
            Dict[str, AggregatedMetrics]: Aggregations keyed by dataset name
        """
        dataset_experiments = {}
        for exp in experiments:
            dataset_experiments.setdefault(exp.dataset_name or "unknown", []).append(exp)

        return await self._aggregate_groups(dataset_experiments)

    async def get_top_performers(
        self,
        experiments: List[Experiment],
        metric: str = 'robustness_score',
        top_n: int = 5
    ) -> List[Tuple[str, float, str]]:
        """
        Get top performing experiments by metric.

        Experiments without a result summary, or whose metric is missing or
        None, are excluded. Metrics where lower is better (risk, execution
        time, hallucination rate, toxicity rate) are ranked ascending; all
        other metrics are ranked descending.

        Args:
            experiments: List of experiments
            metric: Metric to rank by
            top_n: Number of top performers to return

        Returns:
            List[Tuple[str, float, str]]: (run_id, metric_value, experiment_name)
        """
        valid_experiments = []
        for exp in experiments:
            if not exp.result_summary:
                continue
            value = getattr(exp.result_summary, metric, None)
            if value is not None:
                valid_experiments.append((exp.run_id.hex, float(value), exp.experiment_name))

        descending = metric not in self._LOWER_IS_BETTER
        ranked = sorted(valid_experiments, key=lambda item: item[1], reverse=descending)

        return ranked[:top_n]

    async def get_summary_statistics(self, aggregated: AggregatedMetrics) -> Dict[str, Any]:
        """
        Get summary statistics from aggregated metrics.

        Args:
            aggregated: Aggregated metrics

        Returns:
            Dict[str, Any]: Summary with 'experiment_summary', 'time_summary',
            'health_score', 'performance_tiers' and one '<metric>_summary'
            entry for each metric whose statistics are available.
        """
        summary = {
            'experiment_summary': {
                'total': aggregated.total_experiments,
                'completed': aggregated.completed_experiments,
                'failed': aggregated.failed_experiments,
                'success_rate': aggregated.success_rate
            },
            'time_summary': {
                'period_days': aggregated.time_period_days,
                'avg_per_day': aggregated.avg_experiments_per_day
            },
            'health_score': aggregated.overall_health_score,
            'performance_tiers': aggregated.performance_tiers or {}
        }

        # NOTE(review): hallucination and toxicity statistics are computed but
        # not reported here; confirm whether that omission is intentional.
        metric_stats = {
            'robustness': aggregated.robustness_stats,
            'risk': aggregated.risk_stats,
            'success_rate': aggregated.success_rate_stats,
            'confidence': aggregated.confidence_stats,
            'execution_time': aggregated.execution_time_stats
        }

        for metric_name, stats in metric_stats.items():
            if stats:
                summary[f'{metric_name}_summary'] = {
                    'mean': stats.mean,
                    'median': stats.median,
                    'std_dev': stats.std_deviation,
                    'min': stats.min_value,
                    'max': stats.max_value,
                    'quality_score': stats.data_quality_score
                }

        return summary
# Global aggregation utilities instance: created once when this module is
# imported and handed out by get_aggregation_utils().
aggregation_utils = AggregationUtils()
async def get_aggregation_utils() -> AggregationUtils:
    """
    Return the shared, module-level aggregation utilities object.

    No new object is built here; every call yields the same
    ``aggregation_utils`` instance.

    Returns:
        AggregationUtils: The module-level instance
    """
    shared_instance = aggregation_utils
    return shared_instance