# RobotPai / tests/unit/test_session.py
# atr0p05's picture
# Upload 291 files
# 8a682b5 verified
"""
Unit tests for session management module
"""
import time
import pytest
from unittest.mock import Mock, patch
from session import SessionManager, SessionMetrics, AsyncResponseCache, RateLimiter
class TestSessionMetrics:
    """Checks for the SessionMetrics counters and its derived properties."""

    def test_session_metrics_initialization(self):
        """A new SessionMetrics keeps its id and starts with empty counters."""
        fresh = SessionMetrics(session_id="test-123")

        assert fresh.session_id == "test-123"
        # Numeric counters are expected to start at zero.
        assert fresh.total_queries == 0
        assert fresh.cache_hits == 0
        assert fresh.total_response_time == 0.0
        assert fresh.parallel_executions == 0
        # Containers are expected to start empty.
        assert fresh.tool_usage == {}
        assert fresh.errors == []

    def test_average_response_time(self):
        """average_response_time is 0.0 without queries, else total / count."""
        tracked = SessionMetrics(session_id="test-123")

        # Nothing recorded so far.
        assert tracked.average_response_time == 0.0

        # Three queries totalling six seconds give two seconds per query.
        tracked.total_queries = 3
        tracked.total_response_time = 6.0
        assert tracked.average_response_time == 2.0

    def test_cache_hit_rate(self):
        """cache_hit_rate is 0.0 without queries, else a percentage of hits."""
        tracked = SessionMetrics(session_id="test-123")

        # Nothing recorded so far.
        assert tracked.cache_hit_rate == 0.0

        # Three hits out of ten queries is thirty percent.
        tracked.total_queries = 10
        tracked.cache_hits = 3
        assert tracked.cache_hit_rate == 30.0

    def test_uptime_hours(self):
        """uptime_hours reports roughly one hour for a session created 3600 s ago."""
        aged = SessionMetrics(session_id="test-123")

        # Backdate the creation timestamp by exactly one hour.
        aged.created_at = time.time() - 3600

        # Read the property once and allow a small variance on either side.
        hours = aged.uptime_hours
        assert hours > 0.99
        assert hours < 1.01
class TestAsyncResponseCache:
    """Checks for AsyncResponseCache storage, expiry, size cap and statistics."""

    def test_cache_initialization(self):
        """A new cache stores its configuration and holds no entries."""
        store = AsyncResponseCache(max_size=100, ttl_seconds=60)

        assert store.max_size == 100
        assert store.ttl_seconds == 60
        # Both internal containers start out empty.
        assert len(store.cache) == 0
        assert len(store.timestamps) == 0

    def test_cache_set_and_get(self):
        """A stored key is returned by get(); an unknown key yields None."""
        store = AsyncResponseCache(max_size=100, ttl_seconds=60)

        store.set("key1", "value1")

        hit = store.get("key1")
        assert hit == "value1"

        miss = store.get("key2")
        assert miss is None

    def test_cache_expiration(self):
        """An entry is no longer returned once its TTL has elapsed."""
        short_lived = AsyncResponseCache(max_size=100, ttl_seconds=1)  # 1 second TTL

        short_lived.set("key1", "value1")
        # Still readable right after being stored.
        assert short_lived.get("key1") == "value1"

        # Real sleep that outlasts the one-second TTL.
        time.sleep(1.1)
        assert short_lived.get("key1") is None

    def test_cache_size_limit(self):
        """The cache never holds more entries than max_size."""
        capacity = 5
        bounded = AsyncResponseCache(max_size=capacity, ttl_seconds=60)

        # Insert twice as many entries as the cache is allowed to hold.
        for index in range(2 * capacity):
            bounded.set(f"key{index}", f"value{index}")

        assert len(bounded.cache) <= capacity

    def test_cache_stats(self):
        """get_stats() reports the entry count and the configured limits."""
        store = AsyncResponseCache(max_size=100, ttl_seconds=60)
        store.set("key1", "value1")
        store.set("key2", "value2")

        report = store.get_stats()

        expected = {"size": 2, "max_size": 100, "ttl_seconds": 60}
        for field, value in expected.items():
            assert report[field] == value
