# cuatrolabs-scm-ms / tests / test_properties_sync_infrastructure.py
# Commit e237731 (MukeshKapoor25):
# test(properties_sync): Improve connection pool acquisition test reliability
"""
Property-based tests for sync infrastructure.
Feature: postgres-sync
"""
import pytest
import asyncio
import traceback
from hypothesis import given, strategies as st, settings
from datetime import datetime
from app.sync.common.retry import RetryManager
from app.sync.common.monitoring import SyncMonitoringService
from app.postgres import (
PostgreSQLConnectionPool,
get_connection_pool_metrics,
reset_connection_pool_metrics
)
# Strategies for generating test data
# Entity kinds that participate in postgres sync.
entity_type_strategy = st.sampled_from(["merchant", "catalogue", "employee"])
# Identifier-like strings: upper/lowercase letters, digits, underscore, hyphen.
entity_id_strategy = st.text(min_size=10, max_size=50, alphabet=st.characters(
    whitelist_categories=("Lu", "Ll", "Nd"), whitelist_characters="_-"
))
# Sync operation verbs exercised by the suite.
operation_strategy = st.sampled_from(["create", "update", "activate", "deactivate"])
# Operation durations in milliseconds (0.1 ms .. 5 s).
duration_strategy = st.floats(min_value=0.1, max_value=5000.0)
# Arbitrary human-readable error messages.
error_message_strategy = st.text(min_size=5, max_size=200)
@pytest.mark.asyncio
@settings(max_examples=100)
@given(
    attempt=st.integers(min_value=0, max_value=10)
)
async def test_property_retry_exponential_backoff(attempt):
    """
    Feature: postgres-sync, Property 3: Sync retry with exponential backoff

    Verifies that RetryManager.get_delay follows the formula
    base_delay * exponential_base ** attempt, clamped to max_delay, and that
    successive delays grow geometrically until the cap kicks in.
    Validates: Requirements 1.5, 7.1
    """
    base, growth, cap = 1.0, 2.0, 60.0
    manager = RetryManager(
        max_retries=3,
        base_delay=base,
        max_delay=cap,
        exponential_base=growth
    )
    delay = manager.get_delay(attempt)
    # Expected value from the backoff formula, clamped to the configured ceiling.
    expected_delay = min(base * (growth ** attempt), cap)
    assert delay == expected_delay, f"Delay should follow exponential backoff: expected {expected_delay}, got {delay}"
    # Bounds: never below the base delay, never above the cap.
    assert delay >= 1.0, "Delay should be at least base_delay"
    assert delay <= 60.0, "Delay should not exceed max_delay"
    # Before the cap is reached, each delay must at least double its predecessor.
    if attempt > 0 and delay < 60.0:
        previous = manager.get_delay(attempt - 1)
        assert delay >= previous * 2.0, "Delay should grow exponentially"
@pytest.mark.asyncio
@settings(max_examples=100)
@given(
    entity_type=entity_type_strategy,
    entity_id=entity_id_strategy,
    duration_ms=duration_strategy
)
async def test_property_success_logging_completeness(entity_type, entity_id, duration_ms):
    """
    Feature: postgres-sync, Property 16: Success logging completeness

    A single recorded success must be reflected in the per-entity metrics:
    counts incremented, duration captured, and a datetime last_sync stamp set.
    Validates: Requirements 8.1
    """
    service = SyncMonitoringService()
    # Record exactly one successful sync operation.
    service.record_sync_success(entity_type, entity_id, duration_ms)
    stats = service.get_entity_metrics(entity_type)
    # Exactly one operation recorded, and it was a success.
    assert stats["success_count"] == 1, "Success count should be incremented"
    assert stats["total_operations"] == 1, "Total operations should be incremented"
    # With a single sample the average equals the recorded duration.
    assert stats["average_duration_ms"] == duration_ms, "Duration should be recorded"
    # A timestamp must be present and be a real datetime instance.
    assert stats["last_sync"] is not None, "Last sync timestamp should be recorded"
    assert isinstance(stats["last_sync"], datetime), "Last sync should be a datetime"
@pytest.mark.asyncio
@settings(max_examples=100)
@given(
    entity_type=entity_type_strategy,
    entity_id=entity_id_strategy,
    error_message=error_message_strategy
)
async def test_property_failure_logging_completeness(entity_type, entity_id, error_message):
    """
    Feature: postgres-sync, Property 17: Failure logging completeness

    A single recorded failure must increment failure_count and
    total_operations while leaving success_count at zero.
    Validates: Requirements 8.2
    """
    service = SyncMonitoringService()
    # Produce a genuine stack trace by raising and formatting the exception.
    try:
        raise Exception(error_message)
    except Exception:
        stack_trace = traceback.format_exc()
    # Record exactly one failed sync operation.
    service.record_sync_failure(entity_type, entity_id, error_message, stack_trace)
    stats = service.get_entity_metrics(entity_type)
    assert stats["failure_count"] == 1, "Failure count should be incremented"
    assert stats["total_operations"] == 1, "Total operations should be incremented"
    assert stats["success_count"] == 0, "Success count should be 0 for failed operation"
