#!/usr/bin/env python3
"""
Migration script to generate pricing_levels for catalogues that don't have them.

This script generates pricing_levels based on MRP with the following margins:
- ncnf: MRP - 12%
- cnf: MRP - 6%
- distributor: MRP - 10%
- retail: MRP - 20%

Usage:
    python migration_generate_missing_pricing_levels.py            # run the migration
    python migration_generate_missing_pricing_levels.py --preview  # log a preview, no writes
"""

import asyncio
import json
import sys
from contextlib import contextmanager
from datetime import datetime, timezone
from decimal import Decimal, ROUND_HALF_UP
from typing import Optional

from motor.motor_asyncio import AsyncIOMotorClient
from insightfy_utils.logging import get_logger

from app.core.config import settings
from app.catalogues.constants import SCM_CATALOGUE_COLLECTION

logger = get_logger(__name__)

# Pricing level margins (discount from MRP).
# Only "discount_pct" is used in the cost price calculation; "trade_margin" and
# "max_discount_pct" are copied verbatim into each generated level.
PRICING_MARGINS = {
    "ncnf": {
        "discount_pct": 12,
        "trade_margin": 15,
        "max_discount_pct": 5
    },
    "cnf": {
        "discount_pct": 6,
        "trade_margin": 18,
        "max_discount_pct": 8
    },
    "distributor": {
        "discount_pct": 10,
        "trade_margin": 20,
        "max_discount_pct": 15
    },
    "retail": {
        "discount_pct": 20,
        "trade_margin": 25,
        "max_discount_pct": 20
    }
}

# Number of catalogues listed in detail by preview_migration().
_PREVIEW_LIMIT = 10


def calculate_pricing_level(mrp: float, level_config: dict) -> dict:
    """
    Calculate pricing for a specific level based on MRP and configuration.

    Args:
        mrp: Maximum Retail Price
        level_config: Configuration with discount_pct, trade_margin and
            max_discount_pct keys

    Returns:
        Dictionary with cost_price, trade_margin, and max_discount_pct
    """
    # Go through str() so the Decimal holds the printed value of the float
    # rather than its binary expansion.
    mrp_decimal = Decimal(str(mrp))
    discount_pct = Decimal(str(level_config["discount_pct"]))

    # Calculate cost price (MRP - discount%), rounded half-up to 2 decimals
    cost_price = mrp_decimal * (Decimal('100') - discount_pct) / Decimal('100')
    cost_price = cost_price.quantize(Decimal('0.01'), rounding=ROUND_HALF_UP)

    return {
        "cost_price": float(cost_price),
        "trade_margin": level_config["trade_margin"],
        "max_discount_pct": level_config["max_discount_pct"]
    }


def generate_pricing_levels(mrp: float, currency: str = "INR") -> Optional[dict]:
    """
    Generate complete pricing_levels structure based on MRP.

    Args:
        mrp: Maximum Retail Price
        currency: Currency code (default: INR)

    Returns:
        Complete pricing_levels dictionary with "currency", "mrp" and one
        entry under "levels" per key of PRICING_MARGINS, or None when mrp is
        missing, zero or negative.
    """
    if not mrp or mrp <= 0:
        return None

    return {
        "currency": currency,
        "mrp": mrp,
        "levels": {
            level_name: calculate_pricing_level(mrp, level_config)
            for level_name, level_config in PRICING_MARGINS.items()
        },
    }


def _is_valid_mrp(value) -> bool:
    """Return True when value is a positive int/float (bool excluded)."""
    return (
        isinstance(value, (int, float))
        and not isinstance(value, bool)
        and value > 0
    )


def _extract_mrp(catalogue: dict):
    """
    Return the first valid MRP of a catalogue document, or None.

    "pricing.mrp" is preferred; the top-level "mrp" is used whenever the
    nested value is absent or not a positive number, so an invalid nested
    value no longer hides a valid top-level one.

    NOTE(review): values of any other type (strings, Decimal128, ...) are
    treated as "no MRP" here - confirm no catalogue stores MRP that way.
    """
    pricing = catalogue.get("pricing")
    candidates = []
    if isinstance(pricing, dict):
        candidates.append(pricing.get("mrp"))
    candidates.append(catalogue.get("mrp"))

    for candidate in candidates:
        if _is_valid_mrp(candidate):
            return candidate
    return None


