"""User lookup and in-memory session helpers for request authentication.

Users are read from the ``APP_USERS_JSON`` environment variable when it is
set and non-blank, otherwise from the JSON file named by ``APP_USERS_FILE``
(falling back to ``DEFAULT_USERS_FILE``). The parsed index is cached in
process memory and rebuilt when the source signature changes.

Sessions are kept in a module-level dict; nothing in this module expires
them, so an entry lives until ``destroy_auth_session`` removes it or the
process ends.
"""

from __future__ import annotations

import hashlib
import json
import os
import secrets
import unicodedata
from pathlib import Path
from threading import Lock
from typing import Any

from fastapi import HTTPException, Request

from app.runtime_paths import resolve_core_path

DEFAULT_USERS_FILE = resolve_core_path("auth", "usuarios.json")

# Guards _USERS_STATE (the cached user index and the signature it was built from).
_USERS_LOCK = Lock()
# Guards _AUTH_SESSIONS (token -> public user fields).
_SESSIONS_LOCK = Lock()
_USERS_STATE: dict[str, Any] = {
    "signature": None,
    "users": {},
}
_AUTH_SESSIONS: dict[str, dict[str, Any]] = {}


def _normalize_login(value: Any) -> str:
    """Return *value* as a canonical login key.

    The value is stringified, stripped, lower-cased, decomposed with NFKD,
    stripped of combining marks, and has spaces replaced by underscores.
    Falsy or blank input yields ``""``.
    """
    text = str(value or "").strip().lower()
    if not text:
        return ""
    text = unicodedata.normalize("NFKD", text)
    text = "".join(ch for ch in text if not unicodedata.combining(ch))
    return text.replace(" ", "_")


def _normalize_matricula(value: Any) -> str:
    """Return only the digits of *value*, or the stripped text if it has none.

    Falsy or blank input yields ``""``.
    """
    raw = str(value or "").strip()
    if not raw:
        return ""
    only_digits = "".join(ch for ch in raw if ch.isdigit())
    return only_digits or raw


def _users_file_path() -> Path:
    """Return the resolved users file path (``APP_USERS_FILE`` or the default)."""
    configured = os.getenv("APP_USERS_FILE") or DEFAULT_USERS_FILE
    return Path(str(configured)).expanduser().resolve()


def _users_signature() -> tuple[str, float | None]:
    """Return a cheap fingerprint of the current users source.

    For the environment source the first element embeds a SHA-256 digest of
    the raw value, so a changed ``APP_USERS_JSON`` invalidates the cache. For
    the file source it is ``(resolved path, mtime)``, with ``mtime`` set to
    ``None`` when the file cannot be stat'ed.
    """
    users_json = os.getenv("APP_USERS_JSON")
    if users_json and users_json.strip():
        # "surrogatepass" keeps undecodable environment bytes from raising here.
        digest = hashlib.sha256(users_json.encode("utf-8", "surrogatepass")).hexdigest()
        return (f"env:{digest}", None)

    users_file = _users_file_path()
    try:
        # Stat directly instead of exists()+stat() so a file removed in
        # between cannot raise an unhandled OSError.
        mtime: float | None = users_file.stat().st_mtime
    except OSError:
        mtime = None
    return (str(users_file), mtime)


def _load_users_source() -> list[dict[str, Any]]:
    """Read and validate the raw user records from the env var or the file.

    Returns:
        The dict items of the top-level ``"usuarios"`` list; non-dict items
        are dropped.

    Raises:
        HTTPException: 500 when the JSON cannot be read or parsed, or when the
            payload is not an object holding a ``"usuarios"`` list.
    """
    users_json = os.getenv("APP_USERS_JSON")
    if users_json and users_json.strip():
        try:
            payload = json.loads(users_json)
        except Exception as exc:
            raise HTTPException(
                status_code=500,
                detail=f"APP_USERS_JSON invalido: {exc}",
            ) from exc
    else:
        users_file = _users_file_path()
        try:
            # Read directly rather than exists()+read so there is no window
            # between the check and the use.
            payload = json.loads(users_file.read_text(encoding="utf-8"))
        except (FileNotFoundError, NotADirectoryError) as exc:
            raise HTTPException(
                status_code=500,
                detail=f"Arquivo de usuarios nao encontrado: {users_file}",
            ) from exc
        except Exception as exc:
            raise HTTPException(
                status_code=500,
                detail=f"Falha ao ler usuarios: {exc}",
            ) from exc

    usuarios = payload.get("usuarios") if isinstance(payload, dict) else None
    if not isinstance(usuarios, list):
        raise HTTPException(status_code=500, detail="Configuracao de usuarios invalida")
    return [item for item in usuarios if isinstance(item, dict)]


