Spaces:
Sleeping
Sleeping
| """ | |
| Integration tests for end-to-end functionality | |
| """ | |
| import pytest | |
| import asyncio | |
| from unittest.mock import patch, AsyncMock, MagicMock | |
| from typing import Dict, Any, List | |
class TestEndToEndUserJourney:
    """Walk complete user journeys through the HTTP API with the services mocked.

    NOTE(review): the tests are coroutines and carry no marker, so they
    presumably rely on an async pytest plugin running in auto mode -- confirm
    against the project's pytest configuration.
    """

    async def test_complete_search_journey(self, async_client, sample_merchant_data):
        """Analyze a natural-language query, then search merchants with the result.

        Args:
            async_client: Project fixture used to issue HTTP requests.
            sample_merchant_data: Project fixture returned by the mocked search.
        """
        query = "find the best hair salon near me with parking"
        coordinates = {"latitude": 40.7128, "longitude": -74.0060}
        analysis = {
            "query": query,
            "primary_intent": {"intent": "SEARCH_SERVICE", "confidence": 0.9},
            "entities": {
                "service_types": ["haircut", "hair styling"],
                "amenities": ["parking"],
                "location_modifiers": ["near me"],
                "quality_indicators": ["best"],
            },
            "similar_services": [("salon", 0.95), ("beauty", 0.8)],
            "search_parameters": {
                "merchant_category": "salon",
                "amenities": ["parking"],
                "radius": 5000,
                "min_rating": 4.0,
            },
            "processing_time": 0.234,
        }

        with patch('app.services.advanced_nlp.advanced_nlp_pipeline') as mock_nlp, \
                patch('app.services.merchant.search_merchants') as mock_search:
            # process_query is a coroutine on the real pipeline, so the stub must
            # be awaitable; a plain MagicMock would hand back a bare dict and the
            # caller's `await` would raise TypeError.
            mock_nlp.process_query = AsyncMock(return_value=analysis)
            mock_search.return_value = [sample_merchant_data]

            # Step 1: the user submits a natural-language query.
            nlp_response = await async_client.post(
                "/api/v1/nlp/analyze-query",
                params={"query": query, **coordinates},
            )
            assert nlp_response.status_code == 200
            nlp_data = nlp_response.json()
            assert nlp_data["status"] == "success"
            assert "analysis" in nlp_data

            # Step 2: the search parameters from the analysis drive the search.
            search_params = nlp_data["analysis"]["search_parameters"]
            search_response = await async_client.get(
                "/api/v1/merchants/search",
                params={**coordinates, **search_params},
            )
            assert search_response.status_code == 200
            search_data = search_response.json()
            assert len(search_data) == 1
            assert search_data[0]["name"] == "Test Hair Salon"
            assert "parking" in search_data[0]["amenities"]

    async def test_merchant_discovery_flow(self, async_client, sample_merchant_data):
        """Browse the merchant list, then fetch the detail page of the first hit.

        Args:
            async_client: Project fixture used to issue HTTP requests.
            sample_merchant_data: Project fixture returned by both mocked lookups.
        """
        with patch('app.services.merchant.get_merchants') as mock_get_all, \
                patch('app.services.merchant.get_merchant_by_id') as mock_get_by_id:
            mock_get_all.return_value = [sample_merchant_data]
            mock_get_by_id.return_value = sample_merchant_data

            # Step 1: browse all merchants.
            browse_response = await async_client.get("/api/v1/merchants/")
            assert browse_response.status_code == 200
            merchants = browse_response.json()
            assert len(merchants) == 1

            # Step 2: fetch details for the merchant found while browsing.
            merchant_id = merchants[0]["_id"]
            detail_response = await async_client.get(f"/api/v1/merchants/{merchant_id}")
            assert detail_response.status_code == 200
            merchant_detail = detail_response.json()
            assert merchant_detail["_id"] == merchant_id
            assert merchant_detail["name"] == "Test Hair Salon"
            assert "services" in merchant_detail
            assert "business_hours" in merchant_detail

    async def test_error_recovery_flow(self, async_client):
        """Check that the helper endpoint still answers while the NLP pipeline fails.

        Args:
            async_client: Project fixture used to issue HTTP requests.
        """
        with patch('app.services.advanced_nlp.advanced_nlp_pipeline') as mock_nlp, \
                patch('app.services.helper.process_free_text') as mock_fallback:
            # The NLP pipeline is broken: awaiting process_query raises.
            mock_nlp.process_query = AsyncMock(
                side_effect=Exception("NLP service unavailable")
            )
            # The basic text processor keeps working.
            mock_fallback.return_value = {
                "query": "hair salon",
                "extracted_keywords": ["hair", "salon"],
                "suggested_category": "salon",
            }

            response = await async_client.post("/api/v1/helpers/process-text", json={
                "text": "hair salon",
                "latitude": 40.7128,
                "longitude": -74.0060,
            })
            assert response.status_code == 200
            data = response.json()
            assert data["suggested_category"] == "salon"
