# cuatrolabs-scm-ms — tests/test_properties_catalogue_sync.py
# (repo-browser residue converted to comments; last commit 0766a99:
#  "Refactor COUNT queries to use COUNT(1) for improved performance and
#  consistency across various scripts and services.")
"""
Property-based tests for catalogue sync operations.
Feature: postgres-sync
"""
# Standard library
import asyncio
from datetime import datetime, timedelta, timezone
from decimal import Decimal

# Third-party
import asyncpg
import pytest
from hypothesis import assume, given, strategies as st, settings as hyp_settings
from motor.motor_asyncio import AsyncIOMotorClient

# Local application
from app.constants.collections import SCM_CATALOGUE_COLLECTION
from app.core.config import settings
from app.sync.catalogues.handler import CatalogueSyncHandler
# Test database names
TEST_DB_NAME = "scm_test_db_catalogue_sync"
# Strategies for generating test data
# Identifier-like text: 10-50 chars of letters/digits plus "_" and "-".
catalogue_id_strategy = st.text(
    min_size=10, max_size=50,
    alphabet=st.characters(whitelist_categories=("Lu", "Ll", "Nd"), whitelist_characters="_-")
)
# Human-readable names: letters/digits plus space and hyphen.
catalogue_name_strategy = st.text(
    min_size=5, max_size=100,
    alphabet=st.characters(whitelist_categories=("Lu", "Ll", "Nd"), whitelist_characters=" -")
)
catalogue_type_strategy = st.sampled_from(["Product", "Service", "Package"])
status_strategy = st.sampled_from(["Active", "Inactive", "Draft"])
# Optional SKU (uppercase letters/digits/hyphen) or None — exercises sparse docs.
sku_strategy = st.one_of(
    st.none(),
    st.text(min_size=5, max_size=30, alphabet=st.characters(whitelist_categories=("Lu", "Nd"), whitelist_characters="-"))
)
# Optional numeric barcode string (8-20 digits) or None.
barcode_strategy = st.one_of(
    st.none(),
    st.text(min_size=8, max_size=20, alphabet="0123456789")
)
# Optional 4-8 digit code or None (presumably an HSN tax code — matches the
# hsn_code column; verify against the sync handler if the format matters).
hsn_code_strategy = st.one_of(
    st.none(),
    st.text(min_size=4, max_size=8, alphabet="0123456789")
)
# Numeric strategies with proper precision
# GST rate in [0, 28], rounded to 2 decimals to fit NUMERIC(5,2), or None.
gst_rate_strategy = st.one_of(
    st.none(),
    st.floats(min_value=0.0, max_value=28.0, allow_nan=False, allow_infinity=False).map(
        lambda x: round(x, 2)
    )
)
# Positive price rounded to 2 decimals to fit NUMERIC(12,2), or None.
price_strategy = st.one_of(
    st.none(),
    st.floats(min_value=0.01, max_value=999999.99, allow_nan=False, allow_infinity=False).map(
        lambda x: round(x, 2)
    )
)
boolean_strategy = st.booleans()
def generate_catalogue_document(
catalogue_id: str,
catalogue_name: str,
catalogue_type: str,
status: str,
sku: str = None,
barcode_number: str = None,
hsn_code: str = None,
gst_rate: float = None,
mrp: float = None,
base_price: float = None,
track_inventory: bool = False,
batch_managed: bool = False,
created_at: datetime = None,
updated_at: datetime = None
):
"""Generate a valid catalogue document for MongoDB"""
if created_at is None:
created_at = datetime.now(datetime.UTC).replace(tzinfo=None)
if updated_at is None:
updated_at = created_at
doc = {
"catalogue_id": catalogue_id,
"catalogue_name": catalogue_name,
"catalogue_type": catalogue_type,
"category": "Test Category",
"brand": "Test Brand",
"description": f"Test catalogue {catalogue_name}",
"meta": {
"status": status,
"created_at": created_at,
"updated_at": updated_at,
"created_by": "test_user"
}
}
# Add identifiers if provided
if sku or barcode_number:
doc["identifiers"] = {}
if sku:
doc["identifiers"]["sku"] = sku
if barcode_number:
doc["identifiers"]["barcode_number"] = barcode_number
# Add tax if provided
if hsn_code or gst_rate is not None:
doc["tax"] = {}
if hsn_code:
doc["tax"]["hsn_code"] = hsn_code
if gst_rate is not None:
doc["tax"]["gst_rate"] = gst_rate
# Add pricing if provided
if mrp is not None or base_price is not None:
doc["pricing"] = {}
if mrp is not None:
doc["pricing"]["mrp"] = mrp
if base_price is not None:
doc["pricing"]["retail_price"] = base_price
doc["pricing"]["currency"] = "INR"
# Add inventory
doc["inventory"] = {
"track_inventory": track_inventory,
"unit": "PCS"
}
# Add procurement for batch management
if batch_managed:
doc["procurement"] = {
"batch_managed": True
}
return doc
async def setup_test_db():
    """Provision the test fixtures.

    Returns a (mongo_client, mongo_db, pg_conn) triple: a Motor client with a
    throwaway test database selected, and an asyncpg connection on which the
    trans schema and trans.catalogue_ref table have been created.
    """
    # Mongo side: throwaway database, dropped again by teardown_test_db.
    client = AsyncIOMotorClient(settings.MONGODB_URI)
    database = client[TEST_DB_NAME]
    # Postgres side: direct asyncpg connection using the configured settings.
    connection = await asyncpg.connect(
        host=settings.POSTGRES_HOST,
        port=settings.POSTGRES_PORT,
        database=settings.POSTGRES_DB,
        user=settings.POSTGRES_USER,
        password=settings.POSTGRES_PASSWORD,
    )
    # Ensure the target schema and reference table exist before any sync runs.
    ddl_statements = (
        "CREATE SCHEMA IF NOT EXISTS trans",
        """
        CREATE TABLE IF NOT EXISTS trans.catalogue_ref (
            catalogue_id TEXT PRIMARY KEY,
            catalogue_type TEXT NOT NULL,
            catalogue_name TEXT NOT NULL,
            sku TEXT,
            barcode_number TEXT,
            hsn_code TEXT,
            gst_rate NUMERIC(5,2),
            mrp NUMERIC(12,2),
            base_price NUMERIC(12,2),
            track_inventory BOOLEAN,
            batch_managed BOOLEAN,
            status TEXT NOT NULL,
            created_at TIMESTAMP NOT NULL
        )
        """,
    )
    for statement in ddl_statements:
        await connection.execute(statement)
    return client, database, connection
