# NotebookLMClone / src/ingestion/storage.py
# github-actions[bot]
# Sync from GitHub e48aa5f27523b35a22c1a01acbb2b835cdc28984
# aacd162
from __future__ import annotations

import json
import shutil
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Optional
class StorageAdapter:
    """Interface that every storage backend has to implement.

    The base class carries no behaviour of its own: each method below raises
    ``NotImplementedError`` until a subclass overrides it.
    """

    def ensure_notebook(self, user_id: str, notebook_id: str) -> Path:
        """Return the storage location of a user's notebook.

        Judging by the name, implementations are expected to create the
        location when it does not exist yet.
        """
        raise NotImplementedError

    def save_raw_file(
        self,
        user_id: str,
        notebook_id: str,
        source_id: str,
        src_path: Path,
    ) -> Path:
        """Store the file at ``src_path`` for the given source; return its path."""
        raise NotImplementedError

    def save_extracted_text(
        self,
        user_id: str,
        notebook_id: str,
        source_id: str,
        filename: str,
        text: str,
    ) -> Path:
        """Store ``text`` extracted from a source; return the stored path."""
        raise NotImplementedError

    def read_index(self, user_id: str) -> Dict[str, Any]:
        """Return the index mapping kept for ``user_id``."""
        raise NotImplementedError

    def write_index(self, user_id: str, index: Dict[str, Any]) -> None:
        """Persist ``index`` as the index mapping for ``user_id``."""
        raise NotImplementedError
