Spaces:
Running
Running
| import re | |
| import sys | |
| from typing import Dict, Any | |
| # Support both standalone (core.*) and OpenManus (app.utils.*) deployments | |
| try: | |
| from app.utils.signal_registry import signal_registry # OpenManus full stack | |
| from app.logger import logger | |
| except (ImportError, AttributeError): | |
| from core.signal_registry import signal_registry # standalone API | |
| from app.logger import logger # logger stub injected by signalmesh_api.py | |
class AgentifiedToolParser:
    """
    Parses natural language output from agents to detect and execute
    tool-less actions (e.g., broadcasting to the mesh) passively.

    Actions are embedded in the agent text as tags of the form::

        <execute type="ACTION" topic="TOPIC">payload</execute>

    The ``topic`` attribute is optional; when given it must come after
    ``type``. The only action type handled here is ``"broadcast"``.
    """

    # group(1) = action type, group(2) = optional topic, group(3) = payload.
    # DOTALL lets the payload span several lines; whitespace directly inside
    # the tag is excluded from the captured payload.
    ACTION_PATTERN = re.compile(r'<execute\s+type="([^"]+)"(?:\s+topic="([^"]+)")?>\s*(.*?)\s*</execute>', re.DOTALL)

    @classmethod
    def parse_and_execute(cls, text: str) -> bool:
        """
        Scans the text for <execute> tags and performs the corresponding action.

        Declared as a classmethod so it can be invoked directly on the class
        (``AgentifiedToolParser.parse_and_execute(text)``); calling it on an
        instance works as well.

        Args:
            text: Raw agent output that may contain zero or more
                ``<execute>`` tags.

        Returns:
            True if at least one ``broadcast`` action was executed, False
            otherwise (no tags found, or only unknown action types).
        """
        executed = False
        for match in cls.ACTION_PATTERN.finditer(text):
            action_type = match.group(1)

            if action_type != "broadcast":
                # Unknown actions are reported but do not count as executed.
                logger.warning(f"Agentified Tool: Unknown action type '{action_type}'")
                continue

            # A tag without a topic attribute falls back to the "general" topic.
            topic = match.group(2) or "general"
            content = match.group(3).strip()

            logger.info(f"Agentified Tool: Passively executing broadcast to '{topic}'")
            signal_registry.broadcast(
                name=topic,
                source_type="passive_agent_action",
                data=content,
            )
            executed = True

        return executed