Spaces:
Runtime error
Runtime error
Refactor COUNT queries to use COUNT(1) for improved performance and consistency across various scripts and services.
0766a99 | """ | |
| Property-based tests for employee sync operations. | |
| Feature: postgres-sync | |
| """ | |
| import pytest | |
| import asyncio | |
| from hypothesis import given, strategies as st, settings as hyp_settings | |
| from datetime import datetime, timedelta, date | |
| from motor.motor_asyncio import AsyncIOMotorClient | |
| import asyncpg | |
| from app.sync.employees.handler import EmployeeSyncHandler | |
| from app.constants.collections import SCM_EMPLOYEES_COLLECTION | |
| from app.core.config import settings | |
# Name of the MongoDB database these tests create and later drop
TEST_DB_NAME = "scm_test_db_employee_sync"

# ---------------------------------------------------------------------------
# Hypothesis strategies for generating employee field values
# ---------------------------------------------------------------------------

# Alphabet shared by every "letters only" text field below
# (Unicode categories Lu = uppercase letter, Ll = lowercase letter).
_LETTERS = st.characters(whitelist_categories=("Lu", "Ll"))


def _optional(strategy):
    """Return a strategy that yields either None or a value from *strategy*."""
    return st.one_of(st.none(), strategy)


# Identifiers: letters, decimal digits, underscore and hyphen
employee_id_strategy = st.text(
    alphabet=st.characters(
        whitelist_categories=("Lu", "Ll", "Nd"),
        whitelist_characters="_-",
    ),
    min_size=10,
    max_size=50,
)

# Employee codes: uppercase letters, decimal digits and hyphen
employee_code_strategy = st.text(
    alphabet=st.characters(
        whitelist_categories=("Lu", "Nd"),
        whitelist_characters="-",
    ),
    min_size=5,
    max_size=20,
)

# Names
first_name_strategy = st.text(alphabet=_LETTERS, min_size=2, max_size=30)
last_name_strategy = _optional(st.text(alphabet=_LETTERS, min_size=2, max_size=30))

# Fixed vocabularies
designation_strategy = st.sampled_from(
    ["ASM", "RSM", "TSM", "DSM", "Manager", "Executive"]
)
status_strategy = st.sampled_from(["active", "inactive", "onboarding", "suspended"])

# Optional location fields
city_strategy = _optional(st.text(alphabet=_LETTERS, min_size=3, max_size=30))
state_strategy = _optional(st.text(alphabet=_LETTERS, min_size=3, max_size=30))

# Optional organisational fields
manager_id_strategy = _optional(employee_id_strategy)
department_strategy = _optional(
    st.sampled_from(["Sales", "Operations", "Finance", "HR", "IT"])
)

system_login_enabled_strategy = st.booleans()
def generate_employee_document(
    user_id: str,
    employee_code: str,
    first_name: str,
    designation: str,
    status: str,
    last_name: str = None,
    city: str = None,
    state: str = None,
    manager_id: str = None,
    department: str = None,
    system_login_enabled: bool = False,
    created_at: datetime = None,
    updated_at: datetime = None
):
    """Build an employee document (a plain dict) for insertion into MongoDB.

    Args:
        user_id: Stored under ``user_id``.
        employee_code: Stored under ``employee_code``; its lower-cased form
            is also used as the local part of the generated e-mail address.
        first_name: Stored under ``first_name``.
        designation: Stored under ``designation``.
        status: Stored under ``status``.
        last_name: Stored under ``last_name`` (may be None).
        city: Stored under ``base_city``; a falsy value becomes ``"Mumbai"``.
        state: Stored under ``base_state``; a falsy value becomes
            ``"Maharashtra"``.
        manager_id: Stored under ``manager_id`` (may be None).
        department: Stored under ``department`` (may be None).
        system_login_enabled: Drives ``app_access.has_mobile_app`` and
            whether ``app_access.roles`` contains ``"field_sales"``.
        created_at: Creation timestamp; defaults to the current UTC time as
            a naive datetime.
        updated_at: Update timestamp; defaults to ``created_at``.

    Returns:
        dict: The employee document.
    """
    # `datetime` in this module is the class, which has no `UTC` attribute
    # (that constant lives on the `datetime` module), so use `timezone.utc`.
    from datetime import timezone

    if created_at is None:
        # Naive UTC timestamp: tzinfo is dropped after taking the aware "now".
        created_at = datetime.now(timezone.utc).replace(tzinfo=None)
    if updated_at is None:
        updated_at = created_at

    return {
        "user_id": user_id,
        "employee_code": employee_code,
        "first_name": first_name,
        "last_name": last_name,
        "email": f"{employee_code.lower()}@test.com",
        "phone": "+919876543210",
        "designation": designation,
        "manager_id": manager_id,
        "base_city": city or "Mumbai",
        "base_state": state or "Maharashtra",
        "base_country": "India",
        "region": "Western",
        # BSON has no date-only type and PyMongo refuses to encode
        # `datetime.date`, so calendar dates are stored as midnight datetimes.
        "dob": datetime(1990, 1, 1),
        "doj": datetime(2023, 1, 1),
        "department": department,
        "emergency_contact": {
            "name": "Emergency Contact",
            "phone": "+919876543211"
        },
        "status": status,
        "app_access": {
            "has_mobile_app": system_login_enabled,
            "requires_2fa": False,
            "roles": ["field_sales"] if system_login_enabled else []
        },
        "location_settings": {
            "location_tracking_consent": False
        },
        "created_by": "test_user",
        "created_at": created_at,
        "updated_at": updated_at
    }
