# zenith-backend/tests/performance/test_case_stats_perf.py
# Provenance: uploaded via huggingface_hub by "teoat", revision 4ae946d (verified).
"""
Performance test for case stats optimization
"""
import os
import sys
import time
import pytest
from sqlalchemy import create_engine, event
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
# Ensure backend is in path
sys.path.append(os.path.join(os.getcwd(), "backend"))
from app.services.business.case_service import case_service
from core.models.base import Base
from core.models.case import Case
@pytest.fixture(scope="function")
def perf_db():
    """Yield a ``(session, engine)`` pair backed by a fresh in-memory SQLite DB.

    ``StaticPool`` plus ``check_same_thread=False`` pins a single shared
    connection, so the ``:memory:`` database persists for the whole test.

    Yields:
        tuple: (SQLAlchemy session, engine) for the seeded-per-test database.
    """
    engine = create_engine(
        "sqlite:///:memory:",
        connect_args={"check_same_thread": False},
        poolclass=StaticPool,
    )
    Base.metadata.create_all(bind=engine)
    session_local = sessionmaker(bind=engine)
    db = session_local()
    try:
        yield db, engine
    finally:
        # Guarantee teardown even if the consuming test errors mid-run;
        # dispose the engine so its pooled connection is released too.
        db.close()
        engine.dispose()
def test_get_case_stats_query_count(perf_db):
    """Seed 100 cases, then check get_case_stats is both correct and query-frugal."""
    db, engine = perf_db

    # Seed data: alternate open/closed, every 5th case critical.
    seeded = [
        Case(
            title=f"Case {i}",
            description="desc",
            status="open" if i % 2 == 0 else "closed",
            priority="critical" if i % 5 == 0 else "medium",
            project_id="default",
        )
        for i in range(100)
    ]
    db.add_all(seeded)
    db.commit()

    # Count every SQL statement issued while computing the stats.
    # A mutable dict avoids nonlocal issues inside the event callback.
    metrics = {"count": 0}

    @event.listens_for(engine, "before_cursor_execute")
    def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
        metrics["count"] += 1

    start_time = time.time()
    stats = case_service.get_case_stats(db, project_id="default")
    end_time = time.time()

    print(f"\nTime taken: {end_time - start_time:.6f}s")
    print(f"Queries executed: {metrics['count']}")
    print(f"Stats: {stats}")

    # Correctness of the aggregate counts (derived from the seeding pattern).
    assert stats["total_cases"] == 100
    assert stats["open_cases"] == 50
    assert stats["closed_cases"] == 50
    assert stats["critical_cases"] == 20

    # Optimization guard: stats must not degrade into per-row queries.
    assert metrics["count"] <= 5, f"Too many queries: {metrics['count']}"
def test_case_list_response_time(client_with_db, auth_headers):
    """
    Test that case list API responds within acceptable time.
    """
    # perf_counter is monotonic; time.time() can jump (NTP) and skew durations.
    start_time = time.perf_counter()
    response = client_with_db.get("/api/v1/cases/", headers=auth_headers)
    duration = time.perf_counter() - start_time
    # Verify the request succeeded BEFORE judging latency, so a broken
    # endpoint fails with a clear status-code message, not a timing one.
    assert response.status_code == 200
    # Assert response time is under 1 second
    assert duration < 1.0, f"Case list took {duration:.3f}s, expected < 1.0s"
def test_health_check_response_time(client):
    """
    Test that health check responds within acceptable time.
    Note: In test environments without Redis, this may be slower.
    """
    # perf_counter is monotonic; time.time() can jump (NTP) and skew durations.
    start_time = time.perf_counter()
    response = client.get("/health")
    duration = time.perf_counter() - start_time
    # Verify we got a recognized health response before judging latency.
    # 503 is acceptable: the endpoint may report degraded dependencies.
    assert response.status_code in [200, 503]
    # Allow longer time in test environment (Redis connection attempts add latency)
    assert duration < 5.0, f"Health check took {duration:.3f}s, expected < 5.0s"
def test_database_query_performance(perf_db):
    """
    Test that database queries complete within acceptable time.
    """
    db, engine = perf_db
    # Create test data in one batch instead of 50 individual add() calls.
    db.add_all(
        [
            Case(
                title=f"Performance Test Case {i}",
                description="Testing query performance",
                status="open",
                priority="medium",
                project_id="default",
            )
            for i in range(50)
        ]
    )
    db.commit()
    # Measure query time with a monotonic clock (time.time() can jump).
    start_time = time.perf_counter()
    result = db.query(Case).all()
    duration = time.perf_counter() - start_time
    # Verify correctness first so a bad seed fails with a clear message.
    assert len(result) == 50
    # Should complete in under 100ms
    assert duration < 0.1, f"Query took {duration:.3f}s for {len(result)} records"
if __name__ == "__main__":
    # Manually run if executed directly: seed 100 cases, then time the
    # stats call and report how many SQL statements it issued.
    engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(bind=engine)
    SessionLocal = sessionmaker(bind=engine)
    db = SessionLocal()
    try:
        # Same seeding pattern as the pytest test: alternate open/closed,
        # every 5th case critical.
        db.add_all(
            [
                Case(
                    title=f"Case {i}",
                    description="desc",
                    status="open" if i % 2 == 0 else "closed",
                    priority="critical" if i % 5 == 0 else "medium",
                    project_id="default",
                )
                for i in range(100)
            ]
        )
        db.commit()

        # Count SQL statements issued by the stats call.
        metrics = {"count": 0}

        @event.listens_for(engine, "before_cursor_execute")
        def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
            metrics["count"] += 1

        # perf_counter is monotonic; time.time() can jump and skew durations.
        start_time = time.perf_counter()
        stats = case_service.get_case_stats(db, project_id="default")
        elapsed = time.perf_counter() - start_time

        print(f"Time taken: {elapsed:.6f}s")
        print(f"Queries executed: {metrics['count']}")
        print(f"Stats: {stats}")
    finally:
        # Release the session and the engine's pooled connection.
        db.close()
        engine.dispose()