| """ |
| Experiment Utilities for AegisLM Framework |
| |
| Helper functions for experiment tracking, analysis, and management. |
| """ |
|
|
| import uuid |
| from datetime import datetime |
| from typing import Dict, Any, Optional, List, Tuple |
| from pathlib import Path |
| import sys |
|
|
| |
# Put the parent of this file's directory at the front of sys.path
# (presumably so the absolute `schemas` / `experiments` imports that
# follow resolve when the module is loaded on its own -- confirm).
current_dir = Path(__file__).parent
backend_dir = current_dir.parent
_backend_entry = str(backend_dir)
if _backend_entry not in sys.path:
    sys.path.insert(0, _backend_entry)
|
|
| from schemas.experiment_schema import ( |
| Experiment, ExperimentStatus, ConfigSnapshot, ResultSummary, |
| ExperimentFilter, ExperimentComparison |
| ) |
| from experiments.experiment_manager import get_experiment_manager |
|
|
|
|
class ExperimentAnalyzer:
    """
    Utility class for analyzing experiments and generating insights.

    Experiments are fetched through the manager returned by
    ``get_experiment_manager()``. Each analysis issues a single
    ``list_experiments`` call per model / attack type and only looks at
    the ``experiments`` attribute of that one result, so at most
    ``page_size`` experiments are considered per query.
    """

    def __init__(self):
        """Initialize experiment analyzer."""
        self.manager = get_experiment_manager()

    def analyze_performance_trends(self, model_name: str, days: int = 30) -> Dict[str, Any]:
        """
        Analyze performance trends for a specific model.

        Only experiments whose status is ``ExperimentStatus.COMPLETED`` and
        that carry a result summary are used. They are ordered by
        ``created_at`` before the trends are computed.

        Args:
            model_name: Model name to analyze
            days: Number of days to look back

        Returns:
            Performance trend analysis with the keys ``model_name``,
            ``analysis_period_days``, ``total_experiments``, ``trends``,
            ``current_performance`` (values of the most recent experiment)
            and ``performance_history``. When no usable experiment exists,
            ``{"error": "No completed experiments found for analysis"}``
            is returned instead.
        """
        filters = ExperimentFilter(
            model_name=model_name,
            created_after=datetime.utcnow() - timedelta(days=days),
            page_size=1000
        )
        listing = self.manager.list_experiments(filters)

        completed = [
            exp for exp in listing.experiments
            if exp.status == ExperimentStatus.COMPLETED and exp.result_summary
        ]
        if not completed:
            return {"error": "No completed experiments found for analysis"}

        # Oldest first, so the last entry is the current performance.
        completed.sort(key=lambda exp: exp.created_at)

        # One history point per experiment; a missing execution time is
        # recorded as 0.
        history = [
            {
                "timestamp": exp.created_at.isoformat(),
                "robustness_score": exp.result_summary.robustness_score,
                "risk_score": exp.result_summary.risk_score,
                "success_rate": exp.result_summary.success_rate,
                "execution_time_ms": exp.result_summary.execution_time_ms or 0
            }
            for exp in completed
        ]

        metrics = ("robustness_score", "risk_score", "success_rate", "execution_time_ms")
        series = {metric: [point[metric] for point in history] for metric in metrics}

        return {
            "model_name": model_name,
            "analysis_period_days": days,
            "total_experiments": len(completed),
            "trends": {metric: self._calculate_trend(series[metric]) for metric in metrics},
            "current_performance": {metric: series[metric][-1] for metric in metrics},
            "performance_history": history
        }

    def compare_model_performance(self, model_names: List[str]) -> Dict[str, Any]:
        """
        Compare performance across multiple models.

        For every model the completed experiments that have a result
        summary are averaged. Experiments are ordered by ``created_at`` so
        that the ``latest_*`` fields really describe the most recent one.

        Args:
            model_names: List of model names to compare

        Returns:
            Model performance comparison with the keys ``models_compared``,
            ``model_stats`` (per model: averages and latest values, or an
            ``error`` entry when nothing was found),
            ``best_performing_model`` (``None`` when no model has data) and
            ``comparison_timestamp``.
        """
        model_stats = {}

        for model_name in model_names:
            filters = ExperimentFilter(
                model_name=model_name,
                status=ExperimentStatus.COMPLETED,
                page_size=100
            )
            listing = self.manager.list_experiments(filters)

            with_results = [exp for exp in listing.experiments if exp.result_summary]
            if not with_results:
                model_stats[model_name] = {
                    "experiment_count": 0,
                    "error": "No completed experiments found"
                }
                continue

            # The manager's ordering is not relied upon: sort chronologically
            # so index -1 is the latest experiment.
            with_results.sort(key=lambda exp: exp.created_at)

            robustness_scores = [exp.result_summary.robustness_score for exp in with_results]
            risk_scores = [exp.result_summary.risk_score for exp in with_results]
            success_rates = [exp.result_summary.success_rate for exp in with_results]
            count = len(with_results)

            model_stats[model_name] = {
                "experiment_count": count,
                "avg_robustness_score": sum(robustness_scores) / count,
                "avg_risk_score": sum(risk_scores) / count,
                "avg_success_rate": sum(success_rates) / count,
                "latest_robustness_score": robustness_scores[-1],
                "latest_risk_score": risk_scores[-1],
                "latest_success_rate": success_rates[-1]
            }

        return {
            "models_compared": model_names,
            "model_stats": model_stats,
            "best_performing_model": self._select_best_model(model_stats),
            "comparison_timestamp": datetime.utcnow().isoformat()
        }

    @staticmethod
    def _select_best_model(model_stats: Dict[str, Dict[str, Any]]) -> Optional[str]:
        """
        Return the model with the highest composite score.

        The composite score is ``avg_robustness_score - avg_risk_score``.
        Entries containing an ``error`` key are skipped. On a tie the model
        that appears first wins. No numeric floor is assumed, so a model is
        selected even when every composite score is very low.

        Args:
            model_stats: Per-model statistics as built by
                ``compare_model_performance``

        Returns:
            Name of the best model, or ``None`` when no model has data
        """
        best_model = None
        best_score = None

        for model_name, stats in model_stats.items():
            if "error" in stats:
                continue
            composite_score = stats["avg_robustness_score"] - stats["avg_risk_score"]
            if best_score is None or composite_score > best_score:
                best_score = composite_score
                best_model = model_name

        return best_model

    def analyze_attack_effectiveness(self, attack_types: List[str]) -> Dict[str, Any]:
        """
        Analyze effectiveness of different attack types.

        Args:
            attack_types: List of attack types to analyze

        Returns:
            Attack effectiveness analysis with the keys
            ``attack_types_analyzed``, ``attack_stats`` (per attack type:
            counts and success rates, or an ``error`` entry when nothing
            was found), ``most_effective_attack`` /
            ``least_effective_attack`` (ranked by ``overall_success_rate``,
            ``None`` when no attack type has data) and
            ``analysis_timestamp``.
        """
        attack_stats = {}

        for attack_type in attack_types:
            filters = ExperimentFilter(
                attack_types=[attack_type],
                status=ExperimentStatus.COMPLETED,
                page_size=1000
            )
            listing = self.manager.list_experiments(filters)

            summaries = [exp.result_summary for exp in listing.experiments if exp.result_summary]
            if not summaries:
                attack_stats[attack_type] = {
                    "experiment_count": 0,
                    "error": "No completed experiments found"
                }
                continue

            success_rates = [summary.success_rate for summary in summaries]
            attacks_launched = sum(summary.total_attacks for summary in summaries)
            attacks_succeeded = sum(summary.successful_attacks for summary in summaries)

            attack_stats[attack_type] = {
                "experiment_count": len(summaries),
                "avg_success_rate": sum(success_rates) / len(success_rates),
                "total_attacks": attacks_launched,
                "total_successful_attacks": attacks_succeeded,
                # Pooled rate over all experiments; 0 when nothing was launched.
                "overall_success_rate": attacks_succeeded / attacks_launched if attacks_launched > 0 else 0,
                "max_success_rate": max(success_rates),
                "min_success_rate": min(success_rates)
            }

        # Highest pooled success rate first; attack types without data are
        # excluded from the ranking.
        ranked_attacks = sorted(
            [
                (attack_type, stats.get("overall_success_rate", 0))
                for attack_type, stats in attack_stats.items()
                if "error" not in stats
            ],
            key=lambda item: item[1],
            reverse=True
        )

        return {
            "attack_types_analyzed": attack_types,
            "attack_stats": attack_stats,
            "most_effective_attack": ranked_attacks[0][0] if ranked_attacks else None,
            "least_effective_attack": ranked_attacks[-1][0] if ranked_attacks else None,
            "analysis_timestamp": datetime.utcnow().isoformat()
        }

    def _calculate_trend(self, values: List[float], stable_threshold: float = 0.01) -> Dict[str, Any]:
        """
        Calculate trend statistics for a list of values.

        A least-squares line is fitted through the values using their
        positions ``0 .. n-1`` as x coordinates. The labels describe the
        sign of the slope only: "improving" means the values are rising and
        "declining" means they are falling, whether or not a higher value
        is desirable for the metric in question.

        Args:
            values: List of numeric values
            stable_threshold: Absolute slope below which the trend is
                reported as "stable"

        Returns:
            Trend statistics, or ``{"trend": "insufficient_data"}`` when
            fewer than two values are given. ``change_percentage`` is
            relative to the first value and is 0 when that value is 0.
        """
        n = len(values)
        if n < 2:
            return {"trend": "insufficient_data"}

        x_mean = sum(range(n)) / n
        y_mean = sum(values) / n

        numerator = sum((i - x_mean) * (value - y_mean) for i, value in enumerate(values))
        denominator = sum((i - x_mean) ** 2 for i in range(n))

        slope = numerator / denominator if denominator != 0 else 0

        if abs(slope) < stable_threshold:
            trend_direction = "stable"
        elif slope > 0:
            trend_direction = "improving"
        else:
            trend_direction = "declining"

        first, last = values[0], values[-1]
        # A zero baseline has no meaningful relative change; report 0.
        change_percentage = ((last - first) / first) * 100 if first != 0 else 0

        return {
            "trend": trend_direction,
            "slope": slope,
            "change_percentage": change_percentage,
            "start_value": first,
            "end_value": last,
            "min_value": min(values),
            "max_value": max(values)
        }
