Spaces:
Build error
Build error
| """ | |
| Unit tests for session management module | |
| """ | |
| import time | |
| import pytest | |
| from unittest.mock import Mock, patch | |
| from session import SessionManager, SessionMetrics, AsyncResponseCache, RateLimiter | |
| class TestSessionMetrics: | |
| """Test session metrics functionality""" | |
| def test_session_metrics_initialization(self): | |
| """Test metrics initialization""" | |
| metrics = SessionMetrics(session_id="test-123") | |
| assert metrics.session_id == "test-123" | |
| assert metrics.total_queries == 0 | |
| assert metrics.cache_hits == 0 | |
| assert metrics.total_response_time == 0.0 | |
| assert metrics.tool_usage == {} | |
| assert metrics.errors == [] | |
| assert metrics.parallel_executions == 0 | |
| def test_average_response_time(self): | |
| """Test average response time calculation""" | |
| metrics = SessionMetrics(session_id="test-123") | |
| # No queries yet | |
| assert metrics.average_response_time == 0.0 | |
| # Add some queries | |
| metrics.total_queries = 3 | |
| metrics.total_response_time = 6.0 | |
| assert metrics.average_response_time == 2.0 | |
| def test_cache_hit_rate(self): | |
| """Test cache hit rate calculation""" | |
| metrics = SessionMetrics(session_id="test-123") | |
| # No queries yet | |
| assert metrics.cache_hit_rate == 0.0 | |
| # Add queries and cache hits | |
| metrics.total_queries = 10 | |
| metrics.cache_hits = 3 | |
| assert metrics.cache_hit_rate == 30.0 | |
| def test_uptime_hours(self): | |
| """Test uptime calculation""" | |
| metrics = SessionMetrics(session_id="test-123") | |
| # Mock created_at to be 1 hour ago | |
| metrics.created_at = time.time() - 3600 | |
| assert 0.99 < metrics.uptime_hours < 1.01 # Allow small variance | |
| class TestAsyncResponseCache: | |
| """Test async response cache functionality""" | |
| def test_cache_initialization(self): | |
| """Test cache initialization""" | |
| cache = AsyncResponseCache(max_size=100, ttl_seconds=60) | |
| assert cache.max_size == 100 | |
| assert cache.ttl_seconds == 60 | |
| assert len(cache.cache) == 0 | |
| assert len(cache.timestamps) == 0 | |
| def test_cache_set_and_get(self): | |
| """Test cache set and get operations""" | |
| cache = AsyncResponseCache(max_size=100, ttl_seconds=60) | |
| # Set a value | |
| cache.set("key1", "value1") | |
| # Get the value | |
| assert cache.get("key1") == "value1" | |
| # Non-existent key | |
| assert cache.get("key2") is None | |
| def test_cache_expiration(self): | |
| """Test cache TTL expiration""" | |
| cache = AsyncResponseCache(max_size=100, ttl_seconds=1) # 1 second TTL | |
| # Set a value | |
| cache.set("key1", "value1") | |
| assert cache.get("key1") == "value1" | |
| # Wait for expiration | |
| time.sleep(1.1) | |
| assert cache.get("key1") is None | |
| def test_cache_size_limit(self): | |
| """Test cache size limiting""" | |
| cache = AsyncResponseCache(max_size=5, ttl_seconds=60) | |
| # Fill cache beyond limit | |
| for i in range(10): | |
| cache.set(f"key{i}", f"value{i}") | |
| # Should not exceed max size | |
| assert len(cache.cache) <= 5 | |
| def test_cache_stats(self): | |
| """Test cache statistics""" | |
| cache = AsyncResponseCache(max_size=100, ttl_seconds=60) | |
| cache.set("key1", "value1") | |
| cache.set("key2", "value2") | |
| stats = cache.get_stats() | |
| assert stats["size"] == 2 | |
| assert stats["max_size"] == 100 | |
| assert stats["ttl_seconds"] == 60 | |
| class TestRateLimiter: | |
| """Test rate limiter functionality""" | |
| def test_rate_limiter_initialization(self): | |
| """Test rate limiter initialization""" | |
| limiter = RateLimiter(max_requests_per_minute=60) | |
| assert limiter.max_requests_per_minute == 60 | |
| assert limiter.total_requests == 0 | |
| assert len(limiter.requests) == 0 | |
| def test_rate_limiting_enforcement(self): | |
| """Test that rate limiting is enforced""" | |
| limiter = RateLimiter(max_requests_per_minute=2) # Very low limit for testing | |
| # First two requests should go through quickly | |
| start = time.time() | |
| limiter.wait_if_needed() | |
| limiter.wait_if_needed() | |
| # Third request should be delayed | |
| limiter.wait_if_needed() | |
| elapsed = time.time() - start | |
| # Should have some delay due to rate limiting | |
| assert limiter.total_requests == 3 | |
| def test_rate_limiter_status(self): | |
| """Test rate limiter status reporting""" | |
| limiter = RateLimiter(max_requests_per_minute=60) | |
| limiter.wait_if_needed() | |
| status = limiter.get_status() | |
| assert status["max_rpm"] == 60 | |
| assert status["total_requests"] == 1 | |
| assert status["current_rpm"] == 1 | |
| assert "last_request_time" in status | |
| class TestSessionManager: | |
| """Test session manager functionality""" | |
| def test_session_creation(self): | |
| """Test session creation""" | |
| manager = SessionManager() | |
| # Create session with auto-generated ID | |
| session_id = manager.create_session() | |
| assert session_id in manager.sessions | |
| assert isinstance(manager.sessions[session_id], SessionMetrics) | |
| # Create session with specific ID | |
| custom_id = "custom-123" | |
| result_id = manager.create_session(custom_id) | |
| assert result_id == custom_id | |
| assert custom_id in manager.sessions | |
| def test_update_query_metrics(self): | |
| """Test updating query metrics""" | |
| manager = SessionManager() | |
| session_id = manager.create_session() | |
| # Update metrics | |
| manager.update_query_metrics(session_id, response_time=2.5, cache_hit=True, parallel=True) | |
| metrics = manager.get_session_metrics(session_id) | |
| assert metrics.total_queries == 1 | |
| assert metrics.total_response_time == 2.5 | |
| assert metrics.cache_hits == 1 | |
| assert metrics.parallel_executions == 1 | |
| def test_update_tool_usage(self): | |
| """Test updating tool usage""" | |
| manager = SessionManager() | |
| session_id = manager.create_session() | |
| # Update tool usage | |
| manager.update_tool_usage(session_id, "web_search") | |
| manager.update_tool_usage(session_id, "web_search") | |
| manager.update_tool_usage(session_id, "calculator") | |
| metrics = manager.get_session_metrics(session_id) | |
| assert metrics.tool_usage["web_search"] == 2 | |
| assert metrics.tool_usage["calculator"] == 1 | |
| def test_add_error(self): | |
| """Test error logging""" | |
| manager = SessionManager() | |
| session_id = manager.create_session() | |
| # Add an error | |
| error = ValueError("Test error") | |
| context = {"step": "test_step"} | |
| manager.add_error(session_id, error, context) | |
| metrics = manager.get_session_metrics(session_id) | |
| assert len(metrics.errors) == 1 | |
| assert metrics.errors[0]["error_type"] == "ValueError" | |
| assert metrics.errors[0]["error_message"] == "Test error" | |
| assert metrics.errors[0]["context"]["step"] == "test_step" | |
| def test_global_analytics(self): | |
| """Test global analytics aggregation""" | |
| manager = SessionManager() | |
| # Create multiple sessions with activity | |
| session1 = manager.create_session() | |
| session2 = manager.create_session() | |
| manager.update_query_metrics(session1, 1.0, cache_hit=True) | |
| manager.update_query_metrics(session1, 2.0) | |
| manager.update_query_metrics(session2, 3.0, parallel=True) | |
| manager.update_tool_usage(session1, "tool1") | |
| manager.update_tool_usage(session2, "tool1") | |
| manager.update_tool_usage(session2, "tool2") | |
| # Get global analytics | |
| analytics = manager.get_global_analytics() | |
| assert analytics["total_sessions"] == 2 | |
| assert analytics["total_queries"] == 3 | |
| assert analytics["avg_response_time"] == 2.0 # (1+2+3)/3 | |
| assert analytics["cache_hit_rate"] == 33.33333333333333 # 1/3 | |
| assert analytics["parallel_executions"] == 1 | |
| assert analytics["tool_usage"]["tool1"] == 2 | |
| assert analytics["tool_usage"]["tool2"] == 1 | |
| def test_cleanup_old_sessions(self): | |
| """Test cleaning up old sessions""" | |
| manager = SessionManager() | |
| # Create sessions | |
| old_session = manager.create_session() | |
| new_session = manager.create_session() | |
| # Make one session old | |
| manager.sessions[old_session].created_at = time.time() - (25 * 3600) # 25 hours ago | |
| # Cleanup sessions older than 24 hours | |
| manager.cleanup_old_sessions(max_age_hours=24) | |
| assert old_session not in manager.sessions | |
| assert new_session in manager.sessions |