async def setup_test_db():
    """Open the MongoDB and PostgreSQL connections used by a test run.

    Creates the ``trans`` schema and the ``trans.employees_ref`` table if
    they do not exist yet.

    Returns:
        tuple: ``(mongo_client, mongo_db, pg_conn)`` where ``mongo_db`` is
        the ``TEST_DB_NAME`` database on ``mongo_client``.

    Raises:
        Exception: Whatever the drivers raise on connection or DDL failure;
        any resource opened so far is closed before the error propagates.
    """
    # MongoDB
    mongo_client = AsyncIOMotorClient(settings.MONGODB_URI)
    try:
        mongo_db = mongo_client[TEST_DB_NAME]

        # PostgreSQL
        pg_conn = await asyncpg.connect(
            host=settings.POSTGRES_HOST,
            port=settings.POSTGRES_PORT,
            database=settings.POSTGRES_DB,
            user=settings.POSTGRES_USER,
            password=settings.POSTGRES_PASSWORD
        )
    except Exception:
        # Don't leak the Mongo client when PostgreSQL is unreachable.
        mongo_client.close()
        raise

    try:
        # Create schema and table
        await pg_conn.execute("CREATE SCHEMA IF NOT EXISTS trans")
        await pg_conn.execute("""
            CREATE TABLE IF NOT EXISTS trans.employees_ref (
                employee_id TEXT PRIMARY KEY,
                employee_code TEXT NOT NULL,
                full_name TEXT NOT NULL,
                designation TEXT NOT NULL,
                department TEXT,
                manager_id TEXT,
                city TEXT,
                state TEXT,
                status TEXT NOT NULL,
                system_login_enabled BOOLEAN,
                created_at TIMESTAMP NOT NULL
            )
        """)
    except Exception:
        # The caller never receives the handles on failure, so release them here.
        await pg_conn.close()
        mongo_client.close()
        raise

    return mongo_client, mongo_db, pg_conn
async def teardown_test_db(mongo_client, mongo_db, pg_conn):
    """Drop the test data and close both database connections.

    Each cleanup step runs even if an earlier one fails, so a failing drop
    cannot leave a connection open.

    Args:
        mongo_client: Client whose ``TEST_DB_NAME`` database is dropped.
        mongo_db: Unused here; kept so callers can pass back everything
            that ``setup_test_db`` returned.
        pg_conn: Connection used to drop ``trans.employees_ref``.
    """
    # NOTE(review): the table is dropped in whatever database
    # settings.POSTGRES_DB points at -- confirm tests never run against a
    # shared/production database.
    try:
        # Drop MongoDB test database
        await mongo_client.drop_database(TEST_DB_NAME)
    finally:
        mongo_client.close()
        try:
            # Drop PostgreSQL test table
            await pg_conn.execute("DROP TABLE IF EXISTS trans.employees_ref")
        finally:
            await pg_conn.close()
