| """ |
| Performance tests for AegisLM SaaS Backend. |
| |
| Tests concurrency control, rate limiting, metrics tracking, |
| and resource monitoring functionality. |
| """ |
|
|
| import pytest |
| import asyncio |
| import time |
| from unittest.mock import AsyncMock, MagicMock, patch |
| from datetime import datetime, timezone |
|
|
| from performance.concurrency_manager import ConcurrencyManager, TaskStatus |
| from performance.metrics_tracker import MetricsTracker, track_performance |
| from performance.resource_monitor import ResourceMonitor, ResourceThresholds |
| from core.config import settings |
|
|
|
|
| class TestConcurrencyManager: |
| """Test concurrency manager functionality.""" |
| |
| @pytest.fixture |
| async def manager(self): |
| """Create concurrency manager for testing.""" |
| manager = ConcurrencyManager(max_concurrent_tasks=2) |
| await manager.start() |
| yield manager |
| await manager.stop() |
| |
| @pytest.mark.asyncio |
| async def test_task_submission(self, manager): |
| """Test task submission and execution.""" |
| async def mock_task(task_id, duration=0.1): |
| await asyncio.sleep(duration) |
| return f"completed_{task_id}" |
| |
| |
| task1_accepted = await manager.submit_task("task1", mock_task, "task1", 0.1) |
| task2_accepted = await manager.submit_task("task2", mock_task, "task2", 0.1) |
| task3_accepted = await manager.submit_task("task3", mock_task, "task3", 0.1) |
| |
| assert task1_accepted is True |
| assert task2_accepted is True |
| assert task3_accepted is True |
| |
| |
| await asyncio.sleep(0.5) |
| |
| |
| task1_status = await manager.get_task_status("task1") |
| task2_status = await manager.get_task_status("task2") |
| task3_status = await manager.get_task_status("task3") |
| |
| assert task1_status.status == TaskStatus.COMPLETED |
| assert task2_status.status == TaskStatus.COMPLETED |
| assert task3_status.status == TaskStatus.COMPLETED |
| |
| @pytest.mark.asyncio |
| async def test_concurrency_limit(self, manager): |
| """Test concurrency limit enforcement.""" |
| async def slow_task(task_id, duration=0.5): |
| await asyncio.sleep(duration) |
| return f"completed_{task_id}" |
| |
| |
| await manager.submit_task("slow1", slow_task, "slow1", 0.5) |
| await manager.submit_task("slow2", slow_task, "slow2", 0.5) |
| |
| |
| await asyncio.sleep(0.1) |
| |
| |
| metrics = await manager.get_metrics() |
| assert metrics["active_tasks"] <= 2 |
| |
| @pytest.mark.asyncio |
| async def test_queue_overflow(self, manager): |
| """Test queue overflow handling.""" |
| async def mock_task(task_id): |
| await asyncio.sleep(1) |
| return f"completed_{task_id}" |
| |
| |
| accepted_tasks = [] |
| for i in range(150): |
| accepted = await manager.submit_task(f"task_{i}", mock_task, f"task_{i}") |
| accepted_tasks.append(accepted) |
| |
| |
| assert any(not accepted for accepted in accepted_tasks) |
| |
| |
| metrics = await manager.get_metrics() |
| assert metrics["total_tasks_rejected"] > 0 |
| |
| @pytest.mark.asyncio |
| async def test_metrics_collection(self, manager): |
| """Test metrics collection.""" |
| async def mock_task(task_id): |
| await asyncio.sleep(0.1) |
| return f"completed_{task_id}" |
| |
| |
| await manager.submit_task("task1", mock_task, "task1") |
| await manager.submit_task("task2", mock_task, "task2") |
| |
| |
| await asyncio.sleep(0.3) |
| |
| |
| metrics = await manager.get_metrics() |
| assert metrics["total_tasks_completed"] == 2 |
| assert metrics["success_rate"] == 100.0 |
| assert metrics["average_execution_time_ms"] > 0 |
|
|
|
|
| class TestMetricsTracker: |
| """Test metrics tracker functionality.""" |
| |
| @pytest.fixture |
| def tracker(self): |
| """Create metrics tracker for testing.""" |
| return MetricsTracker(retention_minutes=1, max_points=100) |
| |
| @pytest.mark.asyncio |
| async def test_execution_time_recording(self, tracker): |
| """Test execution time recording.""" |
| await tracker.record_execution_time(150.5, "task1") |
| await tracker.record_execution_time(200.0, "task2") |
| |
| metrics = await tracker.get_current_metrics() |
| assert metrics.execution_time_ms > 0 |
| |
| summary = await tracker.get_summary_stats() |
| assert summary["execution_time_stats"]["mean"] > 0 |
| assert summary["data_points"]["execution_times"] == 2 |
| |
| @pytest.mark.asyncio |
| async def test_success_failure_recording(self, tracker): |
| """Test success/failure recording.""" |
| await tracker.record_task_success("task1") |
| await tracker.record_task_success("task2") |
| await tracker.record_task_failure("task3") |
| |
| metrics = await tracker.get_current_metrics() |
| assert metrics.success_rate == 66.67 |
| |
| summary = await tracker.get_summary_stats() |
| assert summary["successful_tasks"] == 2 |
| assert summary["failed_tasks"] == 1 |
| |
| @pytest.mark.asyncio |
| async def test_system_metrics_recording(self, tracker): |
| """Test system metrics recording.""" |
| await tracker.record_system_metrics(45.5, 67.8) |
| |
| metrics = await tracker.get_current_metrics() |
| assert metrics.cpu_usage_percent == 45.5 |
| assert metrics.memory_usage_percent == 67.8 |
| |
| @pytest.mark.asyncio |
| async def test_historical_data(self, tracker): |
| """Test historical data retrieval.""" |
| |
| for i in range(5): |
| await tracker.record_execution_time(100 + i * 10, f"task_{i}") |
| |
| historical = await tracker.get_historical_metrics(minutes=1) |
| assert len(historical["execution_times"]) == 5 |
| assert historical["execution_times"][0]["value"] == 100 |
| |
| @pytest.mark.asyncio |
| async def test_performance_decorator(self, tracker): |
| """Test performance tracking decorator.""" |
| @track_performance("test_task") |
| async def test_function(): |
| await asyncio.sleep(0.1) |
| return "success" |
| |
| result = await test_function() |
| assert result == "success" |
| |
| |
| summary = await tracker.get_summary_stats() |
| assert summary["successful_tasks"] == 1 |
|
|
|
|
| class TestResourceMonitor: |
| """Test resource monitor functionality.""" |
| |
| @pytest.fixture |
| async def monitor(self): |
| """Create resource monitor for testing.""" |
| monitor = ResourceMonitor(check_interval_seconds=1) |
| yield monitor |
| await monitor.stop() |
| |
| @pytest.mark.asyncio |
| async def test_resource_thresholds(self, monitor): |
| """Test resource threshold checking.""" |
| |
| monitor.set_thresholds(ResourceThresholds( |
| cpu_warning=50.0, |
| cpu_critical=80.0, |
| memory_warning=60.0, |
| memory_critical=85.0 |
| )) |
| |
| |
| with patch('psutil.cpu_percent', return_value=85.0), \ |
| patch('psutil.virtual_memory') as mock_memory: |
| |
| mock_memory.return_value.percent = 90.0 |
| mock_memory.return_value.available = 2 * 1024**3 |
| |
| |
| await monitor._collect_metrics() |
| |
| |
| await monitor._check_thresholds() |
| |
| |
| resources = await monitor.get_current_resources() |
| assert resources.cpu_percent == 85.0 |
| assert resources.memory_percent == 90.0 |
| |
| @pytest.mark.asyncio |
| async def test_load_assessment(self, monitor): |
| """Test load assessment functionality.""" |
| |
| with patch('psutil.cpu_percent', return_value=30.0), \ |
| patch('psutil.virtual_memory') as mock_memory: |
| |
| mock_memory.return_value.percent = 40.0 |
| mock_memory.return_value.available = 8 * 1024**3 |
| |
| await monitor._collect_metrics() |
| |
| assessment = await monitor.check_can_handle_load(5) |
| assert assessment["can_handle"] is True |
| |
| @pytest.mark.asyncio |
| async def test_alert_callback(self, monitor): |
| """Test alert callback functionality.""" |
| alert_received = False |
| alert_data = None |
| |
| async def test_callback(alert): |
| nonlocal alert_received, alert_data |
| alert_received = True |
| alert_data = alert |
| |
| monitor.add_alert_callback(test_callback) |
| |
| |
| await monitor._trigger_alert({ |
| "type": "cpu_warning", |
| "value": 75.0, |
| "threshold": 70.0, |
| "message": "CPU usage high: 75.0%" |
| }) |
| |
| assert alert_received is True |
| assert alert_data["type"] == "cpu_warning" |
| assert alert_data["value"] == 75.0 |
|
|
|
|
| class TestRateLimiting: |
| """Test rate limiting functionality.""" |
| |
| @pytest.mark.asyncio |
| async def test_rate_limit_middleware(self): |
| """Test rate limiting middleware.""" |
| from middleware.rate_limit import RateLimitMiddleware |
| from fastapi import Request |
| from unittest.mock import AsyncMock |
| |
| |
| request = MagicMock(spec=Request) |
| request.url.path = "/api/v1/evaluations/" |
| request.headers = {} |
| request.client = MagicMock() |
| request.client.host = "127.0.0.1" |
| |
| |
| middleware = RateLimitMiddleware(None, default_limit=2, default_window=60) |
| |
| |
| with patch('middleware.rate_limit.get_redis') as mock_redis: |
| mock_client = AsyncMock() |
| mock_redis.return_value = mock_client |
| |
| |
| mock_client.zremrangebyscore.return_value = 0 |
| mock_client.zcard.return_value = 0 |
| mock_client.zadd.return_value = None |
| mock_client.expire.return_value = None |
| |
| |
| call_next = AsyncMock() |
| call_next.return_value = MagicMock() |
| call_next.return_value.headers = {} |
| |
| |
| response1 = await middleware.dispatch(request, call_next) |
| assert response1 is not None |
| |
| |
| response2 = await middleware.dispatch(request, call_next) |
| assert response2 is not None |
| |
| |
| mock_client.zcard.return_value = 2 |
| response3 = await middleware.dispatch(request, call_next) |
| assert response3.status_code == 429 |
|
|
|
|
| class TestIntegration: |
| """Integration tests for performance components.""" |
| |
| @pytest.mark.asyncio |
| async def test_end_to_end_performance_tracking(self): |
| """Test end-to-end performance tracking.""" |
| |
| concurrency_manager = ConcurrencyManager(max_concurrent_tasks=2) |
| metrics_tracker = MetricsTracker() |
| |
| await concurrency_manager.start() |
| |
| try: |
| |
| async def tracked_task(task_id, duration=0.1): |
| start_time = time.time() |
| await asyncio.sleep(duration) |
| |
| |
| execution_time_ms = (time.time() - start_time) * 1000 |
| await metrics_tracker.record_execution_time(execution_time_ms, task_id) |
| await metrics_tracker.record_task_success(task_id) |
| |
| return f"completed_{task_id}" |
| |
| |
| await concurrency_manager.submit_task("task1", tracked_task, "task1", 0.1) |
| await concurrency_manager.submit_task("task2", tracked_task, "task2", 0.1) |
| |
| |
| await asyncio.sleep(0.3) |
| |
| |
| concurrency_metrics = await concurrency_manager.get_metrics() |
| performance_metrics = await metrics_tracker.get_current_metrics() |
| |
| assert concurrency_metrics["total_tasks_completed"] == 2 |
| assert performance_metrics.execution_time_ms > 0 |
| |
| finally: |
| await concurrency_manager.stop() |
| |
| @pytest.mark.asyncio |
| async def test_configuration_integration(self): |
| """Test configuration integration.""" |
| |
| assert hasattr(settings, 'MAX_CONCURRENT_TASKS') |
| assert hasattr(settings, 'EVALUATION_RATE_LIMIT') |
| assert hasattr(settings, 'BENCHMARK_RATE_LIMIT') |
| assert hasattr(settings, 'CELERY_WORKER_CONCURRENCY') |
| assert hasattr(settings, 'CELERY_TASK_RATE_LIMIT') |
| |
| |
| assert settings.MAX_CONCURRENT_TASKS == 4 |
| assert settings.EVALUATION_RATE_LIMIT == 20 |
| assert settings.BENCHMARK_RATE_LIMIT == 10 |
| assert settings.CELERY_WORKER_CONCURRENCY == 2 |
| assert settings.CELERY_TASK_RATE_LIMIT == "10/m" |
|
|
|
|
| if __name__ == "__main__": |
| pytest.main([__file__, "-v"]) |
|
|