#!/usr/bin/env python3 """ Migration script to reset pricing_levels in MongoDB. This script: 1. Drops existing pricing_levels from all catalogues 2. Regenerates them with the corrected structure (cost_price, trade_margin, max_discount_pct) Usage: python migration_reset_pricing_levels.py """ import asyncio import json from datetime import datetime from decimal import Decimal, ROUND_HALF_UP from motor.motor_asyncio import AsyncIOMotorClient from insightfy_utils.logging import get_logger from app.core.config import settings from app.catalogues.constants import SCM_CATALOGUE_COLLECTION logger = get_logger(__name__) # Pricing level margins (discount from MRP) PRICING_MARGINS = { "ncnf": { "discount_pct": 12, "trade_margin": 15, "max_discount_pct": 5 }, "cnf": { "discount_pct": 6, "trade_margin": 18, "max_discount_pct": 8 }, "distributor": { "discount_pct": 10, "trade_margin": 20, "max_discount_pct": 15 }, "retail": { "discount_pct": 20, "trade_margin": 25, "max_discount_pct": 20 } } def calculate_pricing_level(mrp: float, level_config: dict) -> dict: """ Calculate pricing for a specific level based on MRP and configuration. Args: mrp: Maximum Retail Price level_config: Configuration with discount_pct and margins Returns: Dictionary with cost_price, trade_margin, and max_discount_pct """ mrp_decimal = Decimal(str(mrp)) discount_pct = Decimal(str(level_config["discount_pct"])) # Calculate cost price (MRP - discount%) cost_price = mrp_decimal * (Decimal('100') - discount_pct) / Decimal('100') cost_price = cost_price.quantize(Decimal('0.01'), rounding=ROUND_HALF_UP) return { "cost_price": float(cost_price), "trade_margin": level_config["trade_margin"], "max_discount_pct": level_config["max_discount_pct"] } def generate_pricing_levels(mrp: float, currency: str = "INR") -> dict: """ Generate complete pricing_levels structure based on MRP. Args: mrp: Maximum Retail Price currency: Currency code (default: INR) Returns: Complete pricing_levels dictionary or None if invalid MRP """ if not mrp or mrp <= 0: return None pricing_levels = { "currency": currency, "mrp": mrp, "levels": {} } # Generate pricing for each level for level_name, level_config in PRICING_MARGINS.items(): pricing_levels["levels"][level_name] = calculate_pricing_level(mrp, level_config) return pricing_levels async def drop_existing_pricing_levels(): """Drop existing pricing_levels from all catalogues.""" # Connect to MongoDB mongo_client = AsyncIOMotorClient(settings.MONGODB_URI) mongo_db = mongo_client[settings.MONGODB_DB_NAME] collection = mongo_db[SCM_CATALOGUE_COLLECTION] try: logger.info("šŸ—‘ļø Dropping existing pricing_levels from all catalogues...") # Remove pricing_levels field from all documents result = await collection.update_many( {}, # Match all documents { "$unset": {"pricing_levels": 1}, "$set": { "meta.updated_at": datetime.utcnow(), "meta.updated_by": "migration_reset_pricing_levels" } } ) logger.info(f"āœ… Dropped pricing_levels from {result.modified_count} catalogues") return result.modified_count except Exception as e: logger.error(f"Error dropping pricing_levels: {e}") raise finally: mongo_client.close() async def find_catalogues_with_mrp(): """Find catalogues that have MRP and can generate pricing_levels.""" # Connect to MongoDB mongo_client = AsyncIOMotorClient(settings.MONGODB_URI) mongo_db = mongo_client[settings.MONGODB_DB_NAME] collection = mongo_db[SCM_CATALOGUE_COLLECTION] try: # Find catalogues with MRP (from pricing or top-level) query = { "$and": [ { "$or": [ {"pricing.mrp": {"$exists": True, "$ne": None, "$gt": 0}}, # Has MRP in pricing {"mrp": {"$exists": True, "$ne": None, "$gt": 0}} # Has top-level MRP ] }, {"meta.status": {"$ne": "Deleted"}} # Not deleted ] } projection = { "catalogue_id": 1, "catalogue_name": 1, "pricing": 1, "mrp": 1 } cursor = collection.find(query, projection) catalogues = await cursor.to_list(length=None) logger.info(f"Found {len(catalogues)} catalogues with MRP") return catalogues except Exception as e: logger.error(f"Error finding catalogues: {e}") raise finally: mongo_client.close() async def regenerate_pricing_levels(): """Regenerate pricing_levels for all catalogues with MRP.""" # Connect to MongoDB mongo_client = AsyncIOMotorClient(settings.MONGODB_URI) mongo_db = mongo_client[settings.MONGODB_DB_NAME] collection = mongo_db[SCM_CATALOGUE_COLLECTION] try: # Find catalogues with MRP catalogues = await find_catalogues_with_mrp() if not catalogues: logger.info("No catalogues found with MRP") return 0 updated_count = 0 skipped_count = 0 error_count = 0 logger.info(f"šŸ”„ Regenerating pricing_levels for {len(catalogues)} catalogues...") for catalogue in catalogues: try: catalogue_id = catalogue["catalogue_id"] catalogue_name = catalogue.get("catalogue_name", "Unknown") # Extract MRP from pricing or top-level mrp = None if "pricing" in catalogue and catalogue["pricing"]: mrp = catalogue["pricing"].get("mrp") if not mrp and "mrp" in catalogue: mrp = catalogue["mrp"] if not mrp or mrp <= 0: logger.warning(f"Skipping {catalogue_name} - no valid MRP found") skipped_count += 1 continue # Generate pricing_levels pricing_levels = generate_pricing_levels(mrp) if not pricing_levels: logger.warning(f"Skipping {catalogue_name} - could not generate pricing_levels") skipped_count += 1 continue # Update catalogue with new pricing_levels update_result = await collection.update_one( {"catalogue_id": catalogue_id}, { "$set": { "pricing_levels": pricing_levels, "meta.updated_at": datetime.utcnow(), "meta.updated_by": "migration_reset_pricing_levels" } } ) if update_result.modified_count > 0: updated_count += 1 logger.info(f"āœ… {catalogue_name} (MRP: ₹{mrp}) - Generated pricing_levels") else: skipped_count += 1 logger.warning(f"āš ļø {catalogue_name} - No update performed") except Exception as e: error_count += 1 logger.error(f"Error processing catalogue {catalogue.get('catalogue_id', 'unknown')}: {e}") logger.info("=" * 60) logger.info("REGENERATION SUMMARY") logger.info("=" * 60) logger.info(f"Total catalogues processed: {len(catalogues)}") logger.info(f"Successfully updated: {updated_count}") logger.info(f"Skipped: {skipped_count}") logger.info(f"Errors: {error_count}") logger.info("=" * 60) return updated_count except Exception as e: logger.error(f"Error regenerating pricing_levels: {e}") raise finally: mongo_client.close() async def show_sample_results(): """Show sample pricing_levels after regeneration.""" # Connect to MongoDB mongo_client = AsyncIOMotorClient(settings.MONGODB_URI) mongo_db = mongo_client[settings.MONGODB_DB_NAME] collection = mongo_db[SCM_CATALOGUE_COLLECTION] try: # Get a few samples with pricing_levels cursor = collection.find( {"pricing_levels": {"$exists": True, "$ne": None}}, {"catalogue_id": 1, "catalogue_name": 1, "pricing_levels": 1} ).limit(3) samples = await cursor.to_list(length=3) if samples: logger.info("\nšŸ“Š SAMPLE REGENERATED PRICING LEVELS:") logger.info("=" * 60) for i, sample in enumerate(samples, 1): catalogue_name = sample.get("catalogue_name", "Unknown") pricing_levels = sample.get("pricing_levels", {}) logger.info(f"\n{i}. {catalogue_name}") logger.info(f" MRP: ₹{pricing_levels.get('mrp', 'N/A')}") logger.info(f" Currency: {pricing_levels.get('currency', 'N/A')}") levels = pricing_levels.get("levels", {}) for level_name in ["ncnf", "cnf", "distributor", "retail"]: if level_name in levels: level_data = levels[level_name] cost_price = level_data.get("cost_price", 0) trade_margin = level_data.get("trade_margin", 0) max_discount = level_data.get("max_discount_pct", 0) logger.info(f" {level_name}: ₹{cost_price} (margin: {trade_margin}%, max_discount: {max_discount}%)") logger.info("\n" + "=" * 60) else: logger.warning("No catalogues found with pricing_levels") except Exception as e: logger.error(f"Error showing samples: {e}") finally: mongo_client.close() async def main(): """Main migration function.""" logger.info("šŸš€ Starting pricing_levels reset and regeneration") logger.info("=" * 60) try: # Step 1: Drop existing pricing_levels dropped_count = await drop_existing_pricing_levels() # Step 2: Regenerate pricing_levels with corrected structure updated_count = await regenerate_pricing_levels() # Step 3: Show sample results await show_sample_results() # Final summary logger.info("\nšŸŽ‰ MIGRATION COMPLETED SUCCESSFULLY!") logger.info(f" Dropped pricing_levels from: {dropped_count} catalogues") logger.info(f" Regenerated pricing_levels for: {updated_count} catalogues") if updated_count > 0: logger.info("āœ… All catalogues now have the corrected pricing_levels structure") else: logger.warning("āš ļø No catalogues were updated - check if catalogues have valid MRP") except Exception as e: logger.error(f"āŒ Migration failed: {e}") raise if __name__ == "__main__": asyncio.run(main())