mesa-react / backend /app /services /auth_service.py
Guilherme Silberfarb Costa
Split shared runtime helpers from Windows launcher
6127e00
from __future__ import annotations
import json
import os
import secrets
import unicodedata
from pathlib import Path
from threading import Lock
from typing import Any
from fastapi import HTTPException, Request
from app.runtime_paths import resolve_core_path
DEFAULT_USERS_FILE = resolve_core_path("auth", "usuarios.json")
_USERS_LOCK = Lock()
_SESSIONS_LOCK = Lock()
_USERS_STATE: dict[str, Any] = {
"signature": None,
"users": {},
}
_AUTH_SESSIONS: dict[str, dict[str, Any]] = {}
def _normalize_login(value: Any) -> str:
text = str(value or "").strip().lower()
if not text:
return ""
text = unicodedata.normalize("NFKD", text)
text = "".join(ch for ch in text if not unicodedata.combining(ch))
return text.replace(" ", "_")
def _normalize_matricula(value: Any) -> str:
raw = str(value or "").strip()
if not raw:
return ""
only_digits = "".join(ch for ch in raw if ch.isdigit())
return only_digits or raw
def _users_signature() -> tuple[str, float | None]:
users_json = os.getenv("APP_USERS_JSON")
if users_json and users_json.strip():
return ("env", None)
users_file = Path(str(os.getenv("APP_USERS_FILE") or DEFAULT_USERS_FILE)).expanduser().resolve()
mtime = users_file.stat().st_mtime if users_file.exists() else None
return (str(users_file), mtime)
def _load_users_source() -> list[dict[str, Any]]:
users_json = os.getenv("APP_USERS_JSON")
if users_json and users_json.strip():
try:
payload = json.loads(users_json)
except Exception as exc:
raise HTTPException(status_code=500, detail=f"APP_USERS_JSON invalido: {exc}") from exc
else:
users_file = Path(str(os.getenv("APP_USERS_FILE") or DEFAULT_USERS_FILE)).expanduser().resolve()
if not users_file.exists():
raise HTTPException(status_code=500, detail=f"Arquivo de usuarios nao encontrado: {users_file}")
try:
payload = json.loads(users_file.read_text(encoding="utf-8"))
except Exception as exc:
raise HTTPException(status_code=500, detail=f"Falha ao ler usuarios: {exc}") from exc
usuarios = payload.get("usuarios") if isinstance(payload, dict) else None
if not isinstance(usuarios, list):
raise HTTPException(status_code=500, detail="Configuracao de usuarios invalida")
return [item for item in usuarios if isinstance(item, dict)]
def _load_users_index() -> dict[str, dict[str, Any]]:
signature = _users_signature()
with _USERS_LOCK:
if _USERS_STATE.get("signature") == signature and isinstance(_USERS_STATE.get("users"), dict):
return dict(_USERS_STATE["users"])
usuarios = _load_users_source()
indexed: dict[str, dict[str, Any]] = {}
for item in usuarios:
usuario = _normalize_login(item.get("usuario"))
matricula = _normalize_matricula(item.get("matricula"))
if not usuario or not matricula:
continue
indexed[usuario] = {
"usuario": usuario,
"matricula": matricula,
"nome": str(item.get("nome") or usuario).strip() or usuario,
"perfil": str(item.get("perfil") or "viewer").strip().lower() or "viewer",
}
_USERS_STATE["signature"] = signature
_USERS_STATE["users"] = indexed
return dict(indexed)
def public_user(user: dict[str, Any]) -> dict[str, Any]:
return {
"usuario": str(user.get("usuario") or ""),
"nome": str(user.get("nome") or ""),
"perfil": str(user.get("perfil") or "viewer"),
}
def authenticate_user(usuario: str, matricula: str) -> dict[str, Any]:
usuario_norm = _normalize_login(usuario)
matricula_norm = _normalize_matricula(matricula)
if not usuario_norm or not matricula_norm:
raise HTTPException(status_code=400, detail="Informe usuario e matricula")
indexed = _load_users_index()
user = indexed.get(usuario_norm)
if not user:
raise HTTPException(status_code=401, detail="Usuario ou matricula invalidos")
if _normalize_matricula(user.get("matricula")) != matricula_norm:
raise HTTPException(status_code=401, detail="Usuario ou matricula invalidos")
return dict(user)
def create_auth_session(user: dict[str, Any]) -> str:
token = secrets.token_urlsafe(32)
with _SESSIONS_LOCK:
_AUTH_SESSIONS[token] = {
"usuario": str(user.get("usuario") or ""),
"nome": str(user.get("nome") or ""),
"perfil": str(user.get("perfil") or "viewer"),
}
return token
def get_user_by_token(token: str | None) -> dict[str, Any] | None:
key = str(token or "").strip()
if not key:
return None
with _SESSIONS_LOCK:
user = _AUTH_SESSIONS.get(key)
return dict(user) if isinstance(user, dict) else None
def destroy_auth_session(token: str | None) -> None:
key = str(token or "").strip()
if not key:
return
with _SESSIONS_LOCK:
_AUTH_SESSIONS.pop(key, None)
def extract_token_from_request(request: Request) -> str:
return str(request.headers.get("X-Auth-Token") or "").strip()
def require_user(request: Request) -> dict[str, Any]:
user = getattr(request.state, "auth_user", None)
if not isinstance(user, dict):
raise HTTPException(status_code=401, detail="Login obrigatorio")
return dict(user)
def require_admin(request: Request) -> dict[str, Any]:
user = require_user(request)
perfil = str(user.get("perfil") or "").lower()
if perfil != "admin":
raise HTTPException(status_code=403, detail="Acesso restrito a administradores")
return user