# NOTE(review): the four lines that were here were CI/commit metadata
# accidentally pasted into the source ("Spaces:", two "Runtime error"
# status lines, and a commit subject). Preserved as a comment:
# Commit subject: "Refactor COUNT queries to use COUNT(1) for improved
# performance and consistency across various scripts and services."
"""
Property-based tests for catalogue sync operations.
Feature: postgres-sync
"""
import asyncio
from datetime import datetime, timedelta, timezone
from decimal import Decimal

import pytest
from hypothesis import given, strategies as st, settings as hyp_settings
from motor.motor_asyncio import AsyncIOMotorClient
import asyncpg

from app.sync.catalogues.handler import CatalogueSyncHandler
from app.constants.collections import SCM_CATALOGUE_COLLECTION
from app.core.config import settings
# Test database names
# Dedicated throwaway Mongo DB; dropped in teardown_test_db() after each test.
TEST_DB_NAME = "scm_test_db_catalogue_sync"
# Strategies for generating test data
# IDs: 10-50 chars drawn from upper/lowercase letters, digits, "_" and "-".
catalogue_id_strategy = st.text(
    min_size=10, max_size=50,
    alphabet=st.characters(whitelist_categories=("Lu", "Ll", "Nd"), whitelist_characters="_-")
)
# Names: 5-100 chars of letters/digits plus space and hyphen.
catalogue_name_strategy = st.text(
    min_size=5, max_size=100,
    alphabet=st.characters(whitelist_categories=("Lu", "Ll", "Nd"), whitelist_characters=" -")
)
catalogue_type_strategy = st.sampled_from(["Product", "Service", "Package"])
status_strategy = st.sampled_from(["Active", "Inactive", "Draft"])
# Optional identifier fields: None or a constrained text value.
sku_strategy = st.one_of(
    st.none(),
    st.text(min_size=5, max_size=30, alphabet=st.characters(whitelist_categories=("Lu", "Nd"), whitelist_characters="-"))
)
barcode_strategy = st.one_of(
    st.none(),
    st.text(min_size=8, max_size=20, alphabet="0123456789")
)
hsn_code_strategy = st.one_of(
    st.none(),
    st.text(min_size=4, max_size=8, alphabet="0123456789")
)
# Numeric strategies with proper precision
# Rounded to 2 decimals to fit the NUMERIC(5,2)/NUMERIC(12,2) Postgres columns.
gst_rate_strategy = st.one_of(
    st.none(),
    st.floats(min_value=0.0, max_value=28.0, allow_nan=False, allow_infinity=False).map(
        lambda x: round(x, 2)
    )
)
price_strategy = st.one_of(
    st.none(),
    st.floats(min_value=0.01, max_value=999999.99, allow_nan=False, allow_infinity=False).map(
        lambda x: round(x, 2)
    )
)
boolean_strategy = st.booleans()
def generate_catalogue_document(
    catalogue_id: str,
    catalogue_name: str,
    catalogue_type: str,
    status: str,
    sku: str = None,
    barcode_number: str = None,
    hsn_code: str = None,
    gst_rate: float = None,
    mrp: float = None,
    base_price: float = None,
    track_inventory: bool = False,
    batch_managed: bool = False,
    created_at: datetime = None,
    updated_at: datetime = None
):
    """Generate a valid catalogue document for MongoDB.

    Optional sub-documents (identifiers, tax, pricing, procurement) are only
    added when at least one of their fields is supplied, so generated
    documents exercise both present and absent nested sections.

    Args:
        catalogue_id: Unique catalogue identifier (PK in trans.catalogue_ref).
        catalogue_name: Human-readable name.
        catalogue_type: e.g. "Product" / "Service" / "Package".
        status: Stored under meta.status.
        sku, barcode_number: Optional identifier fields.
        hsn_code, gst_rate: Optional tax fields.
        mrp, base_price: Optional pricing; base_price maps to
            pricing.retail_price, and currency "INR" is set when either exists.
        track_inventory: Stored in the always-present inventory sub-document.
        batch_managed: When True, adds a procurement sub-document.
        created_at, updated_at: Naive-UTC timestamps; default to "now", with
            updated_at defaulting to created_at.

    Returns:
        dict ready for insertion into the catalogue collection.
    """
    if created_at is None:
        # BUG FIX: the original called datetime.now(datetime.UTC), but
        # `datetime` here is the class (from `from datetime import datetime`),
        # which has no UTC attribute -> AttributeError at runtime. Use
        # timezone.utc and strip tzinfo to keep naive-UTC semantics.
        created_at = datetime.now(timezone.utc).replace(tzinfo=None)
    if updated_at is None:
        updated_at = created_at
    doc = {
        "catalogue_id": catalogue_id,
        "catalogue_name": catalogue_name,
        "catalogue_type": catalogue_type,
        "category": "Test Category",
        "brand": "Test Brand",
        "description": f"Test catalogue {catalogue_name}",
        "meta": {
            "status": status,
            "created_at": created_at,
            "updated_at": updated_at,
            "created_by": "test_user"
        }
    }
    # Add identifiers if provided
    if sku or barcode_number:
        doc["identifiers"] = {}
        if sku:
            doc["identifiers"]["sku"] = sku
        if barcode_number:
            doc["identifiers"]["barcode_number"] = barcode_number
    # Add tax if provided (gst_rate may legitimately be 0.0, so test `is not None`)
    if hsn_code or gst_rate is not None:
        doc["tax"] = {}
        if hsn_code:
            doc["tax"]["hsn_code"] = hsn_code
        if gst_rate is not None:
            doc["tax"]["gst_rate"] = gst_rate
    # Add pricing if provided
    if mrp is not None or base_price is not None:
        doc["pricing"] = {}
        if mrp is not None:
            doc["pricing"]["mrp"] = mrp
        if base_price is not None:
            doc["pricing"]["retail_price"] = base_price
        doc["pricing"]["currency"] = "INR"
    # Add inventory (always present)
    doc["inventory"] = {
        "track_inventory": track_inventory,
        "unit": "PCS"
    }
    # Add procurement for batch management
    if batch_managed:
        doc["procurement"] = {
            "batch_managed": True
        }
    return doc
