# bookmyservice-mhs / app / tests / test_performance.py
# test(performance): Add comprehensive test suite for performance optimization
# (commit 7611990 by MukeshKapoor25)
"""
Performance regression tests for the application
"""
import pytest
import time
import asyncio
from typing import List, Dict, Any
from unittest.mock import patch, AsyncMock
import statistics
class TestAPIPerformance:
    """Test API endpoint performance.

    All durations are measured with ``time.perf_counter`` — a monotonic,
    high-resolution clock — instead of ``time.time``, whose wall-clock value
    can jump under NTP adjustment and produce negative or skewed elapsed
    times. Service-layer calls are mocked, so the budgets below cover the
    framework/serialization path, not real backend latency.
    """

    @pytest.mark.asyncio
    async def test_health_endpoint_response_time(self, async_client):
        """Health endpoint must answer within 100ms."""
        start_time = time.perf_counter()
        response = await async_client.get("/health")
        response_time = time.perf_counter() - start_time
        assert response.status_code == 200
        assert response_time < 0.1  # Should respond within 100ms

    @pytest.mark.asyncio
    async def test_merchant_search_performance(self, async_client, sample_merchant_data):
        """Merchant geo-search must answer within 1s for a mocked 10-hit result."""
        with patch('app.services.merchant.search_merchants') as mock_search:
            mock_search.return_value = [sample_merchant_data] * 10
            start_time = time.perf_counter()
            response = await async_client.get("/api/v1/merchants/search", params={
                "latitude": 40.7128,
                "longitude": -74.0060,
                "radius": 5000,
                "category": "salon"
            })
            response_time = time.perf_counter() - start_time
            assert response.status_code == 200
            assert response_time < 1.0  # Should respond within 1 second
            assert len(response.json()) == 10

    @pytest.mark.asyncio
    async def test_nlp_processing_performance(self, async_client, mock_nlp_pipeline):
        """NLP analyze-query endpoint must answer within 2s (pipeline mocked)."""
        with patch('app.services.advanced_nlp.advanced_nlp_pipeline') as mock_pipeline:
            mock_pipeline.process_query = mock_nlp_pipeline.process_query
            start_time = time.perf_counter()
            response = await async_client.post("/api/v1/nlp/analyze-query", params={
                "query": "find the best hair salon near me with parking"
            })
            response_time = time.perf_counter() - start_time
            assert response.status_code == 200
            assert response_time < 2.0  # Should respond within 2 seconds

    @pytest.mark.asyncio
    async def test_concurrent_api_requests(self, async_client, sample_merchant_data):
        """20 concurrent listing requests must all succeed within 5s total."""
        with patch('app.services.merchant.get_merchants') as mock_get:
            mock_get.return_value = [sample_merchant_data] * 5
            # Create 20 concurrent requests
            tasks = [
                async_client.get("/api/v1/merchants/")
                for _ in range(20)
            ]
            start_time = time.perf_counter()
            responses = await asyncio.gather(*tasks)
            total_time = time.perf_counter() - start_time
            # All requests should succeed
            assert all(r.status_code == 200 for r in responses)
            # Should handle concurrent requests efficiently
            assert total_time < 5.0  # Within 5 seconds for 20 requests

    @pytest.mark.asyncio
    async def test_large_result_set_performance(self, async_client):
        """A 100-item result set must serialize and return within 2s."""
        # Mock large dataset
        large_dataset = [
            {
                "_id": f"merchant_{i}",
                "name": f"Merchant {i}",
                "category": "salon",
                "location": {"type": "Point", "coordinates": [-74.0060, 40.7128]}
            }
            for i in range(100)
        ]
        with patch('app.services.merchant.get_merchants') as mock_get:
            mock_get.return_value = large_dataset
            start_time = time.perf_counter()
            response = await async_client.get("/api/v1/merchants/", params={"limit": 100})
            response_time = time.perf_counter() - start_time
            assert response.status_code == 200
            assert len(response.json()) == 100
            assert response_time < 2.0  # Should handle large datasets efficiently
