Spaces:
Running
Running
#!/usr/bin/env python3
"""
Migration script to reset pricing_levels in MongoDB.

This script:
1. Drops existing pricing_levels from all catalogues
2. Regenerates them with the corrected structure (cost_price, trade_margin, max_discount_pct)

Usage:
    python migration_reset_pricing_levels.py
"""
import asyncio
import json
from datetime import datetime, timezone
from decimal import Decimal, ROUND_HALF_UP

from motor.motor_asyncio import AsyncIOMotorClient
from insightfy_utils.logging import get_logger

from app.core.config import settings
from app.catalogues.constants import SCM_CATALOGUE_COLLECTION
| logger = get_logger(__name__) | |
# Per-level pricing parameters, keyed by pricing level name.
#   discount_pct     - percentage taken off the MRP to derive the level's cost_price
#   trade_margin     - copied unchanged into the generated level entry
#   max_discount_pct - copied unchanged into the generated level entry
PRICING_MARGINS = {
    "ncnf": {"discount_pct": 12, "trade_margin": 15, "max_discount_pct": 5},
    "cnf": {"discount_pct": 6, "trade_margin": 18, "max_discount_pct": 8},
    "distributor": {"discount_pct": 10, "trade_margin": 20, "max_discount_pct": 15},
    "retail": {"discount_pct": 20, "trade_margin": 25, "max_discount_pct": 20},
}
def calculate_pricing_level(mrp: float, level_config: dict) -> dict:
    """
    Build the pricing entry for a single level from the MRP and that level's settings.

    Args:
        mrp: Maximum Retail Price
        level_config: Mapping holding discount_pct, trade_margin and max_discount_pct

    Returns:
        Dictionary with cost_price (MRP reduced by discount_pct percent, rounded
        half-up to two decimal places), trade_margin, and max_discount_pct
    """
    hundred = Decimal('100')
    # Convert through str() so binary float noise never enters the Decimal math.
    price = Decimal(str(mrp))
    discount = Decimal(str(level_config["discount_pct"]))

    discounted = (price * (hundred - discount) / hundred).quantize(
        Decimal('0.01'), rounding=ROUND_HALF_UP
    )

    entry = {"cost_price": float(discounted)}
    # The remaining settings are passed through untouched.
    for key in ("trade_margin", "max_discount_pct"):
        entry[key] = level_config[key]
    return entry
def generate_pricing_levels(mrp: float, currency: str = "INR") -> dict:
    """
    Assemble the full pricing_levels document for a catalogue from its MRP.

    Args:
        mrp: Maximum Retail Price
        currency: Currency code (default: INR)

    Returns:
        Dictionary with currency, mrp and one entry per level defined in
        PRICING_MARGINS, or None when the MRP is missing, zero or negative
    """
    # Guard clause: nothing can be derived from an absent or non-positive MRP.
    if not mrp or mrp <= 0:
        return None

    return {
        "currency": currency,
        "mrp": mrp,
        "levels": {
            level_name: calculate_pricing_level(mrp, level_config)
            for level_name, level_config in PRICING_MARGINS.items()
        },
    }
async def drop_existing_pricing_levels():
    """Remove the pricing_levels field from every catalogue that currently has one.

    Only documents that actually carry a ``pricing_levels`` field are matched.
    Matching every document would stamp ``meta.updated_at`` / ``meta.updated_by``
    on catalogues this step did not change, and would make the reported count
    the size of the collection rather than the number of catalogues that lost
    the field.

    Returns:
        Number of documents modified by the update.

    Raises:
        Exception: any failure while running the update is logged and re-raised.
    """
    # Connect to MongoDB
    mongo_client = AsyncIOMotorClient(settings.MONGODB_URI)

    try:
        collection = mongo_client[settings.MONGODB_DB_NAME][SCM_CATALOGUE_COLLECTION]

        logger.info("🗑️ Dropping existing pricing_levels from all catalogues...")

        result = await collection.update_many(
            # Restrict to documents that really have the field to drop.
            {"pricing_levels": {"$exists": True}},
            {
                "$unset": {"pricing_levels": 1},
                "$set": {
                    # Timezone-aware UTC timestamp; datetime.utcnow() is naive
                    # and deprecated since Python 3.12.
                    "meta.updated_at": datetime.now(timezone.utc),
                    "meta.updated_by": "migration_reset_pricing_levels",
                },
            },
        )

        logger.info(f"✅ Dropped pricing_levels from {result.modified_count} catalogues")
        return result.modified_count

    except Exception as e:
        logger.error(f"Error dropping pricing_levels: {e}")
        raise
    finally:
        # Always release the connection, whether the update succeeded or not.
        mongo_client.close()
