#!/usr/bin/env python3
"""
Complete test script for pricing_levels functionality.

This script tests:
1. Migration of existing catalogues without pricing_levels
2. Sync handler automatic generation
3. Validation of results
4. PostgreSQL storage and retrieval

Usage:
    python test_complete_pricing_levels_flow.py

The process exits with status 0 when every check passes and 1 otherwise.
"""

import asyncio
import json
from datetime import datetime

from motor.motor_asyncio import AsyncIOMotorClient
import asyncpg
from insightfy_utils.logging import get_logger

from app.core.config import get_settings
from app.sync.catalogues.handler import CatalogueSyncHandler
from app.catalogues.constants import SCM_CATALOGUE_COLLECTION

logger = get_logger(__name__)

# Pricing tiers that are reported, in display order.
PRICING_LEVEL_NAMES = ("ncnf", "cnf", "distributor", "retail")

# Test catalogues with different scenarios.
# NOTE(review): datetime.utcnow() is deprecated since Python 3.12; it is kept
# here because switching to timezone-aware values would change the data handed
# to the sync handler, whose expectations are not visible in this file.
TEST_CATALOGUES = [
    {
        "catalogue_id": "test-complete-001",
        "catalogue_code": "TCF-001-000001",
        "catalogue_name": "Test Product with Existing Pricing Levels",
        "catalogue_type": "Product",
        "category": "Test",
        "brand": "TestBrand",
        "identifiers": {
            "sku": "TCF001",
            "barcode_number": "1111111111111",
        },
        "pricing_levels": {
            "currency": "INR",
            "mrp": 500,
            "levels": {
                "ncnf": {
                    "purchase_price": 308,
                    "trade_margin": 15,
                    "selling_price": 440,
                    "retail_price": 440,
                    "retail_margin": 25,
                    "max_discount_pct": 5,
                },
                "cnf": {
                    "purchase_price": 329,
                    "trade_margin": 18,
                    "selling_price": 470,
                    "retail_price": 470,
                    "retail_margin": 28,
                    "max_discount_pct": 8,
                },
                "distributor": {
                    "purchase_price": 315,
                    "trade_margin": 20,
                    "selling_price": 450,
                    "retail_price": 450,
                    "retail_margin": 30,
                    "max_discount_pct": 15,
                },
                "retail": {
                    "purchase_price": 280,
                    "trade_margin": 25,
                    "selling_price": 400,
                    "retail_price": 400,
                    "retail_margin": 35,
                    "max_discount_pct": 20,
                },
            },
        },
        "tax": {
            "hsn_code": "33049900",
            "gst_rate": 18.0,
        },
        "inventory": {
            "track_inventory": True,
        },
        "meta": {
            "status": "Active",
            "created_at": datetime.utcnow(),
            "updated_at": datetime.utcnow(),
        },
    },
    {
        "catalogue_id": "test-complete-002",
        "catalogue_code": "TCF-002-000002",
        "catalogue_name": "Test Product without Pricing Levels (Legacy)",
        "catalogue_type": "Product",
        "category": "Test",
        "brand": "TestBrand",
        "identifiers": {
            "sku": "TCF002",
            "barcode_number": "2222222222222",
        },
        "pricing": {
            "mrp": 300,
            "retail_price": 250,
            "trade_margin": 20,
            "retail_margin": 25,
            "currency": "INR",
        },
        "tax": {
            "hsn_code": "33049900",
            "gst_rate": 18.0,
        },
        "inventory": {
            "track_inventory": True,
        },
        "meta": {
            "status": "Active",
            "created_at": datetime.utcnow(),
            "updated_at": datetime.utcnow(),
        },
    },
    {
        "catalogue_id": "test-complete-003",
        "catalogue_code": "TCF-003-000003",
        "catalogue_name": "Test Product with Top-level MRP",
        "catalogue_type": "Product",
        "category": "Test",
        "brand": "TestBrand",
        "identifiers": {
            "sku": "TCF003",
            "barcode_number": "3333333333333",
        },
        "mrp": 150,  # Top-level MRP
        "tax": {
            "hsn_code": "33049900",
            "gst_rate": 18.0,
        },
        "inventory": {
            "track_inventory": True,
        },
        "meta": {
            "status": "Active",
            "created_at": datetime.utcnow(),
            "updated_at": datetime.utcnow(),
        },
    },
]


