| | from __future__ import annotations |
| |
|
| | import json |
| | import os |
| | import re |
| | import shutil |
| | import stat |
| | from datetime import datetime |
| | from typing import Annotated, Optional |
| |
|
| | import gradio as gr |
| |
|
| | from app import _log_call_end, _log_call_start, _truncate_for_log |
| | from ._docstrings import autodoc |
| |
|
| |
|
| | TOOL_SUMMARY = ( |
| | "Browse, search, and manage files within a safe root. " |
| | "Actions: list, read, write, append, mkdir, move, copy, delete, info, search, help. " |
| | "Fill other fields as needed. " |
| | "Use paths like `/` or `/notes/todo.txt` because all paths are relative to the root (`/`). " |
| | "Use 'help' to see action-specific required fields and examples." |
| | ) |
| |
|
| | HELP_TEXT = ( |
| | "File System — actions and usage\n\n" |
| | "Root: paths resolve under Nymbo-Tools/Filesystem by default (or NYMBO_TOOLS_ROOT if set). " |
| | "Start paths with '/' to refer to the tool root (e.g., /notes). " |
| | "Absolute paths are disabled unless UNSAFE_ALLOW_ABS_PATHS=1.\n\n" |
| | "Actions and fields:\n" |
| | "- list: path='/' (default), recursive=false, show_hidden=false, max_entries=20\n" |
| | "- read: path (e.g., /notes/todo.txt), offset=0, max_chars=4000 (shows next_cursor when truncated)\n" |
| | "- write: path, content (UTF-8), create_dirs=true\n" |
| | "- append: path, content (UTF-8), create_dirs=true\n" |
| | "- mkdir: path (directory), exist_ok=true\n" |
| | "- move: path (src), dest_path (dst), overwrite=false\n" |
| | "- copy: path (src), dest_path (dst), overwrite=false\n" |
| | "- delete: path, recursive=true (required for directories)\n" |
| | "- info: path\n" |
| | "- search: path (dir or file), content=query text, recursive=false, show_hidden=false, max_entries=20, case_sensitive=false, offset=0\n" |
| | "- help: show this guide\n\n" |
| | "Errors are returned as JSON with fields: {status:'error', code, message, path?, hint?, data?}.\n\n" |
| | "Examples:\n" |
| | "- list current: action=list, path='/'\n" |
| | "- make folder: action=mkdir, path='/notes'\n" |
| | "- write file: action=write, path='/notes/todo.txt', content='hello'\n" |
| | "- read file: action=read, path='/notes/todo.txt', max_chars=200\n" |
| | "- move file: action=move, path='/notes/todo.txt', dest_path='/notes/todo-old.txt', overwrite=true\n" |
| | "- delete dir: action=delete, path='/notes', recursive=true\n" |
| | "- search text: action=search, path='/notes', content='TODO', recursive=true, max_entries=50\n" |
| | "- page search results: action=search, content='TODO', offset=10\n" |
| | "- case-sensitive search: action=search, content='TODO', case_sensitive=true\n" |
| | ) |
| |
|
| |
|
| | def _default_root() -> str: |
| | |
| | root = os.getenv("NYMBO_TOOLS_ROOT") |
| | if root and root.strip(): |
| | return os.path.abspath(os.path.expanduser(root.strip())) |
| | |
| | try: |
| | here = os.path.abspath(__file__) |
| | tools_dir = os.path.dirname(os.path.dirname(here)) |
| | default_root = os.path.abspath(os.path.join(tools_dir, "Filesystem")) |
| | return default_root |
| | except Exception: |
| | |
| | return os.path.abspath(os.getcwd()) |
| |
|
| |
|
| | ROOT_DIR = _default_root() |
| | |
| | try: |
| | os.makedirs(ROOT_DIR, exist_ok=True) |
| | except Exception: |
| | pass |
| | ALLOW_ABS = bool(int(os.getenv("UNSAFE_ALLOW_ABS_PATHS", "0"))) |
| |
|
| | def _safe_err(exc: Exception | str) -> str: |
| | """Return an error string with any absolute root replaced by '/' and slashes normalized. |
| | This handles variants like backslashes and duplicate slashes in OS messages. |
| | """ |
| | s = str(exc) |
| | |
| | s_norm = s.replace("\\", "/") |
| | root_fwd = ROOT_DIR.replace("\\", "/") |
| | |
| | root_variants = {ROOT_DIR, root_fwd, re.sub(r"/+", "/", root_fwd)} |
| | for variant in root_variants: |
| | if variant: |
| | s_norm = s_norm.replace(variant, "/") |
| | |
| | s_norm = re.sub(r"/+", "/", s_norm) |
| | return s_norm |
| |
|
| |
|
| | def _err(code: str, message: str, *, path: Optional[str] = None, hint: Optional[str] = None, data: Optional[dict] = None) -> str: |
| | """Return a structured error JSON string. |
| | Fields: status='error', code, message, path?, hint?, data?, root='/' |
| | """ |
| | payload = { |
| | "status": "error", |
| | "code": code, |
| | "message": message, |
| | "root": "/", |
| | } |
| | if path is not None and path != "": |
| | payload["path"] = path |
| | if hint: |
| | payload["hint"] = hint |
| | if data: |
| | payload["data"] = data |
| | return json.dumps(payload, ensure_ascii=False) |
| |
|
| |
|
| | def _resolve_path(path: str) -> tuple[str, str]: |
| | """ |
| | Resolve a user-provided path to an absolute, normalized path constrained to ROOT_DIR |
| | (unless UNSAFE_ALLOW_ABS_PATHS=1). Returns (abs_path, error_message). error_message is empty when ok. |
| | """ |
| | try: |
| | user_input = (path or "/").strip() or "/" |
| | if user_input.startswith("/"): |
| | |
| | rel_part = user_input.lstrip("/") or "." |
| | raw = os.path.expanduser(rel_part) |
| | treat_as_relative = True |
| | else: |
| | raw = os.path.expanduser(user_input) |
| | treat_as_relative = False |
| |
|
| | if not treat_as_relative and os.path.isabs(raw): |
| | if not ALLOW_ABS: |
| | |
| | return "", _err( |
| | "absolute_path_disabled", |
| | "Absolute paths are disabled in safe mode.", |
| | path=raw.replace("\\", "/"), |
| | hint="Use a path relative to / (e.g., /notes/todo.txt)." |
| | ) |
| | abs_path = os.path.abspath(raw) |
| | else: |
| | abs_path = os.path.abspath(os.path.join(ROOT_DIR, raw)) |
| | |
| | if not ALLOW_ABS: |
| | try: |
| | common = os.path.commonpath([os.path.normpath(ROOT_DIR), os.path.normpath(abs_path)]) |
| | except Exception: |
| | |
| | root_cmp = os.path.normcase(os.path.normpath(ROOT_DIR)) |
| | abs_cmp = os.path.normcase(os.path.normpath(abs_path)) |
| | if not abs_cmp.startswith(root_cmp): |
| | return "", _err( |
| | "path_outside_root", |
| | "Path not allowed outside root.", |
| | path=user_input.replace("\\", "/"), |
| | hint="Use a path under / (the tool's root)." |
| | ) |
| | else: |
| | root_cmp = os.path.normcase(os.path.normpath(ROOT_DIR)) |
| | common_cmp = os.path.normcase(os.path.normpath(common)) |
| | if common_cmp != root_cmp: |
| | return "", _err( |
| | "path_outside_root", |
| | "Path not allowed outside root.", |
| | path=user_input.replace("\\", "/"), |
| | hint="Use a path under / (the tool's root)." |
| | ) |
| | return abs_path, "" |
| | except Exception as exc: |
| | return "", _err( |
| | "resolve_path_failed", |
| | "Failed to resolve path.", |
| | path=(path or ""), |
| | data={"error": _safe_err(exc)} |
| | ) |
| |
|
| |
|
| | def _fmt_size(num_bytes: int) -> str: |
| | units = ["B", "KB", "MB", "GB", "TB"] |
| | size = float(num_bytes) |
| | for unit in units: |
| | if size < 1024.0: |
| | return f"{size:.1f} {unit}" |
| | size /= 1024.0 |
| | return f"{size:.1f} PB" |
| |
|
| |
|
| | def _display_path(abs_path: str) -> str: |
| | """Return a user-friendly path relative to ROOT_DIR using forward slashes. |
| | Example: ROOT_DIR -> '/', a file under it -> '/sub/dir/file.txt'.""" |
| | try: |
| | norm_root = os.path.normpath(ROOT_DIR) |
| | norm_abs = os.path.normpath(abs_path) |
| | common = os.path.commonpath([norm_root, norm_abs]) |
| | if os.path.normcase(common) == os.path.normcase(norm_root): |
| | rel = os.path.relpath(norm_abs, norm_root) |
| | if rel == ".": |
| | return "/" |
| | return "/" + rel.replace("\\", "/") |
| | except Exception: |
| | pass |
| | |
| | return abs_path.replace("\\", "/") |
| |
|
| |
|
| | def _list_dir(abs_path: str, *, show_hidden: bool, recursive: bool, max_entries: int) -> str: |
| | lines: list[str] = [] |
| | total = 0 |
| | root_display = "/" |
| | listing_display = _display_path(abs_path) |
| | for root, dirs, files in os.walk(abs_path): |
| | |
| | if not show_hidden: |
| | dirs[:] = [d for d in dirs if not d.startswith('.')] |
| | files = [f for f in files if not f.startswith('.')] |
| | try: |
| | rel_root = os.path.relpath(root, ROOT_DIR) |
| | except Exception: |
| | rel_root = root |
| | rel_root_disp = "/" if rel_root == "." else "/" + rel_root.replace("\\", "/") |
| | lines.append(f"\n📂 {rel_root_disp}") |
| | |
| | dirs.sort() |
| | files.sort() |
| | for d in dirs: |
| | p = os.path.join(root, d) |
| | try: |
| | mtime = datetime.fromtimestamp(os.path.getmtime(p)).isoformat(sep=' ', timespec='seconds') |
| | except Exception: |
| | mtime = "?" |
| | lines.append(f" • [DIR] {d} (modified {mtime})") |
| | total += 1 |
| | if total >= max_entries: |
| | lines.append(f"\n… Truncated at {max_entries} entries.") |
| | return "\n".join(lines).strip() |
| | for f in files: |
| | p = os.path.join(root, f) |
| | try: |
| | size = _fmt_size(os.path.getsize(p)) |
| | mtime = datetime.fromtimestamp(os.path.getmtime(p)).isoformat(sep=' ', timespec='seconds') |
| | except Exception: |
| | size, mtime = "?", "?" |
| | lines.append(f" • {f} ({size}, modified {mtime})") |
| | total += 1 |
| | if total >= max_entries: |
| | lines.append(f"\n… Truncated at {max_entries} entries.") |
| | return "\n".join(lines).strip() |
| | if not recursive: |
| | break |
| | header = f"Listing of {listing_display}\nRoot: {root_display}\nEntries: {total}" |
| | return (header + "\n" + "\n".join(lines)).strip() |
| |
|
| |
|
| | def _search_text( |
| | abs_path: str, |
| | query: str, |
| | *, |
| | recursive: bool, |
| | show_hidden: bool, |
| | max_results: int, |
| | case_sensitive: bool, |
| | start_index: int, |
| | ) -> str: |
| | if not os.path.exists(abs_path): |
| | return _err("path_not_found", f"Path not found: {_display_path(abs_path)}", path=_display_path(abs_path)) |
| |
|
| | query = query or "" |
| | normalized_query = query if case_sensitive else query.lower() |
| | if normalized_query == "": |
| | return _err( |
| | "missing_search_query", |
| | "Search query is required for the search action.", |
| | hint="Provide text in the Content field to search for.", |
| | ) |
| |
|
| | max_results = max(1, int(max_results) if max_results is not None else 20) |
| | start_index = max(0, int(start_index) if start_index is not None else 0) |
| | matches: list[tuple[str, int, str]] = [] |
| | errors: list[str] = [] |
| | files_scanned = 0 |
| | truncated = False |
| | total_matches = 0 |
| |
|
| | def _should_skip(name: str) -> bool: |
| | return not show_hidden and name.startswith('.') |
| |
|
| | def _handle_match(file_path: str, line_no: int, line_text: str) -> bool: |
| | nonlocal truncated, total_matches |
| | total_matches += 1 |
| | if total_matches <= start_index: |
| | return False |
| | if len(matches) < max_results: |
| | snippet = line_text.strip() |
| | if len(snippet) > 200: |
| | snippet = snippet[:197] + "…" |
| | matches.append((_display_path(file_path), line_no, snippet)) |
| | return False |
| | truncated = True |
| | return True |
| |
|
| | def _search_file(file_path: str) -> bool: |
| | nonlocal files_scanned |
| | files_scanned += 1 |
| | try: |
| | with open(file_path, 'r', encoding='utf-8', errors='replace') as handle: |
| | for line_no, line in enumerate(handle, start=1): |
| | haystack = line if case_sensitive else line.lower() |
| | if normalized_query in haystack: |
| | if _handle_match(file_path, line_no, line): |
| | return True |
| | except Exception as exc: |
| | errors.append(f"{_display_path(file_path)} ({_safe_err(exc)})") |
| | return truncated |
| |
|
| | if os.path.isfile(abs_path): |
| | _search_file(abs_path) |
| | else: |
| | for root, dirs, files in os.walk(abs_path): |
| | dirs[:] = [d for d in dirs if not _should_skip(d)] |
| | visible_files = [f for f in files if show_hidden or not f.startswith('.')] |
| | for name in visible_files: |
| | file_path = os.path.join(root, name) |
| | if _search_file(file_path): |
| | break |
| | if truncated: |
| | break |
| | if not recursive: |
| | break |
| |
|
| | header_lines = [ |
| | f"Search results for {query!r}", |
| | f"Scope: {_display_path(abs_path)}", |
| | f"Recursive: {'yes' if recursive else 'no'}, Hidden: {'yes' if show_hidden else 'no'}, Case-sensitive: {'yes' if case_sensitive else 'no'}", |
| | f"Start offset: {start_index}", |
| | f"Matches returned: {len(matches)}" + (" (truncated)" if truncated else ""), |
| | f"Files scanned: {files_scanned}", |
| | ] |
| |
|
| | next_cursor = start_index + len(matches) if truncated else None |
| |
|
| | if truncated: |
| | header_lines.append(f"Matches encountered before truncation: {total_matches}") |
| | header_lines.append(f"Truncated: yes — re-run with offset={next_cursor} to continue.") |
| | header_lines.append(f"Next cursor: {next_cursor}") |
| | else: |
| | header_lines.append(f"Total matches found: {total_matches}") |
| | header_lines.append("Truncated: no — end of results.") |
| | header_lines.append("Next cursor: None") |
| |
|
| | if not matches: |
| | if total_matches > 0 and start_index >= total_matches: |
| | hint_limit = max(total_matches - 1, 0) |
| | body_lines = [ |
| | f"No matches found at or after offset {start_index}. Total matches available: {total_matches}.", |
| | (f"Try a smaller offset (≤ {hint_limit})." if hint_limit >= 0 else ""), |
| | ] |
| | body_lines = [line for line in body_lines if line] |
| | else: |
| | body_lines = [ |
| | "No matches found.", |
| | (f"Total matches encountered: {total_matches}." if total_matches else ""), |
| | ] |
| | body_lines = [line for line in body_lines if line] |
| | else: |
| | body_lines = [f"{idx}. {path}:{line_no}: {text}" for idx, (path, line_no, text) in enumerate(matches, start=1)] |
| |
|
| | if errors: |
| | shown = errors[:5] |
| | body_lines.extend(["", "Warnings:"]) |
| | body_lines.extend(shown) |
| | if len(errors) > len(shown): |
| | body_lines.append(f"… {len(errors) - len(shown)} additional files could not be read.") |
| |
|
| | return "\n".join(header_lines) + "\n\n" + "\n".join(body_lines) |
| |
|
| |
|
| | def _read_file(abs_path: str, *, offset: int, max_chars: int) -> str: |
| | if not os.path.exists(abs_path): |
| | return _err("file_not_found", f"File not found: {_display_path(abs_path)}", path=_display_path(abs_path)) |
| | if os.path.isdir(abs_path): |
| | return _err("is_directory", f"Path is a directory, not a file: {_display_path(abs_path)}", path=_display_path(abs_path), hint="Provide a file path.") |
| | try: |
| | with open(abs_path, 'r', encoding='utf-8', errors='replace') as f: |
| | data = f.read() |
| | except Exception as exc: |
| | return _err("read_failed", "Failed to read file.", path=_display_path(abs_path), data={"error": _safe_err(exc)}) |
| | total = len(data) |
| | start = max(0, min(offset, total)) |
| | if max_chars > 0: |
| | end = min(total, start + max_chars) |
| | else: |
| | end = total |
| | chunk = data[start:end] |
| | next_cursor = end if end < total else None |
| | meta = { |
| | "offset": start, |
| | "returned": len(chunk), |
| | "total": total, |
| | "next_cursor": next_cursor, |
| | "path": _display_path(abs_path), |
| | } |
| | header = ( |
| | f"Reading {_display_path(abs_path)}\n" |
| | f"Offset {start}, returned {len(chunk)} of {total}." |
| | + (f"\nNext cursor: {next_cursor}" if next_cursor is not None else "") |
| | ) |
| | sep = "\n\n---\n\n" |
| | return header + sep + chunk |
| |
|
| |
|
| | def _ensure_parent(abs_path: str, create_dirs: bool) -> None: |
| | parent = os.path.dirname(abs_path) |
| | if parent and not os.path.exists(parent): |
| | if create_dirs: |
| | os.makedirs(parent, exist_ok=True) |
| | else: |
| | raise FileNotFoundError(f"Parent directory does not exist: {_display_path(parent)}") |
| |
|
| |
|
| | def _write_file(abs_path: str, content: str, *, append: bool, create_dirs: bool) -> str: |
| | try: |
| | _ensure_parent(abs_path, create_dirs) |
| | mode = 'a' if append else 'w' |
| | with open(abs_path, mode, encoding='utf-8') as f: |
| | f.write(content or "") |
| | return f"{'Appended to' if append else 'Wrote'} file: {_display_path(abs_path)} (chars={len(content or '')})" |
| | except Exception as exc: |
| | return _err("write_failed", "Failed to write file.", path=_display_path(abs_path), data={"error": _safe_err(exc)}) |
| |
|
| |
|
| | def _mkdir(abs_path: str, exist_ok: bool) -> str: |
| | try: |
| | os.makedirs(abs_path, exist_ok=exist_ok) |
| | return f"Created directory: {_display_path(abs_path)}" |
| | except Exception as exc: |
| | return _err("mkdir_failed", "Failed to create directory.", path=_display_path(abs_path), data={"error": _safe_err(exc)}) |
| |
|
| |
|
| | def _move_copy(action: str, src: str, dst: str, *, overwrite: bool) -> str: |
| | try: |
| | if not os.path.exists(src): |
| | return _err("source_not_found", f"Source not found: {_display_path(src)}", path=_display_path(src)) |
| | if os.path.isdir(dst): |
| | |
| | dst_path = os.path.join(dst, os.path.basename(src)) |
| | else: |
| | dst_path = dst |
| | if os.path.exists(dst_path): |
| | if overwrite: |
| | if os.path.isdir(dst_path): |
| | shutil.rmtree(dst_path) |
| | else: |
| | os.remove(dst_path) |
| | else: |
| | return _err( |
| | "destination_exists", |
| | f"Destination already exists: {_display_path(dst_path)}", |
| | path=_display_path(dst_path), |
| | hint="Set overwrite=True to replace the destination." |
| | ) |
| | if action == 'move': |
| | shutil.move(src, dst_path) |
| | else: |
| | if os.path.isdir(src): |
| | shutil.copytree(src, dst_path) |
| | else: |
| | shutil.copy2(src, dst_path) |
| | return f"{action.capitalize()}d: {_display_path(src)} -> {_display_path(dst_path)}" |
| | except Exception as exc: |
| | return _err(f"{action}_failed", f"Failed to {action}.", path=_display_path(src), data={"error": _safe_err(exc), "destination": _display_path(dst)}) |
| |
|
| |
|
| | def _delete(abs_path: str, *, recursive: bool) -> str: |
| | try: |
| | if not os.path.exists(abs_path): |
| | return _err("path_not_found", f"Path not found: {_display_path(abs_path)}", path=_display_path(abs_path)) |
| | if os.path.isdir(abs_path): |
| | if not recursive: |
| | |
| | return _err("requires_recursive", "Refusing to delete a directory without recursive=True", path=_display_path(abs_path), hint="Pass recursive=True to delete a directory.") |
| | shutil.rmtree(abs_path) |
| | else: |
| | os.remove(abs_path) |
| | return f"Deleted: {_display_path(abs_path)}" |
| | except Exception as exc: |
| | return _err("delete_failed", "Failed to delete path.", path=_display_path(abs_path), data={"error": _safe_err(exc)}) |
| |
|
| |
|
| | def _info(abs_path: str) -> str: |
| | try: |
| | st = os.stat(abs_path) |
| | except Exception as exc: |
| | return _err("stat_failed", "Failed to stat path.", path=_display_path(abs_path), data={"error": _safe_err(exc)}) |
| | info = { |
| | "path": _display_path(abs_path), |
| | "type": "directory" if stat.S_ISDIR(st.st_mode) else "file", |
| | "size": st.st_size, |
| | "modified": datetime.fromtimestamp(st.st_mtime).isoformat(sep=' ', timespec='seconds'), |
| | "created": datetime.fromtimestamp(st.st_ctime).isoformat(sep=' ', timespec='seconds'), |
| | "mode": oct(st.st_mode), |
| | "root": "/", |
| | } |
| | return json.dumps(info, indent=2) |
| |
|
| |
|
| | @autodoc(summary=TOOL_SUMMARY) |
| | def File_System( |
| | action: Annotated[str, "Operation to perform: 'list', 'read', 'write', 'append', 'mkdir', 'move', 'copy', 'delete', 'info', 'search'."], |
| | path: Annotated[str, "Target path, relative to root unless UNSAFE_ALLOW_ABS_PATHS=1."] = "/", |
| | content: Annotated[Optional[str], "Content for write/append actions or search query (UTF-8)."] = None, |
| | dest_path: Annotated[Optional[str], "Destination for move/copy (relative to root unless unsafe absolute allowed)."] = None, |
| | recursive: Annotated[bool, "For list/search (recurse into subfolders) and delete (required for directories)."] = False, |
| | show_hidden: Annotated[bool, "Include hidden files (dotfiles) for list/search."] = False, |
| | max_entries: Annotated[int, "Max entries to list or matches to return (for list/search)."] = 20, |
| | offset: Annotated[int, "Start offset for reading files (for read)."] = 0, |
| | max_chars: Annotated[int, "Max characters to return when reading (0 = full file)."] = 4000, |
| | create_dirs: Annotated[bool, "Create parent directories for write/append if missing."] = True, |
| | overwrite: Annotated[bool, "Allow overwrite for move/copy destinations."] = False, |
| | case_sensitive: Annotated[bool, "Match case when searching text."] = False, |
| | ) -> str: |
| | _log_call_start( |
| | "File_System", |
| | action=action, |
| | path=path, |
| | dest_path=dest_path, |
| | recursive=recursive, |
| | show_hidden=show_hidden, |
| | max_entries=max_entries, |
| | offset=offset, |
| | max_chars=max_chars, |
| | create_dirs=create_dirs, |
| | overwrite=overwrite, |
| | case_sensitive=case_sensitive, |
| | ) |
| | action = (action or "").strip().lower() |
| | if action not in {"list", "read", "write", "append", "mkdir", "move", "copy", "delete", "info", "search", "help"}: |
| | result = _err( |
| | "invalid_action", |
| | "Invalid action.", |
| | hint="Choose from: list, read, write, append, mkdir, move, copy, delete, info, search, help." |
| | ) |
| | _log_call_end("File_System", _truncate_for_log(result)) |
| | return result |
| |
|
| | abs_path, err = _resolve_path(path) |
| | if err: |
| | _log_call_end("File_System", _truncate_for_log(err)) |
| | return err |
| |
|
| | try: |
| | if action == "help": |
| | result = HELP_TEXT |
| | elif action == "list": |
| | if not os.path.exists(abs_path): |
| | result = _err("path_not_found", f"Path not found: {_display_path(abs_path)}", path=_display_path(abs_path)) |
| | else: |
| | result = _list_dir(abs_path, show_hidden=show_hidden, recursive=recursive, max_entries=max_entries) |
| | elif action == "read": |
| | result = _read_file(abs_path, offset=offset, max_chars=max_chars) |
| | elif action in {"write", "append"}: |
| | |
| | if _display_path(abs_path) == "/" or os.path.isdir(abs_path): |
| | result = _err( |
| | "invalid_write_path", |
| | "Invalid path for write/append.", |
| | path=_display_path(abs_path), |
| | hint="Provide a file path under / (e.g., /notes/todo.txt)." |
| | ) |
| | else: |
| | result = _write_file(abs_path, content or "", append=(action == "append"), create_dirs=create_dirs) |
| | elif action == "mkdir": |
| | result = _mkdir(abs_path, exist_ok=True) |
| | elif action in {"move", "copy"}: |
| | if not dest_path: |
| | result = _err("missing_dest_path", "dest_path is required for move/copy (ignored for other actions).") |
| | else: |
| | abs_dst, err2 = _resolve_path(dest_path) |
| | if err2: |
| | result = err2 |
| | else: |
| | result = _move_copy(action, abs_path, abs_dst, overwrite=overwrite) |
| | elif action == "delete": |
| | result = _delete(abs_path, recursive=recursive) |
| | elif action == "search": |
| | query_text = content or "" |
| | if query_text.strip() == "": |
| | result = _err( |
| | "missing_search_query", |
| | "Search query is required for the search action.", |
| | hint="Provide text in the Content field to search for.", |
| | ) |
| | else: |
| | result = _search_text( |
| | abs_path, |
| | query_text, |
| | recursive=recursive, |
| | show_hidden=show_hidden, |
| | max_results=max_entries, |
| | case_sensitive=case_sensitive, |
| | start_index=offset, |
| | ) |
| | else: |
| | result = _info(abs_path) |
| | except Exception as exc: |
| | result = _err("exception", "Unhandled error during operation.", data={"error": _safe_err(exc)}) |
| |
|
| | _log_call_end("File_System", _truncate_for_log(result)) |
| | return result |
| |
|
| |
|
| | def build_interface() -> gr.Interface: |
| | return gr.Interface( |
| | fn=File_System, |
| | inputs=[ |
| | gr.Radio( |
| | label="Action", |
| | choices=["list", "read", "write", "append", "mkdir", "move", "copy", "delete", "info", "search", "help"], |
| | value="help", |
| | ), |
| | gr.Textbox(label="Path", placeholder="/ or /src/file.txt", max_lines=1, value="/"), |
| | gr.Textbox(label="Content (write/append/search)", lines=6, placeholder="Text to write or search for..."), |
| | gr.Textbox(label="Destination (for move/copy)", max_lines=1), |
| | gr.Checkbox(label="Recursive (list/delete/search)", value=False), |
| | gr.Checkbox(label="Show hidden (list/search)", value=False), |
| | gr.Slider(minimum=10, maximum=5000, step=10, value=20, label="Max entries / matches (list/search)"), |
| | gr.Slider(minimum=0, maximum=1_000_000, step=100, value=0, label="Offset (read/search start)"), |
| | gr.Slider(minimum=0, maximum=100_000, step=500, value=4000, label="Max chars (read, 0=all)"), |
| | gr.Checkbox(label="Create parent dirs (write)", value=True), |
| | gr.Checkbox(label="Overwrite destination (move/copy)", value=False), |
| | gr.Checkbox(label="Case sensitive search", value=False), |
| | ], |
| | outputs=gr.Textbox(label="Result", lines=20), |
| | title="File System", |
| | description=( |
| | "<div id=\"fs-desc\" style=\"text-align:center; overflow:hidden;\">Browse, search, and interact with a filesystem. " |
| | "Choose an action and fill optional fields as needed." |
| | "</div>" |
| | ), |
| | api_description=TOOL_SUMMARY, |
| | flagging_mode="never", |
| | submit_btn="Run", |
| | ) |
| |
|
| |
|
| | __all__ = ["File_System", "build_interface"] |
| |
|