Spaces:
Runtime error
Runtime error
File size: 4,584 Bytes
59f2028 da8ce94 59f2028 da8ce94 59f2028 da8ce94 59f2028 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 | """
Authentication utilities and security functions
Contains:
- JWT token validation decorator
- Security helpers
- Username anonymization for logging
- Cookie management utilities
"""
import os
import jwt
import hashlib
from functools import wraps
from flask import request, jsonify, current_app, make_response
from .models import BlacklistedToken
def anonymize_username(username):
"""Create anonymous hash for logging while preserving uniqueness"""
if not username:
return "anonymous"
return hashlib.sha256(f"user_{username}_salt".encode()).hexdigest()[:12]
def token_required(f):
"""
JWT token validation decorator
Validates access token from cookies and checks blacklist.
Returns username to the decorated function.
"""
@wraps(f)
def decorated(*args, **kwargs):
token = request.cookies.get('access_token')
if not token:
return jsonify({"message": "Token is missing"}), 401
try:
# Check blacklist
if BlacklistedToken.is_blacklisted(token):
return jsonify({"message": "Token has been revoked. Please log in again."}), 401
# Decode and validate token
data = jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=["HS256"])
return f(data['username'], *args, **kwargs)
except jwt.ExpiredSignatureError:
return jsonify({"message": "Token has expired"}), 401
except jwt.InvalidTokenError:
return jsonify({"message": "Invalid token"}), 401
except Exception as e:
current_app.logger.exception("Auth error: %s", e)
return jsonify({"message": "Server error"}), 500
return decorated
def extract_username_from_request(req) -> str | None:
"""
Extract username from various sources in request
Checks in order:
1. X-User header
2. Request body JSON
3. JWT cookie
"""
# 1) Header
hdr = req.headers.get("X-User")
if hdr:
return hdr
# 2) Body
data = req.get_json(silent=True) or {}
if data.get("username"):
return data.get("username")
# 3) JWT cookie
token = req.cookies.get("access_token")
if token:
try:
payload = jwt.decode(token, current_app.config["SECRET_KEY"], algorithms=["HS256"])
return payload.get("username")
except jwt.ExpiredSignatureError:
return None
except jwt.InvalidTokenError:
return None
return None
def add_cookie(resp, name: str, value: str, max_age: int):
"""
Add secure cookie to response
In prod: Secure + SameSite=None + Partitioned (works with third-party cookie protections).
In dev: SameSite=Lax, not Secure.
"""
IS_PROD = os.getenv("ENV", "dev").lower() == "prod"
if IS_PROD:
resp.headers.add(
"Set-Cookie",
f"{name}={value}; Path=/; Max-Age={max_age}; Secure; HttpOnly; SameSite=None; Partitioned"
)
else:
resp.set_cookie(name, value, httponly=True, secure=False, samesite="Lax", max_age=max_age, path="/")
def validate_user_input(username: str, password: str) -> tuple[bool, str]:
"""
Validate user input for signup/login
Returns: (is_valid, error_message)
"""
if not username or not password:
return False, "Username and password are required"
if len(username) < 3 or len(username) > 50:
return False, "Username must be 3-50 characters"
if len(password) < 8:
return False, "Password must be at least 8 characters"
# Additional validation can be added here
# - Special character requirements
# - Username format validation
# - Password complexity checks
return True, ""
def is_admin_user(username: str) -> bool:
"""Check if user has admin role"""
from .models import User
user = User.find_by_username(username)
return user is not None and user.role == 'admin'
def log_security_event(event_type: str, username: str, ip_address: str, details: str = ""):
"""
Log security events with anonymized usernames
Args:
event_type: Type of security event (login, logout, failed_login, etc.)
username: Username (will be anonymized)
ip_address: Request IP address
details: Additional details about the event
"""
user_hash = anonymize_username(username)
current_app.logger.info(
f"Security Event [{event_type}]: user_hash={user_hash}, ip={ip_address}, details={details}"
) |