"""
Authentication Routes

Flask routes for user registration, login, password reset, and account
management.
"""

from flask import Blueprint, request, jsonify, current_app
from flask_cors import cross_origin
from models.user import db
from models.enhanced_user import User, VPNClient
from functools import wraps
import logging
import re
import secrets
from datetime import datetime, timedelta

logger = logging.getLogger(__name__)

auth_bp = Blueprint('auth', __name__)

# Upper bound for the admin user listing so a single request cannot ask for
# an arbitrarily large page.
_MAX_USERS_PER_PAGE = 100

# Limits applied at registration for paid plans:
# plan name -> (max concurrent connections, bandwidth limit in Mbps).
_PAID_PLAN_LIMITS = {
    'premium': (3, 50),
    'enterprise': (10, 100),
}


def _get_json_object():
    """Return the request body as a dict, or None if it is not a JSON object.

    ``silent=True`` makes malformed JSON or a wrong content type yield None
    instead of raising an HTTP error, which the broad ``except Exception``
    handlers in the views would otherwise turn into a 500 response.
    """
    payload = request.get_json(silent=True)
    return payload if isinstance(payload, dict) else None


def _first_field_error(data, fields, allow_empty=False):
    """Return an error message for the first invalid field, or None.

    Args:
        data: Parsed JSON object (dict) from the request.
        fields: Names of the fields that must be present.
        allow_empty: When False, falsy values (e.g. ``""``) count as missing.

    Returns:
        ``'<field> is required'`` when a field is missing (or empty and
        ``allow_empty`` is False), ``'<field> must be a string'`` when the
        value is not a ``str``, otherwise None.
    """
    for field in fields:
        if field not in data or (not allow_empty and not data[field]):
            return f'{field} is required'
        if not isinstance(data[field], str):
            return f'{field} must be a string'
    return None