|
|
|
|
class ExperimentExporter:
    """
    Utility class for exporting experiment data.

    Reads experiments through the manager returned by
    ``get_experiment_manager()`` and renders them as dictionaries, CSV
    text or report-style summaries.
    """

    def __init__(self):
        """Initialize experiment exporter."""
        self.manager = get_experiment_manager()

    def export_experiment_to_dict(self, run_id: str) -> Optional[Dict[str, Any]]:
        """
        Export experiment to dictionary format.

        Args:
            run_id: Experiment run ID

        Returns:
            The result of the experiment's ``dict()`` method, or ``None``
            when the manager returns no experiment for ``run_id``
        """
        record = self.manager.get_experiment(run_id)
        return record.dict() if record else None

    def export_experiments_to_csv(self, filters: Optional[ExperimentFilter] = None) -> str:
        """
        Export experiments to CSV format.

        Args:
            filters: Optional filters to apply; a default
                ``ExperimentFilter()`` is used when omitted

        Returns:
            CSV text with one header row and one row per experiment, or
            the literal string "No experiments found" when the listing is
            empty. Result columns are left blank for experiments without a
            result summary.
        """
        import csv
        import io

        listing = self.manager.list_experiments(filters or ExperimentFilter())
        if not listing.experiments:
            return "No experiments found"

        buffer = io.StringIO()
        writer = csv.writer(buffer)

        writer.writerow([
            "run_id", "experiment_name", "model_name", "dataset_name",
            "dataset_version", "status", "priority", "created_at",
            "completed_at", "robustness_score", "risk_score",
            "success_rate", "total_attacks", "execution_time_ms"
        ])

        for exp in listing.experiments:
            # Identity / lifecycle columns first ...
            cells = [
                str(exp.run_id),
                exp.experiment_name or "",
                exp.model_name,
                exp.dataset_name or "",
                exp.dataset_version or "",
                exp.status.value,
                exp.priority.value,
                exp.created_at.isoformat(),
                exp.completed_at.isoformat() if exp.completed_at else "",
            ]
            # ... then the five result columns, blank when there are no results.
            results = exp.result_summary
            if results:
                cells.extend([
                    results.robustness_score,
                    results.risk_score,
                    results.success_rate,
                    results.total_attacks,
                    results.execution_time_ms,
                ])
            else:
                cells.extend([""] * 5)
            writer.writerow(cells)

        return buffer.getvalue()

    def export_experiment_summary(self, run_id: str) -> Optional[Dict[str, Any]]:
        """
        Export experiment summary for reporting.

        Args:
            run_id: Experiment run ID

        Returns:
            Dictionary with the sections ``experiment_info``,
            ``configuration``, ``results`` (``None`` without a result
            summary) and ``error`` (``None`` without an error message), or
            ``None`` when the manager returns no experiment for ``run_id``
        """
        experiment = self.manager.get_experiment(run_id)
        if not experiment:
            return None

        def iso_or_none(moment):
            # Optional timestamps are rendered as ISO strings or left as None.
            return moment.isoformat() if moment else None

        experiment_info = {
            "run_id": str(experiment.run_id),
            "experiment_name": experiment.experiment_name,
            "description": experiment.description,
            "status": experiment.status.value,
            "priority": experiment.priority.value,
            "created_at": experiment.created_at.isoformat(),
            "started_at": iso_or_none(experiment.started_at),
            "completed_at": iso_or_none(experiment.completed_at),
            "created_by": experiment.created_by,
            "tags": experiment.tags
        }

        snapshot = experiment.config_snapshot
        configuration = {
            "model_name": snapshot.model_name,
            "dataset_name": snapshot.dataset_name,
            "dataset_version": snapshot.dataset_version,
            "attack_types": snapshot.attack_types,
            "prompt_count": snapshot.prompt_count,
            "max_iterations": snapshot.max_iterations,
            "mutation_enabled": snapshot.mutation_enabled
        }

        results = None
        outcome = experiment.result_summary
        if outcome:
            results = {
                "robustness_score": outcome.robustness_score,
                "risk_score": outcome.risk_score,
                "success_rate": outcome.success_rate,
                "hallucination_rate": outcome.hallucination_rate,
                "toxicity_rate": outcome.toxicity_rate,
                "confidence_score": outcome.confidence_score,
                "total_attacks": outcome.total_attacks,
                "successful_attacks": outcome.successful_attacks,
                "failed_attacks": outcome.failed_attacks,
                "execution_time_ms": outcome.execution_time_ms
            }

        return {
            "experiment_info": experiment_info,
            "configuration": configuration,
            "results": results,
            # An empty / missing error message is reported as None.
            "error": experiment.error_message or None
        }
