Spaces:
Sleeping
Sleeping
File size: 10,651 Bytes
1e6a9db |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 |
"""Filesystem vault management."""
from __future__ import annotations
from datetime import datetime, timezone
import logging
from pathlib import Path
import re
import time
from typing import Any, Dict, List, Tuple
import frontmatter
from .config import AppConfig, get_config
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Characters that validate_note_path rejects anywhere in a note path.
INVALID_PATH_CHARS = {'<', '>', ':', '"', '|', '?', '*'}
# Upper bound on the UTF-8 encoded size of a note body (1,048,576 bytes = 1 MiB).
MAX_NOTE_BYTES = 1_048_576
# Captures the text of a Markdown level-one heading ("# Title"); MULTILINE lets
# the heading appear on any line of the body, not only the first.
H1_PATTERN = re.compile(r"^\s*#\s+(.+)$", re.MULTILINE)
# Alias for the plain-dict note payload returned by the service's read/write calls.
VaultNote = Dict[str, Any]
def validate_note_path(note_path: str) -> Tuple[bool, str]:
    """
    Check that ``note_path`` is an acceptable vault-relative Markdown path.

    The rules are evaluated in a fixed order and the first rule that fails
    decides the message, so the order of the table below is significant.

    Returns:
        ``(True, "")`` when every rule passes, otherwise ``(False, reason)``.
    """
    rules = (
        (lambda p: not p or len(p) > 256, "Path must be 1-256 characters"),
        (lambda p: not p.endswith(".md"), "Path must end with .md"),
        (lambda p: ".." in p, "Path must not contain '..'"),
        (lambda p: "\\" in p, "Path must use Unix separators (/)"),
        (lambda p: p.startswith("/"), "Path must be relative (no leading /)"),
        (
            # True as soon as one character of the path is in the forbidden set.
            lambda p: not INVALID_PATH_CHARS.isdisjoint(p),
            "Path contains invalid characters",
        ),
    )
    for is_violated, reason in rules:
        if is_violated(note_path):
            return False, reason
    return True, ""
def sanitize_path(user_id: str, vault_root: Path, note_path: str) -> Path:
    """
    Resolve ``note_path`` inside the vault of ``user_id`` and confine it there.

    Args:
        user_id: Name of the user's directory directly under ``vault_root``.
        vault_root: Directory that holds every user's vault.
        note_path: Path of the note relative to the user's vault.

    Returns:
        The resolved absolute path of the note.

    Raises:
        ValueError: If the resolved path lies outside the user's vault.
    """
    vault = (vault_root / user_id).resolve()
    full_path = (vault / note_path).resolve()
    # Compare whole path components, not string prefixes: a textual
    # startswith() check would accept a sibling such as ".../alice2/x.md" for
    # the vault ".../alice" simply because the two strings share a prefix.
    if full_path != vault and vault not in full_path.parents:
        raise ValueError(f"Path escapes vault root: {note_path}")
    return full_path
def _utcnow_iso() -> str:
return datetime.now(timezone.utc).isoformat(timespec="seconds")
def _validate_frontmatter(metadata: Dict[str, Any]) -> Dict[str, Any]:
reserved = {"version"}
for key in metadata.keys():
if key in reserved:
raise ValueError(f"Field '{key}' is reserved and cannot be set in frontmatter")
tags = metadata.get("tags")
if tags is not None:
if not isinstance(tags, list):
raise ValueError("Field 'tags' must be an array")
if not all(isinstance(tag, str) for tag in tags):
raise ValueError("All tags must be strings")
return metadata
def _validate_note_body(body: str) -> None:
    """
    Enforce the size limit on a note body.

    The limit applies to the UTF-8 encoded length, not the character count.

    Raises:
        ValueError: If the encoded body is larger than ``MAX_NOTE_BYTES``.
    """
    encoded_size = len(body.encode("utf-8"))
    if encoded_size <= MAX_NOTE_BYTES:
        return
    raise ValueError("Note exceeds 1 MiB limit")
