aegislm / utils.py
ACA050's picture
Upload folder using huggingface_hub
82a3b34 verified
"""
Dashboard Utilities
Utility functions for dashboard operations, including:
- Metric calculations
- Data formatting
- Visualization helpers
- Report generation
"""
import csv
import io
import json
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional
from dashboard.schemas import (
ComparisonData,
DeltaRobustnessData,
ExportFormat,
ExportReport,
HeatmapData,
MetricSummary,
RadarData,
RunMetadata,
RunSummary,
)
from dashboard.integrity import (
DEFAULT_WEIGHTS,
IntegrityValidator,
generate_report_id,
log_dashboard_event as log_export_event,
)
logger = logging.getLogger(__name__)
# =============================================================================
# Metric Calculations
# =============================================================================
def calculate_vulnerability_index(
    hallucination: float,
    toxicity: float,
    bias: float,
) -> float:
    """
    Compute the vulnerability index as the plain average of three scores.

    A larger result indicates a more vulnerable model.

    Args:
        hallucination: Mean hallucination score
        toxicity: Mean toxicity score
        bias: Mean bias score

    Returns:
        Arithmetic mean of the three inputs. The result lies in [0, 1]
        whenever every input does; inputs are not clamped here.
    """
    combined = hallucination + toxicity + bias
    return combined / 3.0
def calculate_delta_robustness(
    baseline_score: float,
    current_score: float,
) -> float:
    """
    Return how far the current composite score moved from the baseline.

    Args:
        baseline_score: Baseline composite score
        current_score: Current composite score

    Returns:
        ``current_score - baseline_score`` (positive when the current score
        is higher than the baseline)
    """
    delta = current_score - baseline_score
    return delta
def normalize_metrics(
    metrics: Dict[str, float],
) -> Dict[str, float]:
    """
    Clamp every metric value into the [0, 1] range.

    Values are clipped, not rescaled: anything below 0 becomes 0.0 and
    anything above 1 becomes 1.0.

    Args:
        metrics: Dictionary of metric name to value

    Returns:
        New dictionary with the same keys and clamped values
    """
    return {
        metric_name: max(0.0, min(1.0, raw_value))
        for metric_name, raw_value in metrics.items()
    }
# =============================================================================
# Data Formatting
# =============================================================================
def format_score(score: Optional[float], precision: int = 4) -> str:
    """
    Render a score as a fixed-point string for display.

    Args:
        score: Score value, or None when no score is available
        precision: Number of digits after the decimal point

    Returns:
        The formatted score, or "N/A" when ``score`` is None
    """
    if score is not None:
        return format(score, f".{precision}f")
    return "N/A"
def format_percentage(value: float, precision: int = 2) -> str:
    """
    Render a fraction as a percentage string.

    Args:
        value: Value in [0, 1] range (it is multiplied by 100, not clamped)
        precision: Number of digits after the decimal point

    Returns:
        Formatted percentage string ending in "%"
    """
    scaled = value * 100
    return format(scaled, f".{precision}f") + "%"
def format_timestamp(dt: datetime) -> str:
    """
    Render a datetime as ``YYYY-MM-DD HH:MM:SS`` for display.

    Args:
        dt: Datetime object

    Returns:
        Formatted timestamp string (no timezone or sub-second part)
    """
    display_format = "%Y-%m-%d %H:%M:%S"
    return dt.strftime(display_format)
def format_duration(milliseconds: float) -> str:
    """
    Format duration in milliseconds to human readable string.

    Three output shapes are used, chosen by magnitude:
    - below 1 second:  whole milliseconds, e.g. "250ms"
    - below 1 minute:  seconds with one decimal, e.g. "1.5s"
    - otherwise:       whole minutes and seconds, e.g. "2m 5s"

    Args:
        milliseconds: Duration in milliseconds

    Returns:
        Formatted duration string
    """
    if milliseconds < 1000:
        return f"{milliseconds:.0f}ms"
    if milliseconds < 60000:
        return f"{milliseconds / 1000:.1f}s"
    # Round to whole seconds *before* splitting into minutes and seconds.
    # Truncating the minutes and rounding the leftover seconds separately
    # rendered values such as 119999 ms as "1m 60s" instead of "2m 0s".
    total_seconds = round(milliseconds / 1000)
    minutes, seconds = divmod(total_seconds, 60)
    return f"{minutes}m {seconds}s"
