roshcheeku's picture
Create auth.py
9b0abff verified
# utils/auth.py
import os, jwt
from functools import wraps
from flask import request, jsonify, g
from bson.objectid import ObjectId
from dotenv import load_dotenv
from db import db
from datetime import datetime, timedelta
load_dotenv()
JWT_SECRET = os.getenv("JWT_SECRET", "supersecret")
JWT_ALGO = "HS256"
def create_token(user_id, role, hours=6):
payload = {
"id": str(user_id),
"role": role,
"exp": datetime.utcnow() + timedelta(hours=hours)
}
return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGO)
def token_required(f):
@wraps(f)
def wrapper(*args, **kwargs):
auth = request.headers.get("Authorization", "")
token = None
if auth.startswith("Bearer "):
token = auth.split(" ")[1]
if not token:
return jsonify({"error": "Token missing"}), 401
try:
data = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGO])
user = db.users.find_one({"_id": ObjectId(data["id"])})
if not user:
return jsonify({"error": "User not found"}), 401
if user.get("blocked", False):
return jsonify({"error": "Account blocked"}), 403
# attach sanitized user
user["_id"] = str(user["_id"])
g.current_user = user
except jwt.ExpiredSignatureError:
return jsonify({"error": "Token expired"}), 401
except Exception as e:
return jsonify({"error": "Invalid token", "detail": str(e)}), 401
return f(*args, **kwargs)
return wrapper
def admin_required(f):
@wraps(f)
def wrapper(*args, **kwargs):
if not getattr(g, "current_user", None):
return jsonify({"error": "Auth required"}), 401
if g.current_user.get("role") != "admin":
return jsonify({"error": "Admin role required"}), 403
return f(*args, **kwargs)
return wrapper