Spaces:
Sleeping
Sleeping
| # utils/auth.py | |
| import os, jwt | |
| from functools import wraps | |
| from flask import request, jsonify, g | |
| from bson.objectid import ObjectId | |
| from dotenv import load_dotenv | |
| from db import db | |
| from datetime import datetime, timedelta | |
| load_dotenv() | |
| JWT_SECRET = os.getenv("JWT_SECRET", "supersecret") | |
| JWT_ALGO = "HS256" | |
| def create_token(user_id, role, hours=6): | |
| payload = { | |
| "id": str(user_id), | |
| "role": role, | |
| "exp": datetime.utcnow() + timedelta(hours=hours) | |
| } | |
| return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGO) | |
| def token_required(f): | |
| def wrapper(*args, **kwargs): | |
| auth = request.headers.get("Authorization", "") | |
| token = None | |
| if auth.startswith("Bearer "): | |
| token = auth.split(" ")[1] | |
| if not token: | |
| return jsonify({"error": "Token missing"}), 401 | |
| try: | |
| data = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGO]) | |
| user = db.users.find_one({"_id": ObjectId(data["id"])}) | |
| if not user: | |
| return jsonify({"error": "User not found"}), 401 | |
| if user.get("blocked", False): | |
| return jsonify({"error": "Account blocked"}), 403 | |
| # attach sanitized user | |
| user["_id"] = str(user["_id"]) | |
| g.current_user = user | |
| except jwt.ExpiredSignatureError: | |
| return jsonify({"error": "Token expired"}), 401 | |
| except Exception as e: | |
| return jsonify({"error": "Invalid token", "detail": str(e)}), 401 | |
| return f(*args, **kwargs) | |
| return wrapper | |
| def admin_required(f): | |
| def wrapper(*args, **kwargs): | |
| if not getattr(g, "current_user", None): | |
| return jsonify({"error": "Auth required"}), 401 | |
| if g.current_user.get("role") != "admin": | |
| return jsonify({"error": "Admin role required"}), 403 | |
| return f(*args, **kwargs) | |
| return wrapper | |