| """ |
| Analytics Tests for AegisLM Multi-Run Analysis. |
| |
| Tests comparison engine, trend analyzer, aggregation utilities, |
| and error handling for analytics operations. |
| """ |
|
|
| import pytest |
| import asyncio |
| from unittest.mock import AsyncMock, MagicMock, patch |
| from datetime import datetime, timedelta |
| from uuid import uuid4 |
|
|
| from analytics.comparison_engine import ComparisonEngine, ComparisonMetric, TrendDirection |
| from analytics.trend_analyzer import TrendAnalyzer, TrendDirection as TrendDir, TrendStrength |
| from analytics.aggregation_utils import AggregationUtils, AggregationMethod |
| from analytics.analytics_service import AnalyticsService |
| from schemas.experiment_schema import Experiment, ExperimentStatus, ResultSummary, ConfigSnapshot |
|
|
|
|
class TestComparisonEngine:
    """Tests for ``ComparisonEngine.compare_runs``: ranking, validation, deltas.

    Tests that need experiment data patch ``experiment_manager.store.get_experiment``
    on the engine so that it hands back the in-memory fixtures one per call.
    """

    @pytest.fixture
    def comparison_engine(self):
        """Return a fresh ``ComparisonEngine`` for each test.

        This is intentionally a plain (non-async) fixture: nothing is awaited
        here, and an ``async def`` fixture registered with ``@pytest.fixture``
        is delivered to the test as an un-awaited coroutine object when
        pytest-asyncio runs in strict mode.
        """
        return ComparisonEngine()

    @pytest.fixture
    def sample_experiments(self):
        """Build three completed experiments whose metrics vary with the index.

        For index ``i`` in 0..2: robustness is ``0.5 + 0.1*i``, risk is
        ``0.4 - 0.05*i`` and success rate is ``0.3 + 0.15*i``. Each experiment
        is created ``i`` hours before "now" and completed one hour after its
        creation.
        """
        # One clock reading keeps created_at/completed_at exactly one hour
        # apart for every experiment.
        now = datetime.utcnow()
        experiments = []

        for i in range(3):
            experiment = Experiment(
                run_id=uuid4(),
                experiment_name=f"Test Experiment {i+1}",
                config_snapshot=ConfigSnapshot(
                    model_name="test-model",
                    model_config={},
                    attack_types=["jailbreak"],
                    pipeline_config={},
                    prompt_count=10,
                    max_iterations=5,
                    mutation_enabled=False,
                    weights={},
                ),
                model_name="test-model",
                dataset_name="test-dataset",
                attack_types=["jailbreak"],
                prompt_count=10,
                status=ExperimentStatus.COMPLETED,
                created_at=now - timedelta(hours=i),
                completed_at=now - timedelta(hours=i - 1),
                result_summary=ResultSummary(
                    robustness_score=0.5 + (i * 0.1),
                    risk_score=0.4 - (i * 0.05),
                    success_rate=0.3 + (i * 0.15),
                    total_attacks=10,
                    successful_attacks=3 + i,
                    failed_attacks=7 - i,
                    execution_time_ms=1000 + (i * 100),
                ),
            )
            experiments.append(experiment)

        return experiments

    @pytest.mark.asyncio
    async def test_compare_runs_success(self, comparison_engine, sample_experiments):
        """Comparing three valid runs yields a full ranking with best/worst flagged."""
        run_ids = [exp.run_id.hex for exp in sample_experiments]

        with patch.object(comparison_engine.experiment_manager.store, 'get_experiment') as mock_get:
            # Each store lookup returns the next fixture experiment in order.
            mock_get.side_effect = sample_experiments

            result = await comparison_engine.compare_runs(run_ids)

        assert result is not None
        assert len(result.rankings) == 3
        assert result.best_run is not None
        assert result.worst_run is not None
        assert result.total_runs == 3

        # The flagged best/worst entries must sit at the ends of the ranking.
        best_run = next(r for r in result.rankings if r.is_best)
        worst_run = next(r for r in result.rankings if r.is_worst)

        assert best_run.rank == 1
        assert worst_run.rank == 3
        assert best_run.robustness_score > worst_run.robustness_score

    @pytest.mark.asyncio
    async def test_compare_runs_insufficient_runs(self, comparison_engine):
        """A single run ID is rejected with the 'At least 2 runs required' error."""
        with pytest.raises(ValueError, match="At least 2 runs required"):
            await comparison_engine.compare_runs(["run1"])

    @pytest.mark.asyncio
    async def test_compare_runs_invalid_run_ids(self, comparison_engine):
        """Run IDs that the store cannot resolve raise 'Insufficient valid experiments'."""
        with patch.object(comparison_engine.experiment_manager.store, 'get_experiment') as mock_get:
            # The store reports every requested run as missing.
            mock_get.return_value = None

            with pytest.raises(ValueError, match="Insufficient valid experiments"):
                await comparison_engine.compare_runs(["invalid1", "invalid2"])

    @pytest.mark.asyncio
    async def test_metric_deltas_calculation(self, comparison_engine, sample_experiments):
        """The best run has no deltas to itself and well-typed deltas to the worst run."""
        run_ids = [exp.run_id.hex for exp in sample_experiments]

        with patch.object(comparison_engine.experiment_manager.store, 'get_experiment') as mock_get:
            mock_get.side_effect = sample_experiments

            result = await comparison_engine.compare_runs(run_ids)

        best_run = next(r for r in result.rankings if r.is_best)
        assert len(best_run.deltas_to_best) == 0
        assert len(best_run.deltas_to_worst) > 0

        # Every delta is keyed by its metric's value and carries typed fields.
        for metric_name, delta in best_run.deltas_to_worst.items():
            assert delta.metric.value == metric_name
            assert isinstance(delta.delta, float)
            assert isinstance(delta.delta_percent, float)
            assert isinstance(delta.improvement, bool)