class TestDatabasePerformance:
    """Test database operation performance.

    Mongo/Redis clients are replaced with ``AsyncMock``, so the timings
    cover only the application-side code path (query building, result
    handling), not real network or database latency. Durations use
    ``time.perf_counter`` (monotonic) rather than ``time.time``, which can
    jump with system clock adjustments.
    """

    @pytest.mark.asyncio
    async def test_single_merchant_query_performance(self, sample_merchant_data):
        """A single merchant-by-id lookup must complete within 100ms."""
        from app.repositories.db_repository import get_merchant_by_id_from_db

        with patch('app.nosql.get_mongodb_client') as mock_client:
            mock_collection = AsyncMock()
            # client[db][collection] -> mocked collection
            mock_client.return_value.__getitem__.return_value.__getitem__.return_value = mock_collection
            mock_collection.find_one.return_value = sample_merchant_data
            start_time = time.perf_counter()
            result = await get_merchant_by_id_from_db("test_merchant_123")
            query_time = time.perf_counter() - start_time
            assert result is not None
            assert query_time < 0.1  # Should complete within 100ms

    @pytest.mark.asyncio
    async def test_geospatial_search_performance(self, sample_merchant_data):
        """A geospatial search returning 20 rows must complete within 500ms."""
        from app.repositories.db_repository import search_merchants_in_db

        with patch('app.nosql.get_mongodb_client') as mock_client:
            mock_collection = AsyncMock()
            mock_client.return_value.__getitem__.return_value.__getitem__.return_value = mock_collection
            # find(...).limit(...).to_list(...) cursor chain
            mock_collection.find.return_value.limit.return_value.to_list.return_value = [sample_merchant_data] * 20
            start_time = time.perf_counter()
            result = await search_merchants_in_db(
                latitude=40.7128,
                longitude=-74.0060,
                radius=5000,
                category="salon"
            )
            query_time = time.perf_counter() - start_time
            assert len(result) == 20
            assert query_time < 0.5  # Should complete within 500ms

    @pytest.mark.asyncio
    async def test_concurrent_database_queries(self, sample_merchant_data):
        """50 concurrent merchant lookups must all succeed within 2s total."""
        from app.repositories.db_repository import get_merchant_by_id_from_db

        with patch('app.nosql.get_mongodb_client') as mock_client:
            mock_collection = AsyncMock()
            mock_client.return_value.__getitem__.return_value.__getitem__.return_value = mock_collection
            mock_collection.find_one.return_value = sample_merchant_data
            # Create 50 concurrent queries
            tasks = [
                get_merchant_by_id_from_db(f"merchant_{i}")
                for i in range(50)
            ]
            start_time = time.perf_counter()
            results = await asyncio.gather(*tasks)
            total_time = time.perf_counter() - start_time
            assert len(results) == 50
            assert all(r is not None for r in results)
            assert total_time < 2.0  # Should handle 50 concurrent queries within 2 seconds

    @pytest.mark.asyncio
    async def test_cache_performance(self, sample_merchant_data):
        """Cache writes and reads must each complete within 50ms."""
        from app.repositories.cache_repository import cache_merchant_data, get_cached_merchant_data

        with patch('app.nosql.get_redis_client') as mock_client:
            mock_redis = AsyncMock()
            mock_client.return_value = mock_redis
            mock_redis.setex.return_value = True
            mock_redis.get.return_value = '{"_id": "test_merchant_123", "name": "Test Hair Salon"}'
            # Test cache write performance
            start_time = time.perf_counter()
            await cache_merchant_data("test_merchant_123", sample_merchant_data)
            cache_write_time = time.perf_counter() - start_time
            # Test cache read performance
            start_time = time.perf_counter()
            result = await get_cached_merchant_data("test_merchant_123")
            cache_read_time = time.perf_counter() - start_time
            assert cache_write_time < 0.05  # Cache write within 50ms
            assert cache_read_time < 0.05  # Cache read within 50ms
            assert result is not None
