File size: 3,891 Bytes
0a5d897
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
"""
auth.py β€” JWT-based authentication helpers.

Provides:
  - generate_token(user_id, role)  β†’ signed JWT string
  - @require_auth                  β†’ validates JWT, injects g.current_user
  - @require_role(roles_list)      β†’ validates role-based access
"""

import os
import jwt
import logging
from functools import wraps
from datetime import datetime, timedelta, timezone

from flask import request, jsonify, g
from dotenv import load_dotenv

load_dotenv()

logger = logging.getLogger(__name__)

JWT_SECRET = os.getenv("JWT_SECRET", "default-insecure-secret-change-me")
JWT_EXPIRY_HOURS = int(os.getenv("JWT_EXPIRY_HOURS", "24"))


# ─── Token Generation ─────────────────────────────────────────────────────────

def generate_token(user_id: str, role: str) -> str:
    """Create a signed JWT valid for JWT_EXPIRY_HOURS hours."""
    payload = {
        "sub": str(user_id),
        "role": role,
        "iat": datetime.now(timezone.utc),
        "exp": datetime.now(timezone.utc) + timedelta(hours=JWT_EXPIRY_HOURS),
    }
    return jwt.encode(payload, JWT_SECRET, algorithm="HS256")


def decode_token(token: str) -> dict:
    """Decode and verify a JWT. Raises jwt.exceptions on failure."""
    return jwt.decode(token, JWT_SECRET, algorithms=["HS256"])


# ─── Decorators ───────────────────────────────────────────────────────────────

def require_auth(f):
    """Decorator: validates Bearer JWT (or query token) and populates g.current_user."""
    @wraps(f)
    def decorated(*args, **kwargs):
        # Support both 'Authorization: Bearer <token>' AND '?token=<token>' query param
        token = None
        auth_header = request.headers.get("Authorization", "")
        if auth_header.startswith("Bearer "):
            token = auth_header.split(" ", 1)[1]
        else:
            token = request.args.get("token")

        if not token:
            return jsonify({"error": "Authentication token missing."}), 401

        try:
            payload = decode_token(token)
            
            # Real-time suspension check
            from utils.db import users_col
            from bson import ObjectId as ObjId
            col = users_col()
            try:
                user = col.find_one({"_id": ObjId(payload["sub"])})
            except Exception:
                user = col.find_one({"_id": payload["sub"]})
                
            if not user or not user.get("is_active", True):
                return jsonify({"error": "Your account has been suspended by an administrator."}), 403
                
            g.current_user = {
                "id": payload["sub"],
                "role": payload["role"],
                "email": user.get("email"),
                "name": user.get("name"),
            }
        except jwt.ExpiredSignatureError:
            return jsonify({"error": "Token expired. Please log in again."}), 401
        except jwt.InvalidTokenError as exc:
            return jsonify({"error": f"Invalid token: {exc}"}), 401

        return f(*args, **kwargs)
    return decorated


def require_role(allowed_roles):
    """Decorator: checks if user has one of the allowed roles."""
    def decorator(f):
        @wraps(f)
        @require_auth
        def decorated(*args, **kwargs):
            if g.current_user.get("role") not in allowed_roles:
                return jsonify({
                    "error": f"Access denied. Required: {', '.join(allowed_roles)}"
                }), 403
            return f(*args, **kwargs)
        return decorated
    return decorator


def require_admin(f):
    """Legacy helper: allows 'admin'."""
    return require_role(["admin"])(f)