cuatrolabs-scm-ms / tests /test_complete_pricing_levels_flow.py
MukeshKapoor25's picture
merchant fixes
63f05fb
#!/usr/bin/env python3
"""
Complete test script for pricing_levels functionality.
This script tests:
1. Migration of existing catalogues without pricing_levels
2. Sync handler automatic generation
3. Validation of results
4. PostgreSQL storage and retrieval
Usage:
python test_complete_pricing_levels_flow.py
"""
import asyncio
import json
from datetime import datetime
from motor.motor_asyncio import AsyncIOMotorClient
import asyncpg
from insightfy_utils.logging import get_logger
from app.core.config import get_settings
from app.sync.catalogues.handler import CatalogueSyncHandler
from app.catalogues.constants import SCM_CATALOGUE_COLLECTION
logger = get_logger(__name__)
# Fixture documents upserted into MongoDB by the setup step and later pushed
# through the sync handler. Each entry exercises a different pricing shape:
#   1. a document that already carries a full `pricing_levels` structure,
#   2. a document with only a flat `pricing` block and no `pricing_levels`,
#   3. a document whose only price information is a top-level `mrp`.
# NOTE: the `meta` timestamps are evaluated once, when this module is imported,
# and `datetime.utcnow()` yields naive (tz-unaware) datetimes.
TEST_CATALOGUES = [
    # Scenario 1: pricing_levels already present, with all four levels
    # (ncnf, cnf, distributor, retail) filled in.
    {
        "catalogue_id": "test-complete-001",
        "catalogue_code": "TCF-001-000001",
        "catalogue_name": "Test Product with Existing Pricing Levels",
        "catalogue_type": "Product",
        "category": "Test",
        "brand": "TestBrand",
        "identifiers": {
            "sku": "TCF001",
            "barcode_number": "1111111111111"
        },
        "pricing_levels": {
            "currency": "INR",
            "mrp": 500,
            "levels": {
                "ncnf": {
                    "purchase_price": 308,
                    "trade_margin": 15,
                    "selling_price": 440,
                    "retail_price": 440,
                    "retail_margin": 25,
                    "max_discount_pct": 5
                },
                "cnf": {
                    "purchase_price": 329,
                    "trade_margin": 18,
                    "selling_price": 470,
                    "retail_price": 470,
                    "retail_margin": 28,
                    "max_discount_pct": 8
                },
                "distributor": {
                    "purchase_price": 315,
                    "trade_margin": 20,
                    "selling_price": 450,
                    "retail_price": 450,
                    "retail_margin": 30,
                    "max_discount_pct": 15
                },
                "retail": {
                    "purchase_price": 280,
                    "trade_margin": 25,
                    "selling_price": 400,
                    "retail_price": 400,
                    "retail_margin": 35,
                    "max_discount_pct": 20
                }
            }
        },
        "tax": {
            "hsn_code": "33049900",
            "gst_rate": 18.0
        },
        "inventory": {
            "track_inventory": True
        },
        "meta": {
            "status": "Active",
            "created_at": datetime.utcnow(),
            "updated_at": datetime.utcnow()
        }
    },
    # Scenario 2: legacy shape - a flat `pricing` block and no `pricing_levels`.
    {
        "catalogue_id": "test-complete-002",
        "catalogue_code": "TCF-002-000002",
        "catalogue_name": "Test Product without Pricing Levels (Legacy)",
        "catalogue_type": "Product",
        "category": "Test",
        "brand": "TestBrand",
        "identifiers": {
            "sku": "TCF002",
            "barcode_number": "2222222222222"
        },
        "pricing": {
            "mrp": 300,
            "retail_price": 250,
            "trade_margin": 20,
            "retail_margin": 25,
            "currency": "INR"
        },
        "tax": {
            "hsn_code": "33049900",
            "gst_rate": 18.0
        },
        "inventory": {
            "track_inventory": True
        },
        "meta": {
            "status": "Active",
            "created_at": datetime.utcnow(),
            "updated_at": datetime.utcnow()
        }
    },
    # Scenario 3: no `pricing` and no `pricing_levels`; the MRP lives at the
    # top level of the document.
    {
        "catalogue_id": "test-complete-003",
        "catalogue_code": "TCF-003-000003",
        "catalogue_name": "Test Product with Top-level MRP",
        "catalogue_type": "Product",
        "category": "Test",
        "brand": "TestBrand",
        "identifiers": {
            "sku": "TCF003",
            "barcode_number": "3333333333333"
        },
        "mrp": 150,  # Top-level MRP (the only price field on this document)
        "tax": {
            "hsn_code": "33049900",
            "gst_rate": 18.0
        },
        "inventory": {
            "track_inventory": True
        },
        "meta": {
            "status": "Active",
            "created_at": datetime.utcnow(),
            "updated_at": datetime.utcnow()
        }
    }
]
async def setup_test_catalogues():
    """Upsert every TEST_CATALOGUES document into the MongoDB catalogue collection.

    Each document is matched on its ``catalogue_id`` and replaced (or inserted
    when absent), so re-running the script does not create duplicates.

    Returns:
        True when all documents were written, False if any write raised.
    """
    cfg = get_settings()
    client = AsyncIOMotorClient(cfg.MONGODB_URL)
    target = client[cfg.MONGODB_DB_NAME][SCM_CATALOGUE_COLLECTION]

    succeeded = False
    try:
        for doc in TEST_CATALOGUES:
            selector = {"catalogue_id": doc["catalogue_id"]}
            await target.replace_one(selector, doc, upsert=True)
            logger.info(f"โœ… Setup test catalogue: {doc['catalogue_name']}")
        logger.info(f"Setup {len(TEST_CATALOGUES)} test catalogues in MongoDB")
        succeeded = True
    except Exception as exc:
        logger.error(f"Failed to setup test catalogues: {exc}")
    finally:
        # The client is released on both the success and the failure path.
        client.close()
    return succeeded
