| """Central registry for all hermes-agent tools. |
| |
| Each tool file calls ``registry.register()`` at module level to declare its |
| schema, handler, toolset membership, and availability check. ``model_tools.py`` |
| queries the registry instead of maintaining its own parallel data structures. |
| |
| Import chain (circular-import safe): |
| tools/registry.py (no imports from model_tools or tool files) |
| ^ |
| tools/*.py (import from tools.registry at module level) |
| ^ |
| model_tools.py (imports tools.registry + all tool modules) |
| ^ |
| run_agent.py, cli.py, batch_runner.py, etc. |
| """ |
|
|
| import json |
| import logging |
| from typing import Any, Callable, Dict, List, Optional, Set |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| class ToolEntry: |
| """Metadata for a single registered tool.""" |
|
|
| __slots__ = ( |
| "name", "toolset", "schema", "handler", "check_fn", |
| "requires_env", "is_async", "description", "emoji", |
| ) |
|
|
| def __init__(self, name, toolset, schema, handler, check_fn, |
| requires_env, is_async, description, emoji): |
| self.name = name |
| self.toolset = toolset |
| self.schema = schema |
| self.handler = handler |
| self.check_fn = check_fn |
| self.requires_env = requires_env |
| self.is_async = is_async |
| self.description = description |
| self.emoji = emoji |
|
|
|
|
| class ToolRegistry: |
| """Singleton registry that collects tool schemas + handlers from tool files.""" |
|
|
| def __init__(self): |
| self._tools: Dict[str, ToolEntry] = {} |
| self._toolset_checks: Dict[str, Callable] = {} |
|
|
| |
| |
| |
|
|
| def register( |
| self, |
| name: str, |
| toolset: str, |
| schema: dict, |
| handler: Callable, |
| check_fn: Callable = None, |
| requires_env: list = None, |
| is_async: bool = False, |
| description: str = "", |
| emoji: str = "", |
| ): |
| """Register a tool. Called at module-import time by each tool file.""" |
| self._tools[name] = ToolEntry( |
| name=name, |
| toolset=toolset, |
| schema=schema, |
| handler=handler, |
| check_fn=check_fn, |
| requires_env=requires_env or [], |
| is_async=is_async, |
| description=description or schema.get("description", ""), |
| emoji=emoji, |
| ) |
| if check_fn and toolset not in self._toolset_checks: |
| self._toolset_checks[toolset] = check_fn |
|
|
| |
| |
| |
|
|
| def get_definitions(self, tool_names: Set[str], quiet: bool = False) -> List[dict]: |
| """Return OpenAI-format tool schemas for the requested tool names. |
| |
| Only tools whose ``check_fn()`` returns True (or have no check_fn) |
| are included. |
| """ |
| result = [] |
| for name in sorted(tool_names): |
| entry = self._tools.get(name) |
| if not entry: |
| continue |
| if entry.check_fn: |
| try: |
| if not entry.check_fn(): |
| if not quiet: |
| logger.debug("Tool %s unavailable (check failed)", name) |
| continue |
| except Exception: |
| if not quiet: |
| logger.debug("Tool %s check raised; skipping", name) |
| continue |
| result.append({"type": "function", "function": entry.schema}) |
| return result |
|
|
| |
| |
| |
|
|
| def dispatch(self, name: str, args: dict, **kwargs) -> str: |
| """Execute a tool handler by name. |
| |
| * Async handlers are bridged automatically via ``_run_async()``. |
| * All exceptions are caught and returned as ``{"error": "..."}`` |
| for consistent error format. |
| """ |
| entry = self._tools.get(name) |
| if not entry: |
| return json.dumps({"error": f"Unknown tool: {name}"}) |
| try: |
| if entry.is_async: |
| from model_tools import _run_async |
| return _run_async(entry.handler(args, **kwargs)) |
| return entry.handler(args, **kwargs) |
| except Exception as e: |
| logger.exception("Tool %s dispatch error: %s", name, e) |
| return json.dumps({"error": f"Tool execution failed: {type(e).__name__}: {e}"}) |
|
|
| |
| |
| |
|
|
| def get_all_tool_names(self) -> List[str]: |
| """Return sorted list of all registered tool names.""" |
| return sorted(self._tools.keys()) |
|
|
| def get_toolset_for_tool(self, name: str) -> Optional[str]: |
| """Return the toolset a tool belongs to, or None.""" |
| entry = self._tools.get(name) |
| return entry.toolset if entry else None |
|
|
| def get_emoji(self, name: str, default: str = "⚡") -> str: |
| """Return the emoji for a tool, or *default* if unset.""" |
| entry = self._tools.get(name) |
| return (entry.emoji if entry and entry.emoji else default) |
|
|
| def get_tool_to_toolset_map(self) -> Dict[str, str]: |
| """Return ``{tool_name: toolset_name}`` for every registered tool.""" |
| return {name: e.toolset for name, e in self._tools.items()} |
|
|
| def is_toolset_available(self, toolset: str) -> bool: |
| """Check if a toolset's requirements are met. |
| |
| Returns False (rather than crashing) when the check function raises |
| an unexpected exception (e.g. network error, missing import, bad config). |
| """ |
| check = self._toolset_checks.get(toolset) |
| if not check: |
| return True |
| try: |
| return bool(check()) |
| except Exception: |
| logger.debug("Toolset %s check raised; marking unavailable", toolset) |
| return False |
|
|
| def check_toolset_requirements(self) -> Dict[str, bool]: |
| """Return ``{toolset: available_bool}`` for every toolset.""" |
| toolsets = set(e.toolset for e in self._tools.values()) |
| return {ts: self.is_toolset_available(ts) for ts in sorted(toolsets)} |
|
|
| def get_available_toolsets(self) -> Dict[str, dict]: |
| """Return toolset metadata for UI display.""" |
| toolsets: Dict[str, dict] = {} |
| for entry in self._tools.values(): |
| ts = entry.toolset |
| if ts not in toolsets: |
| toolsets[ts] = { |
| "available": self.is_toolset_available(ts), |
| "tools": [], |
| "description": "", |
| "requirements": [], |
| } |
| toolsets[ts]["tools"].append(entry.name) |
| if entry.requires_env: |
| for env in entry.requires_env: |
| if env not in toolsets[ts]["requirements"]: |
| toolsets[ts]["requirements"].append(env) |
| return toolsets |
|
|
| def get_toolset_requirements(self) -> Dict[str, dict]: |
| """Build a TOOLSET_REQUIREMENTS-compatible dict for backward compat.""" |
| result: Dict[str, dict] = {} |
| for entry in self._tools.values(): |
| ts = entry.toolset |
| if ts not in result: |
| result[ts] = { |
| "name": ts, |
| "env_vars": [], |
| "check_fn": self._toolset_checks.get(ts), |
| "setup_url": None, |
| "tools": [], |
| } |
| if entry.name not in result[ts]["tools"]: |
| result[ts]["tools"].append(entry.name) |
| for env in entry.requires_env: |
| if env not in result[ts]["env_vars"]: |
| result[ts]["env_vars"].append(env) |
| return result |
|
|
| def check_tool_availability(self, quiet: bool = False): |
| """Return (available_toolsets, unavailable_info) like the old function.""" |
| available = [] |
| unavailable = [] |
| seen = set() |
| for entry in self._tools.values(): |
| ts = entry.toolset |
| if ts in seen: |
| continue |
| seen.add(ts) |
| if self.is_toolset_available(ts): |
| available.append(ts) |
| else: |
| unavailable.append({ |
| "name": ts, |
| "env_vars": entry.requires_env, |
| "tools": [e.name for e in self._tools.values() if e.toolset == ts], |
| }) |
| return available, unavailable |
|
|
|
|
| |
| registry = ToolRegistry() |
|
|