OpenManus-Gemini / protocol /a2a /app /agent_executor.py
Tsitsi19's picture
Upload folder using huggingface_hub
22df7bd verified
import logging
from typing import Awaitable, Callable
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.types import (
InvalidParamsError,
Part,
Task,
TextPart,
UnsupportedOperationError,
)
from a2a.utils import completed_task, new_artifact
from a2a.utils.errors import ServerError
from .agent import A2AManus
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ManusExecutor(AgentExecutor):
"""Currency Conversion AgentExecutor Example."""
def __init__(self, agent_factory: Callable[[], Awaitable[A2AManus]]):
self.agent_factory = agent_factory
async def execute(
self,
context: RequestContext,
event_queue: EventQueue,
) -> None:
error = self._validate_request(context)
if error:
raise ServerError(error=InvalidParamsError())
query = context.get_user_input()
try:
self.agent = await self.agent_factory()
result = await self.agent.invoke(query, context.context_id)
print(f"Final Result ===> {result}")
except Exception as e:
print("Error invoking agent: %s", e)
raise ServerError(error=ValueError(f"Error invoking agent: {e}")) from e
parts = [
Part(
root=TextPart(
text=(
result["content"]
if result["content"]
else "failed to generate response"
)
),
)
]
event_queue.enqueue_event(
completed_task(
context.task_id,
context.context_id,
[new_artifact(parts, f"task_{context.task_id}")],
[context.message],
)
)
def _validate_request(self, context: RequestContext) -> bool:
return False
async def cancel(
self, request: RequestContext, event_queue: EventQueue
) -> Task | None:
raise ServerError(error=UnsupportedOperationError())