def test_property_employee_sync_creates_matching_records(
    user_id,
    employee_code,
    first_name,
    last_name,
    designation,
    status,
    city,
    state,
    manager_id,
    department,
    system_login_enabled
):
    """
    Feature: postgres-sync, Property 7: Employee sync creates matching records

    For any employee created in MongoDB, syncing should result in a record in
    trans.employees_ref with matching employee_id and all required fields populated.

    Validates: Requirements 3.1, 3.5
    """
    # NOTE(review): the arguments are presumably supplied by a Hypothesis
    # `@given(...)` decorator; none is visible in this view -- confirm it
    # exists, otherwise pytest will look for fixtures with these names.

    async def run_test():
        mongo_client, mongo_db, pg_conn = await setup_test_db()
        try:
            # Generate employee document
            employee_doc = generate_employee_document(
                user_id=user_id,
                employee_code=employee_code,
                first_name=first_name,
                last_name=last_name,
                designation=designation,
                status=status,
                city=city,
                state=state,
                manager_id=manager_id,
                department=department,
                system_login_enabled=system_login_enabled
            )

            # Insert into MongoDB
            collection = mongo_db[SCM_EMPLOYEES_COLLECTION]
            await collection.insert_one(employee_doc)

            # Sync to PostgreSQL
            handler = EmployeeSyncHandler()
            success = await handler.sync(
                entity_id=user_id,
                mongo_db=mongo_db,
                pg_conn=pg_conn
            )
            assert success, "Sync should succeed"

            # Verify record exists in PostgreSQL
            pg_record = await pg_conn.fetchrow(
                "SELECT * FROM trans.employees_ref WHERE employee_id = $1",
                user_id
            )
            assert pg_record is not None, "Record should exist in PostgreSQL"
            assert pg_record["employee_id"] == user_id, "employee_id should match"
            assert pg_record["employee_code"] == employee_code, "employee_code should match"
            assert pg_record["designation"] == designation, "designation should match"
            assert pg_record["status"] == status, "status should match"
            # Compare against what was actually stored in MongoDB: the
            # document generator replaces a missing city/state with a
            # default, so the raw `city`/`state` arguments may be None while
            # the stored values are not.
            assert pg_record["city"] == employee_doc["base_city"], "city should match"
            assert pg_record["state"] == employee_doc["base_state"], "state should match"
            assert pg_record["manager_id"] == manager_id, "manager_id should match"
            assert pg_record["department"] == department, "department should match"
            assert pg_record["system_login_enabled"] == system_login_enabled, "system_login_enabled should match"
            assert pg_record["created_at"] is not None, "created_at should be populated"

            # Verify full_name is computed correctly
            expected_full_name = f"{first_name} {last_name}".strip() if last_name else first_name.strip()
            assert pg_record["full_name"] == expected_full_name, "full_name should be computed correctly"
        finally:
            await teardown_test_db(mongo_client, mongo_db, pg_conn)

    asyncio.run(run_test())
def test_property_employee_sync_updates_existing_records(
    user_id,
    employee_code,
    first_name,
    designation,
    initial_status,
    updated_status,
    initial_city,
    updated_city
):
    """
    Feature: postgres-sync, Property 8: Employee sync updates existing records

    For any employee updated in MongoDB, syncing should update the corresponding
    record in trans.employees_ref with the new values.

    Validates: Requirements 3.2
    """
    # NOTE(review): the arguments are presumably supplied by a Hypothesis
    # `@given(...)` decorator; none is visible in this view -- confirm.

    # `datetime` in this module is the class, which has no `UTC` attribute;
    # the UTC tzinfo comes from `timezone.utc`.
    from datetime import timezone

    # Ensure there's a difference to test
    if initial_status == updated_status and initial_city == updated_city:
        return

    async def run_test():
        mongo_client, mongo_db, pg_conn = await setup_test_db()
        try:
            # Create initial employee document
            initial_time = datetime.now(timezone.utc).replace(tzinfo=None)
            initial_doc = generate_employee_document(
                user_id=user_id,
                employee_code=employee_code,
                first_name=first_name,
                designation=designation,
                status=initial_status,
                city=initial_city,
                created_at=initial_time,
                updated_at=initial_time
            )

            # Insert into MongoDB and sync
            collection = mongo_db[SCM_EMPLOYEES_COLLECTION]
            await collection.insert_one(initial_doc)
            handler = EmployeeSyncHandler()
            await handler.sync(user_id, mongo_db, pg_conn)

            # Verify initial state
            pg_record = await pg_conn.fetchrow(
                "SELECT * FROM trans.employees_ref WHERE employee_id = $1",
                user_id
            )
            assert pg_record is not None, "Record should exist after initial sync"
            assert pg_record["status"] == initial_status, "Initial status should match"
            # The generator substitutes a default city when `initial_city`
            # is empty, so compare with the value stored in the document.
            assert pg_record["city"] == initial_doc["base_city"], "Initial city should match"

            # Update employee in MongoDB
            updated_time = initial_time + timedelta(seconds=1)
            await collection.update_one(
                {"user_id": user_id},
                {
                    "$set": {
                        "status": updated_status,
                        "base_city": updated_city,
                        "updated_at": updated_time
                    }
                }
            )

            # Sync again
            await handler.sync(user_id, mongo_db, pg_conn)

            # Verify updated state
            pg_record = await pg_conn.fetchrow(
                "SELECT * FROM trans.employees_ref WHERE employee_id = $1",
                user_id
            )
            assert pg_record is not None, "Record should still exist after update sync"
            assert pg_record["status"] == updated_status, "Status should be updated"
            assert pg_record["city"] == updated_city, "City should be updated"
        finally:
            await teardown_test_db(mongo_client, mongo_db, pg_conn)

    asyncio.run(run_test())
