Spaces:
Configuration error
Configuration error
File size: 7,126 Bytes
aa15bce |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 |
"""Coordinate execution agents and batch their results for the interaction agent."""
from __future__ import annotations
import asyncio
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Optional
from .runtime import ExecutionAgentRuntime, ExecutionResult
from ...logging_config import logger
@dataclass
class PendingExecution:
    """Bookkeeping record for one execution request that has not finished yet.

    Attributes:
        request_id: Identifier of the request this record belongs to.
        agent_name: Name of the execution agent handling the request.
        instructions: Instruction text given to the agent.
        batch_id: Identifier of the batch the request was registered under.
        created_at: Naive local timestamp taken when the record is created.
    """

    request_id: str
    agent_name: str
    instructions: str
    batch_id: str
    # default_factory so each record gets its own creation time rather than
    # one value frozen when the class is defined.
    created_at: datetime = field(default_factory=datetime.now)
@dataclass
class _BatchState:
    """Accumulator for the results of one interaction-agent turn.

    Attributes:
        batch_id: Identifier shared by every execution in the batch.
        created_at: Naive local timestamp taken when the batch is opened.
        pending: Count of executions registered in the batch that have not
            reported a result yet.
        results: Results collected so far, in completion order.
    """

    batch_id: str
    created_at: datetime = field(default_factory=datetime.now)
    pending: int = 0
    # default_factory gives every batch its own list instead of a shared one.
    results: List[ExecutionResult] = field(default_factory=list)
