import asyncio from dotenv import load_dotenv from google.adk.runners import InMemoryRunner from google.genai import types from agent import task_manager_agent load_dotenv() runner = InMemoryRunner( agent=task_manager_agent, app_name="task_manager_app") async def create_session_async(): session = await runner.session_service.create_session( app_name="task_manager_app", user_id="user" ) return session async def run_agent(user_id:int, session_id: str, user_message: str): content = types.Content(role="user", parts=[types.Part.from_text(text=user_message)]) + f"user id : {user_id}" response_text = "" for event in runner.run( user_id=user_id, session_id=session_id, new_message=content, ): if event.content.parts and event.content.parts[0].text: response_text += event.content.parts[0].text + "\n" return response_text.strip() # Simple main function to test run_agent interactively import sys def main(): import asyncio user_id = "user" # For testing, you can change this to any string loop = asyncio.get_event_loop() session = loop.run_until_complete(create_session_async()) print("ADK Agent Test Console. Type 'exit' to quit.") while True: user_message = input("You: ") if user_message.lower() == "exit": break response = loop.run_until_complete(run_agent(user_id, session.id, user_message)) print(f"Agent: {response}\n") if __name__ == "__main__": main()