import functools
import time
from collections import defaultdict

import requests
from flask import Blueprint, render_template, request, redirect, url_for, session, flash

from ..config import FIREBASE_WEB_API_KEY

auth_bp = Blueprint('auth', __name__)

# --- Rate Limiter ---
# In-memory, per-process store: client IP -> timestamps of recent POST attempts.
LOGIN_ATTEMPTS = defaultdict(list)
MAX_ATTEMPTS = 5
WINDOW_SECONDS = 300  # 5 minutes
RATE_LIMIT_MESSAGE = "Too many attempts. Please wait 5 minutes."

# Timestamp of the last full sweep of LOGIN_ATTEMPTS (see _sweep_stale_attempts).
_last_sweep = 0.0


def _sweep_stale_attempts(now: float) -> None:
    """Drop IPs whose attempts have all expired, at most once per window.

    Without this, every IP that ever posted would keep a key in
    LOGIN_ATTEMPTS for the lifetime of the process.
    """
    global _last_sweep
    if now - _last_sweep < WINDOW_SECONDS:
        return
    _last_sweep = now
    for ip in list(LOGIN_ATTEMPTS):
        recent = [t for t in LOGIN_ATTEMPTS.get(ip, ()) if now - t < WINDOW_SECONDS]
        if recent:
            LOGIN_ATTEMPTS[ip] = recent
        else:
            LOGIN_ATTEMPTS.pop(ip, None)


def is_rate_limited(ip: str) -> bool:
    """Record an attempt for *ip* and report whether it is over the limit.

    Returns True when *ip* already has MAX_ATTEMPTS attempts within the last
    WINDOW_SECONDS; in that case the current attempt is not recorded.
    Otherwise the attempt is recorded and False is returned.
    """
    now = time.time()
    _sweep_stale_attempts(now)

    # Keep only the attempts that are still inside the window.
    recent = [t for t in LOGIN_ATTEMPTS[ip] if now - t < WINDOW_SECONDS]
    LOGIN_ATTEMPTS[ip] = recent

    if len(recent) >= MAX_ATTEMPTS:
        return True

    recent.append(now)
    return False


# --- Helper Decorator ---
def login_required(f):
    """Redirect to the login page unless 'user_id' is present in the session."""
    @functools.wraps(f)
    def decorated_function(*args, **kwargs):
        if 'user_id' not in session:
            # Capture the path the user was trying to access. full_path always
            # ends with '?' when there is no query string, so use the bare
            # path in that case.
            target = request.full_path if request.query_string else request.path
            return redirect(url_for('auth.login', next=target))
        return f(*args, **kwargs)
    return decorated_function


# --- Firebase helpers ---
FIREBASE_TIMEOUT_SECONDS = 10
SERVICE_UNAVAILABLE_MESSAGE = "Authentication service unavailable. Please try again."


def _firebase_post(action, payload):
    """POST *payload* to the Identity Toolkit ``accounts:<action>`` endpoint.

    Returns the ``requests`` response, or None when the request could not be
    completed (connection error, timeout, ...).
    """
    url = f"https://identitytoolkit.googleapis.com/v1/accounts:{action}?key={FIREBASE_WEB_API_KEY}"
    try:
        # A timeout is required: requests waits indefinitely by default.
        return requests.post(url, json=payload, timeout=FIREBASE_TIMEOUT_SECONDS)
    except requests.RequestException:
        return None


def _json_body(resp):
    """Return the response body as a dict, or {} if it is not a JSON object."""
    try:
        body = resp.json()
    except ValueError:
        return {}
    return body if isinstance(body, dict) else {}


def _is_safe_next(target):
    """Return True if *target* is a same-site relative path safe to redirect to."""
    if not target or not target.startswith('/'):
        return False
    # '//host' is scheme-relative, and browsers treat '\' like '/', so both
    # would leave the site. Control characters are rejected as well because
    # browsers strip tabs/newlines before resolving the URL.
    if target.startswith('//') or '\\' in target:
        return False
    return not any(ord(ch) < 0x20 for ch in target)