class ExecutionBatchManager:
    """Run execution agents and deliver their combined outcome.

    Executions that start while a batch is open join that batch. When the last
    outstanding execution of the batch reports its result, all collected
    results are rendered into one text payload and handed to the interaction
    agent.
    """

    def __init__(self, timeout_seconds: int = 90) -> None:
        """Set up the per-execution timeout and empty coordination state.

        Args:
            timeout_seconds: Upper bound, in seconds, on a single agent run.
        """
        self.timeout_seconds = timeout_seconds
        # In-flight requests keyed by request id; read by get_pending_executions().
        self._pending: Dict[str, PendingExecution] = {}
        # Guards every read and write of _batch_state.
        self._batch_lock = asyncio.Lock()
        self._batch_state: Optional[_BatchState] = None
        # Strong references to fire-and-forget dispatch tasks. The event loop
        # only keeps weak references, so an unreferenced task may be collected
        # before it finishes.
        self._dispatch_tasks: set[asyncio.Task] = set()

    async def execute_agent(
        self,
        agent_name: str,
        instructions: str,
        request_id: Optional[str] = None,
    ) -> ExecutionResult:
        """Execute an agent asynchronously and buffer the result for batch dispatch.

        Args:
            agent_name: Name of the execution agent to run.
            instructions: Instruction text passed to the agent runtime.
            request_id: Optional caller-supplied identifier; a UUID4 string is
                generated when it is missing or empty.

        Returns:
            The agent's result. A timeout or an unexpected exception is turned
            into a failed ``ExecutionResult`` instead of being raised.

        Raises:
            asyncio.CancelledError: If the surrounding task is cancelled. A
                failed result is still recorded for the batch before the
                cancellation propagates.
        """
        if not request_id:
            request_id = str(uuid.uuid4())

        batch_id = await self._register_pending_execution(agent_name, instructions, request_id)

        try:
            logger.info(f"[{agent_name}] Execution started")
            runtime = ExecutionAgentRuntime(agent_name=agent_name)
            result = await asyncio.wait_for(
                runtime.execute(instructions),
                timeout=self.timeout_seconds,
            )
            status = "SUCCESS" if result.success else "FAILED"
            logger.info(f"[{agent_name}] Execution finished: {status}")
        except asyncio.TimeoutError:
            logger.error(f"[{agent_name}] Execution timed out after {self.timeout_seconds}s")
            result = ExecutionResult(
                agent_name=agent_name,
                success=False,
                response=f"Execution timed out after {self.timeout_seconds} seconds",
                error="Timeout",
            )
        except asyncio.CancelledError:
            # Must precede `except Exception`: on Python 3.7 CancelledError is
            # an Exception subclass and would be swallowed there; on 3.8+ it is
            # a BaseException and would skip every handler, leaving `result`
            # unbound in `finally` and the batch's pending count stuck.
            logger.warning(f"[{agent_name}] Execution cancelled")
            result = ExecutionResult(
                agent_name=agent_name,
                success=False,
                response="Execution was cancelled",
                error="Cancelled",
            )
            raise
        except Exception as exc:  # pragma: no cover - defensive
            logger.exception(f"[{agent_name}] Execution failed unexpectedly")
            result = ExecutionResult(
                agent_name=agent_name,
                success=False,
                response=f"Execution failed: {exc}",
                error=str(exc),
            )
        finally:
            # Runs on every handled path so the batch can always drain.
            self._pending.pop(request_id, None)
            await self._complete_execution(batch_id, result, agent_name)

        return result

    async def _register_pending_execution(
        self,
        agent_name: str,
        instructions: str,
        request_id: str,
    ) -> str:
        """Attach a new execution to the active batch, opening one when required.

        Returns:
            The identifier of the batch the execution now belongs to.
        """
        async with self._batch_lock:
            if self._batch_state is None:
                batch_id = str(uuid.uuid4())
                self._batch_state = _BatchState(batch_id=batch_id)
            else:
                batch_id = self._batch_state.batch_id

            self._batch_state.pending += 1
            self._pending[request_id] = PendingExecution(
                request_id=request_id,
                agent_name=agent_name,
                instructions=instructions,
                batch_id=batch_id,
            )
            return batch_id

    async def _complete_execution(
        self,
        batch_id: str,
        result: ExecutionResult,
        agent_name: str,
    ) -> None:
        """Record the execution result and dispatch when the batch drains.

        A result whose batch is no longer the active one (for example after
        ``shutdown``) is dropped with a warning.
        """
        dispatch_payload: Optional[str] = None

        async with self._batch_lock:
            state = self._batch_state
            if state is None or state.batch_id != batch_id:
                logger.warning(f"[{agent_name}] Dropping result for unknown batch")
                return

            state.results.append(result)
            state.pending -= 1

            if state.pending == 0:
                dispatch_payload = self._format_batch_payload(state.results)
                agent_names = [entry.agent_name for entry in state.results]
                logger.info(f"Execution batch completed: {', '.join(agent_names)}")
                # Close the batch so the next execution opens a fresh one.
                self._batch_state = None

        # Dispatch after releasing the lock so new executions are not blocked.
        if dispatch_payload is not None:
            await self._dispatch_to_interaction_agent(dispatch_payload)

    def get_pending_executions(self) -> List[Dict[str, object]]:
        """Expose pending executions for observability.

        Returns:
            One dict per in-flight request with ``request_id``, ``agent_name``,
            ``batch_id``, ``created_at`` (ISO-8601 string) and
            ``elapsed_seconds`` (float).
        """
        return [
            {
                "request_id": pending.request_id,
                "agent_name": pending.agent_name,
                "batch_id": pending.batch_id,
                "created_at": pending.created_at.isoformat(),
                "elapsed_seconds": (datetime.now() - pending.created_at).total_seconds(),
            }
            for pending in self._pending.values()
        ]

    async def shutdown(self) -> None:
        """Clear pending bookkeeping and discard the active batch.

        Executions still running afterwards have their results dropped by
        ``_complete_execution``. Dispatch tasks already handed to the event
        loop are not cancelled here.
        """
        self._pending.clear()
        async with self._batch_lock:
            self._batch_state = None

    def _format_batch_payload(self, results: List[ExecutionResult]) -> str:
        """Render execution results into the interaction-agent format.

        Each result becomes one ``[STATUS] agent_name: response`` line. A
        missing, empty or whitespace-only response is replaced by a
        placeholder so no entry ends up blank.
        """
        entries: List[str] = []
        for result in results:
            status = "SUCCESS" if result.success else "FAILED"
            # Strip first, then fall back, so "   " also gets the placeholder.
            response_text = (result.response or "").strip() or "(no response provided)"
            entries.append(f"[{status}] {result.agent_name}: {response_text}")
        return "\n".join(entries)

    async def _dispatch_to_interaction_agent(self, payload: str) -> None:
        """Send the aggregated execution summary to the interaction agent.

        The message is handled in a background task so the caller is not
        blocked. The task is kept referenced until it finishes.
        """
        # Imported lazily, as in the original; presumably to avoid an import
        # cycle between the two agent packages (confirm before hoisting).
        from ..interaction_agent.runtime import InteractionAgentRuntime

        runtime = InteractionAgentRuntime()
        # A coroutine driven by an event loop always has a running loop, so no
        # RuntimeError fallback is needed here.
        task = asyncio.get_running_loop().create_task(runtime.handle_agent_message(payload))
        self._dispatch_tasks.add(task)
        task.add_done_callback(self._on_dispatch_done)

    def _on_dispatch_done(self, task: asyncio.Task) -> None:
        """Release a finished dispatch task and log any exception it raised."""
        self._dispatch_tasks.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            logger.error(f"Interaction agent dispatch failed: {exc!r}")
|