# cuatrolabs-scm-ms / tests / test_properties_employee_sync.py
# MukeshKapoor25's picture
# Refactor COUNT queries to use COUNT(1) for improved performance and consistency across various scripts and services.
# 0766a99
"""
Property-based tests for employee sync operations.
Feature: postgres-sync
"""
import asyncio
from datetime import datetime, timedelta, date, timezone

import asyncpg
import pytest
from hypothesis import given, strategies as st, settings as hyp_settings
from motor.motor_asyncio import AsyncIOMotorClient

from app.constants.collections import SCM_EMPLOYEES_COLLECTION
from app.core.config import settings
from app.sync.employees.handler import EmployeeSyncHandler
# Name of the MongoDB database created (and later dropped) by these tests
TEST_DB_NAME = "scm_test_db_employee_sync"


def _optional(strategy):
    """Return a strategy that yields either ``None`` or a value from *strategy*."""
    return st.one_of(st.none(), strategy)


# Alphabets shared by the text strategies below, expressed as Unicode
# general categories (Lu = uppercase letter, Ll = lowercase letter,
# Nd = decimal digit) plus a few explicitly allowed punctuation characters.
_LETTERS = st.characters(whitelist_categories=("Lu", "Ll"))
_ID_CHARS = st.characters(
    whitelist_categories=("Lu", "Ll", "Nd"),
    whitelist_characters="_-",
)
_CODE_CHARS = st.characters(
    whitelist_categories=("Lu", "Nd"),
    whitelist_characters="-",
)

# Strategies for generating test data
employee_id_strategy = st.text(alphabet=_ID_CHARS, min_size=10, max_size=50)
employee_code_strategy = st.text(alphabet=_CODE_CHARS, min_size=5, max_size=20)
first_name_strategy = st.text(alphabet=_LETTERS, min_size=2, max_size=30)
last_name_strategy = _optional(st.text(alphabet=_LETTERS, min_size=2, max_size=30))

designation_strategy = st.sampled_from(
    ["ASM", "RSM", "TSM", "DSM", "Manager", "Executive"]
)
status_strategy = st.sampled_from(
    ["active", "inactive", "onboarding", "suspended"]
)

city_strategy = _optional(st.text(alphabet=_LETTERS, min_size=3, max_size=30))
state_strategy = _optional(st.text(alphabet=_LETTERS, min_size=3, max_size=30))
manager_id_strategy = _optional(employee_id_strategy)
department_strategy = _optional(
    st.sampled_from(["Sales", "Operations", "Finance", "HR", "IT"])
)
system_login_enabled_strategy = st.booleans()
def generate_employee_document(
    user_id: str,
    employee_code: str,
    first_name: str,
    designation: str,
    status: str,
    last_name: str = None,
    city: str = None,
    state: str = None,
    manager_id: str = None,
    department: str = None,
    system_login_enabled: bool = False,
    created_at: datetime = None,
    updated_at: datetime = None
) -> dict:
    """Generate a valid employee document for MongoDB.

    Args:
        user_id: Stored under ``user_id``.
        employee_code: Stored under ``employee_code``; its lower-cased form
            is also used as the local part of the generated email address.
        first_name: Stored under ``first_name``.
        designation: Stored under ``designation``.
        status: Stored under ``status``.
        last_name: Optional; stored as given (may be ``None``).
        city: Stored under ``base_city``; a falsy value becomes ``"Mumbai"``.
        state: Stored under ``base_state``; a falsy value becomes
            ``"Maharashtra"``.
        manager_id: Optional; stored as given.
        department: Optional; stored as given.
        system_login_enabled: Drives ``app_access.has_mobile_app`` and
            whether ``app_access.roles`` contains ``"field_sales"``.
        created_at: Creation timestamp; defaults to the current UTC time as a
            naive ``datetime``.
        updated_at: Last-update timestamp; defaults to ``created_at``.

    Returns:
        A dict ready to be passed to ``insert_one``.
    """
    if created_at is None:
        # ``datetime`` here is the class (``from datetime import datetime``),
        # which has no ``UTC`` attribute; ``timezone.utc`` is the portable
        # spelling. tzinfo is stripped to keep the timestamp naive, as before.
        created_at = datetime.now(timezone.utc).replace(tzinfo=None)
    if updated_at is None:
        updated_at = created_at

    return {
        "user_id": user_id,
        "employee_code": employee_code,
        "first_name": first_name,
        "last_name": last_name,
        "email": f"{employee_code.lower()}@test.com",
        "phone": "+919876543210",
        "designation": designation,
        "manager_id": manager_id,
        "base_city": city or "Mumbai",
        "base_state": state or "Maharashtra",
        "base_country": "India",
        "region": "Western",
        # BSON has no date-only type and PyMongo's default codec rejects
        # ``datetime.date``; store midnight datetimes so insert_one succeeds.
        "dob": datetime(1990, 1, 1),
        "doj": datetime(2023, 1, 1),
        "department": department,
        "emergency_contact": {
            "name": "Emergency Contact",
            "phone": "+919876543211",
        },
        "status": status,
        "app_access": {
            "has_mobile_app": system_login_enabled,
            "requires_2fa": False,
            "roles": ["field_sales"] if system_login_enabled else [],
        },
        "location_settings": {
            "location_tracking_consent": False,
        },
        "created_by": "test_user",
        "created_at": created_at,
        "updated_at": updated_at,
    }