async def _connect_postgres(settings):
    """Open a new asyncpg connection from the POSTGRES_* settings values."""
    return await asyncpg.connect(
        host=settings.POSTGRES_HOST,
        port=settings.POSTGRES_PORT,
        user=settings.POSTGRES_USER,
        password=settings.POSTGRES_PASSWORD,
        database=settings.POSTGRES_DB,
    )


async def setup_test_catalogues():
    """Upsert every entry of TEST_CATALOGUES into the MongoDB collection.

    Returns:
        True when all catalogues were written, False if any write raised.
    """
    settings = get_settings()

    # Connect to MongoDB
    mongo_client = AsyncIOMotorClient(settings.MONGODB_URL)
    mongo_db = mongo_client[settings.MONGODB_DB_NAME]
    collection = mongo_db[SCM_CATALOGUE_COLLECTION]

    try:
        for catalogue in TEST_CATALOGUES:
            # replace_one + upsert keeps the script re-runnable after an
            # aborted run that left documents behind.
            await collection.replace_one(
                {"catalogue_id": catalogue["catalogue_id"]},
                catalogue,
                upsert=True,
            )
            logger.info(f"✅ Setup test catalogue: {catalogue['catalogue_name']}")

        logger.info(f"Setup {len(TEST_CATALOGUES)} test catalogues in MongoDB")
        return True
    except Exception as e:
        logger.error(f"Failed to setup test catalogues: {e}")
        return False
    finally:
        mongo_client.close()


async def test_sync_handler_generation():
    """Run each test catalogue through the sync handler and sync it to PostgreSQL.

    For every catalogue this fetches the document from MongoDB, checks whether
    ``extract_nested_fields`` yields a ``pricing_levels`` entry, and calls
    ``upsert_to_postgres``.

    Returns:
        A list with one dict per test catalogue containing ``catalogue_id``,
        ``catalogue_name``, ``had_pricing_levels_before``,
        ``has_pricing_levels_after`` and ``sync_success``. Catalogues that
        cannot be fetched are reported with ``sync_success`` set to False.

    Raises:
        Exception: any error from the connections or the handler is logged
            and re-raised.
    """
    settings = get_settings()

    # Setup connections
    mongo_client = AsyncIOMotorClient(settings.MONGODB_URL)
    pg_conn = None

    try:
        mongo_db = mongo_client[settings.MONGODB_DB_NAME]
        # Connecting inside the try ensures the Mongo client is closed even
        # when PostgreSQL is unreachable.
        pg_conn = await _connect_postgres(settings)

        handler = CatalogueSyncHandler()
        results = []

        for catalogue_data in TEST_CATALOGUES:
            catalogue_id = catalogue_data["catalogue_id"]
            catalogue_name = catalogue_data["catalogue_name"]

            logger.info(f"\n🔄 Testing sync for: {catalogue_name}")

            # Fetch from MongoDB
            catalogue = await handler.fetch_from_mongodb(catalogue_id, mongo_db)
            if not catalogue:
                logger.error(f"❌ Catalogue not found: {catalogue_id}")
                # Record the miss so the summary cannot report success while
                # silently skipping a catalogue.
                results.append({
                    "catalogue_id": catalogue_id,
                    "catalogue_name": catalogue_name,
                    "had_pricing_levels_before": False,
                    "has_pricing_levels_after": False,
                    "sync_success": False,
                })
                continue

            # Check if it has pricing_levels before sync (as a real bool, so
            # the log line does not dump the whole pricing dict).
            has_pricing_levels_before = bool(catalogue.get("pricing_levels"))
            logger.info(f" Has pricing_levels before sync: {has_pricing_levels_before}")

            # Extract nested fields (this is where generation happens)
            nested_fields = handler.extract_nested_fields(catalogue)

            # Check if pricing_levels was generated
            pricing_levels = nested_fields.get("pricing_levels")
            has_pricing_levels_after = pricing_levels is not None
            logger.info(f" Has pricing_levels after extraction: {has_pricing_levels_after}")

            if has_pricing_levels_after:
                logger.info(f" MRP: ₹{pricing_levels.get('mrp', 'N/A')}")
                logger.info(f" Levels: {list(pricing_levels.get('levels', {}).keys())}")

                # Show sample pricing
                levels = pricing_levels.get("levels", {})
                for level_name in PRICING_LEVEL_NAMES:
                    if level_name in levels:
                        selling_price = levels[level_name].get("selling_price", 0)
                        logger.info(f" {level_name}: ₹{selling_price}")

            # Test sync to PostgreSQL
            success = await handler.upsert_to_postgres(catalogue, pg_conn)

            results.append({
                "catalogue_id": catalogue_id,
                "catalogue_name": catalogue_name,
                "had_pricing_levels_before": has_pricing_levels_before,
                "has_pricing_levels_after": has_pricing_levels_after,
                "sync_success": success,
            })

            if success:
                logger.info(" ✅ Sync successful")
            else:
                logger.error(" ❌ Sync failed")

        return results
    except Exception as e:
        logger.error(f"Sync test failed: {e}")
        raise
    finally:
        mongo_client.close()
        if pg_conn is not None:
            await pg_conn.close()


