File size: 11,704 Bytes
e3b02b7 10fbb32 2407c30 a991154 e3b02b7 2407c30 e3b02b7 10fbb32 e3b02b7 10fbb32 e3b02b7 10fbb32 e3b02b7 2407c30 e3b02b7 2407c30 a991154 e3b02b7 2407c30 10fbb32 6ee487c 10fbb32 3cf0269 6ee487c 10fbb32 6ee487c 3cf0269 10fbb32 3cf0269 6ee487c 10fbb32 3cf0269 10fbb32 6ee487c 3cf0269 10fbb32 3cf0269 10fbb32 3cf0269 10fbb32 3cf0269 10fbb32 3cf0269 10fbb32 6ee487c 3cf0269 10fbb32 3cf0269 10fbb32 3cf0269 10fbb32 3cf0269 10fbb32 6ee487c 10fbb32 6ee487c 10fbb32 6ee487c 10fbb32 6ee487c 10fbb32 3cf0269 10fbb32 72cc67e | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 | from markitdown import MarkItDown
from typing import Any, Callable, Generator, get_type_hints
import inspect
import requests
def python_type_to_json_schema(tp: type) -> str:
"""Map a Python type to a JSON Schema type string."""
mapping = {
str: "string",
int: "integer",
float: "number",
bool: "boolean",
list: "array",
dict: "object",
}
return mapping.get(tp, "string")
class Tool:
"""A callable tool the agent can invoke."""
def __init__(
self,
name: str,
description: str,
parameters: dict,
handler: Callable[..., str],
streamable: bool = False,
) -> None:
self.name = name
self.description = description
self.parameters = parameters
self.handler = handler
self.streamable = streamable
def to_openai_spec(self) -> dict:
return {
"type": "function",
"function": {
"name": self.name,
"description": self.description,
"parameters": self.parameters,
},
}
def run(self, **kwargs: Any) -> str:
return self.handler(**kwargs)
def stream(self, **kwargs: Any) -> Generator[str, None, None]:
"""Yield partial results for streamable tools.
Override in subclasses or use streamable=True with a generator handler.
"""
if self.streamable and callable(self.handler):
result = self.handler(**kwargs)
if isinstance(result, Generator):
yield from result
else:
yield str(result)
else:
yield self.handler(**kwargs)
def _parse_docstring(docstring: str) -> tuple[str, dict[str, tuple[bool, str]]]:
"""Parse a tool docstring into description and param metadata.
Returns:
(description, {param_name: (required, description)})
Expected format:
First line: tool description.
Subsequent lines: ``param_name (required): description`` or
``param_name: description``.
"""
lines = (docstring or "").strip().split("\n")
description = lines[0].strip()
param_info: dict[str, tuple[bool, str]] = {}
for line in lines[1:]:
line = line.strip()
if not line:
continue
# Match: param_name (required): description
# or: param_name: description
if ":" not in line:
continue
key, desc = line.split(":", 1)
key = key.strip()
desc = desc.strip()
required = False
if key.endswith("(required)"):
required = True
key = key[: -len("(required)")].strip()
if key:
param_info[key] = (required, desc)
return description, param_info
def tool(fn: Callable[..., str]) -> Tool:
"""Decorator that converts a function into a Tool instance.
Extracts name, description (first line of docstring), and parameters
from the function's type hints and signature.
Docstring format:
First line: tool description.
Subsequent lines: ``param_name (required): description`` or
``param_name: description``.
"""
name = fn.__name__
docstring = fn.__doc__ or ""
description, param_info = _parse_docstring(docstring)
hints = get_type_hints(fn)
sig = inspect.signature(fn)
properties: dict[str, dict] = {}
required: list[str] = []
for param_name, param in sig.parameters.items():
if param_name in hints:
param_schema: dict[str, Any] = {
"type": python_type_to_json_schema(hints[param_name])
}
# Enrich with docstring info if present
if param_name in param_info:
doc_required, doc_desc = param_info[param_name]
if doc_desc:
param_schema["description"] = doc_desc
# Docstring (required) overrides signature default check
if doc_required:
required.append(param_name)
elif param.default is inspect.Parameter.empty:
required.append(param_name)
elif param.default is inspect.Parameter.empty:
required.append(param_name)
properties[param_name] = param_schema
parameters = {
"type": "object",
"properties": properties,
"required": required,
}
return Tool(
name=name,
description=description,
parameters=parameters,
handler=fn,
)
@tool
def fetch_webpage(url: str) -> str:
"""Fetch a webpage and return its text content.
url (required): The URL to fetch
"""
try:
jina_ai_url = "https://r.jina.ai/"
response = requests.get(jina_ai_url + url)
response.raise_for_status()
return response.text
except Exception as e:
md = MarkItDown()
return md.convert(url).text_content
FETCH_WEBPAGE_TOOL = fetch_webpage # @tool already makes it a Tool instance
# ---------------------------------------------------------------------------
# Shell tool (streamable)
# ---------------------------------------------------------------------------
import time
import uuid as _uuid
from .shell import get_shell_manager
def _shell_handler(
command: str = "",
session_id: str = "",
input_text: str = "",
) -> str:
"""Run shell commands interactively with persistent sessions.
command: The shell command to execute (omit when checking output or sending input)
session_id: Session ID to check output or send input to (omit to start new command)
input_text: Text to send to running session's stdin
How it works:
- Start new command: provide command, returns session_id immediately (non-blocking)
- Check output: provide session_id only, returns current output
- Send input: provide session_id + input_text
- Sessions auto-destroy after 15 min idle
- Each session runs in its own temp folder (also cleaned up on timeout)
- Environment variables persist across calls in the same session
"""
manager = get_shell_manager()
existing_session = session_id and session_id in manager.sessions
# Send input to running session
if existing_session and input_text:
sent = manager.send_input(session_id, input_text)
if not sent:
return f"Error: Session '{session_id}' closed or not found"
time.sleep(0.3)
output = manager.poll_output(session_id)
running = manager.is_running(session_id)
status = "running" if running else f"exited (code {manager.sessions[session_id].returncode})"
if output:
return f"[{session_id}] {status}:\n{output}"
return f"[{session_id}] {status} (no new output)"
# Check output of existing session
if existing_session:
output = manager.get_output(session_id)
running = manager.is_running(session_id)
code = manager.sessions[session_id].returncode
status = "running" if running else f"exited with code {code}"
if output:
return f"[{session_id}] {status}:\n{output}"
return f"[{session_id}] {status}"
# Need a command to start a new session
if not command.strip():
if session_id:
return f"Error: Session '{session_id}' not found or expired"
return "Error: Provide a command to start a new session, or a session_id to check status"
# Start new command
sid = session_id or str(_uuid.uuid4())[:8]
session = manager.start(sid, command)
# Wait a bit to capture initial output (fast commands finish here)
time.sleep(0.5)
initial = session.read_new_output()
running = session.is_running()
if not running:
# Command finished quickly
code = session.process.returncode
final = session.read_new_output()
output = (initial + final).strip()
if output:
return f"[{sid}] exited with code {code}:\n{output}"
return f"[{sid}] exited with code {code}"
# Command still running — return status so model can check later
if initial:
return f"[{sid}] running (PID {session.pid}):\n{initial}\n\nCall again with session_id=\"{sid}\" to check output."
return f"[{sid}] running (PID {session.pid})\n\nCall again with session_id=\"{sid}\" to check output."
SHELL_TOOL = Tool(
name="shell",
description="Run shell commands with persistent sessions. Start command -> get session_id. Call with session_id to check output or send input. Sessions auto-destroy after 15 min idle. Each session has its own temp folder.",
parameters={
"type": "object",
"properties": {
"command": {
"type": "string",
"description": "Shell command to execute (omit when checking output or sending input)",
},
"session_id": {
"type": "string",
"description": "Session ID to check output or send input (omit to start new command)",
},
"input_text": {
"type": "string",
"description": "Text to send to running session's stdin",
},
},
"required": [],
},
handler=_shell_handler,
streamable=False,
)
# Shared cache for read_tool_response - agent writes, tool reads
_TOOL_RESULTS_CACHE: dict[str, str] = {}
def _read_tool_handler(tool_call_id: str, start_line: int, num_lines: int = 50) -> str:
"""Read more lines from a truncated tool response.
tool_call_id (required): The tool_call_id from the truncated response
start_line (required): Line number to start reading from
num_lines: Number of lines to read (default 50)
"""
full = _TOOL_RESULTS_CACHE.get(tool_call_id)
if full is None:
return f"Error: No result found for tool_call_id '{tool_call_id}'"
lines = full.split("\n")
total = len(lines)
if start_line >= total:
return f"Error: start_line {start_line} >= total lines {total}"
end = min(start_line + num_lines, total)
chunk = "\n".join(lines[start_line:end])
remaining = total - end
header = f"Lines {start_line}-{end} of {total}"
if remaining > 0:
header += f" ({remaining} lines remaining)"
return f"{header}\n\n{chunk}"
READ_TOOL = Tool(
name="read_tool_response",
description="Read more lines from a truncated tool response. Use when a previous tool output was truncated.",
parameters={
"type": "object",
"properties": {
"tool_call_id": {
"type": "string",
"description": "The tool_call_id from the truncated response",
},
"start_line": {
"type": "integer",
"description": "Line number to start reading from (0-indexed)",
},
"num_lines": {
"type": "integer",
"description": "Number of lines to read (default 50)",
},
},
"required": ["tool_call_id", "start_line"],
},
handler=_read_tool_handler,
)
FINAL_MESSAGE_TOOL = Tool(
name="final_message",
description=(
"Signal that you have completed your response and want "
"to end the conversation. Call this ONLY when you are "
"truly done. Until you call this tool, the conversation "
"will continue. Means you will multiple times answer the"
"same question or can get stuck in loops if you never call it."
),
parameters={"type": "object", "properties": {}, "required": []},
handler=lambda: "",
)
|