async def setup_test_db():
    """Open the MongoDB and PostgreSQL connections used by a test example.

    Creates the ``trans`` schema and the ``trans.employees_ref`` table if they
    do not exist yet.

    NOTE(review): the PostgreSQL connection targets ``settings.POSTGRES_DB``
    (the configured database, not a test-specific name) and
    ``teardown_test_db`` later drops ``trans.employees_ref`` there -- confirm
    the test configuration points at a disposable database.

    Returns:
        A ``(mongo_client, mongo_db, pg_conn)`` tuple, where ``mongo_db`` is
        the ``TEST_DB_NAME`` database of ``mongo_client``.

    Raises:
        Whatever ``asyncpg.connect`` or the DDL statements raise; connections
        opened so far are closed before the exception propagates.
    """
    # MongoDB
    mongo_client = AsyncIOMotorClient(settings.MONGODB_URI)
    mongo_db = mongo_client[TEST_DB_NAME]

    pg_conn = None
    try:
        # PostgreSQL
        pg_conn = await asyncpg.connect(
            host=settings.POSTGRES_HOST,
            port=settings.POSTGRES_PORT,
            database=settings.POSTGRES_DB,
            user=settings.POSTGRES_USER,
            password=settings.POSTGRES_PASSWORD
        )

        # Create schema and table
        await pg_conn.execute("CREATE SCHEMA IF NOT EXISTS trans")
        await pg_conn.execute("""
            CREATE TABLE IF NOT EXISTS trans.employees_ref (
                employee_id TEXT PRIMARY KEY,
                employee_code TEXT NOT NULL,
                full_name TEXT NOT NULL,
                designation TEXT NOT NULL,
                department TEXT,
                manager_id TEXT,
                city TEXT,
                state TEXT,
                status TEXT NOT NULL,
                system_login_enabled BOOLEAN,
                created_at TIMESTAMP NOT NULL
            )
        """)
    except BaseException:
        # The caller never receives the handles on failure, so release them
        # here; otherwise every failed example would leak connections.
        try:
            if pg_conn is not None:
                await pg_conn.close()
        finally:
            mongo_client.close()
        raise

    return mongo_client, mongo_db, pg_conn
async def teardown_test_db(mongo_client, mongo_db, pg_conn):
    """Drop the test data and close both database connections.

    Every cleanup step is attempted even if an earlier one fails, so a MongoDB
    error cannot leave the PostgreSQL table or connection behind.

    Args:
        mongo_client: Client whose ``TEST_DB_NAME`` database is dropped and
            which is then closed.
        mongo_db: Unused; kept so the signature mirrors the tuple returned by
            ``setup_test_db``.
        pg_conn: Connection used to drop ``trans.employees_ref``; closed
            afterwards.

    Raises:
        Any exception raised by a cleanup step, after the remaining steps
        have run.
    """
    try:
        try:
            # Drop MongoDB test database
            await mongo_client.drop_database(TEST_DB_NAME)
        finally:
            mongo_client.close()
    finally:
        try:
            # Drop PostgreSQL test table
            await pg_conn.execute("DROP TABLE IF EXISTS trans.employees_ref")
        finally:
            await pg_conn.close()
