| """
|
| Dashboard Utilities
|
|
|
| Utility functions for dashboard operations, including:
|
| - Metric calculations
|
| - Data formatting
|
| - Visualization helpers
|
| - Report generation
|
| """
|
|
|
| import csv
|
| import io
|
| import json
|
| import logging
|
| from datetime import datetime
|
| from typing import Any, Dict, List, Optional
|
|
|
| from dashboard.schemas import (
|
| ComparisonData,
|
| DeltaRobustnessData,
|
| ExportFormat,
|
| ExportReport,
|
| HeatmapData,
|
| MetricSummary,
|
| RadarData,
|
| RunMetadata,
|
| RunSummary,
|
| )
|
|
|
| from dashboard.integrity import (
|
| DEFAULT_WEIGHTS,
|
| IntegrityValidator,
|
| generate_report_id,
|
| log_dashboard_event as log_export_event,
|
| )
|
|
|
# Module-level logger, named after this module, used by the logging and
# validation helpers below.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def calculate_vulnerability_index(
    hallucination: float,
    toxicity: float,
    bias: float,
) -> float:
    """
    Compute the vulnerability index as the unweighted mean of three scores.

    A larger result indicates more vulnerability.

    Args:
        hallucination: Mean hallucination score
        toxicity: Mean toxicity score
        bias: Mean bias score

    Returns:
        Arithmetic mean of the three inputs (stays in [0, 1] when every
        input is in [0, 1])
    """
    # Accumulate left to right so the floating-point result matches a plain
    # ``a + b + c`` expression exactly.
    running_total = hallucination
    running_total = running_total + toxicity
    running_total = running_total + bias
    return running_total / 3.0
|
|
|
|
|
def calculate_delta_robustness(
    baseline_score: float,
    current_score: float,
) -> float:
    """
    Return how far the current composite score moved from the baseline.

    Args:
        baseline_score: Baseline composite score
        current_score: Current composite score

    Returns:
        ``current_score - baseline_score``; positive when the current score
        is higher than the baseline
    """
    score_change = current_score - baseline_score
    return score_change
|
|
|
|
|
def normalize_metrics(
    metrics: Dict[str, float],
) -> Dict[str, float]:
    """
    Clamp every metric value into the [0, 1] range.

    Args:
        metrics: Dictionary of metric name to value

    Returns:
        New dictionary with the same keys; values below 0 become 0.0 and
        values above 1 become 1.0
    """
    return {
        metric_name: max(0.0, min(1.0, raw_value))
        for metric_name, raw_value in metrics.items()
    }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def format_score(score: Optional[float], precision: int = 4) -> str:
    """
    Render a score as fixed-point text for display.

    Args:
        score: Score value, or None when no score is available
        precision: Number of digits after the decimal point

    Returns:
        ``"N/A"`` for a missing score, otherwise the score formatted with
        ``precision`` decimals
    """
    return "N/A" if score is None else format(score, f".{precision}f")
|
|
|
|
|
def format_percentage(value: float, precision: int = 2) -> str:
    """
    Render a fraction as a percentage string.

    Args:
        value: Value in [0, 1] range
        precision: Number of digits after the decimal point

    Returns:
        ``value`` multiplied by 100, formatted with ``precision`` decimals
        and a trailing ``%``
    """
    scaled = value * 100
    return f"{scaled:.{precision}f}%"
|
|
|
|
|
def format_timestamp(dt: datetime) -> str:
    """
    Render a datetime as ``YYYY-MM-DD HH:MM:SS`` for display.

    Args:
        dt: Datetime object

    Returns:
        Formatted timestamp string (no timezone or sub-second component)
    """
    display_pattern = "%Y-%m-%d %H:%M:%S"
    return dt.strftime(display_pattern)