@contextmanager
def _catalogue_collection(collection=None):
    """
    Yield the catalogue collection to operate on.

    When a collection is passed in it is yielded untouched and its client is
    left open for the caller to manage. Otherwise a new client is created
    from settings and closed again on exit.
    """
    if collection is not None:
        yield collection
        return

    mongo_client = AsyncIOMotorClient(settings.MONGODB_URI)
    try:
        yield mongo_client[settings.MONGODB_DB_NAME][SCM_CATALOGUE_COLLECTION]
    finally:
        mongo_client.close()


async def find_catalogues_without_pricing_levels(collection=None):
    """
    Find catalogues that don't have pricing_levels but have MRP.

    Args:
        collection: Optional already-open catalogue collection to query.
            When omitted, a temporary client is opened and closed here.

    Returns:
        List of catalogue documents restricted to the projected fields.

    Raises:
        Exception: any error raised by the query is logged and re-raised.
    """
    # Catalogues without pricing_levels, with a positive MRP in either
    # location, and not marked as deleted.
    query = {
        "$and": [
            {"pricing_levels": {"$exists": False}},  # No pricing_levels
            {
                "$or": [
                    {"pricing.mrp": {"$exists": True, "$ne": None, "$gt": 0}},  # Has MRP in pricing
                    {"mrp": {"$exists": True, "$ne": None, "$gt": 0}}  # Has top-level MRP
                ]
            },
            {"meta.status": {"$ne": "Deleted"}}  # Not deleted
        ]
    }

    projection = {
        "catalogue_id": 1,
        "catalogue_name": 1,
        "pricing": 1,
        "mrp": 1,
        "pricing_levels": 1
    }

    with _catalogue_collection(collection) as coll:
        try:
            cursor = coll.find(query, projection)
            catalogues = await cursor.to_list(length=None)

            logger.info(f"Found {len(catalogues)} catalogues without pricing_levels")
            return catalogues

        except Exception as e:
            logger.error(f"Error finding catalogues: {e}")
            raise


async def update_catalogue_pricing_levels(catalogue_id: str, pricing_levels: dict, collection=None):
    """
    Update a single catalogue with generated pricing_levels.

    The update only applies while the catalogue still has no pricing_levels,
    so a value written by someone else after the initial scan is never
    overwritten and re-running the migration is safe.

    Args:
        catalogue_id: Value of the catalogue's "catalogue_id" field.
        pricing_levels: Structure to store under "pricing_levels".
        collection: Optional already-open catalogue collection to update.
            When omitted, a temporary client is opened and closed here.

    Returns:
        True when a document was modified, False otherwise.

    Raises:
        Exception: any error raised by the update is logged and re-raised.
    """
    with _catalogue_collection(collection) as coll:
        try:
            update_result = await coll.update_one(
                {
                    "catalogue_id": catalogue_id,
                    "pricing_levels": {"$exists": False},
                },
                {
                    "$set": {
                        "pricing_levels": pricing_levels,
                        # Timezone-aware replacement for the deprecated
                        # datetime.utcnow(); same instant, explicit UTC.
                        "meta.updated_at": datetime.now(timezone.utc),
                        "meta.updated_by": "migration_script"
                    }
                }
            )

            if update_result.modified_count > 0:
                logger.debug(f"Updated pricing_levels for catalogue: {catalogue_id}")
                return True

            logger.warning(f"No update performed for catalogue: {catalogue_id}")
            return False

        except Exception as e:
            logger.error(f"Error updating catalogue {catalogue_id}: {e}")
            raise


