Spaces:
Sleeping
Sleeping
| """ | |
| Regression tests for database operations and repositories | |
| """ | |
| import pytest | |
| from unittest.mock import AsyncMock, MagicMock, patch | |
| from typing import Dict, Any, List | |
| import asyncio | |
class TestMongoDBOperations:
    """Regression tests for the MongoDB health check and repository helpers.

    Every test that touches the database replaces ``app.nosql.db`` with a
    mock, so no MongoDB server is contacted.
    """

    async def test_mongodb_connection_health(self):
        """The health check reports True when ``db.command`` succeeds."""
        from app.nosql import check_mongodb_health

        with patch('app.nosql.db') as mock_db:
            # NOTE(review): ``command`` is a plain MagicMock here, so its
            # result is not awaitable. If check_mongodb_health awaits
            # db.command(...), this needs an AsyncMock instead -- confirm.
            mock_db.command.return_value = {"ok": 1}
            result = await check_mongodb_health()
            assert result is True

    async def test_mongodb_connection_failure(self):
        """The health check reports False when ``db.command`` raises."""
        from app.nosql import check_mongodb_health

        with patch('app.nosql.db') as mock_db:
            mock_db.command.side_effect = Exception("Connection failed")
            result = await check_mongodb_health()
            assert result is False

    async def test_fetch_documents(self, sample_merchant_data):
        """fetch_documents returns the total count and the page of documents."""
        from app.repositories.db_repository import fetch_documents

        with patch('app.nosql.db') as mock_db:
            # An AsyncMock collection makes find()/skip()/limit() return
            # coroutines, so a chain configured through ``return_value`` is
            # never reached by code that builds the cursor synchronously.
            # Assumes a Motor-style driver: find/skip/limit are plain calls,
            # only count_documents() and to_list() are awaited -- TODO confirm.
            mock_cursor = MagicMock()
            # Cursor modifiers return the cursor itself, so the order in
            # which the code under test chains them does not matter.
            mock_cursor.skip.return_value = mock_cursor
            mock_cursor.limit.return_value = mock_cursor
            mock_cursor.to_list = AsyncMock(return_value=[sample_merchant_data])

            mock_collection = MagicMock()
            mock_collection.count_documents = AsyncMock(return_value=1)
            mock_collection.find.return_value = mock_cursor
            mock_db.__getitem__.return_value = mock_collection

            result = await fetch_documents("merchants", {}, {}, 0, 10)

            assert result["total"] == 1
            assert len(result["documents"]) == 1
            assert result["documents"][0]["name"] == "Test Hair Salon"

    async def test_execute_query(self, sample_merchant_data):
        """execute_query returns the documents produced by the pipeline."""
        from app.repositories.db_repository import execute_query

        with patch('app.nosql.db') as mock_db:
            # NOTE(review): with an AsyncMock collection, aggregate() has to
            # be awaited to obtain the cursor configured below. Confirm that
            # execute_query awaits aggregate(); if it calls it synchronously
            # the collection must be a MagicMock instead.
            mock_collection = AsyncMock()
            mock_db.__getitem__.return_value = mock_collection
            mock_collection.aggregate.return_value.to_list.return_value = [sample_merchant_data]

            pipeline = [{"$match": {"_id": "test_merchant_123"}}]
            result = await execute_query("merchants", pipeline, use_optimization=False)

            assert len(result) == 1
            assert result[0]["name"] == "Test Hair Salon"

    async def test_count_documents(self):
        """count_documents forwards the filter and returns the driver's count."""
        from app.repositories.db_repository import count_documents

        with patch('app.nosql.db') as mock_db:
            mock_collection = AsyncMock()
            mock_db.__getitem__.return_value = mock_collection
            mock_collection.count_documents.return_value = 5

            result = await count_documents("merchants", {"category": "salon"})

            assert result == 5
            mock_collection.count_documents.assert_called_once_with({"category": "salon"})

    async def test_serialize_mongo_document(self):
        """ObjectId and datetime values are serialized to strings, nested too."""
        from app.repositories.db_repository import serialize_mongo_document
        from bson import ObjectId
        from datetime import datetime

        test_doc = {
            "_id": ObjectId("507f1f77bcf86cd799439011"),
            "name": "Test Merchant",
            "created_at": datetime(2024, 1, 1, 12, 0, 0),
            "nested": {
                "id": ObjectId("507f1f77bcf86cd799439012"),
                "date": datetime(2024, 1, 2, 12, 0, 0),
            },
        }

        result = serialize_mongo_document(test_doc)

        assert isinstance(result["_id"], str)
        assert isinstance(result["created_at"], str)
        assert isinstance(result["nested"]["id"], str)
        assert isinstance(result["nested"]["date"], str)