async def teardown_test_db(mongo_client, mongo_db, pg_conn):
    """Release every resource provisioned by setup_test_db.

    The PostgreSQL and MongoDB cleanups touch independent systems, so their
    relative order does not matter.
    """
    # PostgreSQL: remove the reference table, then close the connection.
    await pg_conn.execute("DROP TABLE IF EXISTS trans.catalogue_ref")
    await pg_conn.close()
    # MongoDB: drop the throwaway database and shut the client down.
    await mongo_client.drop_database(TEST_DB_NAME)
    mongo_client.close()
@hyp_settings(max_examples=100)
@given(
    catalogue_id=catalogue_id_strategy,
    catalogue_name=catalogue_name_strategy,
    catalogue_type=catalogue_type_strategy,
    status=status_strategy,
    sku=sku_strategy,
    barcode_number=barcode_strategy,
    hsn_code=hsn_code_strategy,
    gst_rate=gst_rate_strategy,
    mrp=price_strategy,
    base_price=price_strategy,
    track_inventory=boolean_strategy,
    batch_managed=boolean_strategy
)
def test_property_catalogue_sync_creates_matching_records(
    catalogue_id,
    catalogue_name,
    catalogue_type,
    status,
    sku,
    barcode_number,
    hsn_code,
    gst_rate,
    mrp,
    base_price,
    track_inventory,
    batch_managed
):
    """
    Feature: postgres-sync, Property 4: Catalogue sync creates matching records
    For any catalogue created in MongoDB, syncing should result in a record in
    trans.catalogue_ref with matching catalogue_id and all required fields populated.
    Validates: Requirements 2.1, 2.5
    """
    # NOTE(review): each Hypothesis example provisions and tears down a fresh
    # MongoDB database and PostgreSQL table, which makes this test slow at
    # max_examples=100 — acceptable for an integration property test.
    async def run_test():
        mongo_client, mongo_db, pg_conn = await setup_test_db()
        try:
            # Generate catalogue document from the drawn strategy values.
            catalogue_doc = generate_catalogue_document(
                catalogue_id=catalogue_id,
                catalogue_name=catalogue_name,
                catalogue_type=catalogue_type,
                status=status,
                sku=sku,
                barcode_number=barcode_number,
                hsn_code=hsn_code,
                gst_rate=gst_rate,
                mrp=mrp,
                base_price=base_price,
                track_inventory=track_inventory,
                batch_managed=batch_managed
            )
            # Insert into MongoDB
            collection = mongo_db[SCM_CATALOGUE_COLLECTION]
            await collection.insert_one(catalogue_doc)
            # Sync to PostgreSQL via the handler under test.
            handler = CatalogueSyncHandler()
            success = await handler.sync(
                entity_id=catalogue_id,
                mongo_db=mongo_db,
                pg_conn=pg_conn
            )
            assert success, "Sync should succeed"
            # Verify record exists in PostgreSQL
            pg_record = await pg_conn.fetchrow(
                "SELECT * FROM trans.catalogue_ref WHERE catalogue_id = $1",
                catalogue_id
            )
            assert pg_record is not None, "Record should exist in PostgreSQL"
            # Scalar fields must be copied verbatim; optional fields that were
            # absent in MongoDB are expected to arrive as NULL (None).
            assert pg_record["catalogue_id"] == catalogue_id, "catalogue_id should match"
            assert pg_record["catalogue_name"] == catalogue_name, "catalogue_name should match"
            assert pg_record["catalogue_type"] == catalogue_type, "catalogue_type should match"
            assert pg_record["status"] == status, "status should match"
            assert pg_record["sku"] == sku, "sku should match"
            assert pg_record["barcode_number"] == barcode_number, "barcode_number should match"
            assert pg_record["hsn_code"] == hsn_code, "hsn_code should match"
            assert pg_record["track_inventory"] == track_inventory, "track_inventory should match"
            assert pg_record["batch_managed"] == batch_managed, "batch_managed should match"
            assert pg_record["created_at"] is not None, "created_at should be populated"
            # Check numeric precision: asyncpg returns NUMERIC columns as
            # Decimal, so compare against the float quantized to 2 places.
            if gst_rate is not None:
                assert pg_record["gst_rate"] == Decimal(str(gst_rate)).quantize(Decimal('0.01')), \
                    "gst_rate should match with correct precision"
            else:
                assert pg_record["gst_rate"] is None, "gst_rate should be NULL"
            if mrp is not None:
                assert pg_record["mrp"] == Decimal(str(mrp)).quantize(Decimal('0.01')), \
                    "mrp should match with correct precision"
            else:
                assert pg_record["mrp"] is None, "mrp should be NULL"
            if base_price is not None:
                assert pg_record["base_price"] == Decimal(str(base_price)).quantize(Decimal('0.01')), \
                    "base_price should match with correct precision"
            else:
                assert pg_record["base_price"] is None, "base_price should be NULL"
        finally:
            # Always drop the test database/table, even on assertion failure.
            await teardown_test_db(mongo_client, mongo_db, pg_conn)
    asyncio.run(run_test())
