# Abhinav Biju
# Deploying RAG pipeline changes (excluding binary data)
# 182e0fa
"""Storage helpers for per-user notebook data.
Spec references:
- `specs/04_interfaces.md`: required storage module interface.
- `specs/03_data_model.md`: JSON object storage and JSONL message layout.
- `specs/07_security.md`: per-user directory isolation and path traversal prevention.
- `specs/10_test_plan.md`: unit-testable storage safety behavior.
"""
from __future__ import annotations
import json
import os
from pathlib import Path
from typing import Any
class StorageError(Exception):
    """Common ancestor for every failure reported by the storage helpers."""


class StorageConfigurationError(StorageError):
    """Signals that the storage root is not configured correctly."""


class StorageFormatError(StorageError):
    """Signals that persisted data does not have the expected JSON shape."""


class StorageIOError(StorageError):
    """Signals that reading from or writing to a file failed."""
def _data_root() -> Path:
"""Return the configured data root directory.
Spec references:
- `specs/07_security.md`: storage must enforce per-user directory isolation.
- `specs/10_test_plan.md`: root selection must remain unit-testable.
Returns:
The resolved Path to the data root. Defaults to './data' if NOTEBOOKLM_DATA_ROOT is unset.
"""
raw_root: str = os.getenv("NOTEBOOKLM_DATA_ROOT", "").strip()
if not raw_root:
raw_root = "./data"
root: Path = Path(raw_root).expanduser()
root.mkdir(parents=True, exist_ok=True)
return root.resolve(strict=False)
def _validate_name(value: str, label: str) -> str:
"""Validate a user-supplied path segment before path construction.
Spec references:
- `specs/07_security.md`: prevent path traversal and preserve isolation.
Raises:
ValueError: If the supplied segment is empty or contains path separators.
"""
if not value or not value.strip():
raise ValueError(f"{label} must be a non-empty string.")
candidate: Path = Path(value)
if candidate.name != value or candidate.is_absolute():
raise ValueError(f"{label} must be a single relative path segment.")
return value
def user_root(username: str) -> Path:
    """Resolve, create, and return the storage directory for `username`.

    Spec references:
    - `specs/04_interfaces.md`: implements `user_root()`.
    - `specs/07_security.md`: enforces per-user directory isolation.

    The username is validated as a single path segment first; the directory is
    then built as `users/<username>` beneath the data root via `safe_join()`.

    Raises:
        ValueError: If `username` is not a safe single path segment.
        StorageConfigurationError: Propagated from `_data_root()` if that helper
            reports a misconfigured data root.
        StorageIOError: If the directory cannot be created.
    """
    # Validate before touching the filesystem so bad input has no side effects.
    checked_name: str = _validate_name(username, "username")
    user_dir: Path = safe_join(_data_root(), "users", checked_name)
    try:
        user_dir.mkdir(parents=True, exist_ok=True)
    except OSError as exc:
        raise StorageIOError(f"Failed to create user root directory: {user_dir}") from exc
    else:
        return user_dir
def notebook_root(username: str, notebook_id: str) -> Path:
    """Resolve, create, and return the directory of one notebook of a user.

    Spec references:
    - `specs/04_interfaces.md`: implements `notebook_root()`.
    - `specs/07_security.md`: preserves per-user notebook isolation.

    The notebook id is validated first, then the directory is built as
    `notebooks/<notebook_id>` beneath `user_root(username)` via `safe_join()`.

    Raises:
        ValueError: If `username` or `notebook_id` is unsafe.
        StorageConfigurationError: Propagated from `user_root()` if the data
            root is reported as misconfigured.
        StorageIOError: If the directory cannot be created.
    """
    checked_id: str = _validate_name(notebook_id, "notebook_id")
    # user_root() validates the username and makes sure the user directory exists.
    owner_dir: Path = user_root(username)
    notebook_dir: Path = safe_join(owner_dir, "notebooks", checked_id)
    try:
        notebook_dir.mkdir(parents=True, exist_ok=True)
    except OSError as exc:
        raise StorageIOError(f"Failed to create notebook root directory: {notebook_dir}") from exc
    else:
        return notebook_dir