class TestRedisOperations:
    """Regression tests for the Redis health check and cache helpers.

    Every test patches either ``app.nosql.get_redis_client`` or
    ``app.nosql.redis_client``, so no Redis server is contacted.
    """

    async def test_redis_connection_health(self):
        """The health check reports True when the client's ping succeeds."""
        from app.nosql import check_redis_health

        with patch('app.nosql.get_redis_client') as mock_client:
            mock_redis = AsyncMock()
            mock_client.return_value = mock_redis
            mock_redis.ping.return_value = True
            result = await check_redis_health()
            assert result is True

    async def test_redis_connection_failure(self):
        """The health check reports False when obtaining the client raises."""
        from app.nosql import check_redis_health

        with patch('app.nosql.get_redis_client') as mock_client:
            mock_client.side_effect = Exception("Redis connection failed")
            result = await check_redis_health()
            assert result is False

    async def test_cache_merchant_data(self, sample_merchant_data):
        """cache_merchant_data stores the payload with a single ``setex`` call."""
        from app.repositories.cache_repository import cache_merchant_data

        with patch('app.nosql.get_redis_client') as mock_client:
            mock_redis = AsyncMock()
            mock_client.return_value = mock_redis
            mock_redis.setex.return_value = True

            result = await cache_merchant_data("test_merchant_123", sample_merchant_data)

            assert result is True
            mock_redis.setex.assert_called_once()

    async def test_cache_manager_get_or_set(self, sample_merchant_data):
        """On a cache miss the fetched value is returned and written back."""
        from app.repositories.cache_repository import cache_manager

        async def fetch_func():
            return sample_merchant_data

        with patch('app.nosql.redis_client') as mock_redis:
            mock_redis.get.return_value = None  # Cache miss
            mock_redis.set.return_value = True

            result = await cache_manager.get_or_set_cache("test_key", fetch_func)

            assert result["name"] == "Test Hair Salon"
            mock_redis.set.assert_called_once()

    async def test_cache_manager_hit(self, sample_merchant_data):
        """On a cache hit the cached value is returned without fetching."""
        from app.repositories.cache_repository import cache_manager
        import json

        # Records every invocation so the test can prove the fallback
        # fetch is skipped when the cache already holds a value.
        fetch_calls = []

        async def fetch_func():
            fetch_calls.append(1)
            return {"should": "not_be_called"}

        with patch('app.nosql.redis_client') as mock_redis:
            mock_redis.get.return_value = json.dumps(sample_merchant_data)

            result = await cache_manager.get_or_set_cache("test_key", fetch_func)

            assert result["name"] == "Test Hair Salon"
            mock_redis.get.assert_called_once()
            assert not fetch_calls

    async def test_cache_manager_invalidate(self):
        """cache_manager.invalidate_cache issues exactly one ``delete``."""
        from app.repositories.cache_repository import cache_manager

        with patch('app.nosql.redis_client') as mock_redis:
            mock_redis.delete.return_value = 1
            await cache_manager.invalidate_cache("test_key")
            mock_redis.delete.assert_called_once()

    async def test_invalidate_cache(self):
        """The module-level invalidate_cache deletes the key and returns True."""
        from app.repositories.cache_repository import invalidate_cache

        with patch('app.nosql.get_redis_client') as mock_client:
            mock_redis = AsyncMock()
            mock_client.return_value = mock_redis
            mock_redis.delete.return_value = 1

            result = await invalidate_cache("test_key")

            assert result is True
            mock_redis.delete.assert_called_once_with("test_key")
