""" Property-based tests for sync infrastructure. Feature: postgres-sync """ import pytest import asyncio import traceback from hypothesis import given, strategies as st, settings from datetime import datetime from app.sync.common.retry import RetryManager from app.sync.common.monitoring import SyncMonitoringService from app.postgres import ( PostgreSQLConnectionPool, get_connection_pool_metrics, reset_connection_pool_metrics ) # Strategies for generating test data entity_type_strategy = st.sampled_from(["merchant", "catalogue", "employee"]) entity_id_strategy = st.text(min_size=10, max_size=50, alphabet=st.characters( whitelist_categories=("Lu", "Ll", "Nd"), whitelist_characters="_-" )) operation_strategy = st.sampled_from(["create", "update", "activate", "deactivate"]) duration_strategy = st.floats(min_value=0.1, max_value=5000.0) error_message_strategy = st.text(min_size=5, max_size=200) @pytest.mark.asyncio @settings(max_examples=100) @given( attempt=st.integers(min_value=0, max_value=10) ) async def test_property_retry_exponential_backoff(attempt): """ Feature: postgres-sync, Property 3: Sync retry with exponential backoff For any sync operation that fails due to transient error, the system should retry up to 3 times with exponentially increasing delays between attempts. Validates: Requirements 1.5, 7.1 """ retry_manager = RetryManager( max_retries=3, base_delay=1.0, max_delay=60.0, exponential_base=2.0 ) # Get delay for this attempt delay = retry_manager.get_delay(attempt) # Verify exponential backoff formula: base_delay * (exponential_base ** attempt) expected_delay = 1.0 * (2.0 ** attempt) expected_delay = min(expected_delay, 60.0) # Capped at max_delay assert delay == expected_delay, f"Delay should follow exponential backoff: expected {expected_delay}, got {delay}" # Verify delay is within bounds assert delay >= 1.0, "Delay should be at least base_delay" assert delay <= 60.0, "Delay should not exceed max_delay" # Verify exponential growth (for attempts within max_delay range) if attempt > 0 and delay < 60.0: prev_delay = retry_manager.get_delay(attempt - 1) assert delay >= prev_delay * 2.0, "Delay should grow exponentially" @pytest.mark.asyncio @settings(max_examples=100) @given( entity_type=entity_type_strategy, entity_id=entity_id_strategy, duration_ms=duration_strategy ) async def test_property_success_logging_completeness(entity_type, entity_id, duration_ms): """ Feature: postgres-sync, Property 16: Success logging completeness For any successful sync operation, the log entry should contain entity_type, entity_id, and duration_ms. Validates: Requirements 8.1 """ monitoring_service = SyncMonitoringService() # Record a successful sync monitoring_service.record_sync_success(entity_type, entity_id, duration_ms) # Get metrics for this entity type metrics = monitoring_service.get_entity_metrics(entity_type) # Verify success was recorded assert metrics["success_count"] == 1, "Success count should be incremented" assert metrics["total_operations"] == 1, "Total operations should be incremented" # Verify duration was recorded assert metrics["average_duration_ms"] == duration_ms, "Duration should be recorded" # Verify last_sync timestamp was recorded assert metrics["last_sync"] is not None, "Last sync timestamp should be recorded" assert isinstance(metrics["last_sync"], datetime), "Last sync should be a datetime" @pytest.mark.asyncio @settings(max_examples=100) @given( entity_type=entity_type_strategy, entity_id=entity_id_strategy, error_message=error_message_strategy ) async def test_property_failure_logging_completeness(entity_type, entity_id, error_message): """ Feature: postgres-sync, Property 17: Failure logging completeness For any failed sync operation, the log entry should contain entity_type, entity_id, error_message, and stack_trace. Validates: Requirements 8.2 """ monitoring_service = SyncMonitoringService() # Generate a stack trace try: raise Exception(error_message) except Exception: stack_trace = traceback.format_exc() # Record a failed sync monitoring_service.record_sync_failure(entity_type, entity_id, error_message, stack_trace) # Get metrics for this entity type metrics = monitoring_service.get_entity_metrics(entity_type) # Verify failure was recorded assert metrics["failure_count"] == 1, "Failure count should be incremented" assert metrics["total_operations"] == 1, "Total operations should be incremented" # Verify success count is 0 assert metrics["success_count"] == 0, "Success count should be 0 for failed operation" @pytest.mark.asyncio @settings(max_examples=100) @given( entity_type=entity_type_strategy, success_operations=st.lists( st.tuples(entity_id_strategy, duration_strategy), min_size=1, max_size=20 ), failure_operations=st.lists( st.tuples(entity_id_strategy, error_message_strategy), min_size=0, max_size=10 ) ) async def test_property_metrics_tracking(entity_type, success_operations, failure_operations): """ Feature: postgres-sync, Property 18: Metrics tracking For any series of sync operations, the metrics should accurately reflect success_count, failure_count, and average_duration. Validates: Requirements 8.3 """ monitoring_service = SyncMonitoringService() # Record all successful operations total_duration = 0.0 for entity_id, duration_ms in success_operations: monitoring_service.record_sync_success(entity_type, entity_id, duration_ms) total_duration += duration_ms # Record all failed operations for entity_id, error_message in failure_operations: monitoring_service.record_sync_failure(entity_type, entity_id, error_message) # Get metrics metrics = monitoring_service.get_entity_metrics(entity_type) # Verify counts expected_success = len(success_operations) expected_failure = len(failure_operations) expected_total = expected_success + expected_failure assert metrics["success_count"] == expected_success, \ f"Success count should be {expected_success}, got {metrics['success_count']}" assert metrics["failure_count"] == expected_failure, \ f"Failure count should be {expected_failure}, got {metrics['failure_count']}" assert metrics["total_operations"] == expected_total, \ f"Total operations should be {expected_total}, got {metrics['total_operations']}" # Verify average duration if expected_success > 0: expected_avg = total_duration / expected_success assert abs(metrics["average_duration_ms"] - expected_avg) < 0.01, \ f"Average duration should be {expected_avg}, got {metrics['average_duration_ms']}" else: assert metrics["average_duration_ms"] == 0.0, "Average duration should be 0 when no successes" @pytest.mark.asyncio @settings(max_examples=100, deadline=None) @given( num_operations=st.integers(min_value=1, max_value=20) ) async def test_property_connection_pool_acquisition_and_release(num_operations): """ Feature: postgres-sync, Property 10: Connection pool acquisition and release For any sync operation, the system should acquire a connection from the pool before execution and release it after completion. Validates: Requirements 5.2, 5.3 """ # Initialize pool if not already initialized if not PostgreSQLConnectionPool.is_initialized(): try: await PostgreSQLConnectionPool.initialize() except Exception as e: pytest.skip(f"PostgreSQL not available: {e}") # Reset metrics before test reset_connection_pool_metrics() # Get initial metrics initial_metrics = get_connection_pool_metrics() initial_acquired = initial_metrics["connections_acquired"] initial_released = initial_metrics["connections_released"] initial_pool_free = initial_metrics["pool_free"] # Perform multiple acquire/release cycles for i in range(num_operations): conn = None try: # Acquire connection conn = await PostgreSQLConnectionPool.get_connection() # Verify connection is valid assert conn is not None, "Connection should not be None" # Verify connection works (health check) result = await conn.fetchval("SELECT 1") assert result == 1, "Connection should be able to execute queries" finally: # Always release connection if conn: await PostgreSQLConnectionPool.release_connection(conn) # Get final metrics final_metrics = get_connection_pool_metrics() final_acquired = final_metrics["connections_acquired"] final_released = final_metrics["connections_released"] final_pool_free = final_metrics["pool_free"] # Verify acquisition count increased by num_operations assert final_acquired == initial_acquired + num_operations, \ f"Acquired count should increase by {num_operations}, " \ f"expected {initial_acquired + num_operations}, got {final_acquired}" # Verify release count increased by num_operations assert final_released == initial_released + num_operations, \ f"Released count should increase by {num_operations}, " \ f"expected {initial_released + num_operations}, got {final_released}" # Verify all connections were returned to pool (pool_free should be same or higher) # Note: pool_free might be higher if other tests released connections assert final_pool_free >= initial_pool_free, \ f"Pool should have at least as many free connections after release, " \ f"initial: {initial_pool_free}, final: {final_pool_free}" # Verify no connections leaked (acquired == released) total_acquired = final_metrics["connections_acquired"] total_released = final_metrics["connections_released"] assert total_acquired == total_released, \ f"All acquired connections should be released, " \ f"acquired: {total_acquired}, released: {total_released}" # Verify pool is not exhausted assert final_metrics["pool_free"] > 0, "Pool should have free connections available" # Verify acquisition times were tracked assert final_metrics["avg_acquisition_time_ms"] >= 0, \ "Average acquisition time should be non-negative"