import re import sys from typing import Dict, Any # Support both standalone (core.*) and OpenManus (app.utils.*) deployments try: from app.utils.signal_registry import signal_registry # OpenManus full stack from app.logger import logger except (ImportError, AttributeError): from core.signal_registry import signal_registry # standalone API from app.logger import logger # logger stub injected by signalmesh_api.py class AgentifiedToolParser: """ Parses natural language output from agents to detect and execute tool-less actions (e.g., broadcasting to the mesh) passively. """ ACTION_PATTERN = re.compile(r'\s*(.*?)\s*', re.DOTALL) @classmethod def parse_and_execute(cls, text: str) -> bool: """ Scans the text for tags and performs the corresponding action. Returns True if an action was executed. """ executed = False matches = cls.ACTION_PATTERN.finditer(text) for match in matches: action_type = match.group(1) topic = match.group(2) content = match.group(3).strip() if action_type == "broadcast": if not topic: topic = "general" logger.info(f"Agentified Tool: Passively executing broadcast to '{topic}'") signal_registry.broadcast( name=topic, source_type="passive_agent_action", data=content ) executed = True else: logger.warning(f"Agentified Tool: Unknown action type '{action_type}'") return executed