File size: 6,335 Bytes
2c31cbe
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
"""
Authentication module with JWT tokens and bcrypt password hashing
"""
import json
from datetime import datetime, timedelta, timezone
from typing import Optional, Dict, Any

import bcrypt
from jose import JWTError, jwt

from config import settings

# Module-level alias for the configured access-token lifetime, in minutes.
ACCESS_TOKEN_EXPIRE_MINUTES = settings.ACCESS_TOKEN_EXPIRE_MINUTES

# Helpers for password hashing/verification that call bcrypt directly.

# bcrypt only uses the first 72 bytes of its input. Older bcrypt releases
# silently ignored the rest; bcrypt 5.0+ raises ValueError instead, so the
# clipping is done explicitly here to behave the same on every version.
_BCRYPT_MAX_BYTES = 72


def _bcrypt_secret(password: str) -> bytes:
    """Encode *password* as UTF-8 and clip it to bcrypt's 72-byte limit."""
    return password.encode("utf-8")[:_BCRYPT_MAX_BYTES]


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Verify a password against its bcrypt hash.

    The candidate is clipped to bcrypt's 72-byte limit before checking, so
    over-long passwords verify the same way on every bcrypt version instead
    of always failing on releases that reject long input.

    Args:
        plain_password: Password supplied by the user.
        hashed_password: Stored bcrypt hash, as text.

    Returns:
        True if the password matches the hash. False on a mismatch or on any
        error (malformed hash, non-string arguments, ...).
    """
    try:
        secret = _bcrypt_secret(plain_password)
        stored = hashed_password.encode("utf-8")
        if bcrypt.checkpw(secret, stored):
            return True
        # register_user() clips long passwords at a character boundary
        # (decode with errors ignored), which can be shorter than the raw
        # 72-byte prefix when the cut lands inside a multi-byte character.
        # Accept that form too so hashes created that way still verify.
        char_safe = secret.decode("utf-8", "ignore").encode("utf-8")
        return char_safe != secret and bcrypt.checkpw(char_safe, stored)
    except Exception:
        # Deliberate best-effort: any failure counts as "does not match".
        return False


def get_password_hash(password: str) -> str:
    """Generate a bcrypt hash for *password*.

    Only the first 72 UTF-8 bytes are hashed. That is all bcrypt ever uses,
    and clipping explicitly avoids the ValueError newer bcrypt releases raise
    for longer input.

    Args:
        password: Plain-text password to hash.

    Returns:
        The salted bcrypt hash decoded to text.
    """
    hashed = bcrypt.hashpw(_bcrypt_secret(password), bcrypt.gensalt())
    return hashed.decode("utf-8")


def get_users_data() -> Dict[str, Any]:
    """Load users data from the JSON file.

    Returns:
        The parsed file content, guaranteed to be a dict with a ``"users"``
        mapping. If the file is missing, unreadable, not valid JSON, or does
        not have that shape, an empty store ``{"users": {}}`` is returned
        (after printing the problem) so callers can always index
        ``data["users"]``.
    """
    try:
        if settings.USERS_JSON_PATH.exists():
            with open(settings.USERS_JSON_PATH, 'r', encoding='utf-8') as f:
                data = json.load(f)
            # Valid JSON is not necessarily a valid store: a bare list, or an
            # object whose "users" is missing or not a mapping, would crash
            # every caller on data["users"].
            if isinstance(data, dict) and isinstance(data.get("users", {}), dict):
                data.setdefault("users", {})
                return data
            print("Error loading users data: unexpected JSON structure")
    except Exception as e:
        # Best-effort load: report the problem and fall back to an empty store.
        print(f"Error loading users data: {e}")
    return {"users": {}}

def save_users_data(data: Dict[str, Any]) -> None:
    """Save users data to the JSON file.

    The data is serialised to a string before the file is opened. Opening
    with ``'w'`` truncates the file immediately, so a serialisation error
    (e.g. a non-JSON-serialisable value) must surface before that point,
    otherwise the existing user store would be left empty or half-written.

    Args:
        data: Complete user store to persist.

    Raises:
        Exception: Any serialisation or I/O error is printed and re-raised.
    """
    try:
        serialized = json.dumps(data, indent=2)
        with open(settings.USERS_JSON_PATH, 'w', encoding='utf-8') as f:
            f.write(serialized)
    except Exception as e:
        print(f"Error saving users data: {e}")
        raise

def get_user(username: str) -> Optional[Dict[str, Any]]:
    """Look up a single user record by username.

    Returns:
        The stored record with a ``"username"`` entry placed first, or None
        when there is no (non-empty) record for that name.
    """
    record = get_users_data()["users"].get(username)
    if not record:
        return None
    return {"username": username, **record}

def authenticate_user(username: str, password: str) -> Optional[Dict[str, Any]]:
    """Check a username/password pair.

    Returns:
        The user record when the user exists and the password matches its
        stored hash; None otherwise.
    """
    account = get_user(username)
    if account and verify_password(password, account["hashed_password"]):
        return account
    return None

def create_access_token(data: Dict[str, Any], expires_delta: Optional[timedelta] = None) -> str:
    """Create a signed JWT access token.

    Args:
        data: Claims to embed in the token; the mapping is copied, not mutated.
        expires_delta: Lifetime of the token. When None, the configured
            ``settings.ACCESS_TOKEN_EXPIRE_MINUTES`` is used. An explicit
            zero-length delta is honoured rather than being treated as
            "not provided".

    Returns:
        The encoded JWT string, signed with ``settings.SECRET_KEY`` using
        ``settings.ALGORITHM``.
    """
    # Compare against None: timedelta(0) is falsy but is still an explicit value.
    if expires_delta is None:
        expires_delta = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)

    to_encode = dict(data)
    # Timezone-aware "now" replaces the deprecated datetime.utcnow(); the
    # resulting "exp" denotes the same instant.
    to_encode["exp"] = datetime.now(timezone.utc) + expires_delta
    return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)

def decode_token(token: str) -> Optional[Dict[str, Any]]:
    """Decode a JWT and return its claims.

    Returns:
        The decoded payload, or None when decoding raises ``JWTError`` or the
        payload carries no ``"sub"`` claim.
    """
    try:
        claims = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
    except JWTError:
        return None
    else:
        subject = claims.get("sub")
        return None if subject is None else claims

def register_user(
    username: str, 
    password: str, 
    settings_dict: Dict[str, Any] = None
) -> Dict[str, Any]:
    """Create and persist a new user account.

    Args:
        username: Desired username, 3 to 30 characters, not already taken.
        password: Plain-text password, at least 6 characters. Passwords whose
            UTF-8 encoding exceeds 72 bytes are truncated (with a printed
            warning) before hashing.
        settings_dict: Initial per-user settings; when empty or omitted the
            defaults from ``settings`` are stored instead.

    Returns:
        The stored record plus a ``"username"`` entry.

    Raises:
        ValueError: The username is taken or has an invalid length, or the
            password is too short.
    """
    store = get_users_data()

    # Reject duplicates first, then check the basic length rules.
    if username in store["users"]:
        raise ValueError("Username already exists")

    if not 3 <= len(username) <= 30:
        raise ValueError("Username must be between 3 and 30 characters")

    if len(password) < 6:
        raise ValueError("Password must be at least 6 characters")

    # bcrypt accepts at most 72 bytes; clip longer passwords, dropping any
    # partially cut trailing character.
    raw = password.encode('utf-8')
    if len(raw) > 72:
        password = raw[:72].decode('utf-8', 'ignore')
        print("Warning: password exceeded 72 bytes; truncated to meet bcrypt limit")

    password_hash = get_password_hash(password)
    created = datetime.utcnow().isoformat()

    initial_settings = settings_dict
    if not initial_settings:
        initial_settings = {
            "temperature": settings.TEMPERATURE,
            "max_tokens": settings.MAX_TOKENS,
            "system_prompt": "",
            "theme": "dark"
        }

    record = {
        "hashed_password": password_hash,
        "created_at": created,
        "settings": initial_settings
    }

    store["users"][username] = record
    save_users_data(store)

    return {"username": username, **record}

def update_user_settings(username: str, new_settings: Dict[str, Any]) -> bool:
    """Merge *new_settings* into a user's stored settings and persist them.

    Returns:
        True when the user exists and the store was saved; False when the
        username is unknown.
    """
    store = get_users_data()
    users = store["users"]

    if username not in users:
        return False

    account = users[username]
    # Existing keys are kept unless overridden by the new values.
    merged = account.get("settings", {})
    merged.update(new_settings)
    account["settings"] = merged

    save_users_data(store)
    return True

def get_user_settings(username: str) -> Dict[str, Any]:
    """Return the effective settings for a user.

    Returns:
        A dict with exactly the keys ``temperature``, ``max_tokens``,
        ``system_prompt`` and ``theme``. Stored values win; anything missing
        (or everything, for an unknown user) falls back to the defaults.
    """
    account = get_user(username)
    defaults = {
        "temperature": settings.TEMPERATURE,
        "max_tokens": settings.MAX_TOKENS,
        "system_prompt": "",
        "theme": "dark"
    }
    if not account:
        return defaults

    stored = account.get("settings", {})
    return {key: stored.get(key, fallback) for key, fallback in defaults.items()}

def change_password(username: str, old_password: str, new_password: str) -> bool:
    """Replace a user's password after confirming the current one.

    Returns:
        True when the password was changed; False when the username or the
        old password is wrong.

    Raises:
        ValueError: The new password is shorter than 6 characters.
    """
    # The caller must prove knowledge of the current password first.
    if not authenticate_user(username, old_password):
        return False

    if len(new_password) < 6:
        raise ValueError("New password must be at least 6 characters")

    store = get_users_data()
    fresh_hash = get_password_hash(new_password)
    store["users"][username]["hashed_password"] = fresh_hash
    save_users_data(store)

    return True

def delete_user(username: str) -> bool:
    """Remove a user account from the store.

    Returns:
        True when the user existed and the store was saved without it;
        False when the username is unknown.
    """
    store = get_users_data()
    users = store["users"]

    if username in users:
        del users[username]
        save_users_data(store)
        return True
    return False

def get_all_usernames() -> list:
    """Return every registered username as a list (for admin purposes)."""
    return [name for name in get_users_data()["users"].keys()]