# Each example opens database connections, so Hypothesis's default 200 ms
# per-example deadline is disabled to avoid spurious DeadlineExceeded errors.
@hyp_settings(max_examples=100, deadline=None)
@given(
    user_id=employee_id_strategy,
    employee_code=employee_code_strategy,
    first_name=first_name_strategy,
    last_name=last_name_strategy,
    designation=designation_strategy,
    status=status_strategy,
    city=city_strategy,
    state=state_strategy,
    manager_id=manager_id_strategy,
    department=department_strategy,
    system_login_enabled=system_login_enabled_strategy
)
def test_property_employee_sync_creates_matching_records(
    user_id,
    employee_code,
    first_name,
    last_name,
    designation,
    status,
    city,
    state,
    manager_id,
    department,
    system_login_enabled
):
    """
    Feature: postgres-sync, Property 7: Employee sync creates matching records

    For any employee created in MongoDB, syncing should result in a record in
    trans.employees_ref with matching employee_id and all required fields populated.

    Validates: Requirements 3.1, 3.5
    """
    async def run_test():
        mongo_client, mongo_db, pg_conn = await setup_test_db()
        try:
            # Generate employee document
            employee_doc = generate_employee_document(
                user_id=user_id,
                employee_code=employee_code,
                first_name=first_name,
                last_name=last_name,
                designation=designation,
                status=status,
                city=city,
                state=state,
                manager_id=manager_id,
                department=department,
                system_login_enabled=system_login_enabled
            )

            # Insert into MongoDB
            collection = mongo_db[SCM_EMPLOYEES_COLLECTION]
            await collection.insert_one(employee_doc)

            # Sync to PostgreSQL
            handler = EmployeeSyncHandler()
            success = await handler.sync(
                entity_id=user_id,
                mongo_db=mongo_db,
                pg_conn=pg_conn
            )
            assert success, "Sync should succeed"

            # Verify record exists in PostgreSQL
            pg_record = await pg_conn.fetchrow(
                "SELECT * FROM trans.employees_ref WHERE employee_id = $1",
                user_id
            )
            assert pg_record is not None, "Record should exist in PostgreSQL"
            assert pg_record["employee_id"] == user_id, "employee_id should match"
            assert pg_record["employee_code"] == employee_code, "employee_code should match"
            assert pg_record["designation"] == designation, "designation should match"
            assert pg_record["status"] == status, "status should match"
            # generate_employee_document substitutes a default when city/state
            # is None, so compare with what was actually stored in MongoDB
            # rather than with the raw generated inputs.
            assert pg_record["city"] == employee_doc["base_city"], "city should match"
            assert pg_record["state"] == employee_doc["base_state"], "state should match"
            assert pg_record["manager_id"] == manager_id, "manager_id should match"
            assert pg_record["department"] == department, "department should match"
            assert pg_record["system_login_enabled"] == system_login_enabled, "system_login_enabled should match"
            assert pg_record["created_at"] is not None, "created_at should be populated"

            # Verify full_name is computed correctly
            expected_full_name = f"{first_name} {last_name}".strip() if last_name else first_name.strip()
            assert pg_record["full_name"] == expected_full_name, "full_name should be computed correctly"
        finally:
            await teardown_test_db(mongo_client, mongo_db, pg_conn)

    asyncio.run(run_test())
