SuperPauly commited on
Commit
4d61df2
·
verified ·
1 Parent(s): f68e19f

Delete Modules/Obsidian_Vault.py

Browse files
Files changed (1) hide show
  1. Modules/Obsidian_Vault.py +0 -494
Modules/Obsidian_Vault.py DELETED
@@ -1,494 +0,0 @@
1
- from __future__ import annotations
2
-
3
- import json
4
- import os
5
- import re
6
- import stat
7
- from datetime import datetime
8
- from typing import Annotated, Optional
9
-
10
- import gradio as gr
11
-
12
- from app import _log_call_end, _log_call_start, _truncate_for_log
13
- from ._docstrings import autodoc
14
-
15
-
16
- TOOL_SUMMARY = (
17
- "Browse and search the Obsidian vault in read-only mode. "
18
- "Actions: list, read, info, search, help. "
19
- "All paths resolve within the vault root. Start paths with '/' (e.g., /Notes)."
20
- )
21
-
22
- HELP_TEXT = (
23
- "Obsidian Vault — actions and usage\n\n"
24
- "Root: Tools/Obsidian (override with OBSIDIAN_VAULT_ROOT). "
25
- "Start paths with '/' to reference the vault root (e.g., /Projects/note.md). "
26
- "Absolute paths are disabled unless UNSAFE_ALLOW_ABS_PATHS=1.\n\n"
27
- "Actions and fields:\n"
28
- "- list: path='/' (default), recursive=false, show_hidden=false, max_entries=20\n"
29
- "- read: path (e.g., /Projects/note.md), offset=0, max_chars=4000 (shows next_cursor when truncated)\n"
30
- "- info: path\n"
31
- "- search: path (note or folder), query text in the Search field, recursive=false, show_hidden=false, max_entries=20, case_sensitive=false, offset=0\n"
32
- "- help: show this guide\n\n"
33
- "Errors are returned as JSON with fields: {status:'error', code, message, path?, hint?, data?}.\n\n"
34
- "Examples:\n"
35
- "- list current: action=list, path='/'\n"
36
- "- read note: action=read, path='/Projects/note.md', max_chars=500\n"
37
- "- show metadata: action=info, path='/Inbox'\n"
38
- "- search notes: action=search, path='/Projects', query='deadline', recursive=true, max_entries=100\n"
39
- "- case-sensitive search: action=search, query='TODO', case_sensitive=true\n"
40
- "- page search results: action=search, query='TODO', offset=20\n"
41
- )
42
-
43
-
44
- def _default_root() -> str:
45
- env_root = os.getenv("OBSIDIAN_VAULT_ROOT")
46
- if env_root and env_root.strip():
47
- return os.path.abspath(os.path.expanduser(env_root.strip()))
48
- try:
49
- here = os.path.abspath(__file__)
50
- tools_dir = os.path.dirname(os.path.dirname(here))
51
- return os.path.abspath(os.path.join(tools_dir, "Obsidian"))
52
- except Exception:
53
- return os.path.abspath(os.getcwd())
54
-
55
-
56
- ROOT_DIR = _default_root()
57
- try:
58
- os.makedirs(ROOT_DIR, exist_ok=True)
59
- except Exception:
60
- pass
61
- ALLOW_ABS = bool(int(os.getenv("UNSAFE_ALLOW_ABS_PATHS", "0")))
62
-
63
-
64
- def _safe_err(exc: Exception | str) -> str:
65
- """Return an error string with any absolute root replaced by '/' and slashes normalized."""
66
- s = str(exc)
67
- s_norm = s.replace("\\", "/")
68
- root_fwd = ROOT_DIR.replace("\\", "/")
69
- root_variants = {ROOT_DIR, root_fwd, re.sub(r"/+", "/", root_fwd)}
70
- for variant in root_variants:
71
- if variant:
72
- s_norm = s_norm.replace(variant, "/")
73
- s_norm = re.sub(r"/+", "/", s_norm)
74
- return s_norm
75
-
76
-
77
- def _err(code: str, message: str, *, path: str | None = None, hint: str | None = None, data: dict | None = None) -> str:
78
- payload = {
79
- "status": "error",
80
- "code": code,
81
- "message": message,
82
- "root": "/",
83
- }
84
- if path:
85
- payload["path"] = path
86
- if hint:
87
- payload["hint"] = hint
88
- if data:
89
- payload["data"] = data
90
- return json.dumps(payload, ensure_ascii=False)
91
-
92
-
93
- def _display_path(abs_path: str) -> str:
94
- try:
95
- norm_root = os.path.normpath(ROOT_DIR)
96
- norm_abs = os.path.normpath(abs_path)
97
- common = os.path.commonpath([norm_root, norm_abs])
98
- if os.path.normcase(common) == os.path.normcase(norm_root):
99
- rel = os.path.relpath(norm_abs, norm_root)
100
- if rel == ".":
101
- return "/"
102
- return "/" + rel.replace("\\", "/")
103
- except Exception:
104
- pass
105
- return abs_path.replace("\\", "/")
106
-
107
-
108
- def _resolve_path(path: str) -> tuple[str, str]:
109
- try:
110
- user_input = (path or "/").strip() or "/"
111
- if user_input.startswith("/"):
112
- rel_part = user_input.lstrip("/") or "."
113
- raw = os.path.expanduser(rel_part)
114
- treat_as_relative = True
115
- else:
116
- raw = os.path.expanduser(user_input)
117
- treat_as_relative = False
118
-
119
- if not treat_as_relative and os.path.isabs(raw):
120
- if not ALLOW_ABS:
121
- return "", _err(
122
- "absolute_path_disabled",
123
- "Absolute paths are disabled in safe mode.",
124
- path=raw.replace("\\", "/"),
125
- hint="Use a path relative to / (e.g., /Notes/index.md).",
126
- )
127
- abs_path = os.path.abspath(raw)
128
- else:
129
- abs_path = os.path.abspath(os.path.join(ROOT_DIR, raw))
130
- if not ALLOW_ABS:
131
- try:
132
- common = os.path.commonpath([os.path.normpath(ROOT_DIR), os.path.normpath(abs_path)])
133
- except Exception:
134
- root_cmp = os.path.normcase(os.path.normpath(ROOT_DIR))
135
- abs_cmp = os.path.normcase(os.path.normpath(abs_path))
136
- if not abs_cmp.startswith(root_cmp):
137
- return "", _err(
138
- "path_outside_root",
139
- "Path not allowed outside root.",
140
- path=user_input.replace("\\", "/"),
141
- hint="Use a path under / (the vault root).",
142
- )
143
- else:
144
- root_cmp = os.path.normcase(os.path.normpath(ROOT_DIR))
145
- common_cmp = os.path.normcase(os.path.normpath(common))
146
- if common_cmp != root_cmp:
147
- return "", _err(
148
- "path_outside_root",
149
- "Path not allowed outside root.",
150
- path=user_input.replace("\\", "/"),
151
- hint="Use a path under / (the vault root).",
152
- )
153
- return abs_path, ""
154
- except Exception as exc:
155
- return "", _err(
156
- "resolve_path_failed",
157
- "Failed to resolve path.",
158
- path=(path or ""),
159
- data={"error": _safe_err(exc)},
160
- )
161
-
162
-
163
- def _fmt_size(num_bytes: int) -> str:
164
- units = ["B", "KB", "MB", "GB", "TB"]
165
- size = float(num_bytes)
166
- for unit in units:
167
- if size < 1024.0:
168
- return f"{size:.1f} {unit}"
169
- size /= 1024.0
170
- return f"{size:.1f} PB"
171
-
172
-
173
- def _list_dir(abs_path: str, *, show_hidden: bool, recursive: bool, max_entries: int) -> str:
174
- lines: list[str] = []
175
- total = 0
176
- listing_display = _display_path(abs_path)
177
- for root, dirs, files in os.walk(abs_path):
178
- if not show_hidden:
179
- dirs[:] = [d for d in dirs if not d.startswith('.')]
180
- files = [f for f in files if not f.startswith('.')]
181
- try:
182
- rel_root = os.path.relpath(root, ROOT_DIR)
183
- except Exception:
184
- rel_root = root
185
- rel_root_disp = "/" if rel_root == "." else "/" + rel_root.replace("\\", "/")
186
- lines.append(f"\n📂 {rel_root_disp}")
187
- dirs.sort()
188
- files.sort()
189
- for d in dirs:
190
- p = os.path.join(root, d)
191
- try:
192
- mtime = datetime.fromtimestamp(os.path.getmtime(p)).isoformat(sep=' ', timespec='seconds')
193
- except Exception:
194
- mtime = "?"
195
- lines.append(f" • [DIR] {d} (modified {mtime})")
196
- total += 1
197
- if total >= max_entries:
198
- lines.append(f"\n… Truncated at {max_entries} entries.")
199
- return "\n".join(lines).strip()
200
- for f in files:
201
- p = os.path.join(root, f)
202
- try:
203
- size = _fmt_size(os.path.getsize(p))
204
- mtime = datetime.fromtimestamp(os.path.getmtime(p)).isoformat(sep=' ', timespec='seconds')
205
- except Exception:
206
- size, mtime = "?", "?"
207
- lines.append(f" • {f} ({size}, modified {mtime})")
208
- total += 1
209
- if total >= max_entries:
210
- lines.append(f"\n… Truncated at {max_entries} entries.")
211
- return "\n".join(lines).strip()
212
- if not recursive:
213
- break
214
- header = f"Listing of {listing_display}\nRoot: /\nEntries: {total}"
215
- return (header + "\n" + "\n".join(lines)).strip()
216
-
217
-
218
- def _search_text(
219
- abs_path: str,
220
- query: str,
221
- *,
222
- recursive: bool,
223
- show_hidden: bool,
224
- max_results: int,
225
- case_sensitive: bool,
226
- start_index: int,
227
- ) -> str:
228
- if not os.path.exists(abs_path):
229
- return _err("path_not_found", f"Path not found: {_display_path(abs_path)}", path=_display_path(abs_path))
230
-
231
- query = query or ""
232
- normalized_query = query if case_sensitive else query.lower()
233
- if normalized_query == "":
234
- return _err(
235
- "missing_search_query",
236
- "Search query is required for the search action.",
237
- hint="Provide text in the Search field to look for.",
238
- )
239
-
240
- max_results = max(1, int(max_results) if max_results is not None else 20)
241
- start_index = max(0, int(start_index) if start_index is not None else 0)
242
- matches: list[tuple[str, int, str]] = []
243
- errors: list[str] = []
244
- files_scanned = 0
245
- truncated = False
246
- total_matches = 0
247
-
248
- def _should_skip(name: str) -> bool:
249
- return not show_hidden and name.startswith('.')
250
-
251
- def _handle_match(file_path: str, line_no: int, line_text: str) -> bool:
252
- nonlocal truncated, total_matches
253
- total_matches += 1
254
- if total_matches <= start_index:
255
- return False
256
- if len(matches) < max_results:
257
- snippet = line_text.strip()
258
- if len(snippet) > 200:
259
- snippet = snippet[:197] + "…"
260
- matches.append((_display_path(file_path), line_no, snippet))
261
- return False
262
- truncated = True
263
- return True
264
-
265
- def _search_file(file_path: str) -> bool:
266
- nonlocal files_scanned
267
- files_scanned += 1
268
- try:
269
- with open(file_path, 'r', encoding='utf-8', errors='replace') as handle:
270
- for line_no, line in enumerate(handle, start=1):
271
- haystack = line if case_sensitive else line.lower()
272
- if normalized_query in haystack:
273
- if _handle_match(file_path, line_no, line):
274
- return True
275
- except Exception as exc:
276
- errors.append(f"{_display_path(file_path)} ({_safe_err(exc)})")
277
- return truncated
278
-
279
- if os.path.isfile(abs_path):
280
- _search_file(abs_path)
281
- else:
282
- for root, dirs, files in os.walk(abs_path):
283
- dirs[:] = [d for d in dirs if not _should_skip(d)]
284
- visible_files = [f for f in files if show_hidden or not f.startswith('.')]
285
- for name in visible_files:
286
- file_path = os.path.join(root, name)
287
- if _search_file(file_path):
288
- break
289
- if truncated:
290
- break
291
- if not recursive:
292
- break
293
-
294
- header_lines = [
295
- f"Search results for {query!r}",
296
- f"Scope: {_display_path(abs_path)}",
297
- f"Recursive: {'yes' if recursive else 'no'}, Hidden: {'yes' if show_hidden else 'no'}, Case-sensitive: {'yes' if case_sensitive else 'no'}",
298
- f"Start offset: {start_index}",
299
- f"Matches returned: {len(matches)}" + (" (truncated)" if truncated else ""),
300
- f"Files scanned: {files_scanned}",
301
- ]
302
-
303
- next_cursor = start_index + len(matches) if truncated else None
304
-
305
- if truncated:
306
- header_lines.append(f"Matches encountered before truncation: {total_matches}")
307
- header_lines.append(f"Truncated: yes — re-run with offset={next_cursor} to continue.")
308
- header_lines.append(f"Next cursor: {next_cursor}")
309
- else:
310
- header_lines.append(f"Total matches found: {total_matches}")
311
- header_lines.append("Truncated: no — end of results.")
312
- header_lines.append("Next cursor: None")
313
-
314
- if not matches:
315
- if total_matches > 0 and start_index >= total_matches:
316
- hint_limit = max(total_matches - 1, 0)
317
- body_lines = [
318
- f"No matches found at or after offset {start_index}. Total matches available: {total_matches}.",
319
- (f"Try a smaller offset (≤ {hint_limit})." if hint_limit >= 0 else ""),
320
- ]
321
- body_lines = [line for line in body_lines if line]
322
- else:
323
- body_lines = [
324
- "No matches found.",
325
- (f"Total matches encountered: {total_matches}." if total_matches else ""),
326
- ]
327
- body_lines = [line for line in body_lines if line]
328
- else:
329
- body_lines = [f"{idx}. {path}:{line_no}: {text}" for idx, (path, line_no, text) in enumerate(matches, start=1)]
330
-
331
- if errors:
332
- shown = errors[:5]
333
- body_lines.extend(["", "Warnings:"])
334
- body_lines.extend(shown)
335
- if len(errors) > len(shown):
336
- body_lines.append(f"… {len(errors) - len(shown)} additional files could not be read.")
337
-
338
- return "\n".join(header_lines) + "\n\n" + "\n".join(body_lines)
339
-
340
-
341
- def _read_file(abs_path: str, *, offset: int, max_chars: int) -> str:
342
- if not os.path.exists(abs_path):
343
- return _err("file_not_found", f"File not found: {_display_path(abs_path)}", path=_display_path(abs_path))
344
- if os.path.isdir(abs_path):
345
- return _err(
346
- "is_directory",
347
- f"Path is a directory, not a file: {_display_path(abs_path)}",
348
- path=_display_path(abs_path),
349
- hint="Provide a file path.",
350
- )
351
- try:
352
- with open(abs_path, 'r', encoding='utf-8', errors='replace') as f:
353
- data = f.read()
354
- except Exception as exc:
355
- return _err("read_failed", "Failed to read file.", path=_display_path(abs_path), data={"error": _safe_err(exc)})
356
- total = len(data)
357
- start = max(0, min(offset, total))
358
- end = total if max_chars <= 0 else min(total, start + max_chars)
359
- chunk = data[start:end]
360
- next_cursor = end if end < total else None
361
- header = (
362
- f"Reading {_display_path(abs_path)}\n"
363
- f"Offset {start}, returned {len(chunk)} of {total}."
364
- + (f"\nNext cursor: {next_cursor}" if next_cursor is not None else "")
365
- )
366
- return header + "\n\n---\n\n" + chunk
367
-
368
-
369
- def _info(abs_path: str) -> str:
370
- try:
371
- st = os.stat(abs_path)
372
- except Exception as exc:
373
- return _err("stat_failed", "Failed to stat path.", path=_display_path(abs_path), data={"error": _safe_err(exc)})
374
- info = {
375
- "path": _display_path(abs_path),
376
- "type": "directory" if stat.S_ISDIR(st.st_mode) else "file",
377
- "size": st.st_size,
378
- "modified": datetime.fromtimestamp(st.st_mtime).isoformat(sep=' ', timespec='seconds'),
379
- "created": datetime.fromtimestamp(st.st_ctime).isoformat(sep=' ', timespec='seconds'),
380
- "mode": oct(st.st_mode),
381
- "root": "/",
382
- }
383
- return json.dumps(info, indent=2)
384
-
385
-
386
- @autodoc(summary=TOOL_SUMMARY)
387
- def Obsidian_Vault(
388
- action: Annotated[str, "Operation to perform: 'list', 'read', 'info', 'search', 'help'."],
389
- path: Annotated[str, "Target path, relative to the vault root." ] = "/",
390
- query: Annotated[Optional[str], "Text to search for when action=search."] = None,
391
- recursive: Annotated[bool, "Recurse into subfolders when listing/searching."] = False,
392
- show_hidden: Annotated[bool, "Include hidden files when listing/searching."] = False,
393
- max_entries: Annotated[int, "Max entries to list or matches to return (for list/search)."] = 20,
394
- offset: Annotated[int, "Start offset when reading files."] = 0,
395
- max_chars: Annotated[int, "Max characters to return when reading (0 = full file)."] = 4000,
396
- case_sensitive: Annotated[bool, "Match case when searching text."] = False,
397
- ) -> str:
398
- _log_call_start(
399
- "Obsidian_Vault",
400
- action=action,
401
- path=path,
402
- query=query,
403
- recursive=recursive,
404
- show_hidden=show_hidden,
405
- max_entries=max_entries,
406
- offset=offset,
407
- max_chars=max_chars,
408
- case_sensitive=case_sensitive,
409
- )
410
- action = (action or "").strip().lower()
411
- if action not in {"list", "read", "info", "search", "help"}:
412
- result = _err(
413
- "invalid_action",
414
- "Invalid action.",
415
- hint="Choose from: list, read, info, search, help.",
416
- )
417
- _log_call_end("Obsidian_Vault", _truncate_for_log(result))
418
- return result
419
-
420
- if action == "help":
421
- result = HELP_TEXT
422
- _log_call_end("Obsidian_Vault", _truncate_for_log(result))
423
- return result
424
-
425
- abs_path, err = _resolve_path(path)
426
- if err:
427
- _log_call_end("Obsidian_Vault", _truncate_for_log(err))
428
- return err
429
-
430
- try:
431
- if action == "list":
432
- if not os.path.exists(abs_path):
433
- result = _err("path_not_found", f"Path not found: {_display_path(abs_path)}", path=_display_path(abs_path))
434
- else:
435
- result = _list_dir(abs_path, show_hidden=show_hidden, recursive=recursive, max_entries=max_entries)
436
- elif action == "read":
437
- result = _read_file(abs_path, offset=offset, max_chars=max_chars)
438
- elif action == "search":
439
- query_text = query or ""
440
- if query_text.strip() == "":
441
- result = _err(
442
- "missing_search_query",
443
- "Search query is required for the search action.",
444
- hint="Provide text in the Search field to look for.",
445
- )
446
- else:
447
- result = _search_text(
448
- abs_path,
449
- query_text,
450
- recursive=recursive,
451
- show_hidden=show_hidden,
452
- max_results=max_entries,
453
- case_sensitive=case_sensitive,
454
- start_index=offset,
455
- )
456
- else: # info
457
- result = _info(abs_path)
458
- except Exception as exc:
459
- result = _err("exception", "Unhandled error during operation.", data={"error": _safe_err(exc)})
460
-
461
- _log_call_end("Obsidian_Vault", _truncate_for_log(result))
462
- return result
463
-
464
-
465
- def build_interface() -> gr.Interface:
466
- return gr.Interface(
467
- fn=Obsidian_Vault,
468
- inputs=[
469
- gr.Radio(
470
- label="Action",
471
- choices=["list", "read", "info", "search", "help"],
472
- value="help",
473
- ),
474
- gr.Textbox(label="Path", placeholder="/ or /Notes/todo.md", max_lines=1, value="/"),
475
- gr.Textbox(label="Search text (search)", lines=3, placeholder="Text to search for..."),
476
- gr.Checkbox(label="Recursive (list/search)", value=False),
477
- gr.Checkbox(label="Show hidden (list/search)", value=False),
478
- gr.Slider(minimum=10, maximum=5000, step=10, value=20, label="Max entries / matches"),
479
- gr.Slider(minimum=0, maximum=1_000_000, step=100, value=0, label="Offset (read/search start)"),
480
- gr.Slider(minimum=0, maximum=100_000, step=500, value=4000, label="Max chars (read, 0=all)"),
481
- gr.Checkbox(label="Case sensitive search", value=False),
482
- ],
483
- outputs=gr.Textbox(label="Result", lines=20),
484
- title="Obsidian Vault",
485
- description=(
486
- "<div style=\"text-align:center; overflow:hidden;\">Explore and search notes in the vault without modifying them." "</div>"
487
- ),
488
- api_description=TOOL_SUMMARY,
489
- flagging_mode="never",
490
- submit_btn="Run",
491
- )
492
-
493
-
494
- __all__ = ["Obsidian_Vault", "build_interface"]