# ALM-2 / backend /tests /test_analytics.py
# ACA050's picture
# Upload 520 files
# 2ed8996 verified
"""
Analytics Tests for AegisLM Multi-Run Analysis.
Tests comparison engine, trend analyzer, aggregation utilities,
and error handling for analytics operations.
"""
import pytest
import asyncio
from unittest.mock import AsyncMock, MagicMock, patch
from datetime import datetime, timedelta
from uuid import uuid4
from analytics.comparison_engine import ComparisonEngine, ComparisonMetric, TrendDirection
from analytics.trend_analyzer import TrendAnalyzer, TrendDirection as TrendDir, TrendStrength
from analytics.aggregation_utils import AggregationUtils, AggregationMethod
from analytics.analytics_service import AnalyticsService
from schemas.experiment_schema import Experiment, ExperimentStatus, ResultSummary, ConfigSnapshot
class TestComparisonEngine:
    """Test comparison engine functionality.

    Covers ``ComparisonEngine.compare_runs``: ranking of runs, rejection of
    too-few / unknown run IDs, and the per-metric delta structure.
    """

    @pytest.fixture
    def comparison_engine(self):
        """Create comparison engine for testing.

        This is intentionally a synchronous fixture. Construction awaits
        nothing, and an ``async def`` fixture declared with plain
        ``@pytest.fixture`` is handed to tests as an un-awaited coroutine
        when pytest-asyncio runs in strict mode.
        """
        return ComparisonEngine()

    @pytest.fixture
    def sample_experiments(self):
        """Create three completed experiments with steadily improving scores."""
        # One timestamp snapshot keeps created/completed offsets exact.
        now = datetime.utcnow()
        experiments = []
        for i in range(3):
            created_at = now - timedelta(hours=i)
            experiments.append(
                Experiment(
                    run_id=uuid4(),
                    experiment_name=f"Test Experiment {i+1}",
                    config_snapshot=ConfigSnapshot(
                        model_name="test-model",
                        model_config={},
                        attack_types=["jailbreak"],
                        pipeline_config={},
                        prompt_count=10,
                        max_iterations=5,
                        mutation_enabled=False,
                        weights={},
                    ),
                    model_name="test-model",
                    dataset_name="test-dataset",
                    attack_types=["jailbreak"],
                    prompt_count=10,
                    status=ExperimentStatus.COMPLETED,
                    created_at=created_at,
                    # Each run completes one hour after it was created.
                    completed_at=created_at + timedelta(hours=1),
                    result_summary=ResultSummary(
                        robustness_score=0.5 + (i * 0.1),  # 0.5, 0.6, 0.7
                        risk_score=0.4 - (i * 0.05),  # 0.4, 0.35, 0.3
                        success_rate=0.3 + (i * 0.15),  # 0.3, 0.45, 0.6
                        total_attacks=10,
                        successful_attacks=3 + i,
                        failed_attacks=7 - i,
                        execution_time_ms=1000 + (i * 100),
                    ),
                )
            )
        return experiments

    @pytest.mark.asyncio
    async def test_compare_runs_success(self, comparison_engine, sample_experiments):
        """Test successful run comparison."""
        run_ids = [exp.run_id.hex for exp in sample_experiments]
        store = comparison_engine.experiment_manager.store
        with patch.object(store, 'get_experiment') as mock_get:
            # Each store lookup returns the next sample experiment in order.
            mock_get.side_effect = sample_experiments
            result = await comparison_engine.compare_runs(run_ids)

        assert result is not None
        assert len(result.rankings) == 3
        assert result.best_run is not None
        assert result.worst_run is not None
        assert result.total_runs == 3

        # Check rankings (higher robustness should be better)
        best_run = next(r for r in result.rankings if r.is_best)
        worst_run = next(r for r in result.rankings if r.is_worst)
        assert best_run.rank == 1
        assert worst_run.rank == 3
        assert best_run.robustness_score > worst_run.robustness_score

    @pytest.mark.asyncio
    async def test_compare_runs_insufficient_runs(self, comparison_engine):
        """Test comparison with insufficient runs."""
        with pytest.raises(ValueError, match="At least 2 runs required"):
            await comparison_engine.compare_runs(["run1"])

    @pytest.mark.asyncio
    async def test_compare_runs_invalid_run_ids(self, comparison_engine):
        """Test comparison with invalid run IDs."""
        store = comparison_engine.experiment_manager.store
        with patch.object(store, 'get_experiment') as mock_get:
            # The store finds nothing for either ID.
            mock_get.return_value = None
            with pytest.raises(ValueError, match="Insufficient valid experiments"):
                await comparison_engine.compare_runs(["invalid1", "invalid2"])

    @pytest.mark.asyncio
    async def test_metric_deltas_calculation(self, comparison_engine, sample_experiments):
        """Test metric delta calculations."""
        run_ids = [exp.run_id.hex for exp in sample_experiments]
        store = comparison_engine.experiment_manager.store
        with patch.object(store, 'get_experiment') as mock_get:
            mock_get.side_effect = sample_experiments
            result = await comparison_engine.compare_runs(run_ids)

        # Check that deltas are calculated
        best_run = next(r for r in result.rankings if r.is_best)
        assert len(best_run.deltas_to_best) == 0  # Best run has no deltas to best
        assert len(best_run.deltas_to_worst) > 0  # But has deltas to worst

        # Check delta structure
        for metric_name, delta in best_run.deltas_to_worst.items():
            assert delta.metric.value == metric_name
            assert isinstance(delta.delta, float)
            assert isinstance(delta.delta_percent, float)
            assert isinstance(delta.improvement, bool)
