# gakrchat1 / backend / auth.py
# extraplus's picture
# Upload 8 files
# 2c31cbe verified
"""
Authentication module with JWT tokens and bcrypt password hashing
"""
import json
import os
from datetime import datetime, timedelta, timezone
from typing import Optional, Dict, Any

import bcrypt
from jose import JWTError, jwt

from config import settings
# Module-level alias of the configured access-token lifetime setting.
# NOTE(review): nothing in the visible code reads this alias (create_access_token
# reads settings.ACCESS_TOKEN_EXPIRE_MINUTES directly) -- presumably other
# modules import it; confirm before removing.
ACCESS_TOKEN_EXPIRE_MINUTES = settings.ACCESS_TOKEN_EXPIRE_MINUTES
# Helpers for password hashing and verification that call bcrypt directly
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plaintext password against a stored bcrypt hash.

    bcrypt only considers the first 72 bytes of its input. Depending on the
    installed bcrypt release, longer inputs are either truncated silently or
    rejected with an error, so the truncation is done explicitly here to get
    the same answer everywhere.

    Args:
        plain_password: Password as typed by the user.
        hashed_password: bcrypt hash string as stored in the user record.

    Returns:
        True if the password matches the hash; False on mismatch or on any
        failure (malformed hash, wrong argument types, ...).
    """
    max_bytes = 72  # bcrypt's input limit
    try:
        secret = plain_password.encode("utf-8")
        digest = hashed_password.encode("utf-8")
        if len(secret) <= max_bytes:
            return bcrypt.checkpw(secret, digest)

        # Hashes produced from the raw 72-byte prefix (what bcrypt's own
        # silent truncation yields).
        raw_prefix = secret[:max_bytes]
        if bcrypt.checkpw(raw_prefix, digest):
            return True

        # register_user truncates by decoding the prefix with errors ignored,
        # which drops a multi-byte character split at the 72-byte boundary;
        # accept hashes produced from that shorter prefix as well.
        char_prefix = raw_prefix.decode("utf-8", "ignore").encode("utf-8")
        return char_prefix != raw_prefix and bcrypt.checkpw(char_prefix, digest)
    except Exception:
        # Fail closed: any error while checking means "not authenticated".
        return False
def get_password_hash(password: str) -> str:
    """Hash a password with bcrypt using a freshly generated salt.

    Only the first 72 bytes of the UTF-8 encoded password are hashed. That is
    bcrypt's input limit: older bcrypt releases truncate silently, newer ones
    raise for longer input, so the truncation is made explicit here to keep
    callers that do not pre-truncate (e.g. change_password) working.

    Args:
        password: Plaintext password.

    Returns:
        The bcrypt hash decoded to a UTF-8 string, suitable for JSON storage.
    """
    secret = password.encode("utf-8")[:72]
    return bcrypt.hashpw(secret, bcrypt.gensalt()).decode("utf-8")
def get_users_data() -> Dict[str, Any]:
    """Load the user database from the JSON file at settings.USERS_JSON_PATH.

    The result is always a dict containing a "users" mapping, because every
    caller indexes data["users"] directly. A JSON object without a "users"
    key gets an empty one added; any other unexpected structure is reported
    and replaced by an empty database.

    Returns:
        The parsed database, or {"users": {}} when the file is missing,
        unreadable, not valid JSON, or not shaped as expected. Errors are
        printed rather than raised.
    """
    try:
        if settings.USERS_JSON_PATH.exists():
            with open(settings.USERS_JSON_PATH, 'r') as f:
                loaded = json.load(f)
            # setdefault adds the missing key; the isinstance check rejects a
            # "users" entry that is present but not a mapping.
            if isinstance(loaded, dict) and isinstance(loaded.setdefault("users", {}), dict):
                return loaded
            print("Error loading users data: unexpected JSON structure")
    except Exception as e:
        print(f"Error loading users data: {e}")
    return {"users": {}}
def save_users_data(data: Dict[str, Any]):
    """Write the user database to the JSON file at settings.USERS_JSON_PATH.

    The data is first written to a sibling "<path>.tmp" file and then moved
    over the real file with os.replace, so a failure while serializing or
    writing never leaves the existing database truncated or half-written.

    Args:
        data: Complete database dict to persist (indent=2 JSON).

    Raises:
        Exception: Whatever the write/serialization raised, after printing it
            and removing the staging file.
    """
    target = settings.USERS_JSON_PATH
    staging = f"{target}.tmp"
    try:
        with open(staging, 'w') as f:
            json.dump(data, f, indent=2)
        os.replace(staging, target)
    except Exception as e:
        print(f"Error saving users data: {e}")
        # Best-effort cleanup; the staging file may not exist at all.
        try:
            os.remove(staging)
        except OSError:
            pass
        raise
def get_user(username: str) -> Optional[Dict[str, Any]]:
    """Look up one user record in the user database.

    Args:
        username: Key of the user in the "users" mapping.

    Returns:
        A new dict with "username" followed by the stored fields (a stored
        "username" field, if any, takes precedence), or None when there is no
        record or the record is empty.
    """
    record = get_users_data()["users"].get(username)
    if not record:
        return None
    return {"username": username, **record}
def authenticate_user(username: str, password: str) -> Optional[Dict[str, Any]]:
    """Return the user record when the username/password pair is valid.

    Args:
        username: Name of the account to authenticate.
        password: Plaintext password to check against the stored hash.

    Returns:
        The record from get_user on success; None if the user is unknown or
        the password does not verify.
    """
    account = get_user(username)
    if account and verify_password(password, account["hashed_password"]):
        return account
    return None
def create_access_token(data: Dict[str, Any], expires_delta: Optional[timedelta] = None) -> str:
    """Create a signed JWT access token.

    Args:
        data: Claims to embed in the token; the dict is not modified.
        expires_delta: Token lifetime. When None, the lifetime is
            settings.ACCESS_TOKEN_EXPIRE_MINUTES minutes. An explicit zero
            timedelta is honoured rather than replaced by the default.

    Returns:
        The token encoded with settings.SECRET_KEY and settings.ALGORITHM.
    """
    # Compare against None: timedelta(0) is falsy but is still an explicit value.
    if expires_delta is None:
        expires_delta = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    # Timezone-aware UTC "now" instead of the deprecated datetime.utcnow().
    expires_at = datetime.now(timezone.utc) + expires_delta
    claims = {**data, "exp": expires_at}
    return jwt.encode(claims, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
def decode_token(token: str) -> Optional[Dict[str, Any]]:
    """Decode a JWT and return its claims if it is acceptable.

    Args:
        token: Encoded JWT string.

    Returns:
        The decoded payload when decoding with settings.SECRET_KEY /
        settings.ALGORITHM succeeds and the payload has a non-None "sub"
        claim; otherwise None (a JWTError is treated as an invalid token).
    """
    try:
        claims = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
    except JWTError:
        return None
    # A token without a subject cannot be tied to a user.
    return claims if claims.get("sub") is not None else None
def register_user(
    username: str,
    password: str,
    settings_dict: Dict[str, Any] = None
) -> Dict[str, Any]:
    """Create a new user record and persist it.

    Args:
        username: Desired username; must be unused and 3-30 characters long.
        password: Plaintext password of at least 6 characters. If its UTF-8
            encoding exceeds 72 bytes it is cut to the first 72 bytes
            (undecodable trailing bytes dropped) and a warning is printed.
        settings_dict: Initial per-user settings. When omitted or empty, the
            defaults from the application settings are stored instead.

    Returns:
        The stored record with "username" added.

    Raises:
        ValueError: If the username is taken, the username length is out of
            range, or the password is too short (checked in that order).
    """
    store = get_users_data()
    accounts = store["users"]

    # Validation, in the order callers observe the errors.
    if username in accounts:
        raise ValueError("Username already exists")
    if not 3 <= len(username) <= 30:
        raise ValueError("Username must be between 3 and 30 characters")
    if len(password) < 6:
        raise ValueError("Password must be at least 6 characters")

    # bcrypt has a 72-byte input limit; shorten over-long passwords up front.
    raw = password.encode('utf-8')
    if len(raw) > 72:
        password = raw[:72].decode('utf-8', 'ignore')
        print("Warning: password exceeded 72 bytes; truncated to meet bcrypt limit")

    digest = get_password_hash(password)
    created = datetime.utcnow().isoformat()
    if settings_dict:
        prefs = settings_dict
    else:
        prefs = {
            "temperature": settings.TEMPERATURE,
            "max_tokens": settings.MAX_TOKENS,
            "system_prompt": "",
            "theme": "dark"
        }

    record = {
        "hashed_password": digest,
        "created_at": created,
        "settings": prefs
    }
    accounts[username] = record
    save_users_data(store)

    return {"username": username, **record}
def update_user_settings(username: str, new_settings: Dict[str, Any]) -> bool:
    """Merge new values into a user's stored settings and save the database.

    Args:
        username: Account whose settings are updated.
        new_settings: Keys/values to add or overwrite; existing keys not
            mentioned here are kept.

    Returns:
        True after saving, or False if the user does not exist.
    """
    store = get_users_data()
    accounts = store["users"]
    if username not in accounts:
        return False

    account = accounts[username]
    merged = account.get("settings", {})
    merged.update(new_settings)
    account["settings"] = merged

    save_users_data(store)
    return True
def get_user_settings(username: str) -> Dict[str, Any]:
    """Return the effective settings for a user.

    Args:
        username: Account to look up.

    Returns:
        A dict with exactly the keys "temperature", "max_tokens",
        "system_prompt" and "theme". Each value comes from the user's stored
        settings when present, otherwise from the defaults; an unknown user
        gets the defaults.
    """
    account = get_user(username)
    defaults = {
        "temperature": settings.TEMPERATURE,
        "max_tokens": settings.MAX_TOKENS,
        "system_prompt": "",
        "theme": "dark"
    }
    if not account:
        return defaults

    stored = account.get("settings", {})
    # Only the known keys are exposed; stored values win over defaults.
    return {key: stored.get(key, fallback) for key, fallback in defaults.items()}
def change_password(username: str, old_password: str, new_password: str) -> bool:
    """Replace a user's password after verifying the current one.

    Args:
        username: Account whose password is changed.
        old_password: Current plaintext password, used to authenticate.
        new_password: Replacement plaintext password (minimum 6 characters).

    Returns:
        True once the new hash is saved; False if authentication with the
        old password fails.

    Raises:
        ValueError: If the new password is shorter than 6 characters (only
            checked after the old password has been accepted).
    """
    if not authenticate_user(username, old_password):
        return False
    if len(new_password) < 6:
        raise ValueError("New password must be at least 6 characters")

    store = get_users_data()
    new_hash = get_password_hash(new_password)
    store["users"][username]["hashed_password"] = new_hash
    save_users_data(store)
    return True
def delete_user(username: str) -> bool:
    """Remove a user record from the database and save it.

    Args:
        username: Account to delete.

    Returns:
        True if the user existed and was removed; False otherwise (nothing
        is written in that case).
    """
    store = get_users_data()
    accounts = store["users"]
    try:
        del accounts[username]
    except KeyError:
        return False
    save_users_data(store)
    return True
def get_all_usernames() -> list:
    """Return every username in the user database as a list (admin helper)."""
    return list(get_users_data()["users"])