""" Performance test for case stats optimization """ import os import sys import time import pytest from sqlalchemy import create_engine, event from sqlalchemy.orm import sessionmaker from sqlalchemy.pool import StaticPool # Ensure backend is in path sys.path.append(os.path.join(os.getcwd(), "backend")) from app.services.business.case_service import case_service from core.models.base import Base from core.models.case import Case @pytest.fixture(scope="function") def perf_db(): engine = create_engine( "sqlite:///:memory:", connect_args={"check_same_thread": False}, poolclass=StaticPool, ) Base.metadata.create_all(bind=engine) session_local = sessionmaker(bind=engine) db = session_local() yield db, engine db.close() def test_get_case_stats_query_count(perf_db): db, engine = perf_db # Seed data cases = [] for i in range(100): status = "open" if i % 2 == 0 else "closed" priority = "critical" if i % 5 == 0 else "medium" cases.append( Case( title=f"Case {i}", description="desc", status=status, priority=priority, project_id="default", ) ) db.add_all(cases) db.commit() # Measure queries # Use a mutable object to store count since nonlocal won't work easily in fixture/test mix sometimes metrics = {"count": 0} @event.listens_for(engine, "before_cursor_execute") def before_cursor_execute(conn, cursor, statement, parameters, context, executemany): metrics["count"] += 1 start_time = time.time() stats = case_service.get_case_stats(db, project_id="default") end_time = time.time() print(f"\nTime taken: {end_time - start_time:.6f}s") print(f"Queries executed: {metrics['count']}") print(f"Stats: {stats}") # Check correctness assert stats["total_cases"] == 100 assert stats["open_cases"] == 50 assert stats["closed_cases"] == 50 assert stats["critical_cases"] == 20 # Performance assertions assert metrics["count"] <= 5, f"Too many queries: {metrics['count']}" def test_case_list_response_time(client_with_db, auth_headers): """ Test that case list API responds within acceptable time. """ start_time = time.time() response = client_with_db.get("/api/v1/cases/", headers=auth_headers) end_time = time.time() duration = end_time - start_time # Assert response time is under 1 second assert duration < 1.0, f"Case list took {duration:.3f}s, expected < 1.0s" assert response.status_code == 200 def test_health_check_response_time(client): """ Test that health check responds within acceptable time. Note: In test environments without Redis, this may be slower. """ start_time = time.time() response = client.get("/health") end_time = time.time() duration = end_time - start_time # Allow longer time in test environment (Redis connection attempts add latency) assert duration < 5.0, f"Health check took {duration:.3f}s, expected < 5.0s" assert response.status_code in [200, 503] def test_database_query_performance(perf_db): """ Test that database queries complete within acceptable time. """ db, engine = perf_db # Create test data for i in range(50): case = Case( title=f"Performance Test Case {i}", description="Testing query performance", status="open", priority="medium", project_id="default", ) db.add(case) db.commit() # Measure query time start_time = time.time() result = db.query(Case).all() end_time = time.time() duration = end_time - start_time # Should complete in under 100ms assert duration < 0.1, f"Query took {duration:.3f}s for {len(result)} records" assert len(result) == 50 if __name__ == "__main__": # Manually run if executed directly engine = create_engine("sqlite:///:memory:") Base.metadata.create_all(bind=engine) SessionLocal = sessionmaker(bind=engine) db = SessionLocal() cases = [] for i in range(100): status = "open" if i % 2 == 0 else "closed" priority = "critical" if i % 5 == 0 else "medium" cases.append( Case( title=f"Case {i}", description="desc", status=status, priority=priority, project_id="default", ) ) db.add_all(cases) db.commit() metrics = {"count": 0} @event.listens_for(engine, "before_cursor_execute") def before_cursor_execute(conn, cursor, statement, parameters, context, executemany): metrics["count"] += 1 start_time = time.time() stats = case_service.get_case_stats(db, project_id="default") end_time = time.time() print(f"Time taken: {end_time - start_time:.6f}s") print(f"Queries executed: {metrics['count']}") print(f"Stats: {stats}")