| """ |
| auth.py β JWT-based authentication helpers. |
| |
| Provides: |
| - generate_token(user_id, role) β signed JWT string |
| - @require_auth β validates JWT, injects g.current_user |
| - @require_admin β same as @require_auth + checks admin role |
| """ |
|
|
| import os |
| import jwt |
| import logging |
| from functools import wraps |
| from datetime import datetime, timedelta, timezone |
|
|
| from flask import request, jsonify, g |
| from dotenv import load_dotenv |
|
|
| load_dotenv() |
|
|
| logger = logging.getLogger(__name__) |
|
|
| JWT_SECRET = os.getenv("JWT_SECRET", "default-insecure-secret-change-me") |
| JWT_EXPIRY_HOURS = int(os.getenv("JWT_EXPIRY_HOURS", "24")) |
|
|
|
|
| |
|
|
| def generate_token(user_id: str, role: str) -> str: |
| """Create a signed JWT valid for JWT_EXPIRY_HOURS hours.""" |
| payload = { |
| "sub": str(user_id), |
| "role": role, |
| "iat": datetime.now(timezone.utc), |
| "exp": datetime.now(timezone.utc) + timedelta(hours=JWT_EXPIRY_HOURS), |
| } |
| return jwt.encode(payload, JWT_SECRET, algorithm="HS256") |
|
|
|
|
| def decode_token(token: str) -> dict: |
| """Decode and verify a JWT. Raises jwt.exceptions on failure.""" |
| return jwt.decode(token, JWT_SECRET, algorithms=["HS256"]) |
|
|
|
|
| |
|
|
| def require_auth(f): |
| """Decorator: validates Bearer JWT and populates g.current_user.""" |
| @wraps(f) |
| def decorated(*args, **kwargs): |
| auth_header = request.headers.get("Authorization", "") |
| if not auth_header.startswith("Bearer "): |
| return jsonify({"error": "Authorization header missing or malformed."}), 401 |
|
|
| token = auth_header.split(" ", 1)[1] |
| try: |
| payload = decode_token(token) |
| |
| |
| from utils.db import users_col |
| from bson import ObjectId as ObjId |
| col = users_col() |
| try: |
| user = col.find_one({"_id": ObjId(payload["sub"])}) |
| except Exception: |
| user = col.find_one({"_id": payload["sub"]}) |
| |
| if not user or not user.get("is_active", True): |
| return jsonify({"error": "Your account has been suspended by an administrator."}), 403 |
| |
| g.current_user = { |
| "id": payload["sub"], |
| "role": payload["role"], |
| } |
| except jwt.ExpiredSignatureError: |
| return jsonify({"error": "Token expired. Please log in again."}), 401 |
| except jwt.InvalidTokenError as exc: |
| return jsonify({"error": f"Invalid token: {exc}"}), 401 |
|
|
| return f(*args, **kwargs) |
| return decorated |
|
|
|
|
| def require_admin(f): |
| """Decorator: validates JWT AND checks for admin role.""" |
| @wraps(f) |
| @require_auth |
| def decorated(*args, **kwargs): |
| if g.current_user.get("role") != "admin": |
| return jsonify({"error": "Admin access required."}), 403 |
| return f(*args, **kwargs) |
| return decorated |
|
|