async def test_sync_handler_generation():
    """Run each test catalogue through the sync handler and report the outcome.

    For every entry in TEST_CATALOGUES the document is fetched from MongoDB
    with ``CatalogueSyncHandler.fetch_from_mongodb``, passed through
    ``extract_nested_fields`` (where, per the original author's note,
    pricing_levels generation happens) and then written with
    ``upsert_to_postgres``.

    Returns:
        A list with exactly one dict per test catalogue, holding the keys
        ``catalogue_id``, ``catalogue_name``, ``had_pricing_levels_before``
        (bool), ``has_pricing_levels_after`` (bool) and ``sync_success``.
        A catalogue that cannot be found in MongoDB is reported with
        ``sync_success`` set to False instead of being left out, so callers
        comparing successes against the number of results see the failure.

    Raises:
        Exception: any error raised while connecting or syncing is logged
            and re-raised to the caller.
    """
    settings = get_settings()
    mongo_client = AsyncIOMotorClient(settings.MONGODB_URL)
    # Connected inside the try block so the Mongo client is still closed when
    # the PostgreSQL connection attempt itself fails.
    pg_conn = None
    try:
        mongo_db = mongo_client[settings.MONGODB_DB_NAME]
        pg_conn = await asyncpg.connect(
            host=settings.POSTGRES_HOST,
            port=settings.POSTGRES_PORT,
            user=settings.POSTGRES_USER,
            password=settings.POSTGRES_PASSWORD,
            database=settings.POSTGRES_DB
        )
        handler = CatalogueSyncHandler()
        results = []
        for catalogue_data in TEST_CATALOGUES:
            catalogue_id = catalogue_data["catalogue_id"]
            catalogue_name = catalogue_data["catalogue_name"]
            logger.info(f"\n๐Ÿ”„ Testing sync for: {catalogue_name}")

            # Fetch from MongoDB
            catalogue = await handler.fetch_from_mongodb(catalogue_id, mongo_db)
            if not catalogue:
                logger.error(f"โŒ Catalogue not found: {catalogue_id}")
                # Record the miss; silently skipping it would let the summary
                # report full success with fewer catalogues than expected.
                results.append({
                    "catalogue_id": catalogue_id,
                    "catalogue_name": catalogue_name,
                    "had_pricing_levels_before": False,
                    "has_pricing_levels_after": False,
                    "sync_success": False
                })
                continue

            # bool() keeps this a flag; the bare `in ... and ...` expression
            # would otherwise evaluate to the pricing_levels dict itself.
            has_pricing_levels_before = bool(catalogue.get("pricing_levels"))
            logger.info(f" Has pricing_levels before sync: {has_pricing_levels_before}")

            # Extract nested fields (this is where generation happens)
            nested_fields = handler.extract_nested_fields(catalogue)
            has_pricing_levels_after = nested_fields.get("pricing_levels") is not None
            logger.info(f" Has pricing_levels after extraction: {has_pricing_levels_after}")

            if has_pricing_levels_after:
                pricing_levels = nested_fields["pricing_levels"]
                logger.info(f" MRP: โ‚น{pricing_levels.get('mrp', 'N/A')}")
                logger.info(f" Levels: {list(pricing_levels.get('levels', {}).keys())}")
                # Show the selling price of each known level that is present
                levels = pricing_levels.get("levels", {})
                for level_name in ["ncnf", "cnf", "distributor", "retail"]:
                    if level_name in levels:
                        selling_price = levels[level_name].get("selling_price", 0)
                        logger.info(f" {level_name}: โ‚น{selling_price}")

            # Test sync to PostgreSQL
            success = await handler.upsert_to_postgres(catalogue, pg_conn)
            results.append({
                "catalogue_id": catalogue_id,
                "catalogue_name": catalogue_name,
                "had_pricing_levels_before": has_pricing_levels_before,
                "has_pricing_levels_after": has_pricing_levels_after,
                "sync_success": success
            })
            if success:
                logger.info(f" โœ… Sync successful")
            else:
                logger.error(f" โŒ Sync failed")
        return results
    except Exception as e:
        logger.error(f"Sync test failed: {e}")
        raise
    finally:
        # Close whatever was opened; the nested finally guarantees the Mongo
        # client is released even if closing the PostgreSQL connection raises.
        try:
            if pg_conn is not None:
                await pg_conn.close()
        finally:
            mongo_client.close()
