#!/usr/bin/env python3
"""
Test script to validate pricing_levels sync from MongoDB to PostgreSQL.

This script tests the sync functionality for the new pricing_levels JSON
column to ensure data is properly transferred and stored.

The process exits with status 0 when the sync test passes and 1 otherwise.
"""

import asyncio
import json
import sys
from datetime import datetime, timezone
from typing import Any, Optional

from motor.motor_asyncio import AsyncIOMotorClient
import asyncpg
from insightfy_utils.logging import get_logger

from app.core.config import get_settings
from app.sync.catalogues.handler import CatalogueSyncHandler

logger = get_logger(__name__)

# Sample pricing_levels data for testing
SAMPLE_PRICING_LEVELS = {
    "currency": "INR",
    "mrp": 399,
    "levels": {
        "retail": {
            "purchase_price": 260,
            "trade_margin": 20,
            "selling_price": 299,
            "retail_price": 349,
            "retail_margin": 30,
            "max_discount_pct": 15,
        },
        "distributor": {
            "purchase_price": 230,
            "trade_margin": 25,
            "selling_price": 279,
            "retail_price": 349,
            "retail_margin": 30,
            "max_discount_pct": 20,
        },
    },
}

SAMPLE_CATALOGUE = {
    "catalogue_id": "test-pricing-levels-001",
    "catalogue_code": "TPL-001-000001",
    "catalogue_name": "Test Pricing Levels Product",
    "catalogue_type": "Product",
    "category": "Test",
    "brand": "TestBrand",
    "identifiers": {
        "sku": "TPL001",
        "barcode_number": "1234567890123",
    },
    "pricing_levels": SAMPLE_PRICING_LEVELS,
    "tax": {
        "hsn_code": "33049900",
        "gst_rate": 18.0,
    },
    "inventory": {
        "track_inventory": True,
    },
    "procurement": {
        "batch_managed": False,
    },
    "meta": {
        "status": "Active",
        # datetime.utcnow() is deprecated; use an explicit UTC-aware timestamp.
        "created_at": datetime.now(timezone.utc),
        "updated_at": datetime.now(timezone.utc),
    },
}

# Structure every synced pricing_levels document is expected to have.
_REQUIRED_TOP_LEVEL_FIELDS = ("currency", "mrp", "levels")
_REQUIRED_LEVELS = ("retail", "distributor")
_REQUIRED_LEVEL_FIELDS = (
    "purchase_price",
    "trade_margin",
    "selling_price",
    "retail_price",
    "retail_margin",
    "max_discount_pct",
)


async def _connect_postgres(settings):
    """Open an asyncpg connection using the POSTGRES_* values on *settings*."""
    return await asyncpg.connect(
        host=settings.POSTGRES_HOST,
        port=settings.POSTGRES_PORT,
        user=settings.POSTGRES_USER,
        password=settings.POSTGRES_PASSWORD,
        database=settings.POSTGRES_DB,
    )


def _decode_json_column(value: Any) -> Any:
    """Return a JSON/JSONB column value as Python objects.

    asyncpg hands json/jsonb values back as text unless a JSON codec has been
    registered on the connection, so textual values are parsed here. Values
    that are already decoded (or None) are returned unchanged. A codec is
    deliberately not registered on the connection, because that would also
    change how the sync handler's own queries encode their parameters.
    """
    if isinstance(value, (str, bytes, bytearray)):
        return json.loads(value)
    return value


def _find_pricing_levels_problem(pricing_levels: Any) -> Optional[str]:
    """Check the structure of a decoded pricing_levels value.

    Returns:
        A message describing the first problem found, or None when every
        expected field and pricing level is present.
    """
    if not pricing_levels:
        return "pricing_levels is null in PostgreSQL"
    if not isinstance(pricing_levels, dict):
        return "pricing_levels is not a JSON object in PostgreSQL"

    for field in _REQUIRED_TOP_LEVEL_FIELDS:
        if field not in pricing_levels:
            return f"Missing field '{field}' in pricing_levels"

    levels = pricing_levels.get("levels") or {}
    for level in _REQUIRED_LEVELS:
        if level not in levels:
            return f"Missing pricing level '{level}'"

        level_data = levels[level]
        if not isinstance(level_data, dict):
            return f"Pricing level '{level}' is not a JSON object"
        for level_field in _REQUIRED_LEVEL_FIELDS:
            if level_field not in level_data:
                return f"Missing field '{level_field}' in {level} level"

    return None


async def setup_test_data():
    """Insert test catalogue with pricing_levels into MongoDB.

    Returns:
        True when the sample catalogue was upserted, False when the write
        failed (the error is logged).
    """
    settings = get_settings()

    # Connect to MongoDB
    mongo_client = AsyncIOMotorClient(settings.MONGODB_URL)
    mongo_db = mongo_client[settings.MONGODB_DB_NAME]
    collection = mongo_db["catalogues"]

    try:
        # Insert or update test catalogue
        await collection.replace_one(
            {"catalogue_id": SAMPLE_CATALOGUE["catalogue_id"]},
            SAMPLE_CATALOGUE,
            upsert=True,
        )
        logger.info(
            f"Test catalogue inserted/updated in MongoDB: {SAMPLE_CATALOGUE['catalogue_id']}"
        )
        return True
    except Exception as e:
        logger.error(f"Failed to setup test data: {e}")
        return False
    finally:
        mongo_client.close()


