cuatrolabs-scm-ms / docs /database /migrations /migration_generate_missing_pricing_levels.py
MukeshKapoor25's picture
feat(catalogues/pricing): standardize pricing level keys to lowercase format
b3b8847
#!/usr/bin/env python3
"""
Migration script to generate pricing_levels for catalogues that don't have them.
This script generates pricing_levels based on MRP with the following margins:
- ncnf: MRP - 12%
- cnf: MRP - 6%
- distributor: MRP - 10%
- retail: MRP - 20%
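Usage:
python migration_generate_missing_pricing_levels.py            (run the migration)
python migration_generate_missing_pricing_levels.py --preview  (dry run: list what would change, no writes)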
"""
import asyncio
import json
from datetime import datetime
from decimal import Decimal, ROUND_HALF_UP
from motor.motor_asyncio import AsyncIOMotorClient
from insightfy_utils.logging import get_logger
from app.core.config import settings
from app.catalogues.constants import SCM_CATALOGUE_COLLECTION
# Module-level logger obtained from insightfy_utils' get_logger, keyed by this module's __name__.
logger = get_logger(__name__)
# Per-level pricing configuration, keyed by lowercase level name.
#   discount_pct     - percentage taken off MRP to obtain the level's cost price
#   trade_margin     - copied unchanged into the generated level
#   max_discount_pct - copied unchanged into the generated level
PRICING_MARGINS = {
    "ncnf": dict(discount_pct=12, trade_margin=15, max_discount_pct=5),
    "cnf": dict(discount_pct=6, trade_margin=18, max_discount_pct=8),
    "distributor": dict(discount_pct=10, trade_margin=20, max_discount_pct=15),
    "retail": dict(discount_pct=20, trade_margin=25, max_discount_pct=20),
}
def calculate_pricing_level(mrp: float, level_config: dict) -> dict:
    """
    Derive one pricing level from an MRP and that level's configuration.

    The cost price is the MRP reduced by ``discount_pct`` percent, computed
    with Decimal arithmetic and rounded half-up to two decimal places.

    Args:
        mrp: Maximum Retail Price
        level_config: Mapping providing discount_pct, trade_margin and
            max_discount_pct

    Returns:
        Dictionary with cost_price (as float), trade_margin, and
        max_discount_pct (the latter two copied from level_config)
    """
    hundred = Decimal('100')
    two_places = Decimal('0.01')

    # Go through str() so binary float noise never reaches the Decimal math.
    base_price = Decimal(str(mrp))
    pct_off = Decimal(str(level_config["discount_pct"]))

    discounted = base_price * (hundred - pct_off) / hundred
    rounded = discounted.quantize(two_places, rounding=ROUND_HALF_UP)

    level = {"cost_price": float(rounded)}
    level["trade_margin"] = level_config["trade_margin"]
    level["max_discount_pct"] = level_config["max_discount_pct"]
    return level
def generate_pricing_levels(mrp: float, currency: str = "INR") -> dict:
    """
    Build the full pricing_levels document for a given MRP.

    Args:
        mrp: Maximum Retail Price
        currency: Currency code (default: INR)

    Returns:
        Dictionary with "currency", "mrp" and a "levels" mapping holding one
        entry per key of PRICING_MARGINS; None when mrp is missing, zero or
        negative.
    """
    # Guard clauses: nothing can be generated without a positive MRP.
    if not mrp:
        return None
    if mrp <= 0:
        return None

    per_level = {
        name: calculate_pricing_level(mrp, config)
        for name, config in PRICING_MARGINS.items()
    }
    return {"currency": currency, "mrp": mrp, "levels": per_level}
async def find_catalogues_without_pricing_levels():
    """
    Fetch catalogues that carry an MRP but have no pricing_levels field.

    Opens its own MongoDB client for the lookup and closes it before
    returning. Errors are logged and re-raised.

    Returns:
        List of catalogue documents restricted to the projected fields.
    """
    client = AsyncIOMotorClient(settings.MONGODB_URI)
    catalogue_collection = client[settings.MONGODB_DB_NAME][SCM_CATALOGUE_COLLECTION]

    # A usable MRP: field present, not null and strictly positive.
    positive_mrp = {"$exists": True, "$ne": None, "$gt": 0}

    no_pricing_levels = {"pricing_levels": {"$exists": False}}
    has_mrp_somewhere = {
        "$or": [
            {"pricing.mrp": dict(positive_mrp)},  # MRP nested under pricing
            {"mrp": dict(positive_mrp)},  # MRP at the top level
        ]
    }
    not_deleted = {"meta.status": {"$ne": "Deleted"}}
    query = {"$and": [no_pricing_levels, has_mrp_somewhere, not_deleted]}

    projected_fields = ("catalogue_id", "catalogue_name", "pricing", "mrp", "pricing_levels")
    projection = dict.fromkeys(projected_fields, 1)

    try:
        matches = await catalogue_collection.find(query, projection).to_list(length=None)
        logger.info(f"Found {len(matches)} catalogues without pricing_levels")
        return matches
    except Exception as exc:
        logger.error(f"Error finding catalogues: {exc}")
        raise
    finally:
        client.close()
async def update_catalogue_pricing_levels(catalogue_id: str, pricing_levels: dict) -> bool:
    """
    Update a single catalogue with generated pricing_levels.

    Sets ``pricing_levels`` and stamps ``meta.updated_at`` /
    ``meta.updated_by`` on the document whose ``catalogue_id`` matches.
    A dedicated MongoDB client is opened for the call and always closed.

    Args:
        catalogue_id: Value of the catalogue's ``catalogue_id`` field
        pricing_levels: Document to store under ``pricing_levels``

    Returns:
        True when MongoDB reports a modified document, False otherwise
        (for example when no document matches catalogue_id).

    Raises:
        Exception: any error from the update is logged and re-raised.
    """
    # Local import keeps this block self-contained; the module only imports
    # ``datetime`` from the datetime package.
    from datetime import timezone

    # Connect to MongoDB
    mongo_client = AsyncIOMotorClient(settings.MONGODB_URI)
    mongo_db = mongo_client[settings.MONGODB_DB_NAME]
    collection = mongo_db[SCM_CATALOGUE_COLLECTION]
    try:
        update_result = await collection.update_one(
            {"catalogue_id": catalogue_id},
            {
                "$set": {
                    "pricing_levels": pricing_levels,
                    # datetime.utcnow() is deprecated (Python 3.12+) and naive.
                    # An aware UTC timestamp is stored as the same instant.
                    "meta.updated_at": datetime.now(timezone.utc),
                    "meta.updated_by": "migration_script",
                }
            },
        )
        if update_result.modified_count > 0:
            logger.debug(f"Updated pricing_levels for catalogue: {catalogue_id}")
            return True
        logger.warning(f"No update performed for catalogue: {catalogue_id}")
        return False
    except Exception as e:
        logger.error(f"Error updating catalogue {catalogue_id}: {e}")
        raise
    finally:
        mongo_client.close()
def _extract_mrp(catalogue: dict):
    """Return the first usable MRP of a catalogue document, or None.

    ``pricing.mrp`` is tried first and the top-level ``mrp`` second. A value
    is usable only when it is an int/float strictly greater than zero, so an
    invalid nested value no longer hides a valid top-level one.
    """
    pricing = catalogue.get("pricing")
    candidates = (
        pricing.get("mrp") if isinstance(pricing, dict) else None,
        catalogue.get("mrp"),
    )
    for value in candidates:
        # bool is an int subclass; a stray True must not be priced as MRP 1.
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            continue
        if value > 0:
            return value
    return None


async def migrate_pricing_levels() -> None:
    """
    Main migration function to generate pricing_levels for catalogues.

    Looks up catalogues lacking pricing_levels, generates the levels from
    each catalogue's MRP and writes them back one catalogue at a time.
    A failure on one catalogue is logged and counted without stopping the
    run; a summary is logged at the end.

    Raises:
        Exception: failures outside the per-catalogue loop (for example the
            initial lookup) are logged and re-raised.
    """
    logger.info("Starting pricing_levels migration")
    try:
        # Find catalogues without pricing_levels
        catalogues = await find_catalogues_without_pricing_levels()
        if not catalogues:
            logger.info("No catalogues found that need pricing_levels migration")
            return

        updated_count = 0
        skipped_count = 0
        error_count = 0

        for catalogue in catalogues:
            try:
                catalogue_id = catalogue["catalogue_id"]
                catalogue_name = catalogue.get("catalogue_name", "Unknown")

                # Extract MRP from pricing or top-level
                mrp = _extract_mrp(catalogue)
                if mrp is None:
                    logger.warning(f"Skipping catalogue {catalogue_id} - no valid MRP found")
                    skipped_count += 1
                    continue

                # Generate pricing_levels
                pricing_levels = generate_pricing_levels(mrp)
                if not pricing_levels:
                    logger.warning(f"Skipping catalogue {catalogue_id} - could not generate pricing_levels")
                    skipped_count += 1
                    continue

                # Update catalogue
                success = await update_catalogue_pricing_levels(catalogue_id, pricing_levels)
                if success:
                    updated_count += 1
                    logger.info(f"✅ Updated {catalogue_name} (ID: {catalogue_id}) - MRP: ₹{mrp}")
                else:
                    skipped_count += 1
            except Exception as e:
                # Best effort: one bad catalogue must not abort the whole run.
                error_count += 1
                logger.error(f"Error processing catalogue {catalogue.get('catalogue_id', 'unknown')}: {e}")

        # Summary
        logger.info("=" * 60)
        logger.info("MIGRATION SUMMARY")
        logger.info("=" * 60)
        logger.info(f"Total catalogues found: {len(catalogues)}")
        logger.info(f"Successfully updated: {updated_count}")
        logger.info(f"Skipped: {skipped_count}")
        logger.info(f"Errors: {error_count}")
        logger.info("=" * 60)

        # Do not claim success when some catalogues failed.
        if error_count > 0:
            logger.warning(f"⚠️ Migration completed with {error_count} error(s)")
        elif updated_count > 0:
            logger.info("✅ Migration completed successfully!")
        else:
            logger.warning("⚠️ No catalogues were updated")
    except Exception as e:
        logger.error(f"Migration failed: {e}")
        raise


async def preview_migration() -> None:
    """
    Preview what the migration would do without making changes.

    Logs the first 10 matching catalogues with the MRP the migration would
    use and the pricing levels it would generate, then the count of the
    remaining catalogues. Nothing is written to the database.

    Raises:
        Exception: any failure is logged and re-raised.
    """
    logger.info("PREVIEW MODE - No changes will be made")
    logger.info("=" * 60)
    try:
        catalogues = await find_catalogues_without_pricing_levels()
        if not catalogues:
            logger.info("No catalogues found that need pricing_levels migration")
            return

        logger.info(f"Found {len(catalogues)} catalogues that would be updated:")
        logger.info("")

        for i, catalogue in enumerate(catalogues[:10], 1):  # Show first 10
            # .get() so one document without catalogue_id cannot abort the preview.
            catalogue_id = catalogue.get("catalogue_id", "unknown")
            catalogue_name = catalogue.get("catalogue_name", "Unknown")

            # Same extraction rule as the real migration.
            mrp = _extract_mrp(catalogue)

            logger.info(f"{i}. {catalogue_name}")
            logger.info(f" ID: {catalogue_id}")
            logger.info(f" MRP: ₹{mrp if mrp is not None else 'Not found'}")

            if mrp is not None:
                # Show what pricing_levels would be generated
                pricing_levels = generate_pricing_levels(mrp)
                if pricing_levels:
                    logger.info(" Generated pricing levels:")
                    for level, data in pricing_levels["levels"].items():
                        logger.info(f" {level}: ₹{data['cost_price']} (MRP-{PRICING_MARGINS[level]['discount_pct']}%)")
            logger.info("")

        if len(catalogues) > 10:
            logger.info(f"... and {len(catalogues) - 10} more catalogues")

        logger.info("=" * 60)
        logger.info("Run without --preview flag to execute the migration")
    except Exception as e:
        logger.error(f"Preview failed: {e}")
        raise
async def main():
    """Entry point: run the preview when --preview is given, else migrate."""
    import sys

    # Pick the coroutine function first, then await it once.
    selected = preview_migration if "--preview" in sys.argv else migrate_pricing_levels
    await selected()


# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())