Spaces:
Sleeping
Sleeping
| """ | |
| Performance regression tests for the application | |
| """ | |
| import pytest | |
| import time | |
| import asyncio | |
| from typing import List, Dict, Any | |
| from unittest.mock import patch, AsyncMock | |
| import statistics | |
class TestAPIPerformance:
    """Latency tests for public API endpoints (clients mocked at the service layer)."""

    async def test_health_endpoint_response_time(self, async_client):
        """Health endpoint must answer within 100ms."""
        # perf_counter() is monotonic and high-resolution; time.time() is wall
        # clock and can jump (NTP), which corrupts elapsed-time measurements.
        start_time = time.perf_counter()
        response = await async_client.get("/health")
        response_time = time.perf_counter() - start_time
        assert response.status_code == 200
        assert response_time < 0.1  # Should respond within 100ms

    async def test_merchant_search_performance(self, async_client, sample_merchant_data):
        """Merchant geo-search endpoint must answer within 1 second."""
        with patch('app.services.merchant.search_merchants') as mock_search:
            mock_search.return_value = [sample_merchant_data] * 10
            start_time = time.perf_counter()
            response = await async_client.get("/api/v1/merchants/search", params={
                "latitude": 40.7128,
                "longitude": -74.0060,
                "radius": 5000,
                "category": "salon"
            })
            response_time = time.perf_counter() - start_time
            assert response.status_code == 200
            assert response_time < 1.0  # Should respond within 1 second
            assert len(response.json()) == 10

    async def test_nlp_processing_performance(self, async_client, mock_nlp_pipeline):
        """NLP analyze-query endpoint must answer within 2 seconds."""
        with patch('app.services.advanced_nlp.advanced_nlp_pipeline') as mock_pipeline:
            mock_pipeline.process_query = mock_nlp_pipeline.process_query
            start_time = time.perf_counter()
            response = await async_client.post("/api/v1/nlp/analyze-query", params={
                "query": "find the best hair salon near me with parking"
            })
            response_time = time.perf_counter() - start_time
            assert response.status_code == 200
            assert response_time < 2.0  # Should respond within 2 seconds

    async def test_concurrent_api_requests(self, async_client, sample_merchant_data):
        """20 concurrent requests must all succeed and finish within 5 seconds."""
        with patch('app.services.merchant.get_merchants') as mock_get:
            mock_get.return_value = [sample_merchant_data] * 5
            # Create 20 concurrent requests
            tasks = [
                async_client.get("/api/v1/merchants/")
                for _ in range(20)
            ]
            start_time = time.perf_counter()
            responses = await asyncio.gather(*tasks)
            total_time = time.perf_counter() - start_time
            # All requests should succeed
            assert all(r.status_code == 200 for r in responses)
            # Should handle concurrent requests efficiently
            assert total_time < 5.0  # Within 5 seconds for 20 requests

    async def test_large_result_set_performance(self, async_client):
        """A 100-item result set must serialize and return within 2 seconds."""
        # Mock large dataset
        large_dataset = [
            {
                "_id": f"merchant_{i}",
                "name": f"Merchant {i}",
                "category": "salon",
                "location": {"type": "Point", "coordinates": [-74.0060, 40.7128]}
            }
            for i in range(100)
        ]
        with patch('app.services.merchant.get_merchants') as mock_get:
            mock_get.return_value = large_dataset
            start_time = time.perf_counter()
            response = await async_client.get("/api/v1/merchants/", params={"limit": 100})
            response_time = time.perf_counter() - start_time
            assert response.status_code == 200
            assert len(response.json()) == 100
            assert response_time < 2.0  # Should handle large datasets efficiently
