File size: 11,163 Bytes
5d58591 934aebe e5e1de3 6fcf4d0 5d58591 6fcf4d0 5d58591 195b607 5d58591 6fcf4d0 5d58591 6fcf4d0 5d58591 195b607 5d58591 6fcf4d0 5d58591 6fcf4d0 5d58591 6fcf4d0 5d58591 934aebe 6fcf4d0 934aebe 6fcf4d0 934aebe 5d58591 a05c1fd e5e1de3 5d58591 35a7647 5d58591 39f8869 e5e1de3 a05c1fd e5e1de3 39f8869 5d58591 6fcf4d0 5d58591 6fcf4d0 5d58591 35a7647 5d58591 e5e1de3 6fcf4d0 39f8869 6fcf4d0 e5e1de3 5d58591 527e7a6 5d58591 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 |
from __future__ import annotations
import os
import sys
import types
import ast
import inspect
import functools
from io import StringIO
from typing import Annotated, get_type_hints, get_origin, get_args
import gradio as gr
from ._docstrings import autodoc
from .File_System import ROOT_DIR, File_System
from .Web_Fetch import Web_Fetch
from .Web_Search import Web_Search
from .Memory_Manager import Memory_Manager
from .Generate_Speech import Generate_Speech, List_Kokoro_Voices, List_Supertonic_Voices
from .Generate_Image import Generate_Image
from .Generate_Video import Generate_Video
from .Deep_Research import Deep_Research
from .Obsidian_Vault import Obsidian_Vault
from .Shell_Command import Shell_Command
from .Code_Interpreter import Code_Interpreter
from app import _log_call_end, _log_call_start, _truncate_for_log
# Example usages for each tool - simple and advanced
_TOOL_EXAMPLES = {
"Web_Fetch": (
'Web_Fetch(url="https://example.com")',
'Web_Fetch(url="https://example.com", max_chars=5000, mode="url_scraper")',
),
"Web_Search": (
'Web_Search(query="Python tutorials")',
'Web_Search(query="AI news", max_results=10, search_type="news", date_filter="week")',
),
"Code_Interpreter": (
'Code_Interpreter(code="print(2 + 2)")',
'Code_Interpreter(code="import math; print(math.pi)", timeout=60)',
),
"Shell_Command": (
'Shell_Command(command="echo Hello")',
'Shell_Command(command="ls -la", timeout=30)',
),
"File_System": (
'File_System(action="list", path="/")',
'File_System(action="read", path="/notes.txt", max_chars=5000)',
),
"Obsidian_Vault": (
'Obsidian_Vault(action="list", path="/")',
'Obsidian_Vault(action="search", query="meeting notes", recursive=True)',
),
"Memory_Manager": (
'Memory_Manager(action="list")',
'Memory_Manager(action="save", text="Remember this fact", tags="important, facts")',
),
"Generate_Speech": (
'Generate_Speech(text="Hello, world!")',
'Generate_Speech(text="Welcome to the demo", model="Kokoro", voice="af_heart", speed=1.2)',
),
"Generate_Image": (
'Generate_Image(prompt="A sunset over mountains")',
'Generate_Image(prompt="A cyberpunk city", steps=50, cfg_scale=9.0, width=1024, height=768)',
),
"Generate_Video": (
'Generate_Video(prompt="A cat playing piano")',
'Generate_Video(prompt="Ocean waves", duration=5, aspect_ratio="16:9")',
),
"Deep_Research": (
'Deep_Research(query="Climate change effects")',
'Deep_Research(query="Quantum computing advances", max_sources=10, search_type="news")',
),
}
def _format_tool_usage(func) -> str:
"""Generate detailed usage information for a tool function."""
name = func.__name__
doc = func.__doc__ or "No description available."
# Extract just the summary (first paragraph) - skip Args/Returns sections
# since we generate our own detailed parameter list
doc_lines = doc.strip().split('\n')
summary_lines = []
for line in doc_lines:
stripped = line.strip().lower()
# Stop at Args:, Returns:, Parameters:, etc.
if stripped.startswith(('args:', 'returns:', 'parameters:', 'raises:', 'example:', 'note:', 'notes:')):
break
summary_lines.append(line)
summary = '\n'.join(summary_lines).strip()
# Get the signature
sig = inspect.signature(func)
# Try to get type hints
try:
hints = get_type_hints(func, include_extras=True)
except Exception:
hints = {}
lines = [f"=== {name} ===", "", summary, "", "Parameters:"]
for param_name, param in sig.parameters.items():
if param_name in ("self", "cls"):
continue
# Get type and description from Annotated if available
hint = hints.get(param_name)
type_str = "any"
desc = ""
if hint is not None:
if get_origin(hint) is Annotated:
args = get_args(hint)
if args:
type_str = getattr(args[0], "__name__", str(args[0]))
if len(args) > 1 and isinstance(args[1], str):
desc = args[1]
else:
type_str = getattr(hint, "__name__", str(hint))
# Check for default
if param.default is not inspect.Parameter.empty:
default_repr = repr(param.default)
if len(default_repr) > 50:
default_repr = default_repr[:47] + "..."
default_str = f" = {default_repr}"
else:
default_str = " (required)"
lines.append(f" - {param_name}: {type_str}{default_str}")
if desc:
lines.append(f" {desc}")
# Add examples
lines.append("")
lines.append("Examples:")
if name in _TOOL_EXAMPLES:
simple, advanced = _TOOL_EXAMPLES[name]
lines.append(f" {simple}")
lines.append(f" {advanced}")
else:
lines.append(f" {name}(...)")
return "\n".join(lines)
def _wrap_tool_for_no_arg_usage(func):
"""
Wrap a tool function so that calling it with no arguments
returns usage information instead of raising an error.
"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
# If called with no arguments, return usage info
if not args and not kwargs:
return _format_tool_usage(func)
return func(*args, **kwargs)
# Preserve the original function for introspection
wrapper._original_func = func
return wrapper
def _get_tools_map():
"""Get all tools wrapped to return usage info when called with no arguments."""
raw_tools = {
"Web_Fetch": Web_Fetch,
"Web_Search": Web_Search,
"Memory_Manager": Memory_Manager,
"Generate_Speech": Generate_Speech,
"List_Kokoro_Voices": List_Kokoro_Voices,
"List_Supertonic_Voices": List_Supertonic_Voices,
"Generate_Image": Generate_Image,
"Generate_Video": Generate_Video,
"Deep_Research": Deep_Research,
"File_System": File_System,
"Obsidian_Vault": Obsidian_Vault,
"Shell_Command": Shell_Command,
"Code_Interpreter": Code_Interpreter,
}
return {name: _wrap_tool_for_no_arg_usage(func) for name, func in raw_tools.items()}
def search_tools(query: str) -> str:
"""Search for tools by name or description. Returns usage info for matches."""
query = query.lower()
matches = []
tools = _get_tools_map()
for name, func in tools.items():
# Get original function for docstring if wrapped
original = getattr(func, '_original_func', func)
doc = (original.__doc__ or "").lower()
if query in name.lower() or query in doc:
matches.append((name, func))
if not matches:
return f"No tools found matching '{query}'."
output = []
for name, func in matches:
output.append(_format_tool_usage(getattr(func, '_original_func', func)))
output.append("")
return "\n".join(output)
def _initialize_mock_modules():
"""
Registers a mock 'functions' module in sys.modules so that LLMs
can do 'from functions import ...' without error.
Uses wrapped tools that return usage info when called with no args.
"""
mock_module = types.ModuleType("functions")
# Add wrapped tools (return usage when called with no args)
for name, tool in _get_tools_map().items():
setattr(mock_module, name, tool)
# Add helpers
helpers = {
"search_tools": search_tools,
}
for name, func in helpers.items():
setattr(mock_module, name, func)
sys.modules["functions"] = mock_module
_initialize_mock_modules()
# Single source of truth for the LLM-facing tool description
TOOL_SUMMARY = (
"Executes Python code as the unified interface for the entire tools ecosystem. "
"Use Agent Terminal repeatedly whenever you need to chain or combine tool operations. Input must be JSON that will be executed in Python. "
"Available tools: `Web_Fetch`, `Web_Search`, `Code_Interpreter`, `Shell_Command`, `File_System`, `Obsidian_Vault`, `Memory_Manager`, `Generate_Speech`, `Generate_Image`, `Generate_Video`, `Deep_Research`."
)
@autodoc(
summary=TOOL_SUMMARY,
)
def Agent_Terminal(input: Annotated[str, (
"Python source code to run; stdout is captured and returned. "
"Use `search_tools(`query`)` to search tools by name or capability, returns tool definitions and examples. "
"Call any tool with no arguments to get its full usage info (e.g., `Generate_Image()`)."
)]) -> str:
_log_call_start("Agent_Terminal", input=_truncate_for_log(input or "", 300))
if input is None:
result = "No code provided."
_log_call_end("Agent_Terminal", result)
return result
old_stdout = sys.stdout
old_cwd = os.getcwd()
redirected_output = sys.stdout = StringIO()
# Get wrapped tools that return usage info when called with no args
wrapped_tools = _get_tools_map()
# Prepare the execution environment with all tools
tools_env = {
**wrapped_tools,
"search_tools": search_tools,
"print": print, # Ensure print is available
"__builtins__": __builtins__,
}
try:
os.chdir(ROOT_DIR)
# Parse code and print results of ALL expression statements (not just the last)
tree = ast.parse(input)
for node in tree.body:
if isinstance(node, ast.Expr):
# This is a standalone expression - evaluate and print its result
expr = compile(ast.Expression(node.value), filename="<string>", mode="eval")
result_val = eval(expr, tools_env)
if result_val is not None:
print(result_val)
else:
# This is a statement (assignment, if, for, etc.) - just execute it
mod = ast.Module(body=[node], type_ignores=[])
exec(compile(mod, filename="<string>", mode="exec"), tools_env)
result = redirected_output.getvalue()
except Exception as exc: # pylint: disable=broad-except
result = str(exc)
finally:
sys.stdout = old_stdout
try:
os.chdir(old_cwd)
except Exception:
pass
_log_call_end("Agent_Terminal", _truncate_for_log(result))
return result
def build_interface() -> gr.Interface:
return gr.Interface(
fn=Agent_Terminal,
inputs=gr.Code(label="Python Code", language="python"),
outputs=gr.Textbox(label="Output", lines=5, max_lines=20),
title="Agent Terminal",
description="<div style=\"text-align:center\">Interact with all other tools via a Python API. Reduces token usage by 90%.</div>",
api_description=TOOL_SUMMARY,
flagging_mode="never",
)
__all__ = ["Agent_Terminal", "build_interface"]
|