OpenMythos / agent /tools.py
KingNish's picture
BetterTools
72cc67e
Raw
History Blame Contribute Delete
11.7 kB
from markitdown import MarkItDown
from typing import Any, Callable, Generator, get_type_hints
import inspect
import requests
def python_type_to_json_schema(tp: type) -> str:
"""Map a Python type to a JSON Schema type string."""
mapping = {
str: "string",
int: "integer",
float: "number",
bool: "boolean",
list: "array",
dict: "object",
}
return mapping.get(tp, "string")
class Tool:
"""A callable tool the agent can invoke."""
def __init__(
self,
name: str,
description: str,
parameters: dict,
handler: Callable[..., str],
streamable: bool = False,
) -> None:
self.name = name
self.description = description
self.parameters = parameters
self.handler = handler
self.streamable = streamable
def to_openai_spec(self) -> dict:
return {
"type": "function",
"function": {
"name": self.name,
"description": self.description,
"parameters": self.parameters,
},
}
def run(self, **kwargs: Any) -> str:
return self.handler(**kwargs)
def stream(self, **kwargs: Any) -> Generator[str, None, None]:
"""Yield partial results for streamable tools.
Override in subclasses or use streamable=True with a generator handler.
"""
if self.streamable and callable(self.handler):
result = self.handler(**kwargs)
if isinstance(result, Generator):
yield from result
else:
yield str(result)
else:
yield self.handler(**kwargs)
def _parse_docstring(docstring: str) -> tuple[str, dict[str, tuple[bool, str]]]:
"""Parse a tool docstring into description and param metadata.
Returns:
(description, {param_name: (required, description)})
Expected format:
First line: tool description.
Subsequent lines: ``param_name (required): description`` or
``param_name: description``.
"""
lines = (docstring or "").strip().split("\n")
description = lines[0].strip()
param_info: dict[str, tuple[bool, str]] = {}
for line in lines[1:]:
line = line.strip()
if not line:
continue
# Match: param_name (required): description
# or: param_name: description
if ":" not in line:
continue
key, desc = line.split(":", 1)
key = key.strip()
desc = desc.strip()
required = False
if key.endswith("(required)"):
required = True
key = key[: -len("(required)")].strip()
if key:
param_info[key] = (required, desc)
return description, param_info
def tool(fn: Callable[..., str]) -> Tool:
"""Decorator that converts a function into a Tool instance.
Extracts name, description (first line of docstring), and parameters
from the function's type hints and signature.
Docstring format:
First line: tool description.
Subsequent lines: ``param_name (required): description`` or
``param_name: description``.
"""
name = fn.__name__
docstring = fn.__doc__ or ""
description, param_info = _parse_docstring(docstring)
hints = get_type_hints(fn)
sig = inspect.signature(fn)
properties: dict[str, dict] = {}
required: list[str] = []
for param_name, param in sig.parameters.items():
if param_name in hints:
param_schema: dict[str, Any] = {
"type": python_type_to_json_schema(hints[param_name])
}
# Enrich with docstring info if present
if param_name in param_info:
doc_required, doc_desc = param_info[param_name]
if doc_desc:
param_schema["description"] = doc_desc
# Docstring (required) overrides signature default check
if doc_required:
required.append(param_name)
elif param.default is inspect.Parameter.empty:
required.append(param_name)
elif param.default is inspect.Parameter.empty:
required.append(param_name)
properties[param_name] = param_schema
parameters = {
"type": "object",
"properties": properties,
"required": required,
}
return Tool(
name=name,
description=description,
parameters=parameters,
handler=fn,
)
@tool
def fetch_webpage(url: str) -> str:
"""Fetch a webpage and return its text content.
url (required): The URL to fetch
"""
try:
jina_ai_url = "https://r.jina.ai/"
response = requests.get(jina_ai_url + url)
response.raise_for_status()
return response.text
except Exception as e:
md = MarkItDown()
return md.convert(url).text_content
FETCH_WEBPAGE_TOOL = fetch_webpage # @tool already makes it a Tool instance
# ---------------------------------------------------------------------------
# Shell tool (streamable)
# ---------------------------------------------------------------------------
import time
import uuid as _uuid
from .shell import get_shell_manager
def _shell_handler(
command: str = "",
session_id: str = "",
input_text: str = "",
) -> str:
"""Run shell commands interactively with persistent sessions.
command: The shell command to execute (omit when checking output or sending input)
session_id: Session ID to check output or send input to (omit to start new command)
input_text: Text to send to running session's stdin
How it works:
- Start new command: provide command, returns session_id immediately (non-blocking)
- Check output: provide session_id only, returns current output
- Send input: provide session_id + input_text
- Sessions auto-destroy after 15 min idle
- Each session runs in its own temp folder (also cleaned up on timeout)
- Environment variables persist across calls in the same session
"""
manager = get_shell_manager()
existing_session = session_id and session_id in manager.sessions
# Send input to running session
if existing_session and input_text:
sent = manager.send_input(session_id, input_text)
if not sent:
return f"Error: Session '{session_id}' closed or not found"
time.sleep(0.3)
output = manager.poll_output(session_id)
running = manager.is_running(session_id)
status = "running" if running else f"exited (code {manager.sessions[session_id].returncode})"
if output:
return f"[{session_id}] {status}:\n{output}"
return f"[{session_id}] {status} (no new output)"
# Check output of existing session
if existing_session:
output = manager.get_output(session_id)
running = manager.is_running(session_id)
code = manager.sessions[session_id].returncode
status = "running" if running else f"exited with code {code}"
if output:
return f"[{session_id}] {status}:\n{output}"
return f"[{session_id}] {status}"
# Need a command to start a new session
if not command.strip():
if session_id:
return f"Error: Session '{session_id}' not found or expired"
return "Error: Provide a command to start a new session, or a session_id to check status"
# Start new command
sid = session_id or str(_uuid.uuid4())[:8]
session = manager.start(sid, command)
# Wait a bit to capture initial output (fast commands finish here)
time.sleep(0.5)
initial = session.read_new_output()
running = session.is_running()
if not running:
# Command finished quickly
code = session.process.returncode
final = session.read_new_output()
output = (initial + final).strip()
if output:
return f"[{sid}] exited with code {code}:\n{output}"
return f"[{sid}] exited with code {code}"
# Command still running — return status so model can check later
if initial:
return f"[{sid}] running (PID {session.pid}):\n{initial}\n\nCall again with session_id=\"{sid}\" to check output."
return f"[{sid}] running (PID {session.pid})\n\nCall again with session_id=\"{sid}\" to check output."
SHELL_TOOL = Tool(
name="shell",
description="Run shell commands with persistent sessions. Start command -> get session_id. Call with session_id to check output or send input. Sessions auto-destroy after 15 min idle. Each session has its own temp folder.",
parameters={
"type": "object",
"properties": {
"command": {
"type": "string",
"description": "Shell command to execute (omit when checking output or sending input)",
},
"session_id": {
"type": "string",
"description": "Session ID to check output or send input (omit to start new command)",
},
"input_text": {
"type": "string",
"description": "Text to send to running session's stdin",
},
},
"required": [],
},
handler=_shell_handler,
streamable=False,
)
# Shared cache for read_tool_response - agent writes, tool reads
_TOOL_RESULTS_CACHE: dict[str, str] = {}
def _read_tool_handler(tool_call_id: str, start_line: int, num_lines: int = 50) -> str:
"""Read more lines from a truncated tool response.
tool_call_id (required): The tool_call_id from the truncated response
start_line (required): Line number to start reading from
num_lines: Number of lines to read (default 50)
"""
full = _TOOL_RESULTS_CACHE.get(tool_call_id)
if full is None:
return f"Error: No result found for tool_call_id '{tool_call_id}'"
lines = full.split("\n")
total = len(lines)
if start_line >= total:
return f"Error: start_line {start_line} >= total lines {total}"
end = min(start_line + num_lines, total)
chunk = "\n".join(lines[start_line:end])
remaining = total - end
header = f"Lines {start_line}-{end} of {total}"
if remaining > 0:
header += f" ({remaining} lines remaining)"
return f"{header}\n\n{chunk}"
READ_TOOL = Tool(
name="read_tool_response",
description="Read more lines from a truncated tool response. Use when a previous tool output was truncated.",
parameters={
"type": "object",
"properties": {
"tool_call_id": {
"type": "string",
"description": "The tool_call_id from the truncated response",
},
"start_line": {
"type": "integer",
"description": "Line number to start reading from (0-indexed)",
},
"num_lines": {
"type": "integer",
"description": "Number of lines to read (default 50)",
},
},
"required": ["tool_call_id", "start_line"],
},
handler=_read_tool_handler,
)
FINAL_MESSAGE_TOOL = Tool(
name="final_message",
description=(
"Signal that you have completed your response and want "
"to end the conversation. Call this ONLY when you are "
"truly done. Until you call this tool, the conversation "
"will continue. Means you will multiple times answer the"
"same question or can get stuck in loops if you never call it."
),
parameters={"type": "object", "properties": {}, "required": []},
handler=lambda: "",
)