Spaces:
Running
Running
#!/usr/bin/env python3
"""
Migration script to generate pricing_levels for catalogues that don't have them.

This script generates pricing_levels from the MRP. The cost price of each
level is the MRP reduced by the following discount:
- ncnf: MRP - 12%
- cnf: MRP - 6%
- distributor: MRP - 10%
- retail: MRP - 20%

Usage:
    python migration_generate_missing_pricing_levels.py            # run the migration
    python migration_generate_missing_pricing_levels.py --preview  # show what would change, no writes
"""
| import asyncio | |
| import json | |
| from datetime import datetime | |
| from decimal import Decimal, ROUND_HALF_UP | |
| from motor.motor_asyncio import AsyncIOMotorClient | |
| from insightfy_utils.logging import get_logger | |
| from app.core.config import settings | |
| from app.catalogues.constants import SCM_CATALOGUE_COLLECTION | |
| logger = get_logger(__name__) | |
# Per-level pricing configuration, keyed by level name.
# Each row below is (level, discount_pct, trade_margin, max_discount_pct);
# discount_pct is the percentage taken off the MRP to obtain the cost price.
# Row order is preserved, so iteration over PRICING_MARGINS stays
# ncnf -> cnf -> distributor -> retail.
PRICING_MARGINS = {
    level: {
        "discount_pct": discount,
        "trade_margin": margin,
        "max_discount_pct": discount_cap,
    }
    for level, discount, margin, discount_cap in (
        ("ncnf", 12, 15, 5),
        ("cnf", 6, 18, 8),
        ("distributor", 10, 20, 15),
        ("retail", 20, 25, 20),
    )
}
def calculate_pricing_level(mrp: float, level_config: dict) -> dict:
    """
    Derive the pricing entry of one level from the MRP.

    Args:
        mrp: Maximum Retail Price
        level_config: Mapping providing "discount_pct", "trade_margin" and
            "max_discount_pct"

    Returns:
        Dictionary with "cost_price" (MRP reduced by discount_pct, rounded
        half-up to two decimals, as a float) plus the level's "trade_margin"
        and "max_discount_pct" copied through unchanged.
    """
    hundred = Decimal('100')
    two_places = Decimal('0.01')

    # Converting through str() keeps the decimal digits the caller sees
    # instead of the float's binary expansion.
    price = Decimal(str(mrp))
    percent_off = Decimal(str(level_config["discount_pct"]))

    discounted = price * (hundred - percent_off) / hundred
    rounded = discounted.quantize(two_places, rounding=ROUND_HALF_UP)

    entry = {"cost_price": float(rounded)}
    for passthrough_key in ("trade_margin", "max_discount_pct"):
        entry[passthrough_key] = level_config[passthrough_key]
    return entry
def generate_pricing_levels(mrp: float, currency: str = "INR") -> dict:
    """
    Build the full pricing_levels document for one MRP.

    Args:
        mrp: Maximum Retail Price
        currency: Currency code (default: INR)

    Returns:
        Dictionary with "currency", "mrp" and a "levels" mapping holding one
        entry per level in PRICING_MARGINS. Returns None when mrp is falsy
        or not greater than zero.
    """
    # Nothing can be derived from a missing, zero or negative MRP.
    if not mrp or mrp <= 0:
        return None

    per_level = {
        name: calculate_pricing_level(mrp, config)
        for name, config in PRICING_MARGINS.items()
    }
    return {"currency": currency, "mrp": mrp, "levels": per_level}