class TestRateLimiter:
    """Test rate limiter construction, request counting and status reporting."""

    def test_rate_limiter_initialization(self):
        """A new limiter stores its limit and has recorded no requests."""
        limiter = RateLimiter(max_requests_per_minute=60)

        assert limiter.max_requests_per_minute == 60
        assert limiter.total_requests == 0
        assert len(limiter.requests) == 0

    def test_rate_limiting_enforcement(self):
        """Every call is counted, including one made beyond the per-minute limit."""
        limiter = RateLimiter(max_requests_per_minute=2)  # Very low limit for testing

        # Two calls fit within the limit; the third one exceeds it.
        # NOTE(review): the third call presumably blocks until the limiter
        # admits it -- confirm how long that takes; this test does not bound it.
        for _ in range(3):
            limiter.wait_if_needed()

        # Only the request count is checked. No timing assertion is made here,
        # because a wall-clock check would depend on the limiter's internals.
        assert limiter.total_requests == 3

    def test_rate_limiter_status(self):
        """get_status() reflects the limit and a single recorded request."""
        limiter = RateLimiter(max_requests_per_minute=60)
        limiter.wait_if_needed()

        status = limiter.get_status()

        assert status["max_rpm"] == 60
        assert status["total_requests"] == 1
        assert status["current_rpm"] == 1
        assert "last_request_time" in status
class TestSessionManager:
    """Test session manager creation, metric updates, analytics and cleanup."""

    def test_session_creation(self):
        """Sessions can be created with a generated id or a caller-supplied id."""
        manager = SessionManager()

        # Create session with auto-generated ID
        session_id = manager.create_session()
        assert session_id in manager.sessions
        assert isinstance(manager.sessions[session_id], SessionMetrics)

        # Create session with specific ID
        custom_id = "custom-123"
        result_id = manager.create_session(custom_id)
        assert result_id == custom_id
        assert custom_id in manager.sessions

    def test_update_query_metrics(self):
        """One update_query_metrics call is reflected in each affected counter."""
        manager = SessionManager()
        session_id = manager.create_session()

        manager.update_query_metrics(
            session_id, response_time=2.5, cache_hit=True, parallel=True
        )

        metrics = manager.get_session_metrics(session_id)
        assert metrics.total_queries == 1
        assert metrics.total_response_time == 2.5
        assert metrics.cache_hits == 1
        assert metrics.parallel_executions == 1

    def test_update_tool_usage(self):
        """Tool usage is counted per tool name."""
        manager = SessionManager()
        session_id = manager.create_session()

        manager.update_tool_usage(session_id, "web_search")
        manager.update_tool_usage(session_id, "web_search")
        manager.update_tool_usage(session_id, "calculator")

        metrics = manager.get_session_metrics(session_id)
        assert metrics.tool_usage["web_search"] == 2
        assert metrics.tool_usage["calculator"] == 1

    def test_add_error(self):
        """add_error records the exception type, its message and the context."""
        manager = SessionManager()
        session_id = manager.create_session()

        error = ValueError("Test error")
        context = {"step": "test_step"}
        manager.add_error(session_id, error, context)

        metrics = manager.get_session_metrics(session_id)
        assert len(metrics.errors) == 1
        recorded = metrics.errors[0]
        assert recorded["error_type"] == "ValueError"
        assert recorded["error_message"] == "Test error"
        assert recorded["context"]["step"] == "test_step"

    def test_global_analytics(self):
        """get_global_analytics aggregates counters across all sessions."""
        manager = SessionManager()

        # Create multiple sessions with activity
        session1 = manager.create_session()
        session2 = manager.create_session()
        manager.update_query_metrics(session1, 1.0, cache_hit=True)
        manager.update_query_metrics(session1, 2.0)
        manager.update_query_metrics(session2, 3.0, parallel=True)
        manager.update_tool_usage(session1, "tool1")
        manager.update_tool_usage(session2, "tool1")
        manager.update_tool_usage(session2, "tool2")

        analytics = manager.get_global_analytics()

        assert analytics["total_sessions"] == 2
        assert analytics["total_queries"] == 3
        # Derived floats are compared with a tolerance: the exact bits depend
        # on the order of operations (e.g. 1 / 3 * 100 vs. 100 / 3), which is
        # not part of the behavior under test.
        assert analytics["avg_response_time"] == pytest.approx(2.0)  # (1+2+3)/3
        assert analytics["cache_hit_rate"] == pytest.approx(100 / 3)  # 1 of 3
        assert analytics["parallel_executions"] == 1
        assert analytics["tool_usage"]["tool1"] == 2
        assert analytics["tool_usage"]["tool2"] == 1

    def test_cleanup_old_sessions(self):
        """cleanup_old_sessions drops sessions older than the given age only."""
        manager = SessionManager()

        old_session = manager.create_session()
        new_session = manager.create_session()

        # Backdate one session so it is 25 hours old.
        manager.sessions[old_session].created_at = time.time() - (25 * 3600)

        # Cleanup sessions older than 24 hours
        manager.cleanup_old_sessions(max_age_hours=24)

        assert old_session not in manager.sessions
        assert new_session in manager.sessions