| | |
| | """ |
| | Authentication logic: signup, login, token management |
| | """ |
| | from flask import jsonify |
| | from database import get_supabase |
| | from crypto_utils import ( |
| | hash_password, |
| | verify_password, |
| | hash_username_for_storage, |
| | generate_encryption_key |
| | ) |
| | import jwt |
| | from datetime import datetime, timedelta |
| | import os |
| |
|
| | |
# Signing configuration for the JWTs issued and verified by this module.
# NOTE(review): when the JWT_SECRET environment variable is unset, the
# placeholder string below becomes the signing key -- confirm that every
# real deployment sets JWT_SECRET.
JWT_SECRET = os.environ.get("JWT_SECRET", "change-this-secret-in-production")
JWT_ALGORITHM = "HS256"  # algorithm passed to jwt.encode / jwt.decode
JWT_EXPIRATION_DAYS = 7  # token lifetime, in days
| |
|
def create_jwt_token(user_id: str, username_hash: str) -> str:
    """
    Create a signed JWT for an authenticated user.

    Args:
        user_id: User UUID from database
        username_hash: Hashed username

    Returns:
        JWT token string whose payload carries ``user_id``,
        ``username_hash``, ``iat`` (issue time) and ``exp``
        (``iat`` plus JWT_EXPIRATION_DAYS days).
    """
    # Function-scope import: the module header only imports datetime/timedelta.
    from datetime import timezone

    # Read the clock exactly once so "iat" and "exp" are precisely
    # JWT_EXPIRATION_DAYS apart, and use an aware UTC timestamp instead of
    # the deprecated naive datetime.utcnow().
    issued_at = datetime.now(timezone.utc)

    payload = {
        "user_id": user_id,
        "username_hash": username_hash,
        "exp": issued_at + timedelta(days=JWT_EXPIRATION_DAYS),
        "iat": issued_at,
    }

    token = jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)

    # PyJWT 1.x returns bytes while 2.x returns str; normalise so the
    # declared return type holds and the token is JSON-serialisable.
    if isinstance(token, bytes):
        token = token.decode("utf-8")
    return token
| |
|
def verify_jwt_token(token: str) -> dict:
    """
    Decode a JWT and check its signature and expiry.

    Args:
        token: JWT token string

    Returns:
        The decoded payload dict, or None when the token is expired or
        otherwise invalid (a short reason is printed in that case).
    """
    try:
        decoded = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
    except jwt.InvalidTokenError as err:
        # ExpiredSignatureError is a subclass of InvalidTokenError, so one
        # handler covers both; only the printed reason differs.
        if isinstance(err, jwt.ExpiredSignatureError):
            print("Token expired")
        else:
            print("Invalid token")
        return None
    else:
        return decoded
| |
|
def signup_user(username: str, password: str):
    """
    Register a new user.

    Input is validated before the database client is requested, so
    malformed requests are rejected without any database access.

    Args:
        username: Plain text username (3-20 alphanumeric characters)
        password: Plain text password (at least 6 characters)

    Returns:
        Tuple of (result_dict, status_code):
        - 200 with user_id, username, username_hash, encryption_key, token
        - 400 with an "error" message for invalid input or a taken username
        - 500 with an "error" message when a database call fails
    """
    # --- Input validation (no I/O) ---
    if not username or not password:
        return {"error": "Username and password required"}, 400

    # Guard against non-string JSON values (numbers, lists, ...) which would
    # otherwise crash on len() / .isalnum() below.
    if not isinstance(username, str) or not isinstance(password, str):
        return {"error": "Username and password must be strings"}, 400

    if len(username) < 3 or len(username) > 20:
        return {"error": "Username must be 3-20 characters"}, 400

    if not username.isalnum():
        return {"error": "Username must be alphanumeric"}, 400

    if len(password) < 6:
        return {"error": "Password must be at least 6 characters"}, 400

    supabase = get_supabase()

    # Only the hash of the username is stored and queried.
    username_hash = hash_username_for_storage(username)

    # --- Uniqueness check ---
    try:
        # Only existence matters here, so fetch just the id column instead
        # of pulling password hashes and encryption keys with '*'.
        existing = (
            supabase.table('users')
            .select('id')
            .eq('username_hash', username_hash)
            .execute()
        )

        if existing.data:
            return {"error": "Username already exists"}, 400
    except Exception as e:
        return {"error": f"Database error: {str(e)}"}, 500

    password_hash = hash_password(password)
    encryption_key = generate_encryption_key()

    # --- Create the user row and issue a token ---
    try:
        user_data = {
            'username_hash': username_hash,
            'password_hash': password_hash,
            'encryption_key': encryption_key,
            'created_at': datetime.utcnow().isoformat(),
            'is_active': True,
        }

        result = supabase.table('users').insert(user_data).execute()

        if not result.data:
            return {"error": "Failed to create user"}, 500

        user = result.data[0]

        token = create_jwt_token(user['id'], username_hash)

        return {
            "user_id": user['id'],
            "username": username,
            "username_hash": username_hash,
            "encryption_key": encryption_key,
            "token": token,
        }, 200

    except Exception as e:
        return {"error": f"Failed to create user: {str(e)}"}, 500