def test_property_boolean_type_conversion(
    user_id,
    employee_code,
    first_name,
    designation,
    status,
    system_login_enabled
):
    """
    Feature: postgres-sync, Property 9: Boolean type conversion

    After an employee is synced, the system_login_enabled column read back
    from trans.employees_ref must be a Python bool equal to the flag the
    document was generated with.

    Validates: Requirements 9.3
    """
    # NOTE(review): the arguments are presumably supplied by a Hypothesis
    # `@given(...)` decorator; none is visible in this view -- confirm.

    flag_query = (
        "SELECT system_login_enabled FROM trans.employees_ref WHERE employee_id = $1"
    )

    async def _scenario():
        client, database, connection = await setup_test_db()
        try:
            # Build the employee and store it in MongoDB
            document = generate_employee_document(
                user_id,
                employee_code,
                first_name,
                designation,
                status,
                system_login_enabled=system_login_enabled,
            )
            await database[SCM_EMPLOYEES_COLLECTION].insert_one(document)

            # Push it to PostgreSQL
            sync_handler = EmployeeSyncHandler()
            await sync_handler.sync(user_id, database, connection)

            # Read the flag back and check both its type and its value
            row = await connection.fetchrow(flag_query, user_id)
            assert row is not None, "Record should exist"
            stored_flag = row["system_login_enabled"]
            assert isinstance(stored_flag, bool), \
                "system_login_enabled should be boolean type"
            assert stored_flag == system_login_enabled, \
                "Boolean value should match"
        finally:
            await teardown_test_db(client, database, connection)

    asyncio.run(_scenario())
def test_property_idempotent_employee_sync(
    user_id,
    employee_code,
    first_name,
    designation,
    status,
    sync_count
):
    """
    Feature: postgres-sync, Property 11: Idempotent sync operations

    Syncing the same unchanged employee `sync_count` times must leave exactly
    one row in trans.employees_ref, carrying the original employee_code and
    status.

    Validates: Requirements 6.1
    """
    # NOTE(review): the arguments are presumably supplied by a Hypothesis
    # `@given(...)` decorator; none is visible in this view -- confirm.

    count_query = "SELECT COUNT(1) FROM trans.employees_ref WHERE employee_id = $1"
    row_query = "SELECT * FROM trans.employees_ref WHERE employee_id = $1"

    async def _scenario():
        client, database, connection = await setup_test_db()
        try:
            # Build the employee and store it in MongoDB
            document = generate_employee_document(
                user_id,
                employee_code,
                first_name,
                designation,
                status,
            )
            await database[SCM_EMPLOYEES_COLLECTION].insert_one(document)

            # Repeat the same sync the requested number of times
            sync_handler = EmployeeSyncHandler()
            for _attempt in range(sync_count):
                await sync_handler.sync(user_id, database, connection)

            # Exactly one row may exist for this employee
            count = await connection.fetchval(count_query, user_id)
            assert count == 1, f"Should have exactly 1 record, found {count}"

            # ...and it must still hold the original data
            row = await connection.fetchrow(row_query, user_id)
            assert row["employee_code"] == employee_code, "Data should remain consistent"
            assert row["status"] == status, "Data should remain consistent"
        finally:
            await teardown_test_db(client, database, connection)

    asyncio.run(_scenario())