async def migrate_pricing_levels():
    """
    Main migration function to generate pricing_levels for catalogues.

    Finds the catalogues lacking pricing_levels, generates the structure from
    each catalogue's MRP, writes it back and logs a summary. A failure on a
    single catalogue is logged and counted without aborting the run; a
    failure outside the per-catalogue loop is logged and re-raised.
    """
    logger.info("Starting pricing_levels migration")

    try:
        # One client for the whole run instead of one per catalogue.
        with _catalogue_collection() as collection:
            catalogues = await find_catalogues_without_pricing_levels(collection)

            if not catalogues:
                logger.info("No catalogues found that need pricing_levels migration")
                return

            updated_count = 0
            skipped_count = 0
            error_count = 0

            for catalogue in catalogues:
                try:
                    catalogue_id = catalogue["catalogue_id"]
                    catalogue_name = catalogue.get("catalogue_name", "Unknown")

                    # Extract MRP from pricing or top-level
                    mrp = _extract_mrp(catalogue)
                    if mrp is None:
                        logger.warning(f"Skipping catalogue {catalogue_id} - no valid MRP found")
                        skipped_count += 1
                        continue

                    # Generate pricing_levels
                    pricing_levels = generate_pricing_levels(mrp)
                    if not pricing_levels:
                        logger.warning(f"Skipping catalogue {catalogue_id} - could not generate pricing_levels")
                        skipped_count += 1
                        continue

                    # Update catalogue
                    success = await update_catalogue_pricing_levels(
                        catalogue_id, pricing_levels, collection
                    )
                    if success:
                        updated_count += 1
                        logger.info(f"✅ Updated {catalogue_name} (ID: {catalogue_id}) - MRP: ₹{mrp}")
                    else:
                        skipped_count += 1

                except Exception as e:
                    # Keep going: one bad document must not stop the migration.
                    error_count += 1
                    logger.error(f"Error processing catalogue {catalogue.get('catalogue_id', 'unknown')}: {e}")

        # Summary
        logger.info("=" * 60)
        logger.info("MIGRATION SUMMARY")
        logger.info("=" * 60)
        logger.info(f"Total catalogues found: {len(catalogues)}")
        logger.info(f"Successfully updated: {updated_count}")
        logger.info(f"Skipped: {skipped_count}")
        logger.info(f"Errors: {error_count}")
        logger.info("=" * 60)

        if updated_count > 0:
            logger.info("✅ Migration completed successfully!")
        else:
            logger.warning("⚠️ No catalogues were updated")

    except Exception as e:
        logger.error(f"Migration failed: {e}")
        raise


async def preview_migration():
    """
    Preview what the migration would do without making changes.

    Logs the first _PREVIEW_LIMIT matching catalogues together with the
    pricing levels that would be generated for them, then the number of
    remaining catalogues. Only reads from the database.
    """
    logger.info("PREVIEW MODE - No changes will be made")
    logger.info("=" * 60)

    try:
        catalogues = await find_catalogues_without_pricing_levels()

        if not catalogues:
            logger.info("No catalogues found that need pricing_levels migration")
            return

        logger.info(f"Found {len(catalogues)} catalogues that would be updated:")
        logger.info("")

        for i, catalogue in enumerate(catalogues[:_PREVIEW_LIMIT], 1):
            catalogue_id = catalogue["catalogue_id"]
            catalogue_name = catalogue.get("catalogue_name", "Unknown")

            # Same extraction rule as the real migration, so the preview
            # matches what would actually be written.
            mrp = _extract_mrp(catalogue)

            logger.info(f"{i}. {catalogue_name}")
            logger.info(f" ID: {catalogue_id}")
            logger.info(f" MRP: ₹{mrp if mrp is not None else 'Not found'}")

            if mrp is not None:
                # Show what pricing_levels would be generated
                pricing_levels = generate_pricing_levels(mrp)
                if pricing_levels:
                    logger.info(" Generated pricing levels:")
                    for level, data in pricing_levels["levels"].items():
                        logger.info(f" {level}: ₹{data['cost_price']} (MRP-{PRICING_MARGINS[level]['discount_pct']}%)")

            logger.info("")

        if len(catalogues) > _PREVIEW_LIMIT:
            logger.info(f"... and {len(catalogues) - _PREVIEW_LIMIT} more catalogues")

        logger.info("=" * 60)
        logger.info("Run without --preview flag to execute the migration")

    except Exception as e:
        logger.error(f"Preview failed: {e}")
        raise


async def main():
    """Main function with preview option (pass --preview for a dry run)."""
    if "--preview" in sys.argv:
        await preview_migration()
    else:
        await migrate_pricing_levels()


if __name__ == "__main__":
    asyncio.run(main())