|
|
|
|
class TestTrendAnalyzer:
    """Tests for ``TrendAnalyzer.analyze_trend``: success path, validation, direction, stability.

    Tests that need experiment data patch ``experiment_manager.store.get_experiment``
    on the analyzer so that it hands back in-memory experiments one per call.
    """

    @pytest.fixture
    def trend_analyzer(self):
        """Return a fresh ``TrendAnalyzer`` for each test.

        This is intentionally a plain (non-async) fixture: nothing is awaited
        here, and an ``async def`` fixture registered with ``@pytest.fixture``
        is delivered to the test as an un-awaited coroutine object when
        pytest-asyncio runs in strict mode.
        """
        return TrendAnalyzer()

    @pytest.fixture
    def sample_trend_experiments(self):
        """Build five completed experiments spaced one day apart, oldest first.

        For index ``i`` in 0..4: robustness is ``0.3 + 0.1*i``, risk is
        ``0.6 - 0.05*i``, success rate is ``0.2 + 0.1*i`` and execution time is
        ``1000 - 50*i`` ms. Each experiment completes one hour after creation.
        """
        # One clock reading keeps the creation timestamps exactly one day apart.
        now = datetime.utcnow()
        experiments = []

        for i in range(5):
            experiment = Experiment(
                run_id=uuid4(),
                experiment_name=f"Trend Experiment {i+1}",
                config_snapshot=ConfigSnapshot(
                    model_name="trend-model",
                    model_config={},
                    attack_types=["jailbreak"],
                    pipeline_config={},
                    prompt_count=10,
                    max_iterations=5,
                    mutation_enabled=False,
                    weights={},
                ),
                model_name="trend-model",
                dataset_name="trend-dataset",
                attack_types=["jailbreak"],
                prompt_count=10,
                status=ExperimentStatus.COMPLETED,
                created_at=now - timedelta(days=4 - i),
                # hours=-1 inside the subtracted delta means "one hour later".
                completed_at=now - timedelta(days=4 - i, hours=-1),
                result_summary=ResultSummary(
                    robustness_score=0.3 + (i * 0.1),
                    risk_score=0.6 - (i * 0.05),
                    success_rate=0.2 + (i * 0.1),
                    total_attacks=10,
                    successful_attacks=2 + i,
                    failed_attacks=8 - i,
                    execution_time_ms=1000 - (i * 50),
                ),
            )
            experiments.append(experiment)

        return experiments

    @pytest.mark.asyncio
    async def test_analyze_trend_success(self, trend_analyzer, sample_trend_experiments):
        """Analyzing five valid runs returns populated trends and sane summary fields."""
        run_ids = [exp.run_id.hex for exp in sample_trend_experiments]

        with patch.object(trend_analyzer.experiment_manager.store, 'get_experiment') as mock_get:
            # Each store lookup returns the next fixture experiment in order.
            mock_get.side_effect = sample_trend_experiments

            result = await trend_analyzer.analyze_trend(run_ids)

        assert result is not None
        assert len(result.metric_trends) > 0
        assert result.total_runs == 5
        assert result.time_period_days >= 4
        assert result.overall_direction in [TrendDir.INCREASING, TrendDir.DECREASING, TrendDir.STABLE]
        assert 0 <= result.overall_health_score <= 1

    @pytest.mark.asyncio
    async def test_analyze_trend_insufficient_runs(self, trend_analyzer):
        """Two run IDs are rejected with the 'At least 3 runs required' error."""
        with pytest.raises(ValueError, match="At least 3 runs required"):
            await trend_analyzer.analyze_trend(["run1", "run2"])

    @pytest.mark.asyncio
    async def test_trend_direction_detection(self, trend_analyzer, sample_trend_experiments):
        """Steadily rising robustness must not be classified as a decreasing trend."""
        run_ids = [exp.run_id.hex for exp in sample_trend_experiments]

        with patch.object(trend_analyzer.experiment_manager.store, 'get_experiment') as mock_get:
            mock_get.side_effect = sample_trend_experiments

            result = await trend_analyzer.analyze_trend(run_ids)

        robustness_trend = result.metric_trends.get('robustness_score')
        # NOTE(review): if the trend is missing, nothing below runs and the
        # test passes vacuously; confirm whether that is intended.
        if robustness_trend:
            assert robustness_trend.metrics.direction in [TrendDir.INCREASING, TrendDir.STABLE]
            assert robustness_trend.metrics.strength in [TrendStrength.WEAK, TrendStrength.MODERATE, TrendStrength.STRONG]

    @pytest.mark.asyncio
    async def test_anomaly_detection(self, trend_analyzer):
        """A robustness spike in the middle of the series lowers the stability score."""
        # One clock reading keeps the creation timestamps exactly one day apart.
        now = datetime.utcnow()
        experiments = []
        base_value = 0.5

        for i in range(5):
            value = base_value + (i * 0.1)

            # Inject the outlier: the third run jumps to 0.9 instead of 0.7.
            if i == 2:
                value = 0.9

            experiment = Experiment(
                run_id=uuid4(),
                experiment_name=f"Anomaly Test {i+1}",
                config_snapshot=ConfigSnapshot(
                    model_name="test-model",
                    model_config={},
                    attack_types=["jailbreak"],
                    pipeline_config={},
                    prompt_count=10,
                    max_iterations=5,
                    mutation_enabled=False,
                    weights={},
                ),
                model_name="test-model",
                dataset_name="test-dataset",
                attack_types=["jailbreak"],
                prompt_count=10,
                status=ExperimentStatus.COMPLETED,
                created_at=now - timedelta(days=4 - i),
                result_summary=ResultSummary(
                    robustness_score=value,
                    risk_score=0.3,
                    success_rate=0.5,
                    total_attacks=10,
                    successful_attacks=5,
                    failed_attacks=5,
                ),
            )
            experiments.append(experiment)

        run_ids = [exp.run_id.hex for exp in experiments]

        with patch.object(trend_analyzer.experiment_manager.store, 'get_experiment') as mock_get:
            mock_get.side_effect = experiments

            result = await trend_analyzer.analyze_trend(run_ids)

        robustness_trend = result.metric_trends.get('robustness_score')
        # NOTE(review): if the trend is missing, the assertion is skipped and
        # the test passes vacuously; confirm whether that is intended.
        if robustness_trend:
            assert robustness_trend.metrics.stability_score < 0.9
