# ACA050's picture
# Upload 50 files
# 1a4aa87 verified
"""
Benchmarking Engine
Main orchestration for the AegisLM Benchmarking Engine:
- Baseline evaluation mode
- Adversarial evaluation mode
- Delta robustness computation
- Cross-model comparison
- Statistical reporting
- Benchmark artifact generation
"""
import asyncio
import time
import uuid
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional
from uuid import UUID
from backend.benchmarking.comparison import (
generate_comparative_report,
generate_vulnerability_heatmap,
rank_models,
)
from backend.benchmarking.reporter import (
generate_benchmark_artifact,
generate_text_report,
)
from backend.benchmarking.schemas import (
BenchmarkConfig,
BenchmarkMode,
BenchmarkPerformance,
BenchmarkResult,
BenchmarkStatus,
BenchmarkWeights,
EvaluationResult,
MetricDeltas,
ModelBenchmarkResult,
ModelMetrics,
StartBenchmarkRequest,
)
from backend.benchmarking.statistics import (
MetricStatistics,
calculate_vulnerability_consistency,
)
from backend.core.config import settings
from backend.core.orchestrator import (
EvaluationInput,
EvaluationOrchestrator,
RunStatus,
)
from backend.logging.logger import get_logger
# =============================================================================
# Benchmark Events
# =============================================================================
class BenchmarkEvent(str, Enum):
    """
    Names of the observability events emitted while a benchmark runs.

    Every member is also a ``str`` whose value equals its own name, so a
    member can be written to a structured log record as-is.
    """

    # Lifecycle of the benchmark as a whole.
    BENCHMARK_STARTED = "BENCHMARK_STARTED"
    BENCHMARK_COMPLETED = "BENCHMARK_COMPLETED"
    BENCHMARK_FAILED = "BENCHMARK_FAILED"

    # Lifecycle of one model inside a benchmark.
    MODEL_EVALUATION_STARTED = "MODEL_EVALUATION_STARTED"
    MODEL_EVALUATION_COMPLETED = "MODEL_EVALUATION_COMPLETED"

    # Individual evaluation phases for a model.
    BASELINE_COMPLETED = "BASELINE_COMPLETED"
    ADVERSARIAL_COMPLETED = "ADVERSARIAL_COMPLETED"
    DELTA_COMPUTED = "DELTA_COMPUTED"
