Spaces:
Runtime error
Runtime error
File size: 9,012 Bytes
c93381f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 |
# src/data/migration.py
"""
Database Migration Module
This module provides functions to fix existing database records that have
missing or invalid required fields to ensure they conform to current Pydantic model requirements.
"""
from datetime import datetime, timezone
from typing import Dict, Any
from bson import ObjectId
from pymongo.errors import ConnectionFailure, PyMongoError
from src.data.connection import get_collection, Collections
from src.models.account import Account
from src.models.patient import Patient
from src.utils.logger import logger
# Role names considered valid for an account.
# NOTE(review): none of the functions visible in this module consult this
# list; 'Other' (its last entry) is the default role used when an account
# has no role set.
VALID_ROLES = [
    "Doctor",
    "Healthcare Prof",
    "Nurse",
    "Caregiver",
    "Physician",
    "Medical Student",
    "Other",
]
def fix_account_records() -> Dict[str, int]:
    """Fill in missing required fields on every account document.

    Each document in the account collection is inspected and any missing,
    ``None`` or otherwise falsy field is given a default:

    * ``role``       -> ``'Other'``
    * ``name``       -> ``"User "`` followed by the first 8 characters of the
      document ``_id``
    * ``created_at`` -> the time this migration started (timezone-aware UTC)
    * ``updated_at`` -> the time this migration started (timezone-aware UTC)

    When a document is updated it is re-read and passed to
    ``Account.model_validate``. A failure while processing one document is
    logged and counted but does not stop the migration.

    Role values are only checked for presence here; they are not compared
    against ``VALID_ROLES``.

    Returns:
        A dict with the counters ``total_checked``, ``fixed`` and ``errors``.
        A document whose update succeeds but whose validation then fails is
        counted in both ``fixed`` and ``errors``.

    Raises:
        ConnectionFailure, PyMongoError: re-raised (after logging) when they
            occur outside the per-document handling, e.g. while querying or
            iterating the collection.
    """
    # NOTE(review): the marker characters at the start of the log messages
    # look like mis-decoded emoji; confirm the intended glyphs against the
    # original file.
    log = logger()
    log.info("π§ Starting Account records migration...")
    collection = get_collection(Collections.ACCOUNT)
    now = datetime.now(timezone.utc)
    stats = {
        'total_checked': 0,
        'fixed': 0,
        'errors': 0,
    }

    try:
        for doc in collection.find({}):
            stats['total_checked'] += 1
            doc_id = doc['_id']
            try:
                # Collect every field that needs a default; an empty dict
                # means the document is left untouched.
                updates = {}

                # `not value` already covers a missing key, None and "".
                if not doc.get('role'):
                    updates['role'] = 'Other'  # Default role
                    log.info(f" π Account {doc_id}: Setting role to 'Other' (was: {doc.get('role')})")

                if not doc.get('name'):
                    # Derive a placeholder name from the document id.
                    updates['name'] = f"User {str(doc_id)[:8]}"
                    log.info(f" π Account {doc_id}: Setting name to '{updates['name']}' (was: {doc.get('name')})")

                for field in ('created_at', 'updated_at'):
                    if not doc.get(field):
                        updates[field] = now
                        log.info(f" π Account {doc_id}: Setting {field} to {now}")

                if updates:
                    collection.update_one(
                        {"_id": doc_id},
                        {"$set": updates},
                    )
                    stats['fixed'] += 1

                    # Re-read the stored document and make sure the model
                    # accepts it.
                    # NOTE(review): the source's indentation was lost, so it
                    # is unclear whether validation was meant to run for
                    # every document or only for updated ones - confirm.
                    updated_doc = collection.find_one({"_id": doc_id})
                    Account.model_validate(updated_doc)
            except Exception as e:
                # Best effort: record the failure and move on to the next
                # document.
                stats['errors'] += 1
                log.error(f" β Error fixing account {doc_id}: {e}")
    except (ConnectionFailure, PyMongoError) as e:
        log.error(f"β Database error while fixing accounts: {e}")
        raise

    log.info(f"β Account migration completed: {stats['fixed']} records fixed, {stats['errors']} errors")
    return stats
