#!/usr/bin/env python3 """ Migration script to fix missing fields in warehouse documents. This script adds missing required fields to existing warehouse documents: - created_at: Default timestamp for warehouses missing this field - created_by: Default to "system" for warehouses missing this field - updated_at: Set to None for warehouses missing this field - updated_by: Set to None for warehouses missing this field - capabilities: Default capabilities for warehouses missing this field Usage: python migration_fix_warehouse_fields.py """ import asyncio import os import sys from datetime import datetime, timezone from typing import Dict, Any # Add the app directory to Python path sys.path.append(os.path.join(os.path.dirname(__file__), '.')) from app.nosql import get_database, connect_to_mongo, close_mongo_connection from app.warehouses.constants import SCM_WAREHOUSES_COLLECTION async def get_default_capabilities() -> Dict[str, Any]: """Get default capabilities structure.""" return { "can_receive": True, "can_fulfil": True, "can_sell": False, "can_adjust": True, "can_stock_take": True } async def fix_warehouse_fields(): """Fix missing fields in warehouse documents.""" print("šŸ”§ Starting warehouse fields migration...") try: db = get_database() collection = db[SCM_WAREHOUSES_COLLECTION] # Find all warehouses warehouses = await collection.find({}).to_list(length=None) print(f"šŸ“¦ Found {len(warehouses)} warehouse documents") if not warehouses: print("āœ… No warehouses found. Migration complete.") return # Default values default_timestamp = "2025-01-01T00:00:00Z" default_capabilities = await get_default_capabilities() updated_count = 0 for warehouse in warehouses: warehouse_id = warehouse.get("warehouse_id", "unknown") updates = {} # Check and fix missing fields if warehouse.get("created_at") is None: updates["created_at"] = default_timestamp print(f" šŸ“… Adding created_at to {warehouse_id}") if warehouse.get("created_by") is None: updates["created_by"] = "system" print(f" šŸ‘¤ Adding created_by to {warehouse_id}") if warehouse.get("updated_at") is None: updates["updated_at"] = None print(f" šŸ“… Setting updated_at to null for {warehouse_id}") if warehouse.get("updated_by") is None: updates["updated_by"] = None print(f" šŸ‘¤ Setting updated_by to null for {warehouse_id}") if warehouse.get("capabilities") is None: updates["capabilities"] = default_capabilities print(f" āš™ļø Adding default capabilities to {warehouse_id}") # Apply updates if any if updates: result = await collection.update_one( {"warehouse_id": warehouse_id}, {"$set": updates} ) if result.modified_count > 0: updated_count += 1 print(f" āœ… Updated {warehouse_id}") else: print(f" āš ļø Failed to update {warehouse_id}") else: print(f" ✨ {warehouse_id} already has all required fields") print(f"\nšŸŽ‰ Migration complete!") print(f"šŸ“Š Total warehouses processed: {len(warehouses)}") print(f"šŸ“Š Warehouses updated: {updated_count}") print(f"šŸ“Š Warehouses already complete: {len(warehouses) - updated_count}") except Exception as e: print(f"āŒ Migration failed: {e}") import traceback traceback.print_exc() sys.exit(1) async def verify_migration(): """Verify that all warehouses now have required fields.""" print("\nšŸ” Verifying migration results...") try: db = get_database() collection = db[SCM_WAREHOUSES_COLLECTION] # Check for warehouses missing required fields missing_fields_queries = [ {"created_at": {"$exists": False}}, {"created_at": None}, {"created_by": {"$exists": False}}, {"created_by": None}, {"capabilities": {"$exists": False}}, {"capabilities": None} ] issues_found = False for query in missing_fields_queries: count = await collection.count_documents(query) if count > 0: field_name = list(query.keys())[0] print(f" āš ļø Found {count} warehouses still missing {field_name}") issues_found = True if not issues_found: print(" āœ… All warehouses have required fields!") # Show sample of updated warehouses sample = await collection.find({}).limit(3).to_list(length=3) print("\nšŸ“‹ Sample warehouse documents:") for warehouse in sample: print(f" - {warehouse.get('warehouse_id')}: " f"created_at={warehouse.get('created_at')}, " f"created_by={warehouse.get('created_by')}, " f"capabilities={'present' if warehouse.get('capabilities') else 'missing'}") except Exception as e: print(f"āŒ Verification failed: {e}") async def main(): """Main migration function.""" print("šŸš€ Warehouse Fields Migration Script") print("=" * 50) try: # Connect to database print("šŸ”Œ Connecting to MongoDB...") await connect_to_mongo() print("āœ… Connected to MongoDB") # Run migration await fix_warehouse_fields() # Verify results await verify_migration() print("\n✨ Migration script completed!") except Exception as e: print(f"āŒ Migration script failed: {e}") import traceback traceback.print_exc() finally: # Close database connection print("šŸ”Œ Closing MongoDB connection...") await close_mongo_connection() print("āœ… MongoDB connection closed") if __name__ == "__main__": asyncio.run(main())