Spaces:
Running
Running
| from markitdown import MarkItDown | |
| from typing import Any, Callable, Generator, get_type_hints | |
| import inspect | |
| import requests | |
| def python_type_to_json_schema(tp: type) -> str: | |
| """Map a Python type to a JSON Schema type string.""" | |
| mapping = { | |
| str: "string", | |
| int: "integer", | |
| float: "number", | |
| bool: "boolean", | |
| list: "array", | |
| dict: "object", | |
| } | |
| return mapping.get(tp, "string") | |
| class Tool: | |
| """A callable tool the agent can invoke.""" | |
| def __init__( | |
| self, | |
| name: str, | |
| description: str, | |
| parameters: dict, | |
| handler: Callable[..., str], | |
| streamable: bool = False, | |
| ) -> None: | |
| self.name = name | |
| self.description = description | |
| self.parameters = parameters | |
| self.handler = handler | |
| self.streamable = streamable | |
| def to_openai_spec(self) -> dict: | |
| return { | |
| "type": "function", | |
| "function": { | |
| "name": self.name, | |
| "description": self.description, | |
| "parameters": self.parameters, | |
| }, | |
| } | |
| def run(self, **kwargs: Any) -> str: | |
| return self.handler(**kwargs) | |
| def stream(self, **kwargs: Any) -> Generator[str, None, None]: | |
| """Yield partial results for streamable tools. | |
| Override in subclasses or use streamable=True with a generator handler. | |
| """ | |
| if self.streamable and callable(self.handler): | |
| result = self.handler(**kwargs) | |
| if isinstance(result, Generator): | |
| yield from result | |
| else: | |
| yield str(result) | |
| else: | |
| yield self.handler(**kwargs) | |
| def _parse_docstring(docstring: str) -> tuple[str, dict[str, tuple[bool, str]]]: | |
| """Parse a tool docstring into description and param metadata. | |
| Returns: | |
| (description, {param_name: (required, description)}) | |
| Expected format: | |
| First line: tool description. | |
| Subsequent lines: ``param_name (required): description`` or | |
| ``param_name: description``. | |
| """ | |
| lines = (docstring or "").strip().split("\n") | |
| description = lines[0].strip() | |
| param_info: dict[str, tuple[bool, str]] = {} | |
| for line in lines[1:]: | |
| line = line.strip() | |
| if not line: | |
| continue | |
| # Match: param_name (required): description | |
| # or: param_name: description | |
| if ":" not in line: | |
| continue | |
| key, desc = line.split(":", 1) | |
| key = key.strip() | |
| desc = desc.strip() | |
| required = False | |
| if key.endswith("(required)"): | |
| required = True | |
| key = key[: -len("(required)")].strip() | |
| if key: | |
| param_info[key] = (required, desc) | |
| return description, param_info | |
| def tool(fn: Callable[..., str]) -> Tool: | |
| """Decorator that converts a function into a Tool instance. | |
| Extracts name, description (first line of docstring), and parameters | |
| from the function's type hints and signature. | |
| Docstring format: | |
| First line: tool description. | |
| Subsequent lines: ``param_name (required): description`` or | |
| ``param_name: description``. | |
| """ | |
| name = fn.__name__ | |
| docstring = fn.__doc__ or "" | |
| description, param_info = _parse_docstring(docstring) | |
| hints = get_type_hints(fn) | |
| sig = inspect.signature(fn) | |
| properties: dict[str, dict] = {} | |
| required: list[str] = [] | |
| for param_name, param in sig.parameters.items(): | |
| if param_name in hints: | |
| param_schema: dict[str, Any] = { | |
| "type": python_type_to_json_schema(hints[param_name]) | |
| } | |
| # Enrich with docstring info if present | |
| if param_name in param_info: | |
| doc_required, doc_desc = param_info[param_name] | |
| if doc_desc: | |
| param_schema["description"] = doc_desc | |
| # Docstring (required) overrides signature default check | |
| if doc_required: | |
| required.append(param_name) | |
| elif param.default is inspect.Parameter.empty: | |
| required.append(param_name) | |
| elif param.default is inspect.Parameter.empty: | |
| required.append(param_name) | |
| properties[param_name] = param_schema | |
| parameters = { | |
| "type": "object", | |
| "properties": properties, | |
| "required": required, | |
| } | |
| return Tool( | |
| name=name, | |
| description=description, | |
| parameters=parameters, | |
| handler=fn, | |
| ) | |
| def fetch_webpage(url: str) -> str: | |
| """Fetch a webpage and return its text content. | |
| url (required): The URL to fetch | |
| """ | |
| try: | |
| jina_ai_url = "https://r.jina.ai/" | |
| response = requests.get(jina_ai_url + url) | |
| response.raise_for_status() | |
| return response.text | |
| except Exception as e: | |
| md = MarkItDown() | |
| return md.convert(url).text_content | |
| FETCH_WEBPAGE_TOOL = fetch_webpage # @tool already makes it a Tool instance | |
| # --------------------------------------------------------------------------- | |
| # Shell tool (streamable) | |
| # --------------------------------------------------------------------------- | |
| import time | |
| import uuid as _uuid | |
| from .shell import get_shell_manager | |
| def _shell_handler( | |
| command: str = "", | |
| session_id: str = "", | |
| input_text: str = "", | |
| ) -> str: | |
| """Run shell commands interactively with persistent sessions. | |
| command: The shell command to execute (omit when checking output or sending input) | |
| session_id: Session ID to check output or send input to (omit to start new command) | |
| input_text: Text to send to running session's stdin | |
| How it works: | |
| - Start new command: provide command, returns session_id immediately (non-blocking) | |
| - Check output: provide session_id only, returns current output | |
| - Send input: provide session_id + input_text | |
| - Sessions auto-destroy after 15 min idle | |
| - Each session runs in its own temp folder (also cleaned up on timeout) | |
| - Environment variables persist across calls in the same session | |
| """ | |
| manager = get_shell_manager() | |
| existing_session = session_id and session_id in manager.sessions | |
| # Send input to running session | |
| if existing_session and input_text: | |
| sent = manager.send_input(session_id, input_text) | |
| if not sent: | |
| return f"Error: Session '{session_id}' closed or not found" | |
| time.sleep(0.3) | |
| output = manager.poll_output(session_id) | |
| running = manager.is_running(session_id) | |
| status = "running" if running else f"exited (code {manager.sessions[session_id].returncode})" | |
| if output: | |
| return f"[{session_id}] {status}:\n{output}" | |
| return f"[{session_id}] {status} (no new output)" | |
| # Check output of existing session | |
| if existing_session: | |
| output = manager.get_output(session_id) | |
| running = manager.is_running(session_id) | |
| code = manager.sessions[session_id].returncode | |
| status = "running" if running else f"exited with code {code}" | |
| if output: | |
| return f"[{session_id}] {status}:\n{output}" | |
| return f"[{session_id}] {status}" | |
| # Need a command to start a new session | |
| if not command.strip(): | |
| if session_id: | |
| return f"Error: Session '{session_id}' not found or expired" | |
| return "Error: Provide a command to start a new session, or a session_id to check status" | |
| # Start new command | |
| sid = session_id or str(_uuid.uuid4())[:8] | |
| session = manager.start(sid, command) | |
| # Wait a bit to capture initial output (fast commands finish here) | |
| time.sleep(0.5) | |
| initial = session.read_new_output() | |
| running = session.is_running() | |
| if not running: | |
| # Command finished quickly | |
| code = session.process.returncode | |
| final = session.read_new_output() | |
| output = (initial + final).strip() | |
| if output: | |
| return f"[{sid}] exited with code {code}:\n{output}" | |
| return f"[{sid}] exited with code {code}" | |
| # Command still running — return status so model can check later | |
| if initial: | |
| return f"[{sid}] running (PID {session.pid}):\n{initial}\n\nCall again with session_id=\"{sid}\" to check output." | |
| return f"[{sid}] running (PID {session.pid})\n\nCall again with session_id=\"{sid}\" to check output." | |
| SHELL_TOOL = Tool( | |
| name="shell", | |
| description="Run shell commands with persistent sessions. Start command -> get session_id. Call with session_id to check output or send input. Sessions auto-destroy after 15 min idle. Each session has its own temp folder.", | |
| parameters={ | |
| "type": "object", | |
| "properties": { | |
| "command": { | |
| "type": "string", | |
| "description": "Shell command to execute (omit when checking output or sending input)", | |
| }, | |
| "session_id": { | |
| "type": "string", | |
| "description": "Session ID to check output or send input (omit to start new command)", | |
| }, | |
| "input_text": { | |
| "type": "string", | |
| "description": "Text to send to running session's stdin", | |
| }, | |
| }, | |
| "required": [], | |
| }, | |
| handler=_shell_handler, | |
| streamable=False, | |
| ) | |
| # Shared cache for read_tool_response - agent writes, tool reads | |
| _TOOL_RESULTS_CACHE: dict[str, str] = {} | |
| def _read_tool_handler(tool_call_id: str, start_line: int, num_lines: int = 50) -> str: | |
| """Read more lines from a truncated tool response. | |
| tool_call_id (required): The tool_call_id from the truncated response | |
| start_line (required): Line number to start reading from | |
| num_lines: Number of lines to read (default 50) | |
| """ | |
| full = _TOOL_RESULTS_CACHE.get(tool_call_id) | |
| if full is None: | |
| return f"Error: No result found for tool_call_id '{tool_call_id}'" | |
| lines = full.split("\n") | |
| total = len(lines) | |
| if start_line >= total: | |
| return f"Error: start_line {start_line} >= total lines {total}" | |
| end = min(start_line + num_lines, total) | |
| chunk = "\n".join(lines[start_line:end]) | |
| remaining = total - end | |
| header = f"Lines {start_line}-{end} of {total}" | |
| if remaining > 0: | |
| header += f" ({remaining} lines remaining)" | |
| return f"{header}\n\n{chunk}" | |
| READ_TOOL = Tool( | |
| name="read_tool_response", | |
| description="Read more lines from a truncated tool response. Use when a previous tool output was truncated.", | |
| parameters={ | |
| "type": "object", | |
| "properties": { | |
| "tool_call_id": { | |
| "type": "string", | |
| "description": "The tool_call_id from the truncated response", | |
| }, | |
| "start_line": { | |
| "type": "integer", | |
| "description": "Line number to start reading from (0-indexed)", | |
| }, | |
| "num_lines": { | |
| "type": "integer", | |
| "description": "Number of lines to read (default 50)", | |
| }, | |
| }, | |
| "required": ["tool_call_id", "start_line"], | |
| }, | |
| handler=_read_tool_handler, | |
| ) | |
| FINAL_MESSAGE_TOOL = Tool( | |
| name="final_message", | |
| description=( | |
| "Signal that you have completed your response and want " | |
| "to end the conversation. Call this ONLY when you are " | |
| "truly done. Until you call this tool, the conversation " | |
| "will continue. Means you will multiple times answer the" | |
| "same question or can get stuck in loops if you never call it." | |
| ), | |
| parameters={"type": "object", "properties": {}, "required": []}, | |
| handler=lambda: "", | |
| ) | |