@hyp_settings(max_examples=100)
@given(
    catalogue_id=catalogue_id_strategy,
    catalogue_name=catalogue_name_strategy,
    catalogue_type=catalogue_type_strategy,
    initial_status=status_strategy,
    updated_status=status_strategy,
    initial_price=price_strategy,
    updated_price=price_strategy
)
def test_property_catalogue_sync_updates_existing_records(
    catalogue_id,
    catalogue_name,
    catalogue_type,
    initial_status,
    updated_status,
    initial_price,
    updated_price
):
    """
    Feature: postgres-sync, Property 5: Catalogue sync updates existing records
    For any catalogue updated in MongoDB, syncing should update the corresponding
    record in trans.catalogue_ref with the new values.
    Validates: Requirements 2.2
    """
    # FIX: was a bare `return`, which Hypothesis counts as a *passing* example;
    # assume() tells it to discard the draw and generate a different one.
    assume(initial_status != updated_status or initial_price != updated_price)

    async def run_test():
        mongo_client, mongo_db, pg_conn = await setup_test_db()
        try:
            # Create the initial catalogue document.
            # FIX: was datetime.now(datetime.UTC) — `datetime` is the class
            # (from-import), which has no UTC attribute; this raised
            # AttributeError on every run. Use timezone.utc instead.
            initial_time = datetime.now(timezone.utc).replace(tzinfo=None)
            initial_doc = generate_catalogue_document(
                catalogue_id=catalogue_id,
                catalogue_name=catalogue_name,
                catalogue_type=catalogue_type,
                status=initial_status,
                mrp=initial_price,
                created_at=initial_time,
                updated_at=initial_time
            )
            # Insert into MongoDB and sync once to create the Postgres row.
            collection = mongo_db[SCM_CATALOGUE_COLLECTION]
            await collection.insert_one(initial_doc)
            handler = CatalogueSyncHandler()
            await handler.sync(catalogue_id, mongo_db, pg_conn)
            # Verify initial state made it across.
            pg_record = await pg_conn.fetchrow(
                "SELECT * FROM trans.catalogue_ref WHERE catalogue_id = $1",
                catalogue_id
            )
            assert pg_record["status"] == initial_status, "Initial status should match"
            if initial_price is not None:
                assert pg_record["mrp"] == Decimal(str(initial_price)).quantize(Decimal('0.01')), \
                    "Initial mrp should match"
            # Update the catalogue in MongoDB with a strictly newer updated_at
            # so timestamp-based conflict resolution accepts the change.
            updated_time = initial_time + timedelta(seconds=1)
            update_fields = {
                "meta.status": updated_status,
                "meta.updated_at": updated_time
            }
            if updated_price is not None:
                update_fields["pricing.mrp"] = updated_price
            await collection.update_one(
                {"catalogue_id": catalogue_id},
                {"$set": update_fields}
            )
            # Sync again — should update, not insert.
            await handler.sync(catalogue_id, mongo_db, pg_conn)
            # Verify updated state.
            pg_record = await pg_conn.fetchrow(
                "SELECT * FROM trans.catalogue_ref WHERE catalogue_id = $1",
                catalogue_id
            )
            assert pg_record["status"] == updated_status, "Status should be updated"
            if updated_price is not None:
                assert pg_record["mrp"] == Decimal(str(updated_price)).quantize(Decimal('0.01')), \
                    "MRP should be updated"
        finally:
            await teardown_test_db(mongo_client, mongo_db, pg_conn)
    asyncio.run(run_test())