async def verify_postgresql_data():
    """Check that every test catalogue has a row with pricing_levels in PostgreSQL.

    Reads ``trans.catalogue_ref`` for each entry in TEST_CATALOGUES and logs
    the stored MRP, currency and per-level selling price / max discount.

    Returns:
        True only when every test catalogue was found and carried a non-empty
        ``pricing_levels`` value; False when any row or its pricing_levels is
        missing, or when the check raised (the error is logged, not re-raised).
    """
    settings = get_settings()
    pg_conn = await asyncpg.connect(
        host=settings.POSTGRES_HOST,
        port=settings.POSTGRES_PORT,
        user=settings.POSTGRES_USER,
        password=settings.POSTGRES_PASSWORD,
        database=settings.POSTGRES_DB
    )
    # Loop-invariant, so built once instead of on every iteration.
    query = """
        SELECT catalogue_id, catalogue_name, mrp, pricing_levels
        FROM trans.catalogue_ref
        WHERE catalogue_id = $1
    """
    try:
        logger.info("\n๐Ÿ” Verifying PostgreSQL data...")
        all_verified = True
        for catalogue_data in TEST_CATALOGUES:
            catalogue_id = catalogue_data["catalogue_id"]
            catalogue_name = catalogue_data["catalogue_name"]

            result = await pg_conn.fetchrow(query, catalogue_id)
            if not result:
                logger.error(f"โŒ {catalogue_name}: Not found in PostgreSQL")
                all_verified = False
                continue

            logger.info(f"\n๐Ÿ“Š {catalogue_name}:")
            logger.info(f" ID: {result['catalogue_id']}")
            logger.info(f" MRP: โ‚น{result['mrp']}")

            pricing_levels = result['pricing_levels']
            # asyncpg hands json/jsonb columns back as text unless a custom
            # type codec is registered on the connection (none is here), so
            # decode before treating the value as a mapping.
            if isinstance(pricing_levels, str):
                pricing_levels = json.loads(pricing_levels)

            if pricing_levels:
                logger.info(f" Currency: {pricing_levels.get('currency', 'N/A')}")
                logger.info(f" Pricing Levels MRP: โ‚น{pricing_levels.get('mrp', 'N/A')}")
                levels = pricing_levels.get("levels", {})
                logger.info(f" Available Levels: {list(levels.keys())}")
                for level_name in ["ncnf", "cnf", "distributor", "retail"]:
                    if level_name in levels:
                        level_data = levels[level_name]
                        selling_price = level_data.get("selling_price", 0)
                        max_discount = level_data.get("max_discount_pct", 0)
                        logger.info(f" {level_name}: โ‚น{selling_price} (max discount: {max_discount}%)")
                logger.info(" โœ… Pricing levels stored correctly")
            else:
                logger.warning(" โš ๏ธ No pricing_levels in PostgreSQL")
                all_verified = False
        return all_verified
    except Exception as e:
        logger.error(f"PostgreSQL verification failed: {e}")
        return False
    finally:
        await pg_conn.close()
