File size: 15,808 Bytes
25f22bf e3d8d4f 215a5ae c7d5529 b3c01df 25f22bf e3d8d4f 25f22bf b3c01df 25f22bf b3c01df 25f22bf b3c01df 25f22bf 215a5ae 25f22bf 215a5ae 25f22bf b3c01df 25f22bf b3c01df e3d8d4f b3c01df e3d8d4f 25f22bf e3d8d4f 25f22bf b3c01df e3d8d4f b3c01df e3d8d4f b3c01df e3d8d4f b3c01df e3d8d4f b3c01df 25f22bf e3d8d4f 25f22bf b3c01df 25f22bf 74728f5 25f22bf b3c01df 25f22bf b3c01df 25f22bf 74728f5 b3c01df 25f22bf b3c01df 25f22bf 74728f5 25f22bf b3c01df 25f22bf b3c01df e3d8d4f 25f22bf b3c01df 25f22bf 74728f5 25f22bf 74728f5 25f22bf b3c01df 25f22bf 74728f5 25f22bf b3c01df 25f22bf e3d8d4f 25f22bf b3c01df 25f22bf b3c01df 25f22bf e3d8d4f 25f22bf b3c01df 25f22bf e3d8d4f 25f22bf e3d8d4f 25f22bf b3c01df 25f22bf 67b1bef b3c01df 67b1bef b3c01df 67b1bef b3c01df 67b1bef b3c01df 67b1bef b3c01df 67b1bef b3c01df e3d8d4f 67b1bef b3c01df e3d8d4f b3c01df 67b1bef e3d8d4f 67b1bef e3d8d4f 67b1bef 19adbfe 67b1bef b3c01df 67b1bef b3c01df 67b1bef b3c01df 67b1bef 215a5ae 67b1bef 215a5ae 67b1bef b3c01df 67b1bef b3c01df e3d8d4f 67b1bef e3d8d4f 67b1bef b3c01df 67b1bef b3c01df 67b1bef b3c01df 67b1bef e3d8d4f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 |
from flask import Blueprint, request, jsonify, current_app
from flask_jwt_extended import jwt_required, get_jwt_identity
from email_validator import validate_email, EmailNotValidError
from backend.services.auth_service import register_user, login_user, get_user_by_id, request_password_reset, reset_user_password
from backend.models.user import User
from backend.utils.country_language_data import COUNTRIES, LANGUAGES
# Blueprint that collects every authentication route declared in this module.
auth_bp = Blueprint('auth', __name__)
def validate_email_format(email: str) -> tuple[bool, str]:
    """
    Validate email format using the email-validator library.

    Args:
        email: Email string to validate. Values that are not text (for
            example JSON ``null``, numbers or lists) are reported as invalid
            instead of being handed to the library.

    Returns:
        Tuple of (is_valid, validated_email_or_error_message). On success the
        second item is the address as returned by the library (it may be
        normalized); on failure it is an error message.
    """
    # A JSON body can put any type in the "email" field. Reject non-text here
    # so callers receive an ordinary "invalid" result rather than an
    # exception that is not EmailNotValidError.
    if not isinstance(email, (str, bytes)):
        return False, 'Email must be a string'

    try:
        validated = validate_email(email)
    except EmailNotValidError as e:
        return False, str(e)

    # Newer email-validator releases expose the result as ``.normalized`` and
    # deprecate dict-style access; fall back to the original lookup when the
    # attribute is not available.
    normalized = getattr(validated, 'normalized', None)
    if normalized is None:
        normalized = validated['email']
    return True, normalized
@auth_bp.route('/', methods=['OPTIONS'])
def handle_options():
    """Answer CORS preflight (OPTIONS) requests on the blueprint root with an empty 200."""
    empty_body, ok_status = '', 200
    return empty_body, ok_status
@auth_bp.route('/register', methods=['OPTIONS'])
def handle_register_options():
    """Answer CORS preflight (OPTIONS) requests for the register route with an empty 200."""
    empty_body, ok_status = '', 200
    return empty_body, ok_status
