""" Analytics Tests for AegisLM Multi-Run Analysis. Tests comparison engine, trend analyzer, aggregation utilities, and error handling for analytics operations. """ import pytest import asyncio from unittest.mock import AsyncMock, MagicMock, patch from datetime import datetime, timedelta from uuid import uuid4 from analytics.comparison_engine import ComparisonEngine, ComparisonMetric, TrendDirection from analytics.trend_analyzer import TrendAnalyzer, TrendDirection as TrendDir, TrendStrength from analytics.aggregation_utils import AggregationUtils, AggregationMethod from analytics.analytics_service import AnalyticsService from schemas.experiment_schema import Experiment, ExperimentStatus, ResultSummary, ConfigSnapshot class TestComparisonEngine: """Test comparison engine functionality.""" @pytest.fixture async def comparison_engine(self): """Create comparison engine for testing.""" return ComparisonEngine() @pytest.fixture def sample_experiments(self): """Create sample experiments for testing.""" experiments = [] for i in range(3): experiment = Experiment( run_id=uuid4(), experiment_name=f"Test Experiment {i+1}", config_snapshot=ConfigSnapshot( model_name="test-model", model_config={}, attack_types=["jailbreak"], pipeline_config={}, prompt_count=10, max_iterations=5, mutation_enabled=False, weights={} ), model_name="test-model", dataset_name="test-dataset", attack_types=["jailbreak"], prompt_count=10, status=ExperimentStatus.COMPLETED, created_at=datetime.utcnow() - timedelta(hours=i), completed_at=datetime.utcnow() - timedelta(hours=i-1), result_summary=ResultSummary( robustness_score=0.5 + (i * 0.1), # 0.5, 0.6, 0.7 risk_score=0.4 - (i * 0.05), # 0.4, 0.35, 0.3 success_rate=0.3 + (i * 0.15), # 0.3, 0.45, 0.6 total_attacks=10, successful_attacks=3 + i, failed_attacks=7 - i, execution_time_ms=1000 + (i * 100) ) ) experiments.append(experiment) return experiments @pytest.mark.asyncio async def test_compare_runs_success(self, comparison_engine, sample_experiments): """Test successful run comparison.""" run_ids = [exp.run_id.hex for exp in sample_experiments] with patch.object(comparison_engine.experiment_manager.store, 'get_experiment') as mock_get: mock_get.side_effect = sample_experiments result = await comparison_engine.compare_runs(run_ids) assert result is not None assert len(result.rankings) == 3 assert result.best_run is not None assert result.worst_run is not None assert result.total_runs == 3 # Check rankings (higher robustness should be better) best_run = next(r for r in result.rankings if r.is_best) worst_run = next(r for r in result.rankings if r.is_worst) assert best_run.rank == 1 assert worst_run.rank == 3 assert best_run.robustness_score > worst_run.robustness_score @pytest.mark.asyncio async def test_compare_runs_insufficient_runs(self, comparison_engine): """Test comparison with insufficient runs.""" with pytest.raises(ValueError, match="At least 2 runs required"): await comparison_engine.compare_runs(["run1"]) @pytest.mark.asyncio async def test_compare_runs_invalid_run_ids(self, comparison_engine): """Test comparison with invalid run IDs.""" with patch.object(comparison_engine.experiment_manager.store, 'get_experiment') as mock_get: mock_get.return_value = None with pytest.raises(ValueError, match="Insufficient valid experiments"): await comparison_engine.compare_runs(["invalid1", "invalid2"]) @pytest.mark.asyncio async def test_metric_deltas_calculation(self, comparison_engine, sample_experiments): """Test metric delta calculations.""" run_ids = [exp.run_id.hex for exp in sample_experiments] with patch.object(comparison_engine.experiment_manager.store, 'get_experiment') as mock_get: mock_get.side_effect = sample_experiments result = await comparison_engine.compare_runs(run_ids) # Check that deltas are calculated best_run = next(r for r in result.rankings if r.is_best) assert len(best_run.deltas_to_best) == 0 # Best run has no deltas to best assert len(best_run.deltas_to_worst) > 0 # But has deltas to worst # Check delta structure for metric_name, delta in best_run.deltas_to_worst.items(): assert delta.metric.value == metric_name assert isinstance(delta.delta, float) assert isinstance(delta.delta_percent, float) assert isinstance(delta.improvement, bool) class TestTrendAnalyzer: """Test trend analyzer functionality.""" @pytest.fixture async def trend_analyzer(self): """Create trend analyzer for testing.""" return TrendAnalyzer() @pytest.fixture def sample_trend_experiments(self): """Create sample experiments with trend data.""" experiments = [] for i in range(5): experiment = Experiment( run_id=uuid4(), experiment_name=f"Trend Experiment {i+1}", config_snapshot=ConfigSnapshot( model_name="trend-model", model_config={}, attack_types=["jailbreak"], pipeline_config={}, prompt_count=10, max_iterations=5, mutation_enabled=False, weights={} ), model_name="trend-model", dataset_name="trend-dataset", attack_types=["jailbreak"], prompt_count=10, status=ExperimentStatus.COMPLETED, created_at=datetime.utcnow() - timedelta(days=4-i), # 4,3,2,1,0 days ago completed_at=datetime.utcnow() - timedelta(days=4-i, hours=-1), result_summary=ResultSummary( robustness_score=0.3 + (i * 0.1), # Increasing trend: 0.3, 0.4, 0.5, 0.6, 0.7 risk_score=0.6 - (i * 0.05), # Decreasing trend: 0.6, 0.55, 0.5, 0.45, 0.4 success_rate=0.2 + (i * 0.1), # Increasing trend: 0.2, 0.3, 0.4, 0.5, 0.6 total_attacks=10, successful_attacks=2 + i, failed_attacks=8 - i, execution_time_ms=1000 - (i * 50) # Decreasing trend ) ) experiments.append(experiment) return experiments @pytest.mark.asyncio async def test_analyze_trend_success(self, trend_analyzer, sample_trend_experiments): """Test successful trend analysis.""" run_ids = [exp.run_id.hex for exp in sample_trend_experiments] with patch.object(trend_analyzer.experiment_manager.store, 'get_experiment') as mock_get: mock_get.side_effect = sample_trend_experiments result = await trend_analyzer.analyze_trend(run_ids) assert result is not None assert len(result.metric_trends) > 0 assert result.total_runs == 5 assert result.time_period_days >= 4 assert result.overall_direction in [TrendDir.INCREASING, TrendDir.DECREASING, TrendDir.STABLE] assert 0 <= result.overall_health_score <= 1 @pytest.mark.asyncio async def test_analyze_trend_insufficient_runs(self, trend_analyzer): """Test trend analysis with insufficient runs.""" with pytest.raises(ValueError, match="At least 3 runs required"): await trend_analyzer.analyze_trend(["run1", "run2"]) @pytest.mark.asyncio async def test_trend_direction_detection(self, trend_analyzer, sample_trend_experiments): """Test trend direction detection.""" run_ids = [exp.run_id.hex for exp in sample_trend_experiments] with patch.object(trend_analyzer.experiment_manager.store, 'get_experiment') as mock_get: mock_get.side_effect = sample_trend_experiments result = await trend_analyzer.analyze_trend(run_ids) # Check robustness trend (should be increasing) robustness_trend = result.metric_trends.get('robustness_score') if robustness_trend: assert robustness_trend.metrics.direction in [TrendDir.INCREASING, TrendDir.STABLE] assert robustness_trend.metrics.strength in [TrendStrength.WEAK, TrendStrength.MODERATE, TrendStrength.STRONG] @pytest.mark.asyncio async def test_anomaly_detection(self, trend_analyzer): """Test anomaly detection in trends.""" # Create experiments with an anomaly experiments = [] base_value = 0.5 for i in range(5): value = base_value + (i * 0.1) # Add anomaly in the middle if i == 2: value = 0.9 # Much higher than expected experiment = Experiment( run_id=uuid4(), experiment_name=f"Anomaly Test {i+1}", config_snapshot=ConfigSnapshot( model_name="test-model", model_config={}, attack_types=["jailbreak"], pipeline_config={}, prompt_count=10, max_iterations=5, mutation_enabled=False, weights={} ), model_name="test-model", dataset_name="test-dataset", attack_types=["jailbreak"], prompt_count=10, status=ExperimentStatus.COMPLETED, created_at=datetime.utcnow() - timedelta(days=4-i), result_summary=ResultSummary( robustness_score=value, risk_score=0.3, success_rate=0.5, total_attacks=10, successful_attacks=5, failed_attacks=5 ) ) experiments.append(experiment) run_ids = [exp.run_id.hex for exp in experiments] with patch.object(trend_analyzer.experiment_manager.store, 'get_experiment') as mock_get: mock_get.side_effect = experiments result = await trend_analyzer.analyze_trend(run_ids) # Check if anomaly was detected robustness_trend = result.metric_trends.get('robustness_score') if robustness_trend: # Should detect at least one anomaly due to the outlier assert robustness_trend.metrics.stability_score < 0.9 # Lower stability due to anomaly class TestAggregationUtils: """Test aggregation utilities functionality.""" @pytest.fixture async def aggregation_utils(self): """Create aggregation utilities for testing.""" return AggregationUtils() @pytest.fixture def sample_aggregation_experiments(self): """Create sample experiments for aggregation testing.""" experiments = [] models = ["model-a", "model-b", "model-a"] datasets = ["dataset-1", "dataset-1", "dataset-2"] for i in range(3): experiment = Experiment( run_id=uuid4(), experiment_name=f"Aggregation Test {i+1}", config_snapshot=ConfigSnapshot( model_name=models[i], model_config={}, attack_types=["jailbreak"], pipeline_config={}, prompt_count=10, max_iterations=5, mutation_enabled=False, weights={} ), model_name=models[i], dataset_name=datasets[i], attack_types=["jailbreak"], prompt_count=10, status=ExperimentStatus.COMPLETED, created_at=datetime.utcnow() - timedelta(hours=i), result_summary=ResultSummary( robustness_score=0.4 + (i * 0.1), risk_score=0.5 - (i * 0.05), success_rate=0.3 + (i * 0.2), total_attacks=10, successful_attacks=3 + i, failed_attacks=7 - i, execution_time_ms=1000 + (i * 200) ) ) experiments.append(experiment) return experiments @pytest.mark.asyncio async def test_aggregate_metrics_success(self, aggregation_utils, sample_aggregation_experiments): """Test successful metrics aggregation.""" result = await aggregation_utils.aggregate_metrics(sample_aggregation_experiments) assert result is not None assert result.total_experiments == 3 assert result.completed_experiments == 3 assert result.success_rate == 1.0 assert result.overall_health_score >= 0 assert result.model_distribution == {"model-a": 2, "model-b": 1} assert result.dataset_distribution == {"dataset-1": 2, "dataset-2": 1} @pytest.mark.asyncio async def test_aggregate_empty_experiments(self, aggregation_utils): """Test aggregation with empty experiments list.""" result = await aggregation_utils.aggregate_metrics([]) assert result.total_experiments == 0 assert result.completed_experiments == 0 assert result.success_rate == 0.0 @pytest.mark.asyncio async def test_aggregate_by_model(self, aggregation_utils, sample_aggregation_experiments): """Test aggregation by model.""" result = await aggregation_utils.aggregate_by_model(sample_aggregation_experiments) assert "model-a" in result assert "model-b" in result assert result["model-a"].total_experiments == 2 assert result["model-b"].total_experiments == 1 @pytest.mark.asyncio async def test_aggregate_by_dataset(self, aggregation_utils, sample_aggregation_experiments): """Test aggregation by dataset.""" result = await aggregation_utils.aggregate_by_dataset(sample_aggregation_experiments) assert "dataset-1" in result assert "dataset-2" in result assert result["dataset-1"].total_experiments == 2 assert result["dataset-2"].total_experiments == 1 @pytest.mark.asyncio async def test_get_top_performers(self, aggregation_utils, sample_aggregation_experiments): """Test getting top performers.""" top_performers = await aggregation_utils.get_top_performers( sample_aggregation_experiments, "robustness_score", 3 ) assert len(top_performers) == 3 assert top_performers[0][1] >= top_performers[1][1] # Should be sorted descending assert top_performers[1][1] >= top_performers[2][1] class TestAnalyticsService: """Test analytics service functionality.""" @pytest.fixture async def analytics_service(self): """Create analytics service for testing.""" mock_db = AsyncMock() return AnalyticsService(mock_db) @pytest.mark.asyncio async def test_fetch_runs_by_ids_success(self, analytics_service, sample_experiments): """Test successful run fetching by IDs.""" run_ids = [exp.run_id.hex for exp in sample_experiments[:2]] with patch.object(analytics_service.experiment_manager.store, 'get_experiment') as mock_get: mock_get.side_effect = sample_experiments[:2] result = await analytics_service.fetch_runs_by_ids(run_ids) assert len(result) == 2 assert all(exp.run_id.hex in run_ids for exp in result) @pytest.mark.asyncio async def test_fetch_runs_by_ids_not_found(self, analytics_service): """Test fetching non-existent runs.""" with patch.object(analytics_service.experiment_manager.store, 'get_experiment') as mock_get: mock_get.return_value = None with pytest.raises(Exception): # Should raise HTTPException await analytics_service.fetch_runs_by_ids(["non-existent"]) @pytest.mark.asyncio async def test_compare_runs_integration(self, analytics_service, sample_experiments): """Test compare runs integration.""" run_ids = [exp.run_id.hex for exp in sample_experiments] with patch.object(analytics_service, 'fetch_runs_by_ids') as mock_fetch: mock_fetch.return_value = sample_experiments with patch.object(analytics_service.comparison_engine, 'compare_runs') as mock_compare: mock_compare.return_value = MagicMock() result = await analytics_service.compare_runs(run_ids) mock_fetch.assert_called_once_with(run_ids, None) mock_compare.assert_called_once_with(run_ids) @pytest.mark.asyncio async def test_analyze_trends_integration(self, analytics_service, sample_experiments): """Test trend analysis integration.""" run_ids = [exp.run_id.hex for exp in sample_experiments] with patch.object(analytics_service, 'fetch_runs_by_ids') as mock_fetch: mock_fetch.return_value = sample_experiments with patch.object(analytics_service.trend_analyzer, 'analyze_trend') as mock_analyze: mock_analyze.return_value = MagicMock() result = await analytics_service.analyze_trends(run_ids) mock_fetch.assert_called_once_with(run_ids, None) mock_analyze.assert_called_once_with(run_ids) class TestErrorHandling: """Test error handling in analytics operations.""" @pytest.mark.asyncio async def test_comparison_engine_error_handling(self): """Test comparison engine error handling.""" engine = ComparisonEngine() # Test with invalid UUID format with pytest.raises(ValueError): await engine.compare_runs(["invalid-uuid"]) # Test with insufficient runs with pytest.raises(ValueError, match="At least 2 runs required"): await engine.compare_runs([]) @pytest.mark.asyncio async def test_trend_analyzer_error_handling(self): """Test trend analyzer error handling.""" analyzer = TrendAnalyzer() # Test with insufficient runs with pytest.raises(ValueError, match="At least 3 runs required"): await analyzer.analyze_trend(["run1", "run2"]) # Test with invalid UUID format with pytest.raises(ValueError): await analyzer.analyze_trend(["invalid-uuid", "run2", "run3"]) @pytest.mark.asyncio async def test_aggregation_utils_error_handling(self): """Test aggregation utilities error handling.""" utils = AggregationUtils() # Test with empty values with pytest.raises(ValueError): await utils._calculate_metric_statistics("test_metric", []) def test_metric_validation(self): """Test metric validation and edge cases.""" from analytics.comparison_engine import ComparisonMetric # Test valid metrics assert ComparisonMetric.ROBUSTNESS_SCORE == "robustness_score" assert ComparisonMetric.RISK_SCORE == "risk_score" # Test improvement logic from analytics.comparison_engine import ComparisonEngine engine = ComparisonEngine() # Higher robustness should be improvement assert engine._is_improvement(ComparisonMetric.ROBUSTNESS_SCORE, 0.1) is True assert engine._is_improvement(ComparisonMetric.ROBUSTNESS_SCORE, -0.1) is False # Lower risk should be improvement assert engine._is_improvement(ComparisonMetric.RISK_SCORE, -0.1) is True assert engine._is_improvement(ComparisonMetric.RISK_SCORE, 0.1) is False if __name__ == "__main__": pytest.main([__file__, "-v"])