""" auth.py — JWT-based authentication helpers. Provides: - generate_token(user_id, role) → signed JWT string - @require_auth → validates JWT, injects g.current_user - @require_role(roles_list) → validates role-based access """ import os import jwt import logging from functools import wraps from datetime import datetime, timedelta, timezone from flask import request, jsonify, g from dotenv import load_dotenv load_dotenv() logger = logging.getLogger(__name__) JWT_SECRET = os.getenv("JWT_SECRET", "default-insecure-secret-change-me") JWT_EXPIRY_HOURS = int(os.getenv("JWT_EXPIRY_HOURS", "24")) # ─── Token Generation ───────────────────────────────────────────────────────── def generate_token(user_id: str, role: str) -> str: """Create a signed JWT valid for JWT_EXPIRY_HOURS hours.""" payload = { "sub": str(user_id), "role": role, "iat": datetime.now(timezone.utc), "exp": datetime.now(timezone.utc) + timedelta(hours=JWT_EXPIRY_HOURS), } return jwt.encode(payload, JWT_SECRET, algorithm="HS256") def decode_token(token: str) -> dict: """Decode and verify a JWT. Raises jwt.exceptions on failure.""" return jwt.decode(token, JWT_SECRET, algorithms=["HS256"]) # ─── Decorators ─────────────────────────────────────────────────────────────── def require_auth(f): """Decorator: validates Bearer JWT (or query token) and populates g.current_user.""" @wraps(f) def decorated(*args, **kwargs): # Support both 'Authorization: Bearer ' AND '?token=' query param token = None auth_header = request.headers.get("Authorization", "") if auth_header.startswith("Bearer "): token = auth_header.split(" ", 1)[1] else: token = request.args.get("token") if not token: return jsonify({"error": "Authentication token missing."}), 401 try: payload = decode_token(token) # Real-time suspension check from utils.db import users_col from bson import ObjectId as ObjId col = users_col() try: user = col.find_one({"_id": ObjId(payload["sub"])}) except Exception: user = col.find_one({"_id": payload["sub"]}) if not user or not user.get("is_active", True): return jsonify({"error": "Your account has been suspended by an administrator."}), 403 g.current_user = { "id": payload["sub"], "role": payload["role"], "email": user.get("email"), "name": user.get("name"), } except jwt.ExpiredSignatureError: return jsonify({"error": "Token expired. Please log in again."}), 401 except jwt.InvalidTokenError as exc: return jsonify({"error": f"Invalid token: {exc}"}), 401 return f(*args, **kwargs) return decorated def require_role(allowed_roles): """Decorator: checks if user has one of the allowed roles.""" def decorator(f): @wraps(f) @require_auth def decorated(*args, **kwargs): if g.current_user.get("role") not in allowed_roles: return jsonify({ "error": f"Access denied. Required: {', '.join(allowed_roles)}" }), 403 return f(*args, **kwargs) return decorated return decorator def require_admin(f): """Legacy helper: allows 'admin'.""" return require_role(["admin"])(f)