File size: 3,774 Bytes
9c9ce67
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
"""
Shared utilities: logging, usernames, IDs, safe paths, file hashing, JSON/JSONL I/O, text normalization.
"""
import hashlib
import json
import logging
import os
from pathlib import Path
from typing import Any, Dict, List, Optional
from uuid import uuid4

from backend.config import DATA_ROOT, LOGS_DIR, MOCK_USER

# Logging: every record goes both to LOGS_DIR/app.log and to the console.
_log_file = LOGS_DIR / "app.log"
_log_format = "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
_log_handlers = [
    # Handler order matters only for output ordering: file first, then console.
    logging.FileHandler(_log_file, encoding="utf-8"),
    logging.StreamHandler(),
]
logging.basicConfig(level=logging.INFO, format=_log_format, handlers=_log_handlers)

# Shared application logger used by the helpers in this module.
logger = logging.getLogger("notebooklm")


def _clean_username(value: Any) -> str:
    """Return *value* as a stripped string, or "" when it is missing or blank."""
    if not value:
        return ""
    return str(value).strip()


def get_username_from_request(request: Any) -> str:
    """
    Derive username from Gradio request (HF OAuth) or MOCK_USER.

    request may be None in local dev or when Gradio doesn't pass it.

    Sources are tried in this order and the first non-blank value wins:
      1. MOCK_USER (returned as-is when set),
      2. request.username,
      3. request.user (either a dict with a "username" key or an object
         with a .username attribute),
      4. the "x-username" request header.

    Returns "anonymous" when no source yields a usable name. A value that is
    missing, None, or only whitespace is treated as absent, so the function
    never returns "" or the literal string "None".
    """
    if MOCK_USER:
        return MOCK_USER
    if request is None:
        return "anonymous"

    # Gradio 4.x: request can have .username from HF OAuth
    name = _clean_username(getattr(request, "username", None))
    if name:
        return name

    user = getattr(request, "user", None)
    if user:
        if isinstance(user, dict):
            raw = user.get("username")
        else:
            raw = getattr(user, "username", None)
        name = _clean_username(raw)
        if name:
            return name

    # Some setups pass username in headers. Header containers are often
    # mapping-like objects rather than plain dicts, so rely on a callable
    # .get() instead of an isinstance(dict) check.
    headers = getattr(request, "headers", None)
    lookup = getattr(headers, "get", None)
    if callable(lookup):
        name = _clean_username(lookup("x-username"))
        if name:
            return name

    return "anonymous"


def user_data_dir(username: str) -> Path:
    """
    Path to <DATA_ROOT>/users/<username>. Validates no path escape.

    The username is stripped; a blank or unsafe name falls back to
    "anonymous". A name is considered unsafe when it is ".", contains "..",
    contains a path separator ("/" or "\\"), or contains a NUL character.

    After resolving, the result must lie strictly below the resolved
    DATA_ROOT; otherwise the "anonymous" directory is returned instead.
    The directory is not created here.
    """
    fallback = "anonymous"
    name = (username or "").strip() or fallback

    # "." would resolve to the shared users/ directory itself, and a NUL
    # character can make filesystem calls raise ValueError, so both are
    # rejected together with the traversal patterns.
    unsafe = name == "." or ".." in name or any(
        ch in name for ch in ("/", "\\", "\x00")
    )
    if unsafe:
        name = fallback

    root = DATA_ROOT.resolve()
    path = (root / "users" / name).resolve()

    # Compare path components, not string prefixes: a plain startswith()
    # would accept a sibling such as "/data2/..." for root "/data".
    if root not in path.parents:
        path = root / "users" / fallback
    return path


def ensure_dir(path: Path) -> Path:
    """Create *path* (including missing parents) if needed and return it."""
    path.mkdir(exist_ok=True, parents=True)
    return path


def new_uuid() -> str:
    """Return a freshly generated random (version 4) UUID in string form."""
    identifier = uuid4()
    return str(identifier)


def file_hash(path: Path) -> str:
    """Return the hex SHA256 digest of the file's contents (used for dedupe)."""
    digest = hashlib.sha256()
    with open(path, "rb") as stream:
        # Read in 8 KiB pieces so the whole file is never held in memory.
        while True:
            block = stream.read(8192)
            if not block:
                break
            digest.update(block)
    return digest.hexdigest()


def read_json(path: Path, default: Any = None) -> Any:
    """
    Load and return the JSON document stored at *path*.

    Returns *default* (a new empty dict when *default* is None) if the file
    does not exist, cannot be read, is not valid UTF-8, or is not valid JSON.
    A missing file is treated as the normal "nothing saved yet" case and is
    not logged; every other failure is logged as a warning.
    """
    if default is None:
        default = {}
    try:
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    except (FileNotFoundError, NotADirectoryError):
        # Opening directly instead of checking exists() first avoids a race
        # with a concurrent delete between the check and the open.
        return default
    except (ValueError, OSError) as e:
        # ValueError covers json.JSONDecodeError as well as the
        # UnicodeDecodeError raised for files that are not valid UTF-8.
        logger.warning("read_json %s: %s", path, e)
        return default


def write_json(path: Path, data: Any) -> None:
    """
    Serialize *data* as indented UTF-8 JSON and store it at *path*.

    Parent directories are created as needed. The document is first written
    to a temporary file in the same directory and then moved over *path*
    with os.replace, so a serialization error or an interrupted write never
    leaves a truncated file behind and any previous content stays intact.

    Raises whatever json.dump or the file operations raise (for example
    TypeError for values that are not JSON-serializable, or OSError).

    NOTE(review): because the target is replaced rather than rewritten in
    place, custom permissions on an existing file are not carried over.
    """
    ensure_dir(path.parent)
    # A unique name in the same directory keeps the final rename on one
    # filesystem and avoids clashes between concurrent writers.
    tmp_path = path.with_name("." + path.name + "." + uuid4().hex + ".tmp")
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
        os.replace(tmp_path, path)
    except BaseException:
        # Best-effort cleanup of the partial temp file; the original error
        # is always re-raised.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise


def read_jsonl(path: Path) -> List[Dict[str, Any]]:
    """
    Read a JSON Lines file and return its records in file order.

    Returns an empty list when the file does not exist. Blank lines are
    ignored. A line that is not valid JSON is skipped and reported with a
    warning that includes its line number, so corrupted records do not
    disappear unnoticed. Other I/O errors propagate to the caller.
    """
    out: List[Dict[str, Any]] = []
    try:
        f = open(path, "r", encoding="utf-8")
    except (FileNotFoundError, NotADirectoryError):
        # Opening directly instead of checking exists() first avoids a race
        # with a concurrent delete between the check and the open.
        return out
    with f:
        for lineno, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                continue
            try:
                out.append(json.loads(line))
            except json.JSONDecodeError as e:
                # Keep reading: one bad record should not hide the rest.
                logger.warning("read_jsonl %s line %d: %s", path, lineno, e)
    return out


def append_jsonl(path: Path, record: Dict[str, Any]) -> None:
    """Append *record* to *path* as one JSON line, creating parent dirs first."""
    ensure_dir(path.parent)
    with open(path, mode="a", encoding="utf-8") as sink:
        serialized = json.dumps(record, ensure_ascii=False)
        # Emit the record and its newline in a single write call.
        sink.write(serialized + "\n")


def normalize_text(text: str) -> str:
    """Collapse every run of whitespace to one space and trim both ends."""
    # split() with no arguments already discards leading/trailing whitespace,
    # so joining the pieces yields a fully trimmed string.
    words = text.split() if text else []
    return " ".join(words)