Spaces:
Running
Running
| """ | |
| auth.py β JWT-based authentication blueprint | |
| Routes: /register /login /refresh /logout | |
| """ | |
| from datetime import datetime, timezone, timedelta | |
| from flask import Blueprint, render_template, redirect, url_for, request, flash, make_response, jsonify | |
| from flask_jwt_extended import ( | |
| create_access_token, create_refresh_token, | |
| jwt_required, get_jwt_identity, get_jwt, | |
| set_access_cookies, set_refresh_cookies, | |
| unset_jwt_cookies | |
| ) | |
| from flask_bcrypt import Bcrypt | |
| from flask_limiter import Limiter | |
| from flask_limiter.util import get_remote_address | |
| from project.database import ( | |
| create_user, find_user_by_email, find_user_by_id, | |
| add_token_to_blocklist | |
| ) | |
| from project.config import BCRYPT_PEPPER | |
| auth = Blueprint('auth', __name__) | |
| bcrypt = Bcrypt() | |
| limiter = Limiter(key_func=get_remote_address) | |
| # ββ Helpers ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _pepper(password: str) -> str: | |
| """Append server-side pepper before hashing. Means a leaked DB alone | |
| cannot crack passwords without also knowing this secret.""" | |
| return password + BCRYPT_PEPPER | |
| # ββ Register βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def register(): | |
| if request.method == 'POST': | |
| username = request.form.get('username', '').strip() | |
| email = request.form.get('email', '').strip().lower() | |
| password = request.form.get('password', '') | |
| confirm = request.form.get('confirm_password', '') | |
| if not username or not email or not password: | |
| flash('All fields are required.', 'error') | |
| return render_template('register.html') | |
| if password != confirm: | |
| flash('Passwords do not match.', 'error') | |
| return render_template('register.html') | |
| if len(password) < 6: | |
| flash('Password must be at least 6 characters.', 'error') | |
| return render_template('register.html') | |
| if find_user_by_email(email): | |
| flash('An account with that email already exists.', 'error') | |
| return render_template('register.html') | |
| pw_hash = bcrypt.generate_password_hash(_pepper(password)).decode('utf-8') | |
| # First user is automatically an admin if none exist (Convenience/Bootstrap) | |
| from project.database import get_db | |
| is_admin = get_db().users.count_documents({}) == 0 | |
| uid = create_user(username, email, pw_hash, is_admin=is_admin) | |
| if uid: | |
| return jsonify({"msg": "Account created! Please log in."}), 201 | |
| return jsonify({"detail": "Registration failed. Please try again."}), 400 | |
| return render_template('register.html') | |
| # ββ Login ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def login(): | |
| if request.method == 'POST': | |
| email = request.form.get('email', '').strip().lower() | |
| password = request.form.get('password', '') | |
| user_doc = find_user_by_email(email) | |
| if user_doc and bcrypt.check_password_hash( | |
| user_doc['password_hash'], _pepper(password)): | |
| user_id = str(user_doc['_id']) | |
| username = user_doc['username'] | |
| # Issue tokens β identity is the user_id string | |
| access_token = create_access_token( | |
| identity=user_id, | |
| additional_claims={ | |
| "username": username, | |
| "is_admin": user_doc.get("is_admin", False) | |
| } | |
| ) | |
| refresh_token = create_refresh_token(identity=user_id) | |
| # Store in HttpOnly cookies β JS cannot read these | |
| resp = make_response(jsonify({"msg": "Login successful", "username": username, "is_admin": user_doc.get("is_admin", False)})) | |
| set_access_cookies(resp, access_token) | |
| set_refresh_cookies(resp, refresh_token) | |
| return resp | |
| # Return JSON error for the Next.js frontend | |
| return jsonify({"detail": "Invalid email or password."}), 401 | |
| return render_template('login.html') | |
| # ββ Token Refresh ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def refresh(): | |
| """Issue a new short-lived access token using the long-lived refresh token.""" | |
| user_id = get_jwt_identity() | |
| user_doc = find_user_by_id(user_id) | |
| username = user_doc['username'] if user_doc else '' | |
| access_token = create_access_token( | |
| identity=user_id, | |
| additional_claims={ | |
| "username": username, | |
| "is_admin": user_doc.get("is_admin", False) | |
| } | |
| ) | |
| resp = make_response(redirect(request.referrer or url_for('index'))) | |
| set_access_cookies(resp, access_token) | |
| return resp | |
| # ββ Logout βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def logout(): | |
| """Revoke both tokens and clear cookies.""" | |
| jwt_data = get_jwt() | |
| if jwt_data: | |
| jti = jwt_data.get('jti') | |
| exp = datetime.fromtimestamp(jwt_data.get('exp', 0), tz=timezone.utc) | |
| add_token_to_blocklist(jti, exp) | |
| resp = make_response(redirect(url_for('auth.login'))) | |
| unset_jwt_cookies(resp) | |
| return resp | |
| # ββ Rate-limit error handler βββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def rate_limit_hit(e): | |
| flash('Too many attempts. Please wait a moment and try again.', 'error') | |
| return render_template('login.html'), 429 | |