# =============================================================================
# Visualization Helpers
# =============================================================================
def get_radar_chart_config(
    radar_data: RadarData,
    title: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Build the Plotly figure dictionary for a robustness radar chart.

    The four scores are plotted as given; the axis labels ("1 - ...")
    suggest the caller supplies already-inverted values, but that is not
    checked here.

    Args:
        radar_data: Radar data (hallucination, toxicity, bias, confidence
            scores plus an optional model name)
        title: Optional chart title; a default containing the model name is
            used when omitted or empty

    Returns:
        Plotly figure configuration dictionary with "data" and "layout"
    """
    display_name = radar_data.model_name or "Model"

    # Each axis label is paired with the score plotted on that axis.
    axes = [
        ("1 - Hallucination", radar_data.hallucination),
        ("1 - Toxicity", radar_data.toxicity),
        ("1 - Bias", radar_data.bias),
        ("Confidence", radar_data.confidence),
    ]

    trace = {
        "type": "scatterpolar",
        "r": [score for _, score in axes],
        "theta": [label for label, _ in axes],
        "fill": "toself",
        "name": display_name,
    }

    layout = {
        "title": title or f"Robustness Radar - {display_name}",
        "polar": {
            "radialaxis": {
                "visible": True,
                "range": [0, 1],
                "title": "Score (higher is better)",
            }
        },
        "showlegend": True,
    }

    return {"data": [trace], "layout": layout}
def get_heatmap_config(
    heatmap_data: HeatmapData,
    title: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Build the Plotly figure dictionary for the attack/metric heatmap.

    Metrics run along the x axis and attack types along the y axis (which
    is reversed so the first attack type appears at the top).

    Args:
        heatmap_data: Heatmap data (``values`` matrix with ``metrics`` and
            ``attack_types`` labels)
        title: Optional chart title; "Attack Vulnerability Heatmap" is used
            when omitted or empty

    Returns:
        Plotly figure configuration dictionary with "data" and "layout"
    """
    heatmap_trace = {
        "type": "heatmap",
        "z": heatmap_data.values,
        "x": heatmap_data.metrics,
        "y": heatmap_data.attack_types,
        "colorscale": "RdYlGn_r",  # Red (high) to Green (low)
        "zmin": 0,
        "zmax": 1,
        "colorbar": {
            "title": "Metric Value",
            "titleside": "right",
        },
    }

    chart_title = title or "Attack Vulnerability Heatmap"
    layout = {
        "title": chart_title,
        "xaxis": {"title": "Metrics"},
        "yaxis": {"title": "Attack Types", "autorange": "reversed"},
    }

    return {"data": [heatmap_trace], "layout": layout}
def get_delta_chart_config(
    delta_data: List[DeltaRobustnessData],
    title: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Build the Plotly figure dictionary for the delta robustness bar chart.

    One bar is drawn per entry: bar height is the entry's delta robustness,
    bar color is green for a non-negative delta and red for a negative one.

    Args:
        delta_data: List of delta robustness data
        title: Optional chart title; "Delta Robustness Comparison" is used
            when omitted or empty

    Returns:
        Plotly figure configuration dictionary with "data" and "layout"
    """
    positive_color = "#22c55e"
    negative_color = "#ef4444"

    model_names = []
    delta_values = []
    bar_colors = []
    bar_labels = []
    for entry in delta_data:
        delta = entry.delta_robustness
        model_names.append(entry.model_name)
        delta_values.append(delta)
        bar_colors.append(positive_color if delta >= 0 else negative_color)
        # NOTE(review): the label is prefixed "Δ=" but carries the composite
        # score, not the delta -- confirm which of the two is intended.
        bar_labels.append(f"Δ={entry.composite_score:.3f}")

    bar_trace = {
        "type": "bar",
        "x": model_names,
        "y": delta_values,
        "marker": {"color": bar_colors},
        "text": bar_labels,
        "textposition": "auto",
    }

    layout = {
        "title": title or "Delta Robustness Comparison",
        "xaxis": {"title": "Model"},
        "yaxis": {"title": "Delta Robustness", "range": [-1, 1]},
    }

    return {"data": [bar_trace], "layout": layout}
# =============================================================================
# Report Generation
# =============================================================================
def generate_json_report(
    run_summary: RunSummary,
    include_config: bool = True,
    include_raw_outputs: bool = False,
) -> Dict[str, Any]:
    """
    Generate JSON report from run summary.

    Args:
        run_summary: Run summary data
        include_config: When true, add a "config" section holding the config
            hash and the metric weights
        include_raw_outputs: Include raw outputs (privacy sensitive).
            Accepted for interface compatibility; this function does not
            read it, so raw outputs are never part of the report.

    Returns:
        Report dictionary matching Week 3 Day 5 schema
    """
    metadata = run_summary.metadata

    # Take a single timestamp and use it for both the report ID and the
    # "generated_at" field. Calling utcnow() twice gave the two a slightly
    # different instant, so the ID could not be re-derived from the report.
    generated_at = datetime.utcnow()
    report_id = generate_report_id(str(metadata.run_id), generated_at)

    mean_metrics = {
        metric.metric_name: metric.mean
        for metric in run_summary.metric_summary
    }

    composite = run_summary.composite_score
    # A missing composite score is reported as 0.0.
    composite_or_zero = composite if composite else 0.0

    # Delta metrics are only emitted when a composite score exists.
    # R_adv - R_base is not available here, so the vulnerability index is
    # used as a proxy; a missing/zero index maps to the neutral values.
    delta_metrics = {}
    if composite is not None:
        vulnerability = run_summary.vulnerability_index
        if vulnerability:
            delta_metrics = {
                "delta_R": -vulnerability,
                "RSI": 1.0 - vulnerability,
                "VI": vulnerability,
            }
        else:
            delta_metrics = {"delta_R": 0.0, "RSI": 1.0, "VI": 0.0}

    # Per-attack breakdown: every listed attack type currently repeats the
    # run-wide means, since no per-attack figures are read from run_summary.
    per_attack = [
        {
            "attack_type": attack_type,
            "hallucination": mean_metrics.get("hallucination", 0.0),
            "toxicity": mean_metrics.get("toxicity", 0.0),
            "bias": mean_metrics.get("bias", 0.0),
            "confidence": mean_metrics.get("confidence", 0.0),
            "robustness": composite_or_zero,
        }
        for attack_type in (getattr(run_summary, "attack_coverage", None) or [])
    ]

    report = {
        "report_id": report_id,
        "generated_at": generated_at.isoformat(),
        "model": {
            "name": metadata.model_name,
            "version": metadata.model_version,
            "parameters": "",  # Parameters not available in run_summary
        },
        "dataset": {
            "name": metadata.dataset_version,  # Using version as name
            "version": metadata.dataset_version,
            "checksum": metadata.config_hash,  # Using config_hash as checksum proxy
        },
        "config_hash": metadata.config_hash,
        "composite_score": composite_or_zero,
        "mean_metrics": mean_metrics,
        "per_attack": per_attack,
        "delta_metrics": delta_metrics,
        "sample_count": getattr(run_summary, "total_samples", 0),
        "notes": "",  # Empty notes by default
    }

    if include_config:
        # NOTE(review): weights are hard-coded to equal quarters here;
        # confirm whether they should come from the imported DEFAULT_WEIGHTS.
        report["config"] = {
            "config_hash": metadata.config_hash,
            "weights": {
                "hallucination": 0.25,
                "toxicity": 0.25,
                "bias": 0.25,
                "confidence": 0.25,
            },
        }

    return report
def generate_csv_report(
    run_summary: RunSummary,
) -> str:
    """
    Generate CSV report from run summary.

    The output has one header row, one row per metric summary, and then
    optional "composite_score" and "vulnerability_index" rows. Each optional
    row is written only when its value is not None.

    Args:
        run_summary: Run summary data

    Returns:
        CSV string
    """
    output = io.StringIO()
    writer = csv.writer(output)

    writer.writerow(["Metric", "Mean", "Std Dev", "Min", "Max", "Count"])

    for metric in run_summary.metric_summary:
        writer.writerow([
            metric.metric_name,
            f"{metric.mean:.6f}",
            f"{metric.std:.6f}",
            f"{metric.min:.6f}",
            f"{metric.max:.6f}",
            metric.count,
        ])

    if run_summary.composite_score is not None:
        writer.writerow([
            "composite_score",
            f"{run_summary.composite_score:.6f}",
            "",
            "",
            "",
            run_summary.total_samples,
        ])

    # Guarded like the composite score: the JSON report treats the
    # vulnerability index as optional, and formatting None with ":.6f"
    # raises TypeError.
    if run_summary.vulnerability_index is not None:
        writer.writerow([
            "vulnerability_index",
            f"{run_summary.vulnerability_index:.6f}",
            "",
            "",
            "",
            "",
        ])

    return output.getvalue()
def export_report(
    run_summary: RunSummary,
    format: ExportFormat = ExportFormat.JSON,
    include_config: bool = True,
    include_raw_outputs: bool = False,
) -> str:
    """
    Serialize a run summary into the requested export format.

    Args:
        run_summary: Run summary data
        format: Export format (JSON or CSV)
        include_config: Include configuration in report (JSON only; the CSV
            path does not receive it)
        include_raw_outputs: Include raw outputs (privacy sensitive);
            forwarded to the JSON generator only

    Returns:
        Formatted report string (JSON is indented by two spaces)

    Raises:
        ValueError: If ``format`` is neither JSON nor CSV
    """
    if format == ExportFormat.JSON:
        payload = generate_json_report(
            run_summary,
            include_config=include_config,
            include_raw_outputs=include_raw_outputs,
        )
        return json.dumps(payload, indent=2)

    if format == ExportFormat.CSV:
        return generate_csv_report(run_summary)

    raise ValueError(f"Unsupported export format: {format}")
# =============================================================================
# Logging
# =============================================================================
def log_dashboard_event(
    event_type: str,
    run_id: Optional[str] = None,
    extra: Optional[Dict[str, Any]] = None,
) -> None:
    """
    Emit a dashboard usage event as a JSON payload at INFO level.

    The payload always carries "event_type" and a UTC ISO "timestamp";
    "run_id" is added only when it is truthy. Keys in ``extra`` are merged
    last and therefore override the built-in ones on a name clash.

    Args:
        event_type: Type of event
        run_id: Optional run ID
        extra: Optional extra data (must be JSON-serializable)
    """
    fields = {
        "event_type": event_type,
        "timestamp": datetime.utcnow().isoformat(),
    }
    if run_id:
        fields["run_id"] = run_id

    payload = {**fields, **(extra or {})}
    logger.info("DASHBOARD_EVENT: " + json.dumps(payload))
def log_report_generated(
    report_id: str,
    run_id: str,
    format: str = "json",
    extra: Optional[Dict[str, Any]] = None,
) -> None:
    """
    Emit a REPORT_GENERATED event as a JSON payload at INFO level.

    Keys in ``extra`` are merged last and override the built-in fields on a
    name clash.

    Args:
        report_id: Generated report ID
        run_id: Associated run ID
        format: Export format (json/csv)
        extra: Optional extra data (must be JSON-serializable)
    """
    base_fields = {
        "event_type": "REPORT_GENERATED",
        "report_id": report_id,
        "run_id": run_id,
        "format": format,
        "timestamp": datetime.utcnow().isoformat(),
    }
    payload = {**base_fields, **(extra or {})}
    logger.info("REPORT_GENERATED: " + json.dumps(payload))
def log_benchmark_report_generated(
    benchmark_id: str,
    format: str = "json",
    model_count: int = 0,
    extra: Optional[Dict[str, Any]] = None,
) -> None:
    """
    Emit a BENCHMARK_REPORT_GENERATED event as a JSON payload at INFO level.

    Keys in ``extra`` are merged last and override the built-in fields on a
    name clash.

    Args:
        benchmark_id: Associated benchmark ID
        format: Export format (json/csv)
        model_count: Number of models in benchmark
        extra: Optional extra data (must be JSON-serializable)
    """
    base_fields = {
        "event_type": "BENCHMARK_REPORT_GENERATED",
        "benchmark_id": benchmark_id,
        "format": format,
        "model_count": model_count,
        "timestamp": datetime.utcnow().isoformat(),
    }
    payload = {**base_fields, **(extra or {})}
    logger.info("BENCHMARK_REPORT_GENERATED: " + json.dumps(payload))
# =============================================================================
# Validation
# =============================================================================
def validate_metric_range(value: float, metric_name: str) -> bool:
    """
    Check that a metric value lies within the closed interval [0, 1].

    An out-of-range value is reported through a warning on the module
    logger; nothing is raised.

    Args:
        value: Metric value
        metric_name: Name of the metric (used only in the warning text)

    Returns:
        True if valid, False otherwise
    """
    in_range = 0.0 <= value <= 1.0
    if in_range:
        return True

    logger.warning(f"Metric {metric_name} out of range: {value}")
    return False
def validate_run_data(results: List[Dict[str, Any]]) -> bool:
    """
    Check that every result carries the four required metric fields.

    Validation stops at the first missing field, which is reported through
    a warning on the module logger. Only key presence is checked, not the
    values.

    Args:
        results: List of result dictionaries

    Returns:
        True if valid (including for an empty list), False otherwise
    """
    required_fields = ("hallucination", "toxicity", "bias", "confidence")

    for index, row in enumerate(results):
        missing = next(
            (name for name in required_fields if name not in row),
            None,
        )
        if missing is not None:
            logger.warning(f"Result {index} missing field: {missing}")
            return False

    return True
# =============================================================================
# Sample Data (for testing without DB)
# =============================================================================
def get_sample_run_summary() -> RunSummary:
    """
    Build a hard-coded RunSummary for testing without a database.

    The metadata timestamp is the current UTC time; every other value is a
    fixed sample figure.

    Returns:
        Sample RunSummary object
    """
    sample_metadata = RunMetadata(
        run_id="sample-run-001",
        timestamp=datetime.utcnow(),
        model_name="meta-llama/Llama-2-7b-hf",
        model_version="v1.0",
        dataset_version="v1.0",
        config_hash="abc123def456",
        status="completed",
    )

    # (metric name, mean, std, min, max); every metric has 100 samples.
    metric_rows = [
        ("hallucination", 0.15, 0.08, 0.02, 0.45),
        ("toxicity", 0.08, 0.05, 0.0, 0.32),
        ("bias", 0.12, 0.06, 0.01, 0.28),
        ("confidence", 0.78, 0.12, 0.45, 0.95),
    ]
    sample_metrics = [
        MetricSummary(
            metric_name=name,
            mean=mean,
            std=std,
            min=lowest,
            max=highest,
            count=100,
        )
        for name, mean, std, lowest, highest in metric_rows
    ]

    return RunSummary(
        metadata=sample_metadata,
        metric_summary=sample_metrics,
        composite_score=0.7075,
        total_samples=100,
        attack_coverage=["injection", "jailbreak", "bias_trigger"],
        vulnerability_index=0.1167,
    )
def get_sample_radar_data() -> RadarData:
    """
    Build a hard-coded RadarData for testing without a database.

    Returns:
        Sample RadarData object
    """
    sample_fields = {
        "hallucination": 0.85,
        "toxicity": 0.92,
        "bias": 0.88,
        "confidence": 0.78,
        "model_name": "meta-llama/Llama-2-7b-hf",
        "run_id": "sample-run-001",
    }
    return RadarData(**sample_fields)
def get_sample_heatmap_data() -> HeatmapData:
    """
    Build a hard-coded HeatmapData for testing without a database.

    Returns:
        Sample HeatmapData object
    """
    metric_names = ["hallucination", "toxicity", "bias", "confidence"]

    # One row per attack type; columns follow the order of metric_names.
    rows_by_attack = {
        "injection": [0.18, 0.12, 0.15, 0.75],
        "jailbreak": [0.22, 0.15, 0.18, 0.72],
        "bias_trigger": [0.14, 0.08, 0.25, 0.80],
        "context_poison": [0.16, 0.10, 0.12, 0.78],
        "role_confusion": [0.19, 0.11, 0.14, 0.76],
        "chaining": [0.21, 0.13, 0.17, 0.74],
    }

    return HeatmapData(
        attack_types=list(rows_by_attack),
        metrics=metric_names,
        values=list(rows_by_attack.values()),
        run_id="sample-run-001",
    )
# =============================================================================
# Benchmark Export Functions
# =============================================================================
def calculate_delta_robustness_model(baseline: float, adversarial: float) -> float:
    """
    Return the robustness a model loses under adversarial conditions.

    Args:
        baseline: Baseline robustness score
        adversarial: Adversarial robustness score

    Returns:
        Delta robustness (baseline - adversarial); positive when the
        adversarial score is lower than the baseline
    """
    robustness_drop = baseline - adversarial
    return robustness_drop
def calculate_rsi(baseline: float, adversarial: float) -> float:
    """
    Compute the Robustness Stability Index (RSI).

    RSI = R_adversarial / R_baseline

    Args:
        baseline: Baseline robustness score
        adversarial: Adversarial robustness score

    Returns:
        RSI value (closer to 1 = more stable); 0.0 when the baseline is
        zero, to avoid dividing by zero
    """
    return adversarial / baseline if baseline != 0 else 0.0
def calculate_vi(baseline: float, delta: float) -> float:
    """
    Compute the Vulnerability Index (VI).

    VI = Delta_R / R_baseline

    Args:
        baseline: Baseline robustness score
        delta: Delta robustness

    Returns:
        VI value (higher = more vulnerable); 0.0 when the baseline is zero,
        to avoid dividing by zero
    """
    return delta / baseline if baseline != 0 else 0.0
def load_benchmark_data(benchmark_id: str) -> Optional[Dict[str, Any]]:
    """
    Load benchmark data from JSON file.

    Lookup order:
    1. ``<benchmark_id>.json`` under ``experiments/benchmarks`` relative to
       the working directory, its parent, and its grandparent.
    2. The first file (in sorted name order) in ``experiments/benchmarks``
       whose stem contains ``benchmark_id`` as a substring.

    Args:
        benchmark_id: The benchmark identifier

    Returns:
        Benchmark data dictionary or None if not found (an empty
        ``benchmark_id`` is always treated as not found)

    Raises:
        OSError: If a matching file cannot be read
        json.JSONDecodeError: If a matching file is not valid JSON
    """
    from pathlib import Path

    # An empty ID is a substring of every stem, so the fallback scan below
    # would otherwise hand back an arbitrary benchmark.
    if not benchmark_id:
        return None

    # NOTE(review): benchmark_id is interpolated into the path unchecked;
    # if it can come from an untrusted request, reject path separators
    # and ".." before calling this.
    search_roots = (
        "experiments/benchmarks",
        "../experiments/benchmarks",
        "../../experiments/benchmarks",
    )
    for root in search_roots:
        candidate = Path(f"{root}/{benchmark_id}.json")
        if candidate.exists():
            with open(candidate, "r", encoding="utf-8") as f:
                return json.load(f)

    # Fallback: partial match on the file stem. Sorting makes the chosen
    # file deterministic when several stems contain the ID.
    benchmarks_dir = Path("experiments/benchmarks")
    if benchmarks_dir.exists():
        for file in sorted(benchmarks_dir.glob("*.json")):
            if benchmark_id in file.stem:
                with open(file, "r", encoding="utf-8") as f:
                    return json.load(f)

    return None
def list_available_benchmarks() -> List[Dict[str, str]]:
    """
    List all available benchmarks.

    Scans ``experiments/benchmarks`` (relative to the working directory)
    for ``*.json`` files. Files that cannot be read, are not valid JSON, or
    do not have the expected object shape are skipped with a warning, so
    one bad file does not hide the others.

    Returns:
        List of benchmark info dictionaries with "id" (file stem), "name"
        and "timestamp" (both taken from the file's "metadata" object, with
        the stem and "" as fallbacks), ordered by file name
    """
    from pathlib import Path

    benchmarks = []
    benchmarks_dir = Path("experiments/benchmarks")
    if not benchmarks_dir.exists():
        return benchmarks

    # Sorted so the listing order does not depend on the filesystem.
    for file in sorted(benchmarks_dir.glob("*.json")):
        try:
            with open(file, "r", encoding="utf-8") as f:
                data = json.load(f)
            metadata = data.get("metadata", {})
            entry = {
                "id": file.stem,
                "name": metadata.get("name", file.stem),
                "timestamp": metadata.get("timestamp", ""),
            }
        except (OSError, ValueError, AttributeError, TypeError) as exc:
            # Best-effort listing: OSError = unreadable file, ValueError =
            # bad JSON or encoding, AttributeError/TypeError = top level or
            # "metadata" is not an object. Report it instead of hiding it.
            logger.warning("Skipping benchmark file %s: %s", file, exc)
            continue
        benchmarks.append(entry)

    return benchmarks
def generate_benchmark_report(
    benchmark_data: Dict[str, Any],
    include_rankings: bool = True,
    include_comparisons: bool = True,
) -> Dict[str, Any]:
    """
    Build the benchmark report with rankings, delta_R, RSI and VI.

    Models are ranked by adversarial robustness (highest first), with ties
    broken by VI (lowest first). Rank 1 is reported as the best model.

    Args:
        benchmark_data: Raw benchmark data from JSON ("models" list and
            "metadata" object, both optional)
        include_rankings: Include model rankings. Not consulted by this
            function; rankings are always present.
        include_comparisons: Include pairwise comparisons. Not consulted by
            this function; no comparisons are produced.

    Returns:
        Processed benchmark report dictionary
    """
    metadata = benchmark_data.get("metadata", {})

    # Derive the robustness metrics for each model.
    ranked = []
    for entry in benchmark_data.get("models", []):
        baseline = entry.get("baseline_robustness", 0.0)
        adversarial = entry.get("adversarial_robustness", 0.0)
        delta_r = calculate_delta_robustness_model(baseline, adversarial)
        ranked.append({
            "model_name": entry.get("model_name", "unknown"),
            "baseline_robustness": baseline,
            "adversarial_robustness": adversarial,
            "delta_R": delta_r,
            "RSI": calculate_rsi(baseline, adversarial),
            "VI": calculate_vi(baseline, delta_r),
            "sample_count": entry.get("sample_count", 0),
        })

    # Adversarial robustness descending, then VI ascending.
    ranked.sort(key=lambda row: (-row["adversarial_robustness"], row["VI"]))
    for position, row in enumerate(ranked, start=1):
        row["rank"] = position

    def average(key):
        # Mean of one column across all models; 0 for an empty benchmark.
        return sum(row[key] for row in ranked) / len(ranked) if ranked else 0

    def name_of(row):
        # Model name of a selected row, or None when nothing was selected.
        return row["model_name"] if row else None

    top_ranked = ranked[0] if ranked else None
    highest_vi = max(ranked, key=lambda row: row["VI"]) if ranked else None
    highest_rsi = max(ranked, key=lambda row: row["RSI"]) if ranked else None

    return {
        "benchmark_id": metadata.get("name", "unknown"),
        "generated_at": datetime.utcnow().isoformat(),
        "metadata": metadata,
        "models": ranked,
        "ranking_order": [row["model_name"] for row in ranked],
        "best_model": name_of(top_ranked),
        "most_vulnerable_model": name_of(highest_vi),
        "most_stable_model": name_of(highest_rsi),
        "summary": {
            "total_models": len(ranked),
            "average_baseline": average("baseline_robustness"),
            "average_adversarial": average("adversarial_robustness"),
            "average_delta_R": average("delta_R"),
            "average_RSI": average("RSI"),
            "average_VI": average("VI"),
        },
    }
def export_benchmark_report(
    benchmark_id: str,
    format: ExportFormat = ExportFormat.JSON,
    include_rankings: bool = True,
    include_comparisons: bool = False,
) -> str:
    """
    Load a benchmark, build its report and serialize it.

    Args:
        benchmark_id: The benchmark identifier
        format: Export format (JSON or CSV)
        include_rankings: Include rankings in report (forwarded to the
            report generator)
        include_comparisons: Include pairwise comparisons (forwarded to the
            report generator)

    Returns:
        Formatted report string (JSON is indented by two spaces)

    Raises:
        ValueError: If the benchmark cannot be found, or if ``format`` is
            neither JSON nor CSV
    """
    raw_data = load_benchmark_data(benchmark_id)
    if raw_data is None:
        raise ValueError(f"Benchmark not found: {benchmark_id}")

    # The report is built before the format is checked, so an unsupported
    # format is only rejected after a successful load.
    built_report = generate_benchmark_report(
        raw_data,
        include_rankings=include_rankings,
        include_comparisons=include_comparisons,
    )

    if format == ExportFormat.JSON:
        return json.dumps(built_report, indent=2)

    if format == ExportFormat.CSV:
        return generate_benchmark_csv_report(built_report)

    raise ValueError(f"Unsupported format: {format}")
def generate_benchmark_csv_report(report: Dict[str, Any]) -> str:
    """
    Generate CSV report from benchmark report.

    Layout: a header row, one row per model, a blank line, a "Summary"
    section with the totals and averages, another blank line, and the
    best / most vulnerable / most stable model names.

    Args:
        report: Benchmark report dictionary ("models", "summary" and the
            three model-name keys are all optional)

    Returns:
        CSV string
    """
    output = io.StringIO()
    writer = csv.writer(output)

    writer.writerow([
        "Rank",
        "Model",
        "Baseline",
        "Adversarial",
        "Delta_R",
        "RSI",
        "VI",
        "Samples",
    ])

    for model in report.get("models", []):
        writer.writerow([
            model.get("rank", ""),
            model.get("model_name", ""),
            f"{model.get('baseline_robustness', 0):.6f}",
            f"{model.get('adversarial_robustness', 0):.6f}",
            f"{model.get('delta_R', 0):.6f}",
            f"{model.get('RSI', 0):.6f}",
            f"{model.get('VI', 0):.6f}",
            model.get("sample_count", ""),
        ])

    summary = report.get("summary", {})
    writer.writerow([])
    writer.writerow(["Summary"])
    writer.writerow(["Total Models", summary.get("total_models", 0)])
    average_rows = (
        ("Average Baseline", "average_baseline"),
        ("Average Adversarial", "average_adversarial"),
        ("Average Delta_R", "average_delta_R"),
        ("Average RSI", "average_RSI"),
        ("Average VI", "average_VI"),
    )
    for label, key in average_rows:
        writer.writerow([label, f"{summary.get(key, 0):.6f}"])

    writer.writerow([])
    highlight_rows = (
        ("Best Model", "best_model"),
        ("Most Vulnerable", "most_vulnerable_model"),
        ("Most Stable", "most_stable_model"),
    )
    for label, key in highlight_rows:
        # `or "N/A"` rather than a .get() default: a report for an empty
        # benchmark stores these keys with the value None, which a default
        # does not replace and which csv would write as an empty cell.
        writer.writerow([label, report.get(key) or "N/A"])

    return output.getvalue()
def save_benchmark_report(
    benchmark_id: str,
    report: Dict[str, Any],
    output_dir: str = "reports",
) -> str:
    """
    Write a benchmark report to ``<output_dir>/benchmark_<benchmark_id>.json``.

    The output directory (including missing parents) is created when
    needed, and an existing file of the same name is overwritten.

    Args:
        benchmark_id: The benchmark identifier (used in the file name)
        report: Report dictionary (must be JSON-serializable)
        output_dir: Output directory

    Returns:
        Path to saved file
    """
    from pathlib import Path

    target_dir = Path(output_dir)
    target_dir.mkdir(parents=True, exist_ok=True)

    destination = target_dir / f"benchmark_{benchmark_id}.json"
    with destination.open("w") as handle:
        json.dump(report, handle, indent=2)

    return str(destination)