|
|
""" |
|
|
Core shared utilities for the Nymbo-Tools MCP server. |
|
|
|
|
|
Consolidates three key areas: |
|
|
1. Sandboxed filesystem operations (path resolution, reading, writing, safe_open) |
|
|
2. Sandboxed Python execution (code interpreter, agent terminal) |
|
|
3. Hugging Face inference utilities (token, providers, error handling) |
|
|
""" |
|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
import ast |
|
|
import json |
|
|
import os |
|
|
import re |
|
|
import stat |
|
|
import sys |
|
|
from datetime import datetime |
|
|
from io import StringIO |
|
|
from typing import Any, Callable, Optional, TypeVar |
|
|
|
|
|
import gradio as gr |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _fmt_size(num_bytes: int) -> str: |
|
|
"""Format byte size as human-readable string.""" |
|
|
units = ["B", "KB", "MB", "GB"] |
|
|
size = float(num_bytes) |
|
|
for unit in units: |
|
|
if size < 1024.0: |
|
|
return f"{size:.1f} {unit}" |
|
|
size /= 1024.0 |
|
|
return f"{size:.1f} TB" |
|
|
|
|
|
|
|
|
def build_tree(entries: list[tuple[str, dict]]) -> dict: |
|
|
""" |
|
|
Build a nested tree structure from flat path entries. |
|
|
|
|
|
Args: |
|
|
entries: List of (path, metadata) tuples where path uses forward slashes. |
|
|
Paths ending with '/' are treated as directories. |
|
|
|
|
|
Returns: |
|
|
Nested dict with "__files__" key for files at each level. |
|
|
""" |
|
|
root: dict = {"__files__": []} |
|
|
|
|
|
for path, metadata in entries: |
|
|
parts = path.rstrip("/").split("/") |
|
|
is_dir = path.endswith("/") |
|
|
|
|
|
node = root |
|
|
for i, part in enumerate(parts[:-1]): |
|
|
if part not in node: |
|
|
node[part] = {"__files__": []} |
|
|
node = node[part] |
|
|
|
|
|
final = parts[-1] |
|
|
if is_dir: |
|
|
if final not in node: |
|
|
node[final] = {"__files__": []} |
|
|
if metadata: |
|
|
node[final]["__meta__"] = metadata |
|
|
else: |
|
|
node["__files__"].append((final, metadata)) |
|
|
|
|
|
return root |
|
|
|
|
|
|
|
|
def render_tree( |
|
|
node: dict, |
|
|
prefix: str = "", |
|
|
format_entry: Optional[Callable[[str, dict, bool], str]] = None, |
|
|
) -> list[str]: |
|
|
""" |
|
|
Render a tree with line connectors. |
|
|
|
|
|
Args: |
|
|
node: Nested dict from build_tree() |
|
|
prefix: Current line prefix for indentation |
|
|
format_entry: Optional callback to format each entry. |
|
|
|
|
|
Returns: |
|
|
List of formatted lines. |
|
|
""" |
|
|
result = [] |
|
|
|
|
|
def default_format(name: str, meta: dict, is_dir: bool) -> str: |
|
|
if is_dir: |
|
|
return f"{name}/" |
|
|
size = meta.get("size") |
|
|
if size is not None: |
|
|
return f"{name} ({_fmt_size(size)})" |
|
|
return name |
|
|
|
|
|
fmt = format_entry or default_format |
|
|
|
|
|
entries = [] |
|
|
subdirs = sorted(k for k in node.keys() if k not in ("__files__", "__meta__")) |
|
|
files_here = sorted(node.get("__files__", []), key=lambda x: x[0]) |
|
|
|
|
|
for dirname in subdirs: |
|
|
dir_meta = node[dirname].get("__meta__", {}) |
|
|
entries.append(("dir", dirname, node[dirname], dir_meta)) |
|
|
for fname, fmeta in files_here: |
|
|
entries.append(("file", fname, None, fmeta)) |
|
|
|
|
|
for i, entry in enumerate(entries): |
|
|
is_last = (i == len(entries) - 1) |
|
|
connector = "└── " if is_last else "├── " |
|
|
child_prefix = prefix + (" " if is_last else "│ ") |
|
|
|
|
|
etype, name, subtree, meta = entry |
|
|
|
|
|
if etype == "dir": |
|
|
result.append(f"{prefix}{connector}{fmt(name, meta, True)}") |
|
|
result.extend(render_tree(subtree, child_prefix, format_entry)) |
|
|
else: |
|
|
result.append(f"{prefix}{connector}{fmt(name, meta, False)}") |
|
|
|
|
|
return result |
|
|
|
|
|
|
|
|
def walk_and_build_tree( |
|
|
abs_path: str, |
|
|
*, |
|
|
show_hidden: bool = False, |
|
|
recursive: bool = False, |
|
|
max_entries: int = 100, |
|
|
) -> tuple[dict, int, bool]: |
|
|
""" |
|
|
Walk a directory and build a tree structure. |
|
|
|
|
|
Returns: |
|
|
(tree, total_entries, truncated) |
|
|
""" |
|
|
entries: list[tuple[str, dict]] = [] |
|
|
total = 0 |
|
|
truncated = False |
|
|
|
|
|
for root, dirs, files in os.walk(abs_path): |
|
|
if not show_hidden: |
|
|
dirs[:] = [d for d in dirs if not d.startswith('.')] |
|
|
files = [f for f in files if not f.startswith('.')] |
|
|
|
|
|
dirs.sort() |
|
|
files.sort() |
|
|
|
|
|
try: |
|
|
rel_root = os.path.relpath(root, abs_path) |
|
|
except Exception: |
|
|
rel_root = "" |
|
|
prefix = "" if rel_root == "." else rel_root.replace("\\", "/") + "/" |
|
|
|
|
|
for d in dirs: |
|
|
p = os.path.join(root, d) |
|
|
try: |
|
|
mtime = datetime.fromtimestamp(os.path.getmtime(p)).strftime("%Y-%m-%d %H:%M") |
|
|
except Exception: |
|
|
mtime = "?" |
|
|
entries.append((f"{prefix}{d}/", {"mtime": mtime})) |
|
|
total += 1 |
|
|
if total >= max_entries: |
|
|
truncated = True |
|
|
break |
|
|
|
|
|
if truncated: |
|
|
break |
|
|
|
|
|
for f in files: |
|
|
p = os.path.join(root, f) |
|
|
try: |
|
|
size = os.path.getsize(p) |
|
|
mtime = datetime.fromtimestamp(os.path.getmtime(p)).strftime("%Y-%m-%d %H:%M") |
|
|
except Exception: |
|
|
size, mtime = 0, "?" |
|
|
entries.append((f"{prefix}{f}", {"size": size, "mtime": mtime})) |
|
|
total += 1 |
|
|
if total >= max_entries: |
|
|
truncated = True |
|
|
break |
|
|
|
|
|
if truncated: |
|
|
break |
|
|
|
|
|
if not recursive: |
|
|
break |
|
|
|
|
|
return build_tree(entries), total, truncated |
|
|
|
|
|
|
|
|
def format_dir_listing( |
|
|
abs_path: str, |
|
|
display_path: str, |
|
|
*, |
|
|
show_hidden: bool = False, |
|
|
recursive: bool = False, |
|
|
max_entries: int = 100, |
|
|
fmt_size_fn: Optional[Callable[[int], str]] = None, |
|
|
) -> str: |
|
|
"""Format a directory listing as a visual tree.""" |
|
|
fmt_size = fmt_size_fn or _fmt_size |
|
|
|
|
|
tree, total, truncated = walk_and_build_tree( |
|
|
abs_path, |
|
|
show_hidden=show_hidden, |
|
|
recursive=recursive, |
|
|
max_entries=max_entries, |
|
|
) |
|
|
|
|
|
def format_entry(name: str, meta: dict, is_dir: bool) -> str: |
|
|
mtime = meta.get("mtime", "") |
|
|
if is_dir: |
|
|
return f"{name}/ ({mtime})" |
|
|
size = meta.get("size", 0) |
|
|
return f"{name} ({fmt_size(size)}, {mtime})" |
|
|
|
|
|
tree_lines = render_tree(tree, " ", format_entry) |
|
|
|
|
|
header = f"Listing of {display_path}\nRoot: /\nEntries: {total}" |
|
|
if truncated: |
|
|
header += f"\n… Truncated at {max_entries} entries." |
|
|
|
|
|
lines = [header, "", "└── /"] |
|
|
lines.extend(tree_lines) |
|
|
|
|
|
return "\n".join(lines).strip() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class SandboxedRoot: |
|
|
""" |
|
|
A configurable sandboxed root directory with path resolution and safety checks. |
|
|
|
|
|
Args: |
|
|
root_dir: Absolute path to the sandbox root. |
|
|
allow_abs: If True, allow absolute paths outside the sandbox. |
|
|
""" |
|
|
|
|
|
def __init__(self, root_dir: str, allow_abs: bool = False): |
|
|
self.root_dir = os.path.abspath(root_dir) |
|
|
self.allow_abs = allow_abs |
|
|
|
|
|
try: |
|
|
os.makedirs(self.root_dir, exist_ok=True) |
|
|
except Exception: |
|
|
pass |
|
|
|
|
|
def safe_err(self, exc: Exception | str) -> str: |
|
|
"""Return an error string with any absolute root replaced by '/' and slashes normalized.""" |
|
|
s = str(exc) |
|
|
s_norm = s.replace("\\", "/") |
|
|
root_fwd = self.root_dir.replace("\\", "/") |
|
|
root_variants = {self.root_dir, root_fwd, re.sub(r"/+", "/", root_fwd)} |
|
|
for variant in root_variants: |
|
|
if variant: |
|
|
s_norm = s_norm.replace(variant, "/") |
|
|
s_norm = re.sub(r"/+", "/", s_norm) |
|
|
return s_norm |
|
|
|
|
|
def err( |
|
|
self, |
|
|
code: str, |
|
|
message: str, |
|
|
*, |
|
|
path: Optional[str] = None, |
|
|
hint: Optional[str] = None, |
|
|
data: Optional[dict] = None, |
|
|
) -> str: |
|
|
"""Return a structured error JSON string.""" |
|
|
payload = { |
|
|
"status": "error", |
|
|
"code": code, |
|
|
"message": message, |
|
|
"root": "/", |
|
|
} |
|
|
if path is not None and path != "": |
|
|
payload["path"] = path |
|
|
if hint: |
|
|
payload["hint"] = hint |
|
|
if data: |
|
|
payload["data"] = data |
|
|
return json.dumps(payload, ensure_ascii=False) |
|
|
|
|
|
def display_path(self, abs_path: str) -> str: |
|
|
"""Return a user-friendly path relative to root using forward slashes.""" |
|
|
try: |
|
|
norm_root = os.path.normpath(self.root_dir) |
|
|
norm_abs = os.path.normpath(abs_path) |
|
|
common = os.path.commonpath([norm_root, norm_abs]) |
|
|
if os.path.normcase(common) == os.path.normcase(norm_root): |
|
|
rel = os.path.relpath(norm_abs, norm_root) |
|
|
if rel == ".": |
|
|
return "/" |
|
|
return "/" + rel.replace("\\", "/") |
|
|
except Exception: |
|
|
pass |
|
|
return abs_path.replace("\\", "/") |
|
|
|
|
|
def resolve_path(self, path: str) -> tuple[str, str]: |
|
|
""" |
|
|
Resolve a user-provided path to an absolute, normalized path constrained to root. |
|
|
Returns (abs_path, error_message). error_message is empty when ok. |
|
|
""" |
|
|
try: |
|
|
user_input = (path or "/").strip() or "/" |
|
|
if user_input.startswith("/"): |
|
|
rel_part = user_input.lstrip("/") or "." |
|
|
raw = os.path.expanduser(rel_part) |
|
|
treat_as_relative = True |
|
|
else: |
|
|
raw = os.path.expanduser(user_input) |
|
|
treat_as_relative = False |
|
|
|
|
|
if not treat_as_relative and os.path.isabs(raw): |
|
|
if not self.allow_abs: |
|
|
return "", self.err( |
|
|
"absolute_path_disabled", |
|
|
"Absolute paths are disabled in safe mode.", |
|
|
path=raw.replace("\\", "/"), |
|
|
hint="Use a path relative to / (e.g., /notes/todo.txt).", |
|
|
) |
|
|
abs_path = os.path.abspath(raw) |
|
|
else: |
|
|
abs_path = os.path.abspath(os.path.join(self.root_dir, raw)) |
|
|
|
|
|
|
|
|
if not self.allow_abs: |
|
|
try: |
|
|
common = os.path.commonpath( |
|
|
[os.path.normpath(self.root_dir), os.path.normpath(abs_path)] |
|
|
) |
|
|
if common != os.path.normpath(self.root_dir): |
|
|
return "", self.err( |
|
|
"path_outside_root", |
|
|
"Path is outside the sandbox root.", |
|
|
path=abs_path, |
|
|
) |
|
|
except Exception: |
|
|
return "", self.err( |
|
|
"path_outside_root", |
|
|
"Path is outside the sandbox root.", |
|
|
path=abs_path, |
|
|
) |
|
|
|
|
|
return abs_path, "" |
|
|
except Exception as exc: |
|
|
return "", self.err( |
|
|
"resolve_path_failed", |
|
|
"Failed to resolve path.", |
|
|
path=(path or ""), |
|
|
data={"error": self.safe_err(exc)}, |
|
|
) |
|
|
|
|
|
def safe_open(self, file, *args, **kwargs): |
|
|
"""A drop-in replacement for open() that enforces sandbox constraints.""" |
|
|
if isinstance(file, int): |
|
|
return open(file, *args, **kwargs) |
|
|
|
|
|
path_str = os.fspath(file) |
|
|
abs_path, err = self.resolve_path(path_str) |
|
|
if err: |
|
|
try: |
|
|
msg = json.loads(err)["message"] |
|
|
except Exception: |
|
|
msg = err |
|
|
raise PermissionError(f"Sandboxed open() failed: {msg}") |
|
|
|
|
|
return open(abs_path, *args, **kwargs) |
|
|
|
|
|
def list_dir( |
|
|
self, |
|
|
abs_path: str, |
|
|
*, |
|
|
show_hidden: bool = False, |
|
|
recursive: bool = False, |
|
|
max_entries: int = 100, |
|
|
) -> str: |
|
|
"""List directory contents as a visual tree.""" |
|
|
return format_dir_listing( |
|
|
abs_path, |
|
|
self.display_path(abs_path), |
|
|
show_hidden=show_hidden, |
|
|
recursive=recursive, |
|
|
max_entries=max_entries, |
|
|
fmt_size_fn=_fmt_size, |
|
|
) |
|
|
|
|
|
def search_text( |
|
|
self, |
|
|
abs_path: str, |
|
|
query: str, |
|
|
*, |
|
|
recursive: bool = False, |
|
|
show_hidden: bool = False, |
|
|
max_results: int = 20, |
|
|
case_sensitive: bool = False, |
|
|
start_index: int = 0, |
|
|
) -> str: |
|
|
"""Search for text within files.""" |
|
|
if not os.path.exists(abs_path): |
|
|
return self.err( |
|
|
"path_not_found", |
|
|
f"Path not found: {self.display_path(abs_path)}", |
|
|
path=self.display_path(abs_path), |
|
|
) |
|
|
|
|
|
query = query or "" |
|
|
normalized_query = query if case_sensitive else query.lower() |
|
|
if normalized_query == "": |
|
|
return self.err( |
|
|
"missing_search_query", |
|
|
"Search query is required for the search action.", |
|
|
hint="Provide text in the Content field to search for.", |
|
|
) |
|
|
|
|
|
max_results = max(1, int(max_results) if max_results is not None else 20) |
|
|
start_index = max(0, int(start_index) if start_index is not None else 0) |
|
|
matches: list[tuple[str, int, str]] = [] |
|
|
errors: list[str] = [] |
|
|
files_scanned = 0 |
|
|
truncated = False |
|
|
total_matches = 0 |
|
|
|
|
|
def _should_skip(name: str) -> bool: |
|
|
return not show_hidden and name.startswith(".") |
|
|
|
|
|
def _handle_match(file_path: str, line_no: int, line_text: str) -> bool: |
|
|
nonlocal truncated, total_matches |
|
|
total_matches += 1 |
|
|
if total_matches <= start_index: |
|
|
return False |
|
|
if len(matches) < max_results: |
|
|
snippet = line_text.strip() |
|
|
if len(snippet) > 200: |
|
|
snippet = snippet[:197] + "…" |
|
|
matches.append((self.display_path(file_path), line_no, snippet)) |
|
|
return False |
|
|
truncated = True |
|
|
return True |
|
|
|
|
|
def _search_file(file_path: str) -> bool: |
|
|
nonlocal files_scanned |
|
|
files_scanned += 1 |
|
|
try: |
|
|
with open(file_path, "r", encoding="utf-8", errors="replace") as handle: |
|
|
for line_no, line in enumerate(handle, start=1): |
|
|
haystack = line if case_sensitive else line.lower() |
|
|
if normalized_query in haystack: |
|
|
if _handle_match(file_path, line_no, line): |
|
|
return True |
|
|
except Exception as exc: |
|
|
errors.append(f"{self.display_path(file_path)} ({self.safe_err(exc)})") |
|
|
return truncated |
|
|
|
|
|
if os.path.isfile(abs_path): |
|
|
_search_file(abs_path) |
|
|
else: |
|
|
for root, dirs, files in os.walk(abs_path): |
|
|
dirs[:] = [d for d in dirs if not _should_skip(d)] |
|
|
visible_files = [f for f in files if show_hidden or not f.startswith(".")] |
|
|
for name in visible_files: |
|
|
file_path = os.path.join(root, name) |
|
|
if _search_file(file_path): |
|
|
break |
|
|
if truncated: |
|
|
break |
|
|
if not recursive: |
|
|
break |
|
|
|
|
|
header_lines = [ |
|
|
f"Search results for {query!r}", |
|
|
f"Scope: {self.display_path(abs_path)}", |
|
|
f"Recursive: {'yes' if recursive else 'no'}, Hidden: {'yes' if show_hidden else 'no'}, Case-sensitive: {'yes' if case_sensitive else 'no'}", |
|
|
f"Start offset: {start_index}", |
|
|
f"Matches returned: {len(matches)}" + (" (truncated)" if truncated else ""), |
|
|
f"Files scanned: {files_scanned}", |
|
|
] |
|
|
|
|
|
next_cursor = start_index + len(matches) if truncated else None |
|
|
|
|
|
if truncated: |
|
|
header_lines.append(f"Matches encountered before truncation: {total_matches}") |
|
|
header_lines.append(f"Truncated: yes — re-run with offset={next_cursor} to continue.") |
|
|
header_lines.append(f"Next cursor: {next_cursor}") |
|
|
else: |
|
|
header_lines.append(f"Total matches found: {total_matches}") |
|
|
header_lines.append("Truncated: no — end of results.") |
|
|
header_lines.append("Next cursor: None") |
|
|
|
|
|
if not matches: |
|
|
if total_matches > 0 and start_index >= total_matches: |
|
|
hint_limit = max(total_matches - 1, 0) |
|
|
body_lines = [ |
|
|
f"No matches found at or after offset {start_index}. Total matches available: {total_matches}.", |
|
|
(f"Try a smaller offset (≤ {hint_limit})." if hint_limit >= 0 else ""), |
|
|
] |
|
|
body_lines = [line for line in body_lines if line] |
|
|
else: |
|
|
body_lines = [ |
|
|
"No matches found.", |
|
|
(f"Total matches encountered: {total_matches}." if total_matches else ""), |
|
|
] |
|
|
body_lines = [line for line in body_lines if line] |
|
|
else: |
|
|
body_lines = [ |
|
|
f"{idx}. {path}:{line_no}: {text}" |
|
|
for idx, (path, line_no, text) in enumerate(matches, start=1) |
|
|
] |
|
|
|
|
|
if errors: |
|
|
shown = errors[:5] |
|
|
body_lines.extend(["", "Warnings:"]) |
|
|
body_lines.extend(shown) |
|
|
if len(errors) > len(shown): |
|
|
body_lines.append(f"… {len(errors) - len(shown)} additional files could not be read.") |
|
|
|
|
|
return "\n".join(header_lines) + "\n\n" + "\n".join(body_lines) |
|
|
|
|
|
def read_file(self, abs_path: str, *, offset: int = 0, max_chars: int = 4000) -> str: |
|
|
"""Read file contents with optional offset and character limit.""" |
|
|
if not os.path.exists(abs_path): |
|
|
return self.err( |
|
|
"file_not_found", |
|
|
f"File not found: {self.display_path(abs_path)}", |
|
|
path=self.display_path(abs_path), |
|
|
) |
|
|
if os.path.isdir(abs_path): |
|
|
return self.err( |
|
|
"is_directory", |
|
|
f"Path is a directory, not a file: {self.display_path(abs_path)}", |
|
|
path=self.display_path(abs_path), |
|
|
hint="Provide a file path.", |
|
|
) |
|
|
try: |
|
|
with open(abs_path, "r", encoding="utf-8", errors="replace") as f: |
|
|
data = f.read() |
|
|
except Exception as exc: |
|
|
return self.err( |
|
|
"read_failed", |
|
|
"Failed to read file.", |
|
|
path=self.display_path(abs_path), |
|
|
data={"error": self.safe_err(exc)}, |
|
|
) |
|
|
total = len(data) |
|
|
start = max(0, min(offset, total)) |
|
|
if max_chars > 0: |
|
|
end = min(total, start + max_chars) |
|
|
else: |
|
|
end = total |
|
|
chunk = data[start:end] |
|
|
next_cursor = end if end < total else None |
|
|
header = ( |
|
|
f"Reading {self.display_path(abs_path)}\n" |
|
|
f"Offset {start}, returned {len(chunk)} of {total}." |
|
|
+ (f"\nNext cursor: {next_cursor}" if next_cursor is not None else "") |
|
|
) |
|
|
sep = "\n\n---\n\n" |
|
|
return header + sep + chunk |
|
|
|
|
|
def info(self, abs_path: str) -> str: |
|
|
"""Get file/directory metadata as JSON.""" |
|
|
try: |
|
|
st = os.stat(abs_path) |
|
|
except Exception as exc: |
|
|
return self.err( |
|
|
"stat_failed", |
|
|
"Failed to stat path.", |
|
|
path=self.display_path(abs_path), |
|
|
data={"error": self.safe_err(exc)}, |
|
|
) |
|
|
info_dict = { |
|
|
"path": self.display_path(abs_path), |
|
|
"type": "directory" if stat.S_ISDIR(st.st_mode) else "file", |
|
|
"size": st.st_size, |
|
|
"modified": datetime.fromtimestamp(st.st_mtime).isoformat(sep=" ", timespec="seconds"), |
|
|
"created": datetime.fromtimestamp(st.st_ctime).isoformat(sep=" ", timespec="seconds"), |
|
|
"mode": oct(st.st_mode), |
|
|
"root": "/", |
|
|
} |
|
|
return json.dumps(info_dict, indent=2) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _get_filesystem_root() -> str: |
|
|
"""Get the default filesystem root directory.""" |
|
|
root = os.getenv("NYMBO_TOOLS_ROOT") |
|
|
if root and root.strip(): |
|
|
return os.path.abspath(os.path.expanduser(root.strip())) |
|
|
try: |
|
|
here = os.path.abspath(__file__) |
|
|
tools_dir = os.path.dirname(os.path.dirname(here)) |
|
|
return os.path.abspath(os.path.join(tools_dir, "Filesystem")) |
|
|
except Exception: |
|
|
return os.path.abspath(os.getcwd()) |
|
|
|
|
|
|
|
|
def _get_obsidian_root() -> str: |
|
|
"""Get the default Obsidian vault root directory.""" |
|
|
env_root = os.getenv("OBSIDIAN_VAULT_ROOT") |
|
|
if env_root and env_root.strip(): |
|
|
return os.path.abspath(os.path.expanduser(env_root.strip())) |
|
|
try: |
|
|
here = os.path.abspath(__file__) |
|
|
tools_dir = os.path.dirname(os.path.dirname(here)) |
|
|
return os.path.abspath(os.path.join(tools_dir, "Obsidian")) |
|
|
except Exception: |
|
|
return os.path.abspath(os.getcwd()) |
|
|
|
|
|
|
|
|
|
|
|
ALLOW_ABS = bool(int(os.getenv("UNSAFE_ALLOW_ABS_PATHS", "0"))) |
|
|
|
|
|
FILESYSTEM_ROOT = _get_filesystem_root() |
|
|
OBSIDIAN_ROOT = _get_obsidian_root() |
|
|
|
|
|
|
|
|
filesystem_sandbox = SandboxedRoot(FILESYSTEM_ROOT, allow_abs=ALLOW_ABS) |
|
|
|
|
|
|
|
|
obsidian_sandbox = SandboxedRoot(OBSIDIAN_ROOT, allow_abs=ALLOW_ABS) |
|
|
|
|
|
|
|
|
|
|
|
ROOT_DIR = FILESYSTEM_ROOT |
|
|
|
|
|
def _resolve_path(path: str) -> tuple[str, str]: |
|
|
"""Resolve path using the default filesystem sandbox.""" |
|
|
return filesystem_sandbox.resolve_path(path) |
|
|
|
|
|
def _display_path(abs_path: str) -> str: |
|
|
"""Display path using the default filesystem sandbox.""" |
|
|
return filesystem_sandbox.display_path(abs_path) |
|
|
|
|
|
def safe_open(file, *args, **kwargs): |
|
|
"""Open file using the default filesystem sandbox.""" |
|
|
return filesystem_sandbox.safe_open(file, *args, **kwargs) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_safe_builtins() -> dict: |
|
|
"""Create a builtins dict with sandboxed open().""" |
|
|
if isinstance(__builtins__, dict): |
|
|
safe_builtins = __builtins__.copy() |
|
|
else: |
|
|
safe_builtins = vars(__builtins__).copy() |
|
|
safe_builtins["open"] = safe_open |
|
|
return safe_builtins |
|
|
|
|
|
|
|
|
def sandboxed_exec( |
|
|
code: str, |
|
|
*, |
|
|
extra_globals: dict[str, Any] | None = None, |
|
|
ast_mode: bool = False, |
|
|
) -> str: |
|
|
""" |
|
|
Execute Python code in a sandboxed environment. |
|
|
|
|
|
Args: |
|
|
code: Python source code to execute |
|
|
extra_globals: Additional globals to inject (e.g., tools) |
|
|
ast_mode: If True, parse and print results of all expression statements |
|
|
(like Agent_Terminal). If False, simple exec (like Code_Interpreter). |
|
|
|
|
|
Returns: |
|
|
Captured stdout output, or exception text on error. |
|
|
""" |
|
|
if not code: |
|
|
return "No code provided." |
|
|
|
|
|
old_stdout = sys.stdout |
|
|
old_cwd = os.getcwd() |
|
|
redirected_output = sys.stdout = StringIO() |
|
|
|
|
|
|
|
|
safe_builtins = create_safe_builtins() |
|
|
env: dict[str, Any] = { |
|
|
"open": safe_open, |
|
|
"__builtins__": safe_builtins, |
|
|
"print": print, |
|
|
} |
|
|
if extra_globals: |
|
|
env.update(extra_globals) |
|
|
|
|
|
try: |
|
|
os.chdir(ROOT_DIR) |
|
|
|
|
|
if ast_mode: |
|
|
|
|
|
tree = ast.parse(code) |
|
|
for node in tree.body: |
|
|
if isinstance(node, ast.Expr): |
|
|
|
|
|
expr = compile(ast.Expression(node.value), filename="<string>", mode="eval") |
|
|
result_val = eval(expr, env) |
|
|
if result_val is not None: |
|
|
print(result_val) |
|
|
else: |
|
|
|
|
|
mod = ast.Module(body=[node], type_ignores=[]) |
|
|
exec(compile(mod, filename="<string>", mode="exec"), env) |
|
|
else: |
|
|
|
|
|
exec(code, env) |
|
|
|
|
|
result = redirected_output.getvalue() |
|
|
except Exception as exc: |
|
|
result = str(exc) |
|
|
finally: |
|
|
sys.stdout = old_stdout |
|
|
try: |
|
|
os.chdir(old_cwd) |
|
|
except Exception: |
|
|
pass |
|
|
|
|
|
return result |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_hf_token() -> str | None: |
|
|
"""Get the HF API token from environment variables. |
|
|
|
|
|
Checks HF_READ_TOKEN first, then falls back to HF_TOKEN. |
|
|
""" |
|
|
return os.getenv("HF_READ_TOKEN") or os.getenv("HF_TOKEN") |
|
|
|
|
|
|
|
|
|
|
|
HF_TOKEN = get_hf_token() |
|
|
|
|
|
|
|
|
DEFAULT_PROVIDERS = ["auto", "replicate", "fal-ai"] |
|
|
|
|
|
|
|
|
TEXTGEN_PROVIDERS = ["cerebras", "auto"] |
|
|
|
|
|
|
|
|
T = TypeVar("T") |
|
|
|
|
|
|
|
|
def handle_hf_error(msg: str, model_id: str, *, context: str = "generation") -> None: |
|
|
""" |
|
|
Raise appropriate gr.Error for common HF API error codes. |
|
|
|
|
|
Args: |
|
|
msg: Error message string to analyze |
|
|
model_id: The model ID being used (for error messages) |
|
|
context: Description of operation for error messages |
|
|
|
|
|
Raises: |
|
|
gr.Error: With user-friendly message based on error type |
|
|
""" |
|
|
lowered = msg.lower() |
|
|
|
|
|
if "404" in msg: |
|
|
raise gr.Error(f"Model not found or unavailable: {model_id}. Check the id and your HF token access.") |
|
|
|
|
|
if "503" in msg: |
|
|
raise gr.Error("The model is warming up. Please try again shortly.") |
|
|
|
|
|
if "401" in msg or "403" in msg: |
|
|
raise gr.Error("Please duplicate the space and provide a `HF_READ_TOKEN` to enable Image and Video Generation.") |
|
|
|
|
|
if any(pattern in lowered for pattern in ("api_key", "hf auth login", "unauthorized", "forbidden")): |
|
|
raise gr.Error("Please duplicate the space and provide a `HF_READ_TOKEN` to enable Image and Video Generation.") |
|
|
|
|
|
|
|
|
raise gr.Error(f"{context.capitalize()} failed: {msg}") |
|
|
|
|
|
|
|
|
def invoke_with_fallback( |
|
|
fn: Callable[[str], T], |
|
|
providers: list[str] | None = None, |
|
|
) -> T: |
|
|
""" |
|
|
Try calling fn(provider) for each provider until one succeeds. |
|
|
|
|
|
Args: |
|
|
fn: Function that takes a provider string and returns a result. |
|
|
Should raise an exception on failure. |
|
|
providers: List of provider strings to try. Defaults to DEFAULT_PROVIDERS. |
|
|
|
|
|
Returns: |
|
|
The result from the first successful fn() call. |
|
|
|
|
|
Raises: |
|
|
The last exception if all providers fail. |
|
|
""" |
|
|
if providers is None: |
|
|
providers = DEFAULT_PROVIDERS |
|
|
|
|
|
last_error: Exception | None = None |
|
|
|
|
|
for provider in providers: |
|
|
try: |
|
|
return fn(provider) |
|
|
except Exception as exc: |
|
|
last_error = exc |
|
|
continue |
|
|
|
|
|
|
|
|
if last_error: |
|
|
raise last_error |
|
|
raise RuntimeError("No providers available") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
__all__ = [ |
|
|
|
|
|
"_fmt_size", |
|
|
"build_tree", |
|
|
"render_tree", |
|
|
"walk_and_build_tree", |
|
|
"format_dir_listing", |
|
|
|
|
|
"SandboxedRoot", |
|
|
"filesystem_sandbox", |
|
|
"obsidian_sandbox", |
|
|
"ROOT_DIR", |
|
|
"FILESYSTEM_ROOT", |
|
|
"OBSIDIAN_ROOT", |
|
|
"ALLOW_ABS", |
|
|
"_resolve_path", |
|
|
"_display_path", |
|
|
"safe_open", |
|
|
|
|
|
"sandboxed_exec", |
|
|
"create_safe_builtins", |
|
|
|
|
|
"get_hf_token", |
|
|
"HF_TOKEN", |
|
|
"DEFAULT_PROVIDERS", |
|
|
"TEXTGEN_PROVIDERS", |
|
|
"handle_hf_error", |
|
|
"invoke_with_fallback", |
|
|
] |
|
|
|