def test_property_employee_upsert_behavior(
    user_id,
    employee_code,
    first_name,
    designation,
    initial_status,
    updated_status
):
    """
    Feature: postgres-sync, Property 12: Upsert behavior

    For any entity that already exists in PostgreSQL, syncing should update
    the existing record rather than creating a duplicate.

    Validates: Requirements 6.2
    """
    # NOTE(review): the arguments are presumably supplied by a Hypothesis
    # `@given(...)` decorator; none is visible in this view -- confirm.

    # `datetime` in this module is the class, which has no `UTC` attribute;
    # the UTC tzinfo comes from `timezone.utc`.
    from datetime import timezone

    # Nothing to verify when the status does not actually change
    if initial_status == updated_status:
        return

    async def run_test():
        mongo_client, mongo_db, pg_conn = await setup_test_db()
        try:
            # Create and sync initial employee
            initial_doc = generate_employee_document(
                user_id=user_id,
                employee_code=employee_code,
                first_name=first_name,
                designation=designation,
                status=initial_status
            )
            collection = mongo_db[SCM_EMPLOYEES_COLLECTION]
            await collection.insert_one(initial_doc)
            handler = EmployeeSyncHandler()
            await handler.sync(user_id, mongo_db, pg_conn)

            # Update and sync again (naive UTC timestamp, like the generator uses)
            update_time = datetime.now(timezone.utc).replace(tzinfo=None)
            await collection.update_one(
                {"user_id": user_id},
                {"$set": {"status": updated_status, "updated_at": update_time}}
            )
            await handler.sync(user_id, mongo_db, pg_conn)

            # Verify only one record exists with updated data
            count = await pg_conn.fetchval(
                "SELECT COUNT(1) FROM trans.employees_ref WHERE employee_id = $1",
                user_id
            )
            assert count == 1, "Should have exactly 1 record (upsert, not insert)"

            pg_record = await pg_conn.fetchrow(
                "SELECT status FROM trans.employees_ref WHERE employee_id = $1",
                user_id
            )
            assert pg_record["status"] == updated_status, "Record should be updated, not duplicated"
        finally:
            await teardown_test_db(mongo_client, mongo_db, pg_conn)

    asyncio.run(run_test())
def test_property_employee_timestamp_conflict_resolution(
    user_id,
    employee_code,
    first_name,
    designation,
    status
):
    """
    Feature: postgres-sync, Property 14: Timestamp-based conflict resolution

    For any entity where MongoDB updated_at is older than PostgreSQL updated_at,
    the sync should skip the update.

    Validates: Requirements 6.5
    """
    # NOTE(review): the arguments are presumably supplied by a Hypothesis
    # `@given(...)` decorator; none is visible in this view -- confirm.
    # NOTE(review): the table created by setup_test_db has no updated_at
    # column; how the handler compares timestamps is not visible here --
    # confirm the schema matches what the handler expects.

    # `datetime` in this module is the class, which has no `UTC` attribute;
    # the UTC tzinfo comes from `timezone.utc`.
    from datetime import timezone

    async def run_test():
        mongo_client, mongo_db, pg_conn = await setup_test_db()
        try:
            # Create employee with newer timestamp
            newer_time = datetime.now(timezone.utc).replace(tzinfo=None)
            newer_doc = generate_employee_document(
                user_id=user_id,
                employee_code=employee_code,
                first_name=first_name,
                designation=designation,
                status=status,
                created_at=newer_time,
                updated_at=newer_time
            )

            # Insert and sync
            collection = mongo_db[SCM_EMPLOYEES_COLLECTION]
            await collection.insert_one(newer_doc)
            handler = EmployeeSyncHandler()
            await handler.sync(user_id, mongo_db, pg_conn)

            # Get initial PostgreSQL state
            pg_record = await pg_conn.fetchrow(
                "SELECT * FROM trans.employees_ref WHERE employee_id = $1",
                user_id
            )
            assert pg_record is not None, "Record should exist after initial sync"
            initial_status = pg_record["status"]

            # Try to sync with older timestamp and different status
            older_time = newer_time - timedelta(hours=1)
            # Pick any status guaranteed to differ from the current one
            different_status = "inactive" if status != "inactive" else "active"
            await collection.update_one(
                {"user_id": user_id},
                {"$set": {"status": different_status, "updated_at": older_time}}
            )
            await handler.sync(user_id, mongo_db, pg_conn)

            # Verify PostgreSQL status didn't change (update was skipped)
            pg_record = await pg_conn.fetchrow(
                "SELECT status FROM trans.employees_ref WHERE employee_id = $1",
                user_id
            )
            final_status = pg_record["status"]
            assert final_status == initial_status, \
                "PostgreSQL status should not change when MongoDB timestamp is older"
        finally:
            await teardown_test_db(mongo_client, mongo_db, pg_conn)

    asyncio.run(run_test())