| |
|
def login_user(username: str, password: str):
    """
    Login an existing user.

    Args:
        username: Plain text username
        password: Plain text password

    Returns:
        Tuple of (result_dict, status_code):
        - 200 with user_id, username, username_hash, encryption_key, token
        - 401 when the credentials are missing, malformed or wrong
        - 403 when the credentials are correct but the account is disabled
        - 500 with an "error" message when a database call fails
    """
    # Missing or non-string credentials can never match a stored account;
    # reject them up front with the generic 401 instead of letting them
    # reach the hashing helpers.
    if (
        not username
        or not password
        or not isinstance(username, str)
        or not isinstance(password, str)
    ):
        return {"error": "Invalid username or password"}, 401

    supabase = get_supabase()

    # Users are looked up by the hash of their username.
    username_hash = hash_username_for_storage(username)

    try:
        result = (
            supabase.table('users')
            .select('*')
            .eq('username_hash', username_hash)
            .execute()
        )

        if not result.data:
            return {"error": "Invalid username or password"}, 401

        user = result.data[0]

        # Verify the password BEFORE revealing the account status, so an
        # unauthenticated caller cannot learn from a 403 that a username
        # exists and is disabled.
        if not verify_password(password, user['password_hash']):
            return {"error": "Invalid username or password"}, 401

        # Rows without an is_active field are treated as active.
        if not user.get('is_active', True):
            return {"error": "Account is disabled"}, 403

        # Record the successful login time.
        supabase.table('users').update({
            'last_login': datetime.utcnow().isoformat()
        }).eq('id', user['id']).execute()

        token = create_jwt_token(user['id'], username_hash)

        return {
            "user_id": user['id'],
            "username": username,
            "username_hash": username_hash,
            "encryption_key": user['encryption_key'],
            "token": token,
        }, 200

    except Exception as e:
        return {"error": f"Login failed: {str(e)}"}, 500
| |
|
def check_username_exists(username: str):
    """
    Check if a username exists (for adding friends).

    Args:
        username: Plain text username

    Returns:
        Tuple of (result_dict, status_code):
        - 200 with {"exists": bool, "username": <username or None>}
        - 500 with an "error" message when the database query fails
    """
    # An empty, missing or non-string username cannot belong to any account;
    # answer directly instead of passing it to the hashing helper.
    if not username or not isinstance(username, str):
        return {"exists": False, "username": None}, 200

    supabase = get_supabase()

    username_hash = hash_username_for_storage(username)

    try:
        result = (
            supabase.table('users')
            .select('id')
            .eq('username_hash', username_hash)
            .execute()
        )

        # bool() also copes with result.data being None, where len() would
        # raise and turn a plain "not found" into a 500.
        exists = bool(result.data)

        return {
            "exists": exists,
            "username": username if exists else None,
        }, 200

    except Exception as e:
        return {"error": f"Check failed: {str(e)}"}, 500
| |
|
def verify_token(token: str):
    """
    Verify a JWT (optionally given as an Authorization header value) and
    return the user info it carries.

    Args:
        token: JWT token string (may include "Bearer " prefix)

    Returns:
        User payload dict or None
    """
    if not token or not isinstance(token, str):
        return None

    # Strip only a leading auth scheme. The scheme name is matched
    # case-insensitively and surrounding whitespace is tolerated; text
    # elsewhere in the value is left untouched.
    parts = token.split(None, 1)
    if parts and parts[0].lower() == "bearer":
        token = parts[1].strip() if len(parts) == 2 else ""
    else:
        token = token.strip()

    # A bare scheme or whitespace-only value carries no credentials.
    if not token:
        return None

    payload = verify_jwt_token(token)

    if not payload:
        return None

    return payload