"""
Experiment Utilities for AegisLM Framework

Helper functions for experiment tracking, analysis, and management.
"""

import csv
import io
import sys
import uuid
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, Any, Optional, List, Tuple

# Add parent directory to path for imports
current_dir = Path(__file__).parent
backend_dir = current_dir.parent
if str(backend_dir) not in sys.path:
    sys.path.insert(0, str(backend_dir))

from schemas.experiment_schema import (
    Experiment, ExperimentStatus, ConfigSnapshot, ResultSummary,
    ExperimentFilter, ExperimentComparison
)
from experiments.experiment_manager import get_experiment_manager


class ExperimentAnalyzer:
    """
    Utility class for analyzing experiments and generating insights.
    """

    def __init__(self):
        """Initialize experiment analyzer."""
        self.manager = get_experiment_manager()

    def analyze_performance_trends(self, model_name: str, days: int = 30) -> Dict[str, Any]:
        """
        Analyze performance trends for a specific model.

        Only experiments that are COMPLETED and carry a result summary are
        considered; they are ordered by creation time before trends are
        computed.

        Args:
            model_name: Model name to analyze
            days: Number of days to look back

        Returns:
            Performance trend analysis, or ``{"error": ...}`` when no
            completed experiment with results exists in the window
        """
        # Get experiments for the model
        filters = ExperimentFilter(
            model_name=model_name,
            created_after=datetime.utcnow() - timedelta(days=days),
            page_size=1000
        )
        experiments = self.manager.list_experiments(filters)

        completed_experiments = [
            exp for exp in experiments.experiments
            if exp.status == ExperimentStatus.COMPLETED and exp.result_summary
        ]

        if not completed_experiments:
            return {"error": "No completed experiments found for analysis"}

        # Sort by creation date so index order equals chronological order
        completed_experiments.sort(key=lambda x: x.created_at)

        # Collect metrics over time
        summaries = [exp.result_summary for exp in completed_experiments]
        robustness_scores = [s.robustness_score for s in summaries]
        risk_scores = [s.risk_score for s in summaries]
        success_rates = [s.success_rate for s in summaries]
        # A missing execution time is recorded as 0
        execution_times = [s.execution_time_ms or 0 for s in summaries]
        timestamps = [exp.created_at for exp in completed_experiments]

        return {
            "model_name": model_name,
            "analysis_period_days": days,
            "total_experiments": len(completed_experiments),
            "trends": {
                "robustness_score": self._calculate_trend(robustness_scores),
                "risk_score": self._calculate_trend(risk_scores),
                "success_rate": self._calculate_trend(success_rates),
                "execution_time_ms": self._calculate_trend(execution_times)
            },
            # The lists are non-empty here (early return above), so the last
            # element is the most recent experiment.
            "current_performance": {
                "robustness_score": robustness_scores[-1],
                "risk_score": risk_scores[-1],
                "success_rate": success_rates[-1],
                "execution_time_ms": execution_times[-1]
            },
            "performance_history": [
                {
                    "timestamp": timestamp.isoformat(),
                    "robustness_score": robustness,
                    "risk_score": risk,
                    "success_rate": success_rate,
                    "execution_time_ms": execution_time
                }
                for timestamp, robustness, risk, success_rate, execution_time in zip(
                    timestamps, robustness_scores, risk_scores,
                    success_rates, execution_times
                )
            ]
        }

    def compare_model_performance(self, model_names: List[str]) -> Dict[str, Any]:
        """
        Compare performance across multiple models.

        For each model the completed experiments with a result summary are
        averaged; the "latest_*" values come from the experiment with the
        most recent ``created_at``. The best model is the one with the
        highest (average robustness - average risk).

        Args:
            model_names: List of model names to compare

        Returns:
            Model performance comparison
        """
        model_stats = {}

        for model_name in model_names:
            # Get completed experiments for model
            filters = ExperimentFilter(
                model_name=model_name,
                status=ExperimentStatus.COMPLETED,
                page_size=100
            )
            experiments = self.manager.list_experiments(filters)
            completed_experiments = [
                exp for exp in experiments.experiments if exp.result_summary
            ]

            if not completed_experiments:
                model_stats[model_name] = {
                    "experiment_count": 0,
                    "error": "No completed experiments found"
                }
                continue

            # Order chronologically so that [-1] really is the latest run,
            # independent of the order list_experiments returns.
            completed_experiments.sort(key=lambda x: x.created_at)

            robustness_scores = [exp.result_summary.robustness_score for exp in completed_experiments]
            risk_scores = [exp.result_summary.risk_score for exp in completed_experiments]
            success_rates = [exp.result_summary.success_rate for exp in completed_experiments]
            count = len(completed_experiments)

            model_stats[model_name] = {
                "experiment_count": count,
                "avg_robustness_score": sum(robustness_scores) / count,
                "avg_risk_score": sum(risk_scores) / count,
                "avg_success_rate": sum(success_rates) / count,
                "latest_robustness_score": robustness_scores[-1],
                "latest_risk_score": risk_scores[-1],
                "latest_success_rate": success_rates[-1]
            }

        # Determine best performing model. A None sentinel (rather than a
        # numeric floor) guarantees that any model with stats can win, however
        # low its composite score is. Ties keep the first model encountered.
        best_model = None
        best_score = None
        for model_name, stats in model_stats.items():
            if "error" in stats:
                continue
            # Composite score (higher robustness and lower risk is better)
            composite_score = stats["avg_robustness_score"] - stats["avg_risk_score"]
            if best_score is None or composite_score > best_score:
                best_score = composite_score
                best_model = model_name

        return {
            "models_compared": model_names,
            "model_stats": model_stats,
            "best_performing_model": best_model,
            "comparison_timestamp": datetime.utcnow().isoformat()
        }

    def analyze_attack_effectiveness(self, attack_types: List[str]) -> Dict[str, Any]:
        """
        Analyze effectiveness of different attack types.

        Args:
            attack_types: List of attack types to analyze

        Returns:
            Attack effectiveness analysis, including the most and least
            effective attack type ranked by overall success rate
        """
        attack_stats = {}

        for attack_type in attack_types:
            # Get experiments using this attack type
            filters = ExperimentFilter(
                attack_types=[attack_type],
                status=ExperimentStatus.COMPLETED,
                page_size=1000
            )
            experiments = self.manager.list_experiments(filters)
            completed_experiments = [
                exp for exp in experiments.experiments if exp.result_summary
            ]

            if not completed_experiments:
                attack_stats[attack_type] = {
                    "experiment_count": 0,
                    "error": "No completed experiments found"
                }
                continue

            success_rates = [exp.result_summary.success_rate for exp in completed_experiments]
            attacks_run = sum(exp.result_summary.total_attacks for exp in completed_experiments)
            attacks_succeeded = sum(exp.result_summary.successful_attacks for exp in completed_experiments)

            attack_stats[attack_type] = {
                "experiment_count": len(completed_experiments),
                "avg_success_rate": sum(success_rates) / len(success_rates),
                "total_attacks": attacks_run,
                "total_successful_attacks": attacks_succeeded,
                # Pooled rate across experiments; 0 when no attack was run
                "overall_success_rate": attacks_succeeded / attacks_run if attacks_run > 0 else 0,
                "max_success_rate": max(success_rates),
                "min_success_rate": min(success_rates)
            }

        # Sort by effectiveness (highest overall success rate first)
        sorted_attacks = sorted(
            [
                (attack_type, stats.get("overall_success_rate", 0))
                for attack_type, stats in attack_stats.items()
                if "error" not in stats
            ],
            key=lambda x: x[1],
            reverse=True
        )

        return {
            "attack_types_analyzed": attack_types,
            "attack_stats": attack_stats,
            "most_effective_attack": sorted_attacks[0][0] if sorted_attacks else None,
            "least_effective_attack": sorted_attacks[-1][0] if sorted_attacks else None,
            "analysis_timestamp": datetime.utcnow().isoformat()
        }

    def _calculate_trend(self, values: List[float]) -> Dict[str, Any]:
        """
        Calculate trend statistics for a list of values.

        The slope is the least-squares slope of the values against their
        index (0, 1, 2, ...).

        Args:
            values: List of numeric values

        Returns:
            Trend statistics, or ``{"trend": "insufficient_data"}`` when
            fewer than two values are given
        """
        if len(values) < 2:
            return {"trend": "insufficient_data"}

        # Simple linear regression for trend
        n = len(values)
        x_mean = (n - 1) / 2  # mean of 0..n-1
        y_mean = sum(values) / n

        numerator = sum((i - x_mean) * (value - y_mean) for i, value in enumerate(values))
        denominator = sum((i - x_mean) ** 2 for i in range(n))

        slope = numerator / denominator if denominator != 0 else 0

        # Determine trend direction.
        # NOTE(review): a positive slope is labelled "improving" for every
        # metric, including risk_score and execution_time_ms where a rise is
        # presumably a regression -- confirm how callers interpret the label.
        if abs(slope) < 0.01:
            trend_direction = "stable"
        elif slope > 0:
            trend_direction = "improving"
        else:
            trend_direction = "declining"

        # Calculate change percentage (0 when the starting value is 0, to
        # avoid dividing by zero)
        if values[0] != 0:
            change_percentage = ((values[-1] - values[0]) / values[0]) * 100
        else:
            change_percentage = 0

        return {
            "trend": trend_direction,
            "slope": slope,
            "change_percentage": change_percentage,
            "start_value": values[0],
            "end_value": values[-1],
            "min_value": min(values),
            "max_value": max(values)
        }


