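# Web file-viewer header captured together with the source (not part of the module):
# Tools / Modules / Agent_Terminal.py
# Nymbo's picture
# Update Modules/Agent_Terminal.py
# a05c1fd verified
# raw
# history blame
# 11.2 kB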
from __future__ import annotations
import os
import sys
import types
import ast
import inspect
import functools
from io import StringIO
from typing import Annotated, get_type_hints, get_origin, get_args
import gradio as gr
from ._docstrings import autodoc
from .File_System import ROOT_DIR, File_System
from .Web_Fetch import Web_Fetch
from .Web_Search import Web_Search
from .Memory_Manager import Memory_Manager
from .Generate_Speech import Generate_Speech, List_Kokoro_Voices, List_Supertonic_Voices
from .Generate_Image import Generate_Image
from .Generate_Video import Generate_Video
from .Deep_Research import Deep_Research
from .Obsidian_Vault import Obsidian_Vault
from .Shell_Command import Shell_Command
from .Code_Interpreter import Code_Interpreter
from app import _log_call_end, _log_call_start, _truncate_for_log
# Ready-to-paste example calls, keyed by tool name.
# Each value is a (simple, advanced) pair: the first string shows the minimal
# call, the second shows the same tool with optional arguments filled in.
_TOOL_EXAMPLES = {
    # --- Web access ---
    "Web_Fetch": (
        'Web_Fetch(url="https://example.com")',
        'Web_Fetch(url="https://example.com", max_chars=5000, mode="url_scraper")',
    ),
    "Web_Search": (
        'Web_Search(query="Python tutorials")',
        'Web_Search(query="AI news", max_results=10, search_type="news", date_filter="week")',
    ),
    # --- Code and shell execution ---
    "Code_Interpreter": (
        'Code_Interpreter(code="print(2 + 2)")',
        'Code_Interpreter(code="import math; print(math.pi)", timeout=60)',
    ),
    "Shell_Command": (
        'Shell_Command(command="echo Hello")',
        'Shell_Command(command="ls -la", timeout=30)',
    ),
    # --- Files, notes and memory ---
    "File_System": (
        'File_System(action="list", path="/")',
        'File_System(action="read", path="/notes.txt", max_chars=5000)',
    ),
    "Obsidian_Vault": (
        'Obsidian_Vault(action="list", path="/")',
        'Obsidian_Vault(action="search", query="meeting notes", recursive=True)',
    ),
    "Memory_Manager": (
        'Memory_Manager(action="list")',
        'Memory_Manager(action="save", text="Remember this fact", tags="important, facts")',
    ),
    # --- Media generation ---
    "Generate_Speech": (
        'Generate_Speech(text="Hello, world!")',
        'Generate_Speech(text="Welcome to the demo", model="Kokoro", voice="af_heart", speed=1.2)',
    ),
    "Generate_Image": (
        'Generate_Image(prompt="A sunset over mountains")',
        'Generate_Image(prompt="A cyberpunk city", steps=50, cfg_scale=9.0, width=1024, height=768)',
    ),
    "Generate_Video": (
        'Generate_Video(prompt="A cat playing piano")',
        'Generate_Video(prompt="Ocean waves", duration=5, aspect_ratio="16:9")',
    ),
    # --- Research ---
    "Deep_Research": (
        'Deep_Research(query="Climate change effects")',
        'Deep_Research(query="Quantum computing advances", max_sources=10, search_type="news")',
    ),
}
def _type_label(tp) -> str:
    """Render a type hint as a short, readable label.

    Unparameterized hints keep the previous behavior (their ``__name__`` when
    available, otherwise ``str(tp)``).  Parameterized hints such as
    ``list[str]``, ``Optional[int]`` or ``Literal["a", "b"]`` are rendered in
    full with the ``typing.`` prefix removed, so their element/alternative
    types are not reduced to a bare ``list`` / ``Optional`` / ``Literal``.
    """
    if get_origin(tp) is None:
        return getattr(tp, "__name__", str(tp))
    return str(tp).replace("typing.", "")


def _format_tool_usage(func) -> str:
    """Generate detailed usage information for a tool function.

    The report contains a ``=== name ===`` header, the docstring summary (all
    text before the first ``Args:``/``Returns:``/... section), one entry per
    parameter (type, default or "(required)", and the ``Annotated``
    description when present) and example calls from ``_TOOL_EXAMPLES``.

    Args:
        func: The tool callable to describe.

    Returns:
        The usage report as a single newline-joined string.
    """
    name = func.__name__

    # cleandoc removes the source-level indentation of continuation lines so
    # multi-line summaries are not printed with a ragged left margin.
    doc = inspect.cleandoc(func.__doc__ or "No description available.")

    # Keep only the summary: the parameter list below is generated from the
    # signature, so the docstring's own Args/Returns sections are skipped.
    section_headers = ('args:', 'returns:', 'parameters:', 'raises:', 'example:', 'note:', 'notes:')
    summary_lines = []
    for line in doc.split('\n'):
        if line.strip().lower().startswith(section_headers):
            break
        summary_lines.append(line)
    summary = '\n'.join(summary_lines).strip()

    sig = inspect.signature(func)

    # Resolving hints can fail (e.g. unresolvable forward references); the
    # report then falls back to "any" for every parameter.
    try:
        hints = get_type_hints(func, include_extras=True)
    except Exception:
        hints = {}

    lines = [f"=== {name} ===", "", summary, "", "Parameters:"]
    for param_name, param in sig.parameters.items():
        if param_name in ("self", "cls"):
            continue

        type_str = "any"
        desc = ""
        hint = hints.get(param_name)
        if hint is not None:
            if get_origin(hint) is Annotated:
                # Annotated[T, meta...]: T is the real type; a leading string
                # in the metadata is treated as the parameter description.
                base, *metadata = get_args(hint)
                type_str = _type_label(base)
                if metadata and isinstance(metadata[0], str):
                    desc = metadata[0]
            else:
                type_str = _type_label(hint)

        if param.default is not inspect.Parameter.empty:
            default_repr = repr(param.default)
            # Keep very long defaults from dominating the line.
            if len(default_repr) > 50:
                default_repr = default_repr[:47] + "..."
            default_str = f" = {default_repr}"
        else:
            default_str = " (required)"

        lines.append(f" - {param_name}: {type_str}{default_str}")
        if desc:
            lines.append(f" {desc}")

    lines.append("")
    lines.append("Examples:")
    if name in _TOOL_EXAMPLES:
        simple, advanced = _TOOL_EXAMPLES[name]
        lines.append(f" {simple}")
        lines.append(f" {advanced}")
    else:
        # No curated example for this tool: show a generic call shape.
        lines.append(f" {name}(...)")
    return "\n".join(lines)
def _wrap_tool_for_no_arg_usage(func):
    """Wrap a tool so that a bare call returns usage help instead of failing.

    Calling the wrapper with no positional and no keyword arguments returns
    the text produced by ``_format_tool_usage(func)``.  Any other call is
    forwarded to ``func`` unchanged.

    Tools whose signature declares no parameters at all are the exception: for
    them a bare call is the only possible invocation, so it is forwarded to
    ``func`` rather than being replaced by help text (otherwise such a tool
    could never be executed through the wrapper).

    Args:
        func: The tool callable to wrap.

    Returns:
        The wrapping callable.  The unwrapped tool stays reachable through its
        ``_original_func`` attribute.
    """
    # Decide once, at wrap time, whether a bare call can mean "show help".
    try:
        accepts_arguments = bool(inspect.signature(func).parameters)
    except (TypeError, ValueError):
        # Signature is not introspectable; keep the help-on-bare-call behavior.
        accepts_arguments = True

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if accepts_arguments and not args and not kwargs:
            return _format_tool_usage(func)
        return func(*args, **kwargs)

    # Preserve the original function for introspection
    wrapper._original_func = func
    return wrapper
def _get_tools_map():
    """Return a fresh ``{tool name: wrapped tool}`` mapping of all exposed tools.

    Every tool is passed through ``_wrap_tool_for_no_arg_usage`` before being
    stored, and a new mapping (with new wrappers) is built on each call.
    """
    exposed_tools = (
        ("Web_Fetch", Web_Fetch),
        ("Web_Search", Web_Search),
        ("Memory_Manager", Memory_Manager),
        ("Generate_Speech", Generate_Speech),
        ("List_Kokoro_Voices", List_Kokoro_Voices),
        ("List_Supertonic_Voices", List_Supertonic_Voices),
        ("Generate_Image", Generate_Image),
        ("Generate_Video", Generate_Video),
        ("Deep_Research", Deep_Research),
        ("File_System", File_System),
        ("Obsidian_Vault", Obsidian_Vault),
        ("Shell_Command", Shell_Command),
        ("Code_Interpreter", Code_Interpreter),
    )
    wrapped = {}
    for tool_name, tool_func in exposed_tools:
        wrapped[tool_name] = _wrap_tool_for_no_arg_usage(tool_func)
    return wrapped
def search_tools(query: str) -> str:
    """Search for tools by name or description and return their usage info.

    Matching is a case-insensitive substring test against each tool's name and
    its docstring.  Surrounding whitespace in the query is ignored, and spaces
    are also tried as underscores so that ``"web search"`` finds
    ``Web_Search``.  An empty query matches every tool.

    Args:
        query: Text to look for; ``None`` is treated as an empty query.

    Returns:
        The usage report of every matching tool, each followed by a blank
        line, or a "No tools found" message that echoes the query as typed.
    """
    typed_query = (query or "").strip()
    needle = typed_query.lower()
    # Tool names use underscores where a person would type a space.
    needle_as_name = needle.replace(" ", "_")

    matched_tools = []
    for name, func in _get_tools_map().items():
        # The map holds wrappers; the docstring and signature live on the original.
        original = getattr(func, '_original_func', func)
        doc = (original.__doc__ or "").lower()
        lowered_name = name.lower()
        if needle in lowered_name or needle_as_name in lowered_name or needle in doc:
            matched_tools.append(original)

    if not matched_tools:
        return f"No tools found matching '{typed_query}'."

    output = []
    for original in matched_tools:
        output.append(_format_tool_usage(original))
        output.append("")  # blank separator line after each report
    return "\n".join(output)
def _initialize_mock_modules():
    """Install a synthetic ``functions`` module into ``sys.modules``.

    The module exposes every entry of ``_get_tools_map()`` plus the
    ``search_tools`` helper as attributes, so executed code can write
    ``from functions import ...`` without an ImportError.  Any existing
    ``sys.modules["functions"]`` entry is replaced.
    """
    shim = types.ModuleType("functions")

    # Tools first, helpers second: the same assignment order as before.
    exported = dict(_get_tools_map())
    exported["search_tools"] = search_tools

    for attr_name, attr_value in exported.items():
        setattr(shim, attr_name, attr_value)

    sys.modules["functions"] = shim


# Register the shim as soon as this module is imported.
_initialize_mock_modules()
# Single source of truth for the LLM-facing tool description.
# Written as one sentence per item; the sentences are joined with single spaces.
TOOL_SUMMARY = " ".join(
    [
        "Executes Python code as the unified interface for the entire tools ecosystem.",
        "Use Agent Terminal repeatedly whenever you need to chain or combine tool operations.",
        "Input must be JSON that will be executed in Python.",
        "Available tools: `Web_Fetch`, `Web_Search`, `Code_Interpreter`, `Shell_Command`, `File_System`, `Obsidian_Vault`, `Memory_Manager`, `Generate_Speech`, `Generate_Image`, `Generate_Video`, `Deep_Research`.",
    ]
)
# Agent_Terminal: run a snippet of Python with every tool pre-bound and return
# what it printed.
#
# - `input` is parsed with `ast` and executed one top-level statement at a
#   time; the value of every bare expression statement that is not None is
#   printed, so all of them show up in the output (not just the last one).
# - stdout is captured for the duration of the run and the working directory
#   is switched to ROOT_DIR; both are restored afterwards.
# - Returns the captured stdout.  If the code raises, the output captured up
#   to that point is returned followed by "<ExceptionType>: <message>".
# - Returns "No code provided." when `input` is None.
#
# NOTE(review): no literal docstring is added here on purpose; the description
# is passed to @autodoc via TOOL_SUMMARY and that decorator presumably owns the
# function's docstring (confirm in ._docstrings before adding one).
# NOTE(security): this executes arbitrary code by design (eval/exec with full
# builtins); it must only be exposed where that is acceptable.
@autodoc(
    summary=TOOL_SUMMARY,
)
def Agent_Terminal(input: Annotated[str, (
    "Python source code to run; stdout is captured and returned. "
    "Use `search_tools(`query`)` to search tools by name or capability, returns tool definitions and examples. "
    "Call any tool with no arguments to get its full usage info (e.g., `Generate_Image()`)."
)]) -> str:
    _log_call_start("Agent_Terminal", input=_truncate_for_log(input or "", 300))
    if input is None:
        result = "No code provided."
        _log_call_end("Agent_Terminal", result)
        return result

    # Build the execution namespace BEFORE touching process-wide state, so a
    # failure here cannot leave sys.stdout redirected.
    tools_env = {
        **_get_tools_map(),
        "search_tools": search_tools,
        "print": print,  # Ensure print is available
        "__builtins__": __builtins__,
    }

    old_stdout = sys.stdout
    old_cwd = os.getcwd()
    # sys.stdout and the working directory are process-wide; everything from
    # here until the `finally` block runs with them swapped.
    redirected_output = sys.stdout = StringIO()
    try:
        os.chdir(ROOT_DIR)
        # Parse code and print results of ALL expression statements (not just the last)
        tree = ast.parse(input)
        for node in tree.body:
            if isinstance(node, ast.Expr):
                # Standalone expression: evaluate it and echo a non-None value.
                expr = compile(ast.Expression(node.value), filename="<string>", mode="eval")
                result_val = eval(expr, tools_env)
                if result_val is not None:
                    print(result_val)
            else:
                # Statement (assignment, if, for, def, ...): just execute it.
                mod = ast.Module(body=[node], type_ignores=[])
                exec(compile(mod, filename="<string>", mode="exec"), tools_env)
        result = redirected_output.getvalue()
    except Exception as exc:  # pylint: disable=broad-except
        # Keep whatever was printed before the failure and always name the
        # exception type: str(exc) alone is empty for message-less exceptions
        # and is only the quoted key for KeyError.
        captured = redirected_output.getvalue()
        if captured and not captured.endswith("\n"):
            captured += "\n"
        message = str(exc)
        error_line = f"{type(exc).__name__}: {message}" if message else type(exc).__name__
        result = captured + error_line
    finally:
        sys.stdout = old_stdout
        try:
            os.chdir(old_cwd)
        except OSError:
            # Best effort: the previous directory may no longer be accessible.
            pass
    _log_call_end("Agent_Terminal", _truncate_for_log(result))
    return result
def build_interface() -> gr.Interface:
    """Create the Gradio interface that exposes ``Agent_Terminal``.

    Returns:
        A ``gr.Interface`` with a Python code editor as input and a text box
        as output, with flagging turned off and ``TOOL_SUMMARY`` as the API
        description.
    """
    code_editor = gr.Code(label="Python Code", language="python")
    output_box = gr.Textbox(label="Output", lines=5, max_lines=20)
    banner_html = "<div style=\"text-align:center\">Interact with all other tools via a Python API. Reduces token usage by 90%.</div>"

    interface_options = {
        "fn": Agent_Terminal,
        "inputs": code_editor,
        "outputs": output_box,
        "title": "Agent Terminal",
        "description": banner_html,
        "api_description": TOOL_SUMMARY,
        "flagging_mode": "never",
    }
    return gr.Interface(**interface_options)
# Names exported by `from <this module> import *`: the tool entry point and
# the factory that builds its Gradio interface.
__all__ = ["Agent_Terminal", "build_interface"]