async def verify_postgresql_data():
    """Read the test catalogues back from PostgreSQL and log their pricing data.

    Returns:
        True when every test catalogue has a row in ``trans.catalogue_ref``;
        False when a row is missing or the verification raised. A row without
        ``pricing_levels`` is logged as a warning but is not treated as a
        failure.
    """
    settings = get_settings()

    try:
        pg_conn = await _connect_postgres(settings)
    except Exception as e:
        logger.error(f"PostgreSQL verification failed: {e}")
        return False

    # Query PostgreSQL
    query = """
        SELECT catalogue_id, catalogue_name, mrp, pricing_levels
        FROM trans.catalogue_ref
        WHERE catalogue_id = $1
    """

    try:
        logger.info("\n🔍 Verifying PostgreSQL data...")

        all_found = True
        for catalogue_data in TEST_CATALOGUES:
            catalogue_id = catalogue_data["catalogue_id"]
            catalogue_name = catalogue_data["catalogue_name"]

            result = await pg_conn.fetchrow(query, catalogue_id)

            if not result:
                logger.error(f"❌ {catalogue_name}: Not found in PostgreSQL")
                all_found = False
                continue

            logger.info(f"\n📊 {catalogue_name}:")
            logger.info(f" ID: {result['catalogue_id']}")
            logger.info(f" MRP: ₹{result['mrp']}")

            pricing_levels = result["pricing_levels"]
            # asyncpg hands json/jsonb columns back as text unless a codec was
            # registered on this connection, so decode before using .get().
            if isinstance(pricing_levels, str):
                pricing_levels = json.loads(pricing_levels)

            if pricing_levels:
                logger.info(f" Currency: {pricing_levels.get('currency', 'N/A')}")
                logger.info(f" Pricing Levels MRP: ₹{pricing_levels.get('mrp', 'N/A')}")

                levels = pricing_levels.get("levels", {})
                logger.info(f" Available Levels: {list(levels.keys())}")

                for level_name in PRICING_LEVEL_NAMES:
                    if level_name in levels:
                        level_data = levels[level_name]
                        selling_price = level_data.get("selling_price", 0)
                        max_discount = level_data.get("max_discount_pct", 0)
                        logger.info(f" {level_name}: ₹{selling_price} (max discount: {max_discount}%)")

                logger.info(" ✅ Pricing levels stored correctly")
            else:
                logger.warning(" ⚠️ No pricing_levels in PostgreSQL")

        return all_found
    except Exception as e:
        logger.error(f"PostgreSQL verification failed: {e}")
        return False
    finally:
        await pg_conn.close()


async def test_pricing_calculations():
    """Check the retail selling price generated for a few sample MRPs.

    Returns:
        True when every sample produced the expected retail selling price
        (within 0.01), False when generation failed or a price mismatched.
    """
    logger.info("\n🧮 Testing pricing calculations...")

    handler = CatalogueSyncHandler()

    test_cases = [
        {"mrp": 100, "expected_retail": 80},      # 100 - 20% = 80
        {"mrp": 500, "expected_retail": 400},     # 500 - 20% = 400
        {"mrp": 299, "expected_retail": 239.20},  # 299 - 20% = 239.20
    ]

    all_passed = True
    for test_case in test_cases:
        mrp = test_case["mrp"]
        expected_retail = test_case["expected_retail"]

        pricing_levels = handler._generate_pricing_levels(mrp)

        if not pricing_levels:
            logger.error(f"❌ Failed to generate pricing_levels for MRP ₹{mrp}")
            all_passed = False
            continue

        retail_price = pricing_levels["levels"]["retail"]["selling_price"]

        # Allow small rounding differences. float() keeps the subtraction
        # valid should the handler return a non-float numeric type.
        if abs(float(retail_price) - expected_retail) < 0.01:
            logger.info(f"✅ MRP ₹{mrp} → retail ₹{retail_price} (expected ₹{expected_retail})")
        else:
            logger.error(f"❌ MRP ₹{mrp} → retail ₹{retail_price} (expected ₹{expected_retail})")
            all_passed = False

    return all_passed


