# bookmyservice-mhs / app/tests/test_integration.py
# Author: MukeshKapoor25
# test(performance): Add comprehensive test suite for performance optimization
# Commit: 7611990
"""
Integration tests for end-to-end functionality
"""
import pytest
import asyncio
from unittest.mock import patch, AsyncMock, MagicMock
from typing import Dict, Any, List
class TestEndToEndUserJourney:
    """Test complete user journey scenarios."""

    @pytest.mark.asyncio
    async def test_complete_search_journey(self, async_client, sample_merchant_data):
        """Test complete user search journey from query to results"""
        # Patch out the NLP pipeline and the merchant search service.
        with patch('app.services.advanced_nlp.advanced_nlp_pipeline') as nlp_mock, \
                patch('app.services.merchant.search_merchants') as search_mock:
            # Canned pipeline analysis for the user's free-text query.
            analysis = {
                "query": "find the best hair salon near me with parking",
                "primary_intent": {"intent": "SEARCH_SERVICE", "confidence": 0.9},
                "entities": {
                    "service_types": ["haircut", "hair styling"],
                    "amenities": ["parking"],
                    "location_modifiers": ["near me"],
                    "quality_indicators": ["best"]
                },
                "similar_services": [("salon", 0.95), ("beauty", 0.8)],
                "search_parameters": {
                    "merchant_category": "salon",
                    "amenities": ["parking"],
                    "radius": 5000,
                    "min_rating": 4.0
                },
                "processing_time": 0.234
            }
            nlp_mock.process_query.return_value = analysis
            search_mock.return_value = [sample_merchant_data]

            # Step 1: user submits a natural-language query.
            nlp_response = await async_client.post("/api/v1/nlp/analyze-query", params={
                "query": "find the best hair salon near me with parking",
                "latitude": 40.7128,
                "longitude": -74.0060
            })
            assert nlp_response.status_code == 200
            nlp_payload = nlp_response.json()
            assert nlp_payload["status"] == "success"
            assert "analysis" in nlp_payload

            # Step 2: feed the extracted parameters into merchant search.
            extracted = nlp_payload["analysis"]["search_parameters"]
            search_response = await async_client.get("/api/v1/merchants/search", params={
                "latitude": 40.7128,
                "longitude": -74.0060,
                **extracted
            })
            assert search_response.status_code == 200
            hits = search_response.json()
            assert len(hits) == 1
            assert hits[0]["name"] == "Test Hair Salon"
            assert "parking" in hits[0]["amenities"]

    @pytest.mark.asyncio
    async def test_merchant_discovery_flow(self, async_client, sample_merchant_data):
        """Test merchant discovery and detail retrieval flow"""
        with patch('app.services.merchant.get_merchants') as list_mock, \
                patch('app.services.merchant.get_merchant_by_id') as detail_mock:
            list_mock.return_value = [sample_merchant_data]
            detail_mock.return_value = sample_merchant_data

            # Step 1: browse the merchant listing.
            browse_response = await async_client.get("/api/v1/merchants/")
            assert browse_response.status_code == 200
            listing = browse_response.json()
            assert len(listing) == 1

            # Step 2: drill into the first merchant's detail view.
            target_id = listing[0]["_id"]
            detail_response = await async_client.get(f"/api/v1/merchants/{target_id}")
            assert detail_response.status_code == 200
            detail = detail_response.json()
            assert detail["_id"] == target_id
            assert detail["name"] == "Test Hair Salon"
            assert "services" in detail
            assert "business_hours" in detail

    @pytest.mark.asyncio
    async def test_error_recovery_flow(self, async_client):
        """Test error recovery and graceful degradation"""
        # NLP pipeline is broken; the basic text helper still works.
        with patch('app.services.advanced_nlp.advanced_nlp_pipeline') as nlp_mock, \
                patch('app.services.helper.process_free_text') as helper_mock:
            nlp_mock.process_query.side_effect = Exception("NLP service unavailable")
            helper_mock.return_value = {
                "query": "hair salon",
                "extracted_keywords": ["hair", "salon"],
                "suggested_category": "salon"
            }

            # The fallback endpoint should still answer successfully.
            response = await async_client.post("/api/v1/helpers/process-text", json={
                "text": "hair salon",
                "latitude": 40.7128,
                "longitude": -74.0060
            })
            assert response.status_code == 200
            payload = response.json()
            assert payload["suggested_category"] == "salon"
