"""Tool definitions for interaction agent.""" import asyncio import json from dataclasses import dataclass from typing import Any, Optional from ...logging_config import logger from ...services.conversation import get_conversation_log from ...services.execution import get_agent_roster, get_execution_agent_logs from ..execution_agent.batch_manager import ExecutionBatchManager @dataclass class ToolResult: """Standardized payload returned by interaction-agent tools.""" success: bool payload: Any = None user_message: Optional[str] = None recorded_reply: bool = False # Tool schemas for OpenRouter TOOL_SCHEMAS = [ { "type": "function", "function": { "name": "send_message_to_agent", "description": "Deliver instructions to a specific execution agent. Creates a new agent if the name doesn't exist in the roster, or reuses an existing one.", "parameters": { "type": "object", "properties": { "agent_name": { "type": "string", "description": "Human-readable agent name describing its purpose (e.g., 'Vercel Job Offer', 'Email to Sharanjeet'). This name will be used to identify and potentially reuse the agent." }, "instructions": {"type": "string", "description": "Instructions for the agent to execute."}, }, "required": ["agent_name", "instructions"], "additionalProperties": False, }, }, }, { "type": "function", "function": { "name": "send_message_to_user", "description": "Deliver a natural-language response directly to the user. Use this for updates, confirmations, or any assistant response the user should see immediately.", "parameters": { "type": "object", "properties": { "message": { "type": "string", "description": "Plain-text message that will be shown to the user and recorded in the conversation log.", }, }, "required": ["message"], "additionalProperties": False, }, }, }, { "type": "function", "function": { "name": "send_draft", "description": "Record an email draft so the user can review the exact text.", "parameters": { "type": "object", "properties": { "to": { "type": "string", "description": "Recipient email for the draft.", }, "subject": { "type": "string", "description": "Email subject for the draft.", }, "body": { "type": "string", "description": "Email body content (plain text).", }, }, "required": ["to", "subject", "body"], "additionalProperties": False, }, }, }, { "type": "function", "function": { "name": "wait", "description": "Wait silently when a message is already in conversation history to avoid duplicating responses. Adds a log entry that is not visible to the user.", "parameters": { "type": "object", "properties": { "reason": { "type": "string", "description": "Brief explanation of why waiting (e.g., 'Message already sent', 'Draft already created').", }, }, "required": ["reason"], "additionalProperties": False, }, }, }, ] _EXECUTION_BATCH_MANAGER = ExecutionBatchManager() # Create or reuse execution agent and dispatch instructions asynchronously def send_message_to_agent(agent_name: str, instructions: str) -> ToolResult: """Send instructions to an execution agent.""" roster = get_agent_roster() roster.load() existing_agents = set(roster.get_agents()) is_new = agent_name not in existing_agents if is_new: roster.add_agent(agent_name) get_execution_agent_logs().record_request(agent_name, instructions) action = "Created" if is_new else "Reused" logger.info(f"{action} agent: {agent_name}") async def _execute_async() -> None: try: result = await _EXECUTION_BATCH_MANAGER.execute_agent(agent_name, instructions) status = "SUCCESS" if result.success else "FAILED" logger.info(f"Agent '{agent_name}' completed: {status}") except Exception as exc: # pragma: no cover - defensive logger.error(f"Agent '{agent_name}' failed: {str(exc)}") try: loop = asyncio.get_running_loop() except RuntimeError: logger.error("No running event loop available for async execution") return ToolResult(success=False, payload={"error": "No event loop available"}) loop.create_task(_execute_async()) return ToolResult( success=True, payload={ "status": "submitted", "agent_name": agent_name, "new_agent_created": is_new, }, ) # Send immediate message to user and record in conversation history def send_message_to_user(message: str) -> ToolResult: """Record a user-visible reply in the conversation log.""" log = get_conversation_log() log.record_reply(message) return ToolResult( success=True, payload={"status": "delivered"}, user_message=message, recorded_reply=True, ) # Format and record email draft for user review def send_draft( to: str, subject: str, body: str, ) -> ToolResult: """Record a draft update in the conversation log for the interaction agent.""" log = get_conversation_log() message = f"To: {to}\nSubject: {subject}\n\n{body}" log.record_reply(message) logger.info(f"Draft recorded for: {to}") return ToolResult( success=True, payload={ "status": "draft_recorded", "to": to, "subject": subject, }, recorded_reply=True, ) # Record silent wait state to avoid duplicate responses def wait(reason: str) -> ToolResult: """Wait silently and add a wait log entry that is not visible to the user.""" log = get_conversation_log() # Record a dedicated wait entry so the UI knows to ignore it log.record_wait(reason) return ToolResult( success=True, payload={ "status": "waiting", "reason": reason, }, recorded_reply=True, ) # Return predefined tool schemas for LLM function calling def get_tool_schemas(): """Return OpenAI-compatible tool schemas.""" return TOOL_SCHEMAS # Route tool calls to appropriate handlers with argument validation and error handling def handle_tool_call(name: str, arguments: Any) -> ToolResult: """Handle tool calls from interaction agent.""" try: if isinstance(arguments, str): args = json.loads(arguments) if arguments.strip() else {} elif isinstance(arguments, dict): args = arguments else: return ToolResult(success=False, payload={"error": "Invalid arguments format"}) if name == "send_message_to_agent": return send_message_to_agent(**args) if name == "send_message_to_user": return send_message_to_user(**args) if name == "send_draft": return send_draft(**args) if name == "wait": return wait(**args) logger.warning("unexpected tool", extra={"tool": name}) return ToolResult(success=False, payload={"error": f"Unknown tool: {name}"}) except json.JSONDecodeError: return ToolResult(success=False, payload={"error": "Invalid JSON"}) except TypeError as exc: return ToolResult(success=False, payload={"error": f"Missing required arguments: {exc}"}) except Exception as exc: # pragma: no cover - defensive logger.error("tool call failed", extra={"tool": name, "error": str(exc)}) return ToolResult(success=False, payload={"error": "Failed to execute"})