"""Coordinate execution agents and batch their results for the interaction agent.""" from __future__ import annotations import asyncio import uuid from dataclasses import dataclass, field from datetime import datetime from typing import Dict, List, Optional from .runtime import ExecutionAgentRuntime, ExecutionResult from ...logging_config import logger @dataclass class PendingExecution: """Track a pending execution request.""" request_id: str agent_name: str instructions: str batch_id: str created_at: datetime = field(default_factory=datetime.now) @dataclass class _BatchState: """Collect results for a single interaction-agent turn.""" batch_id: str created_at: datetime = field(default_factory=datetime.now) pending: int = 0 results: List[ExecutionResult] = field(default_factory=list) class ExecutionBatchManager: """Run execution agents and deliver their combined outcome.""" # Initialize batch manager with timeout and coordination state for execution agents def __init__(self, timeout_seconds: int = 90) -> None: self.timeout_seconds = timeout_seconds self._pending: Dict[str, PendingExecution] = {} self._batch_lock = asyncio.Lock() self._batch_state: Optional[_BatchState] = None # Run execution agent with timeout handling and batch coordination for interaction agent async def execute_agent( self, agent_name: str, instructions: str, request_id: Optional[str] = None, ) -> ExecutionResult: """Execute an agent asynchronously and buffer the result for batch dispatch.""" if not request_id: request_id = str(uuid.uuid4()) batch_id = await self._register_pending_execution(agent_name, instructions, request_id) try: logger.info(f"[{agent_name}] Execution started") runtime = ExecutionAgentRuntime(agent_name=agent_name) result = await asyncio.wait_for( runtime.execute(instructions), timeout=self.timeout_seconds, ) status = "SUCCESS" if result.success else "FAILED" logger.info(f"[{agent_name}] Execution finished: {status}") except asyncio.TimeoutError: logger.error(f"[{agent_name}] Execution timed out after {self.timeout_seconds}s") result = ExecutionResult( agent_name=agent_name, success=False, response=f"Execution timed out after {self.timeout_seconds} seconds", error="Timeout", ) except Exception as exc: # pragma: no cover - defensive logger.exception(f"[{agent_name}] Execution failed unexpectedly") result = ExecutionResult( agent_name=agent_name, success=False, response=f"Execution failed: {exc}", error=str(exc), ) finally: self._pending.pop(request_id, None) await self._complete_execution(batch_id, result, agent_name) return result # Add execution request to current batch or create new batch if none exists async def _register_pending_execution( self, agent_name: str, instructions: str, request_id: str, ) -> str: """Attach a new execution to the active batch, opening one when required.""" async with self._batch_lock: if self._batch_state is None: batch_id = str(uuid.uuid4()) self._batch_state = _BatchState(batch_id=batch_id) else: batch_id = self._batch_state.batch_id self._batch_state.pending += 1 self._pending[request_id] = PendingExecution( request_id=request_id, agent_name=agent_name, instructions=instructions, batch_id=batch_id, ) return batch_id # Store execution result and send combined batch to interaction agent when complete async def _complete_execution( self, batch_id: str, result: ExecutionResult, agent_name: str, ) -> None: """Record the execution result and dispatch when the batch drains.""" dispatch_payload: Optional[str] = None async with self._batch_lock: state = self._batch_state if state is None or state.batch_id != batch_id: logger.warning(f"[{agent_name}] Dropping result for unknown batch") return state.results.append(result) state.pending -= 1 if state.pending == 0: dispatch_payload = self._format_batch_payload(state.results) agent_names = [entry.agent_name for entry in state.results] logger.info(f"Execution batch completed: {', '.join(agent_names)}") self._batch_state = None if dispatch_payload: await self._dispatch_to_interaction_agent(dispatch_payload) # Return list of currently pending execution requests for monitoring purposes def get_pending_executions(self) -> List[Dict[str, str]]: """Expose pending executions for observability.""" return [ { "request_id": pending.request_id, "agent_name": pending.agent_name, "batch_id": pending.batch_id, "created_at": pending.created_at.isoformat(), "elapsed_seconds": (datetime.now() - pending.created_at).total_seconds(), } for pending in self._pending.values() ] # Clean up all pending executions and batch state on shutdown async def shutdown(self) -> None: """Clear pending bookkeeping (no background work remains).""" self._pending.clear() async with self._batch_lock: self._batch_state = None # Format multiple execution results into single message for interaction agent def _format_batch_payload(self, results: List[ExecutionResult]) -> str: """Render execution results into the interaction-agent format.""" entries: List[str] = [] for result in results: status = "SUCCESS" if result.success else "FAILED" response_text = (result.response or "(no response provided)").strip() entries.append(f"[{status}] {result.agent_name}: {response_text}") return "\n".join(entries) # Forward combined execution results to interaction agent for user response generation async def _dispatch_to_interaction_agent(self, payload: str) -> None: """Send the aggregated execution summary to the interaction agent.""" from ..interaction_agent.runtime import InteractionAgentRuntime runtime = InteractionAgentRuntime() try: loop = asyncio.get_running_loop() except RuntimeError: asyncio.run(runtime.handle_agent_message(payload)) return loop.create_task(runtime.handle_agent_message(payload))