class ExperimentExporter:
    """
    Utility class for exporting experiment data.
    """

    def __init__(self):
        """Initialize experiment exporter."""
        self.manager = get_experiment_manager()

    def export_experiment_to_dict(self, run_id: str) -> Optional[Dict[str, Any]]:
        """
        Export experiment to dictionary format.

        Args:
            run_id: Experiment run ID

        Returns:
            Experiment data as dictionary, or None if the experiment is
            not found
        """
        experiment = self.manager.get_experiment(run_id)
        if not experiment:
            return None

        return experiment.dict()

    def export_experiments_to_csv(self, filters: Optional[ExperimentFilter] = None) -> str:
        """
        Export experiments to CSV format.

        Args:
            filters: Optional filters to apply

        Returns:
            CSV string of experiments, or the literal text
            "No experiments found" when the query matches nothing
        """
        experiments = self.manager.list_experiments(filters or ExperimentFilter())

        if not experiments.experiments:
            return "No experiments found"

        # Create CSV
        output = io.StringIO()
        writer = csv.writer(output)

        # Write header
        writer.writerow([
            "run_id", "experiment_name", "model_name", "dataset_name",
            "dataset_version", "status", "priority", "created_at",
            "completed_at", "robustness_score", "risk_score", "success_rate",
            "total_attacks", "execution_time_ms"
        ])

        # Write data
        for exp in experiments.experiments:
            result = exp.result_summary
            if result:
                result_columns = [
                    result.robustness_score,
                    result.risk_score,
                    result.success_rate,
                    result.total_attacks,
                    result.execution_time_ms
                ]
            else:
                # Experiments without results get blank metric columns
                result_columns = [""] * 5

            writer.writerow([
                str(exp.run_id),
                exp.experiment_name or "",
                exp.model_name,
                exp.dataset_name or "",
                exp.dataset_version or "",
                exp.status.value,
                exp.priority.value,
                exp.created_at.isoformat(),
                exp.completed_at.isoformat() if exp.completed_at else "",
                *result_columns
            ])

        return output.getvalue()

    def export_experiment_summary(self, run_id: str) -> Optional[Dict[str, Any]]:
        """
        Export experiment summary for reporting.

        Args:
            run_id: Experiment run ID

        Returns:
            Experiment summary with "experiment_info", "configuration",
            "results" and "error" sections, or None if the experiment is
            not found
        """
        experiment = self.manager.get_experiment(run_id)
        if not experiment:
            return None

        config = experiment.config_snapshot
        summary = {
            "experiment_info": {
                "run_id": str(experiment.run_id),
                "experiment_name": experiment.experiment_name,
                "description": experiment.description,
                "status": experiment.status.value,
                "priority": experiment.priority.value,
                "created_at": experiment.created_at.isoformat(),
                "started_at": experiment.started_at.isoformat() if experiment.started_at else None,
                "completed_at": experiment.completed_at.isoformat() if experiment.completed_at else None,
                "created_by": experiment.created_by,
                "tags": experiment.tags
            },
            "configuration": {
                "model_name": config.model_name,
                "dataset_name": config.dataset_name,
                "dataset_version": config.dataset_version,
                "attack_types": config.attack_types,
                "prompt_count": config.prompt_count,
                "max_iterations": config.max_iterations,
                "mutation_enabled": config.mutation_enabled
            },
            "results": None,
            "error": None
        }

        result = experiment.result_summary
        if result:
            summary["results"] = {
                "robustness_score": result.robustness_score,
                "risk_score": result.risk_score,
                "success_rate": result.success_rate,
                "hallucination_rate": result.hallucination_rate,
                "toxicity_rate": result.toxicity_rate,
                "confidence_score": result.confidence_score,
                "total_attacks": result.total_attacks,
                "successful_attacks": result.successful_attacks,
                "failed_attacks": result.failed_attacks,
                "execution_time_ms": result.execution_time_ms
            }

        if experiment.error_message:
            summary["error"] = experiment.error_message

        return summary