class LocalStorageAdapter(StorageAdapter):
    """Local filesystem storage adapter following the project's `data/` layout.

    Layout created below ``base_dir``::

        users/<user_id>/notebooks/index.json
        users/<user_id>/notebooks/<notebook_id>/
            files_raw/<source_id>/<original file name>
            files_extracted/<source_id>/<filename>.txt
            chroma/
            chat/
            artifacts/

    Every caller-supplied identifier that becomes a path component
    (``user_id``, ``notebook_id``, ``source_id``, ``filename`` and the name of
    an uploaded file) is validated so it cannot point outside ``base_dir``.

    Example usage:
        adapter = LocalStorageAdapter(base_dir="data")
        adapter.ensure_notebook("alice", "nb-123")
    """

    # Sub-directories created inside every notebook directory.
    _NOTEBOOK_SUBDIRS = ("files_raw", "files_extracted", "chroma", "chat", "artifacts")
    # Name of the per-user notebook index stored next to the notebook folders.
    _INDEX_FILENAME = "index.json"

    def __init__(self, base_dir: str = "data"):
        """Create the adapter and make sure ``base_dir`` exists.

        Args:
            base_dir: Root directory for all stored data (str or path-like).
        """
        self.base = Path(base_dir)
        self.base.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _safe_component(value: str, label: str) -> str:
        """Return ``value`` if it is usable as a single path component.

        Raises:
            TypeError: ``value`` is not a string.
            ValueError: ``value`` is empty, ``.``/``..``, or contains a path
                separator or NUL byte (any of which could redirect the path).
        """
        if not isinstance(value, str):
            raise TypeError(f"{label} must be a string, got {type(value).__name__}")
        if value in ("", ".", "..") or any(ch in value for ch in "/\\\x00"):
            raise ValueError(f"invalid {label}: {value!r}")
        return value

    def _user_dir(self, user_id: str) -> Path:
        """Return ``<base>/users/<user_id>`` (not created here)."""
        return self.base / "users" / self._safe_component(user_id, "user_id")

    def _notebooks_dir(self, user_id: str) -> Path:
        """Return the directory that holds all notebooks of ``user_id``."""
        return self._user_dir(user_id) / "notebooks"

    def _index_path(self, user_id: str) -> Path:
        """Return the path of the per-user notebook index file."""
        return self._notebooks_dir(user_id) / self._INDEX_FILENAME

    def _write_json_atomic(self, path: Path, payload: Any) -> None:
        """Serialise ``payload`` to ``path`` without exposing a partial file.

        The JSON is written to a uniquely named sibling file and then moved
        over ``path``; an interrupted write therefore leaves the previous
        content intact instead of a truncated index.
        """
        # Serialise first so a non-serialisable payload touches nothing on disk.
        text = json.dumps(payload, indent=2)
        path.parent.mkdir(parents=True, exist_ok=True)
        tmp = path.with_name(f"{path.name}.{uuid.uuid4().hex}.tmp")
        try:
            tmp.write_text(text, encoding="utf-8")
            tmp.replace(path)
        finally:
            # Only present when the write or the move failed.
            if tmp.exists():
                tmp.unlink()

    def ensure_notebook(self, user_id: str, notebook_id: str) -> Path:
        """Create the notebook directory tree if needed and register it.

        Args:
            user_id: Owner of the notebook.
            notebook_id: Identifier used as the notebook's directory name.

        Returns:
            Path of the notebook directory.

        Raises:
            ValueError: ``user_id`` or ``notebook_id`` is not a safe path
                component, or ``notebook_id`` equals the index file name.
        """
        self._safe_component(notebook_id, "notebook_id")
        if notebook_id == self._INDEX_FILENAME:
            # Would collide with the index file living in the same directory.
            raise ValueError(f"invalid notebook_id: {notebook_id!r}")

        nb_dir = self._notebooks_dir(user_id) / notebook_id
        for sub in self._NOTEBOOK_SUBDIRS:
            # parents=True also creates the notebook and notebooks directories.
            (nb_dir / sub).mkdir(parents=True, exist_ok=True)

        # Creates the index file when missing and adds the notebook if new.
        self._register_notebook_in_index(user_id, notebook_id)
        return nb_dir

    def _register_notebook_in_index(self, user_id: str, notebook_id: str) -> None:
        """Add ``notebook_id`` to the user's index unless it is already listed.

        A missing, unreadable or malformed index is replaced by a fresh one,
        matching the adapter's best-effort handling of the index file.
        """
        idx_path = self._index_path(user_id)
        try:
            data = json.loads(idx_path.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # Missing/unreadable file, bad encoding or invalid JSON.
            data = None
        if not isinstance(data, dict):
            data = {}

        notebooks = data.get("notebooks")
        if not isinstance(notebooks, list):
            notebooks = data["notebooks"] = []

        known = {entry.get("id") for entry in notebooks if isinstance(entry, dict)}
        if notebook_id in known:
            return

        # Aware "now" in UTC, rendered in the same naive-ISO + "Z" format the
        # index has always used (datetime.utcnow() is deprecated).
        created_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
        notebooks.append({
            "id": notebook_id,
            "name": notebook_id,
            "created_at": created_at,
        })
        self._write_json_atomic(idx_path, data)

    def save_raw_file(self, user_id: str, notebook_id: str, source_id: str, src_path: Path) -> Path:
        """Copy an uploaded file into the notebook's ``files_raw`` area.

        Args:
            user_id: Owner of the notebook.
            notebook_id: Target notebook (created if missing).
            source_id: Identifier of the source; used as a sub-directory.
            src_path: Existing file to copy (``Path`` or path string).

        Returns:
            Path of the stored copy, which keeps the original file name.

        Raises:
            ValueError: An identifier or the file name is not a safe path
                component.
            OSError: The copy failed (for example ``src_path`` does not exist).
        """
        src = Path(src_path)
        # Validate everything before creating directories for a bad request.
        file_name = self._safe_component(src.name, "source file name")
        self._safe_component(source_id, "source_id")

        nb = self.ensure_notebook(user_id, notebook_id)
        dest_dir = nb / "files_raw" / source_id
        dest_dir.mkdir(parents=True, exist_ok=True)
        dest = dest_dir / file_name
        shutil.copy2(src, dest)
        return dest

    def save_extracted_text(self, user_id: str, notebook_id: str, source_id: str, filename: str, text: str) -> Path:
        """Write extracted text to ``files_extracted/<source_id>/<filename>.txt``.

        Args:
            user_id: Owner of the notebook.
            notebook_id: Target notebook (created if missing).
            source_id: Identifier of the source; used as a sub-directory.
            filename: Base name of the output file; ``.txt`` is appended.
            text: Content to store, written as UTF-8.

        Returns:
            Path of the written text file.

        Raises:
            ValueError: An identifier or ``filename`` is not a safe path
                component.
        """
        self._safe_component(source_id, "source_id")
        self._safe_component(filename, "filename")

        nb = self.ensure_notebook(user_id, notebook_id)
        dest_dir = nb / "files_extracted" / source_id
        dest_dir.mkdir(parents=True, exist_ok=True)
        dest = dest_dir / f"{filename}.txt"
        dest.write_text(text, encoding="utf-8")
        return dest

    def read_index(self, user_id: str) -> Dict[str, Any]:
        """Return the user's notebook index.

        Returns:
            The parsed index, or ``{"notebooks": []}`` when no index file
            exists yet.

        Raises:
            ValueError: The index file exists but does not contain valid JSON
                (``json.JSONDecodeError`` is a ``ValueError``).
        """
        try:
            return json.loads(self._index_path(user_id).read_text(encoding="utf-8"))
        except (FileNotFoundError, NotADirectoryError):
            return {"notebooks": []}

    def write_index(self, user_id: str, index: Dict[str, Any]) -> None:
        """Replace the user's notebook index with ``index``.

        The parent directory is created when necessary and the file is
        swapped in atomically.
        """
        self._write_json_atomic(self._index_path(user_id), index)