def fix_patient_records() -> Dict[str, int]:
    """Fill in missing required fields on every patient document.

    Each document in the patient collection is inspected and defaults are
    applied as follows:

    * ``name``       -> ``"Patient "`` followed by the first 8 characters of
      the document ``_id`` (when missing, ``None`` or empty)
    * ``age``        -> ``30`` (only when missing, ``None`` or an empty
      string; a stored age of ``0`` is kept)
    * ``sex``        -> ``'Other'`` (when missing, ``None`` or empty)
    * ``ethnicity``  -> ``'Not Specified'`` (when missing, ``None`` or empty)
    * ``created_at`` -> the time this migration started (timezone-aware UTC)
    * ``updated_at`` -> the time this migration started (timezone-aware UTC)

    When a document is updated it is re-read and passed to
    ``Patient.model_validate``. A failure while processing one document is
    logged and counted but does not stop the migration.

    Returns:
        A dict with the counters ``total_checked``, ``fixed`` and ``errors``.
        A document whose update succeeds but whose validation then fails is
        counted in both ``fixed`` and ``errors``.

    Raises:
        ConnectionFailure, PyMongoError: re-raised (after logging) when they
            occur outside the per-document handling, e.g. while querying or
            iterating the collection.
    """
    # NOTE(review): the marker characters at the start of the log messages
    # look like mis-decoded emoji; confirm the intended glyphs against the
    # original file.
    log = logger()
    log.info("π§ Starting Patient records migration...")
    collection = get_collection(Collections.PATIENT)
    now = datetime.now(timezone.utc)
    stats = {
        'total_checked': 0,
        'fixed': 0,
        'errors': 0,
    }

    try:
        for doc in collection.find({}):
            stats['total_checked'] += 1
            doc_id = doc['_id']
            try:
                # Collect every field that needs a default; an empty dict
                # means the document is left untouched.
                updates = {}

                if not doc.get('name'):
                    # Derive a placeholder name from the document id.
                    updates['name'] = f"Patient {str(doc_id)[:8]}"
                    log.info(f" π Patient {doc_id}: Setting name to '{updates['name']}' (was: {doc.get('name')})")

                # A plain truthiness test would treat an age of 0 as missing
                # and silently overwrite it with the default, so only
                # None / missing / empty string count as "no age".
                age = doc.get('age')
                if age is None or age == "":
                    updates['age'] = 30  # Default age
                    log.info(f" π Patient {doc_id}: Setting age to 30 (was: {age})")

                if not doc.get('sex'):
                    updates['sex'] = 'Other'  # Default sex
                    log.info(f" π Patient {doc_id}: Setting sex to 'Other' (was: {doc.get('sex')})")

                if not doc.get('ethnicity'):
                    updates['ethnicity'] = 'Not Specified'  # Default ethnicity
                    log.info(f" π Patient {doc_id}: Setting ethnicity to 'Not Specified' (was: {doc.get('ethnicity')})")

                for field in ('created_at', 'updated_at'):
                    if not doc.get(field):
                        updates[field] = now
                        log.info(f" π Patient {doc_id}: Setting {field} to {now}")

                if updates:
                    collection.update_one(
                        {"_id": doc_id},
                        {"$set": updates},
                    )
                    stats['fixed'] += 1

                    # Re-read the stored document and make sure the model
                    # accepts it.
                    # NOTE(review): the source's indentation was lost, so it
                    # is unclear whether validation was meant to run for
                    # every document or only for updated ones - confirm.
                    updated_doc = collection.find_one({"_id": doc_id})
                    Patient.model_validate(updated_doc)
            except Exception as e:
                # Best effort: record the failure and move on to the next
                # document.
                stats['errors'] += 1
                log.error(f" β Error fixing patient {doc_id}: {e}")
    except (ConnectionFailure, PyMongoError) as e:
        log.error(f"β Database error while fixing patients: {e}")
        raise

    log.info(f"β Patient migration completed: {stats['fixed']} records fixed, {stats['errors']} errors")
    return stats
def run_database_migration() -> Dict[str, Any]:
    """Run the account and patient migrations and log a combined summary.

    Calls ``fix_account_records()`` and then ``fix_patient_records()``,
    logs the per-collection counters and the overall totals, and returns
    them.

    Returns:
        On completion, a dict with ``account_stats``, ``patient_stats``,
        ``total_fixed``, ``total_errors`` and ``success`` set to ``True``.
        ``success`` is ``True`` even when ``total_errors`` is non-zero;
        callers that care about per-record failures must check
        ``total_errors``.
        If either migration raises, a dict with ``success`` set to ``False``
        and ``error`` holding the exception message is returned instead;
        the exception is not propagated.
    """
    # NOTE(review): the marker characters at the start of the log messages
    # look like mis-decoded emoji; confirm the intended glyphs against the
    # original file.
    log = logger()
    divider = "=" * 50

    log.info("π Starting Database Migration")
    log.info(divider)
    try:
        # Fix account records
        account_stats = fix_account_records()
        log.info(divider)

        # Fix patient records
        patient_stats = fix_patient_records()
        log.info(divider)

        log.info("π MIGRATION SUMMARY")
        log.info(divider)
        for heading, stats in (
            ("π Accounts:", account_stats),
            ("π₯ Patients:", patient_stats),
        ):
            log.info(heading)
            log.info(f" - Total checked: {stats['total_checked']}")
            log.info(f" - Fixed: {stats['fixed']}")
            log.info(f" - Errors: {stats['errors']}")

        total_fixed = account_stats['fixed'] + patient_stats['fixed']
        total_errors = account_stats['errors'] + patient_stats['errors']
        log.info(f"β Total records fixed: {total_fixed}")
        if total_errors > 0:
            log.info(f"β οΈ Total errors: {total_errors}")
        log.info("π Database migration completed successfully!")

        return {
            'account_stats': account_stats,
            'patient_stats': patient_stats,
            'total_fixed': total_fixed,
            'total_errors': total_errors,
            'success': True,
        }
    except Exception as e:
        # Top-level boundary: report the failure to the caller through the
        # return value rather than raising.
        log.error(f"β Migration failed with error: {e}")
        return {
            'success': False,
            'error': str(e),
        }
|