Spaces:
Running
Running
| """ | |
| Database initialization module for AUTH Microservice. | |
| Auto-creates initial users and roles on startup. | |
| """ | |
| from datetime import datetime | |
| from passlib.context import CryptContext | |
| from app.nosql import get_database | |
| from app.constants.collections import AUTH_SYSTEM_USERS_COLLECTION, AUTH_ACCESS_ROLES_COLLECTION | |
| from app.core.logging import get_logger | |
| logger = get_logger(__name__) | |
| pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") | |
| async def initialize_database(): | |
| """Initialize database with default roles and users.""" | |
| logger.info("π§ Initializing database...") | |
| try: | |
| db = get_database() | |
| # Migrate existing data | |
| await migrate_existing_users(db) | |
| # Create default roles | |
| await create_default_roles(db) | |
| # Create initial users | |
| await create_initial_users(db) | |
| logger.info("β Database initialization completed") | |
| except Exception as e: | |
| logger.error(f"β Database initialization failed: {e}") | |
| raise | |
| async def migrate_existing_users(db): | |
| """Migrate existing users to add missing required fields.""" | |
| logger.info("π Migrating existing users...") | |
| users_collection = db[AUTH_SYSTEM_USERS_COLLECTION] | |
| try: | |
| # Add role_id field to users missing it | |
| result = await users_collection.update_many( | |
| {"role_id": {"$exists": False}}, | |
| {"$set": {"role_id": "user"}} | |
| ) | |
| if result.modified_count > 0: | |
| logger.info(f" β Added role_id field to {result.modified_count} users") | |
| # Ensure status field exists | |
| result = await users_collection.update_many( | |
| {"status": {"$exists": False}}, | |
| {"$set": {"status": "active"}} | |
| ) | |
| if result.modified_count > 0: | |
| logger.info(f" β Added status field to {result.modified_count} users") | |
| # Ensure security_settings exists | |
| result = await users_collection.update_many( | |
| {"security_settings": {"$exists": False}}, | |
| {"$set": { | |
| "security_settings": { | |
| "require_password_change": False, | |
| "failed_login_attempts": 0, | |
| "login_attempts": [] | |
| } | |
| }} | |
| ) | |
| if result.modified_count > 0: | |
| logger.info(f" β Added security_settings to {result.modified_count} users") | |
| except Exception as e: | |
| logger.warning(f"β Migration warning: {e}") | |
| async def create_default_roles(db): | |
| """Create default access roles.""" | |
| roles_collection = db[AUTH_ACCESS_ROLES_COLLECTION] | |
| default_roles = [ | |
| { | |
| "role_id": "role_super_admin", | |
| "role_name": "super_admin", | |
| "description": "Super Administrator with full system access", | |
| "permissions": { | |
| "users": ["view", "create", "update", "delete"], | |
| "roles": ["view", "create", "update", "delete"], | |
| "settings": ["view", "update"], | |
| "auth": ["view", "manage"], | |
| "system": ["view", "manage"] | |
| }, | |
| "is_active": True | |
| }, | |
| { | |
| "role_id": "role_admin", | |
| "role_name": "admin", | |
| "description": "Administrator with limited system access", | |
| "permissions": { | |
| "users": ["view", "create", "update"], | |
| "roles": ["view"], | |
| "settings": ["view", "update"], | |
| "auth": ["view"] | |
| }, | |
| "is_active": True | |
| }, | |
| { | |
| "role_id": "role_manager", | |
| "role_name": "manager", | |
| "description": "Manager with team management capabilities", | |
| "permissions": { | |
| "users": ["view", "update"], | |
| "auth": ["view"] | |
| }, | |
| "is_active": True | |
| }, | |
| { | |
| "role_id": "role_user", | |
| "role_name": "user", | |
| "description": "Standard user with basic access", | |
| "permissions": { | |
| "auth": ["view"] | |
| }, | |
| "is_active": True | |
| } | |
| ] | |
| for role in default_roles: | |
| existing = await roles_collection.find_one({"role_name": role["role_name"]}) | |
| if not existing: | |
| await roles_collection.insert_one(role) | |
| logger.info(f" β Created role: {role['role_name']}") | |
| else: | |
| logger.info(f" β³ Role exists: {role['role_name']}") | |
| async def create_initial_users(db): | |
| """Create initial system users.""" | |
| users_collection = db[AUTH_SYSTEM_USERS_COLLECTION] | |
| initial_users = [ | |
| { | |
| "user_id": "usr_superadmin_001", | |
| "username": "superadmin", | |
| "email": "superadmin@cuatrolabs.com", | |
| "password_hash": pwd_context.hash("SuperAdmin@123!"), | |
| "first_name": "Super", | |
| "last_name": "Admin", | |
| "phone": "+919999999999", | |
| "role_id": "role_super_admin", | |
| "permissions": { | |
| "users": ["view", "create", "update", "delete"], | |
| "roles": ["view", "create", "update", "delete"], | |
| "settings": ["view", "update"], | |
| "auth": ["view", "manage"], | |
| "system": ["view", "manage"] | |
| }, | |
| "status": "active", | |
| "security_settings": { | |
| "require_password_change": False, | |
| "failed_login_attempts": 0, | |
| "login_attempts": [] | |
| }, | |
| "timezone": "UTC", | |
| "language": "en", | |
| "created_by": "system", | |
| "created_at": datetime.utcnow() | |
| }, | |
| { | |
| "user_id": "usr_admin_001", | |
| "username": "admin", | |
| "email": "admin@cuatrolabs.com", | |
| "password_hash": pwd_context.hash("CompanyAdmin@123!"), | |
| "first_name": "Company", | |
| "last_name": "Admin", | |
| "phone": "+919999999998", | |
| "role_id": "role_company_admin", | |
| "permissions": { | |
| "users": ["view", "create", "update"], | |
| "roles": ["view"], | |
| "settings": ["view", "update"], | |
| "auth": ["view"] | |
| }, | |
| "status": "active", | |
| "security_settings": { | |
| "require_password_change": False, | |
| "failed_login_attempts": 0, | |
| "login_attempts": [] | |
| }, | |
| "timezone": "UTC", | |
| "language": "en", | |
| "created_by": "system", | |
| "created_at": datetime.utcnow() | |
| } | |
| ] | |
| for user in initial_users: | |
| existing = await users_collection.find_one({"email": user["email"]}) | |
| if not existing: | |
| await users_collection.insert_one(user) | |
| logger.info(f" β Created user: {user['email']}") | |
| else: | |
| # Update existing user if missing required fields | |
| updates = {} | |
| if "role_id" not in existing: | |
| updates["role_id"] = user["role_id"] | |
| if "status" not in existing: | |
| updates["status"] = "active" | |
| if "security_settings" not in existing: | |
| updates["security_settings"] = user["security_settings"] | |
| if updates: | |
| await users_collection.update_one( | |
| {"email": user["email"]}, | |
| {"$set": updates} | |
| ) | |
| logger.info(f" β Updated existing user: {user['email']} (added missing fields)") | |
| else: | |
| logger.info(f" β³ User exists: {user['email']}") | |
| logger.info("\nπ Default Credentials:") | |
| logger.info(" superadmin@cuatrolabs.com / SuperAdmin@123") | |
| logger.info(" admin@cuatrolabs.com / CompanyAdmin@123") | |