Spaces:
Running
Running
| """ | |
| Authentication utilities and security functions | |
| Contains: | |
| - JWT token validation decorator | |
| - Security helpers | |
| - Username anonymization for logging | |
| - Cookie management utilities | |
| """ | |
| import os | |
| import jwt | |
| import hashlib | |
| from functools import wraps | |
| from flask import request, jsonify, current_app, make_response | |
| from .database import get_db_connection | |
| from .models import BlacklistedToken | |
| def anonymize_username(username): | |
| """Create anonymous hash for logging while preserving uniqueness""" | |
| if not username: | |
| return "anonymous" | |
| return hashlib.sha256(f"user_{username}_salt".encode()).hexdigest()[:12] | |
| def token_required(f): | |
| """ | |
| JWT token validation decorator | |
| Validates access token from cookies and checks blacklist. | |
| Returns username to the decorated function. | |
| """ | |
| def decorated(*args, **kwargs): | |
| token = request.cookies.get('access_token') | |
| if not token: | |
| return jsonify({"message": "Token is missing"}), 401 | |
| try: | |
| # Check blacklist | |
| conn = get_db_connection() | |
| if BlacklistedToken.is_blacklisted(conn, token): | |
| conn.close() | |
| return jsonify({"message": "Token has been revoked. Please log in again."}), 401 | |
| conn.close() | |
| # Decode and validate token | |
| data = jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=["HS256"]) | |
| return f(data['username'], *args, **kwargs) | |
| except jwt.ExpiredSignatureError: | |
| return jsonify({"message": "Token has expired"}), 401 | |
| except jwt.InvalidTokenError: | |
| return jsonify({"message": "Invalid token"}), 401 | |
| except Exception as e: | |
| current_app.logger.exception("Auth error: %s", e) | |
| return jsonify({"message": "Server error"}), 500 | |
| return decorated | |
| def extract_username_from_request(req) -> str | None: | |
| """ | |
| Extract username from various sources in request | |
| Checks in order: | |
| 1. X-User header | |
| 2. Request body JSON | |
| 3. JWT cookie | |
| """ | |
| # 1) Header | |
| hdr = req.headers.get("X-User") | |
| if hdr: | |
| return hdr | |
| # 2) Body | |
| data = req.get_json(silent=True) or {} | |
| if data.get("username"): | |
| return data.get("username") | |
| # 3) JWT cookie | |
| token = req.cookies.get("access_token") | |
| if token: | |
| try: | |
| payload = jwt.decode(token, current_app.config["SECRET_KEY"], algorithms=["HS256"]) | |
| return payload.get("username") | |
| except jwt.ExpiredSignatureError: | |
| return None | |
| except jwt.InvalidTokenError: | |
| return None | |
| return None | |
| def add_cookie(resp, name: str, value: str, max_age: int): | |
| """ | |
| Add secure cookie to response | |
| In prod: Secure + SameSite=None + Partitioned (works with third-party cookie protections). | |
| In dev: SameSite=Lax, not Secure. | |
| """ | |
| IS_PROD = os.getenv("ENV", "dev").lower() == "prod" | |
| if IS_PROD: | |
| resp.headers.add( | |
| "Set-Cookie", | |
| f"{name}={value}; Path=/; Max-Age={max_age}; Secure; HttpOnly; SameSite=None; Partitioned" | |
| ) | |
| else: | |
| resp.set_cookie(name, value, httponly=True, secure=False, samesite="Lax", max_age=max_age, path="/") | |
| def validate_user_input(username: str, password: str) -> tuple[bool, str]: | |
| """ | |
| Validate user input for signup/login | |
| Returns: (is_valid, error_message) | |
| """ | |
| if not username or not password: | |
| return False, "Username and password are required" | |
| if len(username) < 3 or len(username) > 50: | |
| return False, "Username must be 3-50 characters" | |
| if len(password) < 8: | |
| return False, "Password must be at least 8 characters" | |
| # Additional validation can be added here | |
| # - Special character requirements | |
| # - Username format validation | |
| # - Password complexity checks | |
| return True, "" | |
| def is_admin_user(conn, username: str) -> bool: | |
| """Check if user has admin role""" | |
| from .models import User | |
| user = User.find_by_username(conn, username) | |
| return user is not None and user.role == 'admin' | |
| def log_security_event(event_type: str, username: str, ip_address: str, details: str = ""): | |
| """ | |
| Log security events with anonymized usernames | |
| Args: | |
| event_type: Type of security event (login, logout, failed_login, etc.) | |
| username: Username (will be anonymized) | |
| ip_address: Request IP address | |
| details: Additional details about the event | |
| """ | |
| user_hash = anonymize_username(username) | |
| current_app.logger.info( | |
| f"Security Event [{event_type}]: user_hash={user_hash}, ip={ip_address}, details={details}" | |
| ) |