class TestNLPPerformance:
    """Test NLP processing performance.

    Runs the real NLP components (no mocking), so these budgets bound the
    actual classifier/extractor/matcher cost. Durations are measured with
    ``time.perf_counter`` (monotonic, high resolution) instead of
    ``time.time`` so results are not skewed by system clock adjustments.
    """

    @pytest.mark.asyncio
    async def test_intent_classification_performance(self):
        """Classify 5 representative queries within 1 second total."""
        from app.services.advanced_nlp import IntentClassifier

        classifier = IntentClassifier()
        test_queries = [
            "find a hair salon",
            "best spa near me",
            "gym with parking",
            "dental clinic open now",
            "massage therapy luxury"
        ]
        start_time = time.perf_counter()
        results = [classifier.get_primary_intent(query) for query in test_queries]
        total_time = time.perf_counter() - start_time
        assert len(results) == 5
        assert all(len(result) == 2 for result in results)  # (intent, confidence)
        assert total_time < 1.0  # Should classify 5 queries within 1 second

    @pytest.mark.asyncio
    async def test_entity_extraction_performance(self):
        """Extract entities from 5 queries within 2 seconds total."""
        from app.services.advanced_nlp import BusinessEntityExtractor

        extractor = BusinessEntityExtractor()
        test_queries = [
            "hair salon with parking near me",
            "luxury spa treatment with wifi",
            "budget-friendly gym open 24/7",
            "pet-friendly grooming service",
            "organic restaurant with outdoor seating"
        ]
        start_time = time.perf_counter()
        results = [extractor.extract_entities(query) for query in test_queries]
        total_time = time.perf_counter() - start_time
        assert len(results) == 5
        assert all(isinstance(result, dict) for result in results)
        assert total_time < 2.0  # Should extract entities from 5 queries within 2 seconds

    @pytest.mark.asyncio
    async def test_semantic_matching_performance(self):
        """Find similar services for 5 queries within 1.5 seconds total."""
        from app.services.advanced_nlp import SemanticMatcher

        matcher = SemanticMatcher()
        test_queries = [
            "hair salon",
            "spa treatment",
            "fitness center",
            "dental care",
            "massage therapy"
        ]
        start_time = time.perf_counter()
        results = [matcher.find_similar_services(query) for query in test_queries]
        total_time = time.perf_counter() - start_time
        assert len(results) == 5
        assert all(isinstance(result, list) for result in results)
        assert total_time < 1.5  # Should find matches for 5 queries within 1.5 seconds

    @pytest.mark.asyncio
    async def test_complete_nlp_pipeline_performance(self):
        """Run the full pipeline per query: each < 3s, average < 2s."""
        from app.services.advanced_nlp import AdvancedNLPPipeline

        pipeline = AdvancedNLPPipeline()
        test_queries = [
            "find the best hair salon near me with parking",
            "luxury spa treatment open now",
            "budget-friendly gym with pool",
            "pet-friendly grooming service",
            "organic restaurant with delivery"
        ]
        processing_times = []
        for query in test_queries:
            start_time = time.perf_counter()
            result = await pipeline.process_query(query)
            processing_time = time.perf_counter() - start_time
            processing_times.append(processing_time)
            assert "processing_time" in result
            assert processing_time < 3.0  # Each query within 3 seconds
        # Calculate statistics
        avg_time = statistics.mean(processing_times)
        max_time = max(processing_times)
        assert avg_time < 2.0  # Average processing time under 2 seconds
        assert max_time < 3.0  # Maximum processing time under 3 seconds
class TestMemoryPerformance:
    """Memory-usage and resource-efficiency checks for heavy processing."""

    @pytest.mark.asyncio
    async def test_memory_usage_during_processing(self):
        """RSS growth while processing 100 queries must stay under 100MB."""
        import psutil
        import os

        proc = psutil.Process(os.getpid())
        baseline_mb = proc.memory_info().rss / 1024 / 1024  # MB

        # Simulate heavy processing with the real pipeline.
        from app.services.advanced_nlp import AdvancedNLPPipeline

        pipeline = AdvancedNLPPipeline()
        # Fire 100 queries concurrently to stress the pipeline.
        await asyncio.gather(
            *(pipeline.process_query(f"find service {i}") for i in range(100))
        )

        current_mb = proc.memory_info().rss / 1024 / 1024  # MB
        growth_mb = current_mb - baseline_mb
        # Memory increase should be reasonable (less than 100MB for 100 queries).
        assert growth_mb < 100

    @pytest.mark.asyncio
    async def test_cache_memory_efficiency(self):
        """Processor cache must stay bounded by its configured max size."""
        from app.services.advanced_nlp import AsyncNLPProcessor

        processor = AsyncNLPProcessor(max_cache_size=100)

        def dummy_processor(text):
            return {"processed": text}

        # Push 50% more entries than the cache is allowed to hold.
        for i in range(150):
            await processor.process_async(f"query_{i}", dummy_processor)

        # Eviction must keep the cache within its limit.
        assert len(processor.cache) <= 100

    @pytest.mark.asyncio
    async def test_garbage_collection_efficiency(self):
        """Processing 50 queries must not leak an excessive number of objects."""
        import gc

        gc.collect()  # start from a collected baseline
        baseline_objects = len(gc.get_objects())

        from app.services.advanced_nlp import AdvancedNLPPipeline

        pipeline = AdvancedNLPPipeline()
        for i in range(50):
            await pipeline.process_query(f"test query {i}")

        gc.collect()  # drop anything only kept alive by cycles
        leaked = len(gc.get_objects()) - baseline_objects
        # Object count should not grow excessively.
        assert leaked < 1000  # Reasonable object increase
