Spaces:
Runtime error
Runtime error
File size: 12,101 Bytes
da8ce94 707d3fc da8ce94 707d3fc da8ce94 707d3fc b5ffe42 da8ce94 707d3fc da8ce94 707d3fc 8e485f9 707d3fc da8ce94 707d3fc da8ce94 b5ffe42 707d3fc da8ce94 b5ffe42 da8ce94 707d3fc da8ce94 707d3fc da8ce94 707d3fc da8ce94 707d3fc da8ce94 707d3fc da8ce94 b5ffe42 707d3fc da8ce94 707d3fc 8e485f9 707d3fc 8e485f9 707d3fc 8e485f9 707d3fc da8ce94 707d3fc da8ce94 707d3fc 8e485f9 707d3fc da8ce94 707d3fc 8e485f9 707d3fc 8e485f9 707d3fc 8e485f9 707d3fc 8e485f9 707d3fc 8e485f9 707d3fc 8e485f9 707d3fc da8ce94 707d3fc 8e485f9 da8ce94 b5ffe42 da8ce94 b5ffe42 da8ce94 b5ffe42 da8ce94 707d3fc b5ffe42 da8ce94 707d3fc da8ce94 b5ffe42 da8ce94 b5ffe42 da8ce94 8e485f9 da8ce94 b5ffe42 da8ce94 b5ffe42 da8ce94 8e485f9 da8ce94 b5ffe42 da8ce94 b5ffe42 da8ce94 b5ffe42 da8ce94 b5ffe42 da8ce94 b5ffe42 da8ce94 b5ffe42 da8ce94 707d3fc da8ce94 707d3fc da8ce94 707d3fc da8ce94 707d3fc b5ffe42 8e485f9 da8ce94 707d3fc b5ffe42 707d3fc b5ffe42 707d3fc da8ce94 8e485f9 707d3fc da8ce94 8e485f9 707d3fc da8ce94 8e485f9 707d3fc da8ce94 707d3fc da8ce94 b5ffe42 707d3fc da8ce94 707d3fc 8e485f9 da8ce94 707d3fc da8ce94 707d3fc 8e485f9 707d3fc 8e485f9 707d3fc 8e485f9 707d3fc da8ce94 707d3fc da8ce94 b5ffe42 da8ce94 707d3fc da8ce94 707d3fc | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 | """
JSON file-based database module
Handles:
- JSON file storage for users, tokens, and blacklisted tokens
- User management and authentication
- Token blacklist management
- Refresh token storage
- Thread-safe JSON operations with file locking
- Hugging Face friendly seeding + writable fallback
"""
import os
import json
import threading
import shutil
from typing import Optional, Dict, Any, List, Tuple
from pathlib import Path
from datetime import datetime
# =========================================================
# PATHS
# =========================================================
def _project_root() -> Path:
# auth/json_database.py -> auth (parent) -> project root (parent)
return Path(__file__).resolve().parent.parent
DEFAULT_DB_DIR = str((_project_root() / "data").resolve()) # <project>/data
DB_DIR_ENV = os.getenv("AUTH_DB_DIR", DEFAULT_DB_DIR)
# Seed dir (where your repo JSON files exist)
SEED_DIR_ENV = os.getenv("AUTH_SEED_DIR", DEFAULT_DB_DIR)
# Seeding controls
SEED_ON_START = os.getenv("AUTH_SEED_ON_START", "1") == "1"
SEED_IF_EMPTY = os.getenv("AUTH_SEED_IF_EMPTY", "1") == "1"
# Initialize DB files on startup
RUN_INIT_DB = os.getenv("RUN_INIT_DB", "1") == "1"
# Resolve target db dir
DB_DIR = str(Path(DB_DIR_ENV).expanduser().resolve())
SEED_DIR = str(Path(SEED_DIR_ENV).expanduser().resolve())
USERS_FILE = os.path.join(DB_DIR, "users.json")
BLACKLISTED_TOKENS_FILE = os.path.join(DB_DIR, "blacklisted_tokens.json")
REFRESH_TOKENS_FILE = os.path.join(DB_DIR, "refresh_tokens.json")
# Thread locks
_file_locks = {
"users": threading.Lock(),
"blacklisted_tokens": threading.Lock(),
"refresh_tokens": threading.Lock(),
}
_db_init_done = False
_db_init_lock = threading.Lock()
# =========================================================
# HELPERS
# =========================================================
def _log(msg: str):
# Simple stdout logging (Hugging Face logs show this)
print(msg, flush=True)
def ensure_data_directory():
"""
Create DB directory. If not writable, fallback to /tmp/auth_db.
"""
global DB_DIR, USERS_FILE, BLACKLISTED_TOKENS_FILE, REFRESH_TOKENS_FILE
try:
Path(DB_DIR).mkdir(parents=True, exist_ok=True)
# write test
test_file = os.path.join(DB_DIR, ".write_test")
with open(test_file, "w", encoding="utf-8") as f:
f.write("ok")
os.remove(test_file)
return
except Exception as e:
_log(f"[AUTH][WARN] DB_DIR not writable: {DB_DIR}. Error: {e}")
fallback = "/tmp/auth_db"
_log(f"[AUTH][WARN] Falling back to: {fallback}")
DB_DIR = fallback
Path(DB_DIR).mkdir(parents=True, exist_ok=True)
USERS_FILE = os.path.join(DB_DIR, "users.json")
BLACKLISTED_TOKENS_FILE = os.path.join(DB_DIR, "blacklisted_tokens.json")
REFRESH_TOKENS_FILE = os.path.join(DB_DIR, "refresh_tokens.json")
def load_json_file(filepath: str) -> Dict[str, Any]:
"""
Load JSON file with error handling.
Returns {} if file does not exist.
"""
if not os.path.exists(filepath):
return {}
try:
with open(filepath, "r", encoding="utf-8") as f:
return json.load(f)
except (json.JSONDecodeError, IOError) as e:
_log(f"[AUTH][ERROR] Error loading {filepath}: {e}")
return {}
def save_json_file(filepath: str, data: Dict[str, Any]):
"""
Atomic save JSON file (prevents partial writes).
"""
parent = os.path.dirname(filepath)
os.makedirs(parent, exist_ok=True)
tmp_path = filepath + ".tmp"
try:
with open(tmp_path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
os.replace(tmp_path, filepath)
except Exception as e:
_log(f"[AUTH][ERROR] Error saving {filepath}: {e}")
# cleanup temp if exists
try:
if os.path.exists(tmp_path):
os.remove(tmp_path)
except Exception:
pass
raise
def _file_is_empty_json(filepath: str) -> bool:
if not os.path.exists(filepath):
return True
try:
data = load_json_file(filepath)
return not bool(data)
except Exception:
return True
def _seed_file_if_needed(target_path: str, seed_path: str):
"""
Copy seed file -> target if:
- target missing, OR
- target exists but empty AND SEED_IF_EMPTY=1
"""
if not SEED_ON_START:
return
seed_exists = os.path.exists(seed_path)
target_exists = os.path.exists(target_path)
if (not target_exists and seed_exists) or (target_exists and SEED_IF_EMPTY and seed_exists and _file_is_empty_json(target_path)):
os.makedirs(os.path.dirname(target_path), exist_ok=True)
shutil.copy2(seed_path, target_path)
_log(f"[AUTH][SEED] Copied seed: {seed_path} -> {target_path}")
# =========================================================
# STORAGE CLASSES
# =========================================================
class JSONUsers:
@staticmethod
def load_all() -> Dict[str, Any]:
with _file_locks["users"]:
return load_json_file(USERS_FILE)
@staticmethod
def find_by_username(username: str) -> Optional[Dict[str, Any]]:
users = JSONUsers.load_all()
return users.get(username.lower())
@staticmethod
def create_user(username: str, password_hash: str, role: str = "user") -> bool:
username_lower = username.lower()
with _file_locks["users"]:
users = load_json_file(USERS_FILE)
if username_lower in users:
return False
users[username_lower] = {
"id": len(users) + 1,
"username": username_lower,
"password_hash": password_hash,
"role": role,
"created_at": datetime.now().isoformat(),
}
save_json_file(USERS_FILE, users)
return True
@staticmethod
def get_all_users() -> List[Dict[str, Any]]:
users = JSONUsers.load_all()
return [
{"id": u.get("id"), "username": u.get("username"), "role": u.get("role")}
for u in users.values()
]
@staticmethod
def promote_to_admin(username: str) -> bool:
username_lower = username.lower()
with _file_locks["users"]:
users = load_json_file(USERS_FILE)
if username_lower not in users:
return False
users[username_lower]["role"] = "admin"
save_json_file(USERS_FILE, users)
return True
@staticmethod
def user_count() -> int:
return len(JSONUsers.load_all())
class JSONBlacklistedTokens:
@staticmethod
def load_all() -> Dict[str, Any]:
with _file_locks["blacklisted_tokens"]:
return load_json_file(BLACKLISTED_TOKENS_FILE)
@staticmethod
def is_blacklisted(token: str) -> bool:
tokens = JSONBlacklistedTokens.load_all()
return token in tokens
@staticmethod
def add_to_blacklist(token: str) -> bool:
with _file_locks["blacklisted_tokens"]:
tokens = load_json_file(BLACKLISTED_TOKENS_FILE)
if token in tokens:
return True
tokens[token] = {"token": token, "created_at": datetime.now().isoformat()}
save_json_file(BLACKLISTED_TOKENS_FILE, tokens)
return True
class JSONRefreshTokens:
@staticmethod
def load_all() -> Dict[str, Any]:
with _file_locks["refresh_tokens"]:
return load_json_file(REFRESH_TOKENS_FILE)
@staticmethod
def find_by_token(token: str) -> Optional[str]:
tokens = JSONRefreshTokens.load_all()
for token_data in tokens.values():
if token_data.get("token") == token:
return token_data.get("username")
return None
@staticmethod
def create_token(username: str, token: str) -> bool:
with _file_locks["refresh_tokens"]:
tokens = load_json_file(REFRESH_TOKENS_FILE)
token_key = f"{username.lower()}_{len(tokens) + 1}"
tokens[token_key] = {
"username": username.lower(),
"token": token,
"created_at": datetime.now().isoformat(),
}
save_json_file(REFRESH_TOKENS_FILE, tokens)
return True
@staticmethod
def delete_user_tokens(username: str) -> bool:
username_lower = username.lower()
with _file_locks["refresh_tokens"]:
tokens = load_json_file(REFRESH_TOKENS_FILE)
to_remove = [k for k, v in tokens.items() if v.get("username") == username_lower]
for k in to_remove:
del tokens[k]
save_json_file(REFRESH_TOKENS_FILE, tokens)
return True
# =========================================================
# INIT / DIAG
# =========================================================
def init_db():
"""
Initialize JSON database files.
If AUTH_SEED_ON_START=1, seed from AUTH_SEED_DIR when missing or empty.
"""
ensure_data_directory()
# seed paths
seed_users = os.path.join(SEED_DIR, "users.json")
seed_blacklist = os.path.join(SEED_DIR, "blacklisted_tokens.json")
seed_refresh = os.path.join(SEED_DIR, "refresh_tokens.json")
# Seed (if needed)
_seed_file_if_needed(USERS_FILE, seed_users)
_seed_file_if_needed(BLACKLISTED_TOKENS_FILE, seed_blacklist)
_seed_file_if_needed(REFRESH_TOKENS_FILE, seed_refresh)
# Ensure files exist (create empty if still missing)
if not os.path.exists(USERS_FILE):
save_json_file(USERS_FILE, {})
_log(f"[AUTH][INIT] Created empty {USERS_FILE}")
if not os.path.exists(BLACKLISTED_TOKENS_FILE):
save_json_file(BLACKLISTED_TOKENS_FILE, {})
_log(f"[AUTH][INIT] Created empty {BLACKLISTED_TOKENS_FILE}")
if not os.path.exists(REFRESH_TOKENS_FILE):
save_json_file(REFRESH_TOKENS_FILE, {})
_log(f"[AUTH][INIT] Created empty {REFRESH_TOKENS_FILE}")
# Startup log (very useful in HF logs)
try:
users_count = len(load_json_file(USERS_FILE))
_log(f"[AUTH][READY] DB_DIR={DB_DIR} | users={users_count}")
except Exception as e:
_log(f"[AUTH][WARN] Could not read users count: {e}")
def ensure_database_initialized() -> bool:
"""
Ensure JSON DB is initialized once (thread-safe).
Controlled by RUN_INIT_DB.
"""
global _db_init_done
if not RUN_INIT_DB:
return False
if not _db_init_done:
with _db_init_lock:
if not _db_init_done:
init_db()
_db_init_done = True
return True
def get_database_info() -> Dict[str, Any]:
info = {
"database_type": "JSON",
"storage_location": DB_DIR,
"seed_location": SEED_DIR,
"files": {},
}
def _file_info(path: str, label: str):
if not os.path.exists(path):
info["files"][label] = {"exists": False}
return
data = load_json_file(path)
info["files"][label] = {
"exists": True,
"path": path,
"size_bytes": os.path.getsize(path),
"count": len(data) if isinstance(data, dict) else 0,
}
try:
_file_info(USERS_FILE, "users")
_file_info(BLACKLISTED_TOKENS_FILE, "blacklisted_tokens")
_file_info(REFRESH_TOKENS_FILE, "refresh_tokens")
info["connection_status"] = "ok"
except Exception as e:
info["connection_status"] = "error"
info["error"] = str(e)
return info
def test_database_connection() -> Tuple[bool, str]:
try:
ensure_data_directory()
_ = JSONUsers.load_all()
test_file = os.path.join(DB_DIR, ".test")
save_json_file(test_file, {"test": "ok"})
os.remove(test_file)
return True, "JSON database connection successful"
except Exception as e:
return False, f"JSON database connection failed: {str(e)}"
|