|
|
|
|
class TestAggregationUtils:
    """Tests for ``AggregationUtils``: overall, per-model, per-dataset and top-N aggregation."""

    @pytest.fixture
    def aggregation_utils(self):
        """Return a fresh ``AggregationUtils`` for each test.

        This is intentionally a plain (non-async) fixture: nothing is awaited
        here, and an ``async def`` fixture registered with ``@pytest.fixture``
        is delivered to the test as an un-awaited coroutine object when
        pytest-asyncio runs in strict mode.
        """
        return AggregationUtils()

    @pytest.fixture
    def sample_aggregation_experiments(self):
        """Build three completed experiments spread over two models and two datasets.

        Models are ``model-a, model-b, model-a`` and datasets are
        ``dataset-1, dataset-1, dataset-2``, so each grouping splits 2/1.
        For index ``i`` in 0..2: robustness is ``0.4 + 0.1*i``, risk is
        ``0.5 - 0.05*i`` and success rate is ``0.3 + 0.2*i``.
        """
        now = datetime.utcnow()
        models = ["model-a", "model-b", "model-a"]
        datasets = ["dataset-1", "dataset-1", "dataset-2"]
        experiments = []

        for i in range(3):
            experiment = Experiment(
                run_id=uuid4(),
                experiment_name=f"Aggregation Test {i+1}",
                config_snapshot=ConfigSnapshot(
                    model_name=models[i],
                    model_config={},
                    attack_types=["jailbreak"],
                    pipeline_config={},
                    prompt_count=10,
                    max_iterations=5,
                    mutation_enabled=False,
                    weights={},
                ),
                model_name=models[i],
                dataset_name=datasets[i],
                attack_types=["jailbreak"],
                prompt_count=10,
                status=ExperimentStatus.COMPLETED,
                created_at=now - timedelta(hours=i),
                result_summary=ResultSummary(
                    robustness_score=0.4 + (i * 0.1),
                    risk_score=0.5 - (i * 0.05),
                    success_rate=0.3 + (i * 0.2),
                    total_attacks=10,
                    successful_attacks=3 + i,
                    failed_attacks=7 - i,
                    execution_time_ms=1000 + (i * 200),
                ),
            )
            experiments.append(experiment)

        return experiments

    @pytest.mark.asyncio
    async def test_aggregate_metrics_success(self, aggregation_utils, sample_aggregation_experiments):
        """Aggregating three completed runs reports counts, success rate and distributions."""
        result = await aggregation_utils.aggregate_metrics(sample_aggregation_experiments)

        assert result is not None
        assert result.total_experiments == 3
        assert result.completed_experiments == 3
        assert result.success_rate == 1.0
        assert result.overall_health_score >= 0
        assert result.model_distribution == {"model-a": 2, "model-b": 1}
        assert result.dataset_distribution == {"dataset-1": 2, "dataset-2": 1}

    @pytest.mark.asyncio
    async def test_aggregate_empty_experiments(self, aggregation_utils):
        """An empty experiment list aggregates to zero counts and a 0.0 success rate."""
        result = await aggregation_utils.aggregate_metrics([])

        assert result.total_experiments == 0
        assert result.completed_experiments == 0
        assert result.success_rate == 0.0

    @pytest.mark.asyncio
    async def test_aggregate_by_model(self, aggregation_utils, sample_aggregation_experiments):
        """Grouping by model yields one entry per model with the right run counts."""
        result = await aggregation_utils.aggregate_by_model(sample_aggregation_experiments)

        assert "model-a" in result
        assert "model-b" in result
        assert result["model-a"].total_experiments == 2
        assert result["model-b"].total_experiments == 1

    @pytest.mark.asyncio
    async def test_aggregate_by_dataset(self, aggregation_utils, sample_aggregation_experiments):
        """Grouping by dataset yields one entry per dataset with the right run counts."""
        result = await aggregation_utils.aggregate_by_dataset(sample_aggregation_experiments)

        assert "dataset-1" in result
        assert "dataset-2" in result
        assert result["dataset-1"].total_experiments == 2
        assert result["dataset-2"].total_experiments == 1

    @pytest.mark.asyncio
    async def test_get_top_performers(self, aggregation_utils, sample_aggregation_experiments):
        """The top-3 list by robustness score comes back in non-increasing order."""
        top_performers = await aggregation_utils.get_top_performers(
            sample_aggregation_experiments,
            "robustness_score",
            3,
        )

        assert len(top_performers) == 3
        # Element [1] of each entry is the value the entries are ordered by.
        assert top_performers[0][1] >= top_performers[1][1]
        assert top_performers[1][1] >= top_performers[2][1]
