""" Authentication module with JWT tokens and bcrypt password hashing """ import json import bcrypt from datetime import datetime, timedelta from typing import Optional, Dict, Any from jose import JWTError, jwt from config import settings ACCESS_TOKEN_EXPIRE_MINUTES = settings.ACCESS_TOKEN_EXPIRE_MINUTES # helper for hashing/verification using bcrypt directly def verify_password(plain_password: str, hashed_password: str) -> bool: """Verify a password against its bcrypt hash""" try: return bcrypt.checkpw(plain_password.encode("utf-8"), hashed_password.encode("utf-8")) except Exception: return False def get_password_hash(password: str) -> str: """Generate bcrypt password hash""" hashed = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()) return hashed.decode("utf-8") def get_users_data() -> Dict[str, Any]: """Load users data from JSON file""" try: if settings.USERS_JSON_PATH.exists(): with open(settings.USERS_JSON_PATH, 'r') as f: return json.load(f) except Exception as e: print(f"Error loading users data: {e}") return {"users": {}} def save_users_data(data: Dict[str, Any]): """Save users data to JSON file""" try: with open(settings.USERS_JSON_PATH, 'w') as f: json.dump(data, f, indent=2) except Exception as e: print(f"Error saving users data: {e}") raise def get_user(username: str) -> Optional[Dict[str, Any]]: """Get user by username""" data = get_users_data() user = data["users"].get(username) if user: return { "username": username, **user } return None def authenticate_user(username: str, password: str) -> Optional[Dict[str, Any]]: """Authenticate user with username and password""" user = get_user(username) if not user: return None if not verify_password(password, user["hashed_password"]): return None return user def create_access_token(data: Dict[str, Any], expires_delta: Optional[timedelta] = None) -> str: """Create JWT access token""" to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM) return encoded_jwt def decode_token(token: str) -> Optional[Dict[str, Any]]: """Decode and validate JWT token""" try: payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]) username: str = payload.get("sub") if username is None: return None return payload except JWTError: return None def register_user( username: str, password: str, settings_dict: Dict[str, Any] = None ) -> Dict[str, Any]: """Register a new user""" data = get_users_data() # Check if user already exists if username in data["users"]: raise ValueError("Username already exists") # Validate username if len(username) < 3 or len(username) > 30: raise ValueError("Username must be between 3 and 30 characters") # Validate password if len(password) < 6: raise ValueError("Password must be at least 6 characters") # bcrypt has a 72-byte input limit; automatically truncate if necessary encoded = password.encode('utf-8') if len(encoded) > 72: # truncate and warn password = encoded[:72].decode('utf-8', 'ignore') print("Warning: password exceeded 72 bytes; truncated to meet bcrypt limit") # Create user hashed_password = get_password_hash(password) user_data = { "hashed_password": hashed_password, "created_at": datetime.utcnow().isoformat(), "settings": settings_dict or { "temperature": settings.TEMPERATURE, "max_tokens": settings.MAX_TOKENS, "system_prompt": "", "theme": "dark" } } data["users"][username] = user_data save_users_data(data) return { "username": username, **user_data } def update_user_settings(username: str, new_settings: Dict[str, Any]) -> bool: """Update user settings""" data = get_users_data() if username not in data["users"]: return False # Merge with existing settings current_settings = data["users"][username].get("settings", {}) current_settings.update(new_settings) data["users"][username]["settings"] = current_settings save_users_data(data) return True def get_user_settings(username: str) -> Dict[str, Any]: """Get user settings""" user = get_user(username) if user: user_settings = user.get("settings", {}) # Merge with defaults return { "temperature": user_settings.get("temperature", settings.TEMPERATURE), "max_tokens": user_settings.get("max_tokens", settings.MAX_TOKENS), "system_prompt": user_settings.get("system_prompt", ""), "theme": user_settings.get("theme", "dark") } return { "temperature": settings.TEMPERATURE, "max_tokens": settings.MAX_TOKENS, "system_prompt": "", "theme": "dark" } def change_password(username: str, old_password: str, new_password: str) -> bool: """Change user password""" # Verify old password user = authenticate_user(username, old_password) if not user: return False if len(new_password) < 6: raise ValueError("New password must be at least 6 characters") data = get_users_data() data["users"][username]["hashed_password"] = get_password_hash(new_password) save_users_data(data) return True def delete_user(username: str) -> bool: """Delete a user account""" data = get_users_data() if username not in data["users"]: return False del data["users"][username] save_users_data(data) return True def get_all_usernames() -> list: """Get list of all usernames (for admin purposes)""" data = get_users_data() return list(data["users"].keys())