Spaces:
Running
Running
File size: 6,578 Bytes
b127e68 4292ec7 b127e68 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 | #!/usr/bin/env python3
"""
Migration script to fix missing fields in warehouse documents.
This script adds missing required fields to existing warehouse documents:
- created_at: Default timestamp for warehouses missing this field
- created_by: Default to "system" for warehouses missing this field
- updated_at: Set to None for warehouses missing this field
- updated_by: Set to None for warehouses missing this field
- capabilities: Default capabilities for warehouses missing this field
Usage:
python migration_fix_warehouse_fields.py
"""
import asyncio
import os
import sys
from datetime import datetime, timezone
from typing import Dict, Any
# Add the app directory to Python path
sys.path.append(os.path.join(os.path.dirname(__file__), '.'))
from app.nosql import get_database, connect_to_mongo, close_mongo_connection
from app.warehouses.constants import SCM_WAREHOUSES_COLLECTION
async def get_default_capabilities() -> Dict[str, Any]:
"""Get default capabilities structure."""
return {
"can_receive": True,
"can_fulfil": True,
"can_sell": False,
"can_adjust": True,
"can_stock_take": True
}
async def fix_warehouse_fields():
"""Fix missing fields in warehouse documents."""
print("π§ Starting warehouse fields migration...")
try:
db = get_database()
collection = db[SCM_WAREHOUSES_COLLECTION]
# Find all warehouses
warehouses = await collection.find({}).to_list(length=None)
print(f"π¦ Found {len(warehouses)} warehouse documents")
if not warehouses:
print("β
No warehouses found. Migration complete.")
return
# Default values
default_timestamp = "2025-01-01T00:00:00Z"
default_capabilities = await get_default_capabilities()
updated_count = 0
for warehouse in warehouses:
warehouse_id = warehouse.get("warehouse_id", "unknown")
updates = {}
# Check and fix missing fields
if warehouse.get("created_at") is None:
updates["created_at"] = default_timestamp
print(f" π
Adding created_at to {warehouse_id}")
if warehouse.get("created_by") is None:
updates["created_by"] = "system"
print(f" π€ Adding created_by to {warehouse_id}")
if warehouse.get("updated_at") is None:
updates["updated_at"] = None
print(f" π
Setting updated_at to null for {warehouse_id}")
if warehouse.get("updated_by") is None:
updates["updated_by"] = None
print(f" π€ Setting updated_by to null for {warehouse_id}")
if warehouse.get("capabilities") is None:
updates["capabilities"] = default_capabilities
print(f" βοΈ Adding default capabilities to {warehouse_id}")
# Apply updates if any
if updates:
result = await collection.update_one(
{"warehouse_id": warehouse_id},
{"$set": updates}
)
if result.modified_count > 0:
updated_count += 1
print(f" β
Updated {warehouse_id}")
else:
print(f" β οΈ Failed to update {warehouse_id}")
else:
print(f" β¨ {warehouse_id} already has all required fields")
print(f"\nπ Migration complete!")
print(f"π Total warehouses processed: {len(warehouses)}")
print(f"π Warehouses updated: {updated_count}")
print(f"π Warehouses already complete: {len(warehouses) - updated_count}")
except Exception as e:
print(f"β Migration failed: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
async def verify_migration():
"""Verify that all warehouses now have required fields."""
print("\nπ Verifying migration results...")
try:
db = get_database()
collection = db[SCM_WAREHOUSES_COLLECTION]
# Check for warehouses missing required fields
missing_fields_queries = [
{"created_at": {"$exists": False}},
{"created_at": None},
{"created_by": {"$exists": False}},
{"created_by": None},
{"capabilities": {"$exists": False}},
{"capabilities": None}
]
issues_found = False
for query in missing_fields_queries:
count = await collection.count_documents(query)
if count > 0:
field_name = list(query.keys())[0]
print(f" β οΈ Found {count} warehouses still missing {field_name}")
issues_found = True
if not issues_found:
print(" β
All warehouses have required fields!")
# Show sample of updated warehouses
sample = await collection.find({}).limit(3).to_list(length=3)
print("\nπ Sample warehouse documents:")
for warehouse in sample:
print(f" - {warehouse.get('warehouse_id')}: "
f"created_at={warehouse.get('created_at')}, "
f"created_by={warehouse.get('created_by')}, "
f"capabilities={'present' if warehouse.get('capabilities') else 'missing'}")
except Exception as e:
print(f"β Verification failed: {e}")
async def main():
"""Main migration function."""
print("π Warehouse Fields Migration Script")
print("=" * 50)
try:
# Connect to database
print("π Connecting to MongoDB...")
await connect_to_mongo()
print("β
Connected to MongoDB")
# Run migration
await fix_warehouse_fields()
# Verify results
await verify_migration()
print("\n⨠Migration script completed!")
except Exception as e:
print(f"β Migration script failed: {e}")
import traceback
traceback.print_exc()
finally:
# Close database connection
print("π Closing MongoDB connection...")
await close_mongo_connection()
print("β
MongoDB connection closed")
if __name__ == "__main__":
asyncio.run(main()) |