|
|
|
|
class TestAnalyticsService:
    """Tests for ``AnalyticsService``: run fetching and delegation to the engines."""

    @pytest.fixture
    def analytics_service(self):
        """Return an ``AnalyticsService`` wired to an ``AsyncMock`` database.

        This is intentionally a plain (non-async) fixture: nothing is awaited
        here, and an ``async def`` fixture registered with ``@pytest.fixture``
        is delivered to the test as an un-awaited coroutine object when
        pytest-asyncio runs in strict mode.
        """
        mock_db = AsyncMock()
        return AnalyticsService(mock_db)

    @pytest.fixture
    def sample_experiments(self):
        """Build three completed experiments whose metrics vary with the index.

        Declared on this class because a fixture defined as a method of a
        different test class is not visible to the tests here.

        For index ``i`` in 0..2: robustness is ``0.5 + 0.1*i``, risk is
        ``0.4 - 0.05*i`` and success rate is ``0.3 + 0.15*i``. Each experiment
        is created ``i`` hours before "now" and completed one hour after its
        creation.
        """
        now = datetime.utcnow()
        experiments = []

        for i in range(3):
            experiments.append(
                Experiment(
                    run_id=uuid4(),
                    experiment_name=f"Test Experiment {i+1}",
                    config_snapshot=ConfigSnapshot(
                        model_name="test-model",
                        model_config={},
                        attack_types=["jailbreak"],
                        pipeline_config={},
                        prompt_count=10,
                        max_iterations=5,
                        mutation_enabled=False,
                        weights={},
                    ),
                    model_name="test-model",
                    dataset_name="test-dataset",
                    attack_types=["jailbreak"],
                    prompt_count=10,
                    status=ExperimentStatus.COMPLETED,
                    created_at=now - timedelta(hours=i),
                    completed_at=now - timedelta(hours=i - 1),
                    result_summary=ResultSummary(
                        robustness_score=0.5 + (i * 0.1),
                        risk_score=0.4 - (i * 0.05),
                        success_rate=0.3 + (i * 0.15),
                        total_attacks=10,
                        successful_attacks=3 + i,
                        failed_attacks=7 - i,
                        execution_time_ms=1000 + (i * 100),
                    ),
                )
            )

        return experiments

    @pytest.mark.asyncio
    async def test_fetch_runs_by_ids_success(self, analytics_service, sample_experiments):
        """Fetching two known run IDs returns exactly those two experiments."""
        run_ids = [exp.run_id.hex for exp in sample_experiments[:2]]

        with patch.object(analytics_service.experiment_manager.store, 'get_experiment') as mock_get:
            # Each store lookup returns the next fixture experiment in order.
            mock_get.side_effect = sample_experiments[:2]

            result = await analytics_service.fetch_runs_by_ids(run_ids)

        assert len(result) == 2
        assert all(exp.run_id.hex in run_ids for exp in result)

    @pytest.mark.asyncio
    async def test_fetch_runs_by_ids_not_found(self, analytics_service):
        """Fetching a run the store cannot resolve raises an exception."""
        with patch.object(analytics_service.experiment_manager.store, 'get_experiment') as mock_get:
            mock_get.return_value = None

            # NOTE(review): ``Exception`` accepts any failure, including
            # unrelated ones; narrow to the service's not-found error type
            # once that type is confirmed.
            with pytest.raises(Exception):
                await analytics_service.fetch_runs_by_ids(["non-existent"])

    @pytest.mark.asyncio
    async def test_compare_runs_integration(self, analytics_service, sample_experiments):
        """``compare_runs`` fetches the runs and then delegates to the comparison engine."""
        run_ids = [exp.run_id.hex for exp in sample_experiments]

        with patch.object(analytics_service, 'fetch_runs_by_ids') as mock_fetch:
            mock_fetch.return_value = sample_experiments

            with patch.object(analytics_service.comparison_engine, 'compare_runs') as mock_compare:
                mock_compare.return_value = MagicMock()

                await analytics_service.compare_runs(run_ids)

                mock_fetch.assert_called_once_with(run_ids, None)
                mock_compare.assert_called_once_with(run_ids)

    @pytest.mark.asyncio
    async def test_analyze_trends_integration(self, analytics_service, sample_experiments):
        """``analyze_trends`` fetches the runs and then delegates to the trend analyzer."""
        run_ids = [exp.run_id.hex for exp in sample_experiments]

        with patch.object(analytics_service, 'fetch_runs_by_ids') as mock_fetch:
            mock_fetch.return_value = sample_experiments

            with patch.object(analytics_service.trend_analyzer, 'analyze_trend') as mock_analyze:
                mock_analyze.return_value = MagicMock()

                await analytics_service.analyze_trends(run_ids)

                mock_fetch.assert_called_once_with(run_ids, None)
                mock_analyze.assert_called_once_with(run_ids)