async def find_catalogues_without_pricing_levels():
    """
    Fetch every catalogue that still needs a pricing_levels document.

    A catalogue qualifies when it has no ``pricing_levels`` field, has an MRP
    greater than zero at ``pricing.mrp`` or at top-level ``mrp``, and its
    ``meta.status`` is not "Deleted".

    Returns:
        List of the matching documents, restricted to the projected fields.

    Raises:
        Exception: any error raised by the query is logged and re-raised.
    """
    client = AsyncIOMotorClient(settings.MONGODB_URI)
    catalogue_collection = client[settings.MONGODB_DB_NAME][SCM_CATALOGUE_COLLECTION]

    try:
        missing_levels = {"pricing_levels": {"$exists": False}}
        has_positive_mrp = {
            "$or": [
                # MRP nested under the pricing sub-document
                {"pricing.mrp": {"$exists": True, "$ne": None, "$gt": 0}},
                # MRP stored directly on the catalogue
                {"mrp": {"$exists": True, "$ne": None, "$gt": 0}},
            ]
        }
        not_deleted = {"meta.status": {"$ne": "Deleted"}}
        selector = {"$and": [missing_levels, has_positive_mrp, not_deleted]}

        wanted_fields = dict.fromkeys(
            ("catalogue_id", "catalogue_name", "pricing", "mrp", "pricing_levels"),
            1,
        )

        # length=None drains the whole cursor into memory in one go.
        matches = await catalogue_collection.find(selector, wanted_fields).to_list(length=None)
        logger.info(f"Found {len(matches)} catalogues without pricing_levels")
        return matches
    except Exception as e:
        logger.error(f"Error finding catalogues: {e}")
        raise
    finally:
        client.close()
async def update_catalogue_pricing_levels(catalogue_id: str, pricing_levels: dict):
    """
    Store generated pricing_levels on a single catalogue.

    Besides ``pricing_levels`` the update stamps ``meta.updated_at`` with the
    current UTC time and sets ``meta.updated_by`` to "migration_script".

    Args:
        catalogue_id: Value of the ``catalogue_id`` field of the target document
        pricing_levels: The pricing_levels document to store

    Returns:
        True when a document was modified, False otherwise.

    Raises:
        Exception: any error raised by the update is logged and re-raised.
    """
    # Function-scope import: the module only imports `datetime` from datetime.
    from datetime import timezone

    # NOTE: a new client is opened and closed on every call.
    mongo_client = AsyncIOMotorClient(settings.MONGODB_URI)
    mongo_db = mongo_client[settings.MONGODB_DB_NAME]
    collection = mongo_db[SCM_CATALOGUE_COLLECTION]

    try:
        update_result = await collection.update_one(
            {"catalogue_id": catalogue_id},
            {
                "$set": {
                    "pricing_levels": pricing_levels,
                    # datetime.utcnow() is deprecated since Python 3.12; an
                    # aware UTC datetime is encoded to the same stored value.
                    "meta.updated_at": datetime.now(timezone.utc),
                    "meta.updated_by": "migration_script",
                }
            },
        )

        if update_result.modified_count > 0:
            logger.debug(f"Updated pricing_levels for catalogue: {catalogue_id}")
            return True

        logger.warning(f"No update performed for catalogue: {catalogue_id}")
        return False
    except Exception as e:
        logger.error(f"Error updating catalogue {catalogue_id}: {e}")
        raise
    finally:
        mongo_client.close()
def _extract_valid_mrp(catalogue: dict):
    """
    Pick the MRP a catalogue should be priced from.

    ``pricing.mrp`` is preferred and the top-level ``mrp`` is the fallback.
    A candidate is accepted only if it is a finite int/float greater than
    zero, so an unusable value in one location does not hide a usable value
    in the other.

    Args:
        catalogue: Catalogue document as returned by the finder query

    Returns:
        The first valid MRP, or None when neither location holds one.
    """
    pricing = catalogue.get("pricing")
    candidates = (
        pricing.get("mrp") if isinstance(pricing, dict) else None,
        catalogue.get("mrp"),
    )
    for candidate in candidates:
        # bool is an int subclass; never read True as a price of 1.
        if isinstance(candidate, bool) or not isinstance(candidate, (int, float)):
            continue
        # The chained comparison also rejects NaN and infinity.
        if 0 < candidate < float("inf"):
            return candidate
    return None


