Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| Test script to validate pricing_levels sync from MongoDB to PostgreSQL. | |
| This script tests the sync functionality for the new pricing_levels JSON column | |
| to ensure data is properly transferred and stored. | |
| """ | |
| import asyncio | |
| import json | |
| from datetime import datetime | |
| from motor.motor_asyncio import AsyncIOMotorClient | |
| import asyncpg | |
| from insightfy_utils.logging import get_logger | |
| from app.core.config import get_settings | |
| from app.sync.catalogues.handler import CatalogueSyncHandler | |
| logger = get_logger(__name__) | |
| # Sample pricing_levels data for testing | |
| SAMPLE_PRICING_LEVELS = { | |
| "currency": "INR", | |
| "mrp": 399, | |
| "levels": { | |
| "retail": { | |
| "purchase_price": 260, | |
| "trade_margin": 20, | |
| "selling_price": 299, | |
| "retail_price": 349, | |
| "retail_margin": 30, | |
| "max_discount_pct": 15 | |
| }, | |
| "distributor": { | |
| "purchase_price": 230, | |
| "trade_margin": 25, | |
| "selling_price": 279, | |
| "retail_price": 349, | |
| "retail_margin": 30, | |
| "max_discount_pct": 20 | |
| } | |
| } | |
| } | |
| SAMPLE_CATALOGUE = { | |
| "catalogue_id": "test-pricing-levels-001", | |
| "catalogue_code": "TPL-001-000001", | |
| "catalogue_name": "Test Pricing Levels Product", | |
| "catalogue_type": "Product", | |
| "category": "Test", | |
| "brand": "TestBrand", | |
| "identifiers": { | |
| "sku": "TPL001", | |
| "barcode_number": "1234567890123" | |
| }, | |
| "pricing_levels": SAMPLE_PRICING_LEVELS, | |
| "tax": { | |
| "hsn_code": "33049900", | |
| "gst_rate": 18.0 | |
| }, | |
| "inventory": { | |
| "track_inventory": True | |
| }, | |
| "procurement": { | |
| "batch_managed": False | |
| }, | |
| "meta": { | |
| "status": "Active", | |
| "created_at": datetime.utcnow(), | |
| "updated_at": datetime.utcnow() | |
| } | |
| } | |
| async def setup_test_data(): | |
| """Insert test catalogue with pricing_levels into MongoDB.""" | |
| settings = get_settings() | |
| # Connect to MongoDB | |
| mongo_client = AsyncIOMotorClient(settings.MONGODB_URL) | |
| mongo_db = mongo_client[settings.MONGODB_DB_NAME] | |
| collection = mongo_db["catalogues"] | |
| try: | |
| # Insert or update test catalogue | |
| await collection.replace_one( | |
| {"catalogue_id": SAMPLE_CATALOGUE["catalogue_id"]}, | |
| SAMPLE_CATALOGUE, | |
| upsert=True | |
| ) | |
| logger.info(f"Test catalogue inserted/updated in MongoDB: {SAMPLE_CATALOGUE['catalogue_id']}") | |
| return True | |
| except Exception as e: | |
| logger.error(f"Failed to setup test data: {e}") | |
| return False | |
| finally: | |
| mongo_client.close() | |
| async def test_pricing_levels_sync(): | |
| """Test the sync of pricing_levels from MongoDB to PostgreSQL.""" | |
| settings = get_settings() | |
| # Setup MongoDB connection | |
| mongo_client = AsyncIOMotorClient(settings.MONGODB_URL) | |
| mongo_db = mongo_client[settings.MONGODB_DB_NAME] | |
| # Setup PostgreSQL connection | |
| pg_conn = await asyncpg.connect( | |
| host=settings.POSTGRES_HOST, | |
| port=settings.POSTGRES_PORT, | |
| user=settings.POSTGRES_USER, | |
| password=settings.POSTGRES_PASSWORD, | |
| database=settings.POSTGRES_DB | |
| ) | |
| try: | |
| # Initialize sync handler | |
| handler = CatalogueSyncHandler() | |
| # Fetch from MongoDB | |
| catalogue = await handler.fetch_from_mongodb( | |
| SAMPLE_CATALOGUE["catalogue_id"], | |
| mongo_db | |
| ) | |
| if not catalogue: | |
| logger.error("Test catalogue not found in MongoDB") | |
| return False | |
| logger.info("Fetched catalogue from MongoDB") | |
| logger.info(f"Pricing levels: {json.dumps(catalogue.get('pricing_levels'), indent=2)}") | |
| # Validate required fields | |
| if not handler.validate_required_fields(catalogue): | |
| logger.error("Catalogue validation failed") | |
| return False | |
| # Test sync to PostgreSQL | |
| success = await handler.upsert_to_postgres(catalogue, pg_conn) | |
| if not success: | |
| logger.error("Failed to sync catalogue to PostgreSQL") | |
| return False | |
| logger.info("Successfully synced catalogue to PostgreSQL") | |
| # Verify data in PostgreSQL | |
| query = """ | |
| SELECT catalogue_id, catalogue_name, pricing_levels | |
| FROM trans.catalogue_ref | |
| WHERE catalogue_id = $1 | |
| """ | |
| result = await pg_conn.fetchrow(query, SAMPLE_CATALOGUE["catalogue_id"]) | |
| if not result: | |
| logger.error("Catalogue not found in PostgreSQL after sync") | |
| return False | |
| logger.info("Catalogue found in PostgreSQL:") | |
| logger.info(f"ID: {result['catalogue_id']}") | |
| logger.info(f"Name: {result['catalogue_name']}") | |
| logger.info(f"Pricing levels: {json.dumps(result['pricing_levels'], indent=2)}") | |
| # Validate pricing_levels structure | |
| pg_pricing_levels = result['pricing_levels'] | |
| if not pg_pricing_levels: | |
| logger.error("pricing_levels is null in PostgreSQL") | |
| return False | |
| # Check key fields | |
| expected_fields = ['currency', 'mrp', 'levels'] | |
| for field in expected_fields: | |
| if field not in pg_pricing_levels: | |
| logger.error(f"Missing field '{field}' in pricing_levels") | |
| return False | |
| # Check levels structure | |
| levels = pg_pricing_levels.get('levels', {}) | |
| expected_levels = ['retail', 'distributor'] | |
| for level in expected_levels: | |
| if level not in levels: | |
| logger.error(f"Missing pricing level '{level}'") | |
| return False | |
| level_data = levels[level] | |
| expected_level_fields = ['purchase_price', 'trade_margin', 'selling_price', 'retail_price', 'retail_margin', 'max_discount_pct'] | |
| for level_field in expected_level_fields: | |
| if level_field not in level_data: | |
| logger.error(f"Missing field '{level_field}' in {level} level") | |
| return False | |
| logger.info("β All pricing_levels validation checks passed!") | |
| return True | |
| except Exception as e: | |
| logger.error(f"Test failed with error: {e}") | |
| return False | |
| finally: | |
| mongo_client.close() | |
| await pg_conn.close() | |
| async def cleanup_test_data(): | |
| """Clean up test data from both MongoDB and PostgreSQL.""" | |
| settings = get_settings() | |
| # Clean MongoDB | |
| mongo_client = AsyncIOMotorClient(settings.MONGODB_URL) | |
| mongo_db = mongo_client[settings.MONGODB_DB_NAME] | |
| collection = mongo_db["catalogues"] | |
| try: | |
| await collection.delete_one({"catalogue_id": SAMPLE_CATALOGUE["catalogue_id"]}) | |
| logger.info("Test catalogue removed from MongoDB") | |
| except Exception as e: | |
| logger.warning(f"Failed to clean MongoDB: {e}") | |
| finally: | |
| mongo_client.close() | |
| # Clean PostgreSQL | |
| try: | |
| pg_conn = await asyncpg.connect( | |
| host=settings.POSTGRES_HOST, | |
| port=settings.POSTGRES_PORT, | |
| user=settings.POSTGRES_USER, | |
| password=settings.POSTGRES_PASSWORD, | |
| database=settings.POSTGRES_DB | |
| ) | |
| await pg_conn.execute( | |
| "DELETE FROM trans.catalogue_ref WHERE catalogue_id = $1", | |
| SAMPLE_CATALOGUE["catalogue_id"] | |
| ) | |
| logger.info("Test catalogue removed from PostgreSQL") | |
| await pg_conn.close() | |
| except Exception as e: | |
| logger.warning(f"Failed to clean PostgreSQL: {e}") | |
| async def main(): | |
| """Main test function.""" | |
| logger.info("π§ͺ Starting pricing_levels sync test") | |
| try: | |
| # Setup test data | |
| logger.info("π Setting up test data...") | |
| if not await setup_test_data(): | |
| logger.error("β Failed to setup test data") | |
| return | |
| # Run sync test | |
| logger.info("π Testing pricing_levels sync...") | |
| success = await test_pricing_levels_sync() | |
| if success: | |
| logger.info("β Pricing levels sync test PASSED!") | |
| else: | |
| logger.error("β Pricing levels sync test FAILED!") | |
| finally: | |
| # Cleanup | |
| logger.info("π§Ή Cleaning up test data...") | |
| await cleanup_test_data() | |
| logger.info("π Test completed") | |
| if __name__ == "__main__": | |
| asyncio.run(main()) |