def _derive_title(note_path: str, metadata: Dict[str, Any], body: str) -> str:
title = metadata.get("title")
if isinstance(title, str) and title.strip():
return title.strip()
match = H1_PATTERN.search(body or "")
if match:
return match.group(1).strip()
stem = Path(note_path).stem
title_from_filename = stem.replace("-", " ").replace("_", " ").strip()
return title_from_filename or stem
class VaultService:
    """
    Manage per-user note vaults stored as Markdown files on disk.

    Each user owns the directory ``<vault_root>/<user_id>``. Notes are read
    and written through the ``frontmatter`` package, which separates a note's
    metadata from its body.
    """

    def __init__(self, config: AppConfig | None = None) -> None:
        """
        Bind the service to a configuration and create the vault root.

        Args:
            config: Application configuration; ``get_config()`` supplies one
                when omitted.
        """
        self.config = config or get_config()
        self.vault_root = self.config.vault_base_path
        self.vault_root.mkdir(parents=True, exist_ok=True)

    def initialize_vault(self, user_id: str) -> Path:
        """Ensure a user's vault directory exists and return its resolved path."""
        path = (self.vault_root / user_id).resolve()
        path.mkdir(parents=True, exist_ok=True)
        return path

    def resolve_note_path(self, user_id: str, note_path: str) -> Path:
        """
        Validate a note path and resolve it inside a user's vault.

        Raises:
            ValueError: If the path fails validation or escapes the vault.
        """
        is_valid, message = validate_note_path(note_path)
        if not is_valid:
            raise ValueError(message)
        return sanitize_path(user_id, self.vault_root, note_path)

    def read_note(self, user_id: str, note_path: str) -> VaultNote:
        """
        Read a Markdown note and return its metadata, body and derived title.

        Raises:
            ValueError: If ``note_path`` is invalid.
            FileNotFoundError: If no regular file exists at ``note_path``.
        """
        started = time.perf_counter()
        # Called for its side effect only: the vault directory is created on
        # first access.
        self.initialize_vault(user_id)
        absolute_path = self.resolve_note_path(user_id, note_path)
        # is_file() rather than exists(): a directory that happens to be named
        # like a note is reported as "not found" instead of failing inside the
        # frontmatter loader.
        if not absolute_path.is_file():
            logger.warning(
                "Note not found",
                extra={"user_id": user_id, "note_path": note_path, "operation": "read"},
            )
            raise FileNotFoundError(f"Note not found: {note_path}")

        post = frontmatter.load(absolute_path)
        metadata = dict(post.metadata or {})
        body = post.content or ""
        payload = self._build_note_payload(note_path, metadata, body, absolute_path)

        duration_ms = (time.perf_counter() - started) * 1000
        logger.info(
            "Note read successfully",
            extra={
                "user_id": user_id,
                "note_path": note_path,
                "operation": "read",
                "duration_ms": f"{duration_ms:.2f}",
                "size_bytes": payload["size_bytes"],
            },
        )
        return payload

    def write_note(
        self,
        user_id: str,
        note_path: str,
        *,
        title: str | None = None,
        metadata: Dict[str, Any] | None = None,
        body: str,
    ) -> VaultNote:
        """
        Create or update a note with validated metadata and content.

        The ``title``, ``created`` and ``updated`` frontmatter fields are
        filled in automatically; parent directories are created as needed.

        Args:
            user_id: Owner of the vault.
            note_path: Vault-relative path of the note.
            title: Explicit title; takes precedence over ``metadata["title"]``.
            metadata: Extra frontmatter fields. The mapping is copied, never
                mutated.
            body: Markdown body of the note.

        Raises:
            ValueError: If the path, body size or metadata is invalid.
        """
        started = time.perf_counter()
        absolute_path = self.resolve_note_path(user_id, note_path)
        body = body or ""
        _validate_note_body(body)
        metadata_dict: Dict[str, Any] = dict(metadata or {})
        _validate_frontmatter(metadata_dict)

        is_new_note = not absolute_path.exists()
        existing_created: str | None = None
        if not is_new_note:
            # Best effort: an unreadable existing note must not block the
            # rewrite, it only loses its original creation timestamp.
            try:
                current_created = frontmatter.load(absolute_path).metadata.get("created")
            except Exception:
                logger.debug(
                    "Could not read existing note metadata",
                    extra={"user_id": user_id, "note_path": note_path},
                    exc_info=True,
                )
            else:
                if isinstance(current_created, str):
                    existing_created = current_created

        effective_title = title or metadata_dict.get("title")
        if not effective_title:
            effective_title = _derive_title(note_path, metadata_dict, body)
        metadata_dict["title"] = effective_title

        now_iso = _utcnow_iso()
        # A caller-supplied "created" wins; otherwise keep the value already
        # on disk, and stamp a new one only when neither exists. An empty or
        # None supplied value counts as "not supplied", so a blank never ends
        # up in the file.
        metadata_dict["created"] = (
            metadata_dict.get("created") or existing_created or now_iso
        )
        metadata_dict["updated"] = now_iso

        absolute_path.parent.mkdir(parents=True, exist_ok=True)
        # Fill the metadata after construction instead of passing **kwargs:
        # keys named like Post's own parameters ("content", "handler") would
        # otherwise collide with them.
        post = frontmatter.Post(body)
        post.metadata.update(metadata_dict)
        serialized = frontmatter.dumps(post)
        absolute_path.write_text(serialized, encoding="utf-8")

        duration_ms = (time.perf_counter() - started) * 1000
        verb = "created" if is_new_note else "updated"
        logger.info(
            "Note %s successfully",
            verb,
            extra={
                "user_id": user_id,
                "note_path": note_path,
                "operation": "create" if is_new_note else "update",
                "duration_ms": f"{duration_ms:.2f}",
                "size_bytes": len(serialized.encode("utf-8")),
            },
        )
        return self._build_note_payload(note_path, metadata_dict, body, absolute_path)

    def delete_note(self, user_id: str, note_path: str) -> None:
        """
        Delete a note from the vault.

        Raises:
            ValueError: If ``note_path`` is invalid.
            FileNotFoundError: If the note does not exist.
        """
        started = time.perf_counter()
        absolute_path = self.resolve_note_path(user_id, note_path)
        try:
            absolute_path.unlink()
        except FileNotFoundError as exc:
            logger.warning(
                "Note not found for deletion",
                extra={"user_id": user_id, "note_path": note_path, "operation": "delete"},
            )
            # Re-raise with the vault-relative path instead of the absolute one.
            raise FileNotFoundError(f"Note not found: {note_path}") from exc

        duration_ms = (time.perf_counter() - started) * 1000
        logger.info(
            "Note deleted successfully",
            extra={
                "user_id": user_id,
                "note_path": note_path,
                "operation": "delete",
                "duration_ms": f"{duration_ms:.2f}",
            },
        )

    def list_notes(self, user_id: str, folder: str | None = None) -> List[Dict[str, Any]]:
        """
        List notes, optionally scoped to a folder, with titles and timestamps.

        Args:
            user_id: Owner of the vault.
            folder: Vault-relative folder (or single ``.md`` file) to list;
                the whole vault is listed when omitted or blank.

        Returns:
            One dict per note with the keys ``path`` (vault-relative, POSIX
            separators), ``title`` and ``last_modified`` (UTC datetime),
            sorted case-insensitively by path. A missing folder yields ``[]``.

        Raises:
            ValueError: If ``folder`` contains a backslash or ``..``, or
                resolves outside the user's vault.
        """
        base = self.initialize_vault(user_id).resolve()
        if folder:
            cleaned = folder.strip().strip("/")
            if "\\" in cleaned or ".." in cleaned:
                raise ValueError("Folder path contains invalid characters")
            folder_path = (base / cleaned).resolve() if cleaned else base
            # Component-wise containment: a string-prefix test would let a
            # symlink into a sibling such as ".../alice2" pass for the vault
            # ".../alice".
            if folder_path != base and base not in folder_path.parents:
                raise ValueError("Folder path escapes vault root")
            if not folder_path.exists():
                return []
            if folder_path.is_file():
                files = [folder_path] if folder_path.suffix == ".md" else []
            else:
                files = list(folder_path.rglob("*.md"))
        else:
            files = list(base.rglob("*.md"))

        results: List[Dict[str, Any]] = []
        for file_path in files:
            # rglob matches on name only, so a directory called "x.md" can
            # show up here.
            if not file_path.is_file():
                continue
            relative_path = file_path.relative_to(base).as_posix()
            try:
                post = frontmatter.load(file_path)
                title = _derive_title(
                    relative_path, dict(post.metadata or {}), post.content or ""
                )
            except Exception:
                # Best effort: one unparsable note must not break the listing.
                logger.debug(
                    "Falling back to file name for note title",
                    extra={"user_id": user_id, "note_path": relative_path},
                    exc_info=True,
                )
                title = Path(relative_path).stem
            stat = file_path.stat()
            results.append(
                {
                    "path": relative_path,
                    "title": title,
                    "last_modified": datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc),
                }
            )
        return sorted(results, key=lambda item: item["path"].lower())

    def _build_note_payload(
        self, note_path: str, metadata: Dict[str, Any], body: str, absolute_path: Path
    ) -> VaultNote:
        """Assemble the note dict returned to callers, including on-disk size and mtime."""
        stat = absolute_path.stat()
        title = _derive_title(note_path, metadata, body)
        return {
            "path": note_path,
            "title": title,
            "metadata": metadata,
            "body": body,
            "size_bytes": stat.st_size,
            "modified": datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc),
            "absolute_path": absolute_path,
        }
# Names exported by a star-import of this module; the underscore-prefixed
# helpers are left out on purpose.
__all__ = ["VaultService", "validate_note_path", "sanitize_path"]
|