async def setup_test_db():
    """Create the Mongo test database handle and a PostgreSQL connection.

    Also ensures the ``trans`` schema and ``trans.catalogue_ref`` table exist
    so sync handlers have a target to write into.

    Returns:
        (mongo_client, mongo_db, pg_conn) — caller owns these resources and
        must release them via teardown_test_db().
    """
    # MongoDB
    mongo_client = AsyncIOMotorClient(settings.MONGODB_URI)
    mongo_db = mongo_client[TEST_DB_NAME]
    # PostgreSQL
    pg_conn = await asyncpg.connect(
        host=settings.POSTGRES_HOST,
        port=settings.POSTGRES_PORT,
        database=settings.POSTGRES_DB,
        user=settings.POSTGRES_USER,
        password=settings.POSTGRES_PASSWORD
    )
    # Create schema and table (idempotent DDL; precision matches the
    # assertions in the property tests: gst_rate 5,2 / prices 12,2)
    await pg_conn.execute("CREATE SCHEMA IF NOT EXISTS trans")
    await pg_conn.execute("""
        CREATE TABLE IF NOT EXISTS trans.catalogue_ref (
            catalogue_id TEXT PRIMARY KEY,
            catalogue_type TEXT NOT NULL,
            catalogue_name TEXT NOT NULL,
            sku TEXT,
            barcode_number TEXT,
            hsn_code TEXT,
            gst_rate NUMERIC(5,2),
            mrp NUMERIC(12,2),
            base_price NUMERIC(12,2),
            track_inventory BOOLEAN,
            batch_managed BOOLEAN,
            status TEXT NOT NULL,
            created_at TIMESTAMP NOT NULL
        )
    """)
    return mongo_client, mongo_db, pg_conn
async def teardown_test_db(mongo_client, mongo_db, pg_conn):
    """Drop the test Mongo database and Postgres table, then close both connections.

    Counterpart of setup_test_db(); call in a finally block so cleanup runs
    even when assertions fail.
    """
    # Drop MongoDB test database
    await mongo_client.drop_database(TEST_DB_NAME)
    mongo_client.close()
    # Drop PostgreSQL test table
    await pg_conn.execute("DROP TABLE IF EXISTS trans.catalogue_ref")
    await pg_conn.close()