|
|
|
|
|
def format_duration(milliseconds: float) -> str:
    """
    Format duration in milliseconds to human readable string.

    Durations under one second are shown in whole milliseconds, durations
    under one minute in seconds with one decimal, and anything longer as
    minutes plus whole seconds.

    Args:
        milliseconds: Duration in milliseconds

    Returns:
        Formatted duration string, e.g. ``"250ms"``, ``"1.5s"``, ``"2m 5s"``
    """
    if milliseconds < 1000:
        return f"{milliseconds:.0f}ms"
    if milliseconds < 60000:
        return f"{milliseconds / 1000:.1f}s"

    # Round to whole seconds BEFORE splitting into minutes and seconds.
    # Rounding only the remainder lets 59.5s+ round up to an impossible
    # "60s" (e.g. 119999 ms used to render as "1m 60s").
    total_seconds = round(milliseconds / 1000)
    minutes, seconds = divmod(total_seconds, 60)
    return f"{minutes}m {seconds}s"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_radar_chart_config(
    radar_data: RadarData,
    title: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Build a Plotly figure dictionary for a robustness radar chart.

    The four values on ``radar_data`` are plotted unchanged; the axis labels
    read "1 - ..." for the first three.
    NOTE(review): presumably ``radar_data`` already holds the inverted
    scores for those axes - confirm against the producer.

    Args:
        radar_data: Radar data
        title: Optional chart title; when omitted a title is derived from
            the model name

    Returns:
        Plotly figure configuration dictionary with ``data`` and ``layout``
    """
    display_name = radar_data.model_name or "Model"

    # Keep each axis label next to the value it describes.
    axes = [
        ("1 - Hallucination", radar_data.hallucination),
        ("1 - Toxicity", radar_data.toxicity),
        ("1 - Bias", radar_data.bias),
        ("Confidence", radar_data.confidence),
    ]

    trace = {
        "type": "scatterpolar",
        "r": [axis_value for _, axis_value in axes],
        "theta": [axis_label for axis_label, _ in axes],
        "fill": "toself",
        "name": display_name,
    }

    radial_axis = {
        "visible": True,
        "range": [0, 1],
        "title": "Score (higher is better)",
    }

    layout = {
        "title": title or f"Robustness Radar - {display_name}",
        "polar": {"radialaxis": radial_axis},
        "showlegend": True,
    }

    return {"data": [trace], "layout": layout}
|
|
|
|
|
def get_heatmap_config(
    heatmap_data: HeatmapData,
    title: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Build a Plotly figure dictionary for the attack/metric heatmap.

    Metrics run along the x axis and attack types along the (reversed)
    y axis; the colour scale is pinned to the 0..1 range.

    Args:
        heatmap_data: Heatmap data
        title: Optional chart title

    Returns:
        Plotly figure configuration dictionary with ``data`` and ``layout``
    """
    # NOTE(review): "titleside" is the legacy colorbar attribute name -
    # confirm it is still accepted by the Plotly version in use.
    colorbar = {
        "title": "Metric Value",
        "titleside": "right",
    }

    trace = {
        "type": "heatmap",
        "z": heatmap_data.values,
        "x": heatmap_data.metrics,
        "y": heatmap_data.attack_types,
        "colorscale": "RdYlGn_r",
        "zmin": 0,
        "zmax": 1,
        "colorbar": colorbar,
    }

    layout = {
        "title": title or "Attack Vulnerability Heatmap",
        "xaxis": {"title": "Metrics"},
        "yaxis": {"title": "Attack Types", "autorange": "reversed"},
    }

    return {"data": [trace], "layout": layout}
|
|
|
|
|
def get_delta_chart_config(
    delta_data: List[DeltaRobustnessData],
    title: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Build a Plotly figure dictionary for the delta robustness bar chart.

    One bar is drawn per entry: green when its delta is zero or positive,
    red when it is negative.

    Args:
        delta_data: List of delta robustness data
        title: Optional chart title

    Returns:
        Plotly figure configuration dictionary with ``data`` and ``layout``
    """
    positive_color = "#22c55e"
    negative_color = "#ef4444"

    model_names = []
    delta_values = []
    bar_colors = []
    bar_labels = []
    for entry in delta_data:
        model_names.append(entry.model_name)
        delta_values.append(entry.delta_robustness)
        bar_colors.append(
            positive_color if entry.delta_robustness >= 0 else negative_color
        )
        # NOTE(review): the label reads "Δ=" but is filled from the
        # composite score, not the delta - confirm this is intended.
        bar_labels.append(f"Δ={entry.composite_score:.3f}")

    bar_trace = {
        "type": "bar",
        "x": model_names,
        "y": delta_values,
        "marker": {"color": bar_colors},
        "text": bar_labels,
        "textposition": "auto",
    }

    layout = {
        "title": title or "Delta Robustness Comparison",
        "xaxis": {"title": "Model"},
        "yaxis": {"title": "Delta Robustness", "range": [-1, 1]},
    }

    return {"data": [bar_trace], "layout": layout}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def generate_json_report(
    run_summary: RunSummary,
    include_config: bool = True,
    include_raw_outputs: bool = False,
) -> Dict[str, Any]:
    """
    Generate JSON report from run summary.

    Args:
        run_summary: Run summary data
        include_config: Include a ``config`` section (config hash plus
            equal 0.25 weights for the four metrics) in the report
        include_raw_outputs: Include raw outputs (privacy sensitive).
            Accepted for interface compatibility; this function does not
            currently read it.

    Returns:
        Report dictionary (described by the original author as matching
        the Week 3 Day 5 schema)
    """
    metadata = run_summary.metadata

    # Read the clock once so the timestamp handed to generate_report_id and
    # the "generated_at" field describe the same instant.
    generated_at = datetime.utcnow()
    report_id = generate_report_id(str(metadata.run_id), generated_at)

    mean_metrics = {
        metric.metric_name: metric.mean
        for metric in run_summary.metric_summary
    }

    # A missing or zero composite score is reported as 0.0.
    composite_score = run_summary.composite_score or 0.0

    # Delta metrics are only reported when a composite score exists; a
    # missing or zero vulnerability index yields the neutral values.
    delta_metrics: Dict[str, float] = {}
    if run_summary.composite_score is not None:
        vulnerability_index = run_summary.vulnerability_index
        if vulnerability_index:
            delta_metrics = {
                "delta_R": -vulnerability_index,
                "RSI": 1.0 - vulnerability_index,
                "VI": vulnerability_index,
            }
        else:
            delta_metrics = {"delta_R": 0.0, "RSI": 1.0, "VI": 0.0}

    # Every per-attack entry repeats the run-level means: this function
    # only has run-level aggregates to draw on.
    per_attack = [
        {
            "attack_type": attack_type,
            "hallucination": mean_metrics.get("hallucination", 0.0),
            "toxicity": mean_metrics.get("toxicity", 0.0),
            "bias": mean_metrics.get("bias", 0.0),
            "confidence": mean_metrics.get("confidence", 0.0),
            "robustness": composite_score,
        }
        for attack_type in (getattr(run_summary, "attack_coverage", None) or [])
    ]

    report = {
        "report_id": report_id,
        "generated_at": generated_at.isoformat(),
        "model": {
            "name": metadata.model_name,
            "version": metadata.model_version,
            "parameters": "",
        },
        # The dataset version doubles as the dataset name and the config
        # hash as its checksum.
        "dataset": {
            "name": metadata.dataset_version,
            "version": metadata.dataset_version,
            "checksum": metadata.config_hash,
        },
        "config_hash": metadata.config_hash,
        "composite_score": composite_score,
        "mean_metrics": mean_metrics,
        "per_attack": per_attack,
        "delta_metrics": delta_metrics,
        "sample_count": getattr(run_summary, "total_samples", 0),
        "notes": "",
    }

    if include_config:
        report["config"] = {
            "config_hash": metadata.config_hash,
            "weights": {
                "hallucination": 0.25,
                "toxicity": 0.25,
                "bias": 0.25,
                "confidence": 0.25,
            },
        }

    return report
|
|
|
|
|
def generate_csv_report(
    run_summary: RunSummary,
) -> str:
    """
    Generate CSV report from run summary.

    The output has a header row, one row per metric summary, then optional
    ``composite_score`` and ``vulnerability_index`` rows. Each optional row
    is written only when its value is not None.

    Args:
        run_summary: Run summary data

    Returns:
        CSV string
    """
    buffer = io.StringIO()
    writer = csv.writer(buffer)

    writer.writerow(["Metric", "Mean", "Std Dev", "Min", "Max", "Count"])

    for metric in run_summary.metric_summary:
        writer.writerow([
            metric.metric_name,
            f"{metric.mean:.6f}",
            f"{metric.std:.6f}",
            f"{metric.min:.6f}",
            f"{metric.max:.6f}",
            metric.count,
        ])

    composite_score = run_summary.composite_score
    if composite_score is not None:
        writer.writerow([
            "composite_score",
            f"{composite_score:.6f}",
            "",
            "",
            "",
            run_summary.total_samples,
        ])

    # The vulnerability index may be None; formatting None with ".6f"
    # raises TypeError, so guard it the same way as the composite score.
    vulnerability_index = run_summary.vulnerability_index
    if vulnerability_index is not None:
        writer.writerow([
            "vulnerability_index",
            f"{vulnerability_index:.6f}",
            "",
            "",
            "",
            "",
        ])

    return buffer.getvalue()
|
|
|
|
|
def export_report(
    run_summary: RunSummary,
    format: ExportFormat = ExportFormat.JSON,
    include_config: bool = True,
    include_raw_outputs: bool = False,
) -> str:
    """
    Serialize a run summary in the requested export format.

    Args:
        run_summary: Run summary data
        format: Export format (JSON or CSV)
        include_config: Include configuration in report (JSON only)
        include_raw_outputs: Include raw outputs (privacy sensitive);
            forwarded to the JSON generator only

    Returns:
        Formatted report string: JSON indented by two spaces, or CSV text

    Raises:
        ValueError: If ``format`` is neither JSON nor CSV
    """
    if format == ExportFormat.JSON:
        json_options = {
            "include_config": include_config,
            "include_raw_outputs": include_raw_outputs,
        }
        return json.dumps(
            generate_json_report(run_summary, **json_options),
            indent=2,
        )

    if format == ExportFormat.CSV:
        return generate_csv_report(run_summary)

    raise ValueError(f"Unsupported export format: {format}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def log_dashboard_event(
    event_type: str,
    run_id: Optional[str] = None,
    extra: Optional[Dict[str, Any]] = None,
) -> None:
    """
    Emit a dashboard usage event as a JSON payload at INFO level.

    Args:
        event_type: Type of event
        run_id: Optional run ID; added to the payload only when truthy
        extra: Optional extra data merged into the payload last, so its
            keys override the standard ones on collision
    """
    payload: Dict[str, Any] = {
        "event_type": event_type,
        "timestamp": datetime.utcnow().isoformat(),
    }

    if run_id:
        payload["run_id"] = run_id

    payload.update(extra or {})

    serialized = json.dumps(payload)
    logger.info(f"DASHBOARD_EVENT: {serialized}")
|
|
|
|
|
def log_report_generated(
    report_id: str,
    run_id: str,
    format: str = "json",
    extra: Optional[Dict[str, Any]] = None,
) -> None:
    """
    Emit a REPORT_GENERATED event as a JSON payload at INFO level.

    Args:
        report_id: Generated report ID
        run_id: Associated run ID
        format: Export format (json/csv)
        extra: Optional extra data merged into the payload last, so its
            keys override the standard ones on collision
    """
    payload: Dict[str, Any] = {
        "event_type": "REPORT_GENERATED",
        "report_id": report_id,
        "run_id": run_id,
        "format": format,
        "timestamp": datetime.utcnow().isoformat(),
    }

    payload.update(extra or {})

    serialized = json.dumps(payload)
    logger.info(f"REPORT_GENERATED: {serialized}")
|
|
|
|
|
def log_benchmark_report_generated(
    benchmark_id: str,
    format: str = "json",
    model_count: int = 0,
    extra: Optional[Dict[str, Any]] = None,
) -> None:
    """
    Emit a BENCHMARK_REPORT_GENERATED event as a JSON payload at INFO level.

    Args:
        benchmark_id: Associated benchmark ID
        format: Export format (json/csv)
        model_count: Number of models in benchmark
        extra: Optional extra data merged into the payload last, so its
            keys override the standard ones on collision
    """
    payload: Dict[str, Any] = {
        "event_type": "BENCHMARK_REPORT_GENERATED",
        "benchmark_id": benchmark_id,
        "format": format,
        "model_count": model_count,
        "timestamp": datetime.utcnow().isoformat(),
    }

    payload.update(extra or {})

    serialized = json.dumps(payload)
    logger.info(f"BENCHMARK_REPORT_GENERATED: {serialized}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def validate_metric_range(value: float, metric_name: str) -> bool:
    """
    Check that a metric value lies within the closed interval [0, 1].

    Args:
        value: Metric value
        metric_name: Name of the metric (used only in the warning message)

    Returns:
        True if valid; False (after logging a warning) otherwise
    """
    if 0.0 <= value <= 1.0:
        return True

    logger.warning(f"Metric {metric_name} out of range: {value}")
    return False
|
|
|
|
|
def validate_run_data(results: List[Dict[str, Any]]) -> bool:
    """
    Check that every result carries all four metric fields.

    Validation stops at the first missing field, which is logged as a
    warning together with the index of the offending result.

    Args:
        results: List of result dictionaries

    Returns:
        True if valid (including for an empty list), False otherwise
    """
    required_fields = ("hallucination", "toxicity", "bias", "confidence")

    for index, entry in enumerate(results):
        first_missing = next(
            (name for name in required_fields if name not in entry),
            None,
        )
        if first_missing is not None:
            logger.warning(f"Result {index} missing field: {first_missing}")
            return False

    return True
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_sample_run_summary() -> RunSummary:
    """
    Build a fixed example run summary for testing.

    The metadata timestamp is the current UTC time; every other field is a
    hard-coded sample value.

    Returns:
        Sample RunSummary object
    """
    sample_metadata = RunMetadata(
        run_id="sample-run-001",
        timestamp=datetime.utcnow(),
        model_name="meta-llama/Llama-2-7b-hf",
        model_version="v1.0",
        dataset_version="v1.0",
        config_hash="abc123def456",
        status="completed",
    )

    # (metric name, mean, std, min, max) - each metric uses 100 samples.
    metric_rows = [
        ("hallucination", 0.15, 0.08, 0.02, 0.45),
        ("toxicity", 0.08, 0.05, 0.0, 0.32),
        ("bias", 0.12, 0.06, 0.01, 0.28),
        ("confidence", 0.78, 0.12, 0.45, 0.95),
    ]
    sample_metrics = [
        MetricSummary(
            metric_name=name,
            mean=mean_value,
            std=std_value,
            min=min_value,
            max=max_value,
            count=100,
        )
        for name, mean_value, std_value, min_value, max_value in metric_rows
    ]

    return RunSummary(
        metadata=sample_metadata,
        metric_summary=sample_metrics,
        composite_score=0.7075,
        total_samples=100,
        attack_coverage=["injection", "jailbreak", "bias_trigger"],
        vulnerability_index=0.1167,
    )
|
|
|
|
|
def get_sample_radar_data() -> RadarData:
    """
    Build a fixed example radar payload for testing.

    Returns:
        Sample RadarData object
    """
    sample_fields = {
        "hallucination": 0.85,
        "toxicity": 0.92,
        "bias": 0.88,
        "confidence": 0.78,
        "model_name": "meta-llama/Llama-2-7b-hf",
        "run_id": "sample-run-001",
    }
    return RadarData(**sample_fields)
|
|
|
|
|
def get_sample_heatmap_data() -> HeatmapData:
    """
    Build a fixed example heatmap payload for testing.

    Returns:
        Sample HeatmapData object with six attack types and four metrics
    """
    metric_names = ["hallucination", "toxicity", "bias", "confidence"]

    # One row per attack type; columns follow the order of metric_names.
    rows_by_attack = {
        "injection": [0.18, 0.12, 0.15, 0.75],
        "jailbreak": [0.22, 0.15, 0.18, 0.72],
        "bias_trigger": [0.14, 0.08, 0.25, 0.80],
        "context_poison": [0.16, 0.10, 0.12, 0.78],
        "role_confusion": [0.19, 0.11, 0.14, 0.76],
        "chaining": [0.21, 0.13, 0.17, 0.74],
    }

    return HeatmapData(
        attack_types=list(rows_by_attack.keys()),
        metrics=metric_names,
        values=list(rows_by_attack.values()),
        run_id="sample-run-001",
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def calculate_delta_robustness_model(baseline: float, adversarial: float) -> float:
    """
    Compute how much robustness a model loses under adversarial conditions.

    Args:
        baseline: Baseline robustness score
        adversarial: Adversarial robustness score

    Returns:
        Delta robustness (baseline - adversarial); positive when the
        adversarial score is lower than the baseline
    """
    robustness_drop = baseline - adversarial
    return robustness_drop
|
|
|
|
|
def calculate_rsi(baseline: float, adversarial: float) -> float:
    """
    Compute the Robustness Stability Index (RSI).

    RSI = R_adversarial / R_baseline

    Args:
        baseline: Baseline robustness score
        adversarial: Adversarial robustness score

    Returns:
        RSI value (closer to 1 = more stable); 0.0 when the baseline is
        zero, so no division is attempted
    """
    return adversarial / baseline if baseline != 0 else 0.0
|
|
|
|
|
def calculate_vi(baseline: float, delta: float) -> float:
    """
    Compute the Vulnerability Index (VI).

    VI = Delta_R / R_baseline

    Args:
        baseline: Baseline robustness score
        delta: Delta robustness

    Returns:
        VI value (higher = more vulnerable); 0.0 when the baseline is
        zero, so no division is attempted
    """
    return delta / baseline if baseline != 0 else 0.0
|
|
|
|
|
def load_benchmark_data(benchmark_id: str) -> Optional[Dict[str, Any]]:
    """
    Load benchmark data from JSON file.

    Lookup order (all paths are relative to the current working directory):

    1. ``<dir>/<benchmark_id>.json`` for ``experiments/benchmarks`` here,
       one level up and two levels up.
    2. The alphabetically first ``*.json`` file in
       ``experiments/benchmarks`` whose stem contains ``benchmark_id``.

    Args:
        benchmark_id: The benchmark identifier

    Returns:
        Benchmark data dictionary or None if not found (an empty
        identifier is never matched)

    Raises:
        json.JSONDecodeError: If the matched file is not valid JSON
    """
    from pathlib import Path

    # An empty identifier is a substring of every file stem and would make
    # the fuzzy fallback return an arbitrary benchmark.
    if not benchmark_id:
        return None

    # NOTE(review): benchmark_id is interpolated into the path unchanged;
    # if it can come from untrusted input, reject path separators / "..".
    search_dirs = (
        "experiments/benchmarks",
        "../experiments/benchmarks",
        "../../experiments/benchmarks",
    )
    for directory in search_dirs:
        candidate = Path(f"{directory}/{benchmark_id}.json")
        if candidate.is_file():
            with open(candidate, "r", encoding="utf-8") as f:
                return json.load(f)

    # Fuzzy fallback: substring match on the file stem. Sorting makes the
    # result deterministic when several files match (glob order is not).
    benchmarks_dir = Path("experiments/benchmarks")
    if benchmarks_dir.is_dir():
        for file in sorted(benchmarks_dir.glob("*.json")):
            if benchmark_id in file.stem:
                with open(file, "r", encoding="utf-8") as f:
                    return json.load(f)

    return None
|
|
|
|
|
def list_available_benchmarks() -> List[Dict[str, str]]:
    """
    List all available benchmarks.

    Scans ``experiments/benchmarks`` (relative to the current working
    directory) for ``*.json`` files, in sorted filename order. Files that
    cannot be read, are not valid JSON, or do not hold a JSON object are
    skipped with a warning rather than aborting the listing.

    Returns:
        List of benchmark info dictionaries with ``id`` (file stem),
        ``name`` and ``timestamp`` (both taken from the file's
        ``metadata`` object, defaulting to the stem and ``""``)
    """
    from pathlib import Path

    benchmarks: List[Dict[str, str]] = []
    benchmarks_dir = Path("experiments/benchmarks")

    if not benchmarks_dir.is_dir():
        return benchmarks

    # Sorted so the listing is stable across platforms and runs.
    for file in sorted(benchmarks_dir.glob("*.json")):
        try:
            with open(file, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError) as exc:
            # ValueError covers JSONDecodeError and UnicodeDecodeError.
            logger.warning("Skipping unreadable benchmark file %s: %s", file, exc)
            continue

        if not isinstance(data, dict):
            logger.warning("Skipping benchmark file %s: top level is not an object", file)
            continue

        # Tolerate a missing or null "metadata" entry.
        metadata = data.get("metadata")
        if not isinstance(metadata, dict):
            metadata = {}

        benchmarks.append({
            "id": file.stem,
            "name": metadata.get("name", file.stem),
            "timestamp": metadata.get("timestamp", ""),
        })

    return benchmarks
|
|
|
|
|
def generate_benchmark_report(
    benchmark_data: Dict[str, Any],
    include_rankings: bool = True,
    include_comparisons: bool = True,
) -> Dict[str, Any]:
    """
    Build a ranked benchmark report with delta_R, RSI and VI per model.

    Models are ranked by adversarial robustness (highest first), with ties
    broken by lower VI.

    Args:
        benchmark_data: Raw benchmark data from JSON
        include_rankings: Include model rankings (not read by this
            function; rankings are always produced)
        include_comparisons: Include pairwise comparisons (not read by
            this function)

    Returns:
        Processed benchmark report dictionary
    """
    metadata = benchmark_data.get("metadata", {})

    def _process(entry: Dict[str, Any]) -> Dict[str, Any]:
        # Derive the per-model robustness figures from the raw entry.
        baseline = entry.get("baseline_robustness", 0.0)
        adversarial = entry.get("adversarial_robustness", 0.0)
        delta_r = calculate_delta_robustness_model(baseline, adversarial)
        rsi = calculate_rsi(baseline, adversarial)
        vi = calculate_vi(baseline, delta_r)
        return {
            "model_name": entry.get("model_name", "unknown"),
            "baseline_robustness": baseline,
            "adversarial_robustness": adversarial,
            "delta_R": delta_r,
            "RSI": rsi,
            "VI": vi,
            "sample_count": entry.get("sample_count", 0),
        }

    ranked = sorted(
        [_process(entry) for entry in benchmark_data.get("models", [])],
        key=lambda m: (-m["adversarial_robustness"], m["VI"]),
    )
    for position, entry in enumerate(ranked, start=1):
        entry["rank"] = position

    if ranked:
        best_name = ranked[0]["model_name"]
        most_vulnerable_name = max(ranked, key=lambda m: m["VI"])["model_name"]
        most_stable_name = max(ranked, key=lambda m: m["RSI"])["model_name"]
    else:
        best_name = most_vulnerable_name = most_stable_name = None

    def _average(field: str) -> float:
        # Mean of one numeric field across all models; 0 when there are none.
        return sum(m[field] for m in ranked) / len(ranked) if ranked else 0

    return {
        "benchmark_id": metadata.get("name", "unknown"),
        "generated_at": datetime.utcnow().isoformat(),
        "metadata": metadata,
        "models": ranked,
        "ranking_order": [m["model_name"] for m in ranked],
        "best_model": best_name,
        "most_vulnerable_model": most_vulnerable_name,
        "most_stable_model": most_stable_name,
        "summary": {
            "total_models": len(ranked),
            "average_baseline": _average("baseline_robustness"),
            "average_adversarial": _average("adversarial_robustness"),
            "average_delta_R": _average("delta_R"),
            "average_RSI": _average("RSI"),
            "average_VI": _average("VI"),
        },
    }
|
|
|
|
|
def export_benchmark_report(
    benchmark_id: str,
    format: ExportFormat = ExportFormat.JSON,
    include_rankings: bool = True,
    include_comparisons: bool = False,
) -> str:
    """
    Load a benchmark, build its report and serialize it.

    Args:
        benchmark_id: The benchmark identifier
        format: Export format (JSON or CSV)
        include_rankings: Include rankings in report
        include_comparisons: Include pairwise comparisons

    Returns:
        Formatted report string: JSON indented by two spaces, or CSV text

    Raises:
        ValueError: If the benchmark cannot be found, or ``format`` is
            neither JSON nor CSV
    """
    benchmark_data = load_benchmark_data(benchmark_id)
    if benchmark_data is None:
        raise ValueError(f"Benchmark not found: {benchmark_id}")

    # The report is built before the format is checked.
    report_options = {
        "include_rankings": include_rankings,
        "include_comparisons": include_comparisons,
    }
    report = generate_benchmark_report(benchmark_data, **report_options)

    if format == ExportFormat.JSON:
        return json.dumps(report, indent=2)

    if format == ExportFormat.CSV:
        return generate_benchmark_csv_report(report)

    raise ValueError(f"Unsupported format: {format}")
|
|
|
|
|
def generate_benchmark_csv_report(report: Dict[str, Any]) -> str:
    """
    Generate CSV report from benchmark report.

    The output has three sections separated by blank rows: one row per
    model, a summary of averages, and the highlighted models.

    Args:
        report: Benchmark report dictionary

    Returns:
        CSV string
    """

    def _fixed(value: Any) -> str:
        # Six-decimal cell; a missing (None) value is written as zero.
        return f"{(0 if value is None else value):.6f}"

    output = io.StringIO()
    writer = csv.writer(output)

    writer.writerow([
        "Rank",
        "Model",
        "Baseline",
        "Adversarial",
        "Delta_R",
        "RSI",
        "VI",
        "Samples",
    ])

    for model in report.get("models") or []:
        writer.writerow([
            model.get("rank", ""),
            model.get("model_name", ""),
            _fixed(model.get("baseline_robustness", 0)),
            _fixed(model.get("adversarial_robustness", 0)),
            _fixed(model.get("delta_R", 0)),
            _fixed(model.get("RSI", 0)),
            _fixed(model.get("VI", 0)),
            model.get("sample_count", ""),
        ])

    summary = report.get("summary") or {}
    writer.writerow([])
    writer.writerow(["Summary"])
    writer.writerow(["Total Models", summary.get("total_models", 0)])
    summary_rows = (
        ("Average Baseline", "average_baseline"),
        ("Average Adversarial", "average_adversarial"),
        ("Average Delta_R", "average_delta_R"),
        ("Average RSI", "average_RSI"),
        ("Average VI", "average_VI"),
    )
    for label, key in summary_rows:
        writer.writerow([label, _fixed(summary.get(key, 0))])

    # An empty benchmark stores these keys with value None, so a
    # dict.get default would never apply; use "or" for the fallback.
    writer.writerow([])
    highlight_rows = (
        ("Best Model", "best_model"),
        ("Most Vulnerable", "most_vulnerable_model"),
        ("Most Stable", "most_stable_model"),
    )
    for label, key in highlight_rows:
        writer.writerow([label, report.get(key) or "N/A"])

    return output.getvalue()
|
|
|
|
|
def save_benchmark_report(
    benchmark_id: str,
    report: Dict[str, Any],
    output_dir: str = "reports",
) -> str:
    """
    Write a benchmark report to ``<output_dir>/benchmark_<benchmark_id>.json``.

    The output directory (and any missing parents) is created first; an
    existing file with the same name is overwritten.

    Args:
        benchmark_id: The benchmark identifier
        report: Report dictionary
        output_dir: Output directory

    Returns:
        Path to saved file
    """
    from pathlib import Path

    target_dir = Path(output_dir)
    target_dir.mkdir(parents=True, exist_ok=True)

    destination = target_dir / f"benchmark_{benchmark_id}.json"
    with destination.open("w") as handle:
        json.dump(report, handle, indent=2)

    return str(destination)
|
|
|