|
|
|
|
class TestErrorHandling:
    """Checks that the analytics components reject bad input with ``ValueError``."""

    @pytest.mark.asyncio
    async def test_comparison_engine_error_handling(self):
        """``compare_runs`` raises ``ValueError`` for a lone ID and for an empty list."""
        comparer = ComparisonEngine()
        lone_id = ["invalid-uuid"]
        no_ids = []

        # A single (malformed) run ID must be rejected.
        with pytest.raises(ValueError):
            await comparer.compare_runs(lone_id)

        # An empty list trips the minimum-run-count check.
        with pytest.raises(ValueError, match="At least 2 runs required"):
            await comparer.compare_runs(no_ids)

    @pytest.mark.asyncio
    async def test_trend_analyzer_error_handling(self):
        """``analyze_trend`` raises ``ValueError`` for too few runs and for bad IDs."""
        trend = TrendAnalyzer()
        too_few = ["run1", "run2"]
        unresolvable = ["invalid-uuid", "run2", "run3"]

        # Two runs are below the analyzer's stated minimum of three.
        with pytest.raises(ValueError, match="At least 3 runs required"):
            await trend.analyze_trend(too_few)

        # Three IDs pass the count check but are expected to fail afterwards.
        with pytest.raises(ValueError):
            await trend.analyze_trend(unresolvable)

    @pytest.mark.asyncio
    async def test_aggregation_utils_error_handling(self):
        """Computing statistics for a metric with no values raises ``ValueError``."""
        aggregator = AggregationUtils()
        empty_values = []

        with pytest.raises(ValueError):
            await aggregator._calculate_metric_statistics("test_metric", empty_values)

    def test_metric_validation(self):
        """Metric enum values and the improvement direction of each metric are as expected."""
        # Enum members compare equal to their raw string values.
        expected_raw_values = (
            (ComparisonMetric.ROBUSTNESS_SCORE, "robustness_score"),
            (ComparisonMetric.RISK_SCORE, "risk_score"),
        )
        for member, raw_value in expected_raw_values:
            assert member == raw_value

        checker = ComparisonEngine()

        # A positive robustness delta is an improvement; for risk it is the
        # negative delta that counts as one.
        direction_cases = (
            (ComparisonMetric.ROBUSTNESS_SCORE, 0.1, True),
            (ComparisonMetric.ROBUSTNESS_SCORE, -0.1, False),
            (ComparisonMetric.RISK_SCORE, -0.1, True),
            (ComparisonMetric.RISK_SCORE, 0.1, False),
        )
        for metric, delta, expected in direction_cases:
            assert checker._is_improvement(metric, delta) is expected
|
|
|
|
# Allow running this test module directly; propagate pytest's exit code so a
# failing run is visible to the shell / CI instead of always exiting with 0.
if __name__ == "__main__":
    raise SystemExit(pytest.main([__file__, "-v"]))
|
|