class TestServiceIntegration:
    """Test integration between different services."""

    @pytest.mark.asyncio
    async def test_nlp_to_search_integration(self, sample_merchant_data):
        """Test integration between NLP processing and merchant search.

        Runs a real AdvancedNLPPipeline over a query, then feeds its
        extracted search parameters into the merchant search service
        with only the database layer mocked out.
        """
        from app.services.advanced_nlp import AdvancedNLPPipeline
        from app.services.merchant import search_merchants

        with patch('app.repositories.db_repository.search_merchants_in_db') as mock_db_search:
            mock_db_search.return_value = [sample_merchant_data]
            # Process query with NLP
            pipeline = AdvancedNLPPipeline()
            nlp_result = await pipeline.process_query("find a hair salon with parking")
            # Use NLP results for merchant search
            search_params = nlp_result["search_parameters"]
            merchants = await search_merchants(**search_params)
            assert len(merchants) == 1
            assert merchants[0]["category"] == "salon"

    @pytest.mark.asyncio
    async def test_cache_integration(self, sample_merchant_data):
        """Test cache integration across services.

        First lookup misses the cache and must populate it (setex);
        second lookup is served from the cache.
        """
        # FIX: removed unused imports of cache_merchant_data /
        # get_cached_merchant_data -- this test exercises caching only
        # through get_merchant_by_id.
        from app.services.merchant import get_merchant_by_id

        with patch('app.nosql.get_redis_client') as mock_redis_client, \
                patch('app.repositories.db_repository.get_merchant_by_id_from_db') as mock_db:
            mock_redis = AsyncMock()
            mock_redis_client.return_value = mock_redis
            mock_db.return_value = sample_merchant_data

            # First call - cache miss, should hit database
            mock_redis.get.return_value = None
            result1 = await get_merchant_by_id("test_merchant_123")
            # Should cache the result
            mock_redis.setex.assert_called_once()

            # Second call - cache hit, should not hit database
            mock_redis.get.return_value = '{"_id": "test_merchant_123", "name": "Test Hair Salon"}'
            result2 = await get_merchant_by_id("test_merchant_123")

            assert result1 is not None
            assert result2 is not None

    @pytest.mark.asyncio
    async def test_database_to_api_integration(self, sample_merchant_data):
        """Test integration from database layer to API layer.

        The same Mongo collection mock backs both the raw repository
        call and the service-layer call; both must surface the same
        merchant document.
        """
        from app.repositories.db_repository import get_merchants_from_db
        from app.services.merchant import get_merchants

        with patch('app.nosql.get_mongodb_client') as mock_client:
            mock_collection = AsyncMock()
            # client[db][collection] -> mock_collection
            mock_client.return_value.__getitem__.return_value.__getitem__.return_value = mock_collection
            mock_collection.find.return_value.limit.return_value.skip.return_value.to_list.return_value = [sample_merchant_data]

            # Database layer
            db_result = await get_merchants_from_db(limit=10, skip=0)
            # Service layer
            service_result = await get_merchants(limit=10, skip=0)

            assert len(db_result) == 1
            assert len(service_result) == 1
            assert db_result[0]["name"] == service_result[0]["name"]
class TestDataFlowIntegration:
    """Test data flow through the entire system."""

    @pytest.mark.asyncio
    async def test_search_data_flow(self, async_client, sample_merchant_data):
        """Test complete search data flow"""
        with patch('app.nosql.get_mongodb_client') as mongo_mock, \
                patch('app.nosql.get_redis_client') as redis_factory_mock:
            # Wire up a Mongo collection that yields the sample merchant.
            collection_mock = AsyncMock()
            mongo_mock.return_value.__getitem__.return_value.__getitem__.return_value = collection_mock
            collection_mock.find.return_value.limit.return_value.to_list.return_value = [sample_merchant_data]

            # Redis always misses so the database path is exercised.
            redis_mock = AsyncMock()
            redis_factory_mock.return_value = redis_mock
            redis_mock.get.return_value = None  # Cache miss

            # Issue the search request.
            query_params = {
                "latitude": 40.7128,
                "longitude": -74.0060,
                "radius": 5000,
                "category": "salon"
            }
            response = await async_client.get("/api/v1/merchants/search", params=query_params)

            assert response.status_code == 200
            body = response.json()
            assert len(body) == 1
            assert body[0]["name"] == "Test Hair Salon"

            # Verify data flowed through all layers
            collection_mock.find.assert_called_once()  # Database was queried
            redis_mock.setex.assert_called_once()      # Result was cached

    @pytest.mark.asyncio
    async def test_nlp_processing_data_flow(self, async_client):
        """Test NLP processing data flow"""
        with patch('app.services.advanced_nlp.IntentClassifier') as intent_cls_mock, \
                patch('app.services.advanced_nlp.BusinessEntityExtractor') as entity_cls_mock, \
                patch('app.services.advanced_nlp.SemanticMatcher') as semantic_cls_mock:
            # Each pipeline stage is stubbed with a fixed, known output.
            intent_stage = MagicMock()
            intent_stage.get_primary_intent.return_value = ("SEARCH_SERVICE", 0.9)
            intent_cls_mock.return_value = intent_stage

            entity_stage = MagicMock()
            entity_stage.extract_entities.return_value = {"service_types": ["haircut"]}
            entity_cls_mock.return_value = entity_stage

            semantic_stage = MagicMock()
            semantic_stage.find_similar_services.return_value = [("salon", 0.9)]
            semantic_cls_mock.return_value = semantic_stage

            response = await async_client.post("/api/v1/nlp/analyze-query", params={
                "query": "find a hair salon"
            })

            assert response.status_code == 200
            body = response.json()
            assert body["status"] == "success"

            # Verify data flowed through NLP pipeline
            intent_stage.get_primary_intent.assert_called_once()
            entity_stage.extract_entities.assert_called_once()
            semantic_stage.find_similar_services.assert_called_once()