# FIX: the @given/@hyp_settings decorators were missing, so pytest treated
# every strategy-named parameter as a fixture and errored. The strategies
# defined at module level match the parameter names one-to-one.
@hyp_settings(max_examples=10, deadline=None)  # DB round-trips are slow
@given(
    catalogue_id=catalogue_id_strategy,
    catalogue_name=catalogue_name_strategy,
    catalogue_type=catalogue_type_strategy,
    status=status_strategy,
    sku=sku_strategy,
    barcode_number=barcode_strategy,
    hsn_code=hsn_code_strategy,
    gst_rate=gst_rate_strategy,
    mrp=price_strategy,
    base_price=price_strategy,
    track_inventory=boolean_strategy,
    batch_managed=boolean_strategy
)
def test_property_catalogue_sync_creates_matching_records(
    catalogue_id,
    catalogue_name,
    catalogue_type,
    status,
    sku,
    barcode_number,
    hsn_code,
    gst_rate,
    mrp,
    base_price,
    track_inventory,
    batch_managed
):
    """
    Feature: postgres-sync, Property 4: Catalogue sync creates matching records
    For any catalogue created in MongoDB, syncing should result in a record in
    trans.catalogue_ref with matching catalogue_id and all required fields populated.
    Validates: Requirements 2.1, 2.5
    """
    async def run_test():
        mongo_client, mongo_db, pg_conn = await setup_test_db()
        try:
            # Generate catalogue document
            catalogue_doc = generate_catalogue_document(
                catalogue_id=catalogue_id,
                catalogue_name=catalogue_name,
                catalogue_type=catalogue_type,
                status=status,
                sku=sku,
                barcode_number=barcode_number,
                hsn_code=hsn_code,
                gst_rate=gst_rate,
                mrp=mrp,
                base_price=base_price,
                track_inventory=track_inventory,
                batch_managed=batch_managed
            )
            # Insert into MongoDB
            collection = mongo_db[SCM_CATALOGUE_COLLECTION]
            await collection.insert_one(catalogue_doc)
            # Sync to PostgreSQL
            handler = CatalogueSyncHandler()
            success = await handler.sync(
                entity_id=catalogue_id,
                mongo_db=mongo_db,
                pg_conn=pg_conn
            )
            assert success, "Sync should succeed"
            # Verify record exists in PostgreSQL
            pg_record = await pg_conn.fetchrow(
                "SELECT * FROM trans.catalogue_ref WHERE catalogue_id = $1",
                catalogue_id
            )
            assert pg_record is not None, "Record should exist in PostgreSQL"
            assert pg_record["catalogue_id"] == catalogue_id, "catalogue_id should match"
            assert pg_record["catalogue_name"] == catalogue_name, "catalogue_name should match"
            assert pg_record["catalogue_type"] == catalogue_type, "catalogue_type should match"
            assert pg_record["status"] == status, "status should match"
            assert pg_record["sku"] == sku, "sku should match"
            assert pg_record["barcode_number"] == barcode_number, "barcode_number should match"
            assert pg_record["hsn_code"] == hsn_code, "hsn_code should match"
            assert pg_record["track_inventory"] == track_inventory, "track_inventory should match"
            assert pg_record["batch_managed"] == batch_managed, "batch_managed should match"
            assert pg_record["created_at"] is not None, "created_at should be populated"
            # Check numeric precision (Decimal(str(x)) avoids float repr noise)
            if gst_rate is not None:
                assert pg_record["gst_rate"] == Decimal(str(gst_rate)).quantize(Decimal('0.01')), \
                    "gst_rate should match with correct precision"
            else:
                assert pg_record["gst_rate"] is None, "gst_rate should be NULL"
            if mrp is not None:
                assert pg_record["mrp"] == Decimal(str(mrp)).quantize(Decimal('0.01')), \
                    "mrp should match with correct precision"
            else:
                assert pg_record["mrp"] is None, "mrp should be NULL"
            if base_price is not None:
                assert pg_record["base_price"] == Decimal(str(base_price)).quantize(Decimal('0.01')), \
                    "base_price should match with correct precision"
            else:
                assert pg_record["base_price"] is None, "base_price should be NULL"
        finally:
            await teardown_test_db(mongo_client, mongo_db, pg_conn)
    asyncio.run(run_test())
