cuatrolabs-scm-ms / docs /database /migrations /migration_reset_pricing_levels.py
MukeshKapoor25's picture
feat(catalogues/pricing): standardize pricing level keys to lowercase format
b3b8847
#!/usr/bin/env python3
"""
Migration script to reset pricing_levels in MongoDB.
This script:
1. Drops existing pricing_levels from all catalogues
2. Regenerates them with the corrected structure (cost_price, trade_margin, max_discount_pct)
Usage:
python migration_reset_pricing_levels.py
"""
import asyncio
import json
from datetime import datetime, timezone
from decimal import Decimal, ROUND_HALF_UP

from motor.motor_asyncio import AsyncIOMotorClient
from insightfy_utils.logging import get_logger

from app.core.config import settings
from app.catalogues.constants import SCM_CATALOGUE_COLLECTION
logger = get_logger(__name__)
# Per-level pricing configuration, keyed by lowercase level name.
#   discount_pct     - percentage taken off the MRP to derive the level's cost_price
#   trade_margin     - copied unchanged into the generated level entry
#   max_discount_pct - copied unchanged into the generated level entry
PRICING_MARGINS = {
    "ncnf": {"discount_pct": 12, "trade_margin": 15, "max_discount_pct": 5},
    "cnf": {"discount_pct": 6, "trade_margin": 18, "max_discount_pct": 8},
    "distributor": {"discount_pct": 10, "trade_margin": 20, "max_discount_pct": 15},
    "retail": {"discount_pct": 20, "trade_margin": 25, "max_discount_pct": 20},
}
def calculate_pricing_level(mrp: float, level_config: dict) -> dict:
    """
    Build the pricing entry for one level from an MRP and that level's config.

    Args:
        mrp: Maximum Retail Price
        level_config: Mapping providing discount_pct, trade_margin and max_discount_pct

    Returns:
        Dictionary with cost_price (MRP reduced by discount_pct, rounded half-up
        to two decimals, as a float) plus the level's trade_margin and
        max_discount_pct copied through unchanged
    """
    hundred = Decimal('100')
    price = Decimal(str(mrp))
    # Share of the MRP that remains once the level's discount is removed.
    retained_pct = hundred - Decimal(str(level_config["discount_pct"]))
    discounted = (price * retained_pct / hundred).quantize(
        Decimal('0.01'), rounding=ROUND_HALF_UP
    )

    entry = {"cost_price": float(discounted)}
    entry["trade_margin"] = level_config["trade_margin"]
    entry["max_discount_pct"] = level_config["max_discount_pct"]
    return entry
def generate_pricing_levels(mrp: float, currency: str = "INR") -> "dict | None":
    """
    Generate the complete pricing_levels structure for an MRP.

    Args:
        mrp: Maximum Retail Price; must be a finite number greater than zero
        currency: Currency code (default: INR)

    Returns:
        Dictionary with "currency", "mrp" and a "levels" mapping holding one
        entry per key of PRICING_MARGINS, or None when the MRP is missing,
        non-numeric, non-finite (NaN/inf), zero or negative
    """
    # bool is an int subclass and a str would be stored verbatim as the MRP;
    # neither is a usable price, so reject them instead of raising later.
    if mrp is None or isinstance(mrp, (bool, str, bytes)):
        return None

    try:
        mrp_decimal = Decimal(str(mrp))
    except (ArithmeticError, ValueError):
        # decimal.InvalidOperation derives from ArithmeticError.
        return None

    # is_finite() must be checked first: ordering comparisons on a NaN Decimal raise.
    if not mrp_decimal.is_finite() or mrp_decimal <= 0:
        return None

    return {
        "currency": currency,
        "mrp": mrp,
        "levels": {
            level_name: calculate_pricing_level(mrp, level_config)
            for level_name, level_config in PRICING_MARGINS.items()
        },
    }
async def drop_existing_pricing_levels():
    """
    Remove the pricing_levels field from every catalogue that currently has it.

    Only documents where the field exists are touched, so the audit stamp
    (meta.updated_at / meta.updated_by) and the returned count reflect the
    catalogues that actually lost a pricing_levels value.

    Returns:
        Number of documents modified by the update

    Raises:
        Exception: any error from the database call is logged and re-raised
    """
    # Connect to MongoDB
    mongo_client = AsyncIOMotorClient(settings.MONGODB_URI)
    mongo_db = mongo_client[settings.MONGODB_DB_NAME]
    collection = mongo_db[SCM_CATALOGUE_COLLECTION]
    try:
        logger.info("🗑️ Dropping existing pricing_levels from all catalogues...")
        result = await collection.update_many(
            # $exists also matches documents where the field is stored as null.
            {"pricing_levels": {"$exists": True}},
            {
                "$unset": {"pricing_levels": 1},
                "$set": {
                    # Timezone-aware UTC; datetime.utcnow() is deprecated since Python 3.12.
                    "meta.updated_at": datetime.now(timezone.utc),
                    "meta.updated_by": "migration_reset_pricing_levels"
                }
            }
        )
        logger.info(f"✅ Dropped pricing_levels from {result.modified_count} catalogues")
        return result.modified_count
    except Exception as e:
        logger.error(f"Error dropping pricing_levels: {e}")
        raise
    finally:
        mongo_client.close()
