"""
Database initialization module for AUTH Microservice.

Auto-creates initial users and roles on startup.
"""
from datetime import datetime, timezone

from passlib.context import CryptContext

from app.nosql import get_database
from app.constants.collections import (
    AUTH_SYSTEM_USERS_COLLECTION,
    AUTH_ACCESS_ROLES_COLLECTION,
)
from app.core.logging import get_logger

logger = get_logger(__name__)

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# role_id given to pre-existing users that have none. It must match the
# role_id of the standard "user" role seeded by create_default_roles().
_FALLBACK_ROLE_ID = "role_user"


def _default_security_settings() -> dict:
    """Return a fresh default ``security_settings`` sub-document."""
    return {
        "require_password_change": False,
        "failed_login_attempts": 0,
        "login_attempts": [],
    }


async def initialize_database() -> None:
    """Initialize database with default roles and users.

    Runs, in order: the migration of existing users, the creation of the
    default roles, and the creation of the initial users.

    Raises:
        Exception: any error raised by ``get_database()`` or by one of the
            steps is logged and re-raised unchanged.
    """
    logger.info("šŸ”§ Initializing database...")

    try:
        db = get_database()

        # Migrate existing data
        await migrate_existing_users(db)

        # Create default roles
        await create_default_roles(db)

        # Create initial users
        await create_initial_users(db)
    except Exception as e:
        logger.error(f"āŒ Database initialization failed: {e}")
        raise
    else:
        logger.info("āœ… Database initialization completed")


async def migrate_existing_users(db) -> None:
    """Migrate existing users to add missing required fields.

    For each of ``role_id``, ``status`` and ``security_settings``, every user
    document lacking the field gets a default value. Documents that already
    have the field are left untouched.

    This step is best-effort: any exception is logged as a warning and
    swallowed, and the remaining backfills are skipped.

    Args:
        db: database handle, indexed by collection name.
    """
    logger.info("šŸ“‹ Migrating existing users...")

    users_collection = db[AUTH_SYSTEM_USERS_COLLECTION]

    # (field name, default value, label used in the log message)
    backfills = (
        ("role_id", _FALLBACK_ROLE_ID, "role_id field"),
        ("status", "active", "status field"),
        ("security_settings", _default_security_settings(), "security_settings"),
    )

    try:
        for field, default, label in backfills:
            result = await users_collection.update_many(
                {field: {"$exists": False}},
                {"$set": {field: default}},
            )
            if result.modified_count > 0:
                logger.info(f" āœ“ Added {label} to {result.modified_count} users")
    except Exception as e:
        logger.warning(f"⚠ Migration warning: {e}")


def _default_role_documents() -> list:
    """Return fresh documents for the built-in access roles."""
    return [
        {
            "role_id": "role_super_admin",
            "role_name": "super_admin",
            "description": "Super Administrator with full system access",
            "permissions": {
                "users": ["view", "create", "update", "delete"],
                "roles": ["view", "create", "update", "delete"],
                "settings": ["view", "update"],
                "auth": ["view", "manage"],
                "system": ["view", "manage"],
            },
            "is_active": True,
        },
        {
            "role_id": "role_admin",
            "role_name": "admin",
            "description": "Administrator with limited system access",
            "permissions": {
                "users": ["view", "create", "update"],
                "roles": ["view"],
                "settings": ["view", "update"],
                "auth": ["view"],
            },
            "is_active": True,
        },
        {
            "role_id": "role_manager",
            "role_name": "manager",
            "description": "Manager with team management capabilities",
            "permissions": {
                "users": ["view", "update"],
                "auth": ["view"],
            },
            "is_active": True,
        },
        {
            "role_id": "role_user",
            "role_name": "user",
            "description": "Standard user with basic access",
            "permissions": {
                "auth": ["view"],
            },
            "is_active": True,
        },
    ]