class TestServiceIntegration:
    """Checks that service-layer functions cooperate with their collaborators."""

    async def test_nlp_to_search_integration(self, sample_merchant_data):
        """Feed the NLP pipeline's search parameters into the merchant search."""
        from app.services.advanced_nlp import AdvancedNLPPipeline
        from app.services.merchant import search_merchants

        with patch('app.repositories.db_repository.search_merchants_in_db') as db_search_stub:
            db_search_stub.return_value = [sample_merchant_data]

            # The pipeline is the real one; only the database search is stubbed.
            analysis = await AdvancedNLPPipeline().process_query("find a hair salon with parking")
            found = await search_merchants(**analysis["search_parameters"])

            assert len(found) == 1
            assert found[0]["category"] == "salon"

    async def test_cache_integration(self, sample_merchant_data):
        """Look a merchant up twice: once with an empty cache, once with a hit."""
        from app.services.merchant import get_merchant_by_id
        from app.repositories.cache_repository import cache_merchant_data, get_cached_merchant_data

        merchant_id = "test_merchant_123"
        cached_payload = '{"_id": "test_merchant_123", "name": "Test Hair Salon"}'

        with patch('app.nosql.get_redis_client') as redis_factory, \
                patch('app.repositories.db_repository.get_merchant_by_id_from_db') as db_lookup:
            redis_stub = AsyncMock()
            redis_factory.return_value = redis_stub
            db_lookup.return_value = sample_merchant_data

            # Cache miss: the lookup is expected to write the result back once.
            redis_stub.get.return_value = None
            first_lookup = await get_merchant_by_id(merchant_id)
            redis_stub.setex.assert_called_once()

            # Cache hit: Redis now answers with a serialized merchant.
            redis_stub.get.return_value = cached_payload
            second_lookup = await get_merchant_by_id(merchant_id)

            assert first_lookup is not None
            assert second_lookup is not None

    async def test_database_to_api_integration(self, sample_merchant_data):
        """Compare the repository result with the service result for one query."""
        from app.repositories.db_repository import get_merchants_from_db
        from app.services.merchant import get_merchants

        paging = {"limit": 10, "skip": 0}

        with patch('app.nosql.get_mongodb_client') as client_factory:
            collection = AsyncMock()
            # client[...][...] resolves to the stubbed collection.
            database = client_factory.return_value.__getitem__.return_value
            database.__getitem__.return_value = collection
            # find(...).limit(...).skip(...).to_list(...) yields the sample row.
            cursor = collection.find.return_value.limit.return_value.skip.return_value
            cursor.to_list.return_value = [sample_merchant_data]

            from_repository = await get_merchants_from_db(**paging)
            from_service = await get_merchants(**paging)

            assert len(from_repository) == 1
            assert len(from_service) == 1
            assert from_repository[0]["name"] == from_service[0]["name"]
