Spaces:
Paused
Paused
| """ | |
| Performance tests for case statistics queries, API response times, and database query speed. | |
| """ | |
| import os | |
| import sys | |
| import time | |
| import pytest | |
| from sqlalchemy import create_engine, event | |
| from sqlalchemy.orm import sessionmaker | |
| from sqlalchemy.pool import StaticPool | |
| # Ensure backend is in path | |
| sys.path.append(os.path.join(os.getcwd(), "backend")) | |
| from app.services.business.case_service import case_service | |
| from core.models.base import Base | |
| from core.models.case import Case | |
@pytest.fixture
def perf_db():
    """Provide an isolated in-memory SQLite session and its engine.

    Yields:
        tuple: ``(db, engine)`` -- an open session bound to a fresh in-memory
        database with every ``Base`` table created, plus the engine itself so
        tests can attach SQLAlchemy event listeners to it.
    """
    # StaticPool keeps one shared connection; a new connection to
    # "sqlite:///:memory:" would otherwise open a separate, empty database.
    engine = create_engine(
        "sqlite:///:memory:",
        connect_args={"check_same_thread": False},
        poolclass=StaticPool,
    )
    Base.metadata.create_all(bind=engine)
    session_local = sessionmaker(bind=engine)
    db = session_local()
    try:
        yield db, engine
    finally:
        # Release the session first, then the pooled connection behind it.
        db.close()
        engine.dispose()
def test_get_case_stats_query_count(perf_db):
    """
    Test that case stats are correct and computed with at most 5 SQL statements.
    """
    db, engine = perf_db

    # Seed 100 cases: even indices are open (50), odd are closed (50), and
    # every fifth index is critical (20).
    db.add_all(
        [
            Case(
                title=f"Case {i}",
                description="desc",
                status="open" if i % 2 == 0 else "closed",
                priority="critical" if i % 5 == 0 else "medium",
                project_id="default",
            )
            for i in range(100)
        ]
    )
    db.commit()

    # A dict lets the listener below update the counter without `nonlocal`.
    metrics = {"count": 0}

    def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
        metrics["count"] += 1

    # Register the listener only after seeding so the INSERTs are not counted;
    # without this registration the counter would stay at 0 and the query
    # budget assertion below could never fail.
    event.listen(engine, "before_cursor_execute", before_cursor_execute)
    try:
        start_time = time.perf_counter()
        stats = case_service.get_case_stats(db, project_id="default")
        end_time = time.perf_counter()
    finally:
        event.remove(engine, "before_cursor_execute", before_cursor_execute)

    print(f"\nTime taken: {end_time - start_time:.6f}s")
    print(f"Queries executed: {metrics['count']}")
    print(f"Stats: {stats}")

    # Check correctness
    assert stats["total_cases"] == 100
    assert stats["open_cases"] == 50
    assert stats["closed_cases"] == 50
    assert stats["critical_cases"] == 20

    # Performance assertions
    assert metrics["count"] <= 5, f"Too many queries: {metrics['count']}"
def test_case_list_response_time(client_with_db, auth_headers):
    """
    Test that case list API responds successfully within acceptable time.
    """
    # perf_counter is monotonic and high-resolution, so the measured interval
    # cannot be skewed by system clock adjustments.
    start_time = time.perf_counter()
    response = client_with_db.get("/api/v1/cases/", headers=auth_headers)
    duration = time.perf_counter() - start_time

    # Check the status first so a failed request is reported as such rather
    # than as a timing problem.
    assert response.status_code == 200
    # Assert response time is under 1 second
    assert duration < 1.0, f"Case list took {duration:.3f}s, expected < 1.0s"
def test_health_check_response_time(client):
    """
    Test that health check responds within acceptable time.

    Both 200 and 503 are accepted as status codes.
    Note: In test environments without Redis, this may be slower.
    """
    # perf_counter is monotonic and high-resolution, so the measured interval
    # cannot be skewed by system clock adjustments.
    start_time = time.perf_counter()
    response = client.get("/health")
    duration = time.perf_counter() - start_time

    # Check the status first so an unexpected response is reported as such
    # rather than as a timing problem.
    assert response.status_code in [200, 503]
    # Allow longer time in test environment (Redis connection attempts add latency)
    assert duration < 5.0, f"Health check took {duration:.3f}s, expected < 5.0s"
def test_database_query_performance(perf_db):
    """
    Test that database queries complete within acceptable time.

    Seeds 50 cases, then times a single ``db.query(Case).all()`` call, which
    must return all 50 rows in under 100ms.
    """
    db, _engine = perf_db

    # Create test data
    db.add_all(
        [
            Case(
                title=f"Performance Test Case {i}",
                description="Testing query performance",
                status="open",
                priority="medium",
                project_id="default",
            )
            for i in range(50)
        ]
    )
    db.commit()

    # Measure query time with a monotonic, high-resolution clock; time.time()
    # can be too coarse or jump when judged against a 100ms budget.
    start_time = time.perf_counter()
    result = db.query(Case).all()
    duration = time.perf_counter() - start_time

    # Verify the result before the timing so a wrong row count is not hidden
    # behind a timing failure.
    assert len(result) == 50
    # Should complete in under 100ms
    assert duration < 0.1, f"Query took {duration:.3f}s for {len(result)} records"
if __name__ == "__main__":
    # Manual run: seed an in-memory database with 100 cases, then print the
    # elapsed time, the number of SQL statements and the stats returned by
    # get_case_stats.
    engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(bind=engine)
    SessionLocal = sessionmaker(bind=engine)
    db = SessionLocal()
    try:
        cases = [
            Case(
                title=f"Case {i}",
                description="desc",
                status="open" if i % 2 == 0 else "closed",
                priority="critical" if i % 5 == 0 else "medium",
                project_id="default",
            )
            for i in range(100)
        ]
        db.add_all(cases)
        db.commit()

        metrics = {"count": 0}

        def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
            metrics["count"] += 1

        # Attach the counter after seeding so only the statements issued by
        # get_case_stats are counted; without this registration the script
        # would always report 0 queries.
        event.listen(engine, "before_cursor_execute", before_cursor_execute)

        start_time = time.perf_counter()
        stats = case_service.get_case_stats(db, project_id="default")
        end_time = time.perf_counter()

        print(f"Time taken: {end_time - start_time:.6f}s")
        print(f"Queries executed: {metrics['count']}")
        print(f"Stats: {stats}")
    finally:
        # Release the session and the underlying connection even on failure.
        db.close()
        engine.dispose()