|
|
|
|
def create_config_snapshot_from_pipeline_config(pipeline_config, model_name: str,
                                               model_config: Optional[Dict[str, Any]] = None) -> ConfigSnapshot:
    """
    Build the ``ConfigSnapshot`` recorded for an experiment.

    Most fields are read from ``pipeline_config`` with ``getattr`` and a
    fallback, so a missing attribute does not raise. The full pipeline
    configuration is stored via ``pipeline_config.dict()``, which the
    object must therefore provide.

    Args:
        pipeline_config: Pipeline configuration object
        model_name: Model name
        model_config: Optional model configuration; an empty dict is
            stored when omitted or empty

    Returns:
        Config snapshot for experiment tracking
    """
    def setting(attribute, fallback):
        # Read one optional attribute of the pipeline configuration.
        return getattr(pipeline_config, attribute, fallback)

    snapshot_fields = {
        "model_name": model_name,
        "model_config": model_config or {},
        "dataset_name": setting('dataset_name', None),
        "dataset_version": setting('dataset_version', None),
        "dataset_config": {},
        "attack_types": setting('attack_types', []),
        "pipeline_config": pipeline_config.dict(),
        # The snapshot's prompt_count comes from the config's num_prompts.
        "prompt_count": setting('num_prompts', 0),
        "max_iterations": setting('max_iterations', 5),
        "mutation_enabled": setting('mutation_enabled', True),
        "weights": setting('weights', {}),
        "environment": {},
    }
    return ConfigSnapshot(**snapshot_fields)
|
|
|
|
def create_result_summary_from_pipeline_result(pipeline_result) -> ResultSummary:
    """
    Build the ``ResultSummary`` recorded for an experiment.

    Scores and rates are taken from ``pipeline_result.score_card``. The
    success rate, total attack count and execution time come from the
    pipeline result itself. The successful / failed attack counts are the
    lengths of the corresponding collections on the pipeline result.

    Args:
        pipeline_result: Pipeline result object

    Returns:
        Result summary for experiment tracking
    """
    card = pipeline_result.score_card

    summary_fields = {
        "robustness_score": card.robustness_score,
        "risk_score": card.risk_score,
        "success_rate": pipeline_result.success_rate,
        "hallucination_rate": card.hallucination_rate,
        "toxicity_rate": card.toxicity_rate,
        "confidence_score": card.confidence_score,
        "total_attacks": pipeline_result.total_attacks,
        "successful_attacks": len(pipeline_result.successful_attacks),
        "failed_attacks": len(pipeline_result.failed_attacks),
        "execution_time_ms": pipeline_result.execution_time_ms,
    }
    return ResultSummary(**summary_fields)
|
|
|
|
| |
| from datetime import timedelta |
|
|