class TestDataFlowIntegration:
    """Follows a request through the API with only the outermost clients stubbed."""

    async def test_search_data_flow(self, async_client, sample_merchant_data):
        """Search merchants and verify that both MongoDB and Redis were touched."""
        search_query = {
            "latitude": 40.7128,
            "longitude": -74.0060,
            "radius": 5000,
            "category": "salon",
        }

        with patch('app.nosql.get_mongodb_client') as mongo_factory, \
                patch('app.nosql.get_redis_client') as redis_factory:
            # MongoDB: client[...][...] resolves to a stubbed collection whose
            # find(...).limit(...).to_list(...) yields the sample merchant.
            collection = AsyncMock()
            database = mongo_factory.return_value.__getitem__.return_value
            database.__getitem__.return_value = collection
            cursor = collection.find.return_value.limit.return_value
            cursor.to_list.return_value = [sample_merchant_data]

            # Redis: every read is a cache miss.
            redis_stub = AsyncMock()
            redis_factory.return_value = redis_stub
            redis_stub.get.return_value = None

            response = await async_client.get("/api/v1/merchants/search", params=search_query)

            assert response.status_code == 200
            merchants = response.json()
            assert len(merchants) == 1
            assert merchants[0]["name"] == "Test Hair Salon"

            # One database query and one cache write are expected.
            collection.find.assert_called_once()
            redis_stub.setex.assert_called_once()

    async def test_nlp_processing_data_flow(self, async_client):
        """Analyze a query and verify that each NLP component was called once."""
        with patch('app.services.advanced_nlp.IntentClassifier') as intent_cls, \
                patch('app.services.advanced_nlp.BusinessEntityExtractor') as entity_cls, \
                patch('app.services.advanced_nlp.SemanticMatcher') as semantic_cls:
            # Each patched class hands out one pre-configured instance.
            intent_stub = MagicMock()
            entity_stub = MagicMock()
            semantic_stub = MagicMock()
            intent_cls.return_value = intent_stub
            entity_cls.return_value = entity_stub
            semantic_cls.return_value = semantic_stub

            intent_stub.get_primary_intent.return_value = ("SEARCH_SERVICE", 0.9)
            entity_stub.extract_entities.return_value = {"service_types": ["haircut"]}
            semantic_stub.find_similar_services.return_value = [("salon", 0.9)]

            response = await async_client.post(
                "/api/v1/nlp/analyze-query",
                params={"query": "find a hair salon"},
            )

            assert response.status_code == 200
            body = response.json()
            assert body["status"] == "success"

            # Every pipeline stage must have been consulted exactly once.
            intent_stub.get_primary_intent.assert_called_once()
            entity_stub.extract_entities.assert_called_once()
            semantic_stub.find_similar_services.assert_called_once()
