# proofly / auth.py
# Pragthedon's picture
# Fix login loop: JSON errors + emergency-reset route
# 276c3e2
"""
auth.py – JWT-based authentication blueprint
Routes: /register /login /refresh /logout
"""
from datetime import datetime, timezone, timedelta
from flask import Blueprint, render_template, redirect, url_for, request, flash, make_response, jsonify
from flask_jwt_extended import (
create_access_token, create_refresh_token,
jwt_required, get_jwt_identity, get_jwt,
set_access_cookies, set_refresh_cookies,
unset_jwt_cookies
)
from flask_bcrypt import Bcrypt
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from project.database import (
create_user, find_user_by_email, find_user_by_id,
add_token_to_blocklist
)
from project.config import BCRYPT_PEPPER
# Blueprint that carries every authentication route defined in this module.
auth = Blueprint('auth', __name__)
# Created without an app argument; presumably bound to the Flask app via
# init_app() wherever the application is assembled (TODO confirm).
bcrypt = Bcrypt()
# Rate limiter that buckets requests by the caller's remote address.
# Also created without an app; presumably attached elsewhere (TODO confirm).
limiter = Limiter(key_func=get_remote_address)
# ── Helpers ────────────────────────────────────────────────────────────────────
def _pepper(password: str) -> str:
    """Return ``password`` with the server-side pepper appended.

    The pepper is the ``BCRYPT_PEPPER`` value imported from
    ``project.config``. Because it is kept outside the database, a leaked
    user table on its own is not enough to brute-force the stored hashes.

    NOTE(review): bcrypt implementations commonly ignore input past a fixed
    length; since the pepper is appended, a very long password could push it
    out of the hashed range. Confirm against the Flask-Bcrypt documentation.
    """
    peppered = password + BCRYPT_PEPPER
    return peppered
# ── Register ───────────────────────────────────────────────────────────────────
@auth.route('/register', methods=['GET', 'POST'])
@limiter.limit("3 per minute")
def register():
    """Show the registration form (GET) or create an account (POST).

    POST reads ``username``, ``email``, ``password`` and ``confirm_password``
    from the submitted form. Validation failures flash an error and
    re-render the form; the outcome of the actual account creation is
    reported as JSON (201 on success, 400 on failure).

    The very first account stored in the ``users`` collection is created
    with ``is_admin=True`` so a fresh deployment can bootstrap itself.
    """
    form_page = 'register.html'

    if request.method != 'POST':
        return render_template(form_page)

    def reject(message):
        # Flash a validation error and show the form again.
        flash(message, 'error')
        return render_template(form_page)

    form = request.form
    username = form.get('username', '').strip()
    email = form.get('email', '').strip().lower()
    password = form.get('password', '')
    confirm = form.get('confirm_password', '')

    if not (username and email and password):
        return reject('All fields are required.')
    if password != confirm:
        return reject('Passwords do not match.')
    if len(password) < 6:
        return reject('Password must be at least 6 characters.')
    if find_user_by_email(email):
        return reject('An account with that email already exists.')

    hashed = bcrypt.generate_password_hash(_pepper(password))
    pw_hash = hashed.decode('utf-8')

    # Bootstrap convenience: when no users exist yet, this one becomes admin.
    from project.database import get_db
    is_admin = get_db().users.count_documents({}) == 0

    uid = create_user(username, email, pw_hash, is_admin=is_admin)
    if not uid:
        return jsonify({"detail": "Registration failed. Please try again."}), 400
    return jsonify({"msg": "Account created! Please log in."}), 201
