|
|
from __future__ import annotations |
|
|
|
|
|
import os |
|
|
import sys |
|
|
import types |
|
|
import ast |
|
|
import inspect |
|
|
import functools |
|
|
from io import StringIO |
|
|
from typing import Annotated, get_type_hints, get_origin, get_args |
|
|
|
|
|
import gradio as gr |
|
|
from ._docstrings import autodoc |
|
|
from .File_System import ROOT_DIR, File_System |
|
|
from .Web_Fetch import Web_Fetch |
|
|
from .Web_Search import Web_Search |
|
|
from .Memory_Manager import Memory_Manager |
|
|
from .Generate_Speech import Generate_Speech, List_Kokoro_Voices, List_Supertonic_Voices |
|
|
from .Generate_Image import Generate_Image |
|
|
from .Generate_Video import Generate_Video |
|
|
from .Deep_Research import Deep_Research |
|
|
from .Obsidian_Vault import Obsidian_Vault |
|
|
from .Shell_Command import Shell_Command |
|
|
from .Code_Interpreter import Code_Interpreter |
|
|
|
|
|
from app import _log_call_end, _log_call_start, _truncate_for_log |
|
|
|
|
|
|
|
|
|
|
|
_TOOL_EXAMPLES = { |
|
|
"Web_Fetch": ( |
|
|
'Web_Fetch(url="https://example.com")', |
|
|
'Web_Fetch(url="https://example.com", max_chars=5000, mode="url_scraper")', |
|
|
), |
|
|
"Web_Search": ( |
|
|
'Web_Search(query="Python tutorials")', |
|
|
'Web_Search(query="AI news", max_results=10, search_type="news", date_filter="week")', |
|
|
), |
|
|
"Code_Interpreter": ( |
|
|
'Code_Interpreter(code="print(2 + 2)")', |
|
|
'Code_Interpreter(code="import math; print(math.pi)", timeout=60)', |
|
|
), |
|
|
"Shell_Command": ( |
|
|
'Shell_Command(command="echo Hello")', |
|
|
'Shell_Command(command="ls -la", timeout=30)', |
|
|
), |
|
|
"File_System": ( |
|
|
'File_System(action="list", path="/")', |
|
|
'File_System(action="read", path="/notes.txt", max_chars=5000)', |
|
|
), |
|
|
"Obsidian_Vault": ( |
|
|
'Obsidian_Vault(action="list", path="/")', |
|
|
'Obsidian_Vault(action="search", query="meeting notes", recursive=True)', |
|
|
), |
|
|
"Memory_Manager": ( |
|
|
'Memory_Manager(action="list")', |
|
|
'Memory_Manager(action="save", text="Remember this fact", tags="important, facts")', |
|
|
), |
|
|
"Generate_Speech": ( |
|
|
'Generate_Speech(text="Hello, world!")', |
|
|
'Generate_Speech(text="Welcome to the demo", model="Kokoro", voice="af_heart", speed=1.2)', |
|
|
), |
|
|
"Generate_Image": ( |
|
|
'Generate_Image(prompt="A sunset over mountains")', |
|
|
'Generate_Image(prompt="A cyberpunk city", steps=50, cfg_scale=9.0, width=1024, height=768)', |
|
|
), |
|
|
"Generate_Video": ( |
|
|
'Generate_Video(prompt="A cat playing piano")', |
|
|
'Generate_Video(prompt="Ocean waves", duration=5, aspect_ratio="16:9")', |
|
|
), |
|
|
"Deep_Research": ( |
|
|
'Deep_Research(query="Climate change effects")', |
|
|
'Deep_Research(query="Quantum computing advances", max_sources=10, search_type="news")', |
|
|
), |
|
|
} |
|
|
|
|
|
|
|
|
def _format_tool_usage(func) -> str: |
|
|
"""Generate detailed usage information for a tool function.""" |
|
|
name = func.__name__ |
|
|
doc = func.__doc__ or "No description available." |
|
|
|
|
|
|
|
|
|
|
|
doc_lines = doc.strip().split('\n') |
|
|
summary_lines = [] |
|
|
for line in doc_lines: |
|
|
stripped = line.strip().lower() |
|
|
|
|
|
if stripped.startswith(('args:', 'returns:', 'parameters:', 'raises:', 'example:', 'note:', 'notes:')): |
|
|
break |
|
|
summary_lines.append(line) |
|
|
summary = '\n'.join(summary_lines).strip() |
|
|
|
|
|
|
|
|
sig = inspect.signature(func) |
|
|
|
|
|
|
|
|
try: |
|
|
hints = get_type_hints(func, include_extras=True) |
|
|
except Exception: |
|
|
hints = {} |
|
|
|
|
|
lines = [f"=== {name} ===", "", summary, "", "Parameters:"] |
|
|
|
|
|
for param_name, param in sig.parameters.items(): |
|
|
if param_name in ("self", "cls"): |
|
|
continue |
|
|
|
|
|
|
|
|
hint = hints.get(param_name) |
|
|
type_str = "any" |
|
|
desc = "" |
|
|
|
|
|
if hint is not None: |
|
|
if get_origin(hint) is Annotated: |
|
|
args = get_args(hint) |
|
|
if args: |
|
|
type_str = getattr(args[0], "__name__", str(args[0])) |
|
|
if len(args) > 1 and isinstance(args[1], str): |
|
|
desc = args[1] |
|
|
else: |
|
|
type_str = getattr(hint, "__name__", str(hint)) |
|
|
|
|
|
|
|
|
if param.default is not inspect.Parameter.empty: |
|
|
default_repr = repr(param.default) |
|
|
if len(default_repr) > 50: |
|
|
default_repr = default_repr[:47] + "..." |
|
|
default_str = f" = {default_repr}" |
|
|
else: |
|
|
default_str = " (required)" |
|
|
|
|
|
lines.append(f" - {param_name}: {type_str}{default_str}") |
|
|
if desc: |
|
|
lines.append(f" {desc}") |
|
|
|
|
|
|
|
|
lines.append("") |
|
|
lines.append("Examples:") |
|
|
if name in _TOOL_EXAMPLES: |
|
|
simple, advanced = _TOOL_EXAMPLES[name] |
|
|
lines.append(f" {simple}") |
|
|
lines.append(f" {advanced}") |
|
|
else: |
|
|
lines.append(f" {name}(...)") |
|
|
|
|
|
return "\n".join(lines) |
|
|
|
|
|
|
|
|
def _wrap_tool_for_no_arg_usage(func): |
|
|
""" |
|
|
Wrap a tool function so that calling it with no arguments |
|
|
returns usage information instead of raising an error. |
|
|
""" |
|
|
@functools.wraps(func) |
|
|
def wrapper(*args, **kwargs): |
|
|
|
|
|
if not args and not kwargs: |
|
|
return _format_tool_usage(func) |
|
|
return func(*args, **kwargs) |
|
|
|
|
|
|
|
|
wrapper._original_func = func |
|
|
return wrapper |
|
|
|
|
|
def _get_tools_map(): |
|
|
"""Get all tools wrapped to return usage info when called with no arguments.""" |
|
|
raw_tools = { |
|
|
"Web_Fetch": Web_Fetch, |
|
|
"Web_Search": Web_Search, |
|
|
"Memory_Manager": Memory_Manager, |
|
|
"Generate_Speech": Generate_Speech, |
|
|
"List_Kokoro_Voices": List_Kokoro_Voices, |
|
|
"List_Supertonic_Voices": List_Supertonic_Voices, |
|
|
"Generate_Image": Generate_Image, |
|
|
"Generate_Video": Generate_Video, |
|
|
"Deep_Research": Deep_Research, |
|
|
"File_System": File_System, |
|
|
"Obsidian_Vault": Obsidian_Vault, |
|
|
"Shell_Command": Shell_Command, |
|
|
"Code_Interpreter": Code_Interpreter, |
|
|
} |
|
|
return {name: _wrap_tool_for_no_arg_usage(func) for name, func in raw_tools.items()} |
|
|
|
|
|
def search_tools(query: str) -> str: |
|
|
"""Search for tools by name or description. Returns usage info for matches.""" |
|
|
query = query.lower() |
|
|
matches = [] |
|
|
tools = _get_tools_map() |
|
|
for name, func in tools.items(): |
|
|
|
|
|
original = getattr(func, '_original_func', func) |
|
|
doc = (original.__doc__ or "").lower() |
|
|
if query in name.lower() or query in doc: |
|
|
matches.append((name, func)) |
|
|
|
|
|
if not matches: |
|
|
return f"No tools found matching '{query}'." |
|
|
|
|
|
output = [] |
|
|
for name, func in matches: |
|
|
output.append(_format_tool_usage(getattr(func, '_original_func', func))) |
|
|
output.append("") |
|
|
return "\n".join(output) |
|
|
|
|
|
def _initialize_mock_modules(): |
|
|
""" |
|
|
Registers a mock 'functions' module in sys.modules so that LLMs |
|
|
can do 'from functions import ...' without error. |
|
|
Uses wrapped tools that return usage info when called with no args. |
|
|
""" |
|
|
mock_module = types.ModuleType("functions") |
|
|
|
|
|
|
|
|
for name, tool in _get_tools_map().items(): |
|
|
setattr(mock_module, name, tool) |
|
|
|
|
|
|
|
|
helpers = { |
|
|
"search_tools": search_tools, |
|
|
} |
|
|
for name, func in helpers.items(): |
|
|
setattr(mock_module, name, func) |
|
|
|
|
|
sys.modules["functions"] = mock_module |
|
|
|
|
|
_initialize_mock_modules() |
|
|
|
|
|
|
|
|
TOOL_SUMMARY = ( |
|
|
"Executes Python code as the unified interface for the entire tools ecosystem. " |
|
|
"Use Agent Terminal repeatedly whenever you need to chain or combine tool operations. Input must be JSON that will be executed in Python. " |
|
|
"Available tools: `Web_Fetch`, `Web_Search`, `Code_Interpreter`, `Shell_Command`, `File_System`, `Obsidian_Vault`, `Memory_Manager`, `Generate_Speech`, `Generate_Image`, `Generate_Video`, `Deep_Research`." |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
@autodoc( |
|
|
summary=TOOL_SUMMARY, |
|
|
) |
|
|
def Agent_Terminal(input: Annotated[str, ( |
|
|
"Python source code to run; stdout is captured and returned. " |
|
|
"Use `search_tools(`query`)` to search tools by name or capability, returns tool definitions and examples. " |
|
|
"Call any tool with no arguments to get its full usage info (e.g., `Generate_Image()`)." |
|
|
)]) -> str: |
|
|
_log_call_start("Agent_Terminal", input=_truncate_for_log(input or "", 300)) |
|
|
if input is None: |
|
|
result = "No code provided." |
|
|
_log_call_end("Agent_Terminal", result) |
|
|
return result |
|
|
old_stdout = sys.stdout |
|
|
old_cwd = os.getcwd() |
|
|
redirected_output = sys.stdout = StringIO() |
|
|
|
|
|
|
|
|
wrapped_tools = _get_tools_map() |
|
|
|
|
|
|
|
|
tools_env = { |
|
|
**wrapped_tools, |
|
|
"search_tools": search_tools, |
|
|
"print": print, |
|
|
"__builtins__": __builtins__, |
|
|
} |
|
|
|
|
|
try: |
|
|
os.chdir(ROOT_DIR) |
|
|
|
|
|
|
|
|
tree = ast.parse(input) |
|
|
|
|
|
for node in tree.body: |
|
|
if isinstance(node, ast.Expr): |
|
|
|
|
|
expr = compile(ast.Expression(node.value), filename="<string>", mode="eval") |
|
|
result_val = eval(expr, tools_env) |
|
|
if result_val is not None: |
|
|
print(result_val) |
|
|
else: |
|
|
|
|
|
mod = ast.Module(body=[node], type_ignores=[]) |
|
|
exec(compile(mod, filename="<string>", mode="exec"), tools_env) |
|
|
|
|
|
result = redirected_output.getvalue() |
|
|
except Exception as exc: |
|
|
result = str(exc) |
|
|
finally: |
|
|
sys.stdout = old_stdout |
|
|
try: |
|
|
os.chdir(old_cwd) |
|
|
except Exception: |
|
|
pass |
|
|
_log_call_end("Agent_Terminal", _truncate_for_log(result)) |
|
|
return result |
|
|
|
|
|
|
|
|
def build_interface() -> gr.Interface: |
|
|
return gr.Interface( |
|
|
fn=Agent_Terminal, |
|
|
inputs=gr.Code(label="Python Code", language="python"), |
|
|
outputs=gr.Textbox(label="Output", lines=5, max_lines=20), |
|
|
title="Agent Terminal", |
|
|
description="<div style=\"text-align:center\">Interact with all other tools via a Python API. Reduces token usage by 90%.</div>", |
|
|
api_description=TOOL_SUMMARY, |
|
|
flagging_mode="never", |
|
|
) |
|
|
|
|
|
|
|
|
__all__ = ["Agent_Terminal", "build_interface"] |
|
|
|