def safe_join(root: Path, *parts: str | os.PathLike[str]) -> Path:
    """Build a path under `root` from `parts`, refusing anything that escapes it.

    Spec references:
    - `specs/04_interfaces.md`: implements `safe_join()`.
    - `specs/07_security.md`: resolved path must remain inside the root.
    - `specs/10_test_plan.md`: supports storage safety unit tests.

    `root` is created if it does not exist yet. The joined path is resolved
    (without requiring it to exist) and must be located at or below the
    resolved root.

    Args:
        root: The directory boundary that must contain the resolved result.
        *parts: Relative path segments to join beneath `root`.

    Returns:
        A resolved path contained within `root`.

    Raises:
        ValueError: If traversal is attempted or an absolute path is supplied.
        StorageIOError: If the root directory cannot be prepared.
    """
    try:
        root.mkdir(parents=True, exist_ok=True)
    except OSError as exc:
        raise StorageIOError(f"Failed to prepare storage root: {root}") from exc

    boundary: Path = root.resolve(strict=False)
    joined: Path = boundary
    # map() is lazy, so each part is converted and checked in order, one at a time.
    for segment in map(Path, parts):
        if segment.is_absolute():
            raise ValueError(f"Absolute paths are not allowed in safe_join: {segment}")
        joined = joined / segment

    target: Path = joined.resolve(strict=False)
    try:
        # relative_to() raises ValueError when `target` is not under `boundary`.
        target.relative_to(boundary)
    except ValueError as exc:
        raise ValueError(
            f"Path traversal detected for root '{boundary}' and path '{target}'."
        ) from exc
    else:
        return target
def read_json(path: Path) -> dict[str, Any]:
    """Read a JSON object from disk.

    Spec references:
    - `specs/04_interfaces.md`: implements `read_json()`.
    - `specs/03_data_model.md`: persisted JSON files use object-shaped payloads.

    Args:
        path: Location of the UTF-8 encoded JSON file.

    Returns:
        The decoded top-level JSON object.

    Raises:
        StorageIOError: If the file cannot be opened, is not valid UTF-8, or
            does not contain valid JSON.
        StorageFormatError: If the decoded JSON is not a top-level object.
    """
    try:
        with path.open("r", encoding="utf-8") as handle:
            payload: Any = json.load(handle)
    except FileNotFoundError as exc:
        raise StorageIOError(f"JSON file does not exist: {path}") from exc
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        # UnicodeDecodeError is a ValueError, not an OSError, so bytes that are
        # not valid UTF-8 would otherwise escape the storage error hierarchy.
        raise StorageIOError(f"Invalid JSON in file: {path}") from exc
    except OSError as exc:
        raise StorageIOError(f"Failed to read JSON file: {path}") from exc
    if not isinstance(payload, dict):
        raise StorageFormatError(f"Expected a JSON object in file: {path}")
    return payload
def write_json(path: Path, obj: dict[str, Any]) -> None:
    """Write a JSON object to disk.

    Spec references:
    - `specs/04_interfaces.md`: implements `write_json()`.
    - `specs/03_data_model.md`: persisted metadata files are JSON objects.

    The payload is written as ASCII-escaped, key-sorted, 2-space-indented JSON
    followed by a single trailing newline. Missing parent directories are
    created.

    Args:
        path: Destination file; replaced if it already exists.
        obj: The dictionary to persist.

    Raises:
        StorageFormatError: If `obj` is not a dictionary.
        StorageIOError: If the file cannot be written.
        TypeError: Propagated from `json.dumps` when `obj` contains values that
            are not JSON serializable; the destination is left untouched.
    """
    if not isinstance(obj, dict):
        raise StorageFormatError("write_json expects a dictionary object.")
    # Serialize before opening: mode "w" truncates immediately, so a
    # serialization failure after the open would destroy the previous content.
    text: str = json.dumps(obj, ensure_ascii=True, indent=2, sort_keys=True) + "\n"
    try:
        path.parent.mkdir(parents=True, exist_ok=True)
        # newline="\n" keeps line endings identical on every platform.
        with path.open("w", encoding="utf-8", newline="\n") as handle:
            handle.write(text)
    except OSError as exc:
        raise StorageIOError(f"Failed to write JSON file: {path}") from exc
def append_jsonl(path: Path, obj: dict[str, Any]) -> None:
    """Append one JSON object as one line to a JSONL file.

    Spec references:
    - `specs/04_interfaces.md`: implements `append_jsonl()`.
    - `specs/03_data_model.md`: `messages.jsonl` stores one JSON object per line.

    The record is written as compact, ASCII-escaped, key-sorted JSON followed by
    a newline. Missing parent directories and the file itself are created on
    first use.

    Args:
        path: The JSONL file to append to.
        obj: The dictionary to store as a single line.

    Raises:
        StorageFormatError: If `obj` is not a dictionary.
        StorageIOError: If the file cannot be appended.
        TypeError: Propagated from `json.dumps` when `obj` contains values that
            are not JSON serializable; nothing is created or written.
    """
    if not isinstance(obj, dict):
        raise StorageFormatError("append_jsonl expects a dictionary object.")
    # Serialize first so a bad payload leaves no empty file or directories behind.
    line: str = json.dumps(obj, ensure_ascii=True, sort_keys=True) + "\n"
    try:
        path.parent.mkdir(parents=True, exist_ok=True)
        with path.open("a", encoding="utf-8", newline="\n") as handle:
            # Record and terminator go out in a single write call.
            handle.write(line)
    except OSError as exc:
        raise StorageIOError(f"Failed to append JSONL file: {path}") from exc