cuatrolabs-scm-ms / docs /database /migrations /migration_fix_warehouse_fields.py
MukeshKapoor25's picture
refactor(database): reorganize database scripts and examples into docs directory
f24ee1d
#!/usr/bin/env python3
"""
Migration script to fix missing fields in warehouse documents.
This script adds missing required fields to existing warehouse documents:
- created_at: Default timestamp for warehouses missing this field
- created_by: Default to "system" for warehouses missing this field
- updated_at: Set to None for warehouses missing this field
- updated_by: Set to None for warehouses missing this field
- capabilities: Default capabilities for warehouses missing this field
Usage:
python migration_fix_warehouse_fields.py
"""
import asyncio
import os
import sys
from datetime import datetime, timezone
from typing import Dict, Any
# Add the app directory to Python path
sys.path.append(os.path.join(os.path.dirname(__file__), '.'))
from app.nosql import get_database, connect_to_mongo, close_mongo_connection
from app.warehouses.constants import SCM_WAREHOUSES_COLLECTION
async def get_default_capabilities() -> Dict[str, Any]:
"""Get default capabilities structure."""
return {
"can_receive": True,
"can_fulfil": True,
"can_sell": False,
"can_adjust": True,
"can_stock_take": True
}
async def fix_warehouse_fields():
"""Fix missing fields in warehouse documents."""
print("πŸ”§ Starting warehouse fields migration...")
try:
db = get_database()
collection = db[SCM_WAREHOUSES_COLLECTION]
# Find all warehouses
warehouses = await collection.find({}).to_list(length=None)
print(f"πŸ“¦ Found {len(warehouses)} warehouse documents")
if not warehouses:
print("βœ… No warehouses found. Migration complete.")
return
# Default values
default_timestamp = "2025-01-01T00:00:00Z"
default_capabilities = await get_default_capabilities()
updated_count = 0
for warehouse in warehouses:
warehouse_id = warehouse.get("warehouse_id", "unknown")
updates = {}
# Check and fix missing fields
if warehouse.get("created_at") is None:
updates["created_at"] = default_timestamp
print(f" πŸ“… Adding created_at to {warehouse_id}")
if warehouse.get("created_by") is None:
updates["created_by"] = "system"
print(f" πŸ‘€ Adding created_by to {warehouse_id}")
if warehouse.get("updated_at") is None:
updates["updated_at"] = None
print(f" πŸ“… Setting updated_at to null for {warehouse_id}")
if warehouse.get("updated_by") is None:
updates["updated_by"] = None
print(f" πŸ‘€ Setting updated_by to null for {warehouse_id}")
if warehouse.get("capabilities") is None:
updates["capabilities"] = default_capabilities
print(f" βš™οΈ Adding default capabilities to {warehouse_id}")
# Apply updates if any
if updates:
result = await collection.update_one(
{"warehouse_id": warehouse_id},
{"$set": updates}
)
if result.modified_count > 0:
updated_count += 1
print(f" βœ… Updated {warehouse_id}")
else:
print(f" ⚠️ Failed to update {warehouse_id}")
else:
print(f" ✨ {warehouse_id} already has all required fields")
print(f"\nπŸŽ‰ Migration complete!")
print(f"πŸ“Š Total warehouses processed: {len(warehouses)}")
print(f"πŸ“Š Warehouses updated: {updated_count}")
print(f"πŸ“Š Warehouses already complete: {len(warehouses) - updated_count}")
except Exception as e:
print(f"❌ Migration failed: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
async def verify_migration():
"""Verify that all warehouses now have required fields."""
print("\nπŸ” Verifying migration results...")
try:
db = get_database()
collection = db[SCM_WAREHOUSES_COLLECTION]
# Check for warehouses missing required fields
missing_fields_queries = [
{"created_at": {"$exists": False}},
{"created_at": None},
{"created_by": {"$exists": False}},
{"created_by": None},
{"capabilities": {"$exists": False}},
{"capabilities": None}
]
issues_found = False
for query in missing_fields_queries:
count = await collection.count_documents(query)
if count > 0:
field_name = list(query.keys())[0]
print(f" ⚠️ Found {count} warehouses still missing {field_name}")
issues_found = True
if not issues_found:
print(" βœ… All warehouses have required fields!")
# Show sample of updated warehouses
sample = await collection.find({}).limit(3).to_list(length=3)
print("\nπŸ“‹ Sample warehouse documents:")
for warehouse in sample:
print(f" - {warehouse.get('warehouse_id')}: "
f"created_at={warehouse.get('created_at')}, "
f"created_by={warehouse.get('created_by')}, "
f"capabilities={'present' if warehouse.get('capabilities') else 'missing'}")
except Exception as e:
print(f"❌ Verification failed: {e}")
async def main():
"""Main migration function."""
print("πŸš€ Warehouse Fields Migration Script")
print("=" * 50)
try:
# Connect to database
print("πŸ”Œ Connecting to MongoDB...")
await connect_to_mongo()
print("βœ… Connected to MongoDB")
# Run migration
await fix_warehouse_fields()
# Verify results
await verify_migration()
print("\n✨ Migration script completed!")
except Exception as e:
print(f"❌ Migration script failed: {e}")
import traceback
traceback.print_exc()
finally:
# Close database connection
print("πŸ”Œ Closing MongoDB connection...")
await close_mongo_connection()
print("βœ… MongoDB connection closed")
if __name__ == "__main__":
asyncio.run(main())