@auth_bp.route('/register', methods=['POST'])
def register():
    """
    Register a new user.

    Request Body:
        email (str): User email
        password (str): User password
        country (str, optional): User country (ISO 3166-1 alpha-2 code)
        language (str, optional): User language (ISO 639-1 code)

    Returns:
        JSON: Registration result.
            201 with the dict returned by ``register_user`` on success.
            400 when the body is not a JSON object, a required field is
            missing or not a string, the email / password / country /
            language fails validation, or ``register_user`` reports failure.
            500 when an unexpected exception occurs.
    """
    try:
        # silent=True makes a malformed or non-JSON body come back as None,
        # so it is answered with the 400 below instead of raising into the
        # blanket handler and surfacing as a 500.
        data = request.get_json(silent=True)

        # Validate required fields (the body must be a JSON object)
        if not isinstance(data, dict) or not all(k in data for k in ('email', 'password')):
            return jsonify({
                'success': False,
                'message': 'Email and password are required'
            }), 400

        email = data['email']
        password = data['password']
        country = data.get('country')    # Optional: ISO 3166-1 alpha-2 code
        language = data.get('language')  # Optional: ISO 639-1 code

        # JSON allows null / numbers / lists here; the validators below
        # expect text, so reject anything else as a client error.
        if not isinstance(email, str) or not isinstance(password, str):
            return jsonify({
                'success': False,
                'message': 'Email and password must be strings'
            }), 400

        # Validate email format using email-validator
        is_valid_email, validated_email_or_error = validate_email_format(email)
        if not is_valid_email:
            return jsonify({
                'success': False,
                'message': f'Invalid email format: {validated_email_or_error}'
            }), 400

        # Use validated email (it may be normalized)
        email = validated_email_or_error

        # Validate password strength
        password_validation = validate_password_strength(password)
        if not password_validation['valid']:
            return jsonify({
                'success': False,
                'message': password_validation['message']
            }), 400

        # Optional fields are only shape-checked (two alphabetic characters);
        # falsy values such as None or '' are treated as "not provided".
        if country:
            if not isinstance(country, str) or len(country) != 2 or not country.isalpha():
                return jsonify({
                    'success': False,
                    'message': 'Country must be a valid ISO 3166-1 alpha-2 code (2 alphabetic characters)'
                }), 400

        if language:
            if not isinstance(language, str) or len(language) != 2 or not language.isalpha():
                return jsonify({
                    'success': False,
                    'message': 'Language must be a valid ISO 639-1 code (2 alphabetic characters)'
                }), 400

        # Register user with preferences
        result = register_user(email, password, country, language)
        if result['success']:
            return jsonify(result), 201

        # Replace any "already exist(s)" message from the service with one
        # fixed string. NOTE(review): this response still tells the caller
        # that the email is registered - confirm that is acceptable.
        if 'already exist' in result.get('message', '').lower():
            return jsonify({
                'success': False,
                'message': 'Account with this email already exists'
            }), 400
        return jsonify(result), 400
    except Exception as e:
        # exc_info=True records the traceback, matching the login handler.
        current_app.logger.error(f"Registration error: {str(e)}", exc_info=True)
        return jsonify({
            'success': False,
            'message': 'An error occurred during registration'
        }), 500
@auth_bp.route('/login', methods=['OPTIONS'])
def handle_login_options():
    """
    Handle OPTIONS requests for preflight CORS checks for login route.

    Logs the caller address and the request headers, then returns an empty
    200 response. Credential-bearing headers are masked before logging.
    """
    # Never write credentials to the log: mask headers that can carry them.
    sensitive_headers = {'authorization', 'cookie', 'proxy-authorization'}
    safe_headers = {
        name: ('[REDACTED]' if name.lower() in sensitive_headers else value)
        for name, value in request.headers.items()
    }
    current_app.logger.info(f"OPTIONS request for /login from {request.remote_addr}")
    current_app.logger.info(f"Request headers: {safe_headers}")
    return '', 200
