Spaces:
Running
Running
File size: 6,525 Bytes
4f48a4e 276c3e2 4f48a4e 276c3e2 4f48a4e 276c3e2 4f48a4e 276c3e2 4f48a4e | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 | """
auth.py β JWT-based authentication blueprint
Routes: /register /login /refresh /logout
"""
from datetime import datetime, timezone, timedelta
from flask import Blueprint, render_template, redirect, url_for, request, flash, make_response, jsonify
from flask_jwt_extended import (
create_access_token, create_refresh_token,
jwt_required, get_jwt_identity, get_jwt,
set_access_cookies, set_refresh_cookies,
unset_jwt_cookies
)
from flask_bcrypt import Bcrypt
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from project.database import (
create_user, find_user_by_email, find_user_by_id,
add_token_to_blocklist
)
from project.config import BCRYPT_PEPPER
auth = Blueprint('auth', __name__)
bcrypt = Bcrypt()
limiter = Limiter(key_func=get_remote_address)
# ββ Helpers ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def _pepper(password: str) -> str:
"""Append server-side pepper before hashing. Means a leaked DB alone
cannot crack passwords without also knowing this secret."""
return password + BCRYPT_PEPPER
# ββ Register βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
@auth.route('/register', methods=['GET', 'POST'])
@limiter.limit("3 per minute")
def register():
if request.method == 'POST':
username = request.form.get('username', '').strip()
email = request.form.get('email', '').strip().lower()
password = request.form.get('password', '')
confirm = request.form.get('confirm_password', '')
if not username or not email or not password:
flash('All fields are required.', 'error')
return render_template('register.html')
if password != confirm:
flash('Passwords do not match.', 'error')
return render_template('register.html')
if len(password) < 6:
flash('Password must be at least 6 characters.', 'error')
return render_template('register.html')
if find_user_by_email(email):
flash('An account with that email already exists.', 'error')
return render_template('register.html')
pw_hash = bcrypt.generate_password_hash(_pepper(password)).decode('utf-8')
# First user is automatically an admin if none exist (Convenience/Bootstrap)
from project.database import get_db
is_admin = get_db().users.count_documents({}) == 0
uid = create_user(username, email, pw_hash, is_admin=is_admin)
if uid:
return jsonify({"msg": "Account created! Please log in."}), 201
return jsonify({"detail": "Registration failed. Please try again."}), 400
return render_template('register.html')
# ββ Login ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
@auth.route('/login', methods=['GET', 'POST'])
@limiter.limit("5 per minute")
def login():
if request.method == 'POST':
email = request.form.get('email', '').strip().lower()
password = request.form.get('password', '')
user_doc = find_user_by_email(email)
if user_doc and bcrypt.check_password_hash(
user_doc['password_hash'], _pepper(password)):
user_id = str(user_doc['_id'])
username = user_doc['username']
# Issue tokens β identity is the user_id string
access_token = create_access_token(
identity=user_id,
additional_claims={
"username": username,
"is_admin": user_doc.get("is_admin", False)
}
)
refresh_token = create_refresh_token(identity=user_id)
# Store in HttpOnly cookies β JS cannot read these
resp = make_response(jsonify({"msg": "Login successful", "username": username, "is_admin": user_doc.get("is_admin", False)}))
set_access_cookies(resp, access_token)
set_refresh_cookies(resp, refresh_token)
return resp
# Return JSON error for the Next.js frontend
return jsonify({"detail": "Invalid email or password."}), 401
return render_template('login.html')
# ββ Token Refresh ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
@auth.route('/refresh', methods=['POST'])
@jwt_required(refresh=True)
def refresh():
"""Issue a new short-lived access token using the long-lived refresh token."""
user_id = get_jwt_identity()
user_doc = find_user_by_id(user_id)
username = user_doc['username'] if user_doc else ''
access_token = create_access_token(
identity=user_id,
additional_claims={
"username": username,
"is_admin": user_doc.get("is_admin", False)
}
)
resp = make_response(redirect(request.referrer or url_for('index')))
set_access_cookies(resp, access_token)
return resp
# ββ Logout βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
@auth.route('/logout', methods=['GET', 'POST'])
@jwt_required(optional=True)
def logout():
"""Revoke both tokens and clear cookies."""
jwt_data = get_jwt()
if jwt_data:
jti = jwt_data.get('jti')
exp = datetime.fromtimestamp(jwt_data.get('exp', 0), tz=timezone.utc)
add_token_to_blocklist(jti, exp)
resp = make_response(redirect(url_for('auth.login')))
unset_jwt_cookies(resp)
return resp
# ββ Rate-limit error handler βββββββββββββββββββββββββββββββββββββββββββββββββββ
@auth.errorhandler(429)
def rate_limit_hit(e):
flash('Too many attempts. Please wait a moment and try again.', 'error')
return render_template('login.html'), 429
|