async def test_pricing_calculations():
    """Check the retail selling price the sync handler derives from an MRP.

    Calls ``CatalogueSyncHandler._generate_pricing_levels`` for a few MRP
    values and compares ``levels.retail.selling_price`` with the expected
    figure (MRP minus 20%, per the cases below), allowing a tolerance of 0.01.

    Returns:
        True when every case produced pricing levels whose retail selling
        price matched; False if any case failed to generate or mismatched.
    """
    logger.info("\n๐Ÿงฎ Testing pricing calculations...")
    handler = CatalogueSyncHandler()
    test_cases = [
        {"mrp": 100, "expected_retail": 80},  # 100 - 20% = 80
        {"mrp": 500, "expected_retail": 400},  # 500 - 20% = 400
        {"mrp": 299, "expected_retail": 239.20},  # 299 - 20% = 239.20
    ]
    all_passed = True
    for test_case in test_cases:
        mrp = test_case["mrp"]
        expected_retail = test_case["expected_retail"]

        pricing_levels = handler._generate_pricing_levels(mrp)
        if not pricing_levels:
            logger.error(f"โŒ Failed to generate pricing_levels for MRP โ‚น{mrp}")
            all_passed = False
            continue

        retail_price = pricing_levels["levels"]["retail"]["selling_price"]
        # Allow small rounding differences
        if abs(retail_price - expected_retail) < 0.01:
            logger.info(f"โœ… MRP โ‚น{mrp} โ†’ retail โ‚น{retail_price} (expected โ‚น{expected_retail})")
        else:
            logger.error(f"โŒ MRP โ‚น{mrp} โ†’ retail โ‚น{retail_price} (expected โ‚น{expected_retail})")
            all_passed = False
    # Report the real outcome; returning True unconditionally hid failures.
    return all_passed
