Spaces:
Runtime error
Runtime error
For old persisted records that may not match the new schema, add a migration service that automatically fixes the mismatches. Update the title tag and bold bubble as external UI rendering strategies.
c93381f
| # src/data/migration.py | |
| """ | |
| Database Migration Module | |
| This module provides functions to fix existing database records that have | |
| missing or invalid required fields to ensure they conform to current Pydantic model requirements. | |
| """ | |
| from datetime import datetime, timezone | |
| from typing import Dict, Any | |
| from bson import ObjectId | |
| from pymongo.errors import ConnectionFailure, PyMongoError | |
| from src.data.connection import get_collection, Collections | |
| from src.models.account import Account | |
| from src.models.patient import Patient | |
| from src.utils.logger import logger | |
# Valid roles for accounts.
# NOTE(review): none of the functions visible in this module reference this
# list (fix_account_records defaults a missing role to "Other" without
# checking membership); presumably it is kept for callers or for future role
# validation — confirm before removing.
VALID_ROLES = [
    "Doctor",
    "Healthcare Prof",
    "Nurse",
    "Caregiver",
    "Physician",
    "Medical Student",
    "Other"
]
def fix_account_records() -> Dict[str, int]:
    """Backfill required fields on every document in the account collection.

    Each account is inspected for a falsy ``role``, ``name``, ``created_at``
    or ``updated_at``. Missing values are replaced with defaults ("Other",
    a name derived from the first 8 characters of the id, and the current
    UTC time), written back with a single ``$set`` update, and the stored
    document is then re-read and validated against the ``Account`` model.

    Returns:
        A dict with the counters ``total_checked``, ``fixed`` and ``errors``.

    Raises:
        PyMongoError: re-raised (after logging) when the failure happens
            outside the per-record handler, e.g. while iterating the cursor.
    """
    logger().info("π§ Starting Account records migration...")
    accounts = get_collection(Collections.ACCOUNT)
    timestamp = datetime.now(timezone.utc)
    stats = {'total_checked': 0, 'fixed': 0, 'errors': 0}

    try:
        for record in accounts.find({}):
            stats['total_checked'] += 1
            record_id = record['_id']

            try:
                # Collect every field that has to be backfilled; an empty
                # patch means the record is left untouched.
                patch = {}

                current_role = record.get('role')
                if not current_role:
                    patch['role'] = 'Other'
                    logger().info(f" π Account {record_id}: Setting role to 'Other' (was: {current_role})")

                current_name = record.get('name')
                if not current_name:
                    patch['name'] = f"User {str(record_id)[:8]}"
                    logger().info(f" π Account {record_id}: Setting name to '{patch['name']}' (was: {current_name})")

                if not record.get('created_at'):
                    patch['created_at'] = timestamp
                    logger().info(f" π Account {record_id}: Setting created_at to {timestamp}")

                if not record.get('updated_at'):
                    patch['updated_at'] = timestamp
                    logger().info(f" π Account {record_id}: Setting updated_at to {timestamp}")

                if patch:
                    accounts.update_one({"_id": record_id}, {"$set": patch})
                    stats['fixed'] += 1

                    # Re-read the stored document and make sure the model
                    # accepts it after the backfill.
                    Account.model_validate(accounts.find_one({"_id": record_id}))
            except Exception as exc:
                # A single bad record must not abort the whole run.
                stats['errors'] += 1
                logger().error(f" β Error fixing account {record_id}: {exc}")
    except (ConnectionFailure, PyMongoError) as exc:
        logger().error(f"β Database error while fixing accounts: {exc}")
        raise

    logger().info(f"β Account migration completed: {stats['fixed']} records fixed, {stats['errors']} errors")
    return stats
