# NOTE(review): removed non-Python extraction artifacts ("Spaces:" / "Running")
# that preceded the module docstring.
"""
Property-based tests for sync infrastructure.
Feature: postgres-sync
"""
import pytest
import asyncio
import traceback
from hypothesis import given, strategies as st, settings
from datetime import datetime
from app.sync.common.retry import RetryManager
from app.sync.common.monitoring import SyncMonitoringService
from app.postgres import (
    PostgreSQLConnectionPool,
    get_connection_pool_metrics,
    reset_connection_pool_metrics,
)

# Strategies for generating test data shared by the property tests below.
# Entity types mirror the synced aggregates; ids are 10-50 chars of
# letters/digits plus "_" and "-".
entity_type_strategy = st.sampled_from(["merchant", "catalogue", "employee"])
entity_id_strategy = st.text(min_size=10, max_size=50, alphabet=st.characters(
    whitelist_categories=("Lu", "Ll", "Nd"), whitelist_characters="_-"
))
operation_strategy = st.sampled_from(["create", "update", "activate", "deactivate"])
duration_strategy = st.floats(min_value=0.1, max_value=5000.0)
error_message_strategy = st.text(min_size=5, max_size=200)
# NOTE(review): the @given/@pytest.mark.asyncio decorators were absent, so pytest
# would fail on an unknown "attempt" fixture; the attempt range is assumed — confirm.
@pytest.mark.asyncio
@given(attempt=st.integers(min_value=0, max_value=10))
async def test_property_retry_exponential_backoff(attempt):
    """
    Feature: postgres-sync, Property 3: Sync retry with exponential backoff
    For any sync operation that fails due to transient error, the system should
    retry up to 3 times with exponentially increasing delays between attempts.
    Validates: Requirements 1.5, 7.1
    """
    retry_manager = RetryManager(
        max_retries=3,
        base_delay=1.0,
        max_delay=60.0,
        exponential_base=2.0
    )
    # Get delay for this attempt
    delay = retry_manager.get_delay(attempt)
    # Verify exponential backoff formula: base_delay * (exponential_base ** attempt)
    expected_delay = 1.0 * (2.0 ** attempt)
    expected_delay = min(expected_delay, 60.0)  # Capped at max_delay
    assert delay == expected_delay, f"Delay should follow exponential backoff: expected {expected_delay}, got {delay}"
    # Verify delay is within bounds
    assert delay >= 1.0, "Delay should be at least base_delay"
    assert delay <= 60.0, "Delay should not exceed max_delay"
    # Verify exponential growth (for attempts within max_delay range).
    # Guarded by delay < 60.0 so the previous delay is also uncapped and the
    # doubling relation holds exactly.
    if attempt > 0 and delay < 60.0:
        prev_delay = retry_manager.get_delay(attempt - 1)
        assert delay >= prev_delay * 2.0, "Delay should grow exponentially"
# NOTE(review): restored the missing @given binding to the module-level strategies
# (the parameters had no fixture/strategy source) and the asyncio mark.
@pytest.mark.asyncio
@given(
    entity_type=entity_type_strategy,
    entity_id=entity_id_strategy,
    duration_ms=duration_strategy,
)
async def test_property_success_logging_completeness(entity_type, entity_id, duration_ms):
    """
    Feature: postgres-sync, Property 16: Success logging completeness
    For any successful sync operation, the log entry should contain entity_type,
    entity_id, and duration_ms.
    Validates: Requirements 8.1
    """
    # Fresh service per example so counts start from zero.
    monitoring_service = SyncMonitoringService()
    # Record a successful sync
    monitoring_service.record_sync_success(entity_type, entity_id, duration_ms)
    # Get metrics for this entity type
    metrics = monitoring_service.get_entity_metrics(entity_type)
    # Verify success was recorded
    assert metrics["success_count"] == 1, "Success count should be incremented"
    assert metrics["total_operations"] == 1, "Total operations should be incremented"
    # Verify duration was recorded (single sample, so average == duration_ms)
    assert metrics["average_duration_ms"] == duration_ms, "Duration should be recorded"
    # Verify last_sync timestamp was recorded
    assert metrics["last_sync"] is not None, "Last sync timestamp should be recorded"
    assert isinstance(metrics["last_sync"], datetime), "Last sync should be a datetime"
# NOTE(review): restored the missing @given binding to the module-level strategies
# and the asyncio mark (the parameters had no fixture/strategy source).
@pytest.mark.asyncio
@given(
    entity_type=entity_type_strategy,
    entity_id=entity_id_strategy,
    error_message=error_message_strategy,
)
async def test_property_failure_logging_completeness(entity_type, entity_id, error_message):
    """
    Feature: postgres-sync, Property 17: Failure logging completeness
    For any failed sync operation, the log entry should contain entity_type,
    entity_id, error_message, and stack_trace.
    Validates: Requirements 8.2
    """
    monitoring_service = SyncMonitoringService()
    # Generate a real stack trace by raising and formatting the exception.
    try:
        raise Exception(error_message)
    except Exception:
        stack_trace = traceback.format_exc()
    # Record a failed sync
    monitoring_service.record_sync_failure(entity_type, entity_id, error_message, stack_trace)
    # Get metrics for this entity type
    metrics = monitoring_service.get_entity_metrics(entity_type)
    # Verify failure was recorded
    assert metrics["failure_count"] == 1, "Failure count should be incremented"
    assert metrics["total_operations"] == 1, "Total operations should be incremented"
    # Verify success count is 0
    assert metrics["success_count"] == 0, "Success count should be 0 for failed operation"
