""" Performance regression tests for the application """ import pytest import time import asyncio from typing import List, Dict, Any from unittest.mock import patch, AsyncMock import statistics class TestAPIPerformance: """Test API endpoint performance""" @pytest.mark.asyncio async def test_health_endpoint_response_time(self, async_client): """Test health endpoint response time""" start_time = time.time() response = await async_client.get("/health") response_time = time.time() - start_time assert response.status_code == 200 assert response_time < 0.1 # Should respond within 100ms @pytest.mark.asyncio async def test_merchant_search_performance(self, async_client, sample_merchant_data): """Test merchant search endpoint performance""" with patch('app.services.merchant.search_merchants') as mock_search: mock_search.return_value = [sample_merchant_data] * 10 start_time = time.time() response = await async_client.get("/api/v1/merchants/search", params={ "latitude": 40.7128, "longitude": -74.0060, "radius": 5000, "category": "salon" }) response_time = time.time() - start_time assert response.status_code == 200 assert response_time < 1.0 # Should respond within 1 second assert len(response.json()) == 10 @pytest.mark.asyncio async def test_nlp_processing_performance(self, async_client, mock_nlp_pipeline): """Test NLP processing endpoint performance""" with patch('app.services.advanced_nlp.advanced_nlp_pipeline') as mock_pipeline: mock_pipeline.process_query = mock_nlp_pipeline.process_query start_time = time.time() response = await async_client.post("/api/v1/nlp/analyze-query", params={ "query": "find the best hair salon near me with parking" }) response_time = time.time() - start_time assert response.status_code == 200 assert response_time < 2.0 # Should respond within 2 seconds @pytest.mark.asyncio async def test_concurrent_api_requests(self, async_client, sample_merchant_data): """Test concurrent API request handling""" with patch('app.services.merchant.get_merchants') as mock_get: mock_get.return_value = [sample_merchant_data] * 5 # Create 20 concurrent requests tasks = [ async_client.get("/api/v1/merchants/") for _ in range(20) ] start_time = time.time() responses = await asyncio.gather(*tasks) total_time = time.time() - start_time # All requests should succeed assert all(r.status_code == 200 for r in responses) # Should handle concurrent requests efficiently assert total_time < 5.0 # Within 5 seconds for 20 requests @pytest.mark.asyncio async def test_large_result_set_performance(self, async_client): """Test performance with large result sets""" # Mock large dataset large_dataset = [ { "_id": f"merchant_{i}", "name": f"Merchant {i}", "category": "salon", "location": {"type": "Point", "coordinates": [-74.0060, 40.7128]} } for i in range(100) ] with patch('app.services.merchant.get_merchants') as mock_get: mock_get.return_value = large_dataset start_time = time.time() response = await async_client.get("/api/v1/merchants/", params={"limit": 100}) response_time = time.time() - start_time assert response.status_code == 200 assert len(response.json()) == 100 assert response_time < 2.0 # Should handle large datasets efficiently class TestDatabasePerformance: """Test database operation performance""" @pytest.mark.asyncio async def test_single_merchant_query_performance(self, sample_merchant_data): """Test single merchant query performance""" from app.repositories.db_repository import get_merchant_by_id_from_db with patch('app.nosql.get_mongodb_client') as mock_client: mock_collection = AsyncMock() mock_client.return_value.__getitem__.return_value.__getitem__.return_value = mock_collection mock_collection.find_one.return_value = sample_merchant_data start_time = time.time() result = await get_merchant_by_id_from_db("test_merchant_123") query_time = time.time() - start_time assert result is not None assert query_time < 0.1 # Should complete within 100ms @pytest.mark.asyncio async def test_geospatial_search_performance(self, sample_merchant_data): """Test geospatial search performance""" from app.repositories.db_repository import search_merchants_in_db with patch('app.nosql.get_mongodb_client') as mock_client: mock_collection = AsyncMock() mock_client.return_value.__getitem__.return_value.__getitem__.return_value = mock_collection mock_collection.find.return_value.limit.return_value.to_list.return_value = [sample_merchant_data] * 20 start_time = time.time() result = await search_merchants_in_db( latitude=40.7128, longitude=-74.0060, radius=5000, category="salon" ) query_time = time.time() - start_time assert len(result) == 20 assert query_time < 0.5 # Should complete within 500ms @pytest.mark.asyncio async def test_concurrent_database_queries(self, sample_merchant_data): """Test concurrent database query performance""" from app.repositories.db_repository import get_merchant_by_id_from_db with patch('app.nosql.get_mongodb_client') as mock_client: mock_collection = AsyncMock() mock_client.return_value.__getitem__.return_value.__getitem__.return_value = mock_collection mock_collection.find_one.return_value = sample_merchant_data # Create 50 concurrent queries tasks = [ get_merchant_by_id_from_db(f"merchant_{i}") for i in range(50) ] start_time = time.time() results = await asyncio.gather(*tasks) total_time = time.time() - start_time assert len(results) == 50 assert all(r is not None for r in results) assert total_time < 2.0 # Should handle 50 concurrent queries within 2 seconds @pytest.mark.asyncio async def test_cache_performance(self, sample_merchant_data): """Test cache operation performance""" from app.repositories.cache_repository import cache_merchant_data, get_cached_merchant_data with patch('app.nosql.get_redis_client') as mock_client: mock_redis = AsyncMock() mock_client.return_value = mock_redis mock_redis.setex.return_value = True mock_redis.get.return_value = '{"_id": "test_merchant_123", "name": "Test Hair Salon"}' # Test cache write performance start_time = time.time() await cache_merchant_data("test_merchant_123", sample_merchant_data) cache_write_time = time.time() - start_time # Test cache read performance start_time = time.time() result = await get_cached_merchant_data("test_merchant_123") cache_read_time = time.time() - start_time assert cache_write_time < 0.05 # Cache write within 50ms assert cache_read_time < 0.05 # Cache read within 50ms assert result is not None class TestNLPPerformance: """Test NLP processing performance""" @pytest.mark.asyncio async def test_intent_classification_performance(self): """Test intent classification performance""" from app.services.advanced_nlp import IntentClassifier classifier = IntentClassifier() test_queries = [ "find a hair salon", "best spa near me", "gym with parking", "dental clinic open now", "massage therapy luxury" ] start_time = time.time() results = [classifier.get_primary_intent(query) for query in test_queries] total_time = time.time() - start_time assert len(results) == 5 assert all(len(result) == 2 for result in results) # (intent, confidence) assert total_time < 1.0 # Should classify 5 queries within 1 second @pytest.mark.asyncio async def test_entity_extraction_performance(self): """Test entity extraction performance""" from app.services.advanced_nlp import BusinessEntityExtractor extractor = BusinessEntityExtractor() test_queries = [ "hair salon with parking near me", "luxury spa treatment with wifi", "budget-friendly gym open 24/7", "pet-friendly grooming service", "organic restaurant with outdoor seating" ] start_time = time.time() results = [extractor.extract_entities(query) for query in test_queries] total_time = time.time() - start_time assert len(results) == 5 assert all(isinstance(result, dict) for result in results) assert total_time < 2.0 # Should extract entities from 5 queries within 2 seconds @pytest.mark.asyncio async def test_semantic_matching_performance(self): """Test semantic matching performance""" from app.services.advanced_nlp import SemanticMatcher matcher = SemanticMatcher() test_queries = [ "hair salon", "spa treatment", "fitness center", "dental care", "massage therapy" ] start_time = time.time() results = [matcher.find_similar_services(query) for query in test_queries] total_time = time.time() - start_time assert len(results) == 5 assert all(isinstance(result, list) for result in results) assert total_time < 1.5 # Should find matches for 5 queries within 1.5 seconds @pytest.mark.asyncio async def test_complete_nlp_pipeline_performance(self): """Test complete NLP pipeline performance""" from app.services.advanced_nlp import AdvancedNLPPipeline pipeline = AdvancedNLPPipeline() test_queries = [ "find the best hair salon near me with parking", "luxury spa treatment open now", "budget-friendly gym with pool", "pet-friendly grooming service", "organic restaurant with delivery" ] processing_times = [] for query in test_queries: start_time = time.time() result = await pipeline.process_query(query) processing_time = time.time() - start_time processing_times.append(processing_time) assert "processing_time" in result assert processing_time < 3.0 # Each query within 3 seconds # Calculate statistics avg_time = statistics.mean(processing_times) max_time = max(processing_times) assert avg_time < 2.0 # Average processing time under 2 seconds assert max_time < 3.0 # Maximum processing time under 3 seconds class TestMemoryPerformance: """Test memory usage and performance""" @pytest.mark.asyncio async def test_memory_usage_during_processing(self): """Test memory usage during heavy processing""" import psutil import os process = psutil.Process(os.getpid()) initial_memory = process.memory_info().rss / 1024 / 1024 # MB # Simulate heavy processing from app.services.advanced_nlp import AdvancedNLPPipeline pipeline = AdvancedNLPPipeline() # Process multiple queries queries = [f"find service {i}" for i in range(100)] tasks = [pipeline.process_query(query) for query in queries] await asyncio.gather(*tasks) final_memory = process.memory_info().rss / 1024 / 1024 # MB memory_increase = final_memory - initial_memory # Memory increase should be reasonable (less than 100MB for 100 queries) assert memory_increase < 100 @pytest.mark.asyncio async def test_cache_memory_efficiency(self): """Test cache memory efficiency""" from app.services.advanced_nlp import AsyncNLPProcessor processor = AsyncNLPProcessor(max_cache_size=100) def dummy_processor(text): return {"processed": text} # Fill cache beyond limit for i in range(150): await processor.process_async(f"query_{i}", dummy_processor) # Cache should not exceed max size assert len(processor.cache) <= 100 @pytest.mark.asyncio async def test_garbage_collection_efficiency(self): """Test garbage collection during processing""" import gc # Force garbage collection gc.collect() initial_objects = len(gc.get_objects()) # Create and process many objects from app.services.advanced_nlp import AdvancedNLPPipeline pipeline = AdvancedNLPPipeline() for i in range(50): await pipeline.process_query(f"test query {i}") # Force garbage collection again gc.collect() final_objects = len(gc.get_objects()) # Object count should not grow excessively object_increase = final_objects - initial_objects assert object_increase < 1000 # Reasonable object increase class TestLoadTesting: """Load testing scenarios""" @pytest.mark.asyncio async def test_sustained_load_performance(self, async_client, sample_merchant_data): """Test performance under sustained load""" with patch('app.services.merchant.get_merchants') as mock_get: mock_get.return_value = [sample_merchant_data] * 10 # Simulate sustained load for 30 seconds end_time = time.time() + 30 request_count = 0 response_times = [] while time.time() < end_time: start_time = time.time() response = await async_client.get("/api/v1/merchants/") response_time = time.time() - start_time response_times.append(response_time) request_count += 1 assert response.status_code == 200 # Small delay to prevent overwhelming await asyncio.sleep(0.1) # Calculate performance metrics avg_response_time = statistics.mean(response_times) max_response_time = max(response_times) requests_per_second = request_count / 30 assert avg_response_time < 1.0 # Average response time under 1 second assert max_response_time < 3.0 # Max response time under 3 seconds assert requests_per_second > 5 # At least 5 requests per second @pytest.mark.asyncio async def test_burst_load_handling(self, async_client, sample_merchant_data): """Test handling of burst load""" with patch('app.services.merchant.search_merchants') as mock_search: mock_search.return_value = [sample_merchant_data] * 5 # Create burst of 100 concurrent requests tasks = [ async_client.get("/api/v1/merchants/search", params={ "latitude": 40.7128, "longitude": -74.0060, "radius": 5000 }) for _ in range(100) ] start_time = time.time() responses = await asyncio.gather(*tasks, return_exceptions=True) total_time = time.time() - start_time # Count successful responses successful_responses = [r for r in responses if hasattr(r, 'status_code') and r.status_code == 200] success_rate = len(successful_responses) / len(responses) assert success_rate > 0.95 # At least 95% success rate assert total_time < 10.0 # Complete within 10 seconds class TestPerformanceRegression: """Performance regression detection""" @pytest.mark.asyncio async def test_api_response_time_regression(self, async_client, performance_test_data): """Test for API response time regression""" queries = performance_test_data["queries"] max_expected_time = performance_test_data["expected_max_response_time"] with patch('app.services.advanced_nlp.advanced_nlp_pipeline') as mock_pipeline: mock_pipeline.process_query.return_value = { "query": "test", "primary_intent": {"intent": "SEARCH_SERVICE", "confidence": 0.8}, "entities": {}, "similar_services": [], "search_parameters": {}, "processing_time": 0.1 } response_times = [] for query in queries: start_time = time.time() response = await async_client.post("/api/v1/nlp/analyze-query", params={"query": query}) response_time = time.time() - start_time response_times.append(response_time) assert response.status_code == 200 avg_response_time = statistics.mean(response_times) max_response_time = max(response_times) # Check for performance regression assert avg_response_time < max_expected_time assert max_response_time < max_expected_time * 2 # Allow some variance @pytest.mark.asyncio async def test_database_query_performance_regression(self, sample_merchant_data): """Test for database query performance regression""" from app.repositories.db_repository import search_merchants_in_db with patch('app.nosql.get_mongodb_client') as mock_client: mock_collection = AsyncMock() mock_client.return_value.__getitem__.return_value.__getitem__.return_value = mock_collection mock_collection.find.return_value.limit.return_value.to_list.return_value = [sample_merchant_data] * 10 query_times = [] # Test multiple search scenarios for i in range(10): start_time = time.time() await search_merchants_in_db( latitude=40.7128 + (i * 0.01), longitude=-74.0060 + (i * 0.01), radius=5000, category="salon" ) query_time = time.time() - start_time query_times.append(query_time) avg_query_time = statistics.mean(query_times) max_query_time = max(query_times) # Database queries should be fast assert avg_query_time < 0.1 # Average under 100ms assert max_query_time < 0.2 # Maximum under 200ms