import re
import sys
from typing import Dict, Any
# Support both standalone (core.*) and OpenManus (app.utils.*) deployments
try:
from app.utils.signal_registry import signal_registry # OpenManus full stack
from app.logger import logger
except (ImportError, AttributeError):
from core.signal_registry import signal_registry # standalone API
from app.logger import logger # logger stub injected by signalmesh_api.py
class AgentifiedToolParser:
"""
Parses natural language output from agents to detect and execute
tool-less actions (e.g., broadcasting to the mesh) passively.
"""
ACTION_PATTERN = re.compile(r'\s*(.*?)\s*', re.DOTALL)
@classmethod
def parse_and_execute(cls, text: str) -> bool:
"""
Scans the text for tags and performs the corresponding action.
Returns True if an action was executed.
"""
executed = False
matches = cls.ACTION_PATTERN.finditer(text)
for match in matches:
action_type = match.group(1)
topic = match.group(2)
content = match.group(3).strip()
if action_type == "broadcast":
if not topic:
topic = "general"
logger.info(f"Agentified Tool: Passively executing broadcast to '{topic}'")
signal_registry.broadcast(
name=topic,
source_type="passive_agent_action",
data=content
)
executed = True
else:
logger.warning(f"Agentified Tool: Unknown action type '{action_type}'")
return executed