Spaces:
Running
Running
| from __future__ import annotations | |
| import json | |
| import os | |
| import secrets | |
| import unicodedata | |
| from pathlib import Path | |
| from threading import Lock | |
| from typing import Any | |
| from fastapi import HTTPException, Request | |
| from app.runtime_paths import resolve_core_path | |
| DEFAULT_USERS_FILE = resolve_core_path("auth", "usuarios.json") | |
| _USERS_LOCK = Lock() | |
| _SESSIONS_LOCK = Lock() | |
| _USERS_STATE: dict[str, Any] = { | |
| "signature": None, | |
| "users": {}, | |
| } | |
| _AUTH_SESSIONS: dict[str, dict[str, Any]] = {} | |
| def _normalize_login(value: Any) -> str: | |
| text = str(value or "").strip().lower() | |
| if not text: | |
| return "" | |
| text = unicodedata.normalize("NFKD", text) | |
| text = "".join(ch for ch in text if not unicodedata.combining(ch)) | |
| return text.replace(" ", "_") | |
| def _normalize_matricula(value: Any) -> str: | |
| raw = str(value or "").strip() | |
| if not raw: | |
| return "" | |
| only_digits = "".join(ch for ch in raw if ch.isdigit()) | |
| return only_digits or raw | |
| def _users_signature() -> tuple[str, float | None]: | |
| users_json = os.getenv("APP_USERS_JSON") | |
| if users_json and users_json.strip(): | |
| return ("env", None) | |
| users_file = Path(str(os.getenv("APP_USERS_FILE") or DEFAULT_USERS_FILE)).expanduser().resolve() | |
| mtime = users_file.stat().st_mtime if users_file.exists() else None | |
| return (str(users_file), mtime) | |
| def _load_users_source() -> list[dict[str, Any]]: | |
| users_json = os.getenv("APP_USERS_JSON") | |
| if users_json and users_json.strip(): | |
| try: | |
| payload = json.loads(users_json) | |
| except Exception as exc: | |
| raise HTTPException(status_code=500, detail=f"APP_USERS_JSON invalido: {exc}") from exc | |
| else: | |
| users_file = Path(str(os.getenv("APP_USERS_FILE") or DEFAULT_USERS_FILE)).expanduser().resolve() | |
| if not users_file.exists(): | |
| raise HTTPException(status_code=500, detail=f"Arquivo de usuarios nao encontrado: {users_file}") | |
| try: | |
| payload = json.loads(users_file.read_text(encoding="utf-8")) | |
| except Exception as exc: | |
| raise HTTPException(status_code=500, detail=f"Falha ao ler usuarios: {exc}") from exc | |
| usuarios = payload.get("usuarios") if isinstance(payload, dict) else None | |
| if not isinstance(usuarios, list): | |
| raise HTTPException(status_code=500, detail="Configuracao de usuarios invalida") | |
| return [item for item in usuarios if isinstance(item, dict)] | |
| def _load_users_index() -> dict[str, dict[str, Any]]: | |
| signature = _users_signature() | |
| with _USERS_LOCK: | |
| if _USERS_STATE.get("signature") == signature and isinstance(_USERS_STATE.get("users"), dict): | |
| return dict(_USERS_STATE["users"]) | |
| usuarios = _load_users_source() | |
| indexed: dict[str, dict[str, Any]] = {} | |
| for item in usuarios: | |
| usuario = _normalize_login(item.get("usuario")) | |
| matricula = _normalize_matricula(item.get("matricula")) | |
| if not usuario or not matricula: | |
| continue | |
| indexed[usuario] = { | |
| "usuario": usuario, | |
| "matricula": matricula, | |
| "nome": str(item.get("nome") or usuario).strip() or usuario, | |
| "perfil": str(item.get("perfil") or "viewer").strip().lower() or "viewer", | |
| } | |
| _USERS_STATE["signature"] = signature | |
| _USERS_STATE["users"] = indexed | |
| return dict(indexed) | |
| def public_user(user: dict[str, Any]) -> dict[str, Any]: | |
| return { | |
| "usuario": str(user.get("usuario") or ""), | |
| "nome": str(user.get("nome") or ""), | |
| "perfil": str(user.get("perfil") or "viewer"), | |
| } | |
| def authenticate_user(usuario: str, matricula: str) -> dict[str, Any]: | |
| usuario_norm = _normalize_login(usuario) | |
| matricula_norm = _normalize_matricula(matricula) | |
| if not usuario_norm or not matricula_norm: | |
| raise HTTPException(status_code=400, detail="Informe usuario e matricula") | |
| indexed = _load_users_index() | |
| user = indexed.get(usuario_norm) | |
| if not user: | |
| raise HTTPException(status_code=401, detail="Usuario ou matricula invalidos") | |
| if _normalize_matricula(user.get("matricula")) != matricula_norm: | |
| raise HTTPException(status_code=401, detail="Usuario ou matricula invalidos") | |
| return dict(user) | |
| def create_auth_session(user: dict[str, Any]) -> str: | |
| token = secrets.token_urlsafe(32) | |
| with _SESSIONS_LOCK: | |
| _AUTH_SESSIONS[token] = { | |
| "usuario": str(user.get("usuario") or ""), | |
| "nome": str(user.get("nome") or ""), | |
| "perfil": str(user.get("perfil") or "viewer"), | |
| } | |
| return token | |
| def get_user_by_token(token: str | None) -> dict[str, Any] | None: | |
| key = str(token or "").strip() | |
| if not key: | |
| return None | |
| with _SESSIONS_LOCK: | |
| user = _AUTH_SESSIONS.get(key) | |
| return dict(user) if isinstance(user, dict) else None | |
| def destroy_auth_session(token: str | None) -> None: | |
| key = str(token or "").strip() | |
| if not key: | |
| return | |
| with _SESSIONS_LOCK: | |
| _AUTH_SESSIONS.pop(key, None) | |
| def extract_token_from_request(request: Request) -> str: | |
| return str(request.headers.get("X-Auth-Token") or "").strip() | |
| def require_user(request: Request) -> dict[str, Any]: | |
| user = getattr(request.state, "auth_user", None) | |
| if not isinstance(user, dict): | |
| raise HTTPException(status_code=401, detail="Login obrigatorio") | |
| return dict(user) | |
| def require_admin(request: Request) -> dict[str, Any]: | |
| user = require_user(request) | |
| perfil = str(user.get("perfil") or "").lower() | |
| if perfil != "admin": | |
| raise HTTPException(status_code=403, detail="Acesso restrito a administradores") | |
| return user | |