# FIX 1: missing @given/@hyp_settings decorators (parameters were being
# treated as pytest fixtures). FIX 2: datetime.now(datetime.UTC) raised
# AttributeError — `datetime` is the class, not the module; use timezone.utc.
@hyp_settings(max_examples=10, deadline=None)  # DB round-trips are slow
@given(
    catalogue_id=catalogue_id_strategy,
    catalogue_name=catalogue_name_strategy,
    catalogue_type=catalogue_type_strategy,
    initial_status=status_strategy,
    updated_status=status_strategy,
    initial_price=price_strategy,
    updated_price=price_strategy
)
def test_property_catalogue_sync_updates_existing_records(
    catalogue_id,
    catalogue_name,
    catalogue_type,
    initial_status,
    updated_status,
    initial_price,
    updated_price
):
    """
    Feature: postgres-sync, Property 5: Catalogue sync updates existing records
    For any catalogue updated in MongoDB, syncing should update the corresponding
    record in trans.catalogue_ref with the new values.
    Validates: Requirements 2.2
    """
    # Ensure there's a difference to test
    if initial_status == updated_status and initial_price == updated_price:
        return
    async def run_test():
        mongo_client, mongo_db, pg_conn = await setup_test_db()
        try:
            # Create initial catalogue document (naive UTC, matching the schema)
            initial_time = datetime.now(timezone.utc).replace(tzinfo=None)
            initial_doc = generate_catalogue_document(
                catalogue_id=catalogue_id,
                catalogue_name=catalogue_name,
                catalogue_type=catalogue_type,
                status=initial_status,
                mrp=initial_price,
                created_at=initial_time,
                updated_at=initial_time
            )
            # Insert into MongoDB and sync
            collection = mongo_db[SCM_CATALOGUE_COLLECTION]
            await collection.insert_one(initial_doc)
            handler = CatalogueSyncHandler()
            await handler.sync(catalogue_id, mongo_db, pg_conn)
            # Verify initial state
            pg_record = await pg_conn.fetchrow(
                "SELECT * FROM trans.catalogue_ref WHERE catalogue_id = $1",
                catalogue_id
            )
            assert pg_record["status"] == initial_status, "Initial status should match"
            if initial_price is not None:
                assert pg_record["mrp"] == Decimal(str(initial_price)).quantize(Decimal('0.01')), \
                    "Initial mrp should match"
            # Update catalogue in MongoDB with a strictly newer updated_at so
            # timestamp-based conflict resolution does not skip the update
            updated_time = initial_time + timedelta(seconds=1)
            update_fields = {
                "meta.status": updated_status,
                "meta.updated_at": updated_time
            }
            if updated_price is not None:
                update_fields["pricing.mrp"] = updated_price
            await collection.update_one(
                {"catalogue_id": catalogue_id},
                {"$set": update_fields}
            )
            # Sync again
            await handler.sync(catalogue_id, mongo_db, pg_conn)
            # Verify updated state
            pg_record = await pg_conn.fetchrow(
                "SELECT * FROM trans.catalogue_ref WHERE catalogue_id = $1",
                catalogue_id
            )
            assert pg_record["status"] == updated_status, "Status should be updated"
            if updated_price is not None:
                assert pg_record["mrp"] == Decimal(str(updated_price)).quantize(Decimal('0.01')), \
                    "MRP should be updated"
        finally:
            await teardown_test_db(mongo_client, mongo_db, pg_conn)
    asyncio.run(run_test())
# FIX: missing @given/@hyp_settings decorators — without them pytest treats
# the strategy-named parameters as (nonexistent) fixtures.
@hyp_settings(max_examples=10, deadline=None)  # DB round-trips are slow
@given(
    catalogue_id=catalogue_id_strategy,
    catalogue_name=catalogue_name_strategy,
    catalogue_type=catalogue_type_strategy,
    status=status_strategy,
    gst_rate=gst_rate_strategy,
    mrp=price_strategy,
    base_price=price_strategy
)
def test_property_numeric_precision_preservation(
    catalogue_id,
    catalogue_name,
    catalogue_type,
    status,
    gst_rate,
    mrp,
    base_price
):
    """
    Feature: postgres-sync, Property 6: Numeric precision preservation
    For any catalogue with numeric fields (gst_rate, mrp, base_price), syncing
    should preserve the precision defined in PostgreSQL schema (5,2 for gst_rate,
    12,2 for prices).
    Validates: Requirements 9.2
    """
    # Only test if at least one numeric field is present
    if gst_rate is None and mrp is None and base_price is None:
        return
    async def run_test():
        mongo_client, mongo_db, pg_conn = await setup_test_db()
        try:
            # Generate catalogue with numeric fields
            catalogue_doc = generate_catalogue_document(
                catalogue_id=catalogue_id,
                catalogue_name=catalogue_name,
                catalogue_type=catalogue_type,
                status=status,
                gst_rate=gst_rate,
                mrp=mrp,
                base_price=base_price
            )
            # Insert and sync
            collection = mongo_db[SCM_CATALOGUE_COLLECTION]
            await collection.insert_one(catalogue_doc)
            handler = CatalogueSyncHandler()
            await handler.sync(catalogue_id, mongo_db, pg_conn)
            # Verify numeric precision
            pg_record = await pg_conn.fetchrow(
                "SELECT gst_rate, mrp, base_price FROM trans.catalogue_ref WHERE catalogue_id = $1",
                catalogue_id
            )
            if gst_rate is not None:
                expected_gst = Decimal(str(gst_rate)).quantize(Decimal('0.01'))
                assert pg_record["gst_rate"] == expected_gst, \
                    f"gst_rate precision should be (5,2): expected {expected_gst}, got {pg_record['gst_rate']}"
                # Verify it's within NUMERIC(5,2) bounds
                assert abs(pg_record["gst_rate"]) < 1000, "gst_rate should fit in NUMERIC(5,2)"
            if mrp is not None:
                expected_mrp = Decimal(str(mrp)).quantize(Decimal('0.01'))
                assert pg_record["mrp"] == expected_mrp, \
                    f"mrp precision should be (12,2): expected {expected_mrp}, got {pg_record['mrp']}"
                # Verify it's within NUMERIC(12,2) bounds
                assert abs(pg_record["mrp"]) < 10000000000, "mrp should fit in NUMERIC(12,2)"
            if base_price is not None:
                expected_base = Decimal(str(base_price)).quantize(Decimal('0.01'))
                assert pg_record["base_price"] == expected_base, \
                    f"base_price precision should be (12,2): expected {expected_base}, got {pg_record['base_price']}"
                # Verify it's within NUMERIC(12,2) bounds
                assert abs(pg_record["base_price"]) < 10000000000, "base_price should fit in NUMERIC(12,2)"
        finally:
            await teardown_test_db(mongo_client, mongo_db, pg_conn)
    asyncio.run(run_test())