async def find_catalogues_with_mrp():
    """Return every non-deleted catalogue that has a positive MRP.

    A catalogue qualifies when either ``pricing.mrp`` or the top-level ``mrp``
    exists, is not null and is greater than zero, and ``meta.status`` is not
    "Deleted". Only the fields needed for regeneration are fetched.

    Returns:
        List of matching catalogue documents (projected fields only).

    Raises:
        Exception: any failure while querying is logged and re-raised.
    """
    # Connect to MongoDB
    mongo_client = AsyncIOMotorClient(settings.MONGODB_URI)
    mongo_db = mongo_client[settings.MONGODB_DB_NAME]
    collection = mongo_db[SCM_CATALOGUE_COLLECTION]

    try:
        # The same "present, non-null, positive" condition applies to both MRP locations.
        positive_value = {"$exists": True, "$ne": None, "$gt": 0}
        has_mrp = {
            "$or": [
                {"pricing.mrp": dict(positive_value)},
                {"mrp": dict(positive_value)},
            ]
        }
        not_deleted = {"meta.status": {"$ne": "Deleted"}}
        query = {"$and": [has_mrp, not_deleted]}

        projection = dict.fromkeys(
            ("catalogue_id", "catalogue_name", "pricing", "mrp"), 1
        )

        # length=None asks the cursor for every matching document.
        catalogues = await collection.find(query, projection).to_list(length=None)

        logger.info(f"Found {len(catalogues)} catalogues with MRP")
        return catalogues

    except Exception as e:
        logger.error(f"Error finding catalogues: {e}")
        raise
    finally:
        mongo_client.close()
def _extract_mrp(catalogue: dict):
    """Return the first strictly positive MRP from pricing.mrp, then top-level mrp, else None."""
    pricing = catalogue.get("pricing") or {}
    for candidate in (pricing.get("mrp"), catalogue.get("mrp")):
        if candidate and candidate > 0:
            return candidate
    return None


async def regenerate_pricing_levels():
    """Regenerate pricing_levels for all catalogues with MRP.

    For each catalogue returned by ``find_catalogues_with_mrp`` the MRP is
    taken from ``pricing.mrp`` when that is positive, otherwise from the
    top-level ``mrp``. A non-positive ``pricing.mrp`` therefore no longer
    hides a valid top-level value. The generated structure is written back
    together with the ``meta.updated_at`` / ``meta.updated_by`` audit fields.

    A failure on one catalogue is logged and counted, and processing continues
    with the next one.

    Returns:
        Number of catalogues whose document was modified (0 when no catalogue
        with an MRP was found).

    Raises:
        Exception: failures outside the per-catalogue handling (for example
            while fetching the catalogue list) are logged and re-raised.
    """
    # Connect to MongoDB
    mongo_client = AsyncIOMotorClient(settings.MONGODB_URI)
    mongo_db = mongo_client[settings.MONGODB_DB_NAME]
    collection = mongo_db[SCM_CATALOGUE_COLLECTION]

    try:
        # Find catalogues with MRP
        catalogues = await find_catalogues_with_mrp()

        if not catalogues:
            logger.info("No catalogues found with MRP")
            return 0

        updated_count = 0
        skipped_count = 0
        error_count = 0

        logger.info(f"🔄 Regenerating pricing_levels for {len(catalogues)} catalogues...")

        for catalogue in catalogues:
            # Per-catalogue try: one bad document must not abort the whole run.
            try:
                catalogue_id = catalogue["catalogue_id"]
                catalogue_name = catalogue.get("catalogue_name", "Unknown")

                # Prefer pricing.mrp, fall back to top-level mrp; each must be > 0.
                mrp = _extract_mrp(catalogue)
                if mrp is None:
                    logger.warning(f"Skipping {catalogue_name} - no valid MRP found")
                    skipped_count += 1
                    continue

                # Generate pricing_levels
                pricing_levels = generate_pricing_levels(mrp)
                if not pricing_levels:
                    logger.warning(f"Skipping {catalogue_name} - could not generate pricing_levels")
                    skipped_count += 1
                    continue

                # Update catalogue with new pricing_levels
                update_result = await collection.update_one(
                    {"catalogue_id": catalogue_id},
                    {
                        "$set": {
                            "pricing_levels": pricing_levels,
                            # Timezone-aware UTC timestamp; datetime.utcnow()
                            # is naive and deprecated since Python 3.12.
                            "meta.updated_at": datetime.now(timezone.utc),
                            "meta.updated_by": "migration_reset_pricing_levels",
                        }
                    },
                )

                if update_result.modified_count > 0:
                    updated_count += 1
                    logger.info(f"✅ {catalogue_name} (MRP: ₹{mrp}) - Generated pricing_levels")
                else:
                    skipped_count += 1
                    logger.warning(f"⚠️ {catalogue_name} - No update performed")

            except Exception as e:
                error_count += 1
                logger.error(f"Error processing catalogue {catalogue.get('catalogue_id', 'unknown')}: {e}")

        logger.info("=" * 60)
        logger.info("REGENERATION SUMMARY")
        logger.info("=" * 60)
        logger.info(f"Total catalogues processed: {len(catalogues)}")
        logger.info(f"Successfully updated: {updated_count}")
        logger.info(f"Skipped: {skipped_count}")
        logger.info(f"Errors: {error_count}")
        logger.info("=" * 60)

        return updated_count

    except Exception as e:
        logger.error(f"Error regenerating pricing_levels: {e}")
        raise
    finally:
        mongo_client.close()
