| """ |
| Authentication module with JWT tokens and bcrypt password hashing |
| """ |
| import json |
| import bcrypt |
| from datetime import datetime, timedelta |
| from typing import Optional, Dict, Any |
| from jose import JWTError, jwt |
| from config import settings |
|
|
# Module-level alias for the access-token lifetime (in minutes) configured in
# settings. NOTE(review): the visible code reads
# settings.ACCESS_TOKEN_EXPIRE_MINUTES directly instead of this alias --
# confirm whether other modules import it before removing.
ACCESS_TOKEN_EXPIRE_MINUTES = settings.ACCESS_TOKEN_EXPIRE_MINUTES
|
|
| |
|
|
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Verify a password against its bcrypt hash.

    bcrypt only uses the first 72 bytes of a password, and bcrypt >= 5.0
    raises ``ValueError`` for longer inputs instead of silently truncating
    them. ``register_user`` additionally drops a multi-byte character that
    the 72-byte cut would split before hashing. To keep every stored hash
    verifiable, the password is checked as its raw 72-byte prefix and, when
    that differs, as the cleanly decoded prefix.

    Args:
        plain_password: Password supplied by the caller.
        hashed_password: Stored bcrypt hash, as text.

    Returns:
        True if the password matches the hash; False on a mismatch or when
        the check cannot be performed (for example a malformed hash).
    """
    max_bytes = 72  # bcrypt's input limit
    try:
        stored = hashed_password.encode("utf-8")
        prefix = plain_password.encode("utf-8")[:max_bytes]
        candidates = [prefix]
        # Same truncation register_user applies before hashing: a partial
        # trailing multi-byte sequence is discarded.
        clean = prefix.decode("utf-8", "ignore").encode("utf-8")
        if clean != prefix:
            candidates.append(clean)
        return any(bcrypt.checkpw(candidate, stored) for candidate in candidates)
    except Exception:
        # Deliberate best-effort: any failure while checking (bad hash
        # format, wrong argument types, ...) counts as "not verified".
        return False
|
|
|
|
def get_password_hash(password: str) -> str:
    """Hash *password* with bcrypt using a freshly generated salt.

    The resulting hash is decoded from bytes to text before it is returned.
    """
    salt = bcrypt.gensalt()
    digest = bcrypt.hashpw(password.encode("utf-8"), salt)
    return digest.decode("utf-8")
|
|
|
|
def get_users_data() -> Dict[str, Any]:
    """Load users data from the JSON file at ``settings.USERS_JSON_PATH``.

    Returns:
        The parsed document, always containing a ``"users"`` mapping.
        ``{"users": {}}`` is returned when the file does not exist, cannot
        be read or parsed, or does not hold a JSON object.
    """
    try:
        if settings.USERS_JSON_PATH.exists():
            # JSON text is UTF-8; do not depend on the process locale.
            with open(settings.USERS_JSON_PATH, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
            if isinstance(loaded, dict):
                # Callers index data["users"] directly, so guarantee the key
                # even for a document that lacks it (or stores null).
                if loaded.get("users") is None:
                    loaded["users"] = {}
                return loaded
            print("Error loading users data: top-level JSON value is not an object")
    except Exception as e:
        # Best-effort load: any failure falls back to an empty user store.
        print(f"Error loading users data: {e}")
    return {"users": {}}
|
|
def save_users_data(data: Dict[str, Any]) -> None:
    """Save users data to the JSON file at ``settings.USERS_JSON_PATH``.

    Args:
        data: The complete users document to persist.

    Raises:
        Exception: Any serialization or I/O error is printed and re-raised.
    """
    try:
        # Serialize before opening the file: mode 'w' truncates immediately,
        # so a value that cannot be encoded would otherwise wipe the
        # existing user store.
        payload = json.dumps(data, indent=2)
        with open(settings.USERS_JSON_PATH, 'w', encoding='utf-8') as f:
            f.write(payload)
    except Exception as e:
        print(f"Error saving users data: {e}")
        raise
|
|
def get_user(username: str) -> Optional[Dict[str, Any]]:
    """Look up a single user record by username.

    Returns the stored record merged with a ``"username"`` entry, or None
    when no (non-empty) record is stored under that name.
    """
    record = get_users_data()["users"].get(username)
    if not record:
        return None
    return {"username": username, **record}
|
|
def authenticate_user(username: str, password: str) -> Optional[Dict[str, Any]]:
    """Check a username/password pair.

    Returns the user record when the user exists and the password verifies
    against its stored hash; otherwise None.
    """
    account = get_user(username)
    if account and verify_password(password, account["hashed_password"]):
        return account
    return None
|
|
def create_access_token(data: Dict[str, Any], expires_delta: Optional[timedelta] = None) -> str:
    """Create a signed JWT access token.

    Args:
        data: Claims to embed. The dict is copied, so the caller's object
            is not modified.
        expires_delta: Token lifetime. When None, the lifetime is
            ``settings.ACCESS_TOKEN_EXPIRE_MINUTES`` minutes. An explicit
            zero delta is honoured rather than replaced by the default.

    Returns:
        The encoded token.
    """
    # Local import keeps this block self-contained; the module only imports
    # datetime and timedelta at the top.
    from datetime import timezone

    # Compare against None: timedelta(0) is falsy, and a plain truthiness
    # test would mistake it for "no lifetime given".
    if expires_delta is None:
        expires_delta = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)

    to_encode = data.copy()
    # Timezone-aware UTC "now" replaces the deprecated naive utcnow(); it
    # denotes the same instant.
    to_encode["exp"] = datetime.now(timezone.utc) + expires_delta
    return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
|
|
def decode_token(token: str) -> Optional[Dict[str, Any]]:
    """Decode a JWT and return its claims.

    Returns None when decoding raises ``JWTError`` or when the decoded
    claims carry no ``"sub"`` value.
    """
    try:
        claims = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
    except JWTError:
        return None
    if claims.get("sub") is None:
        return None
    return claims
|
|
def register_user(
    username: str,
    password: str,
    settings_dict: Dict[str, Any] = None
) -> Dict[str, Any]:
    """Create and persist a new user account.

    Args:
        username: Desired account name; must be unused and 3-30 characters.
        password: Plain-text password of at least 6 characters. Passwords
            longer than 72 UTF-8 bytes are truncated before hashing.
        settings_dict: Initial per-user settings; defaults are used when
            this is missing or empty.

    Returns:
        The stored record together with a ``"username"`` entry.

    Raises:
        ValueError: If the username is taken or either length check fails.
    """
    store = get_users_data()
    accounts = store["users"]

    # Validation, in order: uniqueness, username length, password length.
    if username in accounts:
        raise ValueError("Username already exists")

    if not 3 <= len(username) <= 30:
        raise ValueError("Username must be between 3 and 30 characters")

    if len(password) < 6:
        raise ValueError("Password must be at least 6 characters")

    raw = password.encode('utf-8')
    if len(raw) > 72:
        # Cut at the byte limit; 'ignore' drops a split trailing character.
        password = raw[:72].decode('utf-8', 'ignore')
        print("Warning: password exceeded 72 bytes; truncated to meet bcrypt limit")

    record = {"hashed_password": get_password_hash(password)}
    record["created_at"] = datetime.utcnow().isoformat()
    if settings_dict:
        record["settings"] = settings_dict
    else:
        record["settings"] = {
            "temperature": settings.TEMPERATURE,
            "max_tokens": settings.MAX_TOKENS,
            "system_prompt": "",
            "theme": "dark"
        }

    accounts[username] = record
    save_users_data(store)

    return {"username": username, **record}
|
|
def update_user_settings(username: str, new_settings: Dict[str, Any]) -> bool:
    """Merge *new_settings* into a user's stored settings and persist them.

    Returns True after saving, or False when the user does not exist.
    """
    store = get_users_data()
    accounts = store["users"]

    if username in accounts:
        # Create the settings mapping on first use, then merge in place.
        accounts[username].setdefault("settings", {}).update(new_settings)
        save_users_data(store)
        return True

    return False
|
|
def get_user_settings(username: str) -> Dict[str, Any]:
    """Return the effective settings for *username*.

    Each of the four known keys takes the user's stored value when present
    and the application default otherwise. An unknown user gets the
    defaults.
    """
    user = get_user(username)
    defaults = {
        "temperature": settings.TEMPERATURE,
        "max_tokens": settings.MAX_TOKENS,
        "system_prompt": "",
        "theme": "dark"
    }
    if not user:
        return defaults

    stored = user.get("settings", {})
    return {key: stored.get(key, fallback) for key, fallback in defaults.items()}
|
|
def change_password(username: str, old_password: str, new_password: str) -> bool:
    """Replace a user's password after verifying the current one.

    Returns False when the old credentials do not authenticate, True once
    the new hash has been saved.

    Raises:
        ValueError: If the new password is shorter than 6 characters.
    """
    if not authenticate_user(username, old_password):
        return False

    if len(new_password) < 6:
        raise ValueError("New password must be at least 6 characters")

    store = get_users_data()
    fresh_hash = get_password_hash(new_password)
    store["users"][username]["hashed_password"] = fresh_hash
    save_users_data(store)

    return True
|
|
def delete_user(username: str) -> bool:
    """Remove a user account from the store.

    Returns True after the removal has been saved, or False when no such
    user exists.
    """
    store = get_users_data()
    accounts = store["users"]

    if username in accounts:
        accounts.pop(username)
        save_users_data(store)
        return True

    return False
|
|
def get_all_usernames() -> list:
    """Return every stored username as a list (for admin purposes)."""
    accounts = get_users_data()["users"]
    return [name for name in accounts.keys()]
|
|