"""
auth.py — JWT-based authentication helpers.
Provides:
- generate_token(user_id, role) → signed JWT string
- @require_auth → validates JWT, injects g.current_user
- @require_admin → same as @require_auth + checks admin role
"""
import os
import jwt
import logging
from functools import wraps
from datetime import datetime, timedelta, timezone
from flask import request, jsonify, g
from dotenv import load_dotenv
# Pull variables from a .env file (if any) into the process environment
# before the settings below are read.
load_dotenv()

logger = logging.getLogger(__name__)

# Fallback used only when JWT_SECRET is absent from the environment. It is a
# publicly known value, so tokens signed with it can be forged by anyone.
_DEFAULT_JWT_SECRET = "default-insecure-secret-change-me"

# Symmetric key used to sign and verify tokens (HS256).
JWT_SECRET = os.getenv("JWT_SECRET", _DEFAULT_JWT_SECRET)
if not JWT_SECRET or JWT_SECRET == _DEFAULT_JWT_SECRET:
    # Keep starting up (development setups rely on the fallback), but make
    # the misconfiguration visible instead of silently signing with it.
    logger.warning(
        "JWT_SECRET is not configured; falling back to an insecure default. "
        "Set the JWT_SECRET environment variable before deploying."
    )

# Token lifetime in hours; defaults to 24 when the variable is not set.
JWT_EXPIRY_HOURS = int(os.getenv("JWT_EXPIRY_HOURS", "24"))
# ─── Token Generation ─────────────────────────────────────────────────────────
def generate_token(user_id: str, role: str) -> str:
    """Create a signed HS256 JWT valid for JWT_EXPIRY_HOURS hours.

    Args:
        user_id: Identifier stored, stringified, in the ``sub`` claim.
        role: Value stored in the ``role`` claim.

    Returns:
        The encoded token produced by ``jwt.encode`` using JWT_SECRET.
    """
    # Read the clock once so that ``exp - iat`` is exactly the configured
    # lifetime; two separate now() calls can straddle a second boundary.
    issued_at = datetime.now(timezone.utc)
    payload = {
        "sub": str(user_id),
        "role": role,
        "iat": issued_at,
        "exp": issued_at + timedelta(hours=JWT_EXPIRY_HOURS),
    }
    return jwt.encode(payload, JWT_SECRET, algorithm="HS256")
def decode_token(token: str) -> dict:
    """Verify *token* with JWT_SECRET and return its decoded claims.

    Only the HS256 algorithm is accepted. Any verification failure surfaces
    as the exception raised by ``jwt.decode``; nothing is caught here.
    """
    accepted_algorithms = ["HS256"]
    claims = jwt.decode(token, JWT_SECRET, algorithms=accepted_algorithms)
    return claims
# ─── Decorators ───────────────────────────────────────────────────────────────
def require_auth(f):
    """Decorator: validate the Bearer JWT and populate ``g.current_user``.

    The wrapped view runs only when all of the following hold:

    * the ``Authorization`` header starts with ``"Bearer "``;
    * the token decodes and verifies via ``decode_token``;
    * the token carries both the ``sub`` and ``role`` claims;
    * a user document with that id exists and its ``is_active`` field is not
      falsy (a missing field counts as active).

    On success ``g.current_user`` is set to ``{"id": <sub>, "role": <role>}``.
    Otherwise a JSON ``{"error": ...}`` body is returned with status 401
    (header or token problems) or 403 (user missing or inactive).
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        auth_header = request.headers.get("Authorization", "")
        if not auth_header.startswith("Bearer "):
            return jsonify({"error": "Authorization header missing or malformed."}), 401
        token = auth_header.split(" ", 1)[1]

        # Only decoding can raise JWT errors, so the try covers just that
        # call; failures in the lookup below are not mistaken for token errors.
        try:
            payload = decode_token(token)
        except jwt.ExpiredSignatureError:
            return jsonify({"error": "Token expired. Please log in again."}), 401
        except jwt.InvalidTokenError as exc:
            return jsonify({"error": f"Invalid token: {exc}"}), 401

        # A correctly signed token that lacks the claims this module relies on
        # is still unusable: reject it instead of failing with a KeyError.
        if "sub" not in payload or "role" not in payload:
            return jsonify({"error": "Invalid token: missing required claims."}), 401
        user_id = payload["sub"]

        # Real-time suspension check.
        # NOTE(review): these imports are function-local in the original,
        # presumably to avoid an import cycle with utils.db — confirm.
        from utils.db import users_col
        from bson import ObjectId as ObjId
        from bson.errors import InvalidId

        # Query by ObjectId when the id parses as one, otherwise by the raw
        # value. Only the conversion is guarded, so database errors propagate
        # instead of triggering a second query.
        try:
            lookup_id = ObjId(user_id)
        except (InvalidId, TypeError):
            lookup_id = user_id
        user = users_col().find_one({"_id": lookup_id})

        if not user or not user.get("is_active", True):
            return jsonify({"error": "Your account has been suspended by an administrator."}), 403

        # NOTE(review): the role comes from the token, not the fetched user
        # document, so a role change applies only to newly issued tokens.
        g.current_user = {
            "id": user_id,
            "role": payload["role"],
        }
        return f(*args, **kwargs)
    return decorated
def require_admin(f):
    """Decorator: run ``require_auth`` first, then insist on the admin role.

    Responses produced by ``require_auth`` are returned unchanged. When
    authentication passes but ``g.current_user["role"]`` is not ``"admin"``,
    a 403 JSON error is returned and the wrapped view is not called.
    """
    def admin_gate(*args, **kwargs):
        role = g.current_user.get("role")
        if role == "admin":
            return f(*args, **kwargs)
        return jsonify({"error": "Admin access required."}), 403

    # Same composition as stacking @wraps(f) over @require_auth: the role
    # check is wrapped by authentication, then given f's metadata.
    return wraps(f)(require_auth(admin_gate))
|