LunarTech / src /runner_app.py
vishalkatheriya's picture
Upload 14 files
24773d4 verified
"""
ADK Runner setup — used by Streamlit to run the agent directly (no API).
"""
import asyncio
import logging
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.genai import types
from agent import root_agent
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Application name handed to the Runner and to every session-service call
# in this file, so they all address the same app namespace.
APP_NAME = "handbook_app"

# One session service instance shared by the runner and the chat helpers.
session_service = InMemorySessionService()

# One shared Runner, wired to the root agent and the session service above.
runner = Runner(agent=root_agent, app_name=APP_NAME, session_service=session_service)
def _text_from_content(content) -> str | None:
if content is None:
return None
try:
parts = getattr(content, "parts", None)
if not parts:
return None
for part in parts:
text = getattr(part, "text", None)
if text is not None and str(text).strip():
return str(text).strip()
except (AttributeError, TypeError, IndexError):
pass
return None
async def _run_chat_async(user_id: str, session_id: str, message: str) -> str:
    """Send one user message through the module-level runner and return its reply.

    The session identified by ``(APP_NAME, user_id, session_id)`` is looked
    up and created if the lookup returns nothing. The message is then wrapped
    as user content and streamed through ``runner.run_async``.

    Args:
        user_id: User identifier passed to the session service and runner.
        session_id: Session identifier passed to the session service and runner.
        message: Text of the user's message.

    Returns:
        The text of the most recent event that carried text, up to and
        including the first event that reports itself as the final response
        and has content. If no event yielded text, a fixed fallback message
        is returned instead.
    """
    session_key = {
        "app_name": APP_NAME,
        "user_id": user_id,
        "session_id": session_id,
    }
    existing = await session_service.get_session(**session_key)
    if not existing:
        await session_service.create_session(**session_key)

    outgoing = types.Content(role="user", parts=[types.Part(text=message)])

    latest_text = None
    event_stream = runner.run_async(
        user_id=user_id,
        session_id=session_id,
        new_message=outgoing,
    )
    async for event in event_stream:
        try:
            # Ask about finality first; events lacking the method count as
            # non-final.
            is_final = getattr(event, "is_final_response", lambda: False)()
            payload = getattr(event, "content", None)
            if not payload:
                # Nothing to extract; a content-less final event does not
                # end the loop either.
                continue
            extracted = _text_from_content(payload)
            if extracted:
                latest_text = extracted
            if is_final:
                break
        except (AttributeError, TypeError, KeyError):
            # Best effort: skip events that cannot be inspected.
            continue

    return latest_text or "No response from agent. Please try again."
def run_chat(message: str, user_id: str = "default_user") -> str:
    """Run the ADK agent with the given message. Sync wrapper for Streamlit.

    The session id is derived from ``user_id`` as ``f"{user_id}_session"``.
    When no event loop is running in the calling thread the chat coroutine is
    driven directly with ``asyncio.run``. When a loop is already running,
    ``asyncio.run`` cannot be used in this thread, so the coroutine is run in
    a fresh loop on a single worker thread and awaited with a timeout.

    Args:
        message: Text of the user's message.
        user_id: Identifier of the user; also determines the session id.

    Returns:
        The agent's reply, or a string starting with ``"Error: "`` if the
        chat failed or timed out. This function does not raise for ordinary
        failures; they are logged and reported in the return value.
    """
    import concurrent.futures

    timeout_seconds = 120
    session_id = f"{user_id}_session"

    def _run_in_fresh_loop() -> str:
        # Build the coroutine in the thread that will run it, so no
        # un-awaited coroutine object is ever left behind.
        return asyncio.run(_run_chat_async(user_id, session_id, message))

    try:
        # Detect a running loop explicitly. Catching RuntimeError from
        # asyncio.run() and matching on its message would also catch
        # RuntimeErrors raised by the agent itself and re-send the same
        # message a second time.
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            loop_is_running = False
        else:
            loop_is_running = True

        if not loop_is_running:
            return _run_in_fresh_loop()

        executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        try:
            future = executor.submit(_run_in_fresh_loop)
            try:
                return future.result(timeout=timeout_seconds)
            except concurrent.futures.TimeoutError:
                if future.done():
                    # The worker finished by raising a timeout of its own;
                    # report that error as-is below.
                    raise
                logger.error("Chat timed out after %s seconds", timeout_seconds)
                return f"Error: agent did not respond within {timeout_seconds} seconds."
        finally:
            # wait=False: a `with` block would join the worker on exit and
            # block past the timeout. The worker may keep running in the
            # background until its agent call completes.
            executor.shutdown(wait=False)
    except Exception as e:
        logger.exception("Chat failed: %s", e)
        return f"Error: {e}"