Spaces:
Runtime error
Runtime error
| import logging | |
| from typing import Awaitable, Callable | |
| from a2a.server.agent_execution import AgentExecutor, RequestContext | |
| from a2a.server.events import EventQueue | |
| from a2a.types import ( | |
| InvalidParamsError, | |
| Part, | |
| Task, | |
| TextPart, | |
| UnsupportedOperationError, | |
| ) | |
| from a2a.utils import completed_task, new_artifact | |
| from a2a.utils.errors import ServerError | |
| from .agent import A2AManus | |
# Module-level logger named after this module; the root logger is configured
# (if it has no handlers yet) to emit records at INFO level and above.
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
class ManusExecutor(AgentExecutor):
    """A2A ``AgentExecutor`` that answers requests with an ``A2AManus`` agent.

    For every request an agent is obtained from ``agent_factory``, invoked with
    the user's input, and its reply is published on the event queue as a
    completed task carrying a single text artifact.
    """

    def __init__(self, agent_factory: Callable[[], Awaitable[A2AManus]]):
        """Store the factory used to obtain an agent for each request.

        Args:
            agent_factory: Zero-argument callable whose awaited result is the
                agent that will handle the request.
        """
        self.agent_factory = agent_factory

    async def execute(
        self,
        context: RequestContext,
        event_queue: EventQueue,
    ) -> None:
        """Run the agent on the user's input and enqueue the completed task.

        Args:
            context: Request context supplying the user input, task id,
                context id and the originating message.
            event_queue: Queue that receives the completed-task event.

        Raises:
            ServerError: With ``InvalidParamsError`` when request validation
                fails, or with ``InternalError`` when creating or invoking the
                agent raises.
        """
        # Local imports keep this block self-contained; both names are only
        # needed inside this method.
        import inspect

        from a2a.types import InternalError

        if self._validate_request(context):
            raise ServerError(error=InvalidParamsError())

        query = context.get_user_input()
        try:
            agent = await self.agent_factory()
            # Still exposed on the instance for code that reads the most
            # recently used agent; the call below goes through the local.
            self.agent = agent
            result = await agent.invoke(query, context.context_id)
        except Exception as e:
            # logger.exception records the traceback along with the message.
            logger.exception("Error invoking agent: %s", e)
            raise ServerError(
                error=InternalError(message=f"Error invoking agent: {e}")
            ) from e
        logger.info("Final Result ===> %s", result)

        # A result without a usable "content" entry falls back to the same
        # placeholder text as an empty one instead of raising here.
        try:
            content = result["content"]
        except (KeyError, TypeError):
            content = None
        text = content if content else "failed to generate response"

        parts = [Part(root=TextPart(text=text))]
        task_event = completed_task(
            context.task_id,
            context.context_id,
            [new_artifact(parts, f"task_{context.task_id}")],
            [context.message],
        )

        # enqueue_event may be a plain method or a coroutine function
        # depending on the SDK version; await only when it is awaitable.
        pending = event_queue.enqueue_event(task_event)
        if inspect.isawaitable(pending):
            await pending

    def _validate_request(self, context: RequestContext) -> bool:
        """Return True when the request is invalid.

        No checks are implemented yet, so every request is accepted.
        """
        return False

    async def cancel(
        self, request: RequestContext, event_queue: EventQueue
    ) -> Task | None:
        """Reject cancellation, which this executor does not support.

        Raises:
            ServerError: Always, wrapping ``UnsupportedOperationError``.
        """
        raise ServerError(error=UnsupportedOperationError())