File size: 3,357 Bytes
09daf0b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
"""
auth.py — JWT-based authentication helpers.

Provides:
  - generate_token(user_id, role)  → signed JWT string
  - @require_auth                  → validates JWT, injects g.current_user
  - @require_admin                 → same as @require_auth + checks admin role
"""

import os
import jwt
import logging
from functools import wraps
from datetime import datetime, timedelta, timezone

from flask import request, jsonify, g
from dotenv import load_dotenv

# Load variables from a .env file (if any) into the process environment
# before the settings below are read.
load_dotenv()

logger = logging.getLogger(__name__)

# Placeholder signing key used only when JWT_SECRET is not configured.
# It is public knowledge (it lives in the source), so tokens signed with it
# can be forged by anyone.
_DEFAULT_JWT_SECRET = "default-insecure-secret-change-me"

# Symmetric key used to sign and verify tokens (HS256).
JWT_SECRET = os.getenv("JWT_SECRET", _DEFAULT_JWT_SECRET)
if not JWT_SECRET or JWT_SECRET == _DEFAULT_JWT_SECRET:
    # Keep the historical fallback so existing setups still start, but make
    # the misconfiguration visible instead of silently issuing weak tokens.
    logger.warning(
        "JWT_SECRET is not set (or is empty); using an insecure signing key. "
        "Set the JWT_SECRET environment variable before deploying."
    )

# Token lifetime in hours; read from the environment, defaulting to 24.
JWT_EXPIRY_HOURS = int(os.getenv("JWT_EXPIRY_HOURS", "24"))


# ─── Token Generation ─────────────────────────────────────────────────────────

def generate_token(user_id: str, role: str) -> str:
    """Create a signed JWT valid for JWT_EXPIRY_HOURS hours.

    Args:
        user_id: Identifier of the user; stored as a string in the "sub" claim.
        role: Role name stored verbatim in the "role" claim.

    Returns:
        The token encoded with HS256 using JWT_SECRET.
    """
    # Take a single timestamp so that exp - iat is exactly the configured
    # lifetime; two separate now() calls can straddle a second boundary.
    issued_at = datetime.now(timezone.utc)
    payload = {
        "sub": str(user_id),
        "role": role,
        "iat": issued_at,
        "exp": issued_at + timedelta(hours=JWT_EXPIRY_HOURS),
    }
    return jwt.encode(payload, JWT_SECRET, algorithm="HS256")


def decode_token(token: str) -> dict:
    """Verify *token* against JWT_SECRET (HS256 only) and return its claims.

    Verification failures are not handled here: whatever exception
    ``jwt.decode`` raises propagates to the caller.
    """
    claims = jwt.decode(
        token,
        JWT_SECRET,
        algorithms=["HS256"],
    )
    return claims


# ─── Decorators ───────────────────────────────────────────────────────────────

def require_auth(f):
    """Decorator: validates Bearer JWT and populates g.current_user.

    The wrapped view runs only when all of the following hold:
      - the Authorization header starts with "Bearer ",
      - the token decodes successfully via decode_token(),
      - the token carries both "sub" and "role" claims,
      - a user document with that id exists and is not marked inactive.

    On success ``g.current_user`` is set to ``{"id": <sub>, "role": <role>}``
    (both taken from the token) and the view's return value is passed through.
    Otherwise a JSON error is returned: 401 for header/token problems, 403
    when the user is missing or has ``is_active`` set to a falsy value.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        auth_header = request.headers.get("Authorization", "")
        if not auth_header.startswith("Bearer "):
            return jsonify({"error": "Authorization header missing or malformed."}), 401

        token = auth_header.split(" ", 1)[1]

        # Only the decode step can raise JWT errors, so keep the try minimal.
        try:
            payload = decode_token(token)
        except jwt.ExpiredSignatureError:
            return jsonify({"error": "Token expired. Please log in again."}), 401
        except jwt.InvalidTokenError as exc:
            return jsonify({"error": f"Invalid token: {exc}"}), 401

        # A correctly signed token without these claims used to surface as an
        # unhandled KeyError (HTTP 500); treat it as an invalid token instead.
        if "sub" not in payload or "role" not in payload:
            return jsonify({"error": "Invalid token: missing required claims."}), 401
        user_id = payload["sub"]

        # Real-time suspension check.
        # NOTE(review): imports are kept function-local as in the original,
        # presumably to avoid an import cycle with utils.db; confirm.
        from utils.db import users_col
        from bson import ObjectId as ObjId
        from bson.errors import InvalidId

        # Ids that are not valid ObjectIds are looked up by their raw value.
        # Only the conversion is guarded: database errors must propagate
        # rather than be retried and misreported as a suspended account.
        try:
            lookup_id = ObjId(user_id)
        except (InvalidId, TypeError):
            lookup_id = user_id
        user = users_col().find_one({"_id": lookup_id})

        # A missing user gets the same response as a deactivated one.
        if not user or not user.get("is_active", True):
            return jsonify({"error": "Your account has been suspended by an administrator."}), 403

        g.current_user = {
            "id": user_id,
            "role": payload["role"],
        }

        return f(*args, **kwargs)
    return decorated


def require_admin(f):
    """Decorator: run the view only for authenticated users whose role is "admin".

    Authentication is delegated to ``require_auth``; once it has populated
    ``g.current_user``, any role other than "admin" gets a 403 JSON error.
    """
    def admin_guard(*args, **kwargs):
        role = g.current_user.get("role")
        if role == "admin":
            return f(*args, **kwargs)
        return jsonify({"error": "Admin access required."}), 403

    # Same stacking order as decorator syntax: authenticate first, then copy
    # the view's metadata onto the outermost wrapper.
    authenticated = require_auth(admin_guard)
    return wraps(f)(authenticated)