# FIX: missing @given/@hyp_settings decorators. sync_count has no module-level
# strategy, so a small bounded integer strategy is supplied inline.
@hyp_settings(max_examples=10, deadline=None)  # DB round-trips are slow
@given(
    catalogue_id=catalogue_id_strategy,
    catalogue_name=catalogue_name_strategy,
    catalogue_type=catalogue_type_strategy,
    status=status_strategy,
    sync_count=st.integers(min_value=1, max_value=5)
)
def test_property_idempotent_catalogue_sync(
    catalogue_id,
    catalogue_name,
    catalogue_type,
    status,
    sync_count
):
    """
    Feature: postgres-sync, Property 11: Idempotent sync operations
    For any entity, calling sync multiple times with the same data should
    produce the same result in PostgreSQL (no duplicates, same final state).
    Validates: Requirements 6.1
    """
    async def run_test():
        mongo_client, mongo_db, pg_conn = await setup_test_db()
        try:
            # Generate catalogue document
            catalogue_doc = generate_catalogue_document(
                catalogue_id=catalogue_id,
                catalogue_name=catalogue_name,
                catalogue_type=catalogue_type,
                status=status
            )
            # Insert into MongoDB
            collection = mongo_db[SCM_CATALOGUE_COLLECTION]
            await collection.insert_one(catalogue_doc)
            # Sync multiple times
            handler = CatalogueSyncHandler()
            for _ in range(sync_count):
                await handler.sync(catalogue_id, mongo_db, pg_conn)
            # Verify only one record exists
            count = await pg_conn.fetchval(
                "SELECT COUNT(1) FROM trans.catalogue_ref WHERE catalogue_id = $1",
                catalogue_id
            )
            assert count == 1, f"Should have exactly 1 record, found {count}"
            # Verify data is correct
            pg_record = await pg_conn.fetchrow(
                "SELECT * FROM trans.catalogue_ref WHERE catalogue_id = $1",
                catalogue_id
            )
            assert pg_record["catalogue_name"] == catalogue_name, "Data should remain consistent"
            assert pg_record["status"] == status, "Data should remain consistent"
        finally:
            await teardown_test_db(mongo_client, mongo_db, pg_conn)
    asyncio.run(run_test())