class TestDatabaseIndexes:
    """Checks that each index helper creates one index and returns its name.

    The MongoDB client factory is patched; the collection reached through
    ``client[...][...]`` is a MagicMock whose ``create_index`` is stubbed.
    """

    async def test_create_geospatial_index(self):
        """create_geospatial_index returns the name given by create_index."""
        from app.database.indexes import create_geospatial_index

        with patch('app.nosql.get_mongodb_client') as get_client:
            collection = MagicMock()
            collection.create_index.return_value = "location_2dsphere"
            database = get_client.return_value.__getitem__.return_value
            database.__getitem__.return_value = collection

            index_name = await create_geospatial_index()

            assert index_name == "location_2dsphere"
            collection.create_index.assert_called_once()

    async def test_create_text_search_index(self):
        """create_text_search_index returns the name given by create_index."""
        from app.database.indexes import create_text_search_index

        with patch('app.nosql.get_mongodb_client') as get_client:
            collection = MagicMock()
            collection.create_index.return_value = "text_search_index"
            database = get_client.return_value.__getitem__.return_value
            database.__getitem__.return_value = collection

            index_name = await create_text_search_index()

            assert index_name == "text_search_index"
            collection.create_index.assert_called_once()

    async def test_create_category_index(self):
        """create_category_index returns the name given by create_index."""
        from app.database.indexes import create_category_index

        with patch('app.nosql.get_mongodb_client') as get_client:
            collection = MagicMock()
            collection.create_index.return_value = "category_index"
            database = get_client.return_value.__getitem__.return_value
            database.__getitem__.return_value = collection

            index_name = await create_category_index()

            assert index_name == "category_index"
            collection.create_index.assert_called_once()
class TestQueryOptimization:
    """Tests for the search-query builder and the query explain helper."""

    async def test_optimized_merchant_search(self, sample_merchant_data):
        """The optimized query is a dict with location, category and rating keys."""
        from app.database.query_optimizer import optimize_search_query

        optimized_query = optimize_search_query(
            {
                "category": "salon",
                "latitude": 40.7128,
                "longitude": -74.0060,
                "radius": 5000,
                "min_rating": 4.0,
            }
        )

        assert isinstance(optimized_query, dict)
        for expected_key in ("location", "category", "average_rating"):
            assert expected_key in optimized_query

    async def test_query_performance_analysis(self):
        """analyze_query_performance exposes the stubbed explain statistics."""
        from app.database.query_optimizer import analyze_query_performance

        # Stubbed explain output returned by find(...).explain()
        explain_output = {
            "executionStats": {
                "totalDocsExamined": 100,
                "totalDocsReturned": 10,
                "executionTimeMillis": 50,
            }
        }

        with patch('app.nosql.get_mongodb_client') as get_client:
            collection = MagicMock()
            collection.find.return_value.explain.return_value = explain_output
            database = get_client.return_value.__getitem__.return_value
            database.__getitem__.return_value = collection

            analysis = await analyze_query_performance({"category": "salon"})

            assert "executionStats" in analysis
            assert analysis["executionStats"]["totalDocsExamined"] == 100
class TestDatabaseTransactions:
    """Tests for merchant creation inside a MongoDB session/transaction.

    The client factory is patched; the session yielded by
    ``start_session()`` is an AsyncMock and the collection is a MagicMock.
    """

    async def test_merchant_creation_transaction(self, sample_merchant_data):
        """A successful insert returns the new id and starts one transaction."""
        from app.repositories.db_repository import create_merchant_with_transaction

        with patch('app.nosql.get_mongodb_client') as get_client:
            client = get_client.return_value
            session = AsyncMock()
            client.start_session.return_value.__aenter__.return_value = session

            collection = MagicMock()
            collection.insert_one.return_value.inserted_id = "new_merchant_id"
            client.__getitem__.return_value.__getitem__.return_value = collection

            merchant_id = await create_merchant_with_transaction(sample_merchant_data)

            assert merchant_id == "new_merchant_id"
            session.start_transaction.assert_called_once()

    async def test_transaction_rollback(self, sample_merchant_data):
        """A failing insert propagates and aborts the transaction once."""
        from app.repositories.db_repository import create_merchant_with_transaction

        with patch('app.nosql.get_mongodb_client') as get_client:
            client = get_client.return_value
            session = AsyncMock()
            client.start_session.return_value.__aenter__.return_value = session

            collection = MagicMock()
            collection.insert_one.side_effect = Exception("Insert failed")
            client.__getitem__.return_value.__getitem__.return_value = collection

            with pytest.raises(Exception):
                await create_merchant_with_transaction(sample_merchant_data)

            # Checked after the raises-block so the assertion actually runs.
            session.abort_transaction.assert_called_once()