# Each example opens database connections, so Hypothesis's default 200 ms
# per-example deadline is disabled to avoid spurious DeadlineExceeded errors.
@hyp_settings(max_examples=100, deadline=None)
@given(
    user_id=employee_id_strategy,
    employee_code=employee_code_strategy,
    first_name=first_name_strategy,
    designation=designation_strategy,
    initial_status=status_strategy,
    updated_status=status_strategy,
    initial_city=city_strategy,
    updated_city=city_strategy
)
def test_property_employee_sync_updates_existing_records(
    user_id,
    employee_code,
    first_name,
    designation,
    initial_status,
    updated_status,
    initial_city,
    updated_city
):
    """
    Feature: postgres-sync, Property 8: Employee sync updates existing records

    For any employee updated in MongoDB, syncing should update the corresponding
    record in trans.employees_ref with the new values.

    Validates: Requirements 3.2
    """
    # Ensure there's a difference to test
    if initial_status == updated_status and initial_city == updated_city:
        return

    async def run_test():
        mongo_client, mongo_db, pg_conn = await setup_test_db()
        try:
            # Create initial employee document. ``datetime`` is the class
            # here, so the UTC zone comes from ``timezone.utc``; tzinfo is
            # stripped to keep the timestamp naive.
            initial_time = datetime.now(timezone.utc).replace(tzinfo=None)
            initial_doc = generate_employee_document(
                user_id=user_id,
                employee_code=employee_code,
                first_name=first_name,
                designation=designation,
                status=initial_status,
                city=initial_city,
                created_at=initial_time,
                updated_at=initial_time
            )

            # Insert into MongoDB and sync
            collection = mongo_db[SCM_EMPLOYEES_COLLECTION]
            await collection.insert_one(initial_doc)
            handler = EmployeeSyncHandler()
            await handler.sync(user_id, mongo_db, pg_conn)

            # Verify initial state
            pg_record = await pg_conn.fetchrow(
                "SELECT * FROM trans.employees_ref WHERE employee_id = $1",
                user_id
            )
            assert pg_record is not None, "Record should exist after initial sync"
            assert pg_record["status"] == initial_status, "Initial status should match"
            # The generator stores a default city when initial_city is None,
            # so compare with the value actually written to MongoDB.
            assert pg_record["city"] == initial_doc["base_city"], "Initial city should match"

            # Update employee in MongoDB (base_city is set verbatim here, so
            # it may legitimately become None)
            updated_time = initial_time + timedelta(seconds=1)
            await collection.update_one(
                {"user_id": user_id},
                {
                    "$set": {
                        "status": updated_status,
                        "base_city": updated_city,
                        "updated_at": updated_time
                    }
                }
            )

            # Sync again
            await handler.sync(user_id, mongo_db, pg_conn)

            # Verify updated state
            pg_record = await pg_conn.fetchrow(
                "SELECT * FROM trans.employees_ref WHERE employee_id = $1",
                user_id
            )
            assert pg_record is not None, "Record should still exist after update sync"
            assert pg_record["status"] == updated_status, "Status should be updated"
            assert pg_record["city"] == updated_city, "City should be updated"
        finally:
            await teardown_test_db(mongo_client, mongo_db, pg_conn)

    asyncio.run(run_test())
# Each example opens database connections, so Hypothesis's default 200 ms
# per-example deadline is disabled to avoid spurious DeadlineExceeded errors.
@hyp_settings(max_examples=100, deadline=None)
@given(
    user_id=employee_id_strategy,
    employee_code=employee_code_strategy,
    first_name=first_name_strategy,
    designation=designation_strategy,
    status=status_strategy,
    system_login_enabled=system_login_enabled_strategy
)
def test_property_boolean_type_conversion(
    user_id,
    employee_code,
    first_name,
    designation,
    status,
    system_login_enabled
):
    """
    Feature: postgres-sync, Property 9: Boolean type conversion

    For any employee with boolean fields (system_login_enabled), syncing should
    correctly convert MongoDB boolean to PostgreSQL BOOLEAN type.

    Validates: Requirements 9.3
    """
    async def run_test():
        mongo_client, mongo_db, pg_conn = await setup_test_db()
        try:
            # Generate employee document
            employee_doc = generate_employee_document(
                user_id=user_id,
                employee_code=employee_code,
                first_name=first_name,
                designation=designation,
                status=status,
                system_login_enabled=system_login_enabled
            )

            # Insert into MongoDB
            collection = mongo_db[SCM_EMPLOYEES_COLLECTION]
            await collection.insert_one(employee_doc)

            # Sync to PostgreSQL
            handler = EmployeeSyncHandler()
            await handler.sync(user_id, mongo_db, pg_conn)

            # Verify boolean conversion
            pg_record = await pg_conn.fetchrow(
                "SELECT system_login_enabled FROM trans.employees_ref WHERE employee_id = $1",
                user_id
            )
            assert pg_record is not None, "Record should exist"
            assert isinstance(pg_record["system_login_enabled"], bool), \
                "system_login_enabled should be boolean type"
            assert pg_record["system_login_enabled"] == system_login_enabled, \
                "Boolean value should match"
        finally:
            await teardown_test_db(mongo_client, mongo_db, pg_conn)

    asyncio.run(run_test())