class TestConcurrencyIntegration:
    """Runs batches of coroutines through asyncio.gather and checks the results."""

    async def test_concurrent_cache_operations(self, sample_merchant_data):
        """Gather ten cache writes, then ten cache reads, against a stubbed Redis."""
        from app.repositories.cache_repository import cache_merchant_data, get_cached_merchant_data

        merchant_keys = [f"merchant_{i}" for i in range(10)]

        with patch('app.nosql.get_redis_client') as redis_factory:
            redis_stub = AsyncMock()
            redis_factory.return_value = redis_stub
            redis_stub.setex.return_value = True
            redis_stub.get.return_value = '{"_id": "test_merchant_123", "name": "Test Hair Salon"}'

            # Both batches of coroutines are created before either is awaited.
            writes = [cache_merchant_data(key, sample_merchant_data) for key in merchant_keys]
            reads = [get_cached_merchant_data(key) for key in merchant_keys]

            write_outcomes = await asyncio.gather(*writes)
            read_outcomes = await asyncio.gather(*reads)

            assert all(outcome is True for outcome in write_outcomes)
            assert all(outcome is not None for outcome in read_outcomes)

    async def test_concurrent_nlp_processing(self):
        """Gather five process_query calls on a single pipeline instance."""
        from app.services.advanced_nlp import AdvancedNLPPipeline

        pipeline = AdvancedNLPPipeline()
        queries = [
            "find a hair salon",
            "best spa near me",
            "gym with parking",
            "dental clinic",
            "massage therapy",
        ]

        analyses = await asyncio.gather(*map(pipeline.process_query, queries))

        assert len(analyses) == 5
        for required_key in ("query", "primary_intent"):
            assert all(required_key in analysis for analysis in analyses)

    async def test_concurrent_database_operations(self, sample_merchant_data):
        """Gather five by-id lookups and five searches against a stubbed MongoDB."""
        from app.repositories.db_repository import get_merchant_by_id_from_db, search_merchants_in_db

        with patch('app.nosql.get_mongodb_client') as client_factory:
            collection = AsyncMock()
            database = client_factory.return_value.__getitem__.return_value
            database.__getitem__.return_value = collection
            collection.find_one.return_value = sample_merchant_data
            cursor = collection.find.return_value.limit.return_value
            cursor.to_list.return_value = [sample_merchant_data]

            # Lookups come first, searches second; the assertions below rely
            # on that ordering being preserved by gather.
            lookups = [get_merchant_by_id_from_db(f"merchant_{i}") for i in range(5)]
            searches = [
                search_merchants_in_db(
                    latitude=40.7128 + (i * 0.01),
                    longitude=-74.0060 + (i * 0.01),
                    radius=5000,
                )
                for i in range(5)
            ]

            outcomes = await asyncio.gather(*(lookups + searches))

            assert len(outcomes) == 10
            single_merchants, merchant_lists = outcomes[:5], outcomes[5:]
            assert all(isinstance(outcome, dict) for outcome in single_merchants)
            assert all(isinstance(outcome, list) for outcome in merchant_lists)
class TestErrorHandlingIntegration:
    """Checks the HTTP status codes returned when individual dependencies fail."""

    async def test_database_failure_propagation(self, async_client):
        """Expect HTTP 500 from the merchant list when the MongoDB client raises."""
        db_failure = Exception("Database connection failed")

        with patch('app.nosql.get_mongodb_client', side_effect=db_failure):
            listing = await async_client.get("/api/v1/merchants/")

            assert listing.status_code == 500

    async def test_cache_failure_graceful_degradation(self, async_client, sample_merchant_data):
        """Expect the merchant list to be served even though the Redis client raises."""
        with patch('app.nosql.get_redis_client') as redis_factory, \
                patch('app.repositories.db_repository.get_merchants_from_db') as db_listing:
            # Redis is down, the database is healthy.
            redis_factory.side_effect = Exception("Redis connection failed")
            db_listing.return_value = [sample_merchant_data]

            listing = await async_client.get("/api/v1/merchants/")

            # The endpoint must answer without the cache.
            assert listing.status_code == 200
            merchants = listing.json()
            assert len(merchants) == 1

    async def test_nlp_service_fallback(self, async_client):
        """Expect the NLP endpoint to fail and the helper endpoint to succeed."""
        basic_result = {
            "query": "hair salon",
            "extracted_keywords": ["hair", "salon"],
            "suggested_category": "salon",
        }
        helper_payload = {
            "text": "hair salon",
            "latitude": 40.7128,
            "longitude": -74.0060,
        }

        with patch('app.services.advanced_nlp.advanced_nlp_pipeline') as nlp_stub, \
                patch('app.services.helper.process_free_text') as helper_stub:
            nlp_stub.process_query.side_effect = Exception("NLP service error")
            helper_stub.return_value = basic_result

            # The advanced endpoint surfaces the pipeline failure.
            advanced = await async_client.post(
                "/api/v1/nlp/analyze-query",
                params={"query": "hair salon"},
            )
            assert advanced.status_code == 500

            # The basic endpoint is unaffected.
            basic = await async_client.post("/api/v1/helpers/process-text", json=helper_payload)
            assert basic.status_code == 200