# --- Login Route ---
@auth_bp.route("/login", methods=["GET", "POST"])
def login():
    """Show the login form (GET) or sign the user in via Firebase (POST)."""
    if 'user_id' in session:
        return redirect(url_for('main.home'))

    if request.method != "POST":
        return render_template("auth/login.html", mode="login")

    if is_rate_limited(request.remote_addr):
        return render_template("auth/login.html", mode="login", error=RATE_LIMIT_MESSAGE)

    email = request.form.get("email")
    password = request.form.get("password")

    if not FIREBASE_WEB_API_KEY:
        return render_template("auth/login.html", mode="login", error="Authentication system not configured")

    # Exchange password for auth token via Google Identity Toolkit
    resp = _firebase_post(
        "signInWithPassword",
        {"email": email, "password": password, "returnSecureToken": True},
    )
    if resp is None:
        return render_template("auth/login.html", mode="login", error=SERVICE_UNAVAILABLE_MESSAGE)
    if resp.status_code != 200:
        return render_template("auth/login.html", mode="login", error="Invalid email or password")

    user_id = _json_body(resp).get('localId')
    if not user_id:
        return render_template("auth/login.html", mode="login", error=SERVICE_UNAVAILABLE_MESSAGE)

    # Mark the session permanent; its lifetime is governed by the app's
    # PERMANENT_SESSION_LIFETIME setting.
    session.permanent = True
    session['user_id'] = user_id

    # Retrieve the 'next' destination from the URL parameters and only honour
    # it when it is a same-site relative path.
    next_page = request.args.get('next')
    if not _is_safe_next(next_page):
        next_page = url_for('main.home')
    return redirect(next_page)


@auth_bp.route("/register", methods=["GET", "POST"])
def register():
    """Show the registration form (GET) or create a Firebase account (POST)."""
    if 'user_id' in session:
        return redirect(url_for('main.home'))

    if request.method != "POST":
        return render_template("auth/register.html", mode="register")

    if is_rate_limited(request.remote_addr):
        return render_template("auth/register.html", mode="register", error=RATE_LIMIT_MESSAGE)

    email = request.form.get("email")
    password = request.form.get("password")

    if not FIREBASE_WEB_API_KEY:
        return render_template("auth/register.html", mode="register", error="Registration system not configured")

    resp = _firebase_post(
        "signUp",
        {"email": email, "password": password, "returnSecureToken": True},
    )
    if resp is None:
        return render_template("auth/register.html", mode="register", error=SERVICE_UNAVAILABLE_MESSAGE)

    body = _json_body(resp)
    user_id = body.get('localId')
    if resp.status_code == 200 and user_id:
        session.permanent = True
        session['user_id'] = user_id
        flash("Registration Successful! Welcome to the Toolkit.", "success")
        return redirect(url_for('main.setup'))

    # Surface Firebase's own error message when one is present.
    error = body.get('error')
    if isinstance(error, dict):
        error_msg = error.get('message', 'Registration failed')
    else:
        error_msg = 'Registration failed'
    return render_template("auth/register.html", mode="register", error=error_msg)


@auth_bp.route("/reset-password", methods=["GET", "POST"])
def reset_password():
    """Show the reset form (GET) or ask Firebase to send a reset email (POST)."""
    if request.method != "POST":
        return render_template("auth/reset.html", mode="reset")

    if is_rate_limited(request.remote_addr):
        return render_template("auth/reset.html", mode="reset", error=RATE_LIMIT_MESSAGE)

    email = request.form.get("email")
    if not email:
        return render_template("auth/reset.html", mode="reset", error="Please enter your email.")

    if not FIREBASE_WEB_API_KEY:
        return render_template("auth/reset.html", mode="reset", error="Password reset not configured")

    resp = _firebase_post("sendOobCode", {"requestType": "PASSWORD_RESET", "email": email})
    if resp is not None and resp.status_code == 200:
        return render_template("auth/reset.html", mode="reset", success="Password reset email sent!")
    return render_template("auth/reset.html", mode="reset", error="Error sending reset email")


@auth_bp.route("/logout")
def logout():
    """Clear the session and send the user back to the login page."""
    session.clear()
    return redirect(url_for('auth.login'))