MukeshKapoor25's picture
Implement structured logging system across AUTH Microservice
755d6cd
"""
Database initialization module for AUTH Microservice.
Auto-creates initial users and roles on startup.
"""
from datetime import datetime
from passlib.context import CryptContext
from app.nosql import get_database
from app.constants.collections import AUTH_SYSTEM_USERS_COLLECTION, AUTH_ACCESS_ROLES_COLLECTION
from app.core.logging import get_logger
logger = get_logger(__name__)
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
async def initialize_database():
"""Initialize database with default roles and users."""
logger.info("πŸ”§ Initializing database...")
try:
db = get_database()
# Migrate existing data
await migrate_existing_users(db)
# Create default roles
await create_default_roles(db)
# Create initial users
await create_initial_users(db)
logger.info("βœ… Database initialization completed")
except Exception as e:
logger.error(f"❌ Database initialization failed: {e}")
raise
async def migrate_existing_users(db):
"""Migrate existing users to add missing required fields."""
logger.info("πŸ“‹ Migrating existing users...")
users_collection = db[AUTH_SYSTEM_USERS_COLLECTION]
try:
# Add role_id field to users missing it
result = await users_collection.update_many(
{"role_id": {"$exists": False}},
{"$set": {"role_id": "user"}}
)
if result.modified_count > 0:
logger.info(f" βœ“ Added role_id field to {result.modified_count} users")
# Ensure status field exists
result = await users_collection.update_many(
{"status": {"$exists": False}},
{"$set": {"status": "active"}}
)
if result.modified_count > 0:
logger.info(f" βœ“ Added status field to {result.modified_count} users")
# Ensure security_settings exists
result = await users_collection.update_many(
{"security_settings": {"$exists": False}},
{"$set": {
"security_settings": {
"require_password_change": False,
"failed_login_attempts": 0,
"login_attempts": []
}
}}
)
if result.modified_count > 0:
logger.info(f" βœ“ Added security_settings to {result.modified_count} users")
except Exception as e:
logger.warning(f"⚠ Migration warning: {e}")
async def create_default_roles(db):
"""Create default access roles."""
roles_collection = db[AUTH_ACCESS_ROLES_COLLECTION]
default_roles = [
{
"role_id": "role_super_admin",
"role_name": "super_admin",
"description": "Super Administrator with full system access",
"permissions": {
"users": ["view", "create", "update", "delete"],
"roles": ["view", "create", "update", "delete"],
"settings": ["view", "update"],
"auth": ["view", "manage"],
"system": ["view", "manage"]
},
"is_active": True
},
{
"role_id": "role_admin",
"role_name": "admin",
"description": "Administrator with limited system access",
"permissions": {
"users": ["view", "create", "update"],
"roles": ["view"],
"settings": ["view", "update"],
"auth": ["view"]
},
"is_active": True
},
{
"role_id": "role_manager",
"role_name": "manager",
"description": "Manager with team management capabilities",
"permissions": {
"users": ["view", "update"],
"auth": ["view"]
},
"is_active": True
},
{
"role_id": "role_user",
"role_name": "user",
"description": "Standard user with basic access",
"permissions": {
"auth": ["view"]
},
"is_active": True
}
]
for role in default_roles:
existing = await roles_collection.find_one({"role_name": role["role_name"]})
if not existing:
await roles_collection.insert_one(role)
logger.info(f" βœ“ Created role: {role['role_name']}")
else:
logger.info(f" ⊳ Role exists: {role['role_name']}")
async def create_initial_users(db):
"""Create initial system users."""
users_collection = db[AUTH_SYSTEM_USERS_COLLECTION]
initial_users = [
{
"user_id": "usr_superadmin_001",
"username": "superadmin",
"email": "superadmin@cuatrolabs.com",
"password_hash": pwd_context.hash("SuperAdmin@123!"),
"first_name": "Super",
"last_name": "Admin",
"phone": "+919999999999",
"role_id": "role_super_admin",
"permissions": {
"users": ["view", "create", "update", "delete"],
"roles": ["view", "create", "update", "delete"],
"settings": ["view", "update"],
"auth": ["view", "manage"],
"system": ["view", "manage"]
},
"status": "active",
"security_settings": {
"require_password_change": False,
"failed_login_attempts": 0,
"login_attempts": []
},
"timezone": "UTC",
"language": "en",
"created_by": "system",
"created_at": datetime.utcnow()
},
{
"user_id": "usr_admin_001",
"username": "admin",
"email": "admin@cuatrolabs.com",
"password_hash": pwd_context.hash("CompanyAdmin@123!"),
"first_name": "Company",
"last_name": "Admin",
"phone": "+919999999998",
"role_id": "role_company_admin",
"permissions": {
"users": ["view", "create", "update"],
"roles": ["view"],
"settings": ["view", "update"],
"auth": ["view"]
},
"status": "active",
"security_settings": {
"require_password_change": False,
"failed_login_attempts": 0,
"login_attempts": []
},
"timezone": "UTC",
"language": "en",
"created_by": "system",
"created_at": datetime.utcnow()
}
]
for user in initial_users:
existing = await users_collection.find_one({"email": user["email"]})
if not existing:
await users_collection.insert_one(user)
logger.info(f" βœ“ Created user: {user['email']}")
else:
# Update existing user if missing required fields
updates = {}
if "role_id" not in existing:
updates["role_id"] = user["role_id"]
if "status" not in existing:
updates["status"] = "active"
if "security_settings" not in existing:
updates["security_settings"] = user["security_settings"]
if updates:
await users_collection.update_one(
{"email": user["email"]},
{"$set": updates}
)
logger.info(f" ⚠ Updated existing user: {user['email']} (added missing fields)")
else:
logger.info(f" ⊳ User exists: {user['email']}")
logger.info("\nπŸ“ Default Credentials:")
logger.info(" superadmin@cuatrolabs.com / SuperAdmin@123")
logger.info(" admin@cuatrolabs.com / CompanyAdmin@123")