ALM-2 / backend /tests /test_performance.py
ACA050's picture
Upload 520 files
2ed8996 verified
"""
Performance tests for AegisLM SaaS Backend.
Tests concurrency control, rate limiting, metrics tracking,
and resource monitoring functionality.
"""
import pytest
import asyncio
import time
from unittest.mock import AsyncMock, MagicMock, patch
from datetime import datetime, timezone
from performance.concurrency_manager import ConcurrencyManager, TaskStatus
from performance.metrics_tracker import MetricsTracker, track_performance
from performance.resource_monitor import ResourceMonitor, ResourceThresholds
from core.config import settings
class TestConcurrencyManager:
"""Test concurrency manager functionality."""
@pytest.fixture
async def manager(self):
"""Create concurrency manager for testing."""
manager = ConcurrencyManager(max_concurrent_tasks=2)
await manager.start()
yield manager
await manager.stop()
@pytest.mark.asyncio
async def test_task_submission(self, manager):
"""Test task submission and execution."""
async def mock_task(task_id, duration=0.1):
await asyncio.sleep(duration)
return f"completed_{task_id}"
# Submit tasks
task1_accepted = await manager.submit_task("task1", mock_task, "task1", 0.1)
task2_accepted = await manager.submit_task("task2", mock_task, "task2", 0.1)
task3_accepted = await manager.submit_task("task3", mock_task, "task3", 0.1)
assert task1_accepted is True
assert task2_accepted is True
assert task3_accepted is True
# Wait for completion
await asyncio.sleep(0.5)
# Check task statuses
task1_status = await manager.get_task_status("task1")
task2_status = await manager.get_task_status("task2")
task3_status = await manager.get_task_status("task3")
assert task1_status.status == TaskStatus.COMPLETED
assert task2_status.status == TaskStatus.COMPLETED
assert task3_status.status == TaskStatus.COMPLETED
@pytest.mark.asyncio
async def test_concurrency_limit(self, manager):
"""Test concurrency limit enforcement."""
async def slow_task(task_id, duration=0.5):
await asyncio.sleep(duration)
return f"completed_{task_id}"
# Submit slow tasks
await manager.submit_task("slow1", slow_task, "slow1", 0.5)
await manager.submit_task("slow2", slow_task, "slow2", 0.5)
# Give them time to start
await asyncio.sleep(0.1)
# Check running tasks
metrics = await manager.get_metrics()
assert metrics["active_tasks"] <= 2
@pytest.mark.asyncio
async def test_queue_overflow(self, manager):
"""Test queue overflow handling."""
async def mock_task(task_id):
await asyncio.sleep(1)
return f"completed_{task_id}"
# Fill queue beyond capacity
accepted_tasks = []
for i in range(150): # More than max_queue_size (100)
accepted = await manager.submit_task(f"task_{i}", mock_task, f"task_{i}")
accepted_tasks.append(accepted)
# Some tasks should be rejected
assert any(not accepted for accepted in accepted_tasks)
# Check metrics
metrics = await manager.get_metrics()
assert metrics["total_tasks_rejected"] > 0
@pytest.mark.asyncio
async def test_metrics_collection(self, manager):
"""Test metrics collection."""
async def mock_task(task_id):
await asyncio.sleep(0.1)
return f"completed_{task_id}"
# Submit and complete tasks
await manager.submit_task("task1", mock_task, "task1")
await manager.submit_task("task2", mock_task, "task2")
# Wait for completion
await asyncio.sleep(0.3)
# Check metrics
metrics = await manager.get_metrics()
assert metrics["total_tasks_completed"] == 2
assert metrics["success_rate"] == 100.0
assert metrics["average_execution_time_ms"] > 0
class TestMetricsTracker:
"""Test metrics tracker functionality."""
@pytest.fixture
def tracker(self):
"""Create metrics tracker for testing."""
return MetricsTracker(retention_minutes=1, max_points=100)
@pytest.mark.asyncio
async def test_execution_time_recording(self, tracker):
"""Test execution time recording."""
await tracker.record_execution_time(150.5, "task1")
await tracker.record_execution_time(200.0, "task2")
metrics = await tracker.get_current_metrics()
assert metrics.execution_time_ms > 0
summary = await tracker.get_summary_stats()
assert summary["execution_time_stats"]["mean"] > 0
assert summary["data_points"]["execution_times"] == 2
@pytest.mark.asyncio
async def test_success_failure_recording(self, tracker):
"""Test success/failure recording."""
await tracker.record_task_success("task1")
await tracker.record_task_success("task2")
await tracker.record_task_failure("task3")
metrics = await tracker.get_current_metrics()
assert metrics.success_rate == 66.67 # 2/3 * 100
summary = await tracker.get_summary_stats()
assert summary["successful_tasks"] == 2
assert summary["failed_tasks"] == 1
@pytest.mark.asyncio
async def test_system_metrics_recording(self, tracker):
"""Test system metrics recording."""
await tracker.record_system_metrics(45.5, 67.8)
metrics = await tracker.get_current_metrics()
assert metrics.cpu_usage_percent == 45.5
assert metrics.memory_usage_percent == 67.8
@pytest.mark.asyncio
async def test_historical_data(self, tracker):
"""Test historical data retrieval."""
# Record some data points
for i in range(5):
await tracker.record_execution_time(100 + i * 10, f"task_{i}")
historical = await tracker.get_historical_metrics(minutes=1)
assert len(historical["execution_times"]) == 5
assert historical["execution_times"][0]["value"] == 100
@pytest.mark.asyncio
async def test_performance_decorator(self, tracker):
"""Test performance tracking decorator."""
@track_performance("test_task")
async def test_function():
await asyncio.sleep(0.1)
return "success"
result = await test_function()
assert result == "success"
# Check that metrics were recorded
summary = await tracker.get_summary_stats()
assert summary["successful_tasks"] == 1
class TestResourceMonitor:
"""Test resource monitor functionality."""
@pytest.fixture
async def monitor(self):
"""Create resource monitor for testing."""
monitor = ResourceMonitor(check_interval_seconds=1)
yield monitor
await monitor.stop()
@pytest.mark.asyncio
async def test_resource_thresholds(self, monitor):
"""Test resource threshold checking."""
# Set low thresholds for testing
monitor.set_thresholds(ResourceThresholds(
cpu_warning=50.0,
cpu_critical=80.0,
memory_warning=60.0,
memory_critical=85.0
))
# Mock high resource usage
with patch('psutil.cpu_percent', return_value=85.0), \
patch('psutil.virtual_memory') as mock_memory:
mock_memory.return_value.percent = 90.0
mock_memory.return_value.available = 2 * 1024**3 # 2GB
# Collect metrics
await monitor._collect_metrics()
# Check thresholds
await monitor._check_thresholds()
# Get current resources
resources = await monitor.get_current_resources()
assert resources.cpu_percent == 85.0
assert resources.memory_percent == 90.0
@pytest.mark.asyncio
async def test_load_assessment(self, monitor):
"""Test load assessment functionality."""
# Mock low resource usage
with patch('psutil.cpu_percent', return_value=30.0), \
patch('psutil.virtual_memory') as mock_memory:
mock_memory.return_value.percent = 40.0
mock_memory.return_value.available = 8 * 1024**3 # 8GB
await monitor._collect_metrics()
assessment = await monitor.check_can_handle_load(5)
assert assessment["can_handle"] is True
@pytest.mark.asyncio
async def test_alert_callback(self, monitor):
"""Test alert callback functionality."""
alert_received = False
alert_data = None
async def test_callback(alert):
nonlocal alert_received, alert_data
alert_received = True
alert_data = alert
monitor.add_alert_callback(test_callback)
# Trigger alert
await monitor._trigger_alert({
"type": "cpu_warning",
"value": 75.0,
"threshold": 70.0,
"message": "CPU usage high: 75.0%"
})
assert alert_received is True
assert alert_data["type"] == "cpu_warning"
assert alert_data["value"] == 75.0
class TestRateLimiting:
"""Test rate limiting functionality."""
@pytest.mark.asyncio
async def test_rate_limit_middleware(self):
"""Test rate limiting middleware."""
from middleware.rate_limit import RateLimitMiddleware
from fastapi import Request
from unittest.mock import AsyncMock
# Create mock request
request = MagicMock(spec=Request)
request.url.path = "/api/v1/evaluations/"
request.headers = {}
request.client = MagicMock()
request.client.host = "127.0.0.1"
# Create middleware
middleware = RateLimitMiddleware(None, default_limit=2, default_window=60)
# Mock Redis
with patch('middleware.rate_limit.get_redis') as mock_redis:
mock_client = AsyncMock()
mock_redis.return_value = mock_client
# Mock Redis operations
mock_client.zremrangebyscore.return_value = 0
mock_client.zcard.return_value = 0
mock_client.zadd.return_value = None
mock_client.expire.return_value = None
# Test multiple requests
call_next = AsyncMock()
call_next.return_value = MagicMock()
call_next.return_value.headers = {}
# First request should pass
response1 = await middleware.dispatch(request, call_next)
assert response1 is not None
# Second request should pass
response2 = await middleware.dispatch(request, call_next)
assert response2 is not None
# Third request should be rate limited
mock_client.zcard.return_value = 2 # At limit
response3 = await middleware.dispatch(request, call_next)
assert response3.status_code == 429
class TestIntegration:
"""Integration tests for performance components."""
@pytest.mark.asyncio
async def test_end_to_end_performance_tracking(self):
"""Test end-to-end performance tracking."""
# Initialize components
concurrency_manager = ConcurrencyManager(max_concurrent_tasks=2)
metrics_tracker = MetricsTracker()
await concurrency_manager.start()
try:
# Mock task with performance tracking
async def tracked_task(task_id, duration=0.1):
start_time = time.time()
await asyncio.sleep(duration)
# Record metrics
execution_time_ms = (time.time() - start_time) * 1000
await metrics_tracker.record_execution_time(execution_time_ms, task_id)
await metrics_tracker.record_task_success(task_id)
return f"completed_{task_id}"
# Submit tasks
await concurrency_manager.submit_task("task1", tracked_task, "task1", 0.1)
await concurrency_manager.submit_task("task2", tracked_task, "task2", 0.1)
# Wait for completion
await asyncio.sleep(0.3)
# Verify metrics
concurrency_metrics = await concurrency_manager.get_metrics()
performance_metrics = await metrics_tracker.get_current_metrics()
assert concurrency_metrics["total_tasks_completed"] == 2
assert performance_metrics.execution_time_ms > 0
finally:
await concurrency_manager.stop()
@pytest.mark.asyncio
async def test_configuration_integration(self):
"""Test configuration integration."""
# Test that configuration values are properly loaded
assert hasattr(settings, 'MAX_CONCURRENT_TASKS')
assert hasattr(settings, 'EVALUATION_RATE_LIMIT')
assert hasattr(settings, 'BENCHMARK_RATE_LIMIT')
assert hasattr(settings, 'CELERY_WORKER_CONCURRENCY')
assert hasattr(settings, 'CELERY_TASK_RATE_LIMIT')
# Test default values
assert settings.MAX_CONCURRENT_TASKS == 4
assert settings.EVALUATION_RATE_LIMIT == 20
assert settings.BENCHMARK_RATE_LIMIT == 10
assert settings.CELERY_WORKER_CONCURRENCY == 2
assert settings.CELERY_TASK_RATE_LIMIT == "10/m"
if __name__ == "__main__":
pytest.main([__file__, "-v"])