@hyp_settings(max_examples=100)
@given(
    catalogue_id=catalogue_id_strategy,
    catalogue_name=catalogue_name_strategy,
    catalogue_type=catalogue_type_strategy,
    status=status_strategy,
    gst_rate=gst_rate_strategy,
    mrp=price_strategy,
    base_price=price_strategy
)
def test_property_numeric_precision_preservation(
    catalogue_id,
    catalogue_name,
    catalogue_type,
    status,
    gst_rate,
    mrp,
    base_price
):
    """
    Feature: postgres-sync, Property 6: Numeric precision preservation
    For any catalogue with numeric fields (gst_rate, mrp, base_price), syncing
    should preserve the precision defined in PostgreSQL schema (5,2 for gst_rate,
    12,2 for prices).
    Validates: Requirements 9.2
    """
    # FIX: was a bare `return` when all numeric fields were None, which
    # Hypothesis counts as a passing example; assume() discards the draw so
    # every counted example actually exercises at least one numeric field.
    assume(gst_rate is not None or mrp is not None or base_price is not None)

    async def run_test():
        mongo_client, mongo_db, pg_conn = await setup_test_db()
        try:
            # Generate catalogue with the drawn numeric fields.
            catalogue_doc = generate_catalogue_document(
                catalogue_id=catalogue_id,
                catalogue_name=catalogue_name,
                catalogue_type=catalogue_type,
                status=status,
                gst_rate=gst_rate,
                mrp=mrp,
                base_price=base_price
            )
            # Insert and sync.
            collection = mongo_db[SCM_CATALOGUE_COLLECTION]
            await collection.insert_one(catalogue_doc)
            handler = CatalogueSyncHandler()
            await handler.sync(catalogue_id, mongo_db, pg_conn)
            # Verify numeric precision (asyncpg returns NUMERIC as Decimal).
            pg_record = await pg_conn.fetchrow(
                "SELECT gst_rate, mrp, base_price FROM trans.catalogue_ref WHERE catalogue_id = $1",
                catalogue_id
            )
            if gst_rate is not None:
                expected_gst = Decimal(str(gst_rate)).quantize(Decimal('0.01'))
                assert pg_record["gst_rate"] == expected_gst, \
                    f"gst_rate precision should be (5,2): expected {expected_gst}, got {pg_record['gst_rate']}"
                # NUMERIC(5,2) holds |x| < 1000 (three integer digits).
                assert abs(pg_record["gst_rate"]) < 1000, "gst_rate should fit in NUMERIC(5,2)"
            if mrp is not None:
                expected_mrp = Decimal(str(mrp)).quantize(Decimal('0.01'))
                assert pg_record["mrp"] == expected_mrp, \
                    f"mrp precision should be (12,2): expected {expected_mrp}, got {pg_record['mrp']}"
                # NUMERIC(12,2) holds |x| < 10^10 (ten integer digits).
                assert abs(pg_record["mrp"]) < 10000000000, "mrp should fit in NUMERIC(12,2)"
            if base_price is not None:
                expected_base = Decimal(str(base_price)).quantize(Decimal('0.01'))
                assert pg_record["base_price"] == expected_base, \
                    f"base_price precision should be (12,2): expected {expected_base}, got {pg_record['base_price']}"
                assert abs(pg_record["base_price"]) < 10000000000, "base_price should fit in NUMERIC(12,2)"
        finally:
            await teardown_test_db(mongo_client, mongo_db, pg_conn)
    asyncio.run(run_test())
