Spaces:
Runtime error
Runtime error
File size: 2,144 Bytes
22df7bd | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 | import logging
from typing import Awaitable, Callable
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.types import (
InvalidParamsError,
Part,
Task,
TextPart,
UnsupportedOperationError,
)
from a2a.utils import completed_task, new_artifact
from a2a.utils.errors import ServerError
from .agent import A2AManus
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ManusExecutor(AgentExecutor):
"""Currency Conversion AgentExecutor Example."""
def __init__(self, agent_factory: Callable[[], Awaitable[A2AManus]]):
self.agent_factory = agent_factory
async def execute(
self,
context: RequestContext,
event_queue: EventQueue,
) -> None:
error = self._validate_request(context)
if error:
raise ServerError(error=InvalidParamsError())
query = context.get_user_input()
try:
self.agent = await self.agent_factory()
result = await self.agent.invoke(query, context.context_id)
print(f"Final Result ===> {result}")
except Exception as e:
print("Error invoking agent: %s", e)
raise ServerError(error=ValueError(f"Error invoking agent: {e}")) from e
parts = [
Part(
root=TextPart(
text=(
result["content"]
if result["content"]
else "failed to generate response"
)
),
)
]
event_queue.enqueue_event(
completed_task(
context.task_id,
context.context_id,
[new_artifact(parts, f"task_{context.task_id}")],
[context.message],
)
)
def _validate_request(self, context: RequestContext) -> bool:
return False
async def cancel(
self, request: RequestContext, event_queue: EventQueue
) -> Task | None:
raise ServerError(error=UnsupportedOperationError())
|