class TestDatabasePerformance:
    """Latency tests for repository-layer operations (MongoDB/Redis mocked)."""

    async def test_single_merchant_query_performance(self, sample_merchant_data):
        """A single find_one lookup must complete within 100ms."""
        from app.repositories.db_repository import get_merchant_by_id_from_db
        with patch('app.nosql.get_mongodb_client') as mock_client:
            mock_collection = AsyncMock()
            # client[db][collection] -> mocked collection
            mock_client.return_value.__getitem__.return_value.__getitem__.return_value = mock_collection
            mock_collection.find_one.return_value = sample_merchant_data
            # perf_counter() is monotonic; time.time() can jump and skew durations.
            start_time = time.perf_counter()
            result = await get_merchant_by_id_from_db("test_merchant_123")
            query_time = time.perf_counter() - start_time
            assert result is not None
            assert query_time < 0.1  # Should complete within 100ms

    async def test_geospatial_search_performance(self, sample_merchant_data):
        """A geospatial search returning 20 rows must complete within 500ms."""
        from app.repositories.db_repository import search_merchants_in_db
        with patch('app.nosql.get_mongodb_client') as mock_client:
            mock_collection = AsyncMock()
            mock_client.return_value.__getitem__.return_value.__getitem__.return_value = mock_collection
            # Mirror the repository's cursor chain: find(...).limit(...).to_list(...)
            mock_collection.find.return_value.limit.return_value.to_list.return_value = [sample_merchant_data] * 20
            start_time = time.perf_counter()
            result = await search_merchants_in_db(
                latitude=40.7128,
                longitude=-74.0060,
                radius=5000,
                category="salon"
            )
            query_time = time.perf_counter() - start_time
            assert len(result) == 20
            assert query_time < 0.5  # Should complete within 500ms

    async def test_concurrent_database_queries(self, sample_merchant_data):
        """50 concurrent lookups must all succeed within 2 seconds total."""
        from app.repositories.db_repository import get_merchant_by_id_from_db
        with patch('app.nosql.get_mongodb_client') as mock_client:
            mock_collection = AsyncMock()
            mock_client.return_value.__getitem__.return_value.__getitem__.return_value = mock_collection
            mock_collection.find_one.return_value = sample_merchant_data
            # Create 50 concurrent queries
            tasks = [
                get_merchant_by_id_from_db(f"merchant_{i}")
                for i in range(50)
            ]
            start_time = time.perf_counter()
            results = await asyncio.gather(*tasks)
            total_time = time.perf_counter() - start_time
            assert len(results) == 50
            assert all(r is not None for r in results)
            assert total_time < 2.0  # Should handle 50 concurrent queries within 2 seconds

    async def test_cache_performance(self, sample_merchant_data):
        """Redis cache write and read must each complete within 50ms."""
        from app.repositories.cache_repository import cache_merchant_data, get_cached_merchant_data
        with patch('app.nosql.get_redis_client') as mock_client:
            mock_redis = AsyncMock()
            mock_client.return_value = mock_redis
            mock_redis.setex.return_value = True
            mock_redis.get.return_value = '{"_id": "test_merchant_123", "name": "Test Hair Salon"}'
            # Test cache write performance
            start_time = time.perf_counter()
            await cache_merchant_data("test_merchant_123", sample_merchant_data)
            cache_write_time = time.perf_counter() - start_time
            # Test cache read performance
            start_time = time.perf_counter()
            result = await get_cached_merchant_data("test_merchant_123")
            cache_read_time = time.perf_counter() - start_time
            assert cache_write_time < 0.05  # Cache write within 50ms
            assert cache_read_time < 0.05  # Cache read within 50ms
            assert result is not None