class TestDatabasePerformance:
    """Tests for concurrent reads, paginated reads and client reuse."""

    async def test_concurrent_database_operations(self, sample_merchant_data):
        """Twenty lookups gathered concurrently all return the stubbed merchant."""
        from app.repositories.db_repository import get_merchant_by_id_from_db

        with patch('app.nosql.get_mongodb_client') as get_client:
            collection = MagicMock()
            collection.find_one.return_value = sample_merchant_data
            database = get_client.return_value.__getitem__.return_value
            database.__getitem__.return_value = collection

            # Fire all lookups at once and wait for every one of them
            lookups = []
            for i in range(20):
                lookups.append(get_merchant_by_id_from_db(f"merchant_{i}"))
            merchants = await asyncio.gather(*lookups)

            assert len(merchants) == 20
            assert all(merchant["name"] == "Test Hair Salon" for merchant in merchants)

    async def test_large_result_set_handling(self):
        """A limited read returns one page and passes the limit to the cursor."""
        from app.repositories.db_repository import get_merchants_from_db

        # 1000 merchants exist in the fake dataset; the cursor yields 100
        all_merchants = [
            {"_id": f"merchant_{i}", "name": f"Merchant {i}"} for i in range(1000)
        ]
        first_page = all_merchants[:100]

        with patch('app.nosql.get_mongodb_client') as get_client:
            collection = MagicMock()
            limited_cursor = collection.find.return_value.limit.return_value
            limited_cursor.skip.return_value.to_list.return_value = first_page
            database = get_client.return_value.__getitem__.return_value
            database.__getitem__.return_value = collection

            merchants = await get_merchants_from_db(limit=100, skip=0)

            assert len(merchants) == 100
            # The requested page size must have reached the cursor
            collection.find.return_value.limit.assert_called_with(100)

    async def test_connection_pool_management(self):
        """Repeated get_mongodb_client calls hand back the same object."""
        from app.nosql import get_mongodb_client

        # Ask for the client ten times
        clients = [get_mongodb_client() for _ in range(10)]

        # Every request must yield the identical instance (singleton)
        first_client = clients[0]
        assert all(client is first_client for client in clients)
class TestDatabaseErrorHandling:
    """Tests that driver errors propagate out of the repository helpers.

    Each test makes a patched client or collection raise and checks that
    the exception reaching the caller carries the original message.
    """

    async def test_connection_timeout_handling(self):
        """An error raised while obtaining the client reaches the caller."""
        from app.repositories.db_repository import get_merchants_from_db

        with patch('app.nosql.get_mongodb_client') as get_client:
            get_client.side_effect = Exception("Connection timeout")

            with pytest.raises(Exception, match="Connection timeout"):
                await get_merchants_from_db()

    async def test_invalid_query_handling(self):
        """An error raised by ``find`` reaches the caller."""
        from app.repositories.db_repository import search_merchants_in_db

        with patch('app.nosql.get_mongodb_client') as get_client:
            collection = MagicMock()
            collection.find.side_effect = Exception("Invalid query")
            database = get_client.return_value.__getitem__.return_value
            database.__getitem__.return_value = collection

            with pytest.raises(Exception, match="Invalid query"):
                # Coordinates outside the valid latitude/longitude range
                await search_merchants_in_db(latitude=200, longitude=200)

    async def test_duplicate_key_error_handling(self, sample_merchant_data):
        """An error raised by ``insert_one`` reaches the caller."""
        from app.repositories.db_repository import create_merchant_in_db

        with patch('app.nosql.get_mongodb_client') as get_client:
            collection = MagicMock()
            collection.insert_one.side_effect = Exception("Duplicate key error")
            database = get_client.return_value.__getitem__.return_value
            database.__getitem__.return_value = collection

            with pytest.raises(Exception, match="Duplicate key error"):
                await create_merchant_in_db(sample_merchant_data)