# cuatrolabs-scm-ms / tests/test_pricing_levels_sync.py
# MukeshKapoor25
# feat(catalogues/pricing): standardize pricing level keys to lowercase format
# commit b3b8847
#!/usr/bin/env python3
"""
Test script to validate pricing_levels sync from MongoDB to PostgreSQL.
This script tests the sync functionality for the new pricing_levels JSON column
to ensure data is properly transferred and stored.
"""
import asyncio
import json
from datetime import datetime
from motor.motor_asyncio import AsyncIOMotorClient
import asyncpg
from insightfy_utils.logging import get_logger
from app.core.config import get_settings
from app.sync.catalogues.handler import CatalogueSyncHandler
logger = get_logger(__name__)
# Fixture data used by the sync test below.
#
# SAMPLE_PRICING_LEVELS is the pricing_levels payload: a currency, an MRP and a
# mapping of lowercase level keys to per-level price/margin fields.
SAMPLE_PRICING_LEVELS = {
    "currency": "INR",
    "mrp": 399,
    "levels": {
        "retail": {
            "purchase_price": 260,
            "trade_margin": 20,
            "selling_price": 299,
            "retail_price": 349,
            "retail_margin": 30,
            "max_discount_pct": 15,
        },
        "distributor": {
            "purchase_price": 230,
            "trade_margin": 25,
            "selling_price": 279,
            "retail_price": 349,
            "retail_margin": 30,
            "max_discount_pct": 20,
        },
    },
}

# SAMPLE_CATALOGUE is the full catalogue document written to MongoDB; it embeds
# SAMPLE_PRICING_LEVELS (same object, not a copy) under "pricing_levels".
SAMPLE_CATALOGUE = {
    "catalogue_id": "test-pricing-levels-001",
    "catalogue_code": "TPL-001-000001",
    "catalogue_name": "Test Pricing Levels Product",
    "catalogue_type": "Product",
    "category": "Test",
    "brand": "TestBrand",
    "identifiers": {
        "sku": "TPL001",
        "barcode_number": "1234567890123",
    },
    "pricing_levels": SAMPLE_PRICING_LEVELS,
    "tax": {
        "hsn_code": "33049900",
        "gst_rate": 18.0,
    },
    "inventory": {
        "track_inventory": True,
    },
    "procurement": {
        "batch_managed": False,
    },
    "meta": {
        "status": "Active",
        # Naive UTC timestamps, evaluated once at import time.
        "created_at": datetime.utcnow(),
        "updated_at": datetime.utcnow(),
    },
}
async def setup_test_data():
    """Upsert the sample catalogue document into the MongoDB ``catalogues`` collection.

    Returns:
        True when the upsert completed; False when any exception was raised
        (the error is logged instead of being propagated).
    """
    cfg = get_settings()

    # Open a dedicated client for this step; it is closed again before returning.
    client = AsyncIOMotorClient(cfg.MONGODB_URL)
    database = client[cfg.MONGODB_DB_NAME]
    catalogues = database["catalogues"]

    try:
        # Replace the document with the same catalogue_id, or insert it if absent.
        match_by_id = {"catalogue_id": SAMPLE_CATALOGUE["catalogue_id"]}
        await catalogues.replace_one(match_by_id, SAMPLE_CATALOGUE, upsert=True)
        logger.info(f"Test catalogue inserted/updated in MongoDB: {SAMPLE_CATALOGUE['catalogue_id']}")
    except Exception as e:
        logger.error(f"Failed to setup test data: {e}")
        return False
    else:
        return True
    finally:
        client.close()
def _pricing_levels_structure_ok(pricing_levels):
    """Return True if *pricing_levels* has the expected top-level and per-level keys.

    Only the presence of keys is checked, not their values. Each missing key is
    logged as an error before returning False.
    """
    if not isinstance(pricing_levels, dict):
        logger.error(
            f"pricing_levels is not a JSON object (got {type(pricing_levels).__name__})"
        )
        return False

    # Check key fields
    for field in ('currency', 'mrp', 'levels'):
        if field not in pricing_levels:
            logger.error(f"Missing field '{field}' in pricing_levels")
            return False

    # Check levels structure; a null "levels" is treated like an empty mapping
    # so it is reported as a missing level instead of raising.
    levels = pricing_levels.get('levels') or {}
    expected_level_fields = (
        'purchase_price',
        'trade_margin',
        'selling_price',
        'retail_price',
        'retail_margin',
        'max_discount_pct',
    )
    for level in ('retail', 'distributor'):
        if level not in levels:
            logger.error(f"Missing pricing level '{level}'")
            return False
        level_data = levels[level] or {}
        for level_field in expected_level_fields:
            if level_field not in level_data:
                logger.error(f"Missing field '{level_field}' in {level} level")
                return False
    return True