@hyp_settings(max_examples=100)
@given(
    catalogue_id=catalogue_id_strategy,
    catalogue_name=catalogue_name_strategy,
    catalogue_type=catalogue_type_strategy,
    status=status_strategy,
    sync_count=st.integers(min_value=2, max_value=5)
)
def test_property_idempotent_catalogue_sync(
    catalogue_id,
    catalogue_name,
    catalogue_type,
    status,
    sync_count
):
    """
    Feature: postgres-sync, Property 11: Idempotent sync operations
    For any entity, calling sync multiple times with the same data should
    produce the same result in PostgreSQL (no duplicates, same final state).
    Validates: Requirements 6.1
    """
    async def run_test():
        mongo_client, mongo_db, pg_conn = await setup_test_db()
        try:
            # Seed MongoDB with a single catalogue document.
            document = generate_catalogue_document(
                catalogue_id=catalogue_id,
                catalogue_name=catalogue_name,
                catalogue_type=catalogue_type,
                status=status
            )
            await mongo_db[SCM_CATALOGUE_COLLECTION].insert_one(document)
            # Run the identical sync repeatedly; an idempotent handler must
            # neither duplicate the row nor drift its values.
            handler = CatalogueSyncHandler()
            for _ in range(sync_count):
                await handler.sync(catalogue_id, mongo_db, pg_conn)
            # Exactly one row may exist for this catalogue_id.
            count = await pg_conn.fetchval(
                "SELECT COUNT(1) FROM trans.catalogue_ref WHERE catalogue_id = $1",
                catalogue_id
            )
            assert count == 1, f"Should have exactly 1 record, found {count}"
            # And that row must still carry the original data.
            row = await pg_conn.fetchrow(
                "SELECT * FROM trans.catalogue_ref WHERE catalogue_id = $1",
                catalogue_id
            )
            assert row["catalogue_name"] == catalogue_name, "Data should remain consistent"
            assert row["status"] == status, "Data should remain consistent"
        finally:
            await teardown_test_db(mongo_client, mongo_db, pg_conn)
    asyncio.run(run_test())