class TestSecurityIntegration:
    """Test security measures across integrated components."""

    async def test_input_sanitization_flow(self, async_client):
        """Submit a query containing a script tag and check the echoed query.

        NOTE(review): the pipeline is mocked to return an already-clean query,
        so the final assertion presumably only checks that the endpoint echoes
        the pipeline output -- confirm that real sanitization is exercised
        somewhere.

        Args:
            async_client: Project fixture used to issue HTTP requests.
        """
        malicious_query = "<script>alert('xss')</script>find salon"
        sanitized_analysis = {
            "query": "find salon",  # Should be sanitized
            "primary_intent": {"intent": "SEARCH_SERVICE", "confidence": 0.8},
            "entities": {},
            "similar_services": [],
            "search_parameters": {},
            "processing_time": 0.1,
        }

        with patch('app.services.advanced_nlp.advanced_nlp_pipeline') as mock_nlp:
            # process_query is a coroutine on the real pipeline; the stub must
            # be awaitable or the caller's `await` raises TypeError on the dict.
            mock_nlp.process_query = AsyncMock(return_value=sanitized_analysis)

            response = await async_client.post("/api/v1/nlp/analyze-query", params={
                "query": malicious_query,
            })
            assert response.status_code == 200
            data = response.json()
            # Script tags should be removed/sanitized.
            assert "<script>" not in data["analysis"]["query"]

    async def test_rate_limiting_integration(self, async_client):
        """Fire 50 concurrent health checks; each must answer 200 or 429.

        No rate limiter is configured by this test, so it only verifies that a
        burst of requests is answered rather than crashing.

        Args:
            async_client: Project fixture used to issue HTTP requests.
        """
        requests = [async_client.get("/health") for _ in range(50)]
        # return_exceptions=True lets every request finish before we judge them.
        responses = await asyncio.gather(*requests, return_exceptions=True)

        # A raised exception is a crash and must fail the test instead of being
        # skipped silently.
        crashes = [result for result in responses if isinstance(result, BaseException)]
        assert not crashes, f"{len(crashes)} request(s) raised: {crashes[:3]!r}"

        for response in responses:
            assert response.status_code in (200, 429)  # OK or Too Many Requests
class TestPerformanceIntegration:
    """Test performance characteristics of the integrated system."""

    async def test_end_to_end_performance(self, async_client, sample_merchant_data):
        """Time an analyze-then-search journey and require it to finish within 3 s.

        Args:
            async_client: Project fixture used to issue HTTP requests.
            sample_merchant_data: Project fixture returned by the mocked search.
        """
        import time

        analysis = {
            "query": "find salon",
            "primary_intent": {"intent": "SEARCH_SERVICE", "confidence": 0.8},
            "entities": {"service_types": ["haircut"]},
            "similar_services": [("salon", 0.9)],
            "search_parameters": {"merchant_category": "salon"},
            "processing_time": 0.1,
        }

        with patch('app.services.advanced_nlp.advanced_nlp_pipeline') as mock_nlp, \
                patch('app.services.merchant.search_merchants') as mock_search:
            # process_query is a coroutine on the real pipeline; the stub must
            # be awaitable or the caller's `await` raises TypeError on the dict.
            mock_nlp.process_query = AsyncMock(return_value=analysis)
            mock_search.return_value = [sample_merchant_data]

            # perf_counter is monotonic, so the measurement cannot be skewed by
            # wall-clock adjustments during the run.
            start_time = time.perf_counter()

            # Step 1: NLP processing.
            nlp_response = await async_client.post("/api/v1/nlp/analyze-query", params={
                "query": "find a hair salon",
            })
            # Step 2: merchant search.
            search_response = await async_client.get("/api/v1/merchants/search", params={
                "category": "salon",
            })

            total_time = time.perf_counter() - start_time

            assert nlp_response.status_code == 200
            assert search_response.status_code == 200
            assert total_time < 3.0  # Complete journey within 3 seconds