# Sharelock / auth.py
# mike23415's picture
# Update auth.py
# 028494e verified
# auth.py
"""
Authentication logic: signup, login, token management
"""
import os
import warnings
from datetime import datetime, timedelta, timezone

import jwt
from flask import jsonify

from crypto_utils import (
    generate_encryption_key,
    hash_password,
    hash_username_for_storage,
    verify_password,
)
from database import get_supabase
# JWT Configuration
# The fallback secret is hard-coded in this file, so any token signed with it
# can be forged by anyone who has read the source. It is kept so the module
# still imports without setup, but its use is reported instead of being silent.
_DEFAULT_JWT_SECRET = "change-this-secret-in-production"
JWT_SECRET = os.getenv("JWT_SECRET", _DEFAULT_JWT_SECRET)
if JWT_SECRET == _DEFAULT_JWT_SECRET:
    warnings.warn(
        "JWT_SECRET is not set; falling back to the insecure built-in default. "
        "Set the JWT_SECRET environment variable in production.",
        RuntimeWarning,
    )
JWT_ALGORITHM = "HS256"  # signing algorithm passed to jwt.encode / jwt.decode
JWT_EXPIRATION_DAYS = 7  # token lifetime, in days
def create_jwt_token(user_id: str, username_hash: str) -> str:
    """
    Create JWT token for authenticated user.

    Args:
        user_id: User UUID from database
        username_hash: Hashed username

    Returns:
        JWT token string, signed with JWT_SECRET using JWT_ALGORITHM and
        expiring JWT_EXPIRATION_DAYS days after it is issued
    """
    # Read the clock once so "exp" is exactly JWT_EXPIRATION_DAYS after "iat"
    # (two separate reads can land on different seconds). A timezone-aware
    # timestamp is used because datetime.utcnow() is deprecated since 3.12.
    issued_at = datetime.now(timezone.utc)
    payload = {
        "user_id": user_id,
        "username_hash": username_hash,
        "exp": issued_at + timedelta(days=JWT_EXPIRATION_DAYS),
        "iat": issued_at,
    }
    return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
def verify_jwt_token(token: str) -> dict:
    """
    Decode a JWT token and check its signature and expiry.

    Args:
        token: JWT token string

    Returns:
        The decoded payload dict, or None when the token is expired or
        otherwise invalid (the reason is printed)
    """
    try:
        return jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
    except jwt.InvalidTokenError as err:
        # PyJWT's ExpiredSignatureError derives from InvalidTokenError, so one
        # handler covers both cases; only the printed reason differs.
        if isinstance(err, jwt.ExpiredSignatureError):
            print("Token expired")
        else:
            print("Invalid token")
        return None
def signup_user(username: str, password: str):
    """
    Register new user.

    Input is validated before any database access, so a malformed request is
    rejected without acquiring the Supabase client.

    Args:
        username: Plain text username (3-20 alphanumeric characters)
        password: Plain text password (at least 6 characters)

    Returns:
        Tuple of (result_dict, status_code). On success the dict holds
        user_id, username, username_hash, encryption_key and token with
        status 200. Otherwise it holds a single "error" message with status
        400 (invalid input or username already taken) or 500 (database error).
    """
    # Validate input. Non-string values (e.g. a number in a JSON body) are
    # treated like missing ones rather than crashing in len() / isalnum().
    if not (
        isinstance(username, str)
        and isinstance(password, str)
        and username
        and password
    ):
        return {"error": "Username and password required"}, 400
    if len(username) < 3 or len(username) > 20:
        return {"error": "Username must be 3-20 characters"}, 400
    if not username.isalnum():
        return {"error": "Username must be alphanumeric"}, 400
    if len(password) < 6:
        return {"error": "Password must be at least 6 characters"}, 400

    supabase = get_supabase()

    # Create username hash for storage
    username_hash = hash_username_for_storage(username)

    # Check if username exists. Only the id column is requested: the check
    # needs to know whether a row exists, not its password hash or key.
    try:
        existing = (
            supabase.table('users')
            .select('id')
            .eq('username_hash', username_hash)
            .execute()
        )
        if existing.data:
            return {"error": "Username already exists"}, 400
    except Exception as e:
        return {"error": f"Database error: {str(e)}"}, 500

    # Hash password
    password_hash = hash_password(password)

    # Generate encryption key
    encryption_key = generate_encryption_key()

    # Insert user
    try:
        user_data = {
            'username_hash': username_hash,
            'password_hash': password_hash,
            'encryption_key': encryption_key,
            'created_at': datetime.utcnow().isoformat(),
            'is_active': True,
        }
        result = supabase.table('users').insert(user_data).execute()
        if not result.data:
            return {"error": "Failed to create user"}, 500
        user = result.data[0]

        # Create JWT token
        token = create_jwt_token(user['id'], username_hash)
        return {
            "user_id": user['id'],
            "username": username,
            "username_hash": username_hash,
            "encryption_key": encryption_key,
            "token": token,
        }, 200
    except Exception as e:
        return {"error": f"Failed to create user: {str(e)}"}, 500