class TestTrendAnalyzer:
    """Test trend analyzer functionality.

    Covers ``TrendAnalyzer.analyze_trend``: overall result shape, the
    minimum-run check, per-metric direction and reaction to an outlier.
    """

    @pytest.fixture
    def trend_analyzer(self):
        """Create trend analyzer for testing.

        This is intentionally a synchronous fixture. Construction awaits
        nothing, and an ``async def`` fixture declared with plain
        ``@pytest.fixture`` is handed to tests as an un-awaited coroutine
        when pytest-asyncio runs in strict mode.
        """
        return TrendAnalyzer()

    @pytest.fixture
    def sample_trend_experiments(self):
        """Create five daily experiments with monotonic metric trends."""
        # One timestamp snapshot makes the oldest-to-newest span exactly 4 days.
        now = datetime.utcnow()
        experiments = []
        for i in range(5):
            created_at = now - timedelta(days=4 - i)  # 4,3,2,1,0 days ago
            experiments.append(
                Experiment(
                    run_id=uuid4(),
                    experiment_name=f"Trend Experiment {i+1}",
                    config_snapshot=ConfigSnapshot(
                        model_name="trend-model",
                        model_config={},
                        attack_types=["jailbreak"],
                        pipeline_config={},
                        prompt_count=10,
                        max_iterations=5,
                        mutation_enabled=False,
                        weights={},
                    ),
                    model_name="trend-model",
                    dataset_name="trend-dataset",
                    attack_types=["jailbreak"],
                    prompt_count=10,
                    status=ExperimentStatus.COMPLETED,
                    created_at=created_at,
                    # Each run completes one hour after it was created.
                    completed_at=created_at + timedelta(hours=1),
                    result_summary=ResultSummary(
                        robustness_score=0.3 + (i * 0.1),  # Increasing trend: 0.3, 0.4, 0.5, 0.6, 0.7
                        risk_score=0.6 - (i * 0.05),  # Decreasing trend: 0.6, 0.55, 0.5, 0.45, 0.4
                        success_rate=0.2 + (i * 0.1),  # Increasing trend: 0.2, 0.3, 0.4, 0.5, 0.6
                        total_attacks=10,
                        successful_attacks=2 + i,
                        failed_attacks=8 - i,
                        execution_time_ms=1000 - (i * 50),  # Decreasing trend
                    ),
                )
            )
        return experiments

    @pytest.mark.asyncio
    async def test_analyze_trend_success(self, trend_analyzer, sample_trend_experiments):
        """Test successful trend analysis."""
        run_ids = [exp.run_id.hex for exp in sample_trend_experiments]
        store = trend_analyzer.experiment_manager.store
        with patch.object(store, 'get_experiment') as mock_get:
            # Each store lookup returns the next sample experiment in order.
            mock_get.side_effect = sample_trend_experiments
            result = await trend_analyzer.analyze_trend(run_ids)

        assert result is not None
        assert len(result.metric_trends) > 0
        assert result.total_runs == 5
        assert result.time_period_days >= 4
        assert result.overall_direction in [TrendDir.INCREASING, TrendDir.DECREASING, TrendDir.STABLE]
        assert 0 <= result.overall_health_score <= 1

    @pytest.mark.asyncio
    async def test_analyze_trend_insufficient_runs(self, trend_analyzer):
        """Test trend analysis with insufficient runs."""
        with pytest.raises(ValueError, match="At least 3 runs required"):
            await trend_analyzer.analyze_trend(["run1", "run2"])

    @pytest.mark.asyncio
    async def test_trend_direction_detection(self, trend_analyzer, sample_trend_experiments):
        """Test trend direction detection."""
        run_ids = [exp.run_id.hex for exp in sample_trend_experiments]
        store = trend_analyzer.experiment_manager.store
        with patch.object(store, 'get_experiment') as mock_get:
            mock_get.side_effect = sample_trend_experiments
            result = await trend_analyzer.analyze_trend(run_ids)

        # Check robustness trend (should be increasing)
        # NOTE(review): if 'robustness_score' is not a key of metric_trends the
        # assertions below are skipped and the test passes vacuously - confirm
        # the key and consider asserting that the trend is present.
        robustness_trend = result.metric_trends.get('robustness_score')
        if robustness_trend:
            assert robustness_trend.metrics.direction in [TrendDir.INCREASING, TrendDir.STABLE]
            assert robustness_trend.metrics.strength in [
                TrendStrength.WEAK,
                TrendStrength.MODERATE,
                TrendStrength.STRONG,
            ]

    @pytest.mark.asyncio
    async def test_anomaly_detection(self, trend_analyzer):
        """Test anomaly detection in trends."""
        # Create experiments with an anomaly
        now = datetime.utcnow()
        base_value = 0.5
        experiments = []
        for i in range(5):
            # Linear ramp, except the middle run which is much higher than expected.
            value = 0.9 if i == 2 else base_value + (i * 0.1)
            experiments.append(
                Experiment(
                    run_id=uuid4(),
                    experiment_name=f"Anomaly Test {i+1}",
                    config_snapshot=ConfigSnapshot(
                        model_name="test-model",
                        model_config={},
                        attack_types=["jailbreak"],
                        pipeline_config={},
                        prompt_count=10,
                        max_iterations=5,
                        mutation_enabled=False,
                        weights={},
                    ),
                    model_name="test-model",
                    dataset_name="test-dataset",
                    attack_types=["jailbreak"],
                    prompt_count=10,
                    status=ExperimentStatus.COMPLETED,
                    created_at=now - timedelta(days=4 - i),
                    result_summary=ResultSummary(
                        robustness_score=value,
                        risk_score=0.3,
                        success_rate=0.5,
                        total_attacks=10,
                        successful_attacks=5,
                        failed_attacks=5,
                    ),
                )
            )

        run_ids = [exp.run_id.hex for exp in experiments]
        store = trend_analyzer.experiment_manager.store
        with patch.object(store, 'get_experiment') as mock_get:
            mock_get.side_effect = experiments
            result = await trend_analyzer.analyze_trend(run_ids)

        # Check if anomaly was detected
        # NOTE(review): skipped silently when the trend is missing - see the
        # same guard in test_trend_direction_detection.
        robustness_trend = result.metric_trends.get('robustness_score')
        if robustness_trend:
            # The outlier is expected to pull the stability score below 0.9.
            assert robustness_trend.metrics.stability_score < 0.9