async def migrate_pricing_levels():
    """
    Generate and store pricing_levels for every catalogue that lacks them.

    Each catalogue is processed independently: a failure on one document is
    logged and counted, and the run continues with the next. A summary of
    updated / skipped / failed counts is logged at the end.

    Raises:
        Exception: errors outside the per-catalogue handling (for example the
            initial lookup) are logged and re-raised.
    """
    logger.info("Starting pricing_levels migration")
    try:
        catalogues = await find_catalogues_without_pricing_levels()
        if not catalogues:
            logger.info("No catalogues found that need pricing_levels migration")
            return

        updated_count = 0
        skipped_count = 0
        error_count = 0

        for catalogue in catalogues:
            try:
                catalogue_id = catalogue["catalogue_id"]
                catalogue_name = catalogue.get("catalogue_name", "Unknown")

                mrp = _extract_valid_mrp(catalogue)
                if mrp is None:
                    logger.warning(f"Skipping catalogue {catalogue_id} - no valid MRP found")
                    skipped_count += 1
                    continue

                pricing_levels = generate_pricing_levels(mrp)
                if not pricing_levels:
                    logger.warning(f"Skipping catalogue {catalogue_id} - could not generate pricing_levels")
                    skipped_count += 1
                    continue

                success = await update_catalogue_pricing_levels(catalogue_id, pricing_levels)
                if success:
                    updated_count += 1
                    logger.info(f"✅ Updated {catalogue_name} (ID: {catalogue_id}) - MRP: ₹{mrp}")
                else:
                    skipped_count += 1
            except Exception as e:
                # Best effort: one bad document must not abort the whole run.
                error_count += 1
                logger.error(f"Error processing catalogue {catalogue.get('catalogue_id', 'unknown')}: {e}")

        # Summary
        logger.info("=" * 60)
        logger.info("MIGRATION SUMMARY")
        logger.info("=" * 60)
        logger.info(f"Total catalogues found: {len(catalogues)}")
        logger.info(f"Successfully updated: {updated_count}")
        logger.info(f"Skipped: {skipped_count}")
        logger.info(f"Errors: {error_count}")
        logger.info("=" * 60)

        # Do not report success when some catalogues failed.
        if error_count > 0:
            logger.warning(f"⚠️ Migration finished with {error_count} error(s) - check the log above")
        elif updated_count > 0:
            logger.info("✅ Migration completed successfully!")
        else:
            logger.warning("⚠️ No catalogues were updated")
    except Exception as e:
        logger.error(f"Migration failed: {e}")
        raise


async def preview_migration():
    """
    Log what the migration would do without writing anything.

    Lists up to the first 10 matching catalogues with the MRP that would be
    used and the cost price each level would receive, then the number of
    remaining catalogues.

    Raises:
        Exception: any error is logged and re-raised.
    """
    logger.info("PREVIEW MODE - No changes will be made")
    logger.info("=" * 60)
    try:
        catalogues = await find_catalogues_without_pricing_levels()
        if not catalogues:
            logger.info("No catalogues found that need pricing_levels migration")
            return

        logger.info(f"Found {len(catalogues)} catalogues that would be updated:")
        logger.info("")

        for i, catalogue in enumerate(catalogues[:10], 1):  # Show first 10
            catalogue_id = catalogue["catalogue_id"]
            catalogue_name = catalogue.get("catalogue_name", "Unknown")

            # Same selection rule as the real migration, so the preview matches it.
            mrp = _extract_valid_mrp(catalogue)

            logger.info(f"{i}. {catalogue_name}")
            logger.info(f" ID: {catalogue_id}")
            logger.info(f" MRP: ₹{mrp if mrp else 'Not found'}")

            if mrp is not None:
                pricing_levels = generate_pricing_levels(mrp)
                if pricing_levels:
                    logger.info(" Generated pricing levels:")
                    for level, data in pricing_levels["levels"].items():
                        logger.info(f" {level}: ₹{data['cost_price']} (MRP-{PRICING_MARGINS[level]['discount_pct']}%)")
            logger.info("")

        if len(catalogues) > 10:
            logger.info(f"... and {len(catalogues) - 10} more catalogues")

        logger.info("=" * 60)
        logger.info("Run without --preview flag to execute the migration")
    except Exception as e:
        logger.error(f"Preview failed: {e}")
        raise
async def main():
    """
    Command-line entry point.

    Runs the read-only preview when ``--preview`` appears among the
    command-line arguments, otherwise runs the migration.
    """
    import sys

    preview_requested = "--preview" in sys.argv
    entry_point = preview_migration if preview_requested else migrate_pricing_levels
    await entry_point()


# Run the migration (or preview) only when executed as a script.
if __name__ == "__main__":
    asyncio.run(main())