def login_user(username: str, password: str):
    """
    Login existing user.

    Args:
        username: Plain text username
        password: Plain text password

    Returns:
        Tuple of (result_dict, status_code). On success the dict holds
        user_id, username, username_hash, encryption_key and token with
        status 200. Otherwise it holds a single "error" message with status
        400 (missing or non-string credentials), 401 (unknown user or wrong
        password), 403 (correct password but account disabled) or 500
        (any exception raised while talking to the database).
    """
    # Reject missing or non-string credentials up front, with the same
    # message signup uses, instead of passing them to the hashing helper.
    if not (
        isinstance(username, str)
        and isinstance(password, str)
        and username
        and password
    ):
        return {"error": "Username and password required"}, 400

    supabase = get_supabase()

    # Create username hash
    username_hash = hash_username_for_storage(username)

    # Get user from database
    try:
        result = (
            supabase.table('users')
            .select('*')
            .eq('username_hash', username_hash)
            .execute()
        )
        if not result.data:
            return {"error": "Invalid username or password"}, 401
        user = result.data[0]

        # Verify password first, so the account's status is only disclosed
        # to a caller who has proven knowledge of the password.
        if not verify_password(password, user['password_hash']):
            return {"error": "Invalid username or password"}, 401

        # Check if account is active
        if not user.get('is_active', True):
            return {"error": "Account is disabled"}, 403

        # Update last login
        supabase.table('users').update({
            'last_login': datetime.utcnow().isoformat()
        }).eq('id', user['id']).execute()

        # Create JWT token
        token = create_jwt_token(user['id'], username_hash)
        return {
            "user_id": user['id'],
            "username": username,
            "username_hash": username_hash,
            "encryption_key": user['encryption_key'],
            "token": token,
        }, 200
    except Exception as e:
        return {"error": f"Login failed: {str(e)}"}, 500
def check_username_exists(username: str):
    """
    Check if username exists (for adding friends).

    Args:
        username: Plain text username

    Returns:
        Tuple of (result_dict, status_code). With status 200 the dict holds
        "exists" (bool) and "username" (the given name when it exists, else
        None); with status 500 it holds a single "error" message.
    """
    # An empty or non-string username cannot match a stored user, so answer
    # directly instead of passing it to the hashing helper and the database.
    if not isinstance(username, str) or not username:
        return {"exists": False, "username": None}, 200

    supabase = get_supabase()
    username_hash = hash_username_for_storage(username)
    try:
        result = (
            supabase.table('users')
            .select('id')
            .eq('username_hash', username_hash)
            .execute()
        )
        exists = bool(result.data)
        return {
            "exists": exists,
            "username": username if exists else None,
        }, 200
    except Exception as e:
        return {"error": f"Check failed: {str(e)}"}, 500
def verify_token(token: str):
    """
    Verify JWT token and return user info.

    Args:
        token: JWT token string (may include "Bearer " prefix)

    Returns:
        User payload dict, or None when the token is missing, blank,
        or rejected by verify_jwt_token
    """
    if not token:
        return None

    # Remove the "Bearer " scheme only where it is a leading prefix;
    # str.replace would also delete the text anywhere else in the value.
    prefix = "Bearer "
    token = token.lstrip()
    if token.startswith(prefix):
        token = token[len(prefix):]
    token = token.strip()

    # Nothing left after the scheme (e.g. a bare "Bearer " header)
    if not token:
        return None

    return verify_jwt_token(token) or None