async def test_pricing_levels_sync():
    """Test the sync of pricing_levels from MongoDB to PostgreSQL.

    Fetches the sample catalogue through CatalogueSyncHandler, upserts it into
    PostgreSQL, reads the row back from trans.catalogue_ref and checks the
    structure of its pricing_levels column.

    Returns:
        True when every step and validation check succeeds, False otherwise.
        Any exception (including a failed PostgreSQL connection) is logged
        and reported as False.
    """
    settings = get_settings()

    # Setup MongoDB connection
    mongo_client = AsyncIOMotorClient(settings.MONGODB_URL)
    pg_conn = None

    try:
        mongo_db = mongo_client[settings.MONGODB_DB_NAME]

        # Setup PostgreSQL connection inside the try block so a connection
        # failure still closes the MongoDB client and is reported as a failure.
        pg_conn = await _connect_postgres(settings)

        # Initialize sync handler
        handler = CatalogueSyncHandler()

        # Fetch from MongoDB
        catalogue = await handler.fetch_from_mongodb(
            SAMPLE_CATALOGUE["catalogue_id"],
            mongo_db,
        )

        if not catalogue:
            logger.error("Test catalogue not found in MongoDB")
            return False

        logger.info("Fetched catalogue from MongoDB")
        logger.info(
            f"Pricing levels: {json.dumps(catalogue.get('pricing_levels'), indent=2)}"
        )

        # Validate required fields
        if not handler.validate_required_fields(catalogue):
            logger.error("Catalogue validation failed")
            return False

        # Test sync to PostgreSQL
        success = await handler.upsert_to_postgres(catalogue, pg_conn)

        if not success:
            logger.error("Failed to sync catalogue to PostgreSQL")
            return False

        logger.info("Successfully synced catalogue to PostgreSQL")

        # Verify data in PostgreSQL
        query = """
            SELECT catalogue_id, catalogue_name, pricing_levels
            FROM trans.catalogue_ref
            WHERE catalogue_id = $1
        """

        result = await pg_conn.fetchrow(query, SAMPLE_CATALOGUE["catalogue_id"])

        if not result:
            logger.error("Catalogue not found in PostgreSQL after sync")
            return False

        # Decode before logging/validating: a raw JSON string would make the
        # membership checks below degrade into substring tests.
        pg_pricing_levels = _decode_json_column(result["pricing_levels"])

        logger.info("Catalogue found in PostgreSQL:")
        logger.info(f"ID: {result['catalogue_id']}")
        logger.info(f"Name: {result['catalogue_name']}")
        logger.info(f"Pricing levels: {json.dumps(pg_pricing_levels, indent=2)}")

        # Validate pricing_levels structure
        problem = _find_pricing_levels_problem(pg_pricing_levels)
        if problem is not None:
            logger.error(problem)
            return False

        logger.info("✅ All pricing_levels validation checks passed!")
        return True

    except Exception as e:
        logger.error(f"Test failed with error: {e}")
        return False
    finally:
        mongo_client.close()
        if pg_conn is not None:
            await pg_conn.close()


async def cleanup_test_data():
    """Clean up test data from both MongoDB and PostgreSQL.

    Cleanup is best-effort: a failure in either store is logged as a warning
    and does not prevent the other store from being cleaned.
    """
    settings = get_settings()

    # Clean MongoDB
    mongo_client = AsyncIOMotorClient(settings.MONGODB_URL)
    mongo_db = mongo_client[settings.MONGODB_DB_NAME]
    collection = mongo_db["catalogues"]

    try:
        await collection.delete_one({"catalogue_id": SAMPLE_CATALOGUE["catalogue_id"]})
        logger.info("Test catalogue removed from MongoDB")
    except Exception as e:
        logger.warning(f"Failed to clean MongoDB: {e}")
    finally:
        mongo_client.close()

    # Clean PostgreSQL
    try:
        pg_conn = await _connect_postgres(settings)
        try:
            await pg_conn.execute(
                "DELETE FROM trans.catalogue_ref WHERE catalogue_id = $1",
                SAMPLE_CATALOGUE["catalogue_id"],
            )
            logger.info("Test catalogue removed from PostgreSQL")
        finally:
            # Close even when the DELETE fails so the connection is not leaked.
            await pg_conn.close()
    except Exception as e:
        logger.warning(f"Failed to clean PostgreSQL: {e}")


async def main():
    """Main test function.

    Sets up the sample data, runs the sync test and always cleans up.

    Returns:
        True when the sync test passed, False when setup or the test failed.
    """
    logger.info("🧪 Starting pricing_levels sync test")

    try:
        # Setup test data
        logger.info("📝 Setting up test data...")
        if not await setup_test_data():
            logger.error("❌ Failed to setup test data")
            return False

        # Run sync test
        logger.info("🔄 Testing pricing_levels sync...")
        success = await test_pricing_levels_sync()

        if success:
            logger.info("✅ Pricing levels sync test PASSED!")
        else:
            logger.error("❌ Pricing levels sync test FAILED!")
        return success

    finally:
        # Cleanup
        logger.info("🧹 Cleaning up test data...")
        await cleanup_test_data()
        logger.info("🏁 Test completed")


if __name__ == "__main__":
    # Propagate the outcome so shells and CI can detect a failed run.
    sys.exit(0 if asyncio.run(main()) else 1)