async def cleanup_test_data():
    """Delete the test catalogues from both MongoDB and PostgreSQL.

    Cleanup is best-effort: failures are logged as warnings and never raised,
    and a MongoDB failure does not prevent the PostgreSQL cleanup.
    """
    settings = get_settings()

    # Clean MongoDB
    mongo_client = AsyncIOMotorClient(settings.MONGODB_URL)
    mongo_db = mongo_client[settings.MONGODB_DB_NAME]
    collection = mongo_db[SCM_CATALOGUE_COLLECTION]

    try:
        for catalogue in TEST_CATALOGUES:
            await collection.delete_one({"catalogue_id": catalogue["catalogue_id"]})
        logger.info("🧹 Cleaned test catalogues from MongoDB")
    except Exception as e:
        logger.warning(f"Failed to clean MongoDB: {e}")
    finally:
        mongo_client.close()

    # Clean PostgreSQL
    try:
        pg_conn = await _connect_postgres(settings)
        try:
            for catalogue in TEST_CATALOGUES:
                await pg_conn.execute(
                    "DELETE FROM trans.catalogue_ref WHERE catalogue_id = $1",
                    catalogue["catalogue_id"],
                )
            logger.info("🧹 Cleaned test catalogues from PostgreSQL")
        finally:
            # Close the connection even when a DELETE fails.
            await pg_conn.close()
    except Exception as e:
        logger.warning(f"Failed to clean PostgreSQL: {e}")


async def main():
    """Run the complete pricing_levels flow and clean up afterwards.

    Returns:
        True when the pricing calculations, every catalogue sync and the
        PostgreSQL verification all succeeded; False otherwise (including
        when setup failed or an unexpected exception was raised).
    """
    logger.info("🚀 Starting complete pricing_levels flow test")
    logger.info("=" * 60)

    all_passed = False
    try:
        # 1. Setup test data
        logger.info("📝 Setting up test catalogues...")
        if not await setup_test_catalogues():
            logger.error("❌ Failed to setup test data")
            return False

        # 2. Test pricing calculations
        calculations_ok = await test_pricing_calculations()

        # 3. Test sync handler
        logger.info("\n🔄 Testing sync handler...")
        sync_results = await test_sync_handler_generation()

        # 4. Verify PostgreSQL data
        postgres_ok = await verify_postgresql_data()

        # 5. Summary
        logger.info("\n" + "=" * 60)
        logger.info("TEST SUMMARY")
        logger.info("=" * 60)

        for result in sync_results:
            status = "✅" if result["sync_success"] else "❌"
            was_generated = (
                not result["had_pricing_levels_before"]
                and result["has_pricing_levels_after"]
            )
            generated = "🔧 Generated" if was_generated else "📋 Existing"
            logger.info(f"{status} {result['catalogue_name']}: {generated}")

        success_count = sum(1 for r in sync_results if r["sync_success"])
        logger.info(f"\nSuccessful syncs: {success_count}/{len(sync_results)}")
        logger.info(f"Pricing calculations: {'PASSED' if calculations_ok else 'FAILED'}")
        logger.info(f"PostgreSQL verification: {'PASSED' if postgres_ok else 'FAILED'}")

        # An empty result list must not count as success (0/0 is not a pass).
        syncs_ok = bool(sync_results) and success_count == len(sync_results)
        all_passed = bool(calculations_ok and syncs_ok and postgres_ok)

        if all_passed:
            logger.info("🎉 All tests PASSED!")
        else:
            logger.error("❌ Some tests FAILED!")
    except Exception as e:
        logger.error(f"Test failed: {e}")
    finally:
        # Cleanup
        logger.info("\n🧹 Cleaning up test data...")
        await cleanup_test_data()
        logger.info("🏁 Test completed")

    return all_passed


if __name__ == "__main__":
    # Propagate the outcome to the shell so CI can detect failures.
    raise SystemExit(0 if asyncio.run(main()) else 1)