"""
Authentication module with JWT tokens and bcrypt password hashing
"""
import json
import bcrypt
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
from jose import JWTError, jwt
from config import settings
# Module-level alias of the configured access-token lifetime, in minutes.
# Note: create_access_token reads settings.ACCESS_TOKEN_EXPIRE_MINUTES directly
# rather than this alias.
ACCESS_TOKEN_EXPIRE_MINUTES = settings.ACCESS_TOKEN_EXPIRE_MINUTES
# Helpers for password hashing and verification, calling bcrypt directly
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Verify a password against its bcrypt hash.

    bcrypt only uses the first 72 bytes of a password. Older bcrypt releases
    silently ignored anything beyond that, while recent releases (5.0+)
    raise ``ValueError`` for longer input. The candidate is therefore
    truncated to 72 bytes here so that long passwords keep verifying against
    hashes that were computed from their first 72 bytes.

    Args:
        plain_password: Candidate password in clear text.
        hashed_password: Stored bcrypt hash, as text.

    Returns:
        True if the password matches the hash. False on mismatch or when
        verification fails for any reason (malformed hash, wrong argument
        type, ...).
    """
    try:
        # Byte-level truncation: the limit applies to the encoded password,
        # not to the number of characters.
        candidate = plain_password.encode("utf-8")[:72]
        return bcrypt.checkpw(candidate, hashed_password.encode("utf-8"))
    except Exception:
        # Deliberately fail closed: any error during verification is
        # treated as "password does not match".
        return False
def get_password_hash(password: str) -> str:
    """Generate a bcrypt hash for ``password``.

    bcrypt only uses the first 72 bytes of its input; recent bcrypt releases
    (5.0+) raise ``ValueError`` for anything longer instead of silently
    ignoring the excess. The encoded password is truncated to 72 bytes here
    so every caller (registration, password change) behaves the same way
    regardless of the installed bcrypt version.

    Args:
        password: Password in clear text.

    Returns:
        The salted bcrypt hash, decoded to text for storage.
    """
    # Truncate the encoded bytes, not the string: the limit is in bytes and
    # a character-level cut could still exceed it for multibyte text.
    secret = password.encode("utf-8")[:72]
    hashed = bcrypt.hashpw(secret, bcrypt.gensalt())
    return hashed.decode("utf-8")
def get_users_data() -> Dict[str, Any]:
    """Load users data from the JSON file at ``settings.USERS_JSON_PATH``.

    Returns:
        The decoded JSON document, guaranteed to contain a ``"users"``
        dictionary. When the file does not exist, cannot be read or parsed,
        or does not hold a JSON object, ``{"users": {}}`` is returned.
    """
    try:
        if settings.USERS_JSON_PATH.exists():
            with open(settings.USERS_JSON_PATH, 'r', encoding="utf-8") as f:
                data = json.load(f)
            if isinstance(data, dict):
                # Callers index data["users"] unconditionally, so make sure
                # the key exists and is a mapping while keeping any other
                # top-level keys intact.
                if not isinstance(data.get("users"), dict):
                    data["users"] = {}
                return data
            print("Error loading users data: top-level JSON value is not an object")
    except Exception as e:
        # Best effort: a missing or broken store is reported and treated as
        # empty rather than crashing the caller.
        print(f"Error loading users data: {e}")
    return {"users": {}}
def save_users_data(data: Dict[str, Any]):
    """Save users data to the JSON file at ``settings.USERS_JSON_PATH``.

    Args:
        data: The complete users document to persist.

    Raises:
        Exception: Any serialization or I/O error is printed and re-raised.
    """
    try:
        # Serialize before opening the file: mode 'w' truncates it
        # immediately, so a value that cannot be encoded as JSON would
        # otherwise leave the user store empty or half-written.
        serialized = json.dumps(data, indent=2)
        with open(settings.USERS_JSON_PATH, 'w') as f:
            f.write(serialized)
    except Exception as e:
        print(f"Error saving users data: {e}")
        raise
def get_user(username: str) -> Optional[Dict[str, Any]]:
    """Look up a single user record by username.

    Returns:
        The stored record with a ``"username"`` entry placed first, or None
        when there is no (non-empty) record under that name.
    """
    record = get_users_data()["users"].get(username)
    if not record:
        return None
    return {"username": username, **record}
def authenticate_user(username: str, password: str) -> Optional[Dict[str, Any]]:
    """Check a username/password pair.

    Returns:
        The user record when the user exists and the password verifies
        against its stored hash; None otherwise.
    """
    account = get_user(username)
    if account and verify_password(password, account["hashed_password"]):
        return account
    return None
def create_access_token(data: Dict[str, Any], expires_delta: Optional[timedelta] = None) -> str:
    """Create a signed JWT access token.

    Args:
        data: Claims to embed in the token. The mapping is copied, not
            modified.
        expires_delta: Token lifetime. When None, the lifetime defaults to
            ``settings.ACCESS_TOKEN_EXPIRE_MINUTES`` minutes. An explicit
            zero-length delta is honoured and is not replaced by the default.

    Returns:
        The encoded JWT, signed with ``settings.SECRET_KEY`` using
        ``settings.ALGORITHM``.
    """
    # Local import keeps this block self-contained; the module only imports
    # datetime and timedelta at the top.
    from datetime import timezone

    # Compare against None: timedelta(0) is falsy and must not be mistaken
    # for "no lifetime given".
    if expires_delta is None:
        expires_delta = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)

    to_encode = data.copy()
    # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated and
    # returns a naive value.
    to_encode["exp"] = datetime.now(timezone.utc) + expires_delta
    return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
def decode_token(token: str) -> Optional[Dict[str, Any]]:
    """Decode a JWT and return its claims.

    Returns:
        The decoded payload, or None when decoding raises ``JWTError`` or
        the payload has no ``"sub"`` claim.
    """
    try:
        claims = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
    except JWTError:
        return None
    # A token without a subject is treated the same as an invalid token.
    return claims if claims.get("sub") is not None else None
def register_user(
    username: str,
    password: str,
    settings_dict: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """Register a new user and persist it to the users file.

    Args:
        username: Desired username, 3 to 30 characters long.
        password: Clear-text password, at least 6 characters long. Only the
            first 72 bytes of its UTF-8 encoding are hashed (bcrypt limit).
        settings_dict: Initial per-user settings. When omitted or empty, the
            defaults below (taken from ``settings``) are stored instead.

    Returns:
        The stored user record plus a ``"username"`` entry.

    Raises:
        ValueError: If the username already exists, or the username or
            password fails the length checks.
    """
    data = get_users_data()
    # Check if user already exists
    if username in data["users"]:
        raise ValueError("Username already exists")
    # Validate username
    if not 3 <= len(username) <= 30:
        raise ValueError("Username must be between 3 and 30 characters")
    # Validate password
    if len(password) < 6:
        raise ValueError("Password must be at least 6 characters")

    # bcrypt has a 72-byte input limit. Truncate the encoded bytes and hash
    # them as-is: decoding the truncated bytes back to text would drop a
    # multibyte character cut at the boundary, so the stored hash would
    # cover fewer bytes than bcrypt uses when the full password is verified.
    encoded = password.encode('utf-8')
    if len(encoded) > 72:
        encoded = encoded[:72]
        print("Warning: password exceeded 72 bytes; truncated to meet bcrypt limit")
    hashed_password = bcrypt.hashpw(encoded, bcrypt.gensalt()).decode("utf-8")

    # Create user
    user_data = {
        "hashed_password": hashed_password,
        # Naive UTC timestamp kept as-is so the stored string format does
        # not change for existing consumers.
        "created_at": datetime.utcnow().isoformat(),
        "settings": settings_dict or {
            "temperature": settings.TEMPERATURE,
            "max_tokens": settings.MAX_TOKENS,
            "system_prompt": "",
            "theme": "dark"
        }
    }
    data["users"][username] = user_data
    save_users_data(data)
    return {
        "username": username,
        **user_data
    }
def update_user_settings(username: str, new_settings: Dict[str, Any]) -> bool:
    """Merge ``new_settings`` into a user's stored settings and save.

    Returns:
        False when the user does not exist; True after the merged settings
        have been written.
    """
    store = get_users_data()
    accounts = store["users"]
    if username not in accounts:
        return False
    # setdefault creates the settings dict when the record has none, then
    # the new values are layered over whatever is already there.
    accounts[username].setdefault("settings", {}).update(new_settings)
    save_users_data(store)
    return True
def get_user_settings(username: str) -> Dict[str, Any]:
    """Return a user's settings with defaults filled in.

    Returns:
        A dict with exactly the keys ``temperature``, ``max_tokens``,
        ``system_prompt`` and ``theme``. Values come from the user's stored
        settings where present and from the defaults otherwise; an unknown
        user gets the defaults only.
    """
    account = get_user(username)
    fallback = {
        "temperature": settings.TEMPERATURE,
        "max_tokens": settings.MAX_TOKENS,
        "system_prompt": "",
        "theme": "dark"
    }
    if not account:
        return fallback
    stored = account.get("settings", {})
    # Only the known keys are exposed; extra stored keys are not returned.
    return {key: stored.get(key, default) for key, default in fallback.items()}
def change_password(username: str, old_password: str, new_password: str) -> bool:
    """Replace a user's password after confirming the current one.

    Returns:
        False when the username/old password pair does not authenticate;
        True once the new hash has been saved.

    Raises:
        ValueError: If the new password is shorter than 6 characters
            (checked only after the old password has been verified).
    """
    # The old password must authenticate before anything else is checked.
    if not authenticate_user(username, old_password):
        return False
    if len(new_password) < 6:
        raise ValueError("New password must be at least 6 characters")
    store = get_users_data()
    new_hash = get_password_hash(new_password)
    store["users"][username]["hashed_password"] = new_hash
    save_users_data(store)
    return True
def delete_user(username: str) -> bool:
    """Remove a user account from the users file.

    Returns:
        True when the user existed and the updated data was saved; False
        when there was no such user (nothing is written in that case).
    """
    store = get_users_data()
    try:
        del store["users"][username]
    except KeyError:
        return False
    save_users_data(store)
    return True
def get_all_usernames() -> list:
    """Return every registered username as a list (for admin purposes)."""
    return list(get_users_data()["users"])
|