class TestAggregationUtils:
    """Test aggregation utilities functionality.

    Covers overall aggregation, grouping by model and by dataset, the
    empty-input case and top-performer ordering.
    """

    @pytest.fixture
    def aggregation_utils(self):
        """Create aggregation utilities for testing.

        This is intentionally a synchronous fixture. Construction awaits
        nothing, and an ``async def`` fixture declared with plain
        ``@pytest.fixture`` is handed to tests as an un-awaited coroutine
        when pytest-asyncio runs in strict mode.
        """
        return AggregationUtils()

    @pytest.fixture
    def sample_aggregation_experiments(self):
        """Create three experiments spread over two models and two datasets."""
        models = ["model-a", "model-b", "model-a"]
        datasets = ["dataset-1", "dataset-1", "dataset-2"]
        now = datetime.utcnow()
        experiments = []
        for i, (model, dataset) in enumerate(zip(models, datasets)):
            experiments.append(
                Experiment(
                    run_id=uuid4(),
                    experiment_name=f"Aggregation Test {i+1}",
                    config_snapshot=ConfigSnapshot(
                        model_name=model,
                        model_config={},
                        attack_types=["jailbreak"],
                        pipeline_config={},
                        prompt_count=10,
                        max_iterations=5,
                        mutation_enabled=False,
                        weights={},
                    ),
                    model_name=model,
                    dataset_name=dataset,
                    attack_types=["jailbreak"],
                    prompt_count=10,
                    status=ExperimentStatus.COMPLETED,
                    created_at=now - timedelta(hours=i),
                    result_summary=ResultSummary(
                        robustness_score=0.4 + (i * 0.1),
                        risk_score=0.5 - (i * 0.05),
                        success_rate=0.3 + (i * 0.2),
                        total_attacks=10,
                        successful_attacks=3 + i,
                        failed_attacks=7 - i,
                        execution_time_ms=1000 + (i * 200),
                    ),
                )
            )
        return experiments

    @pytest.mark.asyncio
    async def test_aggregate_metrics_success(self, aggregation_utils, sample_aggregation_experiments):
        """Test successful metrics aggregation."""
        result = await aggregation_utils.aggregate_metrics(sample_aggregation_experiments)

        assert result is not None
        assert result.total_experiments == 3
        assert result.completed_experiments == 3
        assert result.success_rate == 1.0
        assert result.overall_health_score >= 0
        assert result.model_distribution == {"model-a": 2, "model-b": 1}
        assert result.dataset_distribution == {"dataset-1": 2, "dataset-2": 1}

    @pytest.mark.asyncio
    async def test_aggregate_empty_experiments(self, aggregation_utils):
        """Test aggregation with empty experiments list."""
        result = await aggregation_utils.aggregate_metrics([])

        assert result.total_experiments == 0
        assert result.completed_experiments == 0
        assert result.success_rate == 0.0

    @pytest.mark.asyncio
    async def test_aggregate_by_model(self, aggregation_utils, sample_aggregation_experiments):
        """Test aggregation by model."""
        result = await aggregation_utils.aggregate_by_model(sample_aggregation_experiments)

        assert "model-a" in result
        assert "model-b" in result
        assert result["model-a"].total_experiments == 2
        assert result["model-b"].total_experiments == 1

    @pytest.mark.asyncio
    async def test_aggregate_by_dataset(self, aggregation_utils, sample_aggregation_experiments):
        """Test aggregation by dataset."""
        result = await aggregation_utils.aggregate_by_dataset(sample_aggregation_experiments)

        assert "dataset-1" in result
        assert "dataset-2" in result
        assert result["dataset-1"].total_experiments == 2
        assert result["dataset-2"].total_experiments == 1

    @pytest.mark.asyncio
    async def test_get_top_performers(self, aggregation_utils, sample_aggregation_experiments):
        """Test getting top performers."""
        top_performers = await aggregation_utils.get_top_performers(
            sample_aggregation_experiments,
            "robustness_score",
            3,
        )

        assert len(top_performers) == 3
        # Element [1] of each entry is compared; entries should be sorted descending.
        assert top_performers[0][1] >= top_performers[1][1]
        assert top_performers[1][1] >= top_performers[2][1]
