Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| Complete test script for pricing_levels functionality. | |
| This script tests: | |
| 1. Migration of existing catalogues without pricing_levels | |
| 2. Sync handler automatic generation | |
| 3. Validation of results | |
| 4. PostgreSQL storage and retrieval | |
| Usage: | |
| python test_complete_pricing_levels_flow.py | |
| """ | |
import asyncio
import json
from datetime import datetime, timezone

import asyncpg
from insightfy_utils.logging import get_logger
from motor.motor_asyncio import AsyncIOMotorClient

from app.catalogues.constants import SCM_CATALOGUE_COLLECTION
from app.core.config import get_settings
from app.sync.catalogues.handler import CatalogueSyncHandler
| logger = get_logger(__name__) | |
# Test catalogues covering three pricing scenarios:
#   test-complete-001: already carries a full ``pricing_levels`` document.
#   test-complete-002: legacy shape with only a flat ``pricing`` block.
#   test-complete-003: no pricing block at all, just a top-level ``mrp``.
# Timestamps are timezone-aware UTC; ``datetime.utcnow()`` is deprecated since
# Python 3.12 and returns naive values.
TEST_CATALOGUES = [
    {
        "catalogue_id": "test-complete-001",
        "catalogue_code": "TCF-001-000001",
        "catalogue_name": "Test Product with Existing Pricing Levels",
        "catalogue_type": "Product",
        "category": "Test",
        "brand": "TestBrand",
        "identifiers": {
            "sku": "TCF001",
            "barcode_number": "1111111111111",
        },
        "pricing_levels": {
            "currency": "INR",
            "mrp": 500,
            "levels": {
                "ncnf": {
                    "purchase_price": 308,
                    "trade_margin": 15,
                    "selling_price": 440,
                    "retail_price": 440,
                    "retail_margin": 25,
                    "max_discount_pct": 5,
                },
                "cnf": {
                    "purchase_price": 329,
                    "trade_margin": 18,
                    "selling_price": 470,
                    "retail_price": 470,
                    "retail_margin": 28,
                    "max_discount_pct": 8,
                },
                "distributor": {
                    "purchase_price": 315,
                    "trade_margin": 20,
                    "selling_price": 450,
                    "retail_price": 450,
                    "retail_margin": 30,
                    "max_discount_pct": 15,
                },
                "retail": {
                    "purchase_price": 280,
                    "trade_margin": 25,
                    "selling_price": 400,
                    "retail_price": 400,
                    "retail_margin": 35,
                    "max_discount_pct": 20,
                },
            },
        },
        "tax": {
            "hsn_code": "33049900",
            "gst_rate": 18.0,
        },
        "inventory": {
            "track_inventory": True,
        },
        "meta": {
            "status": "Active",
            "created_at": datetime.now(timezone.utc),
            "updated_at": datetime.now(timezone.utc),
        },
    },
    {
        "catalogue_id": "test-complete-002",
        "catalogue_code": "TCF-002-000002",
        "catalogue_name": "Test Product without Pricing Levels (Legacy)",
        "catalogue_type": "Product",
        "category": "Test",
        "brand": "TestBrand",
        "identifiers": {
            "sku": "TCF002",
            "barcode_number": "2222222222222",
        },
        "pricing": {
            "mrp": 300,
            "retail_price": 250,
            "trade_margin": 20,
            "retail_margin": 25,
            "currency": "INR",
        },
        "tax": {
            "hsn_code": "33049900",
            "gst_rate": 18.0,
        },
        "inventory": {
            "track_inventory": True,
        },
        "meta": {
            "status": "Active",
            "created_at": datetime.now(timezone.utc),
            "updated_at": datetime.now(timezone.utc),
        },
    },
    {
        "catalogue_id": "test-complete-003",
        "catalogue_code": "TCF-003-000003",
        "catalogue_name": "Test Product with Top-level MRP",
        "catalogue_type": "Product",
        "category": "Test",
        "brand": "TestBrand",
        "identifiers": {
            "sku": "TCF003",
            "barcode_number": "3333333333333",
        },
        "mrp": 150,  # Top-level MRP
        "tax": {
            "hsn_code": "33049900",
            "gst_rate": 18.0,
        },
        "inventory": {
            "track_inventory": True,
        },
        "meta": {
            "status": "Active",
            "created_at": datetime.now(timezone.utc),
            "updated_at": datetime.now(timezone.utc),
        },
    },
]
async def setup_test_catalogues():
    """Upsert every entry of TEST_CATALOGUES into the MongoDB catalogue collection.

    Each document is written with ``replace_one(..., upsert=True)`` keyed on
    ``catalogue_id``, so re-running the script overwrites earlier copies.

    Returns:
        True when all documents were written, False if any write raised
        (the error is logged rather than propagated).
    """
    cfg = get_settings()
    client = AsyncIOMotorClient(cfg.MONGODB_URL)
    catalogues = client[cfg.MONGODB_DB_NAME][SCM_CATALOGUE_COLLECTION]
    try:
        for catalogue in TEST_CATALOGUES:
            selector = {"catalogue_id": catalogue["catalogue_id"]}
            await catalogues.replace_one(selector, catalogue, upsert=True)
            logger.info(f"โ Setup test catalogue: {catalogue['catalogue_name']}")
        logger.info(f"Setup {len(TEST_CATALOGUES)} test catalogues in MongoDB")
    except Exception as e:
        logger.error(f"Failed to setup test catalogues: {e}")
        return False
    else:
        return True
    finally:
        # Runs on both the success and the failure path.
        client.close()
async def test_sync_handler_generation():
    """Run each test catalogue through the sync handler and record the outcome.

    For every entry in TEST_CATALOGUES the document is fetched from MongoDB,
    passed through ``handler.extract_nested_fields`` (where pricing_levels is
    expected to appear for catalogues that lacked it), and then upserted into
    PostgreSQL via ``handler.upsert_to_postgres``.

    Returns:
        A list with one dict per test catalogue containing ``catalogue_id``,
        ``catalogue_name``, ``had_pricing_levels_before``,
        ``has_pricing_levels_after`` and ``sync_success``. Catalogues that
        cannot be found in MongoDB are reported with ``sync_success`` False
        instead of being dropped, so callers cannot mistake an empty result
        for a full pass.

    Raises:
        Exception: any error from the database drivers or the handler is
            logged and re-raised.
    """
    settings = get_settings()
    mongo_client = AsyncIOMotorClient(settings.MONGODB_URL)
    # Opened inside the try block so a failed PostgreSQL connect still closes
    # the Mongo client in ``finally``.
    pg_conn = None
    try:
        mongo_db = mongo_client[settings.MONGODB_DB_NAME]
        pg_conn = await asyncpg.connect(
            host=settings.POSTGRES_HOST,
            port=settings.POSTGRES_PORT,
            user=settings.POSTGRES_USER,
            password=settings.POSTGRES_PASSWORD,
            database=settings.POSTGRES_DB,
        )
        handler = CatalogueSyncHandler()
        results = []
        for catalogue_data in TEST_CATALOGUES:
            catalogue_id = catalogue_data["catalogue_id"]
            catalogue_name = catalogue_data["catalogue_name"]
            logger.info(f"\n๐ Testing sync for: {catalogue_name}")

            # Fetch from MongoDB
            catalogue = await handler.fetch_from_mongodb(catalogue_id, mongo_db)
            if not catalogue:
                logger.error(f"โ Catalogue not found: {catalogue_id}")
                # A missing source document is a failed sync, not a skipped one.
                results.append({
                    "catalogue_id": catalogue_id,
                    "catalogue_name": catalogue_name,
                    "had_pricing_levels_before": False,
                    "has_pricing_levels_after": False,
                    "sync_success": False,
                })
                continue

            # Coerce to bool so the flag (and the log line) is True/False
            # rather than the pricing_levels document itself.
            has_pricing_levels_before = bool(
                "pricing_levels" in catalogue and catalogue["pricing_levels"]
            )
            logger.info(f" Has pricing_levels before sync: {has_pricing_levels_before}")

            # Extract nested fields (this is where generation happens)
            nested_fields = handler.extract_nested_fields(catalogue)
            has_pricing_levels_after = nested_fields.get("pricing_levels") is not None
            logger.info(f" Has pricing_levels after extraction: {has_pricing_levels_after}")

            if has_pricing_levels_after:
                pricing_levels = nested_fields["pricing_levels"]
                levels = pricing_levels.get("levels", {})
                logger.info(f" MRP: โน{pricing_levels.get('mrp', 'N/A')}")
                logger.info(f" Levels: {list(levels.keys())}")
                # Show sample pricing
                for level_name in ["ncnf", "cnf", "distributor", "retail"]:
                    if level_name in levels:
                        selling_price = levels[level_name].get("selling_price", 0)
                        logger.info(f" {level_name}: โน{selling_price}")

            # Test sync to PostgreSQL
            success = await handler.upsert_to_postgres(catalogue, pg_conn)
            results.append({
                "catalogue_id": catalogue_id,
                "catalogue_name": catalogue_name,
                "had_pricing_levels_before": has_pricing_levels_before,
                "has_pricing_levels_after": has_pricing_levels_after,
                "sync_success": success,
            })
            if success:
                logger.info(" โ Sync successful")
            else:
                logger.error(" โ Sync failed")
        return results
    except Exception as e:
        logger.error(f"Sync test failed: {e}")
        raise
    finally:
        mongo_client.close()
        if pg_conn is not None:
            await pg_conn.close()
async def verify_postgresql_data():
    """Read the synced rows back from PostgreSQL and log their pricing levels.

    Queries ``trans.catalogue_ref`` once per entry in TEST_CATALOGUES and logs
    the MRP, currency and per-level selling price / max discount.

    Returns:
        True when every test catalogue has a row in PostgreSQL; False when a
        row is missing or when the check raised (the error is logged, not
        propagated). A row without pricing_levels only produces a warning.
    """
    settings = get_settings()
    pg_conn = await asyncpg.connect(
        host=settings.POSTGRES_HOST,
        port=settings.POSTGRES_PORT,
        user=settings.POSTGRES_USER,
        password=settings.POSTGRES_PASSWORD,
        database=settings.POSTGRES_DB,
    )
    # The statement is the same for every catalogue; only the parameter varies.
    query = """
        SELECT catalogue_id, catalogue_name, mrp, pricing_levels
        FROM trans.catalogue_ref
        WHERE catalogue_id = $1
    """
    all_found = True
    try:
        logger.info("\n๐ Verifying PostgreSQL data...")
        for catalogue_data in TEST_CATALOGUES:
            catalogue_id = catalogue_data["catalogue_id"]
            catalogue_name = catalogue_data["catalogue_name"]

            result = await pg_conn.fetchrow(query, catalogue_id)
            if result is None:
                logger.error(f"โ {catalogue_name}: Not found in PostgreSQL")
                all_found = False
                continue

            logger.info(f"\n๐ {catalogue_name}:")
            logger.info(f" ID: {result['catalogue_id']}")
            logger.info(f" MRP: โน{result['mrp']}")

            pricing_levels = result['pricing_levels']
            # asyncpg returns json/jsonb values as text unless a custom codec
            # is registered on the connection (none is registered here), so
            # decode before treating the value as a mapping.
            if isinstance(pricing_levels, str):
                pricing_levels = json.loads(pricing_levels)

            if pricing_levels:
                logger.info(f" Currency: {pricing_levels.get('currency', 'N/A')}")
                logger.info(f" Pricing Levels MRP: โน{pricing_levels.get('mrp', 'N/A')}")
                levels = pricing_levels.get("levels", {})
                logger.info(f" Available Levels: {list(levels.keys())}")
                for level_name in ["ncnf", "cnf", "distributor", "retail"]:
                    if level_name in levels:
                        level_data = levels[level_name]
                        selling_price = level_data.get("selling_price", 0)
                        max_discount = level_data.get("max_discount_pct", 0)
                        logger.info(f" {level_name}: โน{selling_price} (max discount: {max_discount}%)")
                logger.info(" โ Pricing levels stored correctly")
            else:
                logger.warning(" โ ๏ธ No pricing_levels in PostgreSQL")
        return all_found
    except Exception as e:
        logger.error(f"PostgreSQL verification failed: {e}")
        return False
    finally:
        await pg_conn.close()
async def test_pricing_calculations():
    """Check the retail selling price generated from a handful of MRP values.

    Each case calls ``handler._generate_pricing_levels(mrp)`` and compares
    ``levels.retail.selling_price`` with the expected value (MRP less 20%, as
    encoded in the cases below), allowing a difference below 0.01.

    Returns:
        True only if every case produced pricing levels with the expected
        retail price; False if any case failed, generated nothing, or
        returned a structure without the retail selling price.
    """
    logger.info("\n๐งฎ Testing pricing calculations...")
    handler = CatalogueSyncHandler()
    test_cases = [
        {"mrp": 100, "expected_retail": 80},  # 100 - 20% = 80
        {"mrp": 500, "expected_retail": 400},  # 500 - 20% = 400
        {"mrp": 299, "expected_retail": 239.20},  # 299 - 20% = 239.20
    ]
    all_passed = True
    for test_case in test_cases:
        mrp = test_case["mrp"]
        expected_retail = test_case["expected_retail"]

        pricing_levels = handler._generate_pricing_levels(mrp)
        if not pricing_levels:
            logger.error(f"โ Failed to generate pricing_levels for MRP โน{mrp}")
            all_passed = False
            continue

        try:
            retail_price = pricing_levels["levels"]["retail"]["selling_price"]
        except (KeyError, TypeError) as e:
            # A malformed result is a failed case, not a reason to abort the run.
            logger.error(f"โ Malformed pricing_levels for MRP โน{mrp}: {e!r}")
            all_passed = False
            continue

        # Allow small rounding differences
        if abs(retail_price - expected_retail) < 0.01:
            logger.info(f"โ MRP โน{mrp} โ retail โน{retail_price} (expected โน{expected_retail})")
        else:
            logger.error(f"โ MRP โน{mrp} โ retail โน{retail_price} (expected โน{expected_retail})")
            all_passed = False
    return all_passed
async def cleanup_test_data():
    """Delete the test catalogues from both MongoDB and PostgreSQL.

    Cleanup is best-effort: failures in either store are logged as warnings
    and never raised, and the two stores are cleaned independently so a
    MongoDB failure does not prevent the PostgreSQL cleanup.
    """
    settings = get_settings()
    catalogue_ids = [catalogue["catalogue_id"] for catalogue in TEST_CATALOGUES]

    # Clean MongoDB
    mongo_client = AsyncIOMotorClient(settings.MONGODB_URL)
    mongo_db = mongo_client[settings.MONGODB_DB_NAME]
    collection = mongo_db[SCM_CATALOGUE_COLLECTION]
    try:
        for catalogue_id in catalogue_ids:
            await collection.delete_one({"catalogue_id": catalogue_id})
        logger.info("๐งน Cleaned test catalogues from MongoDB")
    except Exception as e:
        logger.warning(f"Failed to clean MongoDB: {e}")
    finally:
        mongo_client.close()

    # Clean PostgreSQL
    pg_conn = None
    try:
        pg_conn = await asyncpg.connect(
            host=settings.POSTGRES_HOST,
            port=settings.POSTGRES_PORT,
            user=settings.POSTGRES_USER,
            password=settings.POSTGRES_PASSWORD,
            database=settings.POSTGRES_DB,
        )
        for catalogue_id in catalogue_ids:
            await pg_conn.execute(
                "DELETE FROM trans.catalogue_ref WHERE catalogue_id = $1",
                catalogue_id,
            )
        logger.info("๐งน Cleaned test catalogues from PostgreSQL")
    except Exception as e:
        logger.warning(f"Failed to clean PostgreSQL: {e}")
    finally:
        # Close even when a DELETE raised, so the connection is not leaked.
        if pg_conn is not None:
            try:
                await pg_conn.close()
            except Exception as e:
                logger.warning(f"Failed to close PostgreSQL connection: {e}")
async def main():
    """Run the complete pricing_levels flow and report the overall verdict.

    Steps: insert the test catalogues, check the pricing calculations, run the
    sync handler, verify the PostgreSQL rows, print a per-catalogue summary,
    and finally remove the test data (cleanup runs even when a step fails).

    Returns:
        True when setup succeeded, at least one catalogue was synced, every
        sync succeeded, and neither the calculation check nor the PostgreSQL
        verification reported a failure; False otherwise.
    """
    logger.info("๐ Starting complete pricing_levels flow test")
    logger.info("=" * 60)
    try:
        # 1. Setup test data
        logger.info("๐ Setting up test catalogues...")
        if not await setup_test_catalogues():
            logger.error("โ Failed to setup test data")
            return False

        # 2. Test pricing calculations
        calculations_ok = await test_pricing_calculations()

        # 3. Test sync handler
        logger.info("\n๐ Testing sync handler...")
        sync_results = await test_sync_handler_generation()

        # 4. Verify PostgreSQL data
        storage_ok = await verify_postgresql_data()

        # 5. Summary
        logger.info("\n" + "=" * 60)
        logger.info("TEST SUMMARY")
        logger.info("=" * 60)
        for result in sync_results:
            status = "โ " if result["sync_success"] else "โ"
            was_generated = (
                not result["had_pricing_levels_before"]
                and result["has_pricing_levels_after"]
            )
            generated = "๐ง Generated" if was_generated else "๐ Existing"
            logger.info(f"{status} {result['catalogue_name']}: {generated}")

        success_count = sum(1 for r in sync_results if r["sync_success"])
        logger.info(f"\nSuccessful syncs: {success_count}/{len(sync_results)}")

        # An empty result list must not count as a pass (0 == 0), and the
        # calculation / verification steps feed into the verdict as well.
        all_synced = bool(sync_results) and success_count == len(sync_results)
        passed = bool(all_synced and calculations_ok and storage_ok)
        if passed:
            logger.info("๐ All tests PASSED!")
        else:
            logger.error("โ Some tests FAILED!")
        return passed
    except Exception as e:
        logger.error(f"Test failed: {e}")
        return False
    finally:
        # Cleanup
        logger.info("\n๐งน Cleaning up test data...")
        await cleanup_test_data()
        logger.info("๐ Test completed")


if __name__ == "__main__":
    import sys

    # Propagate the verdict as the process exit status so shells and CI can
    # detect a failed run.
    sys.exit(0 if asyncio.run(main()) else 1)