Spaces:
Sleeping
Sleeping
| import os | |
| import jwt | |
| from datetime import datetime, timedelta | |
| from functools import wraps | |
| from flask import request, jsonify | |
| from models.user import User | |
| # Secret key for JWT | |
| SECRET_KEY = os.environ.get('JWT_SECRET') | |
| def generate_token(user_id, permissions, expiration_hours=24*30): # 30-day token by default | |
| """Generate JWT token for authentication""" | |
| payload = { | |
| 'user_id': str(user_id), | |
| 'permissions': permissions, | |
| 'exp': datetime.utcnow() + timedelta(hours=expiration_hours) | |
| } | |
| return jwt.encode(payload, SECRET_KEY, algorithm='HS256') | |
| def decode_token(token): | |
| """Decode JWT token and return payload""" | |
| try: | |
| return jwt.decode(token, SECRET_KEY, algorithms=['HS256']) | |
| except: | |
| return None | |
| def token_required(f): | |
| """ | |
| Fixed decorator for routes that require a valid token. | |
| This version passes current_user as a kwarg instead of an arg, | |
| avoiding parameter conflicts with URL path parameters. | |
| """ | |
| def decorated(*args, **kwargs): | |
| token = None | |
| auth_header = request.headers.get('Authorization') | |
| if auth_header: | |
| if auth_header.startswith('Bearer '): | |
| token = auth_header.split(" ")[1] | |
| if not token: | |
| return jsonify({'message': 'Token is missing'}), 401 | |
| data = decode_token(token) | |
| if not data: | |
| return jsonify({'message': 'Token is invalid or expired'}), 401 | |
| current_user = User.find_by_id(data['user_id']) | |
| if not current_user: | |
| return jsonify({'message': 'User not found'}), 401 | |
| # Pass current_user as a keyword argument instead of positional argument | |
| kwargs['current_user'] = current_user | |
| return f(*args, **kwargs) | |
| return decorated | |
| def admin_required(f): | |
| """ | |
| Fixed decorator for routes that require admin permissions. | |
| This version passes current_user as a kwarg instead of an arg, | |
| avoiding parameter conflicts with URL path parameters. | |
| """ | |
| def decorated(*args, **kwargs): | |
| token = None | |
| auth_header = request.headers.get('Authorization') | |
| if auth_header: | |
| if auth_header.startswith('Bearer '): | |
| token = auth_header.split(" ")[1] | |
| if not token: | |
| return jsonify({'message': 'Token is missing'}), 401 | |
| data = decode_token(token) | |
| if not data: | |
| return jsonify({'message': 'Token is invalid or expired'}), 401 | |
| if data['permissions'] != 'Admin': | |
| return jsonify({'message': 'Admin permissions required'}), 403 | |
| current_user = User.find_by_id(data['user_id']) | |
| if not current_user: | |
| return jsonify({'message': 'User not found'}), 401 | |
| # Pass current_user as a keyword argument instead of positional argument | |
| kwargs['current_user'] = current_user | |
| return f(*args, **kwargs) | |
| return decorated |