async def show_sample_results():
    """Log up to three catalogues that carry pricing_levels as a post-run spot check.

    Any failure is logged and swallowed, so this function never raises to
    its caller.
    """
    # Connect to MongoDB
    mongo_client = AsyncIOMotorClient(settings.MONGODB_URI)
    mongo_db = mongo_client[settings.MONGODB_DB_NAME]
    collection = mongo_db[SCM_CATALOGUE_COLLECTION]

    divider = "=" * 60

    try:
        has_levels = {"pricing_levels": {"$exists": True, "$ne": None}}
        wanted_fields = {"catalogue_id": 1, "catalogue_name": 1, "pricing_levels": 1}
        samples = await collection.find(has_levels, wanted_fields).limit(3).to_list(length=3)

        if not samples:
            logger.warning("No catalogues found with pricing_levels")
            return

        logger.info("\n📊 SAMPLE REGENERATED PRICING LEVELS:")
        logger.info(divider)

        for i, sample in enumerate(samples, 1):
            catalogue_name = sample.get("catalogue_name", "Unknown")
            pricing_levels = sample.get("pricing_levels", {})

            logger.info(f"\n{i}. {catalogue_name}")
            logger.info(f" MRP: ₹{pricing_levels.get('mrp', 'N/A')}")
            logger.info(f" Currency: {pricing_levels.get('currency', 'N/A')}")

            levels = pricing_levels.get("levels", {})
            # Fixed display order; levels missing from the document are skipped.
            for level_name in ["ncnf", "cnf", "distributor", "retail"]:
                if level_name not in levels:
                    continue
                level_data = levels[level_name]
                cost_price = level_data.get("cost_price", 0)
                trade_margin = level_data.get("trade_margin", 0)
                max_discount = level_data.get("max_discount_pct", 0)
                logger.info(f" {level_name}: ₹{cost_price} (margin: {trade_margin}%, max_discount: {max_discount}%)")

        logger.info("\n" + divider)

    except Exception as e:
        # Best-effort reporting only: log the problem and carry on.
        logger.error(f"Error showing samples: {e}")
    finally:
        mongo_client.close()
async def main():
    """Run the migration end to end: drop, regenerate, then log samples and a summary.

    Raises:
        Exception: any failure from the drop or regenerate step is logged and
            re-raised.
    """
    logger.info("🚀 Starting pricing_levels reset and regeneration")
    logger.info("=" * 60)

    try:
        # Step 1 removes the old field; step 2 writes the corrected structure;
        # step 3 logs a few regenerated documents for a visual check.
        dropped_count = await drop_existing_pricing_levels()
        updated_count = await regenerate_pricing_levels()
        await show_sample_results()

        # Final summary
        logger.info("\n🎉 MIGRATION COMPLETED SUCCESSFULLY!")
        logger.info(f" Dropped pricing_levels from: {dropped_count} catalogues")
        logger.info(f" Regenerated pricing_levels for: {updated_count} catalogues")

        if not updated_count > 0:
            logger.warning("⚠️ No catalogues were updated - check if catalogues have valid MRP")
        else:
            logger.info("✅ All catalogues now have the corrected pricing_levels structure")

    except Exception as e:
        logger.error(f"❌ Migration failed: {e}")
        raise
# Script entry point: run the async migration to completion when executed directly.
if __name__ == "__main__":
    asyncio.run(main())