@auth_bp.route('/login', methods=['POST'])
def login():
    """
    Authenticate and login a user.

    Request Body:
        email (str): User email
        password (str): User password
        remember_me (bool): Remember me flag for extended session (optional)

    Returns:
        JSON: Login result.
            200 with the dict returned by ``login_user`` on success
            (presumably carrying the JWT - confirm against the service).
            400 when the body is not a JSON object or a required field is
            missing.
            401 with a generic message when the credentials are rejected,
            the email is malformed, or a field is not a string.
            500 when an unexpected exception occurs.
    """
    try:
        # Log the incoming request. Credential-bearing headers are masked so
        # tokens and cookies never end up in the log.
        sensitive_headers = {'authorization', 'cookie', 'proxy-authorization'}
        safe_headers = {
            name: ('[REDACTED]' if name.lower() in sensitive_headers else value)
            for name, value in request.headers.items()
        }
        current_app.logger.info(f"Login request received from {request.remote_addr}")
        current_app.logger.info(f"Request headers: {safe_headers}")

        # silent=True: a malformed or non-JSON body becomes None and is
        # answered with the 400 below instead of surfacing as a 500.
        data = request.get_json(silent=True)

        # Validate required fields (the body must be a JSON object)
        if not isinstance(data, dict) or not all(k in data for k in ('email', 'password')):
            current_app.logger.warning("Login failed: Missing email or password")
            return jsonify({
                'success': False,
                'message': 'Email and password are required'
            }), 400

        email = data['email']
        password = data['password']
        remember_me = data.get('remember_me', False)

        # Non-text credentials can never match an account; answer with the
        # same generic 401 used for a malformed email.
        if not isinstance(email, str) or not isinstance(password, str):
            current_app.logger.warning("Login failed: email or password is not a string")
            return jsonify({
                'success': False,
                'message': 'Invalid email or password'
            }), 401

        # Validate email format
        is_valid_email, validated_email_or_error = validate_email_format(email)
        if not is_valid_email:
            current_app.logger.warning(f"Login attempt with invalid email format: {email}")
            # Do not reveal that email format is invalid to avoid enumeration
            return jsonify({
                'success': False,
                'message': 'Invalid email or password'
            }), 401

        # Use validated email (it may be normalized)
        email = validated_email_or_error

        # Login user
        result = login_user(email, password, remember_me)
        if result['success']:
            # Set CORS headers explicitly.
            # NOTE(review): the allowed origin is hard-coded to the local
            # dev frontend - confirm how production origins are handled.
            response_data = jsonify(result)
            response_data.headers.add('Access-Control-Allow-Origin', 'http://localhost:3000')
            response_data.headers.add('Access-Control-Allow-Credentials', 'true')
            current_app.logger.info(f"Login successful for user {email}")
            return response_data, 200

        current_app.logger.warning(f"Login failed for user {email}: {result.get('message', 'Unknown error')}")
        return jsonify(result), 401
    except Exception as e:
        current_app.logger.error(f"Login error: {str(e)}", exc_info=True)
        return jsonify({
            'success': False,
            'message': 'An error occurred during login'
        }), 500
@auth_bp.route('/logout', methods=['OPTIONS'])
def handle_logout_options():
    """Answer CORS preflight (OPTIONS) requests for the logout route with an empty 200."""
    empty_body, ok_status = '', 200
    return empty_body, ok_status
@auth_bp.route('/logout', methods=['POST'])
@jwt_required()
def logout():
    """
    Acknowledge a logout for the authenticated caller.

    The handler only logs the JWT identity and reports success; nothing in
    this function revokes or invalidates the token.

    Returns:
        JSON: Logout result (200 on success, 500 if logging the request
        raises).
    """
    try:
        identity = get_jwt_identity()
        current_app.logger.info(f"Logout request for user: {identity}")
        payload = {
            'success': True,
            'message': 'Logged out successfully'
        }
        return jsonify(payload), 200
    except Exception as exc:
        current_app.logger.error(f"Logout error: {str(exc)}")
        failure = {
            'success': False,
            'message': 'An error occurred during logout'
        }
        return jsonify(failure), 500
@auth_bp.route('/user', methods=['OPTIONS'])
def handle_user_options():
    """Answer CORS preflight (OPTIONS) requests for the user route with an empty 200."""
    empty_body, ok_status = '', 200
    return empty_body, ok_status
