| | """ |
| | Core shared utilities for the Nymbo-Tools MCP server. |
| | |
| | Consolidates three key areas: |
| | 1. Sandboxed filesystem operations (path resolution, reading, writing, safe_open) |
| | 2. Sandboxed Python execution (code interpreter, agent terminal) |
| | 3. Hugging Face inference utilities (token, providers, error handling) |
| | """ |
| |
|
| | from __future__ import annotations |
| |
|
| | import ast |
| | import json |
| | import os |
| | import re |
| | import stat |
| | import sys |
| | from datetime import datetime |
| | from io import StringIO |
| | from typing import Any, Callable, Optional, TypeVar |
| |
|
| | import gradio as gr |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| |
|
| | def _fmt_size(num_bytes: int) -> str: |
| | """Format byte size as human-readable string.""" |
| | units = ["B", "KB", "MB", "GB"] |
| | size = float(num_bytes) |
| | for unit in units: |
| | if size < 1024.0: |
| | return f"{size:.1f} {unit}" |
| | size /= 1024.0 |
| | return f"{size:.1f} TB" |
| |
|
| |
|
| | def build_tree(entries: list[tuple[str, dict]]) -> dict: |
| | """ |
| | Build a nested tree structure from flat path entries. |
| | |
| | Args: |
| | entries: List of (path, metadata) tuples where path uses forward slashes. |
| | Paths ending with '/' are treated as directories. |
| | |
| | Returns: |
| | Nested dict with "__files__" key for files at each level. |
| | """ |
| | root: dict = {"__files__": []} |
| | |
| | for path, metadata in entries: |
| | parts = path.rstrip("/").split("/") |
| | is_dir = path.endswith("/") |
| | |
| | node = root |
| | for i, part in enumerate(parts[:-1]): |
| | if part not in node: |
| | node[part] = {"__files__": []} |
| | node = node[part] |
| | |
| | final = parts[-1] |
| | if is_dir: |
| | if final not in node: |
| | node[final] = {"__files__": []} |
| | if metadata: |
| | node[final]["__meta__"] = metadata |
| | else: |
| | node["__files__"].append((final, metadata)) |
| | |
| | return root |
| |
|
| |
|
| | def render_tree( |
| | node: dict, |
| | prefix: str = "", |
| | format_entry: Optional[Callable[[str, dict, bool], str]] = None, |
| | ) -> list[str]: |
| | """ |
| | Render a tree with line connectors. |
| | |
| | Args: |
| | node: Nested dict from build_tree() |
| | prefix: Current line prefix for indentation |
| | format_entry: Optional callback to format each entry. |
| | |
| | Returns: |
| | List of formatted lines. |
| | """ |
| | result = [] |
| | |
| | def default_format(name: str, meta: dict, is_dir: bool) -> str: |
| | if is_dir: |
| | return f"{name}/" |
| | size = meta.get("size") |
| | if size is not None: |
| | return f"{name} ({_fmt_size(size)})" |
| | return name |
| | |
| | fmt = format_entry or default_format |
| | |
| | entries = [] |
| | subdirs = sorted(k for k in node.keys() if k not in ("__files__", "__meta__")) |
| | files_here = sorted(node.get("__files__", []), key=lambda x: x[0]) |
| | |
| | for dirname in subdirs: |
| | dir_meta = node[dirname].get("__meta__", {}) |
| | entries.append(("dir", dirname, node[dirname], dir_meta)) |
| | for fname, fmeta in files_here: |
| | entries.append(("file", fname, None, fmeta)) |
| | |
| | for i, entry in enumerate(entries): |
| | is_last = (i == len(entries) - 1) |
| | connector = "└── " if is_last else "├── " |
| | child_prefix = prefix + (" " if is_last else "│ ") |
| | |
| | etype, name, subtree, meta = entry |
| | |
| | if etype == "dir": |
| | result.append(f"{prefix}{connector}{fmt(name, meta, True)}") |
| | result.extend(render_tree(subtree, child_prefix, format_entry)) |
| | else: |
| | result.append(f"{prefix}{connector}{fmt(name, meta, False)}") |
| | |
| | return result |
| |
|
| |
|
| | def walk_and_build_tree( |
| | abs_path: str, |
| | *, |
| | show_hidden: bool = False, |
| | recursive: bool = False, |
| | max_entries: int = 100, |
| | ) -> tuple[dict, int, bool]: |
| | """ |
| | Walk a directory and build a tree structure. |
| | |
| | Returns: |
| | (tree, total_entries, truncated) |
| | """ |
| | entries: list[tuple[str, dict]] = [] |
| | total = 0 |
| | truncated = False |
| | |
| | for root, dirs, files in os.walk(abs_path): |
| | if not show_hidden: |
| | dirs[:] = [d for d in dirs if not d.startswith('.')] |
| | files = [f for f in files if not f.startswith('.')] |
| | |
| | dirs.sort() |
| | files.sort() |
| | |
| | try: |
| | rel_root = os.path.relpath(root, abs_path) |
| | except Exception: |
| | rel_root = "" |
| | prefix = "" if rel_root == "." else rel_root.replace("\\", "/") + "/" |
| | |
| | for d in dirs: |
| | p = os.path.join(root, d) |
| | try: |
| | mtime = datetime.fromtimestamp(os.path.getmtime(p)).strftime("%Y-%m-%d %H:%M") |
| | except Exception: |
| | mtime = "?" |
| | entries.append((f"{prefix}{d}/", {"mtime": mtime})) |
| | total += 1 |
| | if total >= max_entries: |
| | truncated = True |
| | break |
| | |
| | if truncated: |
| | break |
| | |
| | for f in files: |
| | p = os.path.join(root, f) |
| | try: |
| | size = os.path.getsize(p) |
| | mtime = datetime.fromtimestamp(os.path.getmtime(p)).strftime("%Y-%m-%d %H:%M") |
| | except Exception: |
| | size, mtime = 0, "?" |
| | entries.append((f"{prefix}{f}", {"size": size, "mtime": mtime})) |
| | total += 1 |
| | if total >= max_entries: |
| | truncated = True |
| | break |
| | |
| | if truncated: |
| | break |
| | |
| | if not recursive: |
| | break |
| | |
| | return build_tree(entries), total, truncated |
| |
|
| |
|
| | def format_dir_listing( |
| | abs_path: str, |
| | display_path: str, |
| | *, |
| | show_hidden: bool = False, |
| | recursive: bool = False, |
| | max_entries: int = 100, |
| | fmt_size_fn: Optional[Callable[[int], str]] = None, |
| | ) -> str: |
| | """Format a directory listing as a visual tree.""" |
| | fmt_size = fmt_size_fn or _fmt_size |
| | |
| | tree, total, truncated = walk_and_build_tree( |
| | abs_path, |
| | show_hidden=show_hidden, |
| | recursive=recursive, |
| | max_entries=max_entries, |
| | ) |
| | |
| | def format_entry(name: str, meta: dict, is_dir: bool) -> str: |
| | mtime = meta.get("mtime", "") |
| | if is_dir: |
| | return f"{name}/ ({mtime})" |
| | size = meta.get("size", 0) |
| | return f"{name} ({fmt_size(size)}, {mtime})" |
| | |
| | tree_lines = render_tree(tree, " ", format_entry) |
| | |
| | header = f"Listing of {display_path}\nRoot: /\nEntries: {total}" |
| | if truncated: |
| | header += f"\n… Truncated at {max_entries} entries." |
| | |
| | lines = [header, "", "└── /"] |
| | lines.extend(tree_lines) |
| | |
| | return "\n".join(lines).strip() |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| |
|
| | class SandboxedRoot: |
| | """ |
| | A configurable sandboxed root directory with path resolution and safety checks. |
| | |
| | Args: |
| | root_dir: Absolute path to the sandbox root. |
| | allow_abs: If True, allow absolute paths outside the sandbox. |
| | """ |
| | |
| | def __init__(self, root_dir: str, allow_abs: bool = False): |
| | self.root_dir = os.path.abspath(root_dir) |
| | self.allow_abs = allow_abs |
| | |
| | try: |
| | os.makedirs(self.root_dir, exist_ok=True) |
| | except Exception: |
| | pass |
| | |
| | def safe_err(self, exc: Exception | str) -> str: |
| | """Return an error string with any absolute root replaced by '/' and slashes normalized.""" |
| | s = str(exc) |
| | s_norm = s.replace("\\", "/") |
| | root_fwd = self.root_dir.replace("\\", "/") |
| | root_variants = {self.root_dir, root_fwd, re.sub(r"/+", "/", root_fwd)} |
| | for variant in root_variants: |
| | if variant: |
| | s_norm = s_norm.replace(variant, "/") |
| | s_norm = re.sub(r"/+", "/", s_norm) |
| | return s_norm |
| |
|
| | def err( |
| | self, |
| | code: str, |
| | message: str, |
| | *, |
| | path: Optional[str] = None, |
| | hint: Optional[str] = None, |
| | data: Optional[dict] = None, |
| | ) -> str: |
| | """Return a structured error JSON string.""" |
| | payload = { |
| | "status": "error", |
| | "code": code, |
| | "message": message, |
| | "root": "/", |
| | } |
| | if path is not None and path != "": |
| | payload["path"] = path |
| | if hint: |
| | payload["hint"] = hint |
| | if data: |
| | payload["data"] = data |
| | return json.dumps(payload, ensure_ascii=False) |
| |
|
| | def display_path(self, abs_path: str) -> str: |
| | """Return a user-friendly path relative to root using forward slashes.""" |
| | try: |
| | norm_root = os.path.normpath(self.root_dir) |
| | norm_abs = os.path.normpath(abs_path) |
| | common = os.path.commonpath([norm_root, norm_abs]) |
| | if os.path.normcase(common) == os.path.normcase(norm_root): |
| | rel = os.path.relpath(norm_abs, norm_root) |
| | if rel == ".": |
| | return "/" |
| | return "/" + rel.replace("\\", "/") |
| | except Exception: |
| | pass |
| | return abs_path.replace("\\", "/") |
| |
|
| | def resolve_path(self, path: str) -> tuple[str, str]: |
| | """ |
| | Resolve a user-provided path to an absolute, normalized path constrained to root. |
| | Returns (abs_path, error_message). error_message is empty when ok. |
| | """ |
| | try: |
| | user_input = (path or "/").strip() or "/" |
| | if user_input.startswith("/"): |
| | rel_part = user_input.lstrip("/") or "." |
| | raw = os.path.expanduser(rel_part) |
| | treat_as_relative = True |
| | else: |
| | raw = os.path.expanduser(user_input) |
| | treat_as_relative = False |
| |
|
| | if not treat_as_relative and os.path.isabs(raw): |
| | if not self.allow_abs: |
| | return "", self.err( |
| | "absolute_path_disabled", |
| | "Absolute paths are disabled in safe mode.", |
| | path=raw.replace("\\", "/"), |
| | hint="Use a path relative to / (e.g., /notes/todo.txt).", |
| | ) |
| | abs_path = os.path.abspath(raw) |
| | else: |
| | abs_path = os.path.abspath(os.path.join(self.root_dir, raw)) |
| | |
| | |
| | if not self.allow_abs: |
| | try: |
| | common = os.path.commonpath( |
| | [os.path.normpath(self.root_dir), os.path.normpath(abs_path)] |
| | ) |
| | if common != os.path.normpath(self.root_dir): |
| | return "", self.err( |
| | "path_outside_root", |
| | "Path is outside the sandbox root.", |
| | path=abs_path, |
| | ) |
| | except Exception: |
| | return "", self.err( |
| | "path_outside_root", |
| | "Path is outside the sandbox root.", |
| | path=abs_path, |
| | ) |
| |
|
| | return abs_path, "" |
| | except Exception as exc: |
| | return "", self.err( |
| | "resolve_path_failed", |
| | "Failed to resolve path.", |
| | path=(path or ""), |
| | data={"error": self.safe_err(exc)}, |
| | ) |
| |
|
| | def safe_open(self, file, *args, **kwargs): |
| | """A drop-in replacement for open() that enforces sandbox constraints.""" |
| | if isinstance(file, int): |
| | return open(file, *args, **kwargs) |
| | |
| | path_str = os.fspath(file) |
| | abs_path, err = self.resolve_path(path_str) |
| | if err: |
| | try: |
| | msg = json.loads(err)["message"] |
| | except Exception: |
| | msg = err |
| | raise PermissionError(f"Sandboxed open() failed: {msg}") |
| | |
| | return open(abs_path, *args, **kwargs) |
| |
|
| | def list_dir( |
| | self, |
| | abs_path: str, |
| | *, |
| | show_hidden: bool = False, |
| | recursive: bool = False, |
| | max_entries: int = 100, |
| | ) -> str: |
| | """List directory contents as a visual tree.""" |
| | return format_dir_listing( |
| | abs_path, |
| | self.display_path(abs_path), |
| | show_hidden=show_hidden, |
| | recursive=recursive, |
| | max_entries=max_entries, |
| | fmt_size_fn=_fmt_size, |
| | ) |
| |
|
| | def search_text( |
| | self, |
| | abs_path: str, |
| | query: str, |
| | *, |
| | recursive: bool = False, |
| | show_hidden: bool = False, |
| | max_results: int = 20, |
| | case_sensitive: bool = False, |
| | start_index: int = 0, |
| | ) -> str: |
| | """Search for text within files.""" |
| | if not os.path.exists(abs_path): |
| | return self.err( |
| | "path_not_found", |
| | f"Path not found: {self.display_path(abs_path)}", |
| | path=self.display_path(abs_path), |
| | ) |
| |
|
| | query = query or "" |
| | normalized_query = query if case_sensitive else query.lower() |
| | if normalized_query == "": |
| | return self.err( |
| | "missing_search_query", |
| | "Search query is required for the search action.", |
| | hint="Provide text in the Content field to search for.", |
| | ) |
| |
|
| | max_results = max(1, int(max_results) if max_results is not None else 20) |
| | start_index = max(0, int(start_index) if start_index is not None else 0) |
| | matches: list[tuple[str, int, str]] = [] |
| | errors: list[str] = [] |
| | files_scanned = 0 |
| | truncated = False |
| | total_matches = 0 |
| |
|
| | def _should_skip(name: str) -> bool: |
| | return not show_hidden and name.startswith(".") |
| |
|
| | def _handle_match(file_path: str, line_no: int, line_text: str) -> bool: |
| | nonlocal truncated, total_matches |
| | total_matches += 1 |
| | if total_matches <= start_index: |
| | return False |
| | if len(matches) < max_results: |
| | snippet = line_text.strip() |
| | if len(snippet) > 200: |
| | snippet = snippet[:197] + "…" |
| | matches.append((self.display_path(file_path), line_no, snippet)) |
| | return False |
| | truncated = True |
| | return True |
| |
|
| | def _search_file(file_path: str) -> bool: |
| | nonlocal files_scanned |
| | files_scanned += 1 |
| | try: |
| | with open(file_path, "r", encoding="utf-8", errors="replace") as handle: |
| | for line_no, line in enumerate(handle, start=1): |
| | haystack = line if case_sensitive else line.lower() |
| | if normalized_query in haystack: |
| | if _handle_match(file_path, line_no, line): |
| | return True |
| | except Exception as exc: |
| | errors.append(f"{self.display_path(file_path)} ({self.safe_err(exc)})") |
| | return truncated |
| |
|
| | if os.path.isfile(abs_path): |
| | _search_file(abs_path) |
| | else: |
| | for root, dirs, files in os.walk(abs_path): |
| | dirs[:] = [d for d in dirs if not _should_skip(d)] |
| | visible_files = [f for f in files if show_hidden or not f.startswith(".")] |
| | for name in visible_files: |
| | file_path = os.path.join(root, name) |
| | if _search_file(file_path): |
| | break |
| | if truncated: |
| | break |
| | if not recursive: |
| | break |
| |
|
| | header_lines = [ |
| | f"Search results for {query!r}", |
| | f"Scope: {self.display_path(abs_path)}", |
| | f"Recursive: {'yes' if recursive else 'no'}, Hidden: {'yes' if show_hidden else 'no'}, Case-sensitive: {'yes' if case_sensitive else 'no'}", |
| | f"Start offset: {start_index}", |
| | f"Matches returned: {len(matches)}" + (" (truncated)" if truncated else ""), |
| | f"Files scanned: {files_scanned}", |
| | ] |
| |
|
| | next_cursor = start_index + len(matches) if truncated else None |
| |
|
| | if truncated: |
| | header_lines.append(f"Matches encountered before truncation: {total_matches}") |
| | header_lines.append(f"Truncated: yes — re-run with offset={next_cursor} to continue.") |
| | header_lines.append(f"Next cursor: {next_cursor}") |
| | else: |
| | header_lines.append(f"Total matches found: {total_matches}") |
| | header_lines.append("Truncated: no — end of results.") |
| | header_lines.append("Next cursor: None") |
| |
|
| | if not matches: |
| | if total_matches > 0 and start_index >= total_matches: |
| | hint_limit = max(total_matches - 1, 0) |
| | body_lines = [ |
| | f"No matches found at or after offset {start_index}. Total matches available: {total_matches}.", |
| | (f"Try a smaller offset (≤ {hint_limit})." if hint_limit >= 0 else ""), |
| | ] |
| | body_lines = [line for line in body_lines if line] |
| | else: |
| | body_lines = [ |
| | "No matches found.", |
| | (f"Total matches encountered: {total_matches}." if total_matches else ""), |
| | ] |
| | body_lines = [line for line in body_lines if line] |
| | else: |
| | body_lines = [ |
| | f"{idx}. {path}:{line_no}: {text}" |
| | for idx, (path, line_no, text) in enumerate(matches, start=1) |
| | ] |
| |
|
| | if errors: |
| | shown = errors[:5] |
| | body_lines.extend(["", "Warnings:"]) |
| | body_lines.extend(shown) |
| | if len(errors) > len(shown): |
| | body_lines.append(f"… {len(errors) - len(shown)} additional files could not be read.") |
| |
|
| | return "\n".join(header_lines) + "\n\n" + "\n".join(body_lines) |
| |
|
| | def read_file(self, abs_path: str, *, offset: int = 0, max_chars: int = 4000) -> str: |
| | """Read file contents with optional offset and character limit.""" |
| | if not os.path.exists(abs_path): |
| | return self.err( |
| | "file_not_found", |
| | f"File not found: {self.display_path(abs_path)}", |
| | path=self.display_path(abs_path), |
| | ) |
| | if os.path.isdir(abs_path): |
| | return self.err( |
| | "is_directory", |
| | f"Path is a directory, not a file: {self.display_path(abs_path)}", |
| | path=self.display_path(abs_path), |
| | hint="Provide a file path.", |
| | ) |
| | try: |
| | with open(abs_path, "r", encoding="utf-8", errors="replace") as f: |
| | data = f.read() |
| | except Exception as exc: |
| | return self.err( |
| | "read_failed", |
| | "Failed to read file.", |
| | path=self.display_path(abs_path), |
| | data={"error": self.safe_err(exc)}, |
| | ) |
| | total = len(data) |
| | start = max(0, min(offset, total)) |
| | if max_chars > 0: |
| | end = min(total, start + max_chars) |
| | else: |
| | end = total |
| | chunk = data[start:end] |
| | next_cursor = end if end < total else None |
| | header = ( |
| | f"Reading {self.display_path(abs_path)}\n" |
| | f"Offset {start}, returned {len(chunk)} of {total}." |
| | + (f"\nNext cursor: {next_cursor}" if next_cursor is not None else "") |
| | ) |
| | sep = "\n\n---\n\n" |
| | return header + sep + chunk |
| |
|
| | def info(self, abs_path: str) -> str: |
| | """Get file/directory metadata as JSON.""" |
| | try: |
| | st = os.stat(abs_path) |
| | except Exception as exc: |
| | return self.err( |
| | "stat_failed", |
| | "Failed to stat path.", |
| | path=self.display_path(abs_path), |
| | data={"error": self.safe_err(exc)}, |
| | ) |
| | info_dict = { |
| | "path": self.display_path(abs_path), |
| | "type": "directory" if stat.S_ISDIR(st.st_mode) else "file", |
| | "size": st.st_size, |
| | "modified": datetime.fromtimestamp(st.st_mtime).isoformat(sep=" ", timespec="seconds"), |
| | "created": datetime.fromtimestamp(st.st_ctime).isoformat(sep=" ", timespec="seconds"), |
| | "mode": oct(st.st_mode), |
| | "root": "/", |
| | } |
| | return json.dumps(info_dict, indent=2) |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | def _get_filesystem_root() -> str: |
| | """Get the default filesystem root directory.""" |
| | root = os.getenv("NYMBO_TOOLS_ROOT") |
| | if root and root.strip(): |
| | return os.path.abspath(os.path.expanduser(root.strip())) |
| | try: |
| | here = os.path.abspath(__file__) |
| | tools_dir = os.path.dirname(os.path.dirname(here)) |
| | return os.path.abspath(os.path.join(tools_dir, "Filesystem")) |
| | except Exception: |
| | return os.path.abspath(os.getcwd()) |
| |
|
| |
|
| | def _get_obsidian_root() -> str: |
| | """Get the default Obsidian vault root directory.""" |
| | env_root = os.getenv("OBSIDIAN_VAULT_ROOT") |
| | if env_root and env_root.strip(): |
| | return os.path.abspath(os.path.expanduser(env_root.strip())) |
| | try: |
| | here = os.path.abspath(__file__) |
| | tools_dir = os.path.dirname(os.path.dirname(here)) |
| | return os.path.abspath(os.path.join(tools_dir, "Obsidian")) |
| | except Exception: |
| | return os.path.abspath(os.getcwd()) |
| |
|
| |
|
| | |
| | ALLOW_ABS = bool(int(os.getenv("UNSAFE_ALLOW_ABS_PATHS", "0"))) |
| |
|
| | FILESYSTEM_ROOT = _get_filesystem_root() |
| | OBSIDIAN_ROOT = _get_obsidian_root() |
| |
|
| | |
| | filesystem_sandbox = SandboxedRoot(FILESYSTEM_ROOT, allow_abs=ALLOW_ABS) |
| |
|
| | |
| | obsidian_sandbox = SandboxedRoot(OBSIDIAN_ROOT, allow_abs=ALLOW_ABS) |
| |
|
| |
|
| | |
| | ROOT_DIR = FILESYSTEM_ROOT |
| |
|
| | def _resolve_path(path: str) -> tuple[str, str]: |
| | """Resolve path using the default filesystem sandbox.""" |
| | return filesystem_sandbox.resolve_path(path) |
| |
|
| | def _display_path(abs_path: str) -> str: |
| | """Display path using the default filesystem sandbox.""" |
| | return filesystem_sandbox.display_path(abs_path) |
| |
|
| | def safe_open(file, *args, **kwargs): |
| | """Open file using the default filesystem sandbox.""" |
| | return filesystem_sandbox.safe_open(file, *args, **kwargs) |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| |
|
| | def create_safe_builtins() -> dict: |
| | """Create a builtins dict with sandboxed open().""" |
| | if isinstance(__builtins__, dict): |
| | safe_builtins = __builtins__.copy() |
| | else: |
| | safe_builtins = vars(__builtins__).copy() |
| | safe_builtins["open"] = safe_open |
| | return safe_builtins |
| |
|
| |
|
| | def sandboxed_exec( |
| | code: str, |
| | *, |
| | extra_globals: dict[str, Any] | None = None, |
| | ast_mode: bool = False, |
| | ) -> str: |
| | """ |
| | Execute Python code in a sandboxed environment. |
| | |
| | Args: |
| | code: Python source code to execute |
| | extra_globals: Additional globals to inject (e.g., tools) |
| | ast_mode: If True, parse and print results of all expression statements |
| | (like Agent_Terminal). If False, simple exec (like Code_Interpreter). |
| | |
| | Returns: |
| | Captured stdout output, or exception text on error. |
| | """ |
| | if not code: |
| | return "No code provided." |
| | |
| | old_stdout = sys.stdout |
| | old_cwd = os.getcwd() |
| | redirected_output = sys.stdout = StringIO() |
| | |
| | |
| | safe_builtins = create_safe_builtins() |
| | env: dict[str, Any] = { |
| | "open": safe_open, |
| | "__builtins__": safe_builtins, |
| | "print": print, |
| | } |
| | if extra_globals: |
| | env.update(extra_globals) |
| | |
| | try: |
| | os.chdir(ROOT_DIR) |
| | |
| | if ast_mode: |
| | |
| | tree = ast.parse(code) |
| | for node in tree.body: |
| | if isinstance(node, ast.Expr): |
| | |
| | expr = compile(ast.Expression(node.value), filename="<string>", mode="eval") |
| | result_val = eval(expr, env) |
| | if result_val is not None: |
| | print(result_val) |
| | else: |
| | |
| | mod = ast.Module(body=[node], type_ignores=[]) |
| | exec(compile(mod, filename="<string>", mode="exec"), env) |
| | else: |
| | |
| | exec(code, env) |
| | |
| | result = redirected_output.getvalue() |
| | except Exception as exc: |
| | result = str(exc) |
| | finally: |
| | sys.stdout = old_stdout |
| | try: |
| | os.chdir(old_cwd) |
| | except Exception: |
| | pass |
| | |
| | return result |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| |
|
| | def get_hf_token() -> str | None: |
| | """Get the HF API token from environment variables. |
| | |
| | Checks HF_READ_TOKEN first, then falls back to HF_TOKEN. |
| | """ |
| | return os.getenv("HF_READ_TOKEN") or os.getenv("HF_TOKEN") |
| |
|
| |
|
| | |
| | HF_TOKEN = get_hf_token() |
| |
|
| | |
| | DEFAULT_PROVIDERS = ["auto", "replicate", "fal-ai"] |
| |
|
| | |
| | TEXTGEN_PROVIDERS = ["cerebras", "auto"] |
| |
|
| |
|
| | T = TypeVar("T") |
| |
|
| |
|
| | def handle_hf_error(msg: str, model_id: str, *, context: str = "generation") -> None: |
| | """ |
| | Raise appropriate gr.Error for common HF API error codes. |
| | |
| | Args: |
| | msg: Error message string to analyze |
| | model_id: The model ID being used (for error messages) |
| | context: Description of operation for error messages |
| | |
| | Raises: |
| | gr.Error: With user-friendly message based on error type |
| | """ |
| | lowered = msg.lower() |
| | |
| | if "404" in msg: |
| | raise gr.Error(f"Model not found or unavailable: {model_id}. Check the id and your HF token access.") |
| | |
| | if "503" in msg: |
| | raise gr.Error("The model is warming up. Please try again shortly.") |
| | |
| | if "401" in msg or "403" in msg: |
| | raise gr.Error("Please duplicate the space and provide a `HF_READ_TOKEN` to enable Image and Video Generation.") |
| | |
| | if any(pattern in lowered for pattern in ("api_key", "hf auth login", "unauthorized", "forbidden")): |
| | raise gr.Error("Please duplicate the space and provide a `HF_READ_TOKEN` to enable Image and Video Generation.") |
| | |
| | |
| | raise gr.Error(f"{context.capitalize()} failed: {msg}") |
| |
|
| |
|
| | def invoke_with_fallback( |
| | fn: Callable[[str], T], |
| | providers: list[str] | None = None, |
| | ) -> T: |
| | """ |
| | Try calling fn(provider) for each provider until one succeeds. |
| | |
| | Args: |
| | fn: Function that takes a provider string and returns a result. |
| | Should raise an exception on failure. |
| | providers: List of provider strings to try. Defaults to DEFAULT_PROVIDERS. |
| | |
| | Returns: |
| | The result from the first successful fn() call. |
| | |
| | Raises: |
| | The last exception if all providers fail. |
| | """ |
| | if providers is None: |
| | providers = DEFAULT_PROVIDERS |
| | |
| | last_error: Exception | None = None |
| | |
| | for provider in providers: |
| | try: |
| | return fn(provider) |
| | except Exception as exc: |
| | last_error = exc |
| | continue |
| | |
| | |
| | if last_error: |
| | raise last_error |
| | raise RuntimeError("No providers available") |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | __all__ = [ |
| | |
| | "_fmt_size", |
| | "build_tree", |
| | "render_tree", |
| | "walk_and_build_tree", |
| | "format_dir_listing", |
| | |
| | "SandboxedRoot", |
| | "filesystem_sandbox", |
| | "obsidian_sandbox", |
| | "ROOT_DIR", |
| | "FILESYSTEM_ROOT", |
| | "OBSIDIAN_ROOT", |
| | "ALLOW_ABS", |
| | "_resolve_path", |
| | "_display_path", |
| | "safe_open", |
| | |
| | "sandboxed_exec", |
| | "create_safe_builtins", |
| | |
| | "get_hf_token", |
| | "HF_TOKEN", |
| | "DEFAULT_PROVIDERS", |
| | "TEXTGEN_PROVIDERS", |
| | "handle_hf_error", |
| | "invoke_with_fallback", |
| | ] |
| |
|