# ALM-2/backend/experiments/experiment_utils.py
# ACA050's picture
# Upload 520 files
# 2ed8996 verified
"""
Experiment Utilities for AegisLM Framework
Helper functions for experiment tracking, analysis, and management.
"""
import uuid
from datetime import datetime
from typing import Dict, Any, Optional, List, Tuple
from pathlib import Path
import sys
# Make the parent (backend) directory importable so that the absolute
# `schemas.*` / `experiments.*` imports below resolve when this module is
# loaded directly.
current_dir = Path(__file__).parent
backend_dir = current_dir.parent
_backend_path_entry = str(backend_dir)
if _backend_path_entry not in sys.path:
    # Prepend so the backend packages take precedence over same-named modules.
    sys.path.insert(0, _backend_path_entry)
from schemas.experiment_schema import (
Experiment, ExperimentStatus, ConfigSnapshot, ResultSummary,
ExperimentFilter, ExperimentComparison
)
from experiments.experiment_manager import get_experiment_manager
class ExperimentAnalyzer:
    """
    Utility class for analyzing experiments and generating insights.

    Every query is delegated to the manager returned by
    ``get_experiment_manager()``; the methods here only aggregate the
    ``result_summary`` values of the experiments that come back.
    """

    def __init__(self):
        """Initialize experiment analyzer."""
        self.manager = get_experiment_manager()

    def analyze_performance_trends(self, model_name: str, days: int = 30) -> Dict[str, Any]:
        """
        Analyze performance trends for a specific model.

        Args:
            model_name: Model name to analyze
            days: Number of days to look back

        Returns:
            Performance trend analysis, or ``{"error": ...}`` when no
            completed experiment with a result summary exists in the window.
        """
        # Imported locally so the method does not depend on the order in
        # which module-level imports happen to be executed.
        from datetime import timedelta

        # Get experiments for the model
        filters = ExperimentFilter(
            model_name=model_name,
            created_after=datetime.utcnow() - timedelta(days=days),
            page_size=1000,
        )
        experiments = self.manager.list_experiments(filters)

        completed_experiments = [
            exp for exp in experiments.experiments
            if exp.status == ExperimentStatus.COMPLETED and exp.result_summary
        ]
        if not completed_experiments:
            return {"error": "No completed experiments found for analysis"}

        # Oldest first, so index -1 is the most recent run.
        completed_experiments.sort(key=lambda exp: exp.created_at)

        # Collect the per-experiment metrics in chronological order.
        robustness_scores = [exp.result_summary.robustness_score for exp in completed_experiments]
        risk_scores = [exp.result_summary.risk_score for exp in completed_experiments]
        success_rates = [exp.result_summary.success_rate for exp in completed_experiments]
        # A missing execution time is reported as 0.
        execution_times = [exp.result_summary.execution_time_ms or 0 for exp in completed_experiments]
        timestamps = [exp.created_at for exp in completed_experiments]

        return {
            "model_name": model_name,
            "analysis_period_days": days,
            "total_experiments": len(completed_experiments),
            "trends": {
                "robustness_score": self._calculate_trend(robustness_scores),
                "risk_score": self._calculate_trend(risk_scores),
                "success_rate": self._calculate_trend(success_rates),
                "execution_time_ms": self._calculate_trend(execution_times),
            },
            "current_performance": {
                "robustness_score": robustness_scores[-1],
                "risk_score": risk_scores[-1],
                "success_rate": success_rates[-1],
                "execution_time_ms": execution_times[-1],
            },
            "performance_history": [
                {
                    "timestamp": timestamp.isoformat(),
                    "robustness_score": robustness,
                    "risk_score": risk,
                    "success_rate": success_rate,
                    "execution_time_ms": execution_time,
                }
                for timestamp, robustness, risk, success_rate, execution_time in zip(
                    timestamps, robustness_scores, risk_scores, success_rates, execution_times
                )
            ],
        }

    def compare_model_performance(self, model_names: List[str]) -> Dict[str, Any]:
        """
        Compare performance across multiple models.

        Args:
            model_names: List of model names to compare

        Returns:
            Model performance comparison
        """
        model_stats: Dict[str, Dict[str, Any]] = {}
        for model_name in model_names:
            # Get completed experiments for model
            filters = ExperimentFilter(
                model_name=model_name,
                status=ExperimentStatus.COMPLETED,
                page_size=100,
            )
            experiments = self.manager.list_experiments(filters)

            # Sort chronologically so the "latest_*" values really come from
            # the most recent experiment, whatever order the manager returns.
            completed_experiments = sorted(
                (exp for exp in experiments.experiments if exp.result_summary),
                key=lambda exp: exp.created_at,
            )
            if not completed_experiments:
                model_stats[model_name] = {
                    "experiment_count": 0,
                    "error": "No completed experiments found",
                }
                continue

            robustness_scores = [exp.result_summary.robustness_score for exp in completed_experiments]
            risk_scores = [exp.result_summary.risk_score for exp in completed_experiments]
            success_rates = [exp.result_summary.success_rate for exp in completed_experiments]
            count = len(completed_experiments)
            model_stats[model_name] = {
                "experiment_count": count,
                "avg_robustness_score": sum(robustness_scores) / count,
                "avg_risk_score": sum(risk_scores) / count,
                "avg_success_rate": sum(success_rates) / count,
                "latest_robustness_score": robustness_scores[-1],
                "latest_risk_score": risk_scores[-1],
                "latest_success_rate": success_rates[-1],
            }

        return {
            "models_compared": model_names,
            "model_stats": model_stats,
            "best_performing_model": self._select_best_model(model_stats),
            "comparison_timestamp": datetime.utcnow().isoformat(),
        }

    @staticmethod
    def _select_best_model(model_stats: Dict[str, Dict[str, Any]]) -> Optional[str]:
        """
        Pick the model with the highest composite score.

        The composite score is ``avg_robustness_score - avg_risk_score``
        (higher robustness and lower risk is better). Entries carrying an
        ``"error"`` key are skipped.

        Args:
            model_stats: Per-model statistics keyed by model name

        Returns:
            Name of the best model, or None when no model has statistics
        """
        best_model = None
        # Start below every possible score: the composite can legitimately be
        # -1 or lower, and such a model must still win if it is the only one.
        best_score = float("-inf")
        for model_name, stats in model_stats.items():
            if "error" in stats:
                continue
            composite_score = stats["avg_robustness_score"] - stats["avg_risk_score"]
            if composite_score > best_score:
                best_score = composite_score
                best_model = model_name
        return best_model

    def analyze_attack_effectiveness(self, attack_types: List[str]) -> Dict[str, Any]:
        """
        Analyze effectiveness of different attack types.

        Args:
            attack_types: List of attack types to analyze

        Returns:
            Attack effectiveness analysis
        """
        attack_stats: Dict[str, Dict[str, Any]] = {}
        for attack_type in attack_types:
            # Get experiments using this attack type
            filters = ExperimentFilter(
                attack_types=[attack_type],
                status=ExperimentStatus.COMPLETED,
                page_size=1000,
            )
            experiments = self.manager.list_experiments(filters)
            completed_experiments = [exp for exp in experiments.experiments if exp.result_summary]
            if not completed_experiments:
                attack_stats[attack_type] = {
                    "experiment_count": 0,
                    "error": "No completed experiments found",
                }
                continue

            success_rates = [exp.result_summary.success_rate for exp in completed_experiments]
            attacks_run = sum(exp.result_summary.total_attacks for exp in completed_experiments)
            attacks_succeeded = sum(exp.result_summary.successful_attacks for exp in completed_experiments)
            attack_stats[attack_type] = {
                "experiment_count": len(completed_experiments),
                "avg_success_rate": sum(success_rates) / len(success_rates),
                "total_attacks": attacks_run,
                "total_successful_attacks": attacks_succeeded,
                # Pooled rate over all attacks; 0 when nothing was attempted.
                "overall_success_rate": attacks_succeeded / attacks_run if attacks_run > 0 else 0,
                "max_success_rate": max(success_rates),
                "min_success_rate": min(success_rates),
            }

        # Sort by effectiveness (highest pooled success rate first)
        sorted_attacks = sorted(
            (
                (attack_type, stats.get("overall_success_rate", 0))
                for attack_type, stats in attack_stats.items()
                if "error" not in stats
            ),
            key=lambda item: item[1],
            reverse=True,
        )
        return {
            "attack_types_analyzed": attack_types,
            "attack_stats": attack_stats,
            "most_effective_attack": sorted_attacks[0][0] if sorted_attacks else None,
            "least_effective_attack": sorted_attacks[-1][0] if sorted_attacks else None,
            "analysis_timestamp": datetime.utcnow().isoformat(),
        }

    def _calculate_trend(self, values: List[float], stable_threshold: float = 0.01) -> Dict[str, Any]:
        """
        Calculate trend statistics for a list of values.

        The trend label only describes the direction of the values: a
        positive slope is reported as "improving" and a negative one as
        "declining", regardless of which metric is being analysed.

        Args:
            values: List of numeric values, in chronological order
            stable_threshold: Absolute slope below which the trend is
                reported as "stable"

        Returns:
            Trend statistics, or ``{"trend": "insufficient_data"}`` when
            fewer than two values are given
        """
        n = len(values)
        if n < 2:
            return {"trend": "insufficient_data"}

        # Least-squares slope of the values against their index 0..n-1.
        x_mean = (n - 1) / 2
        y_mean = sum(values) / n
        numerator = sum((i - x_mean) * (value - y_mean) for i, value in enumerate(values))
        denominator = sum((i - x_mean) ** 2 for i in range(n))
        slope = numerator / denominator if denominator != 0 else 0

        # Determine trend direction
        if abs(slope) < stable_threshold:
            trend_direction = "stable"
        elif slope > 0:
            trend_direction = "improving"
        else:
            trend_direction = "declining"

        start_value, end_value = values[0], values[-1]
        if start_value != 0:
            # Divide by the magnitude of the baseline so the sign of the
            # percentage follows the direction of the change even when the
            # starting value is negative.
            change_percentage = ((end_value - start_value) / abs(start_value)) * 100
        else:
            # Relative change from a zero baseline is undefined; report 0.
            change_percentage = 0

        return {
            "trend": trend_direction,
            "slope": slope,
            "change_percentage": change_percentage,
            "start_value": start_value,
            "end_value": end_value,
            "min_value": min(values),
            "max_value": max(values),
        }