class TestConcurrencyIntegration:
    """Test concurrent operations and race conditions."""

    @pytest.mark.asyncio
    async def test_concurrent_cache_operations(self, sample_merchant_data):
        """Test concurrent cache read/write operations"""
        from app.repositories.cache_repository import cache_merchant_data, get_cached_merchant_data

        with patch('app.nosql.get_redis_client') as redis_factory:
            fake_redis = AsyncMock()
            redis_factory.return_value = fake_redis
            fake_redis.setex.return_value = True
            fake_redis.get.return_value = '{"_id": "test_merchant_123", "name": "Test Hair Salon"}'

            # Ten writers and ten readers launched together.
            writers = [
                cache_merchant_data(f"merchant_{i}", sample_merchant_data)
                for i in range(10)
            ]
            readers = [
                get_cached_merchant_data(f"merchant_{i}")
                for i in range(10)
            ]

            write_results = await asyncio.gather(*writers)
            read_results = await asyncio.gather(*readers)

            assert all(outcome is True for outcome in write_results)
            assert all(outcome is not None for outcome in read_results)

    @pytest.mark.asyncio
    async def test_concurrent_nlp_processing(self):
        """Test concurrent NLP processing"""
        from app.services.advanced_nlp import AdvancedNLPPipeline

        pipeline = AdvancedNLPPipeline()
        queries = [
            "find a hair salon",
            "best spa near me",
            "gym with parking",
            "dental clinic",
            "massage therapy"
        ]

        # Fan every query out at once.
        results = await asyncio.gather(*(pipeline.process_query(q) for q in queries))

        assert len(results) == 5
        assert all("query" in outcome for outcome in results)
        assert all("primary_intent" in outcome for outcome in results)

    @pytest.mark.asyncio
    async def test_concurrent_database_operations(self, sample_merchant_data):
        """Test concurrent database operations"""
        from app.repositories.db_repository import get_merchant_by_id_from_db, search_merchants_in_db

        with patch('app.nosql.get_mongodb_client') as client_mock:
            collection_mock = AsyncMock()
            client_mock.return_value.__getitem__.return_value.__getitem__.return_value = collection_mock
            collection_mock.find_one.return_value = sample_merchant_data
            collection_mock.find.return_value.limit.return_value.to_list.return_value = [sample_merchant_data]

            # Five point lookups followed by five geo searches.
            lookups = [get_merchant_by_id_from_db(f"merchant_{i}") for i in range(5)]
            searches = [
                search_merchants_in_db(
                    latitude=40.7128 + (i * 0.01),
                    longitude=-74.0060 + (i * 0.01),
                    radius=5000
                )
                for i in range(5)
            ]

            results = await asyncio.gather(*(lookups + searches))

            assert len(results) == 10
            # First 5 should be single merchants
            assert all(isinstance(outcome, dict) for outcome in results[:5])
            # Last 5 should be lists of merchants
            assert all(isinstance(outcome, list) for outcome in results[5:])
