MAC / mac /services /setup_service.py
Aaryan17's picture
chore: upload MAC codebase to HF Space
0e76632 verified
"""First-boot setup service.
Detects whether the platform has been initialized (any admin exists),
creates the founder admin account, and persists a randomly-generated
JWT secret in `system_config` so it survives restarts independently of
the .env file.
"""
import asyncio
import logging
import secrets
from typing import Optional
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from mac.config import settings
from mac.models.user import User
from mac.models.system_config import SystemConfig
from mac.utils.security import hash_password, create_access_token
log = logging.getLogger(__name__)
JWT_SECRET_KEY = "jwt_secret"
async def is_first_run(db: AsyncSession) -> bool:
    """Report whether the platform still has no admin account.

    Args:
        db: Async session used to run the count query.

    Returns:
        True when the number of users whose role is "admin" is zero,
        False otherwise.
    """
    admin_count_query = (
        select(func.count())
        .select_from(User)
        .where(User.role == "admin")
    )
    outcome = await db.execute(admin_count_query)
    admin_count = outcome.scalar()
    # A missing count (None) is treated exactly like a count of zero.
    return not admin_count
async def get_or_generate_jwt_secret(db: AsyncSession) -> str:
    """Return the JWT secret kept in system_config, creating it if missing.

    If a row keyed by JWT_SECRET_KEY already holds a non-empty value, that
    value is returned. Otherwise a fresh random secret is generated and
    persisted. In both cases ``settings.jwt_secret_key`` is overwritten
    in-process with the returned value.

    Args:
        db: Async session used for the lookup and the write. The write is
            flushed here but not committed by this function.

    Returns:
        The JWT secret string now stored in system_config.
    """
    result = await db.execute(
        select(SystemConfig).where(SystemConfig.key == JWT_SECRET_KEY)
    )
    row = result.scalar_one_or_none()
    if row is not None and row.value:
        settings.jwt_secret_key = row.value
        return row.value

    new_secret = secrets.token_urlsafe(64)
    if row is None:
        db.add(SystemConfig(key=JWT_SECRET_KEY, value=new_secret))
    else:
        # The row exists but its value is empty. Fill it in place: adding a
        # second row with the same key would either break a uniqueness
        # constraint on flush or make later scalar_one_or_none() lookups
        # fail because more than one row matches.
        row.value = new_secret
    await db.flush()

    settings.jwt_secret_key = new_secret
    return new_secret
async def has_jwt_secret(db: AsyncSession) -> bool:
    """Tell whether system_config already holds a non-empty JWT secret.

    Args:
        db: Async session used to look up the row keyed by JWT_SECRET_KEY.

    Returns:
        True when such a row exists and its value is truthy, else False.
    """
    lookup = select(SystemConfig).where(SystemConfig.key == JWT_SECRET_KEY)
    config_row = (await db.execute(lookup)).scalar_one_or_none()
    if not config_row:
        return False
    # An existing row with an empty value still counts as "no secret".
    return bool(config_row.value)
async def create_founder_admin(
    db: AsyncSession,
    name: str,
    email: str,
    password: str,
) -> tuple[Optional[User], Optional[str], Optional[str]]:
    """Create the single founder admin account.

    Args:
        db: Async session used for every lookup and insert. New rows are
            flushed here but not committed by this function.
        name: Display name stored on the new user.
        email: Stored as both the user's email and their roll number.
        password: Plaintext password; only its hash is stored.

    Returns:
        A ``(user, access_token, error)`` tuple. On success ``error`` is
        None. On refusal ``user`` and ``access_token`` are None and
        ``error`` is a human-readable message. Setup is refused when an
        admin already exists or when the email is already in use.
    """
    if not await is_first_run(db):
        return (None, None, "An admin already exists. Setup is closed.")

    # The founder's email is written to both roll_number and email, so an
    # existing user matching on either column is a conflict. An admin would
    # already have closed setup above; this catches non-admin users.
    # .first() is used because more than one row may match the OR condition.
    clash = await db.execute(
        select(User.id)
        .where((User.roll_number == email) | (User.email == email))
        .limit(1)
    )
    if clash.first() is not None:
        return (None, None, "A user with this email already exists.")

    # Ensure the JWT secret is in place before a token is issued below.
    await get_or_generate_jwt_secret(db)

    # Run the password hash in a worker thread rather than on the event loop.
    pwd_hash = await asyncio.to_thread(hash_password, password)
    user = User(
        roll_number=email,
        name=name,
        email=email,
        department="ADMIN",
        role="admin",
        password_hash=pwd_hash,
        must_change_password=False,
        is_active=True,
        is_founder=True,
        can_create_users=True,
    )
    db.add(user)
    # Flush before building the token, which reads user.id.
    await db.flush()

    token = create_access_token({"sub": user.id, "role": user.role})
    return (user, token, None)