""" Performance tests for AegisLM SaaS Backend. Tests concurrency control, rate limiting, metrics tracking, and resource monitoring functionality. """ import pytest import asyncio import time from unittest.mock import AsyncMock, MagicMock, patch from datetime import datetime, timezone from performance.concurrency_manager import ConcurrencyManager, TaskStatus from performance.metrics_tracker import MetricsTracker, track_performance from performance.resource_monitor import ResourceMonitor, ResourceThresholds from core.config import settings class TestConcurrencyManager: """Test concurrency manager functionality.""" @pytest.fixture async def manager(self): """Create concurrency manager for testing.""" manager = ConcurrencyManager(max_concurrent_tasks=2) await manager.start() yield manager await manager.stop() @pytest.mark.asyncio async def test_task_submission(self, manager): """Test task submission and execution.""" async def mock_task(task_id, duration=0.1): await asyncio.sleep(duration) return f"completed_{task_id}" # Submit tasks task1_accepted = await manager.submit_task("task1", mock_task, "task1", 0.1) task2_accepted = await manager.submit_task("task2", mock_task, "task2", 0.1) task3_accepted = await manager.submit_task("task3", mock_task, "task3", 0.1) assert task1_accepted is True assert task2_accepted is True assert task3_accepted is True # Wait for completion await asyncio.sleep(0.5) # Check task statuses task1_status = await manager.get_task_status("task1") task2_status = await manager.get_task_status("task2") task3_status = await manager.get_task_status("task3") assert task1_status.status == TaskStatus.COMPLETED assert task2_status.status == TaskStatus.COMPLETED assert task3_status.status == TaskStatus.COMPLETED @pytest.mark.asyncio async def test_concurrency_limit(self, manager): """Test concurrency limit enforcement.""" async def slow_task(task_id, duration=0.5): await asyncio.sleep(duration) return f"completed_{task_id}" # Submit slow tasks await manager.submit_task("slow1", slow_task, "slow1", 0.5) await manager.submit_task("slow2", slow_task, "slow2", 0.5) # Give them time to start await asyncio.sleep(0.1) # Check running tasks metrics = await manager.get_metrics() assert metrics["active_tasks"] <= 2 @pytest.mark.asyncio async def test_queue_overflow(self, manager): """Test queue overflow handling.""" async def mock_task(task_id): await asyncio.sleep(1) return f"completed_{task_id}" # Fill queue beyond capacity accepted_tasks = [] for i in range(150): # More than max_queue_size (100) accepted = await manager.submit_task(f"task_{i}", mock_task, f"task_{i}") accepted_tasks.append(accepted) # Some tasks should be rejected assert any(not accepted for accepted in accepted_tasks) # Check metrics metrics = await manager.get_metrics() assert metrics["total_tasks_rejected"] > 0 @pytest.mark.asyncio async def test_metrics_collection(self, manager): """Test metrics collection.""" async def mock_task(task_id): await asyncio.sleep(0.1) return f"completed_{task_id}" # Submit and complete tasks await manager.submit_task("task1", mock_task, "task1") await manager.submit_task("task2", mock_task, "task2") # Wait for completion await asyncio.sleep(0.3) # Check metrics metrics = await manager.get_metrics() assert metrics["total_tasks_completed"] == 2 assert metrics["success_rate"] == 100.0 assert metrics["average_execution_time_ms"] > 0 class TestMetricsTracker: """Test metrics tracker functionality.""" @pytest.fixture def tracker(self): """Create metrics tracker for testing.""" return MetricsTracker(retention_minutes=1, max_points=100) @pytest.mark.asyncio async def test_execution_time_recording(self, tracker): """Test execution time recording.""" await tracker.record_execution_time(150.5, "task1") await tracker.record_execution_time(200.0, "task2") metrics = await tracker.get_current_metrics() assert metrics.execution_time_ms > 0 summary = await tracker.get_summary_stats() assert summary["execution_time_stats"]["mean"] > 0 assert summary["data_points"]["execution_times"] == 2 @pytest.mark.asyncio async def test_success_failure_recording(self, tracker): """Test success/failure recording.""" await tracker.record_task_success("task1") await tracker.record_task_success("task2") await tracker.record_task_failure("task3") metrics = await tracker.get_current_metrics() assert metrics.success_rate == 66.67 # 2/3 * 100 summary = await tracker.get_summary_stats() assert summary["successful_tasks"] == 2 assert summary["failed_tasks"] == 1 @pytest.mark.asyncio async def test_system_metrics_recording(self, tracker): """Test system metrics recording.""" await tracker.record_system_metrics(45.5, 67.8) metrics = await tracker.get_current_metrics() assert metrics.cpu_usage_percent == 45.5 assert metrics.memory_usage_percent == 67.8 @pytest.mark.asyncio async def test_historical_data(self, tracker): """Test historical data retrieval.""" # Record some data points for i in range(5): await tracker.record_execution_time(100 + i * 10, f"task_{i}") historical = await tracker.get_historical_metrics(minutes=1) assert len(historical["execution_times"]) == 5 assert historical["execution_times"][0]["value"] == 100 @pytest.mark.asyncio async def test_performance_decorator(self, tracker): """Test performance tracking decorator.""" @track_performance("test_task") async def test_function(): await asyncio.sleep(0.1) return "success" result = await test_function() assert result == "success" # Check that metrics were recorded summary = await tracker.get_summary_stats() assert summary["successful_tasks"] == 1 class TestResourceMonitor: """Test resource monitor functionality.""" @pytest.fixture async def monitor(self): """Create resource monitor for testing.""" monitor = ResourceMonitor(check_interval_seconds=1) yield monitor await monitor.stop() @pytest.mark.asyncio async def test_resource_thresholds(self, monitor): """Test resource threshold checking.""" # Set low thresholds for testing monitor.set_thresholds(ResourceThresholds( cpu_warning=50.0, cpu_critical=80.0, memory_warning=60.0, memory_critical=85.0 )) # Mock high resource usage with patch('psutil.cpu_percent', return_value=85.0), \ patch('psutil.virtual_memory') as mock_memory: mock_memory.return_value.percent = 90.0 mock_memory.return_value.available = 2 * 1024**3 # 2GB # Collect metrics await monitor._collect_metrics() # Check thresholds await monitor._check_thresholds() # Get current resources resources = await monitor.get_current_resources() assert resources.cpu_percent == 85.0 assert resources.memory_percent == 90.0 @pytest.mark.asyncio async def test_load_assessment(self, monitor): """Test load assessment functionality.""" # Mock low resource usage with patch('psutil.cpu_percent', return_value=30.0), \ patch('psutil.virtual_memory') as mock_memory: mock_memory.return_value.percent = 40.0 mock_memory.return_value.available = 8 * 1024**3 # 8GB await monitor._collect_metrics() assessment = await monitor.check_can_handle_load(5) assert assessment["can_handle"] is True @pytest.mark.asyncio async def test_alert_callback(self, monitor): """Test alert callback functionality.""" alert_received = False alert_data = None async def test_callback(alert): nonlocal alert_received, alert_data alert_received = True alert_data = alert monitor.add_alert_callback(test_callback) # Trigger alert await monitor._trigger_alert({ "type": "cpu_warning", "value": 75.0, "threshold": 70.0, "message": "CPU usage high: 75.0%" }) assert alert_received is True assert alert_data["type"] == "cpu_warning" assert alert_data["value"] == 75.0 class TestRateLimiting: """Test rate limiting functionality.""" @pytest.mark.asyncio async def test_rate_limit_middleware(self): """Test rate limiting middleware.""" from middleware.rate_limit import RateLimitMiddleware from fastapi import Request from unittest.mock import AsyncMock # Create mock request request = MagicMock(spec=Request) request.url.path = "/api/v1/evaluations/" request.headers = {} request.client = MagicMock() request.client.host = "127.0.0.1" # Create middleware middleware = RateLimitMiddleware(None, default_limit=2, default_window=60) # Mock Redis with patch('middleware.rate_limit.get_redis') as mock_redis: mock_client = AsyncMock() mock_redis.return_value = mock_client # Mock Redis operations mock_client.zremrangebyscore.return_value = 0 mock_client.zcard.return_value = 0 mock_client.zadd.return_value = None mock_client.expire.return_value = None # Test multiple requests call_next = AsyncMock() call_next.return_value = MagicMock() call_next.return_value.headers = {} # First request should pass response1 = await middleware.dispatch(request, call_next) assert response1 is not None # Second request should pass response2 = await middleware.dispatch(request, call_next) assert response2 is not None # Third request should be rate limited mock_client.zcard.return_value = 2 # At limit response3 = await middleware.dispatch(request, call_next) assert response3.status_code == 429 class TestIntegration: """Integration tests for performance components.""" @pytest.mark.asyncio async def test_end_to_end_performance_tracking(self): """Test end-to-end performance tracking.""" # Initialize components concurrency_manager = ConcurrencyManager(max_concurrent_tasks=2) metrics_tracker = MetricsTracker() await concurrency_manager.start() try: # Mock task with performance tracking async def tracked_task(task_id, duration=0.1): start_time = time.time() await asyncio.sleep(duration) # Record metrics execution_time_ms = (time.time() - start_time) * 1000 await metrics_tracker.record_execution_time(execution_time_ms, task_id) await metrics_tracker.record_task_success(task_id) return f"completed_{task_id}" # Submit tasks await concurrency_manager.submit_task("task1", tracked_task, "task1", 0.1) await concurrency_manager.submit_task("task2", tracked_task, "task2", 0.1) # Wait for completion await asyncio.sleep(0.3) # Verify metrics concurrency_metrics = await concurrency_manager.get_metrics() performance_metrics = await metrics_tracker.get_current_metrics() assert concurrency_metrics["total_tasks_completed"] == 2 assert performance_metrics.execution_time_ms > 0 finally: await concurrency_manager.stop() @pytest.mark.asyncio async def test_configuration_integration(self): """Test configuration integration.""" # Test that configuration values are properly loaded assert hasattr(settings, 'MAX_CONCURRENT_TASKS') assert hasattr(settings, 'EVALUATION_RATE_LIMIT') assert hasattr(settings, 'BENCHMARK_RATE_LIMIT') assert hasattr(settings, 'CELERY_WORKER_CONCURRENCY') assert hasattr(settings, 'CELERY_TASK_RATE_LIMIT') # Test default values assert settings.MAX_CONCURRENT_TASKS == 4 assert settings.EVALUATION_RATE_LIMIT == 20 assert settings.BENCHMARK_RATE_LIMIT == 10 assert settings.CELERY_WORKER_CONCURRENCY == 2 assert settings.CELERY_TASK_RATE_LIMIT == "10/m" if __name__ == "__main__": pytest.main([__file__, "-v"])