# ── Login ──────────────────────────────────────────────────────────────────────
@auth.route('/login', methods=['GET', 'POST'])
@limiter.limit("5 per minute")
def login():
    """Show the login form (GET) or authenticate a user (POST).

    POST reads ``email`` and ``password`` from the submitted form. On a
    match it responds with JSON and sets the access and refresh JWT
    cookies; otherwise it responds with a 401 JSON error.
    """
    if request.method != 'POST':
        return render_template('login.html')

    email = request.form.get('email', '').strip().lower()
    password = request.form.get('password', '')
    account = find_user_by_email(email)

    credentials_ok = account and bcrypt.check_password_hash(
        account['password_hash'], _pepper(password))
    if not credentials_ok:
        # Return JSON error for the Next.js frontend
        return jsonify({"detail": "Invalid email or password."}), 401

    user_id = str(account['_id'])
    username = account['username']
    is_admin = account.get("is_admin", False)

    # Issue tokens — identity is the user_id string
    claims = {"username": username, "is_admin": is_admin}
    access_token = create_access_token(
        identity=user_id, additional_claims=claims)
    refresh_token = create_refresh_token(identity=user_id)

    # Tokens travel in cookies set by the JWT helpers rather than in the
    # JSON body, which only carries display information.
    payload = {
        "msg": "Login successful",
        "username": username,
        "is_admin": is_admin,
    }
    resp = make_response(jsonify(payload))
    set_access_cookies(resp, access_token)
    set_refresh_cookies(resp, refresh_token)
    return resp
# ── Token Refresh ──────────────────────────────────────────────────────────────
@auth.route('/refresh', methods=['POST'])
@jwt_required(refresh=True)
def refresh():
    """Issue a new access token using the refresh token.

    Looks up the user named by the token identity and mints an access token
    carrying ``username`` and ``is_admin`` claims.

    Returns:
        A redirect to the referring page (or the ``index`` endpoint when no
        referrer is present) with the new access cookie set. When the user
        record can no longer be found, a 401 JSON response with the JWT
        cookies cleared.
    """
    user_id = get_jwt_identity()
    user_doc = find_user_by_id(user_id)

    if not user_doc:
        # The account behind this refresh token is gone. Previously this path
        # crashed with AttributeError on ``user_doc.get``; refuse to mint a
        # token for a missing account and drop the stale cookies instead.
        resp = make_response(
            jsonify({"detail": "Account no longer exists. Please log in again."}),
            401,
        )
        unset_jwt_cookies(resp)
        return resp

    access_token = create_access_token(
        identity=user_id,
        additional_claims={
            "username": user_doc['username'],
            "is_admin": user_doc.get("is_admin", False),
        },
    )
    resp = make_response(redirect(request.referrer or url_for('index')))
    set_access_cookies(resp, access_token)
    return resp
# ── Logout ─────────────────────────────────────────────────────────────────────
@auth.route('/logout', methods=['GET', 'POST'])
@jwt_required(optional=True)
def logout():
    """Blocklist the presented token, clear the JWT cookies, go to login.

    When the request carries a token, its ``jti`` is added to the blocklist
    together with its expiry time (UTC). The response always redirects to
    the login page with the JWT cookies unset.

    NOTE(review): only the claims returned by ``get_jwt()`` are blocklisted.
    The decorator is not declared with ``refresh=True``, so the refresh
    token does not appear to be revoked here — confirm.
    """
    claims = get_jwt()
    if claims:
        token_id = claims.get('jti')
        expires_at = datetime.fromtimestamp(claims.get('exp', 0), tz=timezone.utc)
        add_token_to_blocklist(token_id, expires_at)

    response = make_response(redirect(url_for('auth.login')))
    unset_jwt_cookies(response)
    return response
# ── Rate-limit error handler ───────────────────────────────────────────────────
@auth.errorhandler(429)
def rate_limit_hit(e):
    """Handle HTTP 429 errors registered on the ``auth`` blueprint.

    Flashes a "too many attempts" message and renders the login page with a
    429 status. The error object ``e`` is not inspected.

    NOTE(review): the login template is rendered regardless of which route
    tripped the limit (``register`` is rate-limited as well) — confirm this
    is intended.
    """
    message = 'Too many attempts. Please wait a moment and try again.'
    flash(message, 'error')
    page = render_template('login.html')
    return page, 429