class TestNLPPerformance:
    """Throughput tests for the NLP components (classifier, extractor, matcher, pipeline)."""

    async def test_intent_classification_performance(self):
        """Classifying 5 queries must finish within 1 second."""
        from app.services.advanced_nlp import IntentClassifier
        classifier = IntentClassifier()
        test_queries = [
            "find a hair salon",
            "best spa near me",
            "gym with parking",
            "dental clinic open now",
            "massage therapy luxury"
        ]
        # perf_counter() is monotonic; time.time() can jump and skew durations.
        start_time = time.perf_counter()
        results = [classifier.get_primary_intent(query) for query in test_queries]
        total_time = time.perf_counter() - start_time
        assert len(results) == 5
        assert all(len(result) == 2 for result in results)  # (intent, confidence)
        assert total_time < 1.0  # Should classify 5 queries within 1 second

    async def test_entity_extraction_performance(self):
        """Extracting entities from 5 queries must finish within 2 seconds."""
        from app.services.advanced_nlp import BusinessEntityExtractor
        extractor = BusinessEntityExtractor()
        test_queries = [
            "hair salon with parking near me",
            "luxury spa treatment with wifi",
            "budget-friendly gym open 24/7",
            "pet-friendly grooming service",
            "organic restaurant with outdoor seating"
        ]
        start_time = time.perf_counter()
        results = [extractor.extract_entities(query) for query in test_queries]
        total_time = time.perf_counter() - start_time
        assert len(results) == 5
        assert all(isinstance(result, dict) for result in results)
        assert total_time < 2.0  # Should extract entities from 5 queries within 2 seconds

    async def test_semantic_matching_performance(self):
        """Semantic matching for 5 queries must finish within 1.5 seconds."""
        from app.services.advanced_nlp import SemanticMatcher
        matcher = SemanticMatcher()
        test_queries = [
            "hair salon",
            "spa treatment",
            "fitness center",
            "dental care",
            "massage therapy"
        ]
        start_time = time.perf_counter()
        results = [matcher.find_similar_services(query) for query in test_queries]
        total_time = time.perf_counter() - start_time
        assert len(results) == 5
        assert all(isinstance(result, list) for result in results)
        assert total_time < 1.5  # Should find matches for 5 queries within 1.5 seconds

    async def test_complete_nlp_pipeline_performance(self):
        """End-to-end pipeline: each query < 3s, average < 2s across 5 queries."""
        from app.services.advanced_nlp import AdvancedNLPPipeline
        pipeline = AdvancedNLPPipeline()
        test_queries = [
            "find the best hair salon near me with parking",
            "luxury spa treatment open now",
            "budget-friendly gym with pool",
            "pet-friendly grooming service",
            "organic restaurant with delivery"
        ]
        processing_times = []
        for query in test_queries:
            start_time = time.perf_counter()
            result = await pipeline.process_query(query)
            processing_time = time.perf_counter() - start_time
            processing_times.append(processing_time)
            assert "processing_time" in result
            assert processing_time < 3.0  # Each query within 3 seconds
        # Calculate statistics
        avg_time = statistics.mean(processing_times)
        max_time = max(processing_times)
        assert avg_time < 2.0  # Average processing time under 2 seconds
        assert max_time < 3.0  # Maximum processing time under 3 seconds
class TestMemoryPerformance:
    """Memory-footprint and garbage-collection tests for heavy NLP processing."""

    async def test_memory_usage_during_processing(self):
        """RSS growth for 100 pipeline queries must stay under 100MB."""
        # NOTE(review): psutil is a third-party dependency; this test will be
        # skipped/errored in environments where it is not installed — confirm
        # it is declared in the test requirements.
        import psutil
        import os
        process = psutil.Process(os.getpid())
        initial_memory = process.memory_info().rss / 1024 / 1024  # MB
        # Simulate heavy processing
        from app.services.advanced_nlp import AdvancedNLPPipeline
        pipeline = AdvancedNLPPipeline()
        # Process multiple queries concurrently
        queries = [f"find service {i}" for i in range(100)]
        tasks = [pipeline.process_query(query) for query in queries]
        await asyncio.gather(*tasks)
        final_memory = process.memory_info().rss / 1024 / 1024  # MB
        memory_increase = final_memory - initial_memory
        # Memory increase should be reasonable (less than 100MB for 100 queries)
        assert memory_increase < 100

    async def test_cache_memory_efficiency(self):
        """Processor cache must honour its max_cache_size bound under overflow."""
        from app.services.advanced_nlp import AsyncNLPProcessor
        processor = AsyncNLPProcessor(max_cache_size=100)

        def dummy_processor(text):
            return {"processed": text}

        # Fill cache beyond limit (150 entries into a 100-entry cache)
        for i in range(150):
            await processor.process_async(f"query_{i}", dummy_processor)
        # Cache should not exceed max size
        assert len(processor.cache) <= 100

    async def test_garbage_collection_efficiency(self):
        """Live-object count must not grow excessively across 50 pipeline runs."""
        import gc
        # Force garbage collection so the baseline is clean
        gc.collect()
        initial_objects = len(gc.get_objects())
        # Create and process many objects
        from app.services.advanced_nlp import AdvancedNLPPipeline
        pipeline = AdvancedNLPPipeline()
        for i in range(50):
            await pipeline.process_query(f"test query {i}")
        # Force garbage collection again before re-counting
        gc.collect()
        final_objects = len(gc.get_objects())
        # Object count should not grow excessively
        object_increase = final_objects - initial_objects
        assert object_increase < 1000  # Reasonable object increase