def create_config_snapshot_from_pipeline_config(pipeline_config, model_name: str,
                                                model_config: Optional[Dict[str, Any]] = None) -> ConfigSnapshot:
    """
    Create config snapshot from pipeline configuration.

    Optional attributes are read with ``getattr`` and fall back to a default
    when absent; ``pipeline_config.dict()`` is called unconditionally.

    Args:
        pipeline_config: Pipeline configuration object
        model_name: Model name
        model_config: Optional model configuration

    Returns:
        Config snapshot for experiment tracking
    """
    # Extract relevant fields from pipeline config
    return ConfigSnapshot(
        model_name=model_name,
        model_config=model_config or {},
        dataset_name=getattr(pipeline_config, 'dataset_name', None),
        dataset_version=getattr(pipeline_config, 'dataset_version', None),
        dataset_config={},
        attack_types=getattr(pipeline_config, 'attack_types', []),
        pipeline_config=pipeline_config.dict(),
        prompt_count=getattr(pipeline_config, 'num_prompts', 0),
        max_iterations=getattr(pipeline_config, 'max_iterations', 5),
        mutation_enabled=getattr(pipeline_config, 'mutation_enabled', True),
        weights=getattr(pipeline_config, 'weights', {}),
        environment={}
    )


def create_result_summary_from_pipeline_result(pipeline_result) -> ResultSummary:
    """
    Create result summary from pipeline result.

    Args:
        pipeline_result: Pipeline result object

    Returns:
        Result summary for experiment tracking
    """
    # Extract score card data
    score_card = pipeline_result.score_card

    return ResultSummary(
        robustness_score=score_card.robustness_score,
        risk_score=score_card.risk_score,
        success_rate=pipeline_result.success_rate,
        hallucination_rate=score_card.hallucination_rate,
        toxicity_rate=score_card.toxicity_rate,
        confidence_score=score_card.confidence_score,
        total_attacks=pipeline_result.total_attacks,
        # The pipeline result holds collections of attacks; the summary
        # stores only their counts.
        successful_attacks=len(pipeline_result.successful_attacks),
        failed_attacks=len(pipeline_result.failed_attacks),
        execution_time_ms=pipeline_result.execution_time_ms
    )