class TestAnalyticsService:
    """Test analytics service functionality.

    Covers run fetching by ID and the delegation from ``AnalyticsService``
    to its comparison engine and trend analyzer.
    """

    @pytest.fixture
    def analytics_service(self):
        """Create analytics service for testing, backed by a mocked database.

        This is intentionally a synchronous fixture. Construction awaits
        nothing, and an ``async def`` fixture declared with plain
        ``@pytest.fixture`` is handed to tests as an un-awaited coroutine
        when pytest-asyncio runs in strict mode.
        """
        mock_db = AsyncMock()
        return AnalyticsService(mock_db)

    @pytest.fixture
    def sample_experiments(self):
        """Create three completed experiments for the service tests.

        Defined on this class because fixtures declared inside another test
        class are not visible here; without it every test requesting
        ``sample_experiments`` errors with "fixture not found".
        """
        now = datetime.utcnow()
        experiments = []
        for i in range(3):
            created_at = now - timedelta(hours=i)
            experiments.append(
                Experiment(
                    run_id=uuid4(),
                    experiment_name=f"Test Experiment {i+1}",
                    config_snapshot=ConfigSnapshot(
                        model_name="test-model",
                        model_config={},
                        attack_types=["jailbreak"],
                        pipeline_config={},
                        prompt_count=10,
                        max_iterations=5,
                        mutation_enabled=False,
                        weights={},
                    ),
                    model_name="test-model",
                    dataset_name="test-dataset",
                    attack_types=["jailbreak"],
                    prompt_count=10,
                    status=ExperimentStatus.COMPLETED,
                    created_at=created_at,
                    # Each run completes one hour after it was created.
                    completed_at=created_at + timedelta(hours=1),
                    result_summary=ResultSummary(
                        robustness_score=0.5 + (i * 0.1),  # 0.5, 0.6, 0.7
                        risk_score=0.4 - (i * 0.05),  # 0.4, 0.35, 0.3
                        success_rate=0.3 + (i * 0.15),  # 0.3, 0.45, 0.6
                        total_attacks=10,
                        successful_attacks=3 + i,
                        failed_attacks=7 - i,
                        execution_time_ms=1000 + (i * 100),
                    ),
                )
            )
        return experiments

    @pytest.mark.asyncio
    async def test_fetch_runs_by_ids_success(self, analytics_service, sample_experiments):
        """Test successful run fetching by IDs."""
        wanted = sample_experiments[:2]
        run_ids = [exp.run_id.hex for exp in wanted]
        store = analytics_service.experiment_manager.store
        with patch.object(store, 'get_experiment') as mock_get:
            # Each store lookup returns the next requested experiment in order.
            mock_get.side_effect = wanted
            result = await analytics_service.fetch_runs_by_ids(run_ids)

        assert len(result) == 2
        assert all(exp.run_id.hex in run_ids for exp in result)

    @pytest.mark.asyncio
    async def test_fetch_runs_by_ids_not_found(self, analytics_service):
        """Test fetching non-existent runs."""
        store = analytics_service.experiment_manager.store
        with patch.object(store, 'get_experiment') as mock_get:
            mock_get.return_value = None
            with pytest.raises(Exception):  # Should raise HTTPException
                await analytics_service.fetch_runs_by_ids(["non-existent"])

    @pytest.mark.asyncio
    async def test_compare_runs_integration(self, analytics_service, sample_experiments):
        """Test compare runs integration."""
        run_ids = [exp.run_id.hex for exp in sample_experiments]
        with patch.object(analytics_service, 'fetch_runs_by_ids') as mock_fetch:
            mock_fetch.return_value = sample_experiments
            with patch.object(analytics_service.comparison_engine, 'compare_runs') as mock_compare:
                mock_compare.return_value = MagicMock()
                # Only the delegation is checked; the returned value is not inspected.
                await analytics_service.compare_runs(run_ids)

                mock_fetch.assert_called_once_with(run_ids, None)
                mock_compare.assert_called_once_with(run_ids)

    @pytest.mark.asyncio
    async def test_analyze_trends_integration(self, analytics_service, sample_experiments):
        """Test trend analysis integration."""
        run_ids = [exp.run_id.hex for exp in sample_experiments]
        with patch.object(analytics_service, 'fetch_runs_by_ids') as mock_fetch:
            mock_fetch.return_value = sample_experiments
            with patch.object(analytics_service.trend_analyzer, 'analyze_trend') as mock_analyze:
                mock_analyze.return_value = MagicMock()
                # Only the delegation is checked; the returned value is not inspected.
                await analytics_service.analyze_trends(run_ids)

                mock_fetch.assert_called_once_with(run_ids, None)
                mock_analyze.assert_called_once_with(run_ids)