# FIX 1: missing @given/@hyp_settings decorators. FIX 2:
# datetime.now(datetime.UTC) raised AttributeError — use timezone.utc.
@hyp_settings(max_examples=10, deadline=None)  # DB round-trips are slow
@given(
    catalogue_id=catalogue_id_strategy,
    catalogue_name=catalogue_name_strategy,
    catalogue_type=catalogue_type_strategy,
    initial_status=status_strategy,
    updated_status=status_strategy
)
def test_property_catalogue_upsert_behavior(
    catalogue_id,
    catalogue_name,
    catalogue_type,
    initial_status,
    updated_status
):
    """
    Feature: postgres-sync, Property 12: Upsert behavior
    For any entity that already exists in PostgreSQL, syncing should update
    the existing record rather than creating a duplicate.
    Validates: Requirements 6.2
    """
    if initial_status == updated_status:
        return
    async def run_test():
        mongo_client, mongo_db, pg_conn = await setup_test_db()
        try:
            # Create and sync initial catalogue
            initial_doc = generate_catalogue_document(
                catalogue_id=catalogue_id,
                catalogue_name=catalogue_name,
                catalogue_type=catalogue_type,
                status=initial_status
            )
            collection = mongo_db[SCM_CATALOGUE_COLLECTION]
            await collection.insert_one(initial_doc)
            handler = CatalogueSyncHandler()
            await handler.sync(catalogue_id, mongo_db, pg_conn)
            # Update and sync again (newer naive-UTC updated_at so the
            # handler does not skip the change)
            await collection.update_one(
                {"catalogue_id": catalogue_id},
                {
                    "$set": {
                        "meta.status": updated_status,
                        "meta.updated_at": datetime.now(timezone.utc).replace(tzinfo=None)
                    }
                }
            )
            await handler.sync(catalogue_id, mongo_db, pg_conn)
            # Verify only one record exists with updated data
            count = await pg_conn.fetchval(
                "SELECT COUNT(1) FROM trans.catalogue_ref WHERE catalogue_id = $1",
                catalogue_id
            )
            assert count == 1, "Should have exactly 1 record (upsert, not insert)"
            pg_record = await pg_conn.fetchrow(
                "SELECT status FROM trans.catalogue_ref WHERE catalogue_id = $1",
                catalogue_id
            )
            assert pg_record["status"] == updated_status, "Record should be updated, not duplicated"
        finally:
            await teardown_test_db(mongo_client, mongo_db, pg_conn)
    asyncio.run(run_test())
# FIX 1: missing @given/@hyp_settings decorators. FIX 2:
# datetime.now(datetime.UTC) raised AttributeError — use timezone.utc.
@hyp_settings(max_examples=10, deadline=None)  # DB round-trips are slow
@given(
    catalogue_id=catalogue_id_strategy,
    catalogue_name=catalogue_name_strategy,
    catalogue_type=catalogue_type_strategy,
    status=status_strategy
)
def test_property_catalogue_timestamp_conflict_resolution(
    catalogue_id,
    catalogue_name,
    catalogue_type,
    status
):
    """
    Feature: postgres-sync, Property 14: Timestamp-based conflict resolution
    For any entity where MongoDB updated_at is older than PostgreSQL updated_at,
    the sync should skip the update.
    Validates: Requirements 6.5
    """
    async def run_test():
        mongo_client, mongo_db, pg_conn = await setup_test_db()
        try:
            # Create catalogue with newer timestamp (naive UTC)
            newer_time = datetime.now(timezone.utc).replace(tzinfo=None)
            newer_doc = generate_catalogue_document(
                catalogue_id=catalogue_id,
                catalogue_name=catalogue_name,
                catalogue_type=catalogue_type,
                status=status,
                created_at=newer_time,
                updated_at=newer_time
            )
            # Insert and sync
            collection = mongo_db[SCM_CATALOGUE_COLLECTION]
            await collection.insert_one(newer_doc)
            handler = CatalogueSyncHandler()
            await handler.sync(catalogue_id, mongo_db, pg_conn)
            # Verify initial sync - need to check created_at since table doesn't have updated_at
            pg_record = await pg_conn.fetchrow(
                "SELECT created_at FROM trans.catalogue_ref WHERE catalogue_id = $1",
                catalogue_id
            )
            initial_pg_created_at = pg_record["created_at"]
            # Try to sync with older timestamp
            older_time = newer_time - timedelta(hours=1)
            await collection.update_one(
                {"catalogue_id": catalogue_id},
                {"$set": {"meta.updated_at": older_time}}
            )
            await handler.sync(catalogue_id, mongo_db, pg_conn)
            # Verify PostgreSQL record didn't change (update was skipped)
            pg_record = await pg_conn.fetchrow(
                "SELECT created_at FROM trans.catalogue_ref WHERE catalogue_id = $1",
                catalogue_id
            )
            final_pg_created_at = pg_record["created_at"]
            assert final_pg_created_at == initial_pg_created_at, \
                "PostgreSQL record should not change when MongoDB timestamp is older"
        finally:
            await teardown_test_db(mongo_client, mongo_db, pg_conn)
    asyncio.run(run_test())