# NOTE(review): restored the missing @given (list sizes assumed — confirm) and the
# asyncio mark.
@pytest.mark.asyncio
@given(
    entity_type=entity_type_strategy,
    success_operations=st.lists(
        st.tuples(entity_id_strategy, duration_strategy), max_size=10
    ),
    failure_operations=st.lists(
        st.tuples(entity_id_strategy, error_message_strategy), max_size=10
    ),
)
async def test_property_metrics_tracking(entity_type, success_operations, failure_operations):
    """
    Feature: postgres-sync, Property 18: Metrics tracking
    For any series of sync operations, the metrics should accurately reflect
    success_count, failure_count, and average_duration.
    Validates: Requirements 8.3
    """
    monitoring_service = SyncMonitoringService()
    # Record all successful operations
    total_duration = 0.0
    for entity_id, duration_ms in success_operations:
        monitoring_service.record_sync_success(entity_type, entity_id, duration_ms)
        total_duration += duration_ms
    # Record all failed operations
    # NOTE(review): called without a stack_trace here, while the failure-logging
    # test passes one — presumably stack_trace has a default; verify signature.
    for entity_id, error_message in failure_operations:
        monitoring_service.record_sync_failure(entity_type, entity_id, error_message)
    # Get metrics
    metrics = monitoring_service.get_entity_metrics(entity_type)
    # Verify counts
    expected_success = len(success_operations)
    expected_failure = len(failure_operations)
    expected_total = expected_success + expected_failure
    assert metrics["success_count"] == expected_success, \
        f"Success count should be {expected_success}, got {metrics['success_count']}"
    assert metrics["failure_count"] == expected_failure, \
        f"Failure count should be {expected_failure}, got {metrics['failure_count']}"
    assert metrics["total_operations"] == expected_total, \
        f"Total operations should be {expected_total}, got {metrics['total_operations']}"
    # Verify average duration (tolerance absorbs float accumulation error)
    if expected_success > 0:
        expected_avg = total_duration / expected_success
        assert abs(metrics["average_duration_ms"] - expected_avg) < 0.01, \
            f"Average duration should be {expected_avg}, got {metrics['average_duration_ms']}"
    else:
        assert metrics["average_duration_ms"] == 0.0, "Average duration should be 0 when no successes"
# NOTE(review): restored the missing @given (num_operations range assumed — confirm)
# and the asyncio mark; real DB round-trips, so relax the Hypothesis deadline and
# keep the example count small.
@pytest.mark.asyncio
@settings(deadline=None, max_examples=5)
@given(num_operations=st.integers(min_value=1, max_value=5))
async def test_property_connection_pool_acquisition_and_release(num_operations):
    """
    Feature: postgres-sync, Property 10: Connection pool acquisition and release
    For any sync operation, the system should acquire a connection from the pool
    before execution and release it after completion.
    Validates: Requirements 5.2, 5.3
    """
    # Initialize pool if not already initialized; skip when no DB is reachable.
    if not PostgreSQLConnectionPool.is_initialized():
        try:
            await PostgreSQLConnectionPool.initialize()
        except Exception as e:
            pytest.skip(f"PostgreSQL not available: {e}")
    # Reset metrics before test
    reset_connection_pool_metrics()
    # Get initial metrics
    initial_metrics = get_connection_pool_metrics()
    initial_acquired = initial_metrics["connections_acquired"]
    initial_released = initial_metrics["connections_released"]
    initial_pool_free = initial_metrics["pool_free"]
    # Perform multiple acquire/release cycles
    for i in range(num_operations):
        conn = None
        try:
            # Acquire connection
            conn = await PostgreSQLConnectionPool.get_connection()
            # Verify connection is valid
            assert conn is not None, "Connection should not be None"
            # Verify connection works (health check)
            result = await conn.fetchval("SELECT 1")
            assert result == 1, "Connection should be able to execute queries"
        finally:
            # Always release connection, even when an assertion above fails
            if conn:
                await PostgreSQLConnectionPool.release_connection(conn)
    # Get final metrics
    final_metrics = get_connection_pool_metrics()
    final_acquired = final_metrics["connections_acquired"]
    final_released = final_metrics["connections_released"]
    final_pool_free = final_metrics["pool_free"]
    # Verify acquisition count increased by num_operations
    assert final_acquired == initial_acquired + num_operations, \
        f"Acquired count should increase by {num_operations}, " \
        f"expected {initial_acquired + num_operations}, got {final_acquired}"
    # Verify release count increased by num_operations
    assert final_released == initial_released + num_operations, \
        f"Released count should increase by {num_operations}, " \
        f"expected {initial_released + num_operations}, got {final_released}"
    # Verify all connections were returned to pool (pool_free should be same or higher)
    # Note: pool_free might be higher if other tests released connections
    assert final_pool_free >= initial_pool_free, \
        f"Pool should have at least as many free connections after release, " \
        f"initial: {initial_pool_free}, final: {final_pool_free}"
    # Verify no connections leaked (acquired == released)
    total_acquired = final_metrics["connections_acquired"]
    total_released = final_metrics["connections_released"]
    assert total_acquired == total_released, \
        f"All acquired connections should be released, " \
        f"acquired: {total_acquired}, released: {total_released}"
    # Verify pool is not exhausted
    assert final_metrics["pool_free"] > 0, "Pool should have free connections available"
    # Verify acquisition times were tracked
    assert final_metrics["avg_acquisition_time_ms"] >= 0, \
        "Average acquisition time should be non-negative"