class TestErrorHandlingIntegration:
    """Test error handling across integrated components."""

    @pytest.mark.asyncio
    async def test_database_failure_propagation(self, async_client):
        """Test how database failures propagate through the system"""
        with patch('app.nosql.get_mongodb_client') as client_mock:
            # Simulate a hard connection failure at the driver level.
            client_mock.side_effect = Exception("Database connection failed")

            response = await async_client.get("/api/v1/merchants/")
            assert response.status_code == 500

    @pytest.mark.asyncio
    async def test_cache_failure_graceful_degradation(self, async_client, sample_merchant_data):
        """Test graceful degradation when cache fails"""
        with patch('app.nosql.get_redis_client') as redis_factory, \
                patch('app.repositories.db_repository.get_merchants_from_db') as db_mock:
            # Cache fails
            redis_factory.side_effect = Exception("Redis connection failed")
            # Database works
            db_mock.return_value = [sample_merchant_data]

            response = await async_client.get("/api/v1/merchants/")

            # Should still work, just without caching
            assert response.status_code == 200
            body = response.json()
            assert len(body) == 1

    @pytest.mark.asyncio
    async def test_nlp_service_fallback(self, async_client):
        """Test fallback when NLP service fails"""
        with patch('app.services.advanced_nlp.advanced_nlp_pipeline') as nlp_mock, \
                patch('app.services.helper.process_free_text') as helper_mock:
            # NLP service fails
            nlp_mock.process_query.side_effect = Exception("NLP service error")
            # Fallback service works
            helper_mock.return_value = {
                "query": "hair salon",
                "extracted_keywords": ["hair", "salon"],
                "suggested_category": "salon"
            }

            # Try NLP endpoint first (should fail)
            nlp_response = await async_client.post("/api/v1/nlp/analyze-query", params={
                "query": "hair salon"
            })
            assert nlp_response.status_code == 500

            # Use fallback endpoint (should work)
            fallback_response = await async_client.post("/api/v1/helpers/process-text", json={
                "text": "hair salon",
                "latitude": 40.7128,
                "longitude": -74.0060
            })
            assert fallback_response.status_code == 200
class TestSecurityIntegration:
    """Test security measures across integrated components."""

    @pytest.mark.asyncio
    async def test_input_sanitization_flow(self, async_client):
        """Test input sanitization across the request flow"""
        # A query carrying an XSS payload alongside legitimate text.
        malicious_query = "<script>alert('xss')</script>find salon"

        with patch('app.services.advanced_nlp.advanced_nlp_pipeline') as nlp_mock:
            nlp_mock.process_query.return_value = {
                "query": "find salon",  # Should be sanitized
                "primary_intent": {"intent": "SEARCH_SERVICE", "confidence": 0.8},
                "entities": {},
                "similar_services": [],
                "search_parameters": {},
                "processing_time": 0.1
            }

            response = await async_client.post("/api/v1/nlp/analyze-query", params={
                "query": malicious_query
            })

            assert response.status_code == 200
            body = response.json()
            # Script tags should be removed/sanitized
            assert "<script>" not in body["analysis"]["query"]

    @pytest.mark.asyncio
    async def test_rate_limiting_integration(self, async_client):
        """Test rate limiting across endpoints"""
        # This would require actual rate limiting implementation
        # For now, test that multiple requests don't crash the system
        burst = [async_client.get("/health") for _ in range(50)]
        outcomes = await asyncio.gather(*burst, return_exceptions=True)

        # All requests should either succeed or be rate limited (not crash)
        for outcome in outcomes:
            if hasattr(outcome, 'status_code'):
                assert outcome.status_code in [200, 429]  # OK or Too Many Requests
class TestPerformanceIntegration:
    """Test performance characteristics of integrated system."""

    @pytest.mark.asyncio
    async def test_end_to_end_performance(self, async_client, sample_merchant_data):
        """Test end-to-end performance of complete user journey.

        NLP analysis and merchant search are mocked, so the 3-second
        budget covers only API/framework overhead of the two requests.
        """
        import time

        with patch('app.services.advanced_nlp.advanced_nlp_pipeline') as mock_nlp, \
                patch('app.services.merchant.search_merchants') as mock_search:
            mock_nlp.process_query.return_value = {
                "query": "find salon",
                "primary_intent": {"intent": "SEARCH_SERVICE", "confidence": 0.8},
                "entities": {"service_types": ["haircut"]},
                "similar_services": [("salon", 0.9)],
                "search_parameters": {"merchant_category": "salon"},
                "processing_time": 0.1
            }
            mock_search.return_value = [sample_merchant_data]

            # FIX: measure elapsed time with the monotonic perf_counter()
            # instead of time.time(); wall-clock time can jump (e.g. NTP
            # adjustments) and distort the measured duration.
            start_time = time.perf_counter()

            # Step 1: NLP processing
            nlp_response = await async_client.post("/api/v1/nlp/analyze-query", params={
                "query": "find a hair salon"
            })
            # Step 2: Merchant search
            search_response = await async_client.get("/api/v1/merchants/search", params={
                "category": "salon"
            })

            total_time = time.perf_counter() - start_time

            assert nlp_response.status_code == 200
            assert search_response.status_code == 200
            assert total_time < 3.0  # Complete journey within 3 seconds