| """ |
| Shared utilities: logging, IDs, safe paths. |
| """ |
| import hashlib |
| import json |
| import logging |
| import os |
| from pathlib import Path |
| from typing import Any, Dict, List, Optional |
| from uuid import uuid4 |
|
|
| from backend.config import DATA_ROOT, LOGS_DIR, MOCK_USER |
|
|
| |
# Configure logging once at import time: INFO and above is written both to
# LOGS_DIR/app.log (UTF-8) and to the default console stream.
_log_file = LOGS_DIR / "app.log"
# FileHandler opens its target eagerly, so the directory has to exist before
# basicConfig runs; otherwise merely importing this module raises
# FileNotFoundError on a fresh checkout / empty volume.
Path(_log_file).parent.mkdir(parents=True, exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    handlers=[
        logging.FileHandler(_log_file, encoding="utf-8"),
        logging.StreamHandler(),
    ],
)
# Shared application logger used by the helpers in this module.
logger = logging.getLogger("notebooklm")
|
|
|
|
def _probe_request_attr(obj: Any, attr: str) -> Any:
    """Best-effort attribute read that yields None instead of raising."""
    try:
        return getattr(obj, attr, None)
    except AssertionError:
        # NOTE(review): Starlette-backed requests are documented to raise
        # AssertionError from `.user` when no AuthenticationMiddleware is
        # installed; treated as "no value" here - confirm against the
        # deployed Gradio/Starlette versions.
        return None


def _clean_username(value: Any) -> str:
    """Return the stripped string form of a truthy value, else ""."""
    return str(value).strip() if value else ""


def get_username_from_request(request: Any) -> str:
    """
    Derive username from Gradio request (HF OAuth) or MOCK_USER.

    Lookup order (first non-blank value wins):
      1. ``MOCK_USER`` when set; it takes precedence over the request.
      2. ``request.username``.
      3. ``request.user``: a dict with a ``"username"`` key, or an object
         with a ``username`` attribute.
      4. The ``"x-username"`` entry of ``request.headers`` when the headers
         object is a plain dict.

    request may be None in local dev or when Gradio doesn't pass it.

    Returns:
        The resolved username with surrounding whitespace removed, or
        ``"anonymous"`` when no candidate is present. None, empty and
        whitespace-only candidates are skipped rather than returned.
    """
    if MOCK_USER:
        return MOCK_USER
    if request is None:
        return "anonymous"

    name = _clean_username(_probe_request_attr(request, "username"))
    if name:
        return name

    user = _probe_request_attr(request, "user")
    if user:
        if isinstance(user, dict):
            raw = user.get("username")
        else:
            raw = _probe_request_attr(user, "username")
        # A present-but-None username must not turn into the string "None".
        name = _clean_username(raw)
        if name:
            return name

    headers = _probe_request_attr(request, "headers")
    # NOTE(review): "x-username" is a client-controlled header and can be
    # spoofed. The check is intentionally limited to plain dicts, exactly as
    # before, so it is not widened to arbitrary header mappings.
    if isinstance(headers, dict):
        name = _clean_username(headers.get("x-username"))
        if name:
            return name
    return "anonymous"
|
|
|
|
def user_data_dir(username: str) -> Path:
    """Path to DATA_ROOT/users/<username>. Validates no path escape.

    Args:
        username: Requested user name. None, empty and whitespace-only
            values map to ``"anonymous"``.

    Returns:
        The resolved directory path for the user. Names that could address
        something other than a single child of ``users/`` (``"."``, anything
        containing ``".."``, ``"/"``, a backslash or a NUL byte) are replaced
        by ``"anonymous"``. If the resolved path still ends up outside the
        resolved DATA_ROOT, ``DATA_ROOT/users/anonymous`` is returned.
        The directory is not created here.
    """
    name = (username or "").strip() or "anonymous"
    # "." would resolve to the shared users/ directory itself, and an
    # embedded NUL makes Path.resolve() raise ValueError.
    if name == "." or ".." in name or any(ch in name for ch in ("/", "\\", "\0")):
        name = "anonymous"
    root = DATA_ROOT.resolve()
    users_root = root / "users"
    path = (users_root / name).resolve()
    # Compare path components rather than string prefixes: a prefix test
    # would accept a sibling such as "/data2/..." for root "/data".
    if root not in path.parents:
        return users_root / "anonymous"
    return path
