Spaces:
Running
Running
| from __future__ import annotations | |
| import json | |
| import os | |
| import re | |
| import stat | |
| from datetime import datetime | |
| from typing import Annotated, Optional | |
| import gradio as gr | |
| from app import _log_call_end, _log_call_start, _truncate_for_log | |
| from ._docstrings import autodoc | |
| TOOL_SUMMARY = ( | |
| "Browse and search the Obsidian vault in read-only mode. " | |
| "Actions: list, read, info, search, help. " | |
| "All paths resolve within the vault root. Start paths with '/' (e.g., /Notes)." | |
| ) | |
| HELP_TEXT = ( | |
| "Obsidian Vault — actions and usage\n\n" | |
| "Root: Tools/Obsidian (override with OBSIDIAN_VAULT_ROOT). " | |
| "Start paths with '/' to reference the vault root (e.g., /Projects/note.md). " | |
| "Absolute paths are disabled unless UNSAFE_ALLOW_ABS_PATHS=1.\n\n" | |
| "Actions and fields:\n" | |
| "- list: path='/' (default), recursive=false, show_hidden=false, max_entries=20\n" | |
| "- read: path (e.g., /Projects/note.md), offset=0, max_chars=4000 (shows next_cursor when truncated)\n" | |
| "- info: path\n" | |
| "- search: path (note or folder), query text in the Search field, recursive=false, show_hidden=false, max_entries=20, case_sensitive=false, offset=0\n" | |
| "- help: show this guide\n\n" | |
| "Errors are returned as JSON with fields: {status:'error', code, message, path?, hint?, data?}.\n\n" | |
| "Examples:\n" | |
| "- list current: action=list, path='/'\n" | |
| "- read note: action=read, path='/Projects/note.md', max_chars=500\n" | |
| "- show metadata: action=info, path='/Inbox'\n" | |
| "- search notes: action=search, path='/Projects', query='deadline', recursive=true, max_entries=100\n" | |
| "- case-sensitive search: action=search, query='TODO', case_sensitive=true\n" | |
| "- page search results: action=search, query='TODO', offset=20\n" | |
| ) | |
| def _default_root() -> str: | |
| env_root = os.getenv("OBSIDIAN_VAULT_ROOT") | |
| if env_root and env_root.strip(): | |
| return os.path.abspath(os.path.expanduser(env_root.strip())) | |
| try: | |
| here = os.path.abspath(__file__) | |
| tools_dir = os.path.dirname(os.path.dirname(here)) | |
| return os.path.abspath(os.path.join(tools_dir, "Obsidian")) | |
| except Exception: | |
| return os.path.abspath(os.getcwd()) | |
| ROOT_DIR = _default_root() | |
| try: | |
| os.makedirs(ROOT_DIR, exist_ok=True) | |
| except Exception: | |
| pass | |
| ALLOW_ABS = bool(int(os.getenv("UNSAFE_ALLOW_ABS_PATHS", "0"))) | |
| def _safe_err(exc: Exception | str) -> str: | |
| """Return an error string with any absolute root replaced by '/' and slashes normalized.""" | |
| s = str(exc) | |
| s_norm = s.replace("\\", "/") | |
| root_fwd = ROOT_DIR.replace("\\", "/") | |
| root_variants = {ROOT_DIR, root_fwd, re.sub(r"/+", "/", root_fwd)} | |
| for variant in root_variants: | |
| if variant: | |
| s_norm = s_norm.replace(variant, "/") | |
| s_norm = re.sub(r"/+", "/", s_norm) | |
| return s_norm | |
| def _err(code: str, message: str, *, path: str | None = None, hint: str | None = None, data: dict | None = None) -> str: | |
| payload = { | |
| "status": "error", | |
| "code": code, | |
| "message": message, | |
| "root": "/", | |
| } | |
| if path: | |
| payload["path"] = path | |
| if hint: | |
| payload["hint"] = hint | |
| if data: | |
| payload["data"] = data | |
| return json.dumps(payload, ensure_ascii=False) | |
| def _display_path(abs_path: str) -> str: | |
| try: | |
| norm_root = os.path.normpath(ROOT_DIR) | |
| norm_abs = os.path.normpath(abs_path) | |
| common = os.path.commonpath([norm_root, norm_abs]) | |
| if os.path.normcase(common) == os.path.normcase(norm_root): | |
| rel = os.path.relpath(norm_abs, norm_root) | |
| if rel == ".": | |
| return "/" | |
| return "/" + rel.replace("\\", "/") | |
| except Exception: | |
| pass | |
| return abs_path.replace("\\", "/") | |
| def _resolve_path(path: str) -> tuple[str, str]: | |
| try: | |
| user_input = (path or "/").strip() or "/" | |
| if user_input.startswith("/"): | |
| rel_part = user_input.lstrip("/") or "." | |
| raw = os.path.expanduser(rel_part) | |
| treat_as_relative = True | |
| else: | |
| raw = os.path.expanduser(user_input) | |
| treat_as_relative = False | |
| if not treat_as_relative and os.path.isabs(raw): | |
| if not ALLOW_ABS: | |
| return "", _err( | |
| "absolute_path_disabled", | |
| "Absolute paths are disabled in safe mode.", | |
| path=raw.replace("\\", "/"), | |
| hint="Use a path relative to / (e.g., /Notes/index.md).", | |
| ) | |
| abs_path = os.path.abspath(raw) | |
| else: | |
| abs_path = os.path.abspath(os.path.join(ROOT_DIR, raw)) | |
| if not ALLOW_ABS: | |
| try: | |
| common = os.path.commonpath([os.path.normpath(ROOT_DIR), os.path.normpath(abs_path)]) | |
| except Exception: | |
| root_cmp = os.path.normcase(os.path.normpath(ROOT_DIR)) | |
| abs_cmp = os.path.normcase(os.path.normpath(abs_path)) | |
| if not abs_cmp.startswith(root_cmp): | |
| return "", _err( | |
| "path_outside_root", | |
| "Path not allowed outside root.", | |
| path=user_input.replace("\\", "/"), | |
| hint="Use a path under / (the vault root).", | |
| ) | |
| else: | |
| root_cmp = os.path.normcase(os.path.normpath(ROOT_DIR)) | |
| common_cmp = os.path.normcase(os.path.normpath(common)) | |
| if common_cmp != root_cmp: | |
| return "", _err( | |
| "path_outside_root", | |
| "Path not allowed outside root.", | |
| path=user_input.replace("\\", "/"), | |
| hint="Use a path under / (the vault root).", | |
| ) | |
| return abs_path, "" | |
| except Exception as exc: | |
| return "", _err( | |
| "resolve_path_failed", | |
| "Failed to resolve path.", | |
| path=(path or ""), | |
| data={"error": _safe_err(exc)}, | |
| ) | |
| def _fmt_size(num_bytes: int) -> str: | |
| units = ["B", "KB", "MB", "GB", "TB"] | |
| size = float(num_bytes) | |
| for unit in units: | |
| if size < 1024.0: | |
| return f"{size:.1f} {unit}" | |
| size /= 1024.0 | |
| return f"{size:.1f} PB" | |
| def _list_dir(abs_path: str, *, show_hidden: bool, recursive: bool, max_entries: int) -> str: | |
| lines: list[str] = [] | |
| total = 0 | |
| listing_display = _display_path(abs_path) | |
| for root, dirs, files in os.walk(abs_path): | |
| if not show_hidden: | |
| dirs[:] = [d for d in dirs if not d.startswith('.')] | |
| files = [f for f in files if not f.startswith('.')] | |
| try: | |
| rel_root = os.path.relpath(root, ROOT_DIR) | |
| except Exception: | |
| rel_root = root | |
| rel_root_disp = "/" if rel_root == "." else "/" + rel_root.replace("\\", "/") | |
| lines.append(f"\n📂 {rel_root_disp}") | |
| dirs.sort() | |
| files.sort() | |
| for d in dirs: | |
| p = os.path.join(root, d) | |
| try: | |
| mtime = datetime.fromtimestamp(os.path.getmtime(p)).isoformat(sep=' ', timespec='seconds') | |
| except Exception: | |
| mtime = "?" | |
| lines.append(f" • [DIR] {d} (modified {mtime})") | |
| total += 1 | |
| if total >= max_entries: | |
| lines.append(f"\n… Truncated at {max_entries} entries.") | |
| return "\n".join(lines).strip() | |
| for f in files: | |
| p = os.path.join(root, f) | |
| try: | |
| size = _fmt_size(os.path.getsize(p)) | |
| mtime = datetime.fromtimestamp(os.path.getmtime(p)).isoformat(sep=' ', timespec='seconds') | |
| except Exception: | |
| size, mtime = "?", "?" | |
| lines.append(f" • {f} ({size}, modified {mtime})") | |
| total += 1 | |
| if total >= max_entries: | |
| lines.append(f"\n… Truncated at {max_entries} entries.") | |
| return "\n".join(lines).strip() | |
| if not recursive: | |
| break | |
| header = f"Listing of {listing_display}\nRoot: /\nEntries: {total}" | |
| return (header + "\n" + "\n".join(lines)).strip() | |
| def _search_text( | |
| abs_path: str, | |
| query: str, | |
| *, | |
| recursive: bool, | |
| show_hidden: bool, | |
| max_results: int, | |
| case_sensitive: bool, | |
| start_index: int, | |
| ) -> str: | |
| if not os.path.exists(abs_path): | |
| return _err("path_not_found", f"Path not found: {_display_path(abs_path)}", path=_display_path(abs_path)) | |
| query = query or "" | |
| normalized_query = query if case_sensitive else query.lower() | |
| if normalized_query == "": | |
| return _err( | |
| "missing_search_query", | |
| "Search query is required for the search action.", | |
| hint="Provide text in the Search field to look for.", | |
| ) | |
| max_results = max(1, int(max_results) if max_results is not None else 20) | |
| start_index = max(0, int(start_index) if start_index is not None else 0) | |
| matches: list[tuple[str, int, str]] = [] | |
| errors: list[str] = [] | |
| files_scanned = 0 | |
| truncated = False | |
| total_matches = 0 | |
| def _should_skip(name: str) -> bool: | |
| return not show_hidden and name.startswith('.') | |
| def _handle_match(file_path: str, line_no: int, line_text: str) -> bool: | |
| nonlocal truncated, total_matches | |
| total_matches += 1 | |
| if total_matches <= start_index: | |
| return False | |
| if len(matches) < max_results: | |
| snippet = line_text.strip() | |
| if len(snippet) > 200: | |
| snippet = snippet[:197] + "…" | |
| matches.append((_display_path(file_path), line_no, snippet)) | |
| return False | |
| truncated = True | |
| return True | |
| def _search_file(file_path: str) -> bool: | |
| nonlocal files_scanned | |
| files_scanned += 1 | |
| try: | |
| with open(file_path, 'r', encoding='utf-8', errors='replace') as handle: | |
| for line_no, line in enumerate(handle, start=1): | |
| haystack = line if case_sensitive else line.lower() | |
| if normalized_query in haystack: | |
| if _handle_match(file_path, line_no, line): | |
| return True | |
| except Exception as exc: | |
| errors.append(f"{_display_path(file_path)} ({_safe_err(exc)})") | |
| return truncated | |
| if os.path.isfile(abs_path): | |
| _search_file(abs_path) | |
| else: | |
| for root, dirs, files in os.walk(abs_path): | |
| dirs[:] = [d for d in dirs if not _should_skip(d)] | |
| visible_files = [f for f in files if show_hidden or not f.startswith('.')] | |
| for name in visible_files: | |
| file_path = os.path.join(root, name) | |
| if _search_file(file_path): | |
| break | |
| if truncated: | |
| break | |
| if not recursive: | |
| break | |
| header_lines = [ | |
| f"Search results for {query!r}", | |
| f"Scope: {_display_path(abs_path)}", | |
| f"Recursive: {'yes' if recursive else 'no'}, Hidden: {'yes' if show_hidden else 'no'}, Case-sensitive: {'yes' if case_sensitive else 'no'}", | |
| f"Start offset: {start_index}", | |
| f"Matches returned: {len(matches)}" + (" (truncated)" if truncated else ""), | |
| f"Files scanned: {files_scanned}", | |
| ] | |
| next_cursor = start_index + len(matches) if truncated else None | |
| if truncated: | |
| header_lines.append(f"Matches encountered before truncation: {total_matches}") | |
| header_lines.append(f"Truncated: yes — re-run with offset={next_cursor} to continue.") | |
| header_lines.append(f"Next cursor: {next_cursor}") | |
| else: | |
| header_lines.append(f"Total matches found: {total_matches}") | |
| header_lines.append("Truncated: no — end of results.") | |
| header_lines.append("Next cursor: None") | |
| if not matches: | |
| if total_matches > 0 and start_index >= total_matches: | |
| hint_limit = max(total_matches - 1, 0) | |
| body_lines = [ | |
| f"No matches found at or after offset {start_index}. Total matches available: {total_matches}.", | |
| (f"Try a smaller offset (≤ {hint_limit})." if hint_limit >= 0 else ""), | |
| ] | |
| body_lines = [line for line in body_lines if line] | |
| else: | |
| body_lines = [ | |
| "No matches found.", | |
| (f"Total matches encountered: {total_matches}." if total_matches else ""), | |
| ] | |
| body_lines = [line for line in body_lines if line] | |
| else: | |
| body_lines = [f"{idx}. {path}:{line_no}: {text}" for idx, (path, line_no, text) in enumerate(matches, start=1)] | |
| if errors: | |
| shown = errors[:5] | |
| body_lines.extend(["", "Warnings:"]) | |
| body_lines.extend(shown) | |
| if len(errors) > len(shown): | |
| body_lines.append(f"… {len(errors) - len(shown)} additional files could not be read.") | |
| return "\n".join(header_lines) + "\n\n" + "\n".join(body_lines) | |
| def _read_file(abs_path: str, *, offset: int, max_chars: int) -> str: | |
| if not os.path.exists(abs_path): | |
| return _err("file_not_found", f"File not found: {_display_path(abs_path)}", path=_display_path(abs_path)) | |
| if os.path.isdir(abs_path): | |
| return _err( | |
| "is_directory", | |
| f"Path is a directory, not a file: {_display_path(abs_path)}", | |
| path=_display_path(abs_path), | |
| hint="Provide a file path.", | |
| ) | |
| try: | |
| with open(abs_path, 'r', encoding='utf-8', errors='replace') as f: | |
| data = f.read() | |
| except Exception as exc: | |
| return _err("read_failed", "Failed to read file.", path=_display_path(abs_path), data={"error": _safe_err(exc)}) | |
| total = len(data) | |
| start = max(0, min(offset, total)) | |
| end = total if max_chars <= 0 else min(total, start + max_chars) | |
| chunk = data[start:end] | |
| next_cursor = end if end < total else None | |
| header = ( | |
| f"Reading {_display_path(abs_path)}\n" | |
| f"Offset {start}, returned {len(chunk)} of {total}." | |
| + (f"\nNext cursor: {next_cursor}" if next_cursor is not None else "") | |
| ) | |
| return header + "\n\n---\n\n" + chunk | |
| def _info(abs_path: str) -> str: | |
| try: | |
| st = os.stat(abs_path) | |
| except Exception as exc: | |
| return _err("stat_failed", "Failed to stat path.", path=_display_path(abs_path), data={"error": _safe_err(exc)}) | |
| info = { | |
| "path": _display_path(abs_path), | |
| "type": "directory" if stat.S_ISDIR(st.st_mode) else "file", | |
| "size": st.st_size, | |
| "modified": datetime.fromtimestamp(st.st_mtime).isoformat(sep=' ', timespec='seconds'), | |
| "created": datetime.fromtimestamp(st.st_ctime).isoformat(sep=' ', timespec='seconds'), | |
| "mode": oct(st.st_mode), | |
| "root": "/", | |
| } | |
| return json.dumps(info, indent=2) | |
| def Obsidian_Vault( | |
| action: Annotated[str, "Operation to perform: 'list', 'read', 'info', 'search', 'help'."], | |
| path: Annotated[str, "Target path, relative to the vault root." ] = "/", | |
| query: Annotated[Optional[str], "Text to search for when action=search."] = None, | |
| recursive: Annotated[bool, "Recurse into subfolders when listing/searching."] = False, | |
| show_hidden: Annotated[bool, "Include hidden files when listing/searching."] = False, | |
| max_entries: Annotated[int, "Max entries to list or matches to return (for list/search)."] = 20, | |
| offset: Annotated[int, "Start offset when reading files."] = 0, | |
| max_chars: Annotated[int, "Max characters to return when reading (0 = full file)."] = 4000, | |
| case_sensitive: Annotated[bool, "Match case when searching text."] = False, | |
| ) -> str: | |
| _log_call_start( | |
| "Obsidian_Vault", | |
| action=action, | |
| path=path, | |
| query=query, | |
| recursive=recursive, | |
| show_hidden=show_hidden, | |
| max_entries=max_entries, | |
| offset=offset, | |
| max_chars=max_chars, | |
| case_sensitive=case_sensitive, | |
| ) | |
| action = (action or "").strip().lower() | |
| if action not in {"list", "read", "info", "search", "help"}: | |
| result = _err( | |
| "invalid_action", | |
| "Invalid action.", | |
| hint="Choose from: list, read, info, search, help.", | |
| ) | |
| _log_call_end("Obsidian_Vault", _truncate_for_log(result)) | |
| return result | |
| if action == "help": | |
| result = HELP_TEXT | |
| _log_call_end("Obsidian_Vault", _truncate_for_log(result)) | |
| return result | |
| abs_path, err = _resolve_path(path) | |
| if err: | |
| _log_call_end("Obsidian_Vault", _truncate_for_log(err)) | |
| return err | |
| try: | |
| if action == "list": | |
| if not os.path.exists(abs_path): | |
| result = _err("path_not_found", f"Path not found: {_display_path(abs_path)}", path=_display_path(abs_path)) | |
| else: | |
| result = _list_dir(abs_path, show_hidden=show_hidden, recursive=recursive, max_entries=max_entries) | |
| elif action == "read": | |
| result = _read_file(abs_path, offset=offset, max_chars=max_chars) | |
| elif action == "search": | |
| query_text = query or "" | |
| if query_text.strip() == "": | |
| result = _err( | |
| "missing_search_query", | |
| "Search query is required for the search action.", | |
| hint="Provide text in the Search field to look for.", | |
| ) | |
| else: | |
| result = _search_text( | |
| abs_path, | |
| query_text, | |
| recursive=recursive, | |
| show_hidden=show_hidden, | |
| max_results=max_entries, | |
| case_sensitive=case_sensitive, | |
| start_index=offset, | |
| ) | |
| else: # info | |
| result = _info(abs_path) | |
| except Exception as exc: | |
| result = _err("exception", "Unhandled error during operation.", data={"error": _safe_err(exc)}) | |
| _log_call_end("Obsidian_Vault", _truncate_for_log(result)) | |
| return result | |
| def build_interface() -> gr.Interface: | |
| return gr.Interface( | |
| fn=Obsidian_Vault, | |
| inputs=[ | |
| gr.Radio( | |
| label="Action", | |
| choices=["list", "read", "info", "search", "help"], | |
| value="help", | |
| info="Operation to perform", | |
| ), | |
| gr.Textbox(label="Path", placeholder="/ or /Notes/todo.md", max_lines=1, value="/", info="Target path (relative to vault root)"), | |
| gr.Textbox(label="Search text", lines=3, placeholder="Text to search for...", info="Text to search for (Search only)"), | |
| gr.Checkbox(label="Recursive", value=False, info="Recurse into subfolders (List/Search)"), | |
| gr.Checkbox(label="Show hidden", value=False, info="Include hidden files (List/Search)"), | |
| gr.Slider(minimum=10, maximum=5000, step=10, value=20, label="Max entries / matches", info="Max entries to list or matches to return (List/Search)"), | |
| gr.Slider(minimum=0, maximum=1_000_000, step=100, value=0, label="Offset", info="Start offset (Read/Search)"), | |
| gr.Slider(minimum=0, maximum=100_000, step=500, value=4000, label="Max chars", info="Max characters to return (Read, 0=all)"), | |
| gr.Checkbox(label="Case sensitive search", value=False, info="Match case (Search)"), | |
| ], | |
| outputs=gr.Textbox(label="Result", lines=20), | |
| title="Obsidian Vault", | |
| description=( | |
| "<div style=\"text-align:center; overflow:hidden;\">Explore and search notes in the vault without modifying them." "</div>" | |
| ), | |
| api_description=TOOL_SUMMARY, | |
| flagging_mode="never", | |
| submit_btn="Run", | |
| ) | |
| __all__ = ["Obsidian_Vault", "build_interface"] | |