SignalMesh / core /agentified_tool.py
Ig0tU
feat: reposition as dev-environment hub; sync all gap-closing components
78e921e
Raw
History Blame Contribute Delete
1.74 kB
import re
import sys
from typing import Dict, Any
# Support both standalone (core.*) and OpenManus (app.utils.*) deployments
try:
from app.utils.signal_registry import signal_registry # OpenManus full stack
from app.logger import logger
except (ImportError, AttributeError):
from core.signal_registry import signal_registry # standalone API
from app.logger import logger # logger stub injected by signalmesh_api.py
class AgentifiedToolParser:
    """
    Parses natural language output from agents to detect and execute
    tool-less actions (e.g., broadcasting to the mesh) passively.

    Actions are embedded in the text as tags of the form::

        <execute type="broadcast" topic="alerts">payload</execute>

    The ``type`` attribute is required and must come first; ``topic`` is
    optional. Attribute values must be double-quoted and non-empty. The tag
    body may span multiple lines, and whitespace around it is discarded.
    """

    # Capture groups: 1 = action type, 2 = topic (None when the attribute is
    # absent), 3 = tag body. DOTALL lets the body contain newlines.
    ACTION_PATTERN = re.compile(r'<execute\s+type="([^"]+)"(?:\s+topic="([^"]+)")?>\s*(.*?)\s*</execute>', re.DOTALL)

    @classmethod
    def parse_and_execute(cls, text: str) -> bool:
        """
        Scans the text for <execute> tags and performs the corresponding action.

        Every tag found in ``text`` is handled in order of appearance:

        * ``broadcast`` -- calls ``signal_registry.broadcast`` with the topic
          as ``name`` (``"general"`` when no topic was given), the fixed
          ``source_type`` ``"passive_agent_action"`` and the tag body as
          ``data``.
        * any other type -- logged as a warning and skipped.

        Args:
            text: Agent output to scan. ``None`` or an empty string is
                tolerated and treated as containing no actions.

        Returns:
            True if at least one action was executed, otherwise False.
        """
        # Guard against empty/None input: re's finditer raises TypeError on
        # None, and an empty string cannot contain a tag anyway.
        if not text:
            return False

        executed = False
        for match in cls.ACTION_PATTERN.finditer(text):
            action_type, topic, body = match.groups()

            if action_type != "broadcast":
                logger.warning(f"Agentified Tool: Unknown action type '{action_type}'")
                continue

            # The topic group is None when the attribute was omitted.
            topic = topic or "general"
            content = body.strip()

            logger.info(f"Agentified Tool: Passively executing broadcast to '{topic}'")
            signal_registry.broadcast(
                name=topic,
                source_type="passive_agent_action",
                data=content,
            )
            executed = True

        return executed