"""
Benchmarking Engine

Main orchestration for the AegisLM Benchmarking Engine:
- Baseline evaluation mode
- Adversarial evaluation mode
- Delta robustness computation
- Cross-model comparison
- Statistical reporting
- Benchmark artifact generation
"""
|
|
|
import asyncio
import time
import uuid
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional
from uuid import UUID

from backend.benchmarking.comparison import (
    generate_comparative_report,
    generate_vulnerability_heatmap,
    rank_models,
)
from backend.benchmarking.reporter import (
    generate_benchmark_artifact,
    generate_text_report,
    load_benchmark_artifact,
)
from backend.benchmarking.schemas import (
    BenchmarkConfig,
    BenchmarkMode,
    BenchmarkPerformance,
    BenchmarkResult,
    BenchmarkStatus,
    BenchmarkWeights,
    EvaluationResult,
    MetricDeltas,
    ModelBenchmarkResult,
    ModelMetrics,
    StartBenchmarkRequest,
)
from backend.benchmarking.statistics import (
    MetricStatistics,
    calculate_vulnerability_consistency,
)
from backend.core.config import settings
from backend.core.orchestrator import (
    EvaluationInput,
    EvaluationOrchestrator,
    RunStatus,
)
from backend.logging.logger import get_logger
|
|
|
|
|
|
|
|
|
|
|
|
|
class BenchmarkEvent(str, Enum):
    """Observability events for benchmarking.

    Every member's value is identical to its name. Because of the ``str``
    mixin, members are also ``str`` instances and compare equal to their
    plain string value.
    """

    # Whole-benchmark lifecycle.
    BENCHMARK_STARTED = "BENCHMARK_STARTED"
    BENCHMARK_COMPLETED = "BENCHMARK_COMPLETED"
    BENCHMARK_FAILED = "BENCHMARK_FAILED"

    # Per-model lifecycle.
    MODEL_EVALUATION_STARTED = "MODEL_EVALUATION_STARTED"
    MODEL_EVALUATION_COMPLETED = "MODEL_EVALUATION_COMPLETED"

    # Per-model evaluation phases.
    BASELINE_COMPLETED = "BASELINE_COMPLETED"
    ADVERSARIAL_COMPLETED = "ADVERSARIAL_COMPLETED"
    DELTA_COMPUTED = "DELTA_COMPUTED"
