"""
auth.py – JWT-based authentication blueprint

Routes:
    /register
    /login
    /refresh
    /logout
"""
from datetime import datetime, timezone, timedelta

from flask import (
    Blueprint, render_template, redirect, url_for,
    request, flash, make_response, jsonify
)
from flask_jwt_extended import (
    create_access_token, create_refresh_token,
    jwt_required, get_jwt_identity, get_jwt,
    set_access_cookies, set_refresh_cookies, unset_jwt_cookies
)
from flask_bcrypt import Bcrypt
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

from project.database import (
    create_user, find_user_by_email, find_user_by_id, add_token_to_blocklist
)
from project.config import BCRYPT_PEPPER

auth = Blueprint('auth', __name__)
bcrypt = Bcrypt()
limiter = Limiter(key_func=get_remote_address)


# ── Helpers ────────────────────────────────────────────────────────────────────

def _pepper(password: str) -> str:
    """Append the server-side pepper to *password* before hashing.

    Means a leaked DB alone cannot crack passwords without also knowing
    this secret.

    NOTE(review): bcrypt only considers the first 72 bytes of its input, so
    for very long passwords the appended pepper may not contribute to the
    hash — confirm whether long-password handling is enabled in Flask-Bcrypt.
    """
    return password + BCRYPT_PEPPER


def _registration_error(username, email, password, confirm):
    """Return the first validation error message for a sign-up, or None.

    Checks run in a fixed order: required fields, password confirmation,
    minimum password length, then e-mail uniqueness (a database lookup,
    deliberately last so it only runs for otherwise valid input).
    """
    if not username or not email or not password:
        return 'All fields are required.'
    if password != confirm:
        return 'Passwords do not match.'
    if len(password) < 6:
        return 'Password must be at least 6 characters.'
    if find_user_by_email(email):
        return 'An account with that email already exists.'
    return None


def _issue_access_token(user_id, user_doc):
    """Create an access token for *user_id* carrying username/is_admin claims.

    Shared by login and refresh so both routes always embed the same claims.
    *user_doc* must be a user document (not None); ``is_admin`` defaults to
    False when the key is absent.
    """
    return create_access_token(
        identity=user_id,
        additional_claims={
            "username": user_doc['username'],
            "is_admin": user_doc.get("is_admin", False),
        },
    )


# ── Register ───────────────────────────────────────────────────────────────────

@auth.route('/register', methods=['GET', 'POST'])
@limiter.limit("3 per minute")
def register():
    """Render the sign-up form (GET) or create a new account (POST).

    Validation failures flash an error and re-render the form. A successful
    insert returns a 201 JSON message; a failed insert returns a 400 JSON
    error.
    """
    if request.method != 'POST':
        return render_template('register.html')

    username = request.form.get('username', '').strip()
    email = request.form.get('email', '').strip().lower()
    password = request.form.get('password', '')
    confirm = request.form.get('confirm_password', '')

    error = _registration_error(username, email, password, confirm)
    if error:
        flash(error, 'error')
        return render_template('register.html')

    pw_hash = bcrypt.generate_password_hash(_pepper(password)).decode('utf-8')

    # First user is automatically an admin if none exist (Convenience/Bootstrap)
    # NOTE(review): count-then-insert is not atomic; two simultaneous first
    # registrations could both be made admin.
    from project.database import get_db
    is_admin = get_db().users.count_documents({}) == 0

    uid = create_user(username, email, pw_hash, is_admin=is_admin)
    if uid:
        return jsonify({"msg": "Account created! Please log in."}), 201
    return jsonify({"detail": "Registration failed. Please try again."}), 400


# ── Login ──────────────────────────────────────────────────────────────────────

@auth.route('/login', methods=['GET', 'POST'])
@limiter.limit("5 per minute")
def login():
    """Render the login form (GET) or authenticate credentials (POST).

    On success, sets access and refresh tokens as cookies and returns a JSON
    body with the username and admin flag. On failure, returns a 401 JSON
    error.
    """
    if request.method != 'POST':
        return render_template('login.html')

    email = request.form.get('email', '').strip().lower()
    password = request.form.get('password', '')

    user_doc = find_user_by_email(email)
    authenticated = user_doc and bcrypt.check_password_hash(
        user_doc['password_hash'], _pepper(password))
    if not authenticated:
        # Return JSON error for the Next.js frontend
        return jsonify({"detail": "Invalid email or password."}), 401

    # Issue tokens — identity is the user_id string
    user_id = str(user_doc['_id'])
    access_token = _issue_access_token(user_id, user_doc)
    refresh_token = create_refresh_token(identity=user_id)

    # Store in HttpOnly cookies — JS cannot read these
    resp = make_response(jsonify({
        "msg": "Login successful",
        "username": user_doc['username'],
        "is_admin": user_doc.get("is_admin", False),
    }))
    set_access_cookies(resp, access_token)
    set_refresh_cookies(resp, refresh_token)
    return resp


# ── Token Refresh ──────────────────────────────────────────────────────────────

@auth.route('/refresh', methods=['POST'])
@jwt_required(refresh=True)
def refresh():
    """Issue a new access token using the refresh token.

    If the account behind the refresh token can no longer be found, no token
    is issued: the JWT cookies are cleared and a 401 JSON error is returned.
    Otherwise the response redirects to the referrer (or the index page)
    with the new access cookie set.
    """
    user_id = get_jwt_identity()
    user_doc = find_user_by_id(user_id)
    if not user_doc:
        # Previously this path crashed on ``None.get``; never mint a token
        # for an account that no longer exists.
        resp = make_response(jsonify({"detail": "Account not found."}), 401)
        unset_jwt_cookies(resp)
        return resp

    access_token = _issue_access_token(user_id, user_doc)
    resp = make_response(redirect(request.referrer or url_for('index')))
    set_access_cookies(resp, access_token)
    return resp


# ── Logout ─────────────────────────────────────────────────────────────────────

@auth.route('/logout', methods=['GET', 'POST'])
@jwt_required(optional=True)
def logout():
    """Blocklist the presented token (if any) and clear the JWT cookies.

    Only the token validated for this request is added to the blocklist;
    the refresh cookie is removed from the browser but its token is not
    blocklisted here. Always redirects to the login page.
    """
    jwt_data = get_jwt()
    if jwt_data:
        jti = jwt_data.get('jti')
        exp = datetime.fromtimestamp(jwt_data.get('exp', 0), tz=timezone.utc)
        add_token_to_blocklist(jti, exp)

    resp = make_response(redirect(url_for('auth.login')))
    unset_jwt_cookies(resp)
    return resp


# ── Rate-limit error handler ───────────────────────────────────────────────────

@auth.errorhandler(429)
def rate_limit_hit(e):
    """Flash a throttling message and re-render the login page with HTTP 429."""
    flash('Too many attempts. Please wait a moment and try again.', 'error')
    return render_template('login.html'), 429