async def create_default_roles(db) -> None:
    """Create default access roles.

    A role is inserted only when no document with the same ``role_name``
    exists; existing roles are never modified.

    Args:
        db: database handle, indexed by collection name.
    """
    roles_collection = db[AUTH_ACCESS_ROLES_COLLECTION]

    for role in _default_role_documents():
        role_name = role["role_name"]
        existing = await roles_collection.find_one({"role_name": role_name})
        if existing:
            logger.info(f" ⊳ Role exists: {role_name}")
            continue

        await roles_collection.insert_one(role)
        logger.info(f" āœ“ Created role: {role_name}")


def _initial_user_seeds() -> list:
    """Return ``(plain_password, profile)`` pairs for the built-in accounts.

    The profile holds every stored field except ``password_hash`` and
    ``created_at``, which are added only when the account is inserted.
    """
    return [
        (
            "SuperAdmin@123!",
            {
                "user_id": "usr_superadmin_001",
                "username": "superadmin",
                "email": "superadmin@cuatrolabs.com",
                "first_name": "Super",
                "last_name": "Admin",
                "phone": "+919999999999",
                "role_id": "role_super_admin",
                "permissions": {
                    "users": ["view", "create", "update", "delete"],
                    "roles": ["view", "create", "update", "delete"],
                    "settings": ["view", "update"],
                    "auth": ["view", "manage"],
                    "system": ["view", "manage"],
                },
                "status": "active",
                "security_settings": _default_security_settings(),
                "timezone": "UTC",
                "language": "en",
                "created_by": "system",
            },
        ),
        (
            "CompanyAdmin@123!",
            {
                "user_id": "usr_admin_001",
                "username": "admin",
                "email": "admin@cuatrolabs.com",
                "first_name": "Company",
                "last_name": "Admin",
                "phone": "+919999999998",
                # Was "role_company_admin", which create_default_roles()
                # never seeds; "role_admin" carries these same permissions.
                "role_id": "role_admin",
                "permissions": {
                    "users": ["view", "create", "update"],
                    "roles": ["view"],
                    "settings": ["view", "update"],
                    "auth": ["view"],
                },
                "status": "active",
                "security_settings": _default_security_settings(),
                "timezone": "UTC",
                "language": "en",
                "created_by": "system",
            },
        ),
    ]


async def create_initial_users(db) -> None:
    """Create initial system users.

    Users are looked up by email. A missing user is inserted with a bcrypt
    hash of its built-in password. An existing user only has ``role_id``,
    ``status`` and ``security_settings`` filled in when absent; its password
    and all other fields are left untouched.

    Passwords are never written to the log. After the loop, the emails of
    accounts created during this call are logged as a warning.

    Args:
        db: database handle, indexed by collection name.
    """
    users_collection = db[AUTH_SYSTEM_USERS_COLLECTION]
    created_emails = []

    for plain_password, profile in _initial_user_seeds():
        email = profile["email"]
        existing = await users_collection.find_one({"email": email})

        if not existing:
            # Hash and timestamp only on insert: bcrypt is deliberately slow,
            # so this avoids paying for it on every startup.
            document = {
                **profile,
                "password_hash": pwd_context.hash(plain_password),
                "created_at": datetime.now(timezone.utc),
            }
            await users_collection.insert_one(document)
            created_emails.append(email)
            logger.info(f" āœ“ Created user: {email}")
            continue

        # Update existing user if missing required fields
        updates = {}
        if "role_id" not in existing:
            updates["role_id"] = profile["role_id"]
        if "status" not in existing:
            updates["status"] = "active"
        if "security_settings" not in existing:
            updates["security_settings"] = profile["security_settings"]

        if updates:
            await users_collection.update_one({"email": email}, {"$set": updates})
            logger.info(f" ⚠ Updated existing user: {email} (added missing fields)")
        else:
            logger.info(f" ⊳ User exists: {email}")

    # The previous version logged plaintext passwords on every startup, and
    # the logged values did not match the hashed ones. Report only what was
    # created in this run.
    if created_emails:
        logger.warning(
            "Default accounts created with built-in passwords: "
            f"{', '.join(created_emails)}. Change these passwords immediately."
        )