| """First-boot setup service. |
| |
| Detects whether the platform has been initialized (any admin exists), |
| creates the founder admin account, and persists a randomly-generated |
| JWT secret in `system_config` so it survives restarts independently of |
| the .env file. |
| """ |
|
|
| import asyncio |
| import logging |
| import secrets |
| from typing import Optional |
|
|
| from sqlalchemy import select, func |
| from sqlalchemy.ext.asyncio import AsyncSession |
|
|
| from mac.config import settings |
| from mac.models.user import User |
| from mac.models.system_config import SystemConfig |
| from mac.utils.security import hash_password, create_access_token |
|
|
| log = logging.getLogger(__name__) |
|
|
| JWT_SECRET_KEY = "jwt_secret" |
|
|
|
|
| async def is_first_run(db: AsyncSession) -> bool: |
| """True iff there are no admin users yet.""" |
| result = await db.execute( |
| select(func.count()).select_from(User).where(User.role == "admin") |
| ) |
| return (result.scalar() or 0) == 0 |
|
|
|
|
| async def get_or_generate_jwt_secret(db: AsyncSession) -> str: |
| """Read the JWT secret from system_config; generate + persist if missing. |
| Returns the secret string. Once written, settings.jwt_secret_key is also |
| updated in-process so utils/security.py picks it up immediately. |
| """ |
| result = await db.execute(select(SystemConfig).where(SystemConfig.key == JWT_SECRET_KEY)) |
| row = result.scalar_one_or_none() |
| if row and row.value: |
| settings.jwt_secret_key = row.value |
| return row.value |
| new_secret = secrets.token_urlsafe(64) |
| db.add(SystemConfig(key=JWT_SECRET_KEY, value=new_secret)) |
| await db.flush() |
| settings.jwt_secret_key = new_secret |
| return new_secret |
|
|
|
|
| async def has_jwt_secret(db: AsyncSession) -> bool: |
| result = await db.execute(select(SystemConfig).where(SystemConfig.key == JWT_SECRET_KEY)) |
| row = result.scalar_one_or_none() |
| return bool(row and row.value) |
|
|
|
|
| async def create_founder_admin( |
| db: AsyncSession, |
| name: str, |
| email: str, |
| password: str, |
| ) -> tuple[Optional[User], Optional[str], Optional[str]]: |
| """Create the single founder admin. Returns (user, access_token, error). |
| Refuses if an admin already exists.""" |
| if not await is_first_run(db): |
| return (None, None, "An admin already exists. Setup is closed.") |
|
|
| |
| |
| result = await db.execute(select(User).where(User.roll_number == email)) |
| if result.scalar_one_or_none(): |
| return (None, None, "A user with this email already exists.") |
|
|
| await get_or_generate_jwt_secret(db) |
|
|
| pwd_hash = await asyncio.to_thread(hash_password, password) |
| user = User( |
| roll_number=email, |
| name=name, |
| email=email, |
| department="ADMIN", |
| role="admin", |
| password_hash=pwd_hash, |
| must_change_password=False, |
| is_active=True, |
| is_founder=True, |
| can_create_users=True, |
| ) |
| db.add(user) |
| await db.flush() |
| token = create_access_token({"sub": user.id, "role": user.role}) |
| return (user, token, None) |
|
|