def fix_patient_records() -> Dict[str, int]:
    """Fix patient records with missing or invalid required fields.

    Every document in the patient collection is checked for the required
    fields ``name``, ``age``, ``sex``, ``ethnicity``, ``created_at`` and
    ``updated_at``. Missing values are filled with placeholder defaults,
    written back in one ``$set`` update, and the stored document is then
    re-read and validated against the ``Patient`` model.

    An age of ``0`` is kept as-is: only an absent, ``None`` or empty-string
    age is treated as missing, so infant records are not overwritten with
    the default.

    Returns:
        A dict with the counters ``total_checked``, ``fixed`` and ``errors``.

    Raises:
        PyMongoError: re-raised (after logging) when the failure happens
            outside the per-record handler, e.g. while iterating the cursor.
    """
    logger().info("π§ Starting Patient records migration...")
    collection = get_collection(Collections.PATIENT)
    now = datetime.now(timezone.utc)
    stats = {
        'total_checked': 0,
        'fixed': 0,
        'errors': 0
    }

    try:
        # Find all patients
        cursor = collection.find({})
        for doc in cursor:
            stats['total_checked'] += 1
            doc_id = doc['_id']

            try:
                # Fields to backfill; stays empty when the record is fine.
                updates = {}

                # Fix missing or empty name
                if not doc.get('name'):
                    updates['name'] = f"Patient {str(doc_id)[:8]}"  # Generate a name
                    logger().info(f" π Patient {doc_id}: Setting name to '{updates['name']}' (was: {doc.get('name')})")

                # Fix missing age. A plain truthiness test would also match
                # 0, which is a legitimate age for a newborn, so compare
                # against the "missing" values explicitly.
                age = doc.get('age')
                if age is None or age == '':
                    updates['age'] = 30  # Placeholder default age
                    logger().info(f" π Patient {doc_id}: Setting age to 30 (was: {doc.get('age')})")

                # Fix missing or empty sex
                if not doc.get('sex'):
                    updates['sex'] = 'Other'  # Default sex
                    logger().info(f" π Patient {doc_id}: Setting sex to 'Other' (was: {doc.get('sex')})")

                # Fix missing or empty ethnicity
                if not doc.get('ethnicity'):
                    updates['ethnicity'] = 'Not Specified'  # Default ethnicity
                    logger().info(f" π Patient {doc_id}: Setting ethnicity to 'Not Specified' (was: {doc.get('ethnicity')})")

                # Fix missing timestamps
                if not doc.get('created_at'):
                    updates['created_at'] = now
                    logger().info(f" π Patient {doc_id}: Setting created_at to {now}")

                if not doc.get('updated_at'):
                    updates['updated_at'] = now
                    logger().info(f" π Patient {doc_id}: Setting updated_at to {now}")

                # Apply updates if needed
                if updates:
                    collection.update_one(
                        {"_id": doc_id},
                        {"$set": updates}
                    )
                    stats['fixed'] += 1

                    # Validate the record can be parsed by Pydantic
                    updated_doc = collection.find_one({"_id": doc_id})
                    Patient.model_validate(updated_doc)
            except Exception as e:
                # A single bad record must not abort the whole run.
                stats['errors'] += 1
                logger().error(f" β Error fixing patient {doc_id}: {e}")
    except (ConnectionFailure, PyMongoError) as e:
        logger().error(f"β Database error while fixing patients: {e}")
        raise

    logger().info(f"β Patient migration completed: {stats['fixed']} records fixed, {stats['errors']} errors")
    return stats
def run_database_migration() -> Dict[str, Any]:
    """Run the account and patient fixers and log a combined summary.

    Returns:
        On success, a dict holding ``account_stats``, ``patient_stats``,
        ``total_fixed``, ``total_errors`` and ``success=True``. If any
        exception escapes either fixer, a dict with ``success=False`` and
        the stringified ``error`` is returned instead of raising.
    """
    divider = "=" * 50
    logger().info("π Starting Database Migration")
    logger().info(divider)

    try:
        account_stats = fix_account_records()
        logger().info(divider)

        patient_stats = fix_patient_records()
        logger().info(divider)

        logger().info("π MIGRATION SUMMARY")
        logger().info(divider)

        # One summary section per collection, in the order they were run.
        sections = (
            ("π Accounts:", account_stats),
            ("π₯ Patients:", patient_stats),
        )
        for heading, counters in sections:
            logger().info(heading)
            logger().info(f" - Total checked: {counters['total_checked']}")
            logger().info(f" - Fixed: {counters['fixed']}")
            logger().info(f" - Errors: {counters['errors']}")

        total_fixed = account_stats['fixed'] + patient_stats['fixed']
        total_errors = account_stats['errors'] + patient_stats['errors']

        logger().info(f"β Total records fixed: {total_fixed}")
        if total_errors > 0:
            logger().info(f"β οΈ Total errors: {total_errors}")
        logger().info("π Database migration completed successfully!")

        summary = {
            'account_stats': account_stats,
            'patient_stats': patient_stats,
            'total_fixed': total_fixed,
            'total_errors': total_errors,
            'success': True
        }
        return summary
    except Exception as exc:
        logger().error(f"β Migration failed with error: {exc}")
        return {'success': False, 'error': str(exc)}