async def find_catalogues_with_mrp():
    """
    Fetch the non-deleted catalogues that carry a positive MRP.

    A catalogue qualifies when either pricing.mrp or the top-level mrp exists,
    is not null and is greater than zero, and meta.status is not "Deleted".

    Returns:
        List of matching documents, projected to catalogue_id, catalogue_name,
        pricing and mrp

    Raises:
        Exception: any error from the query is logged and re-raised
    """
    client = AsyncIOMotorClient(settings.MONGODB_URI)
    catalogue_collection = client[settings.MONGODB_DB_NAME][SCM_CATALOGUE_COLLECTION]
    try:
        # MRP may live under pricing or at the top level of the document.
        mrp_in_pricing = {"pricing.mrp": {"$exists": True, "$ne": None, "$gt": 0}}
        mrp_at_top_level = {"mrp": {"$exists": True, "$ne": None, "$gt": 0}}
        not_deleted = {"meta.status": {"$ne": "Deleted"}}
        query = {"$and": [{"$or": [mrp_in_pricing, mrp_at_top_level]}, not_deleted]}

        wanted_fields = dict.fromkeys(
            ("catalogue_id", "catalogue_name", "pricing", "mrp"), 1
        )

        matches = await catalogue_collection.find(query, wanted_fields).to_list(length=None)
        logger.info(f"Found {len(matches)} catalogues with MRP")
        return matches
    except Exception as exc:
        logger.error(f"Error finding catalogues: {exc}")
        raise
    finally:
        client.close()
def _extract_valid_mrp(catalogue: dict):
    """Return the first positive numeric MRP from pricing.mrp, then top-level mrp, else None."""
    pricing = catalogue.get("pricing")
    candidates = (
        pricing.get("mrp") if isinstance(pricing, dict) else None,
        catalogue.get("mrp"),
    )
    for candidate in candidates:
        # bool is an int subclass; never treat True as an MRP of 1.
        if isinstance(candidate, bool) or not isinstance(candidate, (int, float)):
            continue
        if candidate > 0:
            return candidate
    return None


async def regenerate_pricing_levels():
    """
    Regenerate pricing_levels for all catalogues with MRP.

    For each catalogue returned by find_catalogues_with_mrp(), the MRP is taken
    from pricing.mrp when that is a positive number, otherwise from the
    top-level mrp. An invalid pricing.mrp therefore no longer hides a valid
    top-level value. Per-catalogue failures are logged and counted without
    aborting the run.

    Returns:
        Number of catalogues whose pricing_levels were written (0 when no
        catalogue with MRP was found)

    Raises:
        Exception: errors outside the per-catalogue loop are logged and re-raised
    """
    # Connect to MongoDB
    mongo_client = AsyncIOMotorClient(settings.MONGODB_URI)
    mongo_db = mongo_client[settings.MONGODB_DB_NAME]
    collection = mongo_db[SCM_CATALOGUE_COLLECTION]
    try:
        # Find catalogues with MRP
        catalogues = await find_catalogues_with_mrp()
        if not catalogues:
            logger.info("No catalogues found with MRP")
            return 0

        updated_count = 0
        skipped_count = 0
        error_count = 0
        logger.info(f"🔄 Regenerating pricing_levels for {len(catalogues)} catalogues...")

        for catalogue in catalogues:
            try:
                catalogue_id = catalogue["catalogue_id"]
                catalogue_name = catalogue.get("catalogue_name", "Unknown")

                mrp = _extract_valid_mrp(catalogue)
                if mrp is None:
                    logger.warning(f"Skipping {catalogue_name} - no valid MRP found")
                    skipped_count += 1
                    continue

                # Generate pricing_levels
                pricing_levels = generate_pricing_levels(mrp)
                if not pricing_levels:
                    logger.warning(f"Skipping {catalogue_name} - could not generate pricing_levels")
                    skipped_count += 1
                    continue

                # Update catalogue with new pricing_levels
                update_result = await collection.update_one(
                    {"catalogue_id": catalogue_id},
                    {
                        "$set": {
                            "pricing_levels": pricing_levels,
                            # Timezone-aware UTC; datetime.utcnow() is deprecated since Python 3.12.
                            "meta.updated_at": datetime.now(timezone.utc),
                            "meta.updated_by": "migration_reset_pricing_levels"
                        }
                    }
                )
                if update_result.modified_count > 0:
                    updated_count += 1
                    logger.info(f"✅ {catalogue_name} (MRP: ₹{mrp}) - Generated pricing_levels")
                else:
                    skipped_count += 1
                    logger.warning(f"⚠️ {catalogue_name} - No update performed")
            except Exception as e:
                # Best-effort: one bad catalogue must not stop the rest of the migration.
                error_count += 1
                logger.error(f"Error processing catalogue {catalogue.get('catalogue_id', 'unknown')}: {e}")

        logger.info("=" * 60)
        logger.info("REGENERATION SUMMARY")
        logger.info("=" * 60)
        logger.info(f"Total catalogues processed: {len(catalogues)}")
        logger.info(f"Successfully updated: {updated_count}")
        logger.info(f"Skipped: {skipped_count}")
        logger.info(f"Errors: {error_count}")
        logger.info("=" * 60)
        return updated_count
    except Exception as e:
        logger.error(f"Error regenerating pricing_levels: {e}")
        raise
    finally:
        mongo_client.close()