@hyp_settings(max_examples=100)
@given(
    catalogue_id=catalogue_id_strategy,
    catalogue_name=catalogue_name_strategy,
    catalogue_type=catalogue_type_strategy,
    initial_status=status_strategy,
    updated_status=status_strategy
)
def test_property_catalogue_upsert_behavior(
    catalogue_id,
    catalogue_name,
    catalogue_type,
    initial_status,
    updated_status
):
    """
    Feature: postgres-sync, Property 12: Upsert behavior
    For any entity that already exists in PostgreSQL, syncing should update
    the existing record rather than creating a duplicate.
    Validates: Requirements 6.2
    """
    # FIX: was a bare `return`, which Hypothesis counts as a passing example;
    # assume() discards the draw and generates a different one instead.
    assume(initial_status != updated_status)

    async def run_test():
        mongo_client, mongo_db, pg_conn = await setup_test_db()
        try:
            # Create and sync the initial catalogue.
            initial_doc = generate_catalogue_document(
                catalogue_id=catalogue_id,
                catalogue_name=catalogue_name,
                catalogue_type=catalogue_type,
                status=initial_status
            )
            collection = mongo_db[SCM_CATALOGUE_COLLECTION]
            await collection.insert_one(initial_doc)
            handler = CatalogueSyncHandler()
            await handler.sync(catalogue_id, mongo_db, pg_conn)
            # Update and sync again.
            # FIX: was datetime.now(datetime.UTC) — `datetime` is the class
            # (from-import) and has no UTC attribute; use timezone.utc.
            await collection.update_one(
                {"catalogue_id": catalogue_id},
                {
                    "$set": {
                        "meta.status": updated_status,
                        "meta.updated_at": datetime.now(timezone.utc).replace(tzinfo=None)
                    }
                }
            )
            await handler.sync(catalogue_id, mongo_db, pg_conn)
            # Verify only one record exists and it carries the updated data.
            count = await pg_conn.fetchval(
                "SELECT COUNT(1) FROM trans.catalogue_ref WHERE catalogue_id = $1",
                catalogue_id
            )
            assert count == 1, "Should have exactly 1 record (upsert, not insert)"
            pg_record = await pg_conn.fetchrow(
                "SELECT status FROM trans.catalogue_ref WHERE catalogue_id = $1",
                catalogue_id
            )
            assert pg_record["status"] == updated_status, "Record should be updated, not duplicated"
        finally:
            await teardown_test_db(mongo_client, mongo_db, pg_conn)
    asyncio.run(run_test())
@hyp_settings(max_examples=100)
@given(
    catalogue_id=catalogue_id_strategy,
    catalogue_name=catalogue_name_strategy,
    catalogue_type=catalogue_type_strategy,
    status=status_strategy
)
def test_property_catalogue_timestamp_conflict_resolution(
    catalogue_id,
    catalogue_name,
    catalogue_type,
    status
):
    """
    Feature: postgres-sync, Property 14: Timestamp-based conflict resolution
    For any entity where MongoDB updated_at is older than PostgreSQL updated_at,
    the sync should skip the update.
    Validates: Requirements 6.5
    """
    async def run_test():
        mongo_client, mongo_db, pg_conn = await setup_test_db()
        try:
            # Create catalogue with the newer timestamp first.
            # FIX: was datetime.now(datetime.UTC) — `datetime` is the class
            # (from-import), which has no UTC attribute; this raised
            # AttributeError on every run. Use timezone.utc instead.
            newer_time = datetime.now(timezone.utc).replace(tzinfo=None)
            newer_doc = generate_catalogue_document(
                catalogue_id=catalogue_id,
                catalogue_name=catalogue_name,
                catalogue_type=catalogue_type,
                status=status,
                created_at=newer_time,
                updated_at=newer_time
            )
            # Insert and sync.
            collection = mongo_db[SCM_CATALOGUE_COLLECTION]
            await collection.insert_one(newer_doc)
            handler = CatalogueSyncHandler()
            await handler.sync(catalogue_id, mongo_db, pg_conn)
            # Capture the synced created_at — the table has no updated_at
            # column, so created_at is the only observable change marker here.
            pg_record = await pg_conn.fetchrow(
                "SELECT created_at FROM trans.catalogue_ref WHERE catalogue_id = $1",
                catalogue_id
            )
            initial_pg_created_at = pg_record["created_at"]
            # Rewind MongoDB's updated_at and try to sync the stale document.
            older_time = newer_time - timedelta(hours=1)
            await collection.update_one(
                {"catalogue_id": catalogue_id},
                {"$set": {"meta.updated_at": older_time}}
            )
            await handler.sync(catalogue_id, mongo_db, pg_conn)
            # The PostgreSQL record must be unchanged (stale update skipped).
            pg_record = await pg_conn.fetchrow(
                "SELECT created_at FROM trans.catalogue_ref WHERE catalogue_id = $1",
                catalogue_id
            )
            final_pg_created_at = pg_record["created_at"]
            assert final_pg_created_at == initial_pg_created_at, \
                "PostgreSQL record should not change when MongoDB timestamp is older"
        finally:
            await teardown_test_db(mongo_client, mongo_db, pg_conn)
    asyncio.run(run_test())