def _load_users_index() -> dict[str, dict[str, Any]]:
    """Return the users indexed by normalized login, using the cache when valid.

    The index is rebuilt when the source signature differs from the cached
    one. Records lacking a login or matricula after normalization are
    skipped; a later record with the same login replaces an earlier one.
    The returned mapping is a shallow copy of the cached index.

    Raises:
        HTTPException: 500, propagated from ``_load_users_source``.
    """
    signature = _users_signature()
    with _USERS_LOCK:
        cached = _USERS_STATE.get("users")
        if _USERS_STATE.get("signature") == signature and isinstance(cached, dict):
            return dict(cached)

        indexed: dict[str, dict[str, Any]] = {}
        for item in _load_users_source():
            usuario = _normalize_login(item.get("usuario"))
            matricula = _normalize_matricula(item.get("matricula"))
            if not usuario or not matricula:
                continue
            indexed[usuario] = {
                "usuario": usuario,
                "matricula": matricula,
                "nome": str(item.get("nome") or usuario).strip() or usuario,
                "perfil": str(item.get("perfil") or "viewer").strip().lower() or "viewer",
            }

        _USERS_STATE["signature"] = signature
        _USERS_STATE["users"] = indexed
        return dict(indexed)


def public_user(user: dict[str, Any]) -> dict[str, Any]:
    """Return the ``usuario``/``nome``/``perfil`` fields of *user* as strings.

    The matricula is deliberately not included. A missing ``perfil`` becomes
    ``"viewer"``; other missing fields become ``""``.
    """
    return {
        "usuario": str(user.get("usuario") or ""),
        "nome": str(user.get("nome") or ""),
        "perfil": str(user.get("perfil") or "viewer"),
    }


def authenticate_user(usuario: str, matricula: str) -> dict[str, Any]:
    """Validate a login/matricula pair and return a copy of the user record.

    Raises:
        HTTPException: 400 when either value is empty after normalization;
            401 when the login is unknown or the matricula does not match;
            500 when the users source cannot be loaded.
    """
    usuario_norm = _normalize_login(usuario)
    matricula_norm = _normalize_matricula(matricula)
    if not usuario_norm or not matricula_norm:
        raise HTTPException(status_code=400, detail="Informe usuario e matricula")

    user = _load_users_index().get(usuario_norm)
    # Unknown users are compared against "" (which can never equal the
    # non-empty submitted value) so both failure paths do the same work.
    expected = _normalize_matricula(user.get("matricula")) if user else ""
    # compare_digest avoids the early exit of ``!=``; bytes are used because
    # the str form only accepts ASCII.
    matches = secrets.compare_digest(
        expected.encode("utf-8"),
        matricula_norm.encode("utf-8"),
    )
    if not user or not matches:
        raise HTTPException(status_code=401, detail="Usuario ou matricula invalidos")
    return dict(user)


def create_auth_session(user: dict[str, Any]) -> str:
    """Store the public fields of *user* under a new random token and return it."""
    token = secrets.token_urlsafe(32)
    session = public_user(user)
    with _SESSIONS_LOCK:
        _AUTH_SESSIONS[token] = session
    return token


def get_user_by_token(token: str | None) -> dict[str, Any] | None:
    """Return a copy of the session stored for *token*, or ``None`` if absent."""
    key = str(token or "").strip()
    if not key:
        return None
    with _SESSIONS_LOCK:
        user = _AUTH_SESSIONS.get(key)
        return dict(user) if isinstance(user, dict) else None


def destroy_auth_session(token: str | None) -> None:
    """Remove the session stored for *token*; unknown or blank tokens are ignored."""
    key = str(token or "").strip()
    if not key:
        return
    with _SESSIONS_LOCK:
        _AUTH_SESSIONS.pop(key, None)


def extract_token_from_request(request: Request) -> str:
    """Return the stripped ``X-Auth-Token`` header value, or ``""`` when missing."""
    return str(request.headers.get("X-Auth-Token") or "").strip()


def require_user(request: Request) -> dict[str, Any]:
    """Return a copy of ``request.state.auth_user``.

    Raises:
        HTTPException: 401 when ``auth_user`` is not set to a dict.
    """
    user = getattr(request.state, "auth_user", None)
    if not isinstance(user, dict):
        raise HTTPException(status_code=401, detail="Login obrigatorio")
    return dict(user)


def require_admin(request: Request) -> dict[str, Any]:
    """Return the authenticated user, requiring the ``admin`` profile.

    Raises:
        HTTPException: 401 when no user is attached to the request;
            403 when the user's ``perfil`` is not ``"admin"`` (case-insensitive).
    """
    user = require_user(request)
    perfil = str(user.get("perfil") or "").lower()
    if perfil != "admin":
        raise HTTPException(status_code=403, detail="Acesso restrito a administradores")
    return user