def token_required(f):
    """Decorator to require valid JWT token.

    Reads the token from the second space-separated part of the
    ``Authorization`` header, resolves it with ``User.verify_auth_token`` and
    passes the resulting user to the wrapped view as its first positional
    argument. Responds with 401 when the token is missing, malformed, invalid
    or expired, or when the account is not active.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        token = None

        # Get token from Authorization header ("Bearer <token>")
        if 'Authorization' in request.headers:
            auth_header = request.headers['Authorization']
            try:
                token = auth_header.split(" ")[1]
            except IndexError:
                return jsonify({'error': 'Invalid token format'}), 401

        if not token:
            return jsonify({'error': 'Token is missing'}), 401

        try:
            current_user = User.verify_auth_token(token)
            if current_user is None:
                return jsonify({'error': 'Token is invalid or expired'}), 401
            if not current_user.is_active:
                return jsonify({'error': 'Account is deactivated'}), 401
        except Exception as e:
            logger.error("Token verification error: %s", e)
            return jsonify({'error': 'Token verification failed'}), 401

        return f(current_user, *args, **kwargs)

    return decorated


def admin_required(f):
    """Decorator to require admin privileges.

    Must be applied beneath ``token_required`` so that ``current_user`` is
    supplied as the first argument. Responds with 403 for non-admin users.
    """
    @wraps(f)
    def decorated(current_user, *args, **kwargs):
        if not current_user.is_admin:
            return jsonify({'error': 'Admin privileges required'}), 403
        return f(current_user, *args, **kwargs)

    return decorated


@auth_bp.route('/register', methods=['POST'])
@cross_origin()
def register():
    """User registration endpoint.

    Expects a JSON object with ``username``, ``email`` and ``password`` and an
    optional ``subscription_type``. Returns 201 with the new user and tokens,
    400 on validation errors, 409 when the username or email is taken and 500
    on unexpected failures.
    """
    try:
        data = _get_json_object()
        if not data:
            return jsonify({'error': 'No data provided'}), 400

        # Validate required fields (present, non-empty, strings)
        field_error = _first_field_error(
            data, ['username', 'email', 'password']
        )
        if field_error:
            return jsonify({'error': field_error}), 400

        username = data['username'].strip()
        email = data['email'].strip().lower()
        password = data['password']

        # Validate input format
        if not User.validate_username(username):
            return jsonify({
                'error': 'Username must be 3-80 characters and contain only letters, numbers, hyphens, and underscores'
            }), 400

        if not User.validate_email(email):
            return jsonify({'error': 'Invalid email format'}), 400

        if not User.validate_password_strength(password):
            return jsonify({
                'error': 'Password must be at least 8 characters with uppercase, lowercase, number, and special character'
            }), 400

        # Check if user already exists
        if User.query.filter_by(username=username).first():
            return jsonify({'error': 'Username already exists'}), 409

        if User.query.filter_by(email=email).first():
            return jsonify({'error': 'Email already registered'}), 409

        # Create new user
        user = User(username=username, email=email, password=password)

        # Set subscription based on registration data.
        # NOTE(review): the client chooses the plan here and no payment check
        # is visible in this module -- confirm paid plans are validated
        # elsewhere.
        subscription_type = data.get('subscription_type', 'free')
        if subscription_type in ('free', 'premium', 'enterprise'):
            user.subscription_type = subscription_type

            # Set limits based on subscription
            limits = _PAID_PLAN_LIMITS.get(subscription_type)
            if limits is not None:
                max_connections, bandwidth_mbps = limits
                user.max_concurrent_connections = max_connections
                user.bandwidth_limit_mbps = bandwidth_mbps
                user.subscription_expires = (
                    datetime.utcnow() + timedelta(days=30)
                )

        db.session.add(user)
        db.session.commit()

        logger.info("New user registered: %s (%s)", username, email)

        # Generate tokens
        auth_token = user.generate_auth_token()
        refresh_token = user.generate_refresh_token()

        return jsonify({
            'message': 'User registered successfully',
            'user': user.to_dict(),
            'auth_token': auth_token,
            'refresh_token': refresh_token,
            'email_verification_required': not user.email_verified
        }), 201

    except ValueError as e:
        db.session.rollback()
        return jsonify({'error': str(e)}), 400
    except Exception as e:
        logger.error("Registration error: %s", e)
        db.session.rollback()
        return jsonify({'error': 'Registration failed'}), 500


@auth_bp.route('/login', methods=['POST'])
@cross_origin()
def login():
    """User login endpoint.

    Expects a JSON object with ``login`` (username or email) and ``password``.
    Returns 200 with the user and tokens, 400 on malformed input, 401 on bad
    credentials or a deactivated account, 423 when the account is locked and
    500 on unexpected failures.
    """
    try:
        data = _get_json_object()
        if not data:
            return jsonify({'error': 'No data provided'}), 400

        # Validate required fields
        if 'login' not in data or 'password' not in data:
            return jsonify({'error': 'Login and password are required'}), 400

        if not isinstance(data['login'], str) or not isinstance(data['password'], str):
            return jsonify({'error': 'Login and password must be strings'}), 400

        login_field = data['login'].strip()
        password = data['password']

        # Find user by username or email
        if '@' in login_field:
            user = User.query.filter_by(email=login_field.lower()).first()
        else:
            user = User.query.filter_by(username=login_field).first()

        if not user:
            return jsonify({'error': 'Invalid credentials'}), 401

        if not user.is_active:
            return jsonify({'error': 'Account is deactivated'}), 401

        if user.is_account_locked():
            return jsonify({
                'error': 'Account is temporarily locked due to failed login attempts'
            }), 423

        if not user.check_password(password):
            db.session.commit()  # Save failed attempt count
            return jsonify({'error': 'Invalid credentials'}), 401

        # Update last login
        user.last_login = datetime.utcnow()
        db.session.commit()

        logger.info("User logged in: %s", user.username)

        # Generate tokens
        auth_token = user.generate_auth_token()
        refresh_token = user.generate_refresh_token()

        return jsonify({
            'message': 'Login successful',
            'user': user.to_dict(include_sensitive='self'),
            'auth_token': auth_token,
            'refresh_token': refresh_token
        }), 200

    except Exception as e:
        logger.error("Login error: %s", e)
        # Leave the session usable after a failed commit, as the other
        # handlers in this module do.
        db.session.rollback()
        return jsonify({'error': 'Login failed'}), 500


@auth_bp.route('/refresh', methods=['POST'])
@cross_origin()
def refresh_token():
    """Refresh authentication token.

    Expects a JSON object with ``refresh_token``. Returns 200 with a new
    auth/refresh token pair and the user, 400 when the token is missing and
    401 when it is invalid, expired or belongs to a deactivated account.
    """
    try:
        data = _get_json_object()
        supplied_token = data.get('refresh_token') if data else None
        if not data or 'refresh_token' not in data or not isinstance(supplied_token, str):
            return jsonify({'error': 'Refresh token is required'}), 400

        user = User.verify_refresh_token(supplied_token)

        if not user:
            return jsonify({'error': 'Invalid or expired refresh token'}), 401

        if not user.is_active:
            return jsonify({'error': 'Account is deactivated'}), 401

        # Generate new tokens
        new_auth_token = user.generate_auth_token()
        new_refresh_token = user.generate_refresh_token()

        return jsonify({
            'auth_token': new_auth_token,
            'refresh_token': new_refresh_token,
            'user': user.to_dict()
        }), 200

    except Exception as e:
        logger.error("Token refresh error: %s", e)
        return jsonify({'error': 'Token refresh failed'}), 500


@auth_bp.route('/logout', methods=['POST'])
@cross_origin()
@token_required
def logout(current_user):
    """User logout endpoint.

    Only logs the event and reports success; the token is not invalidated.
    """
    try:
        # In a production system, you would invalidate the token
        # For now, we just return success
        logger.info("User logged out: %s", current_user.username)
        return jsonify({'message': 'Logout successful'}), 200

    except Exception as e:
        logger.error("Logout error: %s", e)
        return jsonify({'error': 'Logout failed'}), 500


@auth_bp.route('/profile', methods=['GET'])
@cross_origin()
@token_required
def get_profile(current_user):
    """Get user profile of the authenticated user."""
    try:
        return jsonify({
            'user': current_user.to_dict(include_sensitive='self')
        }), 200

    except Exception as e:
        logger.error("Profile retrieval error: %s", e)
        return jsonify({'error': 'Failed to retrieve profile'}), 500


@auth_bp.route('/profile', methods=['PUT'])
@cross_origin()
@token_required
def update_profile(current_user):
    """Update user profile.

    Currently only ``email`` can be changed. A changed email resets
    ``email_verified`` and issues a fresh verification token. Returns 200 on
    success, 400 on invalid input and 409 when the email is already in use.
    """
    try:
        data = _get_json_object()
        if not data:
            return jsonify({'error': 'No data provided'}), 400

        # Update allowed fields
        if 'email' in data:
            if not isinstance(data['email'], str):
                return jsonify({'error': 'email must be a string'}), 400

            new_email = data['email'].strip().lower()
            if new_email != current_user.email:
                if not User.validate_email(new_email):
                    return jsonify({'error': 'Invalid email format'}), 400

                # Check if email is already taken
                existing_user = User.query.filter_by(email=new_email).first()
                if existing_user and existing_user.id != current_user.id:
                    return jsonify({'error': 'Email already registered'}), 409

                current_user.email = new_email
                current_user.email_verified = False
                current_user.email_verification_token = secrets.token_urlsafe(32)

        db.session.commit()

        logger.info("Profile updated for user: %s", current_user.username)

        return jsonify({
            'message': 'Profile updated successfully',
            'user': current_user.to_dict(include_sensitive='self')
        }), 200

    except Exception as e:
        logger.error("Profile update error: %s", e)
        db.session.rollback()
        return jsonify({'error': 'Profile update failed'}), 500


@auth_bp.route('/change-password', methods=['POST'])
@cross_origin()
@token_required
def change_password(current_user):
    """Change user password.

    Expects a JSON object with ``current_password`` and ``new_password``.
    Returns 200 on success, 400 on invalid input or a weak new password and
    401 when the current password does not match.
    """
    try:
        data = _get_json_object()
        if not data:
            return jsonify({'error': 'No data provided'}), 400

        # Validate required fields. Empty strings are let through so they
        # fail the password check / strength validation below.
        field_error = _first_field_error(
            data, ['current_password', 'new_password'], allow_empty=True
        )
        if field_error:
            return jsonify({'error': field_error}), 400

        current_password = data['current_password']
        new_password = data['new_password']

        # Verify current password
        if not current_user.check_password(current_password):
            return jsonify({'error': 'Current password is incorrect'}), 401

        # Validate new password
        if not User.validate_password_strength(new_password):
            return jsonify({
                'error': 'New password must be at least 8 characters with uppercase, lowercase, number, and special character'
            }), 400

        # Set new password
        current_user.set_password(new_password)
        db.session.commit()

        logger.info("Password changed for user: %s", current_user.username)

        return jsonify({'message': 'Password changed successfully'}), 200

    except ValueError as e:
        db.session.rollback()
        return jsonify({'error': str(e)}), 400
    except Exception as e:
        logger.error("Password change error: %s", e)
        db.session.rollback()
        return jsonify({'error': 'Password change failed'}), 500


@auth_bp.route('/forgot-password', methods=['POST'])
@cross_origin()
def forgot_password():
    """Request password reset.

    Expects a JSON object with ``email``. The response message is the same
    whether or not the email is registered, so the endpoint does not reveal
    which addresses have accounts. The reset token is echoed back only when
    the application runs in debug or testing mode.
    """
    try:
        data = _get_json_object()
        if not data or not isinstance(data.get('email'), str):
            return jsonify({'error': 'Email is required'}), 400

        email = data['email'].strip().lower()
        user = User.query.filter_by(email=email).first()

        # Don't reveal if email exists: same body for both outcomes
        response_body = {
            'message': 'If the email exists, password reset instructions have been sent'
        }

        if user:
            reset_token = user.generate_password_reset_token()
            db.session.commit()

            # In a production system, you would send an email here
            logger.info(
                "Password reset requested for user: %s", user.username
            )

            # Development convenience only: never expose the token outside
            # debug/testing, since it allows taking over the account.
            if current_app.debug or current_app.testing:
                response_body['reset_token'] = reset_token

        return jsonify(response_body), 200

    except Exception as e:
        logger.error("Password reset request error: %s", e)
        db.session.rollback()
        return jsonify({'error': 'Password reset request failed'}), 500


@auth_bp.route('/reset-password', methods=['POST'])
@cross_origin()
def reset_password():
    """Reset password with token.

    Expects a JSON object with ``email``, ``token`` and ``new_password``.
    Returns 200 on success and 400 for invalid input, an unknown email, a
    weak password or an invalid/expired token.
    """
    try:
        data = _get_json_object()
        if not data:
            return jsonify({'error': 'No data provided'}), 400

        # Validate required fields; a null or empty token must never reach
        # the token comparison in the model.
        field_error = _first_field_error(
            data, ['email', 'token', 'new_password']
        )
        if field_error:
            return jsonify({'error': field_error}), 400

        email = data['email'].strip().lower()
        token = data['token']
        new_password = data['new_password']

        user = User.query.filter_by(email=email).first()
        if not user:
            return jsonify({'error': 'Invalid reset request'}), 400

        if not User.validate_password_strength(new_password):
            return jsonify({
                'error': 'Password must be at least 8 characters with uppercase, lowercase, number, and special character'
            }), 400

        if not user.reset_password(new_password, token):
            return jsonify({'error': 'Invalid or expired reset token'}), 400

        db.session.commit()
        logger.info("Password reset completed for user: %s", user.username)
        return jsonify({'message': 'Password reset successfully'}), 200

    except ValueError as e:
        db.session.rollback()
        return jsonify({'error': str(e)}), 400
    except Exception as e:
        logger.error("Password reset error: %s", e)
        db.session.rollback()
        return jsonify({'error': 'Password reset failed'}), 500


@auth_bp.route('/verify-email', methods=['POST'])
@cross_origin()
def verify_email():
    """Verify email address.

    Expects a JSON object with a non-empty string ``token``. Returns 200 on
    success and 400 when the token is missing, unknown or rejected.
    """
    try:
        data = _get_json_object()
        token = data.get('token') if data else None

        # Require a real token: a null value would otherwise make the
        # filter below match users whose verification token column is NULL.
        if not isinstance(token, str) or not token:
            return jsonify({'error': 'Verification token is required'}), 400

        # Find user by verification token
        user = User.query.filter_by(email_verification_token=token).first()
        if not user:
            return jsonify({'error': 'Invalid verification token'}), 400

        if not user.verify_email(token):
            return jsonify({'error': 'Email verification failed'}), 400

        db.session.commit()
        logger.info("Email verified for user: %s", user.username)
        return jsonify({'message': 'Email verified successfully'}), 200

    except Exception as e:
        logger.error("Email verification error: %s", e)
        db.session.rollback()
        return jsonify({'error': 'Email verification failed'}), 500


@auth_bp.route('/users', methods=['GET'])
@cross_origin()
@token_required
@admin_required
def list_users(current_user):
    """List all users (admin only).

    Query parameters: ``page`` (default 1), ``per_page`` (default 20, capped
    at 100) and ``search`` (substring match on username or email).
    """
    try:
        # Clamp paging values so bad input cannot request a huge page
        page = max(request.args.get('page', 1, type=int), 1)
        per_page = request.args.get('per_page', 20, type=int)
        per_page = min(max(per_page, 1), _MAX_USERS_PER_PAGE)
        search = request.args.get('search', '')

        query = User.query

        if search:
            query = query.filter(
                db.or_(
                    User.username.contains(search),
                    User.email.contains(search)
                )
            )

        users = query.paginate(
            page=page,
            per_page=per_page,
            error_out=False
        )

        return jsonify({
            'users': [user.to_dict(include_sensitive=True) for user in users.items],
            'total': users.total,
            'pages': users.pages,
            'current_page': page,
            'per_page': per_page
        }), 200

    except Exception as e:
        logger.error("User listing error: %s", e)
        return jsonify({'error': 'Failed to retrieve users'}), 500


@auth_bp.route('/users/<int:user_id>', methods=['GET'])
@cross_origin()
@token_required
@admin_required
def get_user(current_user, user_id):
    """Get specific user details (admin only).

    Returns 404 when no user has the given id.
    """
    try:
        # Explicit lookup instead of get_or_404(): the NotFound it raises
        # would be swallowed by the handler below and reported as a 500.
        user = User.query.get(user_id)
        if user is None:
            return jsonify({'error': 'User not found'}), 404

        return jsonify({
            'user': user.to_dict(include_sensitive=True)
        }), 200

    except Exception as e:
        logger.error("User retrieval error: %s", e)
        return jsonify({'error': 'Failed to retrieve user'}), 500


@auth_bp.route('/users/<int:user_id>/deactivate', methods=['POST'])
@cross_origin()
@token_required
@admin_required
def deactivate_user(current_user, user_id):
    """Deactivate user account (admin only).

    Returns 404 when the user does not exist and 400 when an admin tries to
    deactivate their own account.
    """
    try:
        user = User.query.get(user_id)
        if user is None:
            return jsonify({'error': 'User not found'}), 404

        if user.id == current_user.id:
            return jsonify({'error': 'Cannot deactivate your own account'}), 400

        user.is_active = False
        db.session.commit()

        logger.info(
            "User deactivated by admin %s: %s",
            current_user.username, user.username
        )

        return jsonify({'message': 'User deactivated successfully'}), 200

    except Exception as e:
        logger.error("User deactivation error: %s", e)
        db.session.rollback()
        return jsonify({'error': 'Failed to deactivate user'}), 500


@auth_bp.route('/users/<int:user_id>/activate', methods=['POST'])
@cross_origin()
@token_required
@admin_required
def activate_user(current_user, user_id):
    """Activate user account (admin only).

    Also clears the failed-login counter and any account lock. Returns 404
    when the user does not exist.
    """
    try:
        user = User.query.get(user_id)
        if user is None:
            return jsonify({'error': 'User not found'}), 404

        user.is_active = True
        user.failed_login_attempts = 0
        user.account_locked_until = None
        db.session.commit()

        logger.info(
            "User activated by admin %s: %s",
            current_user.username, user.username
        )

        return jsonify({'message': 'User activated successfully'}), 200

    except Exception as e:
        logger.error("User activation error: %s", e)
        db.session.rollback()
        return jsonify({'error': 'Failed to activate user'}), 500