class TestLoadTesting:
    """Load testing scenarios.

    Durations use ``time.perf_counter`` (monotonic) rather than
    ``time.time`` so the deadline loop and throughput figures are immune
    to wall-clock adjustments. Throughput is computed from the *measured*
    elapsed time, not the nominal window length — the final request can
    finish past the deadline, and dividing by the nominal duration would
    overstate requests/second.
    """

    # Length of the sustained-load window, in seconds.
    LOAD_DURATION_SECONDS = 30

    @pytest.mark.asyncio
    async def test_sustained_load_performance(self, async_client, sample_merchant_data):
        """Sustained sequential load: avg < 1s, max < 3s, > 5 req/s."""
        with patch('app.services.merchant.get_merchants') as mock_get:
            mock_get.return_value = [sample_merchant_data] * 10
            # Simulate sustained load for LOAD_DURATION_SECONDS
            loop_start = time.perf_counter()
            deadline = loop_start + self.LOAD_DURATION_SECONDS
            request_count = 0
            response_times = []
            while time.perf_counter() < deadline:
                start_time = time.perf_counter()
                response = await async_client.get("/api/v1/merchants/")
                response_time = time.perf_counter() - start_time
                response_times.append(response_time)
                request_count += 1
                assert response.status_code == 200
                # Small delay to prevent overwhelming
                await asyncio.sleep(0.1)
            elapsed = time.perf_counter() - loop_start
            # Calculate performance metrics from the actual elapsed time
            avg_response_time = statistics.mean(response_times)
            max_response_time = max(response_times)
            requests_per_second = request_count / elapsed
            assert avg_response_time < 1.0  # Average response time under 1 second
            assert max_response_time < 3.0  # Max response time under 3 seconds
            assert requests_per_second > 5  # At least 5 requests per second

    @pytest.mark.asyncio
    async def test_burst_load_handling(self, async_client, sample_merchant_data):
        """Burst of 100 concurrent searches: >95% success within 10s."""
        with patch('app.services.merchant.search_merchants') as mock_search:
            mock_search.return_value = [sample_merchant_data] * 5
            # Create burst of 100 concurrent requests
            tasks = [
                async_client.get("/api/v1/merchants/search", params={
                    "latitude": 40.7128,
                    "longitude": -74.0060,
                    "radius": 5000
                })
                for _ in range(100)
            ]
            start_time = time.perf_counter()
            # return_exceptions=True so one failed request doesn't abort the burst
            responses = await asyncio.gather(*tasks, return_exceptions=True)
            total_time = time.perf_counter() - start_time
            # Count successful responses (exceptions lack a status_code attribute)
            successful_responses = [r for r in responses if hasattr(r, 'status_code') and r.status_code == 200]
            success_rate = len(successful_responses) / len(responses)
            assert success_rate > 0.95  # At least 95% success rate
            assert total_time < 10.0  # Complete within 10 seconds
class TestPerformanceRegression:
    """Performance regression detection.

    Compares measured latencies against thresholds supplied by the
    ``performance_test_data`` fixture. Elapsed time is measured with
    ``time.perf_counter`` (monotonic) instead of ``time.time`` to avoid
    false regressions from system clock adjustments.
    """

    @pytest.mark.asyncio
    async def test_api_response_time_regression(self, async_client, performance_test_data):
        """NLP endpoint latency must stay within the fixture's budget."""
        queries = performance_test_data["queries"]
        max_expected_time = performance_test_data["expected_max_response_time"]
        with patch('app.services.advanced_nlp.advanced_nlp_pipeline') as mock_pipeline:
            # Canned pipeline result so only the HTTP/serialization path is timed.
            mock_pipeline.process_query.return_value = {
                "query": "test",
                "primary_intent": {"intent": "SEARCH_SERVICE", "confidence": 0.8},
                "entities": {},
                "similar_services": [],
                "search_parameters": {},
                "processing_time": 0.1
            }
            response_times = []
            for query in queries:
                start_time = time.perf_counter()
                response = await async_client.post("/api/v1/nlp/analyze-query", params={"query": query})
                response_time = time.perf_counter() - start_time
                response_times.append(response_time)
                assert response.status_code == 200
            avg_response_time = statistics.mean(response_times)
            max_response_time = max(response_times)
            # Check for performance regression
            assert avg_response_time < max_expected_time
            assert max_response_time < max_expected_time * 2  # Allow some variance

    @pytest.mark.asyncio
    async def test_database_query_performance_regression(self, sample_merchant_data):
        """Mocked geo-search must average < 100ms with max < 200ms."""
        from app.repositories.db_repository import search_merchants_in_db

        with patch('app.nosql.get_mongodb_client') as mock_client:
            mock_collection = AsyncMock()
            mock_client.return_value.__getitem__.return_value.__getitem__.return_value = mock_collection
            mock_collection.find.return_value.limit.return_value.to_list.return_value = [sample_merchant_data] * 10
            query_times = []
            # Test multiple search scenarios at slightly shifted coordinates
            for i in range(10):
                start_time = time.perf_counter()
                await search_merchants_in_db(
                    latitude=40.7128 + (i * 0.01),
                    longitude=-74.0060 + (i * 0.01),
                    radius=5000,
                    category="salon"
                )
                query_time = time.perf_counter() - start_time
                query_times.append(query_time)
            avg_query_time = statistics.mean(query_times)
            max_query_time = max(query_times)
            # Database queries should be fast
            assert avg_query_time < 0.1  # Average under 100ms
            assert max_query_time < 0.2  # Maximum under 200ms