async def test_pricing_levels_sync():
    """Test the sync of pricing_levels from MongoDB to PostgreSQL.

    Fetches the sample catalogue from MongoDB through ``CatalogueSyncHandler``,
    validates it, upserts it into PostgreSQL, reads the row back from
    ``trans.catalogue_ref`` and checks the structure of its ``pricing_levels``
    column.

    Returns:
        True when every step and structural check succeeds, False otherwise.
        Failures after the PostgreSQL connection is open are logged and
        reported as False; a failure to open that connection propagates to the
        caller (as before), but no longer leaks the MongoDB client.
    """
    settings = get_settings()

    # Setup MongoDB connection
    mongo_client = AsyncIOMotorClient(settings.MONGODB_URL)
    try:
        mongo_db = mongo_client[settings.MONGODB_DB_NAME]

        # Setup PostgreSQL connection
        pg_conn = await asyncpg.connect(
            host=settings.POSTGRES_HOST,
            port=settings.POSTGRES_PORT,
            user=settings.POSTGRES_USER,
            password=settings.POSTGRES_PASSWORD,
            database=settings.POSTGRES_DB
        )
        try:
            # Initialize sync handler
            handler = CatalogueSyncHandler()

            # Fetch from MongoDB
            catalogue = await handler.fetch_from_mongodb(
                SAMPLE_CATALOGUE["catalogue_id"],
                mongo_db
            )
            if not catalogue:
                logger.error("Test catalogue not found in MongoDB")
                return False
            logger.info("Fetched catalogue from MongoDB")
            logger.info(f"Pricing levels: {json.dumps(catalogue.get('pricing_levels'), indent=2)}")

            # Validate required fields
            if not handler.validate_required_fields(catalogue):
                logger.error("Catalogue validation failed")
                return False

            # Test sync to PostgreSQL
            success = await handler.upsert_to_postgres(catalogue, pg_conn)
            if not success:
                logger.error("Failed to sync catalogue to PostgreSQL")
                return False
            logger.info("Successfully synced catalogue to PostgreSQL")

            # Verify data in PostgreSQL
            query = """
                SELECT catalogue_id, catalogue_name, pricing_levels
                FROM trans.catalogue_ref
                WHERE catalogue_id = $1
            """
            result = await pg_conn.fetchrow(query, SAMPLE_CATALOGUE["catalogue_id"])
            if not result:
                logger.error("Catalogue not found in PostgreSQL after sync")
                return False

            # asyncpg hands json/jsonb columns back as text unless a type codec
            # has been registered on the connection, so decode before inspecting.
            # Without this, the `in` checks would be substring matches and
            # `.get()` would raise AttributeError on the str.
            pg_pricing_levels = result['pricing_levels']
            if isinstance(pg_pricing_levels, str):
                pg_pricing_levels = json.loads(pg_pricing_levels)

            logger.info("Catalogue found in PostgreSQL:")
            logger.info(f"ID: {result['catalogue_id']}")
            logger.info(f"Name: {result['catalogue_name']}")
            logger.info(f"Pricing levels: {json.dumps(pg_pricing_levels, indent=2)}")

            # Validate pricing_levels structure
            if not pg_pricing_levels:
                logger.error("pricing_levels is null in PostgreSQL")
                return False
            if not _pricing_levels_structure_ok(pg_pricing_levels):
                return False

            logger.info("✅ All pricing_levels validation checks passed!")
            return True
        except Exception as e:
            logger.error(f"Test failed with error: {e}")
            return False
        finally:
            await pg_conn.close()
    finally:
        # Runs even when the PostgreSQL connection could not be opened.
        mongo_client.close()
async def cleanup_test_data():
    """Clean up test data from both MongoDB and PostgreSQL.

    Cleanup is best-effort: a failure in either store is logged as a warning
    and does not prevent the other store from being cleaned. Both connections
    are always closed, including when the delete statement itself fails.
    """
    settings = get_settings()

    # Clean MongoDB
    mongo_client = AsyncIOMotorClient(settings.MONGODB_URL)
    mongo_db = mongo_client[settings.MONGODB_DB_NAME]
    collection = mongo_db["catalogues"]
    try:
        await collection.delete_one({"catalogue_id": SAMPLE_CATALOGUE["catalogue_id"]})
        logger.info("Test catalogue removed from MongoDB")
    except Exception as e:
        logger.warning(f"Failed to clean MongoDB: {e}")
    finally:
        mongo_client.close()

    # Clean PostgreSQL
    try:
        pg_conn = await asyncpg.connect(
            host=settings.POSTGRES_HOST,
            port=settings.POSTGRES_PORT,
            user=settings.POSTGRES_USER,
            password=settings.POSTGRES_PASSWORD,
            database=settings.POSTGRES_DB
        )
        try:
            await pg_conn.execute(
                "DELETE FROM trans.catalogue_ref WHERE catalogue_id = $1",
                SAMPLE_CATALOGUE["catalogue_id"]
            )
            logger.info("Test catalogue removed from PostgreSQL")
        finally:
            # Close on the failure path too; previously a failed DELETE left
            # the connection open.
            await pg_conn.close()
    except Exception as e:
        logger.warning(f"Failed to clean PostgreSQL: {e}")
async def main():
    """Run the full scenario: set up test data, run the sync test, clean up.

    Cleanup always runs, whether or not setup or the test succeeded.

    Returns:
        True when the sync test passed, False when setup or the test failed.
        (Previously the function returned None; the boolean lets the script
        entry point report failure through its exit status.)
    """
    # NOTE: the emoji prefixes below were mis-encoded (mojibake) in the log
    # strings and have been restored to the intended characters.
    logger.info("🧪 Starting pricing_levels sync test")
    success = False
    try:
        # Setup test data
        logger.info("📝 Setting up test data...")
        if not await setup_test_data():
            logger.error("❌ Failed to setup test data")
            return False

        # Run sync test
        logger.info("🔄 Testing pricing_levels sync...")
        success = bool(await test_pricing_levels_sync())
        if success:
            logger.info("✅ Pricing levels sync test PASSED!")
        else:
            logger.error("❌ Pricing levels sync test FAILED!")
    finally:
        # Cleanup
        logger.info("🧹 Cleaning up test data...")
        await cleanup_test_data()
        logger.info("🏁 Test completed")
    return success


if __name__ == "__main__":
    import sys

    # Exit non-zero on failure so shells and CI can detect a failed run.
    sys.exit(0 if asyncio.run(main()) else 1)