from a2a.server.agent_execution import AgentExecutor, RequestContext from a2a.server.events import EventQueue from a2a.utils import new_agent_text_message # --8<-- [start:HelloWorldAgent] class HelloWorldAgent: """Hello World Agent.""" async def invoke(self) -> str: return 'Hello World' # --8<-- [end:HelloWorldAgent] # --8<-- [start:HelloWorldAgentExecutor_init] class HelloWorldAgentExecutor(AgentExecutor): """Test AgentProxy Implementation.""" def __init__(self): self.agent = HelloWorldAgent() # --8<-- [end:HelloWorldAgentExecutor_init] # --8<-- [start:HelloWorldAgentExecutor_execute] async def execute( self, context: RequestContext, event_queue: EventQueue, ) -> None: result = await self.agent.invoke() await event_queue.enqueue_event(new_agent_text_message(result)) # --8<-- [end:HelloWorldAgentExecutor_execute] # --8<-- [start:HelloWorldAgentExecutor_cancel] async def cancel( self, context: RequestContext, event_queue: EventQueue ) -> None: raise Exception('cancel not supported') # --8<-- [end:HelloWorldAgentExecutor_cancel]