class ExperimentExporter:
"""
Utility class for exporting experiment data.
"""
def __init__(self):
"""Initialize experiment exporter."""
self.manager = get_experiment_manager()
def export_experiment_to_dict(self, run_id: str) -> Optional[Dict[str, Any]]:
"""
Export experiment to dictionary format.
Args:
run_id: Experiment run ID
Returns:
Experiment data as dictionary
"""
experiment = self.manager.get_experiment(run_id)
if not experiment:
return None
return experiment.dict()
def export_experiments_to_csv(self, filters: Optional[ExperimentFilter] = None) -> str:
"""
Export experiments to CSV format.
Args:
filters: Optional filters to apply
Returns:
CSV string of experiments
"""
import csv
import io
experiments = self.manager.list_experiments(filters or ExperimentFilter())
if not experiments.experiments:
return "No experiments found"
# Create CSV
output = io.StringIO()
writer = csv.writer(output)
# Write header
header = [
"run_id", "experiment_name", "model_name", "dataset_name",
"dataset_version", "status", "priority", "created_at",
"completed_at", "robustness_score", "risk_score",
"success_rate", "total_attacks", "execution_time_ms"
]
writer.writerow(header)
# Write data
for exp in experiments.experiments:
row = [
str(exp.run_id),
exp.experiment_name or "",
exp.model_name,
exp.dataset_name or "",
exp.dataset_version or "",
exp.status.value,
exp.priority.value,
exp.created_at.isoformat(),
exp.completed_at.isoformat() if exp.completed_at else "",
exp.result_summary.robustness_score if exp.result_summary else "",
exp.result_summary.risk_score if exp.result_summary else "",
exp.result_summary.success_rate if exp.result_summary else "",
exp.result_summary.total_attacks if exp.result_summary else "",
exp.result_summary.execution_time_ms if exp.result_summary else ""
]
writer.writerow(row)
return output.getvalue()
def export_experiment_summary(self, run_id: str) -> Optional[Dict[str, Any]]:
"""
Export experiment summary for reporting.
Args:
run_id: Experiment run ID
Returns:
Experiment summary
"""
experiment = self.manager.get_experiment(run_id)
if not experiment:
return None
summary = {
"experiment_info": {
"run_id": str(experiment.run_id),
"experiment_name": experiment.experiment_name,
"description": experiment.description,
"status": experiment.status.value,
"priority": experiment.priority.value,
"created_at": experiment.created_at.isoformat(),
"started_at": experiment.started_at.isoformat() if experiment.started_at else None,
"completed_at": experiment.completed_at.isoformat() if experiment.completed_at else None,
"created_by": experiment.created_by,
"tags": experiment.tags
},
"configuration": {
"model_name": experiment.config_snapshot.model_name,
"dataset_name": experiment.config_snapshot.dataset_name,
"dataset_version": experiment.config_snapshot.dataset_version,
"attack_types": experiment.config_snapshot.attack_types,
"prompt_count": experiment.config_snapshot.prompt_count,
"max_iterations": experiment.config_snapshot.max_iterations,
"mutation_enabled": experiment.config_snapshot.mutation_enabled
},
"results": None,
"error": None
}
if experiment.result_summary:
summary["results"] = {
"robustness_score": experiment.result_summary.robustness_score,
"risk_score": experiment.result_summary.risk_score,
"success_rate": experiment.result_summary.success_rate,
"hallucination_rate": experiment.result_summary.hallucination_rate,
"toxicity_rate": experiment.result_summary.toxicity_rate,
"confidence_score": experiment.result_summary.confidence_score,
"total_attacks": experiment.result_summary.total_attacks,
"successful_attacks": experiment.result_summary.successful_attacks,
"failed_attacks": experiment.result_summary.failed_attacks,
"execution_time_ms": experiment.result_summary.execution_time_ms
}
if experiment.error_message:
summary["error"] = experiment.error_message
return summary
def create_config_snapshot_from_pipeline_config(pipeline_config, model_name: str,
                                                model_config: Optional[Dict[str, Any]] = None) -> ConfigSnapshot:
    """
    Build the ConfigSnapshot stored with an experiment from a pipeline config.

    Optional attributes are read from ``pipeline_config`` with ``getattr``
    and fall back to a default when absent; ``pipeline_config.dict()`` is
    called unconditionally to capture the full configuration.

    Args:
        pipeline_config: Pipeline configuration object (must provide ``dict()``)
        model_name: Model name
        model_config: Optional model configuration; an empty dict is stored
            when omitted or empty

    Returns:
        Config snapshot for experiment tracking
    """
    def _setting(attribute, fallback):
        # Read an optional pipeline attribute, tolerating its absence.
        return getattr(pipeline_config, attribute, fallback)

    snapshot_fields = {
        "model_name": model_name,
        "model_config": model_config if model_config else {},
        "dataset_name": _setting('dataset_name', None),
        "dataset_version": _setting('dataset_version', None),
        "dataset_config": {},
        "attack_types": _setting('attack_types', []),
        "pipeline_config": pipeline_config.dict(),
        # The pipeline calls this value "num_prompts".
        "prompt_count": _setting('num_prompts', 0),
        "max_iterations": _setting('max_iterations', 5),
        "mutation_enabled": _setting('mutation_enabled', True),
        "weights": _setting('weights', {}),
        "environment": {},
    }
    return ConfigSnapshot(**snapshot_fields)
def create_result_summary_from_pipeline_result(pipeline_result) -> ResultSummary:
    """
    Condense a pipeline result into the ResultSummary kept with an experiment.

    Score-type metrics are taken from ``pipeline_result.score_card``; the
    success rate, total attack count and execution time come from the
    pipeline result itself. Successful and failed attack counts are the
    lengths of the corresponding collections on the result.

    Args:
        pipeline_result: Pipeline result object

    Returns:
        Result summary for experiment tracking
    """
    card = pipeline_result.score_card
    summary_fields = {
        "robustness_score": card.robustness_score,
        "risk_score": card.risk_score,
        "success_rate": pipeline_result.success_rate,
        "hallucination_rate": card.hallucination_rate,
        "toxicity_rate": card.toxicity_rate,
        "confidence_score": card.confidence_score,
        "total_attacks": pipeline_result.total_attacks,
        # These two are stored as collections on the result; keep only counts.
        "successful_attacks": len(pipeline_result.successful_attacks),
        "failed_attacks": len(pipeline_result.failed_attacks),
        "execution_time_ms": pipeline_result.execution_time_ms,
    }
    return ResultSummary(**summary_fields)
# Import timedelta for date calculations
from datetime import timedelta