# =============================================================================
# Benchmark Engine
# =============================================================================
class BenchmarkEngine:
    """
    Main benchmarking engine for AegisLM.

    Coordinates:
    - Baseline evaluation (no attacks)
    - Adversarial evaluation (full pipeline)
    - Delta robustness computation
    - Cross-model comparison
    - Artifact generation

    Each benchmark runs as a background ``asyncio`` task. Tasks are tracked
    in ``_active_benchmarks`` under the string form of the benchmark UUID
    and are removed when the run finishes or is cancelled.
    """

    def __init__(self):
        """Create the logger, the evaluation orchestrator and the task registry."""
        self.logger = get_logger(__name__)
        self._orchestrator = EvaluationOrchestrator()
        # benchmark id (str) -> background task executing that benchmark
        self._active_benchmarks: Dict[str, asyncio.Task] = {}

    def _log_event(
        self,
        event: BenchmarkEvent,
        benchmark_id: str,
        **kwargs: Any
    ) -> None:
        """
        Log a benchmark event as a structured record.

        Benchmark start/completion is logged at info level, benchmark
        failure at error level and every other event at debug level.

        Args:
            event: Event being reported
            benchmark_id: Benchmark the event belongs to
            **kwargs: Extra fields merged into the record; a field named
                ``event``, ``benchmark_id`` or ``timestamp`` overrides the
                value generated here
        """
        # NOTE(review): the record carries an "event" key next to the
        # positional message - confirm the project logger accepts that.
        log_data = {
            "event": event.value,
            "benchmark_id": benchmark_id,
            "timestamp": datetime.utcnow().isoformat(),
        }
        log_data.update(kwargs)

        if event in (
            BenchmarkEvent.BENCHMARK_STARTED,
            BenchmarkEvent.BENCHMARK_COMPLETED,
        ):
            self.logger.info("Benchmark event", **log_data)
        elif event == BenchmarkEvent.BENCHMARK_FAILED:
            self.logger.error("Benchmark event", **log_data)
        else:
            self.logger.debug("Benchmark event", **log_data)

    async def start_benchmark(
        self,
        request: StartBenchmarkRequest,
    ) -> UUID:
        """
        Start a new benchmark run in the background.

        The run itself executes in an ``asyncio`` task; this coroutine
        returns as soon as the task has been scheduled and registered.

        Args:
            request: Benchmark configuration

        Returns:
            Benchmark ID
        """
        benchmark_id = uuid.uuid4()
        benchmark_id_str = str(benchmark_id)

        self.logger.info(
            "Starting benchmark",
            benchmark_id=benchmark_id_str,
            models=request.models,
            dataset=request.dataset_name,
        )

        # Fall back to default weights / attack types when the request
        # leaves them unset.
        weights = request.weights or BenchmarkWeights()
        config = BenchmarkConfig(
            benchmark_id=benchmark_id,
            models=request.models,
            dataset_name=request.dataset_name,
            dataset_version=request.dataset_version,
            attack_enabled=request.attack_enabled,
            mutation_depth=request.mutation_depth,
            weights=weights,
            max_concurrency=request.max_concurrency,
            max_samples=request.max_samples,
            enable_baseline=request.enable_baseline,
            enable_adversarial=request.enable_adversarial,
            attack_types=request.attack_types or ["jailbreak"],
        )

        # Validate before any work is scheduled.
        config.validate_config()

        # Keep a reference to the task in the registry so it can be
        # queried and cancelled later.
        task = asyncio.create_task(
            self._execute_benchmark(config)
        )
        self._active_benchmarks[benchmark_id_str] = task
        return benchmark_id

    async def _execute_benchmark(
        self,
        config: BenchmarkConfig,
    ) -> BenchmarkResult:
        """
        Execute the benchmark: evaluate every model, rank, build artifacts.

        Any ``Exception`` raised while evaluating is recorded on the result
        (status ``FAILED`` plus the error text) instead of propagating.

        Args:
            config: Benchmark configuration

        Returns:
            Complete benchmark result
        """
        benchmark_id = config.benchmark_id
        benchmark_id_str = str(benchmark_id)
        start_time = datetime.utcnow()

        result = BenchmarkResult(
            benchmark_id=benchmark_id,
            dataset_name=config.dataset_name,
            dataset_version=config.dataset_version,
            models=config.models,
            status=BenchmarkStatus.RUNNING,
            results=[],
            performance=BenchmarkPerformance(),
            started_at=start_time,
            config=config.model_dump(),
        )

        self._log_event(
            BenchmarkEvent.BENCHMARK_STARTED,
            benchmark_id_str,
            models=config.models,
            dataset=config.dataset_name,
        )

        try:
            # Models are evaluated one after another.
            for model_name in config.models:
                self._log_event(
                    BenchmarkEvent.MODEL_EVALUATION_STARTED,
                    benchmark_id_str,
                    model=model_name,
                )

                # perf_counter is monotonic, so the elapsed time cannot be
                # distorted by wall-clock adjustments.
                model_started = time.perf_counter()
                model_result = await self._evaluate_model(
                    config=config,
                    model_name=model_name,
                    benchmark_id=benchmark_id_str,
                )
                model_time = time.perf_counter() - model_started

                # Prefer the adversarial run for the performance summary,
                # but fall back to the baseline run so a baseline-only
                # benchmark is not reported as "0 samples, 100% failed".
                primary = model_result.adversarial
                if primary is None:
                    primary = model_result.baseline

                result.performance.time_per_model_seconds[model_name] = model_time
                result.performance.sample_counts[model_name] = (
                    primary.sample_count if primary is not None else 0
                )
                result.performance.failure_rates[model_name] = (
                    primary.failure_rate if primary is not None else 1.0
                )
                result.results.append(model_result)

                self._log_event(
                    BenchmarkEvent.MODEL_EVALUATION_COMPLETED,
                    benchmark_id_str,
                    model=model_name,
                    time_seconds=model_time,
                )

            # Rankings are only computed when several models were requested.
            if len(config.models) > 1:
                result.rankings = rank_models(result.results)

            result.vulnerability_heatmap = generate_vulnerability_heatmap(
                result.results,
                config.attack_types,
            )

            result.status = BenchmarkStatus.COMPLETED
            result.completed_at = datetime.utcnow()

            artifact_path = generate_benchmark_artifact(result)
            self.logger.info(
                "Benchmark artifact saved",
                benchmark_id=benchmark_id_str,
                path=artifact_path,
            )

            self._log_event(
                BenchmarkEvent.BENCHMARK_COMPLETED,
                benchmark_id_str,
                models=config.models,
                completed_at=result.completed_at.isoformat(),
            )
        except Exception as e:
            # Record the failure on the result rather than raising.
            # Cancellation is not an ``Exception`` and passes through.
            result.status = BenchmarkStatus.FAILED
            result.error = str(e)
            result.completed_at = datetime.utcnow()
            self.logger.error(
                "Benchmark failed",
                benchmark_id=benchmark_id_str,
                error=str(e),
            )
            self._log_event(
                BenchmarkEvent.BENCHMARK_FAILED,
                benchmark_id_str,
                error=str(e),
            )
        finally:
            # The run is over either way: drop it from the registry.
            self._active_benchmarks.pop(benchmark_id_str, None)

        return result

    async def _evaluate_model(
        self,
        config: BenchmarkConfig,
        model_name: str,
        benchmark_id: str,
    ) -> ModelBenchmarkResult:
        """
        Evaluate a single model in the modes enabled by the config.

        When both a baseline and an adversarial result exist, the derived
        metrics are filled in as well:
        - delta robustness ``dR = R_base - R_adv``
        - Robustness Stability Index ``RSI = R_adv / R_base``
        - Vulnerability Index ``VI = dR / R_base``
        RSI and VI are set to 0.0 when the baseline robustness is not
        positive.

        Args:
            config: Benchmark configuration
            model_name: Name of the model to evaluate
            benchmark_id: Benchmark ID for logging

        Returns:
            Complete benchmark result for the model
        """
        model_result = ModelBenchmarkResult(model_name=model_name)

        # Only restrict the dataset when a sample cap was configured.
        # NOTE(review): both runs get the same "random" sampling config;
        # whether they end up with the same samples is not visible here.
        sampling_config = None
        if config.max_samples:
            sampling_config = {
                "method": "random",
                "sample_size": config.max_samples,
            }

        if config.enable_baseline:
            # Baseline: attacks are always switched off.
            baseline_result = await self._run_evaluation(
                model_name=model_name,
                config=config,
                mode=BenchmarkMode.BASELINE,
                attack_enabled=False,
                benchmark_id=benchmark_id,
                sampling_config=sampling_config,
            )
            model_result.baseline = baseline_result
            model_result.baseline_robustness = baseline_result.metrics.robustness
            self._log_event(
                BenchmarkEvent.BASELINE_COMPLETED,
                benchmark_id,
                model=model_name,
                robustness=model_result.baseline_robustness,
            )

        if config.enable_adversarial:
            # Adversarial: attacks follow the benchmark-level switch.
            adversarial_result = await self._run_evaluation(
                model_name=model_name,
                config=config,
                mode=BenchmarkMode.ADVERSARIAL,
                attack_enabled=config.attack_enabled,
                benchmark_id=benchmark_id,
                sampling_config=sampling_config,
            )
            model_result.adversarial = adversarial_result
            model_result.adversarial_robustness = adversarial_result.metrics.robustness
            self._log_event(
                BenchmarkEvent.ADVERSARIAL_COMPLETED,
                benchmark_id,
                model=model_name,
                robustness=model_result.adversarial_robustness,
            )

        # Derived metrics need both runs.
        if not (model_result.baseline and model_result.adversarial):
            return model_result

        model_result.deltas = self._compute_deltas(
            baseline=model_result.baseline,
            adversarial=model_result.adversarial,
        )

        # dR = R_base - R_adv
        model_result.delta_robustness = (
            model_result.baseline_robustness - model_result.adversarial_robustness
        )

        # Both ratios divide by R_base, so they share one guard.
        baseline_robustness = model_result.baseline_robustness
        if baseline_robustness and baseline_robustness > 0:
            # RSI = R_adv / R_base
            model_result.robustness_stability_index = (
                model_result.adversarial_robustness / baseline_robustness
            )
            # VI = dR / R_base
            model_result.vulnerability_index = (
                model_result.delta_robustness / baseline_robustness
            )
        else:
            model_result.robustness_stability_index = 0.0
            model_result.vulnerability_index = 0.0

        self._log_event(
            BenchmarkEvent.DELTA_COMPUTED,
            benchmark_id,
            model=model_name,
            delta_robustness=model_result.delta_robustness,
            rsi=model_result.robustness_stability_index,
            vi=model_result.vulnerability_index,
        )
        return model_result

    async def _run_evaluation(
        self,
        model_name: str,
        config: BenchmarkConfig,
        mode: BenchmarkMode,
        attack_enabled: bool,
        benchmark_id: str,
        sampling_config: Optional[Dict[str, Any]] = None,
        max_wait_seconds: float = 300.0,
        poll_interval_seconds: float = 1.0,
    ) -> EvaluationResult:
        """
        Run a single evaluation (baseline or adversarial) and wait for it.

        The run is started through the orchestrator and its status is
        polled until it is ``COMPLETED`` or ``FAILED`` or the wait budget
        is used up. A run that did not complete yields a placeholder
        result (all metrics 0.5, no samples, failure rate 1.0) and a
        warning is logged.

        Args:
            model_name: Model to evaluate
            config: Benchmark config
            mode: Evaluation mode
            attack_enabled: Whether to enable attacks
            benchmark_id: Benchmark ID (used for logging)
            sampling_config: Optional sampling config
            max_wait_seconds: Polling budget before giving up on the run
            poll_interval_seconds: Pause between two status polls

        Returns:
            Evaluation result

        Raises:
            ValueError: If ``poll_interval_seconds`` is not positive
        """
        if poll_interval_seconds <= 0:
            # A non-positive interval would never advance the wait budget.
            raise ValueError("poll_interval_seconds must be positive")

        eval_input = EvaluationInput(
            model_name=model_name,
            dataset_name=config.dataset_name,
            dataset_version=config.dataset_version,
            weights={
                "hallucination": config.weights.hallucination,
                "toxicity": config.weights.toxicity,
                "bias": config.weights.bias,
                "confidence": config.weights.confidence,
            },
            # With attacks disabled no mutation and no attack types are sent.
            mutation_depth=config.mutation_depth if attack_enabled else 0,
            attack_types=config.attack_types if attack_enabled else [],
            max_concurrency=config.max_concurrency,
            sampling_config=sampling_config,
        )

        output = await self._orchestrator.start_run(eval_input)
        run_id = output.run_id

        # Poll for completion (in production, this would be async callback).
        # The status is checked once more after the final sleep, and the
        # last observed status is reused instead of being fetched again.
        terminal_states = (RunStatus.COMPLETED, RunStatus.FAILED)
        waited = 0.0
        while True:
            status = await self._orchestrator.get_run_status(run_id)
            if status is not None and status.status in terminal_states:
                break
            if waited >= max_wait_seconds:
                break
            await asyncio.sleep(poll_interval_seconds)
            waited += poll_interval_seconds

        if status is None or status.status != RunStatus.COMPLETED:
            # Failed, unknown or timed out: make the substitution visible,
            # because the placeholder numbers flow into later reports.
            self.logger.warning(
                "Evaluation did not complete; returning placeholder metrics",
                benchmark_id=benchmark_id,
                model=model_name,
                mode=str(mode),
                run_id=str(run_id),
                run_status=str(status.status) if status is not None else None,
                waited_seconds=waited,
            )
            return EvaluationResult(
                model_name=model_name,
                mode=mode,
                metrics=ModelMetrics(
                    hallucination=0.5,
                    toxicity=0.5,
                    bias=0.5,
                    confidence=0.5,
                    robustness=0.5,
                ),
                sample_count=0,
                failure_rate=1.0,
            )

        # A completed run may still report no metrics / performance data;
        # treat a missing mapping as empty instead of crashing on ``.get``.
        metric_values = status.metrics or {}
        performance_values = status.performance or {}

        metrics = ModelMetrics(
            hallucination=metric_values.get("hallucination", 0.5),
            toxicity=metric_values.get("toxicity", 0.5),
            bias=metric_values.get("bias", 0.5),
            confidence=metric_values.get("confidence", 0.5),
            robustness=metric_values.get("robustness", 0.5),
        )

        # Standard deviations are optional extras of the metrics mapping.
        if metric_values:
            metrics.std_hallucination = metric_values.get("std_hallucination")
            metrics.std_toxicity = metric_values.get("std_toxicity")
            metrics.std_bias = metric_values.get("std_bias")
            metrics.std_confidence = metric_values.get("std_confidence")

        total_samples = metric_values.get("total_samples", 0)
        failed_samples = metric_values.get("failed_samples", 0)
        return EvaluationResult(
            model_name=model_name,
            mode=mode,
            metrics=metrics,
            sample_count=total_samples,
            # Denominator is clamped to 1 so an empty run cannot divide by zero.
            failure_rate=failed_samples / max(metric_values.get("total_samples", 1), 1),
            mean_latency_ms=performance_values.get("mean_latency_ms"),
            total_time_seconds=performance_values.get("total_time_seconds"),
        )

    def _compute_deltas(
        self,
        baseline: EvaluationResult,
        adversarial: EvaluationResult,
    ) -> MetricDeltas:
        """
        Compute per-metric deltas between baseline and adversarial runs.

        Hallucination, toxicity, bias and confidence deltas are
        ``adversarial - baseline``; the robustness delta is the other way
        round (``baseline - adversarial``), matching ``dR = R_base - R_adv``.

        Args:
            baseline: Baseline evaluation result
            adversarial: Adversarial evaluation result

        Returns:
            MetricDeltas with computed differences
        """
        base = baseline.metrics
        adv = adversarial.metrics
        return MetricDeltas(
            hallucination_delta=adv.hallucination - base.hallucination,
            toxicity_delta=adv.toxicity - base.toxicity,
            bias_delta=adv.bias - base.bias,
            confidence_delta=adv.confidence - base.confidence,
            robustness_delta=base.robustness - adv.robustness,
        )

    async def get_benchmark_status(
        self,
        benchmark_id: str,
    ) -> Optional[BenchmarkResult]:
        """
        Get status of a benchmark.

        Args:
            benchmark_id: Benchmark ID

        Returns:
            - a placeholder ``RUNNING`` result while the task is in flight,
            - the task's result when it finished but is still registered,
            - ``None`` when the task was cancelled or the benchmark is not
              registered (reconstruction from a stored artifact is not
              implemented yet).
        """
        task = self._active_benchmarks.get(benchmark_id)
        if task is not None:
            if not task.done():
                # Still running. Only the id and the status are meaningful
                # here: dataset, models and start time are placeholders.
                return BenchmarkResult(
                    benchmark_id=UUID(benchmark_id),
                    dataset_name="",
                    dataset_version="",
                    models=[],
                    status=BenchmarkStatus.RUNNING,
                    results=[],
                    performance=BenchmarkPerformance(),
                    started_at=datetime.utcnow(),
                )
            if task.cancelled():
                # Awaiting a cancelled task would raise CancelledError into
                # our caller. A cancelled run has no result, so drop the
                # stale entry and report "not found".
                self._active_benchmarks.pop(benchmark_id, None)
                return None
            # Finished normally: hand out the stored result.
            return task.result()

        # Not registered: look for a persisted artifact.
        from backend.benchmarking.reporter import load_benchmark_artifact

        artifact = load_benchmark_artifact(benchmark_id)
        if artifact:
            # TODO: reconstruct a BenchmarkResult from the artifact. Until
            # then an existing artifact is still reported as "not found".
            self.logger.debug(
                "Benchmark artifact found but result reconstruction is not implemented",
                benchmark_id=benchmark_id,
            )
        return None

    async def cancel_benchmark(
        self,
        benchmark_id: str,
    ) -> bool:
        """
        Cancel a running benchmark.

        The task is cancelled, awaited until it has actually stopped and
        removed from the registry.

        Args:
            benchmark_id: Benchmark ID

        Returns:
            True if a registered benchmark was cancelled, False if no
            benchmark with this ID is registered
        """
        task = self._active_benchmarks.get(benchmark_id)
        if task is None:
            return False

        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            # Expected outcome of the cancel request above.
            pass
        finally:
            # Do not rely on the task's own cleanup: a task cancelled
            # before its first step never runs its ``finally`` block.
            self._active_benchmarks.pop(benchmark_id, None)

        self.logger.info("Benchmark cancelled", benchmark_id=benchmark_id)
        return True
# =============================================================================
# Global Instance
# =============================================================================
# Lazily created, process-wide engine; stays None until first requested.
_benchmark_engine: Optional[BenchmarkEngine] = None


def get_benchmark_engine() -> BenchmarkEngine:
    """
    Return the shared benchmark engine, creating it on first use.

    Returns:
        The module-level BenchmarkEngine instance
    """
    global _benchmark_engine
    engine = _benchmark_engine
    if engine is None:
        # First call: build the engine and remember it for later callers.
        engine = BenchmarkEngine()
        _benchmark_engine = engine
    return engine


# Public names of this module.
__all__ = [
    "BenchmarkEngine",
    "BenchmarkEvent",
    "get_benchmark_engine",
]