# Each example opens database connections, so Hypothesis's default 200 ms
# per-example deadline is disabled to avoid spurious DeadlineExceeded errors.
@hyp_settings(max_examples=100, deadline=None)
@given(
    user_id=employee_id_strategy,
    employee_code=employee_code_strategy,
    first_name=first_name_strategy,
    designation=designation_strategy,
    status=status_strategy,
    sync_count=st.integers(min_value=2, max_value=5)
)
def test_property_idempotent_employee_sync(
    user_id,
    employee_code,
    first_name,
    designation,
    status,
    sync_count
):
    """
    Feature: postgres-sync, Property 11: Idempotent sync operations

    For any entity, calling sync multiple times with the same data should
    produce the same result in PostgreSQL (no duplicates, same final state).

    Validates: Requirements 6.1
    """
    async def run_test():
        mongo_client, mongo_db, pg_conn = await setup_test_db()
        try:
            # Generate employee document
            employee_doc = generate_employee_document(
                user_id=user_id,
                employee_code=employee_code,
                first_name=first_name,
                designation=designation,
                status=status
            )

            # Insert into MongoDB
            collection = mongo_db[SCM_EMPLOYEES_COLLECTION]
            await collection.insert_one(employee_doc)

            # Sync multiple times
            handler = EmployeeSyncHandler()
            for _ in range(sync_count):
                await handler.sync(user_id, mongo_db, pg_conn)

            # Verify only one record exists
            count = await pg_conn.fetchval(
                "SELECT COUNT(1) FROM trans.employees_ref WHERE employee_id = $1",
                user_id
            )
            assert count == 1, f"Should have exactly 1 record, found {count}"

            # Verify data is correct
            pg_record = await pg_conn.fetchrow(
                "SELECT * FROM trans.employees_ref WHERE employee_id = $1",
                user_id
            )
            # Fail with a readable message instead of a TypeError on None
            assert pg_record is not None, "Record should exist after repeated syncs"
            assert pg_record["employee_code"] == employee_code, "Data should remain consistent"
            assert pg_record["status"] == status, "Data should remain consistent"
        finally:
            await teardown_test_db(mongo_client, mongo_db, pg_conn)

    asyncio.run(run_test())
# Each example opens database connections, so Hypothesis's default 200 ms
# per-example deadline is disabled to avoid spurious DeadlineExceeded errors.
@hyp_settings(max_examples=100, deadline=None)
@given(
    user_id=employee_id_strategy,
    employee_code=employee_code_strategy,
    first_name=first_name_strategy,
    designation=designation_strategy,
    initial_status=status_strategy,
    updated_status=status_strategy
)
def test_property_employee_upsert_behavior(
    user_id,
    employee_code,
    first_name,
    designation,
    initial_status,
    updated_status
):
    """
    Feature: postgres-sync, Property 12: Upsert behavior

    For any entity that already exists in PostgreSQL, syncing should update
    the existing record rather than creating a duplicate.

    Validates: Requirements 6.2
    """
    # Nothing to verify when the "update" would not change the status
    if initial_status == updated_status:
        return

    async def run_test():
        mongo_client, mongo_db, pg_conn = await setup_test_db()
        try:
            # Create and sync initial employee
            initial_doc = generate_employee_document(
                user_id=user_id,
                employee_code=employee_code,
                first_name=first_name,
                designation=designation,
                status=initial_status
            )
            collection = mongo_db[SCM_EMPLOYEES_COLLECTION]
            await collection.insert_one(initial_doc)
            handler = EmployeeSyncHandler()
            await handler.sync(user_id, mongo_db, pg_conn)

            # Update and sync again. ``datetime`` is the class here, so the
            # UTC zone comes from ``timezone.utc``; tzinfo is stripped to keep
            # the stored timestamp naive.
            update_time = datetime.now(timezone.utc).replace(tzinfo=None)
            await collection.update_one(
                {"user_id": user_id},
                {"$set": {"status": updated_status, "updated_at": update_time}}
            )
            await handler.sync(user_id, mongo_db, pg_conn)

            # Verify only one record exists with updated data
            count = await pg_conn.fetchval(
                "SELECT COUNT(1) FROM trans.employees_ref WHERE employee_id = $1",
                user_id
            )
            assert count == 1, "Should have exactly 1 record (upsert, not insert)"

            pg_record = await pg_conn.fetchrow(
                "SELECT status FROM trans.employees_ref WHERE employee_id = $1",
                user_id
            )
            # Fail with a readable message instead of a TypeError on None
            assert pg_record is not None, "Record should exist after upsert"
            assert pg_record["status"] == updated_status, "Record should be updated, not duplicated"
        finally:
            await teardown_test_db(mongo_client, mongo_db, pg_conn)

    asyncio.run(run_test())
