SQuAD / auth.py
tnp554's picture
feat: deploy SQuAD backend with all AI models
09daf0b
"""
auth.py β€” JWT-based authentication helpers.
Provides:
- generate_token(user_id, role) β†’ signed JWT string
- @require_auth β†’ validates JWT, injects g.current_user
- @require_admin β†’ same as @require_auth + checks admin role
"""
import os
import jwt
import logging
from functools import wraps
from datetime import datetime, timedelta, timezone
from flask import request, jsonify, g
from dotenv import load_dotenv
load_dotenv()
logger = logging.getLogger(__name__)
JWT_SECRET = os.getenv("JWT_SECRET", "default-insecure-secret-change-me")
JWT_EXPIRY_HOURS = int(os.getenv("JWT_EXPIRY_HOURS", "24"))
# ─── Token Generation ─────────────────────────────────────────────────────────
def generate_token(user_id: str, role: str) -> str:
"""Create a signed JWT valid for JWT_EXPIRY_HOURS hours."""
payload = {
"sub": str(user_id),
"role": role,
"iat": datetime.now(timezone.utc),
"exp": datetime.now(timezone.utc) + timedelta(hours=JWT_EXPIRY_HOURS),
}
return jwt.encode(payload, JWT_SECRET, algorithm="HS256")
def decode_token(token: str) -> dict:
"""Decode and verify a JWT. Raises jwt.exceptions on failure."""
return jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
# ─── Decorators ───────────────────────────────────────────────────────────────
def require_auth(f):
"""Decorator: validates Bearer JWT and populates g.current_user."""
@wraps(f)
def decorated(*args, **kwargs):
auth_header = request.headers.get("Authorization", "")
if not auth_header.startswith("Bearer "):
return jsonify({"error": "Authorization header missing or malformed."}), 401
token = auth_header.split(" ", 1)[1]
try:
payload = decode_token(token)
# Real-time suspension check
from utils.db import users_col
from bson import ObjectId as ObjId
col = users_col()
try:
user = col.find_one({"_id": ObjId(payload["sub"])})
except Exception:
user = col.find_one({"_id": payload["sub"]})
if not user or not user.get("is_active", True):
return jsonify({"error": "Your account has been suspended by an administrator."}), 403
g.current_user = {
"id": payload["sub"],
"role": payload["role"],
}
except jwt.ExpiredSignatureError:
return jsonify({"error": "Token expired. Please log in again."}), 401
except jwt.InvalidTokenError as exc:
return jsonify({"error": f"Invalid token: {exc}"}), 401
return f(*args, **kwargs)
return decorated
def require_admin(f):
"""Decorator: validates JWT AND checks for admin role."""
@wraps(f)
@require_auth
def decorated(*args, **kwargs):
if g.current_user.get("role") != "admin":
return jsonify({"error": "Admin access required."}), 403
return f(*args, **kwargs)
return decorated