@pytest.mark.asyncio
@settings(max_examples=100)
@given(
    entity_type=entity_type_strategy,
    success_operations=st.lists(
        st.tuples(entity_id_strategy, duration_strategy),
        min_size=1,
        max_size=20
    ),
    failure_operations=st.lists(
        st.tuples(entity_id_strategy, error_message_strategy),
        min_size=0,
        max_size=10
    )
)
async def test_property_metrics_tracking(entity_type, success_operations, failure_operations):
    """
    Feature: postgres-sync, Property 18: Metrics tracking
    For any series of sync operations, the metrics should accurately reflect
    success_count, failure_count, and average_duration.
    Validates: Requirements 8.3
    """
    monitoring_service = SyncMonitoringService()
    # Record all successful operations, accumulating the expected total duration.
    total_duration = 0.0
    for entity_id, duration_ms in success_operations:
        monitoring_service.record_sync_success(entity_type, entity_id, duration_ms)
        total_duration += duration_ms
    # Record all failed operations. A real stack trace is supplied so the call
    # matches the four-argument form used elsewhere in this suite (previously
    # the stack_trace argument was omitted here, inconsistently).
    for entity_id, error_message in failure_operations:
        try:
            raise Exception(error_message)
        except Exception:
            stack_trace = traceback.format_exc()
        monitoring_service.record_sync_failure(entity_type, entity_id, error_message, stack_trace)
    # Get metrics
    metrics = monitoring_service.get_entity_metrics(entity_type)
    # Verify counts match exactly what was recorded.
    expected_success = len(success_operations)
    expected_failure = len(failure_operations)
    expected_total = expected_success + expected_failure
    assert metrics["success_count"] == expected_success, \
        f"Success count should be {expected_success}, got {metrics['success_count']}"
    assert metrics["failure_count"] == expected_failure, \
        f"Failure count should be {expected_failure}, got {metrics['failure_count']}"
    assert metrics["total_operations"] == expected_total, \
        f"Total operations should be {expected_total}, got {metrics['total_operations']}"
    # Verify average duration (only successes contribute); small tolerance
    # allows for floating-point accumulation order differences.
    if expected_success > 0:
        expected_avg = total_duration / expected_success
        assert abs(metrics["average_duration_ms"] - expected_avg) < 0.01, \
            f"Average duration should be {expected_avg}, got {metrics['average_duration_ms']}"
    else:
        assert metrics["average_duration_ms"] == 0.0, "Average duration should be 0 when no successes"
@pytest.mark.asyncio
@settings(max_examples=100, deadline=None)
@given(
    num_operations=st.integers(min_value=1, max_value=20)
)
async def test_property_connection_pool_acquisition_and_release(num_operations):
    """
    Feature: postgres-sync, Property 10: Connection pool acquisition and release
    For any sync operation, the system should acquire a connection from the pool
    before execution and release it after completion.
    Validates: Requirements 5.2, 5.3
    """
    # Initialize the pool lazily; skip the test when no database is reachable.
    if not PostgreSQLConnectionPool.is_initialized():
        try:
            await PostgreSQLConnectionPool.initialize()
        except Exception as e:
            pytest.skip(f"PostgreSQL not available: {e}")
    # Start from a clean slate so counter deltas are exact for this example.
    reset_connection_pool_metrics()
    # Snapshot baseline metrics before any acquire/release cycles.
    initial_metrics = get_connection_pool_metrics()
    initial_acquired = initial_metrics["connections_acquired"]
    initial_released = initial_metrics["connections_released"]
    initial_pool_free = initial_metrics["pool_free"]
    # Perform multiple acquire/release cycles.
    for _ in range(num_operations):
        conn = None
        try:
            # Acquire a connection from the pool.
            conn = await PostgreSQLConnectionPool.get_connection()
            assert conn is not None, "Connection should not be None"
            # Health check: the connection must be able to run a trivial query.
            result = await conn.fetchval("SELECT 1")
            assert result == 1, "Connection should be able to execute queries"
        finally:
            # Explicit None check: driver connection objects may define custom
            # truthiness, so a bare `if conn:` could skip the release and leak
            # a connection back into limbo.
            if conn is not None:
                await PostgreSQLConnectionPool.release_connection(conn)
    # Snapshot metrics after all cycles completed.
    final_metrics = get_connection_pool_metrics()
    final_acquired = final_metrics["connections_acquired"]
    final_released = final_metrics["connections_released"]
    final_pool_free = final_metrics["pool_free"]
    # Each of the num_operations cycles must have bumped the acquired counter.
    assert final_acquired == initial_acquired + num_operations, \
        f"Acquired count should increase by {num_operations}, " \
        f"expected {initial_acquired + num_operations}, got {final_acquired}"
    # ... and the released counter, symmetrically.
    assert final_released == initial_released + num_operations, \
        f"Released count should increase by {num_operations}, " \
        f"expected {initial_released + num_operations}, got {final_released}"
    # All connections must have been returned (pool_free same or higher;
    # it might be higher if other tests released connections meanwhile).
    assert final_pool_free >= initial_pool_free, \
        f"Pool should have at least as many free connections after release, " \
        f"initial: {initial_pool_free}, final: {final_pool_free}"
    # No leaks: everything acquired was also released.
    assert final_acquired == final_released, \
        f"All acquired connections should be released, " \
        f"acquired: {final_acquired}, released: {final_released}"
    # The pool should not be exhausted after the cycles.
    assert final_metrics["pool_free"] > 0, "Pool should have free connections available"
    # Acquisition latency tracking must produce a sane (non-negative) average.
    assert final_metrics["avg_acquisition_time_ms"] >= 0, \
        "Average acquisition time should be non-negative"