|
|
|
|
|
|
|
|
|
|
|
|
|
class BenchmarkEngine:
    """
    Main benchmarking engine for AegisLM.

    Coordinates:
    - Baseline evaluation (no attacks)
    - Adversarial evaluation (full pipeline)
    - Delta robustness computation
    - Cross-model comparison
    - Artifact generation

    Each benchmark runs as an ``asyncio`` task that is tracked in
    ``_active_benchmarks`` (keyed by the benchmark id in string form) for as
    long as it is in flight.
    """

    # Longest time (seconds) a single orchestrator run is polled before the
    # engine gives up on it, and the pause (seconds) between two polls.
    _RUN_TIMEOUT_SECONDS = 300
    _POLL_INTERVAL_SECONDS = 1

    def __init__(self):
        """Create the logger, the evaluation orchestrator and the task registry."""
        self.logger = get_logger(__name__)
        self._orchestrator = EvaluationOrchestrator()
        # benchmark id (str) -> task running ``_execute_benchmark``
        self._active_benchmarks: Dict[str, asyncio.Task] = {}

    def _log_event(
        self,
        event: BenchmarkEvent,
        benchmark_id: str,
        **kwargs: Any
    ) -> None:
        """
        Log a benchmark event.

        Started/completed events are logged at info level, failures at error
        level and everything else at debug level.

        Args:
            event: Event being reported
            benchmark_id: Benchmark the event belongs to
            **kwargs: Extra fields merged into the log record (they override
                the default ``event``/``benchmark_id``/``timestamp`` fields
                on key collision)
        """
        log_data = {
            "event": event.value,
            "benchmark_id": benchmark_id,
            "timestamp": datetime.utcnow().isoformat(),
        }
        log_data.update(kwargs)

        # NOTE(review): if get_logger() returns a structlog-style logger whose
        # first positional parameter is named ``event``, the "event" key below
        # collides with the positional message - confirm against the logger.
        if event in (BenchmarkEvent.BENCHMARK_STARTED, BenchmarkEvent.BENCHMARK_COMPLETED):
            self.logger.info("Benchmark event", **log_data)
        elif event == BenchmarkEvent.BENCHMARK_FAILED:
            self.logger.error("Benchmark event", **log_data)
        else:
            self.logger.debug("Benchmark event", **log_data)

    async def start_benchmark(
        self,
        request: StartBenchmarkRequest,
    ) -> UUID:
        """
        Start a new benchmark run.

        Builds and validates a ``BenchmarkConfig`` from the request, then
        schedules the benchmark as a background task and returns immediately.

        Args:
            request: Benchmark configuration

        Returns:
            Benchmark ID
        """
        benchmark_id = uuid.uuid4()
        benchmark_id_str = str(benchmark_id)

        self.logger.info(
            "Starting benchmark",
            benchmark_id=benchmark_id_str,
            models=request.models,
            dataset=request.dataset_name,
        )

        # Fall back to default weights / the "jailbreak" attack when the
        # request leaves them unset.
        weights = request.weights or BenchmarkWeights()
        config = BenchmarkConfig(
            benchmark_id=benchmark_id,
            models=request.models,
            dataset_name=request.dataset_name,
            dataset_version=request.dataset_version,
            attack_enabled=request.attack_enabled,
            mutation_depth=request.mutation_depth,
            weights=weights,
            max_concurrency=request.max_concurrency,
            max_samples=request.max_samples,
            enable_baseline=request.enable_baseline,
            enable_adversarial=request.enable_adversarial,
            attack_types=request.attack_types or ["jailbreak"],
        )

        # Validate before any task is scheduled.
        config.validate_config()

        # Keep a reference to the task so it can be queried and cancelled.
        task = asyncio.create_task(
            self._execute_benchmark(config)
        )
        self._active_benchmarks[benchmark_id_str] = task

        return benchmark_id

    async def _execute_benchmark(
        self,
        config: BenchmarkConfig,
    ) -> BenchmarkResult:
        """
        Execute the benchmark asynchronously.

        Models are evaluated one after another. Any ``Exception`` raised along
        the way is recorded on the result (status ``FAILED``) instead of being
        propagated. The benchmark is removed from the active registry when
        this coroutine finishes, whatever the outcome.

        Args:
            config: Benchmark configuration

        Returns:
            Complete benchmark result
        """
        benchmark_id = config.benchmark_id
        benchmark_id_str = str(benchmark_id)
        start_time = datetime.utcnow()

        result = BenchmarkResult(
            benchmark_id=benchmark_id,
            dataset_name=config.dataset_name,
            dataset_version=config.dataset_version,
            models=config.models,
            status=BenchmarkStatus.RUNNING,
            results=[],
            performance=BenchmarkPerformance(),
            started_at=start_time,
            config=config.model_dump(),
        )

        self._log_event(
            BenchmarkEvent.BENCHMARK_STARTED,
            benchmark_id_str,
            models=config.models,
            dataset=config.dataset_name,
        )

        try:
            for model_name in config.models:
                self._log_event(
                    BenchmarkEvent.MODEL_EVALUATION_STARTED,
                    benchmark_id_str,
                    model=model_name,
                )

                model_start_time = time.time()

                model_result = await self._evaluate_model(
                    config=config,
                    model_name=model_name,
                    benchmark_id=benchmark_id_str,
                )

                model_time = time.time() - model_start_time

                # Headline counts come from the adversarial run when there is
                # one; a baseline-only benchmark reports its baseline run
                # instead of a misleading "0 samples / 100% failure".
                primary = model_result.adversarial or model_result.baseline

                result.performance.time_per_model_seconds[model_name] = model_time
                result.performance.sample_counts[model_name] = (
                    primary.sample_count if primary else 0
                )
                result.performance.failure_rates[model_name] = (
                    primary.failure_rate if primary else 1.0
                )

                result.results.append(model_result)

                self._log_event(
                    BenchmarkEvent.MODEL_EVALUATION_COMPLETED,
                    benchmark_id_str,
                    model=model_name,
                    time_seconds=model_time,
                )

            # Ranking is only computed when more than one model was requested.
            if len(config.models) > 1:
                result.rankings = rank_models(result.results)

            result.vulnerability_heatmap = generate_vulnerability_heatmap(
                result.results,
                config.attack_types,
            )

            result.status = BenchmarkStatus.COMPLETED
            result.completed_at = datetime.utcnow()

            artifact_path = generate_benchmark_artifact(result)
            self.logger.info(
                "Benchmark artifact saved",
                benchmark_id=benchmark_id_str,
                path=artifact_path,
            )

            self._log_event(
                BenchmarkEvent.BENCHMARK_COMPLETED,
                benchmark_id_str,
                models=config.models,
                completed_at=result.completed_at.isoformat(),
            )

        except Exception as e:
            # Top-level boundary of the background task: record the failure on
            # the result rather than letting the task die with an exception.
            result.status = BenchmarkStatus.FAILED
            result.error = str(e)
            result.completed_at = datetime.utcnow()

            self.logger.error(
                "Benchmark failed",
                benchmark_id=benchmark_id_str,
                error=str(e),
            )

            self._log_event(
                BenchmarkEvent.BENCHMARK_FAILED,
                benchmark_id_str,
                error=str(e),
            )

        finally:
            self._active_benchmarks.pop(benchmark_id_str, None)

        return result

    @staticmethod
    def _robustness_indices(
        baseline_robustness: float,
        adversarial_robustness: float,
    ) -> tuple:
        """
        Derive the robustness summary numbers for one model.

        Args:
            baseline_robustness: Robustness measured without attacks
            adversarial_robustness: Robustness measured under attack

        Returns:
            ``(delta, stability_index, vulnerability_index)`` where ``delta``
            is baseline minus adversarial, ``stability_index`` is
            adversarial / baseline and ``vulnerability_index`` is
            delta / baseline. Both ratios are ``0.0`` when the baseline is
            not strictly positive.
        """
        delta = baseline_robustness - adversarial_robustness
        if baseline_robustness and baseline_robustness > 0:
            return (
                delta,
                adversarial_robustness / baseline_robustness,
                delta / baseline_robustness,
            )
        return delta, 0.0, 0.0

    async def _evaluate_model(
        self,
        config: BenchmarkConfig,
        model_name: str,
        benchmark_id: str,
    ) -> ModelBenchmarkResult:
        """
        Evaluate a single model.

        Runs the baseline and/or adversarial evaluation as enabled in the
        config; when both ran, also computes metric deltas and the
        robustness indices.

        Args:
            config: Benchmark configuration
            model_name: Name of the model to evaluate
            benchmark_id: Benchmark ID for logging

        Returns:
            Complete benchmark result for the model
        """
        model_result = ModelBenchmarkResult(model_name=model_name)

        # Random sub-sampling is only requested when a sample cap is set.
        sampling_config = None
        if config.max_samples:
            sampling_config = {
                "method": "random",
                "sample_size": config.max_samples,
            }

        if config.enable_baseline:
            baseline_result = await self._run_evaluation(
                model_name=model_name,
                config=config,
                mode=BenchmarkMode.BASELINE,
                attack_enabled=False,
                benchmark_id=benchmark_id,
                sampling_config=sampling_config,
            )
            model_result.baseline = baseline_result
            model_result.baseline_robustness = baseline_result.metrics.robustness

            self._log_event(
                BenchmarkEvent.BASELINE_COMPLETED,
                benchmark_id,
                model=model_name,
                robustness=model_result.baseline_robustness,
            )

        if config.enable_adversarial:
            adversarial_result = await self._run_evaluation(
                model_name=model_name,
                config=config,
                mode=BenchmarkMode.ADVERSARIAL,
                attack_enabled=config.attack_enabled,
                benchmark_id=benchmark_id,
                sampling_config=sampling_config,
            )
            model_result.adversarial = adversarial_result
            model_result.adversarial_robustness = adversarial_result.metrics.robustness

            self._log_event(
                BenchmarkEvent.ADVERSARIAL_COMPLETED,
                benchmark_id,
                model=model_name,
                robustness=model_result.adversarial_robustness,
            )

        # Deltas need both runs.
        if model_result.baseline and model_result.adversarial:
            model_result.deltas = self._compute_deltas(
                baseline=model_result.baseline,
                adversarial=model_result.adversarial,
            )

            delta, stability_index, vulnerability_index = self._robustness_indices(
                model_result.baseline_robustness,
                model_result.adversarial_robustness,
            )
            model_result.delta_robustness = delta
            model_result.robustness_stability_index = stability_index
            model_result.vulnerability_index = vulnerability_index

            self._log_event(
                BenchmarkEvent.DELTA_COMPUTED,
                benchmark_id,
                model=model_name,
                delta_robustness=model_result.delta_robustness,
                rsi=model_result.robustness_stability_index,
                vi=model_result.vulnerability_index,
            )

        return model_result

    async def _run_evaluation(
        self,
        model_name: str,
        config: BenchmarkConfig,
        mode: BenchmarkMode,
        attack_enabled: bool,
        benchmark_id: str,
        sampling_config: Optional[Dict[str, Any]] = None,
    ) -> EvaluationResult:
        """
        Run a single evaluation (baseline or adversarial).

        Starts an orchestrator run, polls its status until it completes,
        fails or the polling timeout elapses, and converts the final status
        into an ``EvaluationResult``. A run that did not complete yields a
        placeholder result (all metrics 0.5, no samples, failure rate 1.0)
        and an error log entry.

        Args:
            model_name: Model to evaluate
            config: Benchmark config
            mode: Evaluation mode
            attack_enabled: Whether to enable attacks
            benchmark_id: Benchmark ID
            sampling_config: Optional sampling config

        Returns:
            Evaluation result
        """
        eval_input = EvaluationInput(
            model_name=model_name,
            dataset_name=config.dataset_name,
            dataset_version=config.dataset_version,
            weights={
                "hallucination": config.weights.hallucination,
                "toxicity": config.weights.toxicity,
                "bias": config.weights.bias,
                "confidence": config.weights.confidence,
            },
            # Attacks disabled means no mutation and no attack types.
            mutation_depth=config.mutation_depth if attack_enabled else 0,
            attack_types=config.attack_types if attack_enabled else [],
            max_concurrency=config.max_concurrency,
            sampling_config=sampling_config,
        )

        output = await self._orchestrator.start_run(eval_input)
        run_id = output.run_id

        # Poll until the run reaches a terminal state or the timeout elapses.
        terminal_states = (RunStatus.COMPLETED, RunStatus.FAILED)
        waited = 0
        while waited < self._RUN_TIMEOUT_SECONDS:
            status = await self._orchestrator.get_run_status(run_id)
            if status and status.status in terminal_states:
                break
            await asyncio.sleep(self._POLL_INTERVAL_SECONDS)
            waited += self._POLL_INTERVAL_SECONDS

        # One last look: also gives a timed-out run a final chance.
        final_status = await self._orchestrator.get_run_status(run_id)

        if not (final_status and final_status.status == RunStatus.COMPLETED):
            self.logger.error(
                "Evaluation run did not complete",
                benchmark_id=benchmark_id,
                model=model_name,
                mode=str(mode),
                run_status=str(final_status.status) if final_status else None,
                waited_seconds=waited,
            )
            return EvaluationResult(
                model_name=model_name,
                mode=mode,
                metrics=ModelMetrics(
                    hallucination=0.5,
                    toxicity=0.5,
                    bias=0.5,
                    confidence=0.5,
                    robustness=0.5,
                ),
                sample_count=0,
                failure_rate=1.0,
            )

        # A completed run may still carry no metrics/performance payload;
        # treat a missing payload as empty instead of crashing on ``.get``.
        metrics_data = final_status.metrics or {}
        performance_data = final_status.performance or {}

        metrics = ModelMetrics(
            hallucination=metrics_data.get("hallucination", 0.5),
            toxicity=metrics_data.get("toxicity", 0.5),
            bias=metrics_data.get("bias", 0.5),
            confidence=metrics_data.get("confidence", 0.5),
            robustness=metrics_data.get("robustness", 0.5),
        )

        # Standard deviations are only copied when the run reported metrics.
        if metrics_data:
            metrics.std_hallucination = metrics_data.get("std_hallucination")
            metrics.std_toxicity = metrics_data.get("std_toxicity")
            metrics.std_bias = metrics_data.get("std_bias")
            metrics.std_confidence = metrics_data.get("std_confidence")

        # max(..., 1) guards the division against a reported total of zero.
        total_samples = max(metrics_data.get("total_samples", 1), 1)

        return EvaluationResult(
            model_name=model_name,
            mode=mode,
            metrics=metrics,
            sample_count=metrics_data.get("total_samples", 0),
            failure_rate=metrics_data.get("failed_samples", 0) / total_samples,
            mean_latency_ms=performance_data.get("mean_latency_ms"),
            total_time_seconds=performance_data.get("total_time_seconds"),
        )

    def _compute_deltas(
        self,
        baseline: EvaluationResult,
        adversarial: EvaluationResult,
    ) -> MetricDeltas:
        """
        Compute deltas between baseline and adversarial.

        Hallucination, toxicity, bias and confidence deltas are
        adversarial minus baseline; the robustness delta is baseline minus
        adversarial.

        Args:
            baseline: Baseline evaluation result
            adversarial: Adversarial evaluation result

        Returns:
            MetricDeltas with computed differences
        """
        base = baseline.metrics
        adv = adversarial.metrics
        return MetricDeltas(
            hallucination_delta=adv.hallucination - base.hallucination,
            toxicity_delta=adv.toxicity - base.toxicity,
            bias_delta=adv.bias - base.bias,
            confidence_delta=adv.confidence - base.confidence,
            robustness_delta=base.robustness - adv.robustness,
        )

    async def get_benchmark_status(
        self,
        benchmark_id: str,
    ) -> Optional[BenchmarkResult]:
        """
        Get status of a benchmark.

        For a benchmark that is still running, a placeholder ``RUNNING``
        result is returned: only the id and status are meaningful, the
        dataset fields are empty and ``started_at`` is the time of this call.
        For a finished benchmark the stored artifact is consulted.

        Args:
            benchmark_id: Benchmark ID

        Returns:
            Benchmark result if found, None otherwise
        """
        task = self._active_benchmarks.get(benchmark_id)
        if task is not None:
            if not task.done():
                return BenchmarkResult(
                    benchmark_id=UUID(benchmark_id),
                    dataset_name="",
                    dataset_version="",
                    models=[],
                    status=BenchmarkStatus.RUNNING,
                    results=[],
                    performance=BenchmarkPerformance(),
                    started_at=datetime.utcnow(),
                )
            # Awaiting a cancelled task would raise CancelledError into the
            # caller; a cancelled benchmark has no result, so fall through to
            # the artifact lookup instead.
            if not task.cancelled():
                return await task

        artifact = load_benchmark_artifact(benchmark_id)
        if not artifact:
            return None

        # NOTE(review): the return shape of load_benchmark_artifact is not
        # visible from this module - confirm. Both a ready-made result and a
        # plain mapping of result fields are accepted here.
        if isinstance(artifact, BenchmarkResult):
            return artifact
        if isinstance(artifact, dict):
            try:
                return BenchmarkResult(**artifact)
            except (TypeError, ValueError) as exc:
                self.logger.error(
                    "Stored benchmark artifact could not be parsed",
                    benchmark_id=benchmark_id,
                    error=str(exc),
                )
        return None

    async def cancel_benchmark(
        self,
        benchmark_id: str,
    ) -> bool:
        """
        Cancel a running benchmark.

        Args:
            benchmark_id: Benchmark ID

        Returns:
            True if cancelled, False otherwise (unknown id, or the benchmark
            had already finished)
        """
        task = self._active_benchmarks.get(benchmark_id)
        if task is None:
            return False

        if task.done():
            # Nothing left to cancel; just drop the stale registry entry.
            self._active_benchmarks.pop(benchmark_id, None)
            return False

        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
        finally:
            # A task cancelled before it first ran never reaches the cleanup
            # in _execute_benchmark, so the entry is removed here as well.
            self._active_benchmarks.pop(benchmark_id, None)

        self.logger.info("Benchmark cancelled", benchmark_id=benchmark_id)
        return True
|
|
|
|
|
|
|
|
|
|
|
|
|
# Lazily created, process-wide engine instance (see get_benchmark_engine).
_benchmark_engine: Optional[BenchmarkEngine] = None


def get_benchmark_engine() -> BenchmarkEngine:
    """
    Get the global benchmark engine instance.

    The engine is created on the first call and the same instance is
    returned on every later call.

    Returns:
        BenchmarkEngine singleton
    """
    global _benchmark_engine
    if _benchmark_engine is None:
        _benchmark_engine = BenchmarkEngine()
    return _benchmark_engine


__all__ = [
    "BenchmarkEngine",
    "BenchmarkEvent",
    "get_benchmark_engine",
]
|
|
|