@auth_bp.route('/user', methods=['GET'])
@jwt_required()
def get_current_user():
    """
    Return the profile of the user identified by the request's JWT.

    Returns:
        JSON: Current user data without the 'password' / 'password_hash'
        keys (200), a not-found message when the lookup yields nothing
        (404), or a generic error message on an unexpected exception (500).
    """
    hidden_keys = ('password', 'password_hash')
    try:
        user_id = get_jwt_identity()
        current_app.logger.info(f"Get user profile request for user: {user_id}")

        user_data = get_user_by_id(user_id)
        # Guard clause: nothing to return when the lookup comes back empty.
        if not user_data:
            return jsonify({
                'success': False,
                'message': 'User not found'
            }), 404

        # Strip credential fields before the record leaves the server.
        safe_user_data = {}
        for key, value in user_data.items():
            if key not in hidden_keys:
                safe_user_data[key] = value

        return jsonify({
            'success': True,
            'user': safe_user_data
        }), 200
    except Exception as exc:
        current_app.logger.error(f"Get user error: {str(exc)}")
        return jsonify({
            'success': False,
            'message': 'An error occurred while fetching user data'
        }), 500
@auth_bp.route('/registration-options', methods=['GET'])
def get_registration_options():
    """
    Expose the country and language choices offered at registration.

    Returns:
        JSON: The imported COUNTRIES and LANGUAGES data with a success flag
        (200), or a generic error message if building the response fails
        (500).
    """
    try:
        options = {
            'success': True,
            'countries': COUNTRIES,
            'languages': LANGUAGES
        }
        return jsonify(options), 200
    except Exception as exc:
        current_app.logger.error(f"Get registration options error: {str(exc)}")
        failure = {
            'success': False,
            'message': 'An error occurred while fetching registration options'
        }
        return jsonify(failure), 500
@auth_bp.route('/forgot-password', methods=['OPTIONS'])
def handle_forgot_password_options():
    """Answer CORS preflight (OPTIONS) requests for the forgot password route with an empty 200."""
    empty_body, ok_status = '', 200
    return empty_body, ok_status
@auth_bp.route('/forgot-password', methods=['POST'])
def forgot_password():
    """
    Request password reset for a user.

    Request Body:
        email (str): User email

    Returns:
        JSON: Password reset request result.
            400 when the body is not a JSON object or has no 'email' field.
            200 with one fixed, non-committal message in every other case
            (invalid email, reset requested, or unexpected error), so the
            response never shows whether an account exists.
    """
    # Single source for the enumeration-safe answer used on every path below.
    generic_response = {
        'success': True,
        'message': 'If an account exists with this email, password reset instructions have been sent.'
    }

    try:
        # silent=True: a malformed or non-JSON body becomes None and gets
        # the same 400 as a missing field instead of being logged as a
        # server error.
        data = request.get_json(silent=True)

        # Validate required fields (the body must be a JSON object)
        if not isinstance(data, dict) or 'email' not in data:
            return jsonify({
                'success': False,
                'message': 'Email is required'
            }), 400

        email = data['email']

        # Validate email format; a non-string value counts as invalid.
        if isinstance(email, str):
            is_valid_email, validated_email_or_error = validate_email_format(email)
        else:
            is_valid_email, validated_email_or_error = False, 'Email must be a string'

        if not is_valid_email:
            # Don't reveal that email format is invalid to prevent enumeration
            current_app.logger.warning(f"Forgot password request with invalid email format: {email}")
            return jsonify(generic_response), 200

        # Use validated email (it may be normalized)
        email = validated_email_or_error

        # The service result is deliberately ignored: the reply must be the
        # same whether or not the account exists.
        request_password_reset(current_app.supabase, email)
        return jsonify(generic_response), 200
    except Exception as e:
        current_app.logger.error(f"Forgot password error: {str(e)}")
        # Even on error, don't reveal if user exists
        return jsonify(generic_response), 200