# Each example opens database connections, so Hypothesis's default 200 ms
# per-example deadline is disabled to avoid spurious DeadlineExceeded errors.
@hyp_settings(max_examples=100, deadline=None)
@given(
    user_id=employee_id_strategy,
    employee_code=employee_code_strategy,
    first_name=first_name_strategy,
    designation=designation_strategy,
    status=status_strategy
)
def test_property_employee_timestamp_conflict_resolution(
    user_id,
    employee_code,
    first_name,
    designation,
    status
):
    """
    Feature: postgres-sync, Property 14: Timestamp-based conflict resolution

    For any entity where MongoDB updated_at is older than PostgreSQL updated_at,
    the sync should skip the update.

    Validates: Requirements 6.5

    NOTE(review): the trans.employees_ref table created by setup_test_db has
    no updated_at column -- confirm where the handler gets the PostgreSQL-side
    timestamp it compares against.
    """
    async def run_test():
        mongo_client, mongo_db, pg_conn = await setup_test_db()
        try:
            # Create employee with newer timestamp. ``datetime`` is the class
            # here, so the UTC zone comes from ``timezone.utc``; tzinfo is
            # stripped to keep the timestamp naive.
            newer_time = datetime.now(timezone.utc).replace(tzinfo=None)
            newer_doc = generate_employee_document(
                user_id=user_id,
                employee_code=employee_code,
                first_name=first_name,
                designation=designation,
                status=status,
                created_at=newer_time,
                updated_at=newer_time
            )

            # Insert and sync
            collection = mongo_db[SCM_EMPLOYEES_COLLECTION]
            await collection.insert_one(newer_doc)
            handler = EmployeeSyncHandler()
            await handler.sync(user_id, mongo_db, pg_conn)

            # Get initial PostgreSQL state
            pg_record = await pg_conn.fetchrow(
                "SELECT * FROM trans.employees_ref WHERE employee_id = $1",
                user_id
            )
            # Fail with a readable message instead of a TypeError on None
            assert pg_record is not None, "Record should exist after initial sync"
            initial_status = pg_record["status"]

            # Try to sync with older timestamp and different status
            older_time = newer_time - timedelta(hours=1)
            # Pick any status guaranteed to differ from the generated one
            different_status = "inactive" if status != "inactive" else "active"
            await collection.update_one(
                {"user_id": user_id},
                {"$set": {"status": different_status, "updated_at": older_time}}
            )
            await handler.sync(user_id, mongo_db, pg_conn)

            # Verify PostgreSQL status didn't change (update was skipped)
            pg_record = await pg_conn.fetchrow(
                "SELECT status FROM trans.employees_ref WHERE employee_id = $1",
                user_id
            )
            assert pg_record is not None, "Record should still exist after stale sync"
            final_status = pg_record["status"]
            assert final_status == initial_status, \
                "PostgreSQL status should not change when MongoDB timestamp is older"
        finally:
            await teardown_test_db(mongo_client, mongo_db, pg_conn)

    asyncio.run(run_test())