""" auth.py — JWT-based authentication helpers. Provides: - generate_token(user_id, role) → signed JWT string - @require_auth → validates JWT, injects g.current_user - @require_admin → same as @require_auth + checks admin role """ import os import jwt import logging from functools import wraps from datetime import datetime, timedelta, timezone from flask import request, jsonify, g from dotenv import load_dotenv load_dotenv() logger = logging.getLogger(__name__) JWT_SECRET = os.getenv("JWT_SECRET", "default-insecure-secret-change-me") JWT_EXPIRY_HOURS = int(os.getenv("JWT_EXPIRY_HOURS", "24")) # ─── Token Generation ───────────────────────────────────────────────────────── def generate_token(user_id: str, role: str) -> str: """Create a signed JWT valid for JWT_EXPIRY_HOURS hours.""" payload = { "sub": str(user_id), "role": role, "iat": datetime.now(timezone.utc), "exp": datetime.now(timezone.utc) + timedelta(hours=JWT_EXPIRY_HOURS), } return jwt.encode(payload, JWT_SECRET, algorithm="HS256") def decode_token(token: str) -> dict: """Decode and verify a JWT. Raises jwt.exceptions on failure.""" return jwt.decode(token, JWT_SECRET, algorithms=["HS256"]) # ─── Decorators ─────────────────────────────────────────────────────────────── def require_auth(f): """Decorator: validates Bearer JWT and populates g.current_user.""" @wraps(f) def decorated(*args, **kwargs): auth_header = request.headers.get("Authorization", "") if not auth_header.startswith("Bearer "): return jsonify({"error": "Authorization header missing or malformed."}), 401 token = auth_header.split(" ", 1)[1] try: payload = decode_token(token) # Real-time suspension check from utils.db import users_col from bson import ObjectId as ObjId col = users_col() try: user = col.find_one({"_id": ObjId(payload["sub"])}) except Exception: user = col.find_one({"_id": payload["sub"]}) if not user or not user.get("is_active", True): return jsonify({"error": "Your account has been suspended by an administrator."}), 403 g.current_user = { "id": payload["sub"], "role": payload["role"], } except jwt.ExpiredSignatureError: return jsonify({"error": "Token expired. Please log in again."}), 401 except jwt.InvalidTokenError as exc: return jsonify({"error": f"Invalid token: {exc}"}), 401 return f(*args, **kwargs) return decorated def require_admin(f): """Decorator: validates JWT AND checks for admin role.""" @wraps(f) @require_auth def decorated(*args, **kwargs): if g.current_user.get("role") != "admin": return jsonify({"error": "Admin access required."}), 403 return f(*args, **kwargs) return decorated