async def show_sample_results():
    """
    Log up to three catalogues that currently hold a pricing_levels value.

    For each sample the MRP, currency and the ncnf / cnf / distributor / retail
    level entries (when present) are written to the log. Errors are logged and
    not re-raised.
    """
    client = AsyncIOMotorClient(settings.MONGODB_URI)
    catalogue_collection = client[settings.MONGODB_DB_NAME][SCM_CATALOGUE_COLLECTION]
    try:
        has_pricing_levels = {"pricing_levels": {"$exists": True, "$ne": None}}
        sample_fields = {"catalogue_id": 1, "catalogue_name": 1, "pricing_levels": 1}
        sample_cursor = catalogue_collection.find(has_pricing_levels, sample_fields).limit(3)
        samples = await sample_cursor.to_list(length=3)

        if not samples:
            logger.warning("No catalogues found with pricing_levels")
            return

        logger.info("\n📊 SAMPLE REGENERATED PRICING LEVELS:")
        logger.info("=" * 60)
        for position, document in enumerate(samples, 1):
            name = document.get("catalogue_name", "Unknown")
            stored = document.get("pricing_levels", {})
            logger.info(f"\n{position}. {name}")
            logger.info(f" MRP: ₹{stored.get('mrp', 'N/A')}")
            logger.info(f" Currency: {stored.get('currency', 'N/A')}")

            levels = stored.get("levels", {})
            for level_name in ("ncnf", "cnf", "distributor", "retail"):
                if level_name not in levels:
                    continue
                level_data = levels[level_name]
                cost_price = level_data.get("cost_price", 0)
                trade_margin = level_data.get("trade_margin", 0)
                max_discount = level_data.get("max_discount_pct", 0)
                logger.info(f" {level_name}: ₹{cost_price} (margin: {trade_margin}%, max_discount: {max_discount}%)")
        logger.info("\n" + "=" * 60)
    except Exception as exc:
        logger.error(f"Error showing samples: {exc}")
    finally:
        client.close()
async def main():
    """
    Run the migration end to end: drop, regenerate, then log samples.

    Any exception from a step is logged as a migration failure and re-raised.
    """
    logger.info("🚀 Starting pricing_levels reset and regeneration")
    logger.info("=" * 60)
    try:
        # The steps run strictly in sequence; each awaits the previous one.
        removed_total = await drop_existing_pricing_levels()
        regenerated_total = await regenerate_pricing_levels()
        await show_sample_results()

        # Final summary
        logger.info("\n🎉 MIGRATION COMPLETED SUCCESSFULLY!")
        logger.info(f" Dropped pricing_levels from: {removed_total} catalogues")
        logger.info(f" Regenerated pricing_levels for: {regenerated_total} catalogues")
        if regenerated_total <= 0:
            logger.warning("⚠️ No catalogues were updated - check if catalogues have valid MRP")
        else:
            logger.info("✅ All catalogues now have the corrected pricing_levels structure")
    except Exception as exc:
        logger.error(f"❌ Migration failed: {exc}")
        raise
# Script entry point: run the async migration only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    asyncio.run(main())