Spaces:
Sleeping
Sleeping
| """ | |
| auth.py — JWT-based authentication helpers. | |
| Provides: | |
| - generate_token(user_id, role) → signed JWT string | |
| - @require_auth → validates JWT, injects g.current_user | |
| - @require_role(roles_list) → validates role-based access | |
| """ | |
| import os | |
| import jwt | |
| import logging | |
| from functools import wraps | |
| from datetime import datetime, timedelta, timezone | |
| from flask import request, jsonify, g | |
| from dotenv import load_dotenv | |
# Load variables from a .env file (if any) before reading configuration.
load_dotenv()

logger = logging.getLogger(__name__)

# Fallback signing key used only when JWT_SECRET is absent from the
# environment. It is hard-coded in this file, so tokens signed with it
# must be treated as forgeable.
_DEFAULT_JWT_SECRET = "default-insecure-secret-change-me"

JWT_SECRET = os.getenv("JWT_SECRET", _DEFAULT_JWT_SECRET)
if JWT_SECRET == _DEFAULT_JWT_SECRET:
    # Keep the fallback so existing setups still start, but make it visible.
    logger.warning(
        "JWT_SECRET is not set; using the built-in insecure default. "
        "Set JWT_SECRET in the environment before deploying."
    )

# Token lifetime in hours; the environment value must parse as an integer.
JWT_EXPIRY_HOURS = int(os.getenv("JWT_EXPIRY_HOURS", "24"))
| # ─── Token Generation ───────────────────────────────────────────────────────── | |
def generate_token(user_id: str, role: str) -> str:
    """Create a signed HS256 JWT valid for JWT_EXPIRY_HOURS hours.

    Args:
        user_id: Identifier stored, converted to ``str``, in the ``sub`` claim.
        role: Value stored in the ``role`` claim.

    Returns:
        The encoded token as a ``str``.
    """
    # Read the clock once so that ``exp - iat`` is exactly the configured
    # lifetime instead of differing by the time between two now() calls.
    issued_at = datetime.now(timezone.utc)
    payload = {
        "sub": str(user_id),
        "role": role,
        "iat": issued_at,
        "exp": issued_at + timedelta(hours=JWT_EXPIRY_HOURS),
    }
    token = jwt.encode(payload, JWT_SECRET, algorithm="HS256")
    # PyJWT 1.x returns bytes from encode(); normalise to the annotated str.
    if isinstance(token, bytes):
        token = token.decode("utf-8")
    return token
def decode_token(token: str) -> dict:
    """Verify *token* against JWT_SECRET and return its decoded claims.

    Only the HS256 algorithm is accepted. Any exception raised by
    ``jwt.decode`` propagates to the caller.
    """
    accepted_algorithms = ["HS256"]
    claims = jwt.decode(token, JWT_SECRET, algorithms=accepted_algorithms)
    return claims
| # ─── Decorators ─────────────────────────────────────────────────────────────── | |
def require_auth(f):
    """Decorator: validate a JWT and populate ``g.current_user``.

    The token is read from an ``Authorization: Bearer <token>`` header, or
    from the ``?token=<token>`` query parameter when no Bearer header is sent.

    Instead of calling the wrapped view, the decorator returns:
        401 -- token missing, expired, invalid, or lacking ``sub``/``role``.
        403 -- no matching user record, or the record's ``is_active`` is falsy.

    On success ``g.current_user`` is a dict with ``id``, ``role``, ``email``
    and ``name``, and the wrapped view's result is returned unchanged.
    """
    # Preserve the view's __name__: Flask uses it as the default endpoint
    # name, so un-wrapped decorators make every protected route collide.
    @wraps(f)
    def decorated(*args, **kwargs):
        # Support both 'Authorization: Bearer <token>' and '?token=<token>'.
        auth_header = request.headers.get("Authorization", "")
        if auth_header.startswith("Bearer "):
            token = auth_header.split(" ", 1)[1]
        else:
            token = request.args.get("token")
        if not token:
            return jsonify({"error": "Authentication token missing."}), 401

        # Only decoding can raise the JWT errors handled here.
        try:
            payload = decode_token(token)
        except jwt.ExpiredSignatureError:
            return jsonify({"error": "Token expired. Please log in again."}), 401
        except jwt.InvalidTokenError as exc:
            return jsonify({"error": f"Invalid token: {exc}"}), 401

        # A validly signed token without these claims previously raised an
        # uncaught KeyError (HTTP 500); reject it as an invalid token instead.
        user_id = payload.get("sub")
        role = payload.get("role")
        if user_id is None or role is None:
            return jsonify({"error": "Invalid token: missing required claims."}), 401

        # Real-time suspension check. Imports stay function-local as in the
        # original (presumably to avoid an import cycle -- confirm).
        from utils.db import users_col
        from bson import ObjectId as ObjId
        from bson.errors import InvalidId

        col = users_col()
        # Look up by ObjectId when ``sub`` converts to one, otherwise by the
        # raw value. Only the conversion is guarded, so database errors are
        # no longer swallowed and retried.
        try:
            lookup_id = ObjId(user_id)
        except (InvalidId, TypeError):
            lookup_id = user_id
        user = col.find_one({"_id": lookup_id})

        if not user or not user.get("is_active", True):
            return jsonify({"error": "Your account has been suspended by an administrator."}), 403

        g.current_user = {
            "id": user_id,
            "role": role,
            "email": user.get("email"),
            "name": user.get("name"),
        }
        return f(*args, **kwargs)

    return decorated
def require_role(allowed_roles):
    """Decorator factory: allow the view only for users with an allowed role.

    Args:
        allowed_roles: Collection of role values; the ``role`` entry of
            ``g.current_user`` must be a member of it.

    Returns:
        A decorator. The decorated view returns 401 when ``g.current_user``
        is not set, 403 when the user's role is not in ``allowed_roles``,
        and otherwise the wrapped view's result.
    """
    def decorator(f):
        # Preserve the view's __name__: Flask uses it as the default
        # endpoint name, so un-wrapped decorators make routes collide.
        @wraps(f)
        def decorated(*args, **kwargs):
            # g.current_user is absent when no authentication step ran for
            # this request; answer 401 rather than raising AttributeError.
            current_user = getattr(g, "current_user", None)
            if current_user is None:
                return jsonify({"error": "Authentication required."}), 401
            if current_user.get("role") not in allowed_roles:
                # str() keeps the message buildable for non-string roles.
                required = ", ".join(str(role) for role in allowed_roles)
                return jsonify({
                    "error": f"Access denied. Required: {required}"
                }), 403
            return f(*args, **kwargs)

        return decorated

    return decorator
def require_admin(f):
    """Legacy shortcut: restrict *f* to the 'admin' role via require_role."""
    admin_only = require_role(["admin"])
    return admin_only(f)