Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| Migration script to fix missing fields in warehouse documents. | |
| This script adds missing required fields to existing warehouse documents: | |
| - created_at: Default timestamp for warehouses missing this field | |
| - created_by: Default to "system" for warehouses missing this field | |
| - updated_at: Set to None for warehouses missing this field | |
| - updated_by: Set to None for warehouses missing this field | |
| - capabilities: Default capabilities for warehouses missing this field | |
| Usage: | |
| python migration_fix_warehouse_fields.py | |
| """ | |
| import asyncio | |
| import os | |
| import sys | |
| from datetime import datetime, timezone | |
| from typing import Dict, Any | |
| # Add the app directory to Python path | |
| sys.path.append(os.path.join(os.path.dirname(__file__), '.')) | |
| from app.nosql import get_database, connect_to_mongo, close_mongo_connection | |
| from app.warehouses.constants import SCM_WAREHOUSES_COLLECTION | |
| async def get_default_capabilities() -> Dict[str, Any]: | |
| """Get default capabilities structure.""" | |
| return { | |
| "can_receive": True, | |
| "can_fulfil": True, | |
| "can_sell": False, | |
| "can_adjust": True, | |
| "can_stock_take": True | |
| } | |
| async def fix_warehouse_fields(): | |
| """Fix missing fields in warehouse documents.""" | |
| print("π§ Starting warehouse fields migration...") | |
| try: | |
| db = get_database() | |
| collection = db[SCM_WAREHOUSES_COLLECTION] | |
| # Find all warehouses | |
| warehouses = await collection.find({}).to_list(length=None) | |
| print(f"π¦ Found {len(warehouses)} warehouse documents") | |
| if not warehouses: | |
| print("β No warehouses found. Migration complete.") | |
| return | |
| # Default values | |
| default_timestamp = "2025-01-01T00:00:00Z" | |
| default_capabilities = await get_default_capabilities() | |
| updated_count = 0 | |
| for warehouse in warehouses: | |
| warehouse_id = warehouse.get("warehouse_id", "unknown") | |
| updates = {} | |
| # Check and fix missing fields | |
| if warehouse.get("created_at") is None: | |
| updates["created_at"] = default_timestamp | |
| print(f" π Adding created_at to {warehouse_id}") | |
| if warehouse.get("created_by") is None: | |
| updates["created_by"] = "system" | |
| print(f" π€ Adding created_by to {warehouse_id}") | |
| if warehouse.get("updated_at") is None: | |
| updates["updated_at"] = None | |
| print(f" π Setting updated_at to null for {warehouse_id}") | |
| if warehouse.get("updated_by") is None: | |
| updates["updated_by"] = None | |
| print(f" π€ Setting updated_by to null for {warehouse_id}") | |
| if warehouse.get("capabilities") is None: | |
| updates["capabilities"] = default_capabilities | |
| print(f" βοΈ Adding default capabilities to {warehouse_id}") | |
| # Apply updates if any | |
| if updates: | |
| result = await collection.update_one( | |
| {"warehouse_id": warehouse_id}, | |
| {"$set": updates} | |
| ) | |
| if result.modified_count > 0: | |
| updated_count += 1 | |
| print(f" β Updated {warehouse_id}") | |
| else: | |
| print(f" β οΈ Failed to update {warehouse_id}") | |
| else: | |
| print(f" β¨ {warehouse_id} already has all required fields") | |
| print(f"\nπ Migration complete!") | |
| print(f"π Total warehouses processed: {len(warehouses)}") | |
| print(f"π Warehouses updated: {updated_count}") | |
| print(f"π Warehouses already complete: {len(warehouses) - updated_count}") | |
| except Exception as e: | |
| print(f"β Migration failed: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| sys.exit(1) | |
| async def verify_migration(): | |
| """Verify that all warehouses now have required fields.""" | |
| print("\nπ Verifying migration results...") | |
| try: | |
| db = get_database() | |
| collection = db[SCM_WAREHOUSES_COLLECTION] | |
| # Check for warehouses missing required fields | |
| missing_fields_queries = [ | |
| {"created_at": {"$exists": False}}, | |
| {"created_at": None}, | |
| {"created_by": {"$exists": False}}, | |
| {"created_by": None}, | |
| {"capabilities": {"$exists": False}}, | |
| {"capabilities": None} | |
| ] | |
| issues_found = False | |
| for query in missing_fields_queries: | |
| count = await collection.count_documents(query) | |
| if count > 0: | |
| field_name = list(query.keys())[0] | |
| print(f" β οΈ Found {count} warehouses still missing {field_name}") | |
| issues_found = True | |
| if not issues_found: | |
| print(" β All warehouses have required fields!") | |
| # Show sample of updated warehouses | |
| sample = await collection.find({}).limit(3).to_list(length=3) | |
| print("\nπ Sample warehouse documents:") | |
| for warehouse in sample: | |
| print(f" - {warehouse.get('warehouse_id')}: " | |
| f"created_at={warehouse.get('created_at')}, " | |
| f"created_by={warehouse.get('created_by')}, " | |
| f"capabilities={'present' if warehouse.get('capabilities') else 'missing'}") | |
| except Exception as e: | |
| print(f"β Verification failed: {e}") | |
| async def main(): | |
| """Main migration function.""" | |
| print("π Warehouse Fields Migration Script") | |
| print("=" * 50) | |
| try: | |
| # Connect to database | |
| print("π Connecting to MongoDB...") | |
| await connect_to_mongo() | |
| print("β Connected to MongoDB") | |
| # Run migration | |
| await fix_warehouse_fields() | |
| # Verify results | |
| await verify_migration() | |
| print("\n⨠Migration script completed!") | |
| except Exception as e: | |
| print(f"β Migration script failed: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| finally: | |
| # Close database connection | |
| print("π Closing MongoDB connection...") | |
| await close_mongo_connection() | |
| print("β MongoDB connection closed") | |
| if __name__ == "__main__": | |
| asyncio.run(main()) |