""" Shared utilities: logging, IDs, safe paths. """ import hashlib import json import logging import os from pathlib import Path from typing import Any, Dict, List, Optional from uuid import uuid4 from backend.config import DATA_ROOT, LOGS_DIR, MOCK_USER # Logging: write to file and console _log_file = LOGS_DIR / "app.log" logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", handlers=[ logging.FileHandler(_log_file, encoding="utf-8"), logging.StreamHandler(), ], ) logger = logging.getLogger("notebooklm") def get_username_from_request(request: Any) -> str: """ Derive username from Gradio request (HF OAuth) or MOCK_USER. request may be None in local dev or when Gradio doesn't pass it. """ if MOCK_USER: return MOCK_USER if request is not None: # Gradio 4.x: request can have .username from HF OAuth if hasattr(request, "username") and request.username: return str(request.username).strip() if hasattr(request, "user") and request.user: u = request.user if isinstance(u, dict) and u.get("username"): return str(u["username"]).strip() if hasattr(u, "username"): return str(u.username).strip() # Some setups pass username in headers if hasattr(request, "headers"): h = getattr(request, "headers", {}) or {} if isinstance(h, dict) and h.get("x-username"): return str(h["x-username"]).strip() return "anonymous" def user_data_dir(username: str) -> Path: """Path to /data/users/. Validates no path escape.""" username = (username or "").strip() or "anonymous" if ".." in username or "/" in username or "\\" in username: username = "anonymous" root = DATA_ROOT.resolve() path = (root / "users" / username).resolve() if not str(path).startswith(str(root)): path = root / "users" / "anonymous" return path def ensure_dir(path: Path) -> Path: path.mkdir(parents=True, exist_ok=True) return path def new_uuid() -> str: return str(uuid4()) def file_hash(path: Path) -> str: """SHA256 hash of file for dedupe.""" h = hashlib.sha256() with open(path, "rb") as f: for chunk in iter(lambda: f.read(8192), b""): h.update(chunk) return h.hexdigest() def read_json(path: Path, default: Any = None) -> Any: if default is None: default = {} if not path.exists(): return default try: with open(path, "r", encoding="utf-8") as f: return json.load(f) except (json.JSONDecodeError, OSError) as e: logger.warning("read_json %s: %s", path, e) return default def write_json(path: Path, data: Any) -> None: ensure_dir(path.parent) with open(path, "w", encoding="utf-8") as f: json.dump(data, f, indent=2, ensure_ascii=False) def read_jsonl(path: Path) -> List[Dict[str, Any]]: if not path.exists(): return [] out: List[Dict[str, Any]] = [] with open(path, "r", encoding="utf-8") as f: for line in f: line = line.strip() if not line: continue try: out.append(json.loads(line)) except json.JSONDecodeError: continue return out def append_jsonl(path: Path, record: Dict[str, Any]) -> None: ensure_dir(path.parent) with open(path, "a", encoding="utf-8") as f: f.write(json.dumps(record, ensure_ascii=False) + "\n") def normalize_text(text: str) -> str: """Normalize whitespace and strip.""" if not text: return "" return " ".join(text.split()).strip()