class TestErrorHandling:
    """Test error handling in analytics operations.

    These tests build the real engine/analyzer/utility objects (no fixtures)
    and check that bad input is rejected with ``ValueError``.
    """

    @pytest.mark.asyncio
    async def test_comparison_engine_error_handling(self):
        """Test comparison engine error handling."""
        engine = ComparisonEngine()

        # A single malformed ID must be rejected with ValueError.
        # NOTE(review): with only one ID the engine may fail its run-count
        # check before it ever looks at the UUID format - confirm which
        # validation this call is meant to cover.
        with pytest.raises(ValueError):
            await engine.compare_runs(["invalid-uuid"])

        # Test with insufficient runs
        with pytest.raises(ValueError, match="At least 2 runs required"):
            await engine.compare_runs([])

    @pytest.mark.asyncio
    async def test_trend_analyzer_error_handling(self):
        """Test trend analyzer error handling."""
        analyzer = TrendAnalyzer()

        # Test with insufficient runs
        with pytest.raises(ValueError, match="At least 3 runs required"):
            await analyzer.analyze_trend(["run1", "run2"])

        # Three IDs satisfy the count check, so this exercises ID validation.
        with pytest.raises(ValueError):
            await analyzer.analyze_trend(["invalid-uuid", "run2", "run3"])

    @pytest.mark.asyncio
    async def test_aggregation_utils_error_handling(self):
        """Test aggregation utilities error handling."""
        utils = AggregationUtils()

        # Test with empty values
        with pytest.raises(ValueError):
            await utils._calculate_metric_statistics("test_metric", [])

    def test_metric_validation(self):
        """Test metric validation and edge cases."""
        # ComparisonMetric and ComparisonEngine come from the module-level
        # imports; no function-scope re-import is needed.

        # Test valid metrics
        assert ComparisonMetric.ROBUSTNESS_SCORE == "robustness_score"
        assert ComparisonMetric.RISK_SCORE == "risk_score"

        # Test improvement logic
        engine = ComparisonEngine()

        # Higher robustness should be improvement
        assert engine._is_improvement(ComparisonMetric.ROBUSTNESS_SCORE, 0.1) is True
        assert engine._is_improvement(ComparisonMetric.ROBUSTNESS_SCORE, -0.1) is False

        # Lower risk should be improvement
        assert engine._is_improvement(ComparisonMetric.RISK_SCORE, -0.1) is True
        assert engine._is_improvement(ComparisonMetric.RISK_SCORE, 0.1) is False
if __name__ == "__main__":
    # Propagate pytest's exit code so running this file as a script fails
    # (non-zero status) when any test fails, instead of always exiting 0.
    raise SystemExit(pytest.main([__file__, "-v"]))