#!/usr/bin/env python3
"""
Agent Session for Avurna Flow.

A small FastAPI server. ``POST /join-room`` schedules a LiveKit voice agent
(Silero VAD, Groq Whisper STT, Gemini LLM, Hume TTS) to join the requested
room; ``GET /`` is a liveness probe. One aiohttp ``ClientSession`` is created
for the application's lifetime and handed to plugins that accept one.
"""

import asyncio
import json  # NOTE(review): unused in the visible code; kept in case the elided helpers need it
import os
import traceback
from contextlib import asynccontextmanager

import aiohttp
import uvicorn
from fastapi import BackgroundTasks, FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from livekit.agents import Agent, AgentSession
from livekit.agents.llm import LLM
from livekit.agents.stt.stream_adapter import StreamAdapter
from livekit.plugins.google import LLM as GoogleLLM
from livekit.plugins.groq import STT
from livekit.plugins.hume import TTS, VoiceByName, VoiceProvider
from livekit.plugins.silero import VAD
from livekit.rtc import Room
from pydantic import BaseModel

from src.agent_session.constants import GREETING_INSTRUCTIONS, SYSTEM_PROMPT
from src.utils import validate_env_vars

# Environment variables checked before the server starts (see __main__).
REQUIRED_ENV_VARS = [
    "HUME_API_KEY",
    "LIVEKIT_URL",
    "LIVEKIT_API_KEY",
    "LIVEKIT_API_SECRET",
    "GROQ_API_KEY",
    "GOOGLE_API_KEY",
]


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Own a single aiohttp ClientSession for the whole application lifetime.

    The session is exposed as ``app.state.http_session`` and is closed when
    the ``async with`` exits at application shutdown.
    """
    async with aiohttp.ClientSession() as session:
        app.state.http_session = session
        yield


app = FastAPI(lifespan=lifespan)

# NOTE(review): a wildcard origin together with allow_credentials=True means
# Starlette's CORSMiddleware reflects whatever Origin the browser sends, so
# credentialed cross-origin requests are accepted from any site. Restrict
# `origins` to the real frontend(s) before exposing this server publicly.
origins = ["*"]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


class JoinRoomRequest(BaseModel):
    """Request body for ``POST /join-room``."""

    room_name: str  # LiveKit room the agent should join
    agent_token: str  # token passed to Room.connect for the agent


class VoiceAssistant(Agent):
    """Voice agent whose behavior is driven by SYSTEM_PROMPT."""

    def __init__(self):
        super().__init__(instructions=SYSTEM_PROMPT)


async def send_agent_state(room: Room, state: str):
    """Report the agent's state to the room; callers pass "listening"/"speaking".

    NOTE(review): the real body was elided from this copy ("Omitted for
    brevity"). As written this is a no-op and must be restored.
    """
    # ...
    pass  # Omitted for brevity


class LLMStateWrapper(LLM):
    """LLM wrapper built as ``LLMStateWrapper(llm=<inner LLM>, room=<Room>)``.

    NOTE(review): the implementation was elided from this copy. With only
    ``pass``, the inherited ``LLM.__init__`` presumably rejects the ``llm`` /
    ``room`` keywords, so the real body must be restored for
    run_agent_session to work.
    """

    # ...
    pass  # Omitted for brevity


async def run_agent_session(room_name: str, agent_token: str, http_session: aiohttp.ClientSession):
    """Join ``room_name`` as a voice agent and stay until the call ends.

    Connects to LiveKit, builds the VAD/STT/LLM/TTS pipeline, speaks the
    greeting, then waits until the room disconnects or the AgentSession
    closes. This runs as a fire-and-forget background task, so exceptions are
    printed rather than raised. The session and room are always cleaned up.

    Args:
        room_name: LiveKit room to join.
        agent_token: token used to connect the agent to the room.
        http_session: app-wide aiohttp session (owned by ``lifespan``) for
            plugins that accept an external HTTP session.
    """
    livekit_url = os.getenv("LIVEKIT_URL")
    room = Room()
    session = None
    # Set when the conversation is over. Without waiting on it, the function
    # would return right after the greeting and `finally` would disconnect
    # the agent immediately.
    finished = asyncio.Event()
    room.on("disconnected", lambda *_: finished.set())
    try:
        if not livekit_url:
            raise RuntimeError("LIVEKIT_URL is not set")

        print(f"DEBUG: 1. Connecting to LiveKit room '{room_name}'...")
        await room.connect(livekit_url, agent_token)
        print("DEBUG: 2. Connection successful.")
        await send_agent_state(room, "listening")

        print("DEBUG: 3. Initializing plugins with shared http_session...")
        # NOTE(review): LiveKit plugins call this parameter `http_session`, not
        # `session`. Gemini (google-genai client) and Groq (OpenAI-compatible
        # client) don't take an aiohttp session, so only Hume's TTS receives
        # it. Verify against the installed plugin versions.
        llm_wrapper = LLMStateWrapper(
            llm=GoogleLLM(model="gemini-1.5-flash"),
            room=room,
        )
        vad = VAD.load(min_speech_duration=0.1)
        # Groq Whisper is non-streaming; StreamAdapter segments audio with the VAD.
        stt = StreamAdapter(
            stt=STT(model="whisper-large-v3-turbo"),
            vad=vad,
        )
        tts = TTS(
            voice=VoiceByName(name="Tiktok Fashion Influencer"),
            instant_mode=True,
            http_session=http_session,
        )
        print("DEBUG: 4. Plugins initialized.")

        print("DEBUG: 5. Creating AgentSession...")
        session = AgentSession(vad=vad, stt=stt, llm=llm_wrapper, tts=tts)
        session.on("close", lambda *_: finished.set())
        print("DEBUG: 6. AgentSession created.")

        print("DEBUG: 7. Starting session...")
        await session.start(agent=VoiceAssistant(), room=room)

        print("DEBUG: 8. Session started. Generating initial greeting...")
        await send_agent_state(room, "speaking")
        await session.generate_reply(instructions=GREETING_INSTRUCTIONS)
        print("DEBUG: 9. Initial greeting complete. Agent is now fully operational.")

        # Keep the agent in the call until the room drops or the session closes.
        await finished.wait()
    except Exception as e:
        print(f"FATAL ERROR in agent session: {e}")
        print(traceback.format_exc())
    finally:
        print(f"DEBUG: Agent session for room {room_name} is ending. Cleaning up.")
        try:
            if session is not None:
                await session.aclose()
        finally:
            # Always leave the room, even if closing the session failed.
            await room.disconnect()


@app.post("/join-room")
async def join_room(req: JoinRoomRequest, request: Request, background_tasks: BackgroundTasks):
    """Schedule the voice agent to join ``req.room_name`` and return at once.

    The agent runs as a background task after the response is sent, reusing
    the app-wide aiohttp session stored by ``lifespan``.
    """
    http_session = request.app.state.http_session
    print(f"DEBUG: Received POST request to /join-room for: {req.room_name}")
    background_tasks.add_task(run_agent_session, req.room_name, req.agent_token, http_session)
    return {"status": "agent_triggered"}


@app.get("/")
async def root():
    """Liveness probe."""
    return {"status": "avurna_agent_server_online"}


if __name__ == "__main__":
    validate_env_vars(REQUIRED_ENV_VARS)
    uvicorn.run(app, host="0.0.0.0", port=7860)