async def cleanup_test_data():
    """Delete the test catalogues from both MongoDB and PostgreSQL.

    Cleanup is best-effort: a failure in either store is logged as a warning
    and never raised, and a MongoDB failure does not prevent the PostgreSQL
    cleanup from being attempted.
    """
    settings = get_settings()

    # Clean MongoDB
    mongo_client = AsyncIOMotorClient(settings.MONGODB_URL)
    mongo_db = mongo_client[settings.MONGODB_DB_NAME]
    collection = mongo_db[SCM_CATALOGUE_COLLECTION]
    try:
        for catalogue in TEST_CATALOGUES:
            await collection.delete_one({"catalogue_id": catalogue["catalogue_id"]})
        logger.info("๐Ÿงน Cleaned test catalogues from MongoDB")
    except Exception as e:
        logger.warning(f"Failed to clean MongoDB: {e}")
    finally:
        mongo_client.close()

    # Clean PostgreSQL
    try:
        pg_conn = await asyncpg.connect(
            host=settings.POSTGRES_HOST,
            port=settings.POSTGRES_PORT,
            user=settings.POSTGRES_USER,
            password=settings.POSTGRES_PASSWORD,
            database=settings.POSTGRES_DB
        )
        try:
            for catalogue in TEST_CATALOGUES:
                await pg_conn.execute(
                    "DELETE FROM trans.catalogue_ref WHERE catalogue_id = $1",
                    catalogue["catalogue_id"]
                )
            logger.info("๐Ÿงน Cleaned test catalogues from PostgreSQL")
        finally:
            # Close the connection even when a DELETE raises, so a failed
            # cleanup does not leak it.
            await pg_conn.close()
    except Exception as e:
        logger.warning(f"Failed to clean PostgreSQL: {e}")
async def main():
    """Run the complete pricing_levels flow and log a pass/fail summary.

    Steps: set up the MongoDB fixtures, check the pricing calculations, run
    the sync handler over the fixtures, verify the rows in PostgreSQL, then
    print a summary. The overall verdict is PASSED only when the calculation
    check and the PostgreSQL verification both returned a truthy result and
    every catalogue in TEST_CATALOGUES synced successfully.

    Any exception is logged rather than propagated, and the test data is
    cleaned up in all cases.
    """
    logger.info("๐Ÿš€ Starting complete pricing_levels flow test")
    logger.info("=" * 60)
    try:
        # 1. Setup test data
        logger.info("๐Ÿ“ Setting up test catalogues...")
        if not await setup_test_catalogues():
            logger.error("โŒ Failed to setup test data")
            return

        # 2. Test pricing calculations
        calculations_ok = await test_pricing_calculations()

        # 3. Test sync handler
        logger.info("\n๐Ÿ”„ Testing sync handler...")
        sync_results = await test_sync_handler_generation()

        # 4. Verify PostgreSQL data
        verification_ok = await verify_postgresql_data()

        # 5. Summary
        logger.info("\n" + "=" * 60)
        logger.info("TEST SUMMARY")
        logger.info("=" * 60)
        for result in sync_results:
            status = "โœ…" if result["sync_success"] else "โŒ"
            if not result["has_pricing_levels_after"]:
                # Neither pre-existing nor generated: do not label it "Existing".
                generated = "Missing"
            elif result["had_pricing_levels_before"]:
                generated = "๐Ÿ“‹ Existing"
            else:
                generated = "๐Ÿ”ง Generated"
            logger.info(f"{status} {result['catalogue_name']}: {generated}")

        success_count = sum(1 for r in sync_results if r["sync_success"])
        logger.info(f"\nSuccessful syncs: {success_count}/{len(sync_results)}")
        logger.info(f"Pricing calculations: {'OK' if calculations_ok else 'FAILED'}")
        logger.info(f"PostgreSQL verification: {'OK' if verification_ok else 'FAILED'}")

        # Compare against the number of fixtures, not the number of results:
        # an empty or partial result list must not count as a pass.
        all_synced = success_count == len(TEST_CATALOGUES)
        if all_synced and calculations_ok and verification_ok:
            logger.info("๐ŸŽ‰ All tests PASSED!")
        else:
            logger.error("โŒ Some tests FAILED!")
    except Exception as e:
        logger.error(f"Test failed: {e}")
    finally:
        # Cleanup runs on success, failure and the early return above.
        logger.info("\n๐Ÿงน Cleaning up test data...")
        await cleanup_test_data()
        logger.info("๐Ÿ Test completed")
# Script entry point: run the async flow only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    asyncio.run(main())