class TestLoadTesting:
    """Sustained-load and burst-load scenarios against the mocked API."""

    async def test_sustained_load_performance(self, async_client, sample_merchant_data):
        """Under ~30s of steady traffic: avg latency < 1s, max < 3s, > 5 req/s."""
        with patch('app.services.merchant.get_merchants') as mock_get:
            mock_get.return_value = [sample_merchant_data] * 10
            # Simulate sustained load for 30 seconds.
            # perf_counter() is monotonic, so the deadline cannot move if the
            # wall clock is adjusted mid-test (time.time() could).
            end_time = time.perf_counter() + 30
            request_count = 0
            response_times = []
            while time.perf_counter() < end_time:
                start_time = time.perf_counter()
                response = await async_client.get("/api/v1/merchants/")
                response_time = time.perf_counter() - start_time
                response_times.append(response_time)
                request_count += 1
                assert response.status_code == 200
                # Small delay to prevent overwhelming
                await asyncio.sleep(0.1)
            # Calculate performance metrics
            avg_response_time = statistics.mean(response_times)
            max_response_time = max(response_times)
            requests_per_second = request_count / 30
            assert avg_response_time < 1.0  # Average response time under 1 second
            assert max_response_time < 3.0  # Max response time under 3 seconds
            assert requests_per_second > 5  # At least 5 requests per second

    async def test_burst_load_handling(self, async_client, sample_merchant_data):
        """A burst of 100 concurrent searches: >= 95% success within 10 seconds."""
        with patch('app.services.merchant.search_merchants') as mock_search:
            mock_search.return_value = [sample_merchant_data] * 5
            # Create burst of 100 concurrent requests
            tasks = [
                async_client.get("/api/v1/merchants/search", params={
                    "latitude": 40.7128,
                    "longitude": -74.0060,
                    "radius": 5000
                })
                for _ in range(100)
            ]
            start_time = time.perf_counter()
            # return_exceptions=True so individual failures count against the
            # success rate instead of aborting the whole gather.
            responses = await asyncio.gather(*tasks, return_exceptions=True)
            total_time = time.perf_counter() - start_time
            # Count successful responses (exceptions have no status_code)
            successful_responses = [r for r in responses if hasattr(r, 'status_code') and r.status_code == 200]
            success_rate = len(successful_responses) / len(responses)
            assert success_rate > 0.95  # At least 95% success rate
            assert total_time < 10.0  # Complete within 10 seconds
class TestPerformanceRegression:
    """Regression guards comparing current latency against expected baselines."""

    async def test_api_response_time_regression(self, async_client, performance_test_data):
        """API latency must stay below the baseline recorded in the fixture."""
        queries = performance_test_data["queries"]
        max_expected_time = performance_test_data["expected_max_response_time"]
        with patch('app.services.advanced_nlp.advanced_nlp_pipeline') as mock_pipeline:
            mock_pipeline.process_query.return_value = {
                "query": "test",
                "primary_intent": {"intent": "SEARCH_SERVICE", "confidence": 0.8},
                "entities": {},
                "similar_services": [],
                "search_parameters": {},
                "processing_time": 0.1
            }
            response_times = []
            for query in queries:
                # perf_counter() is monotonic; time.time() can jump (NTP) and
                # produce false regressions or mask real ones.
                start_time = time.perf_counter()
                response = await async_client.post("/api/v1/nlp/analyze-query", params={"query": query})
                response_time = time.perf_counter() - start_time
                response_times.append(response_time)
                assert response.status_code == 200
            avg_response_time = statistics.mean(response_times)
            max_response_time = max(response_times)
            # Check for performance regression
            assert avg_response_time < max_expected_time
            assert max_response_time < max_expected_time * 2  # Allow some variance

    async def test_database_query_performance_regression(self, sample_merchant_data):
        """Repeated geo-searches must average < 100ms with a 200ms worst case."""
        from app.repositories.db_repository import search_merchants_in_db
        with patch('app.nosql.get_mongodb_client') as mock_client:
            mock_collection = AsyncMock()
            mock_client.return_value.__getitem__.return_value.__getitem__.return_value = mock_collection
            mock_collection.find.return_value.limit.return_value.to_list.return_value = [sample_merchant_data] * 10
            query_times = []
            # Test multiple search scenarios at slightly shifted coordinates
            for i in range(10):
                start_time = time.perf_counter()
                await search_merchants_in_db(
                    latitude=40.7128 + (i * 0.01),
                    longitude=-74.0060 + (i * 0.01),
                    radius=5000,
                    category="salon"
                )
                query_time = time.perf_counter() - start_time
                query_times.append(query_time)
            avg_query_time = statistics.mean(query_times)
            max_query_time = max(query_times)
            # Database queries should be fast
            assert avg_query_time < 0.1  # Average under 100ms
            assert max_query_time < 0.2  # Maximum under 200ms