|
|
|
|
def ensure_dir(path: Path) -> Path:
    """Create ``path`` as a directory, including missing parents, and return it.

    An already existing directory is left as it is.
    """
    path.mkdir(exist_ok=True, parents=True)
    return path
|
|
|
|
def new_uuid() -> str:
    """Return a newly generated random UUID (uuid4) as a string."""
    identifier = uuid4()
    return str(identifier)
|
|
|
|
def file_hash(path: Path) -> str:
    """Return the hexadecimal SHA256 digest of a file's bytes, for dedupe.

    The file is read in 8192-byte pieces so it is never loaded whole.
    """
    digest = hashlib.sha256()
    with open(path, "rb") as stream:
        while True:
            block = stream.read(8192)
            if not block:
                break
            digest.update(block)
    return digest.hexdigest()
|
|
|
|
def read_json(path: Path, default: Any = None) -> Any:
    """Load JSON from ``path``, falling back to ``default`` on any failure.

    Args:
        path: File to read; decoded as UTF-8.
        default: Value returned when the file is missing or unreadable.
            ``None`` is replaced by a fresh empty dict.

    Returns:
        The parsed JSON value, or ``default`` when the file does not exist,
        cannot be opened or read, is not valid UTF-8, or is not valid JSON.
        Every failure other than a missing file is logged as a warning.
    """
    if default is None:
        default = {}
    if not path.exists():
        return default
    try:
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    except (ValueError, OSError) as e:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError;
        # the latter is raised for files that are not valid UTF-8 and would
        # otherwise escape the "return default" contract.
        logger.warning("read_json %s: %s", path, e)
        return default
|
|
|
|
def write_json(path: Path, data: Any) -> None:
    """Write ``data`` to ``path`` as indented UTF-8 JSON.

    Parent directories are created when missing. Non-ASCII characters are
    written as-is rather than escaped.

    Raises:
        TypeError / ValueError: ``data`` is not JSON-serializable. This is
            raised before the file is opened, so existing content at
            ``path`` is left intact.
        OSError: the directory or file cannot be created or written.
    """
    # Serialize first: opening with "w" truncates immediately, so a failure
    # in the middle of json.dump would destroy the previous file content.
    payload = json.dumps(data, indent=2, ensure_ascii=False)
    ensure_dir(path.parent)
    with open(path, "w", encoding="utf-8") as f:
        f.write(payload)
|
|
|
|
def read_jsonl(path: Path) -> List[Dict[str, Any]]:
    """Read a JSON Lines file into a list, one parsed value per line.

    Args:
        path: File to read; decoded as UTF-8.

    Returns:
        Parsed values in file order. A missing file yields ``[]``. Blank
        lines are ignored. Lines that are not valid JSON are skipped and
        reported with a warning that includes the 1-based line number.
    """
    if not path.exists():
        return []
    records: List[Dict[str, Any]] = []
    with open(path, "r", encoding="utf-8") as f:
        for lineno, raw in enumerate(f, start=1):
            line = raw.strip()
            if not line:
                continue
            try:
                records.append(json.loads(line))
            except json.JSONDecodeError as e:
                # Best-effort read: keep going, but leave a trace so dropped
                # records are not lost silently.
                logger.warning("read_jsonl %s line %d: %s", path, lineno, e)
    return records
|
|
|
|
def append_jsonl(path: Path, record: Dict[str, Any]) -> None:
    """Append ``record`` to ``path`` as a single JSON line (UTF-8).

    Missing parent directories are created first; non-ASCII characters are
    written unescaped.
    """
    ensure_dir(path.parent)
    with open(path, "a", encoding="utf-8") as sink:
        encoded = json.dumps(record, ensure_ascii=False)
        sink.write(encoded + "\n")
|
|
|
|
def normalize_text(text: str) -> str:
    """Collapse each run of whitespace to one space and trim both ends.

    Falsy input (empty string or None) yields an empty string.
    """
    if text:
        tokens = text.split()
        return " ".join(tokens)
    return ""
|
|