@auth_bp.route('/reset-password', methods=['OPTIONS'])
def handle_reset_password_options():
    """Answer CORS preflight (OPTIONS) requests for the reset password route with an empty 200."""
    empty_body, ok_status = '', 200
    return empty_body, ok_status
@auth_bp.route('/reset-password', methods=['GET'])
def show_reset_password_form():
    """
    Acknowledge a GET on the password reset URL.

    The link in the password reset email is expected to point here with the
    token as a query parameter (e.g., ?token=abc123), to be read by the SPA
    frontend. This handler itself only logs the access and returns an empty
    200 response; it does not render a form or read the token.

    NOTE(review): the original comments expected the app's SPA handler to
    serve index.html for this URL, but this route responds directly with an
    empty body - confirm how the frontend is actually delivered.
    """
    # Log the access for monitoring
    current_app.logger.info("Password reset form page accessed.")
    empty_body, ok_status = '', 200
    return empty_body, ok_status
@auth_bp.route('/reset-password', methods=['POST'])
def reset_password():
    """
    Reset user password with token.

    Request Body:
        token (str): Password reset token
        password (str): New password

    Returns:
        JSON: Password reset result.
            200 with the dict returned by ``reset_user_password`` on success.
            400 when the body is not a JSON object, a field is missing or
            not a string, the password is too weak, or the service reports
            failure.
            500 when an unexpected exception occurs.
    """
    try:
        # silent=True: a malformed or non-JSON body becomes None and is
        # answered with the 400 below instead of surfacing as a 500.
        data = request.get_json(silent=True)

        # Validate required fields (the body must be a JSON object)
        if not isinstance(data, dict) or not all(k in data for k in ('token', 'password')):
            return jsonify({
                'success': False,
                'message': 'Token and password are required'
            }), 400

        token = data['token']
        password = data['password']

        # JSON allows null / numbers / lists; both fields must be text
        # before they are validated or passed to the service.
        if not isinstance(token, str) or not isinstance(password, str):
            return jsonify({
                'success': False,
                'message': 'Token and password must be strings'
            }), 400

        # Validate password strength
        password_validation = validate_password_strength(password)
        if not password_validation['valid']:
            return jsonify({
                'success': False,
                'message': password_validation['message']
            }), 400

        # Reset password
        result = reset_user_password(current_app.supabase, token, password)
        status = 200 if result['success'] else 400
        return jsonify(result), status
    except Exception as e:
        current_app.logger.error(f"Reset password error: {str(e)}")
        return jsonify({
            'success': False,
            'message': 'An error occurred while resetting your password'
        }), 500
def validate_password_strength(password: str) -> dict:
    """
    Validates password strength based on security requirements.

    A password is accepted when it is a string of at least 8 characters that
    contains an uppercase letter, a lowercase letter, a digit and one of the
    special characters ``!@#$%^&*()_+-=[]{}|;:,.<>?``. Checks run in that
    order and the first failing one determines the message.

    Args:
        password: Password string to validate. Non-string values are
            reported as invalid rather than raising.

    Returns:
        Dictionary with keys 'valid' (bool) and 'message' (str).
    """
    # Request bodies may carry null / numbers here; len() would raise on
    # those, so report them as an ordinary validation failure.
    if not isinstance(password, str):
        return {
            'valid': False,
            'message': 'Password must be a string'
        }

    if len(password) < 8:
        return {
            'valid': False,
            'message': 'Password must be at least 8 characters long'
        }

    special_characters = "!@#$%^&*()_+-=[]{}|;:,.<>?"

    # (per-character predicate, message when no character satisfies it),
    # evaluated in order so the first unmet requirement is the one reported.
    requirements = (
        (str.isupper, 'Password must contain at least one uppercase letter'),
        (str.islower, 'Password must contain at least one lowercase letter'),
        (str.isdigit, 'Password must contain at least one number'),
        (lambda c: c in special_characters,
         'Password must contain at least one special character'),
    )
    for is_match, failure_message in requirements:
        if not any(is_match(c) for c in password):
            return {
                'valid': False,
                'message': failure_message
            }

    return {
        'valid': True,
        'message': 'Password is valid'
    }