WebEssentz's picture
Realtime Flow
4e316af
raw
history blame
4.53 kB
#!/usr/bin/env python3
"""
Agent Session for Avurna Flow (Final Corrected Version with HTTP Session Management)
"""
import os
import json
import traceback
import aiohttp # 1. Import aiohttp
from contextlib import asynccontextmanager
from fastapi import FastAPI, BackgroundTasks, Request
from pydantic import BaseModel
import uvicorn
from fastapi.middleware.cors import CORSMiddleware
from livekit.rtc import Room
from livekit.agents import Agent, AgentSession
from livekit.agents.llm import LLM
from livekit.agents.stt.stream_adapter import StreamAdapter
from livekit.plugins.google import LLM as GoogleLLM
from livekit.plugins.groq import STT
from livekit.plugins.hume import TTS, VoiceByName, VoiceProvider
from livekit.plugins.silero import VAD
from src.agent_session.constants import SYSTEM_PROMPT, GREETING_INSTRUCTIONS
from src.utils import validate_env_vars
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Provide one ``aiohttp.ClientSession`` for the lifetime of the application.

    The session is created before control is handed to the app, published as
    ``app.state.http_session`` so request handlers can reuse it, and closed
    once the app hands control back at shutdown.

    Args:
        app: The FastAPI application whose ``state`` receives the session.
    """
    shared_session = aiohttp.ClientSession()
    try:
        app.state.http_session = shared_session
        yield
    finally:
        # Always release the connection pool, even if shutdown is triggered
        # by an error.
        await shared_session.close()
# Application instance; the lifespan handler owns the shared HTTP session.
app = FastAPI(lifespan=lifespan)

# CORS is fully open: any origin, method and header is accepted.
# NOTE(review): FastAPI's CORS documentation says wildcard origins/methods/
# headers must not be combined with allow_credentials=True; confirm whether
# credentials are needed, or list the allowed origins explicitly.
origins = ["*"]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
class JoinRoomRequest(BaseModel):
    """Request body for ``POST /join-room``."""

    # Name of the room the agent is asked to join (also used in log output).
    room_name: str
    # Token the agent passes to ``Room.connect`` when joining the room.
    agent_token: str
class VoiceAssistant(Agent):
    """Agent whose instructions are the module-wide ``SYSTEM_PROMPT``."""

    def __init__(self) -> None:
        """Initialise the base ``Agent`` with the fixed system prompt."""
        system_instructions = SYSTEM_PROMPT
        super().__init__(instructions=system_instructions)
async def send_agent_state(room: Room, state: str):
    """Report the agent's state for ``room`` (callers pass "listening" / "speaking").

    NOTE(review): the real implementation is elided in this listing. As
    written here the coroutine does nothing and returns ``None``; restore the
    actual body before deploying.

    Args:
        room: The LiveKit room the state refers to.
        state: State label supplied by the caller.
    """
    # The original author marks the real implementation as correct and omits it.
    # ...
    pass  # Omitted for brevity
class LLMStateWrapper(LLM):
    """Wrapper around an ``LLM`` (implementation elided in this listing).

    NOTE(review): ``run_agent_session`` constructs this class with ``llm=`` and
    ``room=`` keyword arguments, which this placeholder body does not define;
    restore the real implementation before deploying.
    """
    # The original author marks the real implementation as correct and omits it.
    # ...
    pass  # Omitted for brevity
async def run_agent_session(room_name: str, agent_token: str, http_session: aiohttp.ClientSession):
    """Join a LiveKit room as the voice agent and serve it until the room disconnects.

    The coroutine connects to the room, builds the LLM / STT / TTS pipeline,
    starts an ``AgentSession``, speaks the initial greeting and then stays
    alive until the room emits ``disconnected``. Previously it fell through
    to cleanup right after the greeting, which disconnected the agent as soon
    as it became operational.

    Failures are printed together with their traceback instead of being
    raised, because this coroutine is scheduled as a background task by the
    ``/join-room`` endpoint and has no caller to report to.

    Args:
        room_name: Name of the room; used for log output only.
        agent_token: Access token passed to ``Room.connect``.
        http_session: Shared ``aiohttp`` session handed to the LLM, STT and
            TTS plugins.
    """
    # Local import: the module's import block does not provide asyncio.
    import asyncio

    livekit_url = os.getenv("LIVEKIT_URL")
    room = Room()
    agent_session = None
    room_closed = asyncio.Event()
    try:
        if not livekit_url:
            # Fail with a clear message instead of passing None to connect().
            raise RuntimeError("LIVEKIT_URL environment variable is not set")

        # Register before connecting so an early disconnect is not missed.
        # The callback accepts any arguments the emitter may pass (e.g. a reason).
        room.on("disconnected", lambda *_: room_closed.set())

        print(f"DEBUG: 1. Connecting to LiveKit room '{room_name}'...")
        await room.connect(livekit_url, agent_token)
        print("DEBUG: 2. Connection successful.")
        await send_agent_state(room, "listening")

        print("DEBUG: 3. Initializing plugins with shared http_session...")
        # NOTE(review): confirm each plugin constructor accepts `session=`;
        # some LiveKit plugins name this keyword `http_session` or take a
        # client object instead.
        llm_wrapper = LLMStateWrapper(
            llm=GoogleLLM(model="gemini-1.5-flash", session=http_session),
            room=room,
        )
        vad = VAD.load(min_speech_duration=0.1)
        stt = StreamAdapter(
            stt=STT(model="whisper-large-v3-turbo", session=http_session),
            vad=vad,
        )
        tts = TTS(
            voice=VoiceByName(name="Tiktok Fashion Influencer"),
            instant_mode=True,
            session=http_session,
        )
        print("DEBUG: 4. Plugins initialized.")

        print("DEBUG: 5. Creating AgentSession...")
        agent_session = AgentSession(vad=vad, stt=stt, llm=llm_wrapper, tts=tts)
        print("DEBUG: 6. AgentSession created.")

        print("DEBUG: 7. Starting session...")
        await agent_session.start(agent=VoiceAssistant(), room=room)
        print("DEBUG: 8. Session started. Generating initial greeting...")
        await send_agent_state(room, "speaking")
        await agent_session.generate_reply(instructions=GREETING_INSTRUCTIONS)
        print("DEBUG: 9. Initial greeting complete. Agent is now fully operational.")

        # Keep serving the conversation until the room goes away; returning
        # here would run the cleanup below and kick the agent out immediately.
        await room_closed.wait()
    except Exception as e:
        print(f"FATAL ERROR in agent session: {e}")
        print(traceback.format_exc())
    finally:
        print(f"DEBUG: Agent session for room {room_name} is ending. Cleaning up.")
        try:
            # Stop the pipeline first so it does not outlive the room connection.
            if agent_session is not None:
                await agent_session.aclose()
        finally:
            await room.disconnect()
@app.post("/join-room")
async def join_room(req: JoinRoomRequest, request: Request, background_tasks: BackgroundTasks):
    """Schedule an agent session for the requested room and return immediately.

    The application-wide HTTP session is read from ``request.app.state`` and
    passed to ``run_agent_session``, which runs as a background task after the
    response has been returned.

    Args:
        req: Room name and agent token supplied by the client.
        request: Incoming request, used only to reach the application state.
        background_tasks: FastAPI task queue the agent session is added to.

    Returns:
        ``{"status": "agent_triggered"}``.
    """
    shared_http_session = request.app.state.http_session
    print(f"DEBUG: Received POST request to /join-room for: {req.room_name}")
    background_tasks.add_task(
        run_agent_session,
        room_name=req.room_name,
        agent_token=req.agent_token,
        http_session=shared_http_session,
    )
    return {"status": "agent_triggered"}
@app.get("/")
async def root():
    """Return a fixed status payload showing the server is reachable."""
    status_payload = {"status": "avurna_agent_server_online"}
    return status_payload
if __name__ == "__main__":
    # Script entry point: check the environment, then serve on port 7860.
    # Presumably validate_env_vars aborts when a variable is missing; see
    # src.utils for the exact behaviour.
    required_env_vars = [
        "HUME_API_KEY",
        "LIVEKIT_URL",
        "LIVEKIT_API_KEY",
        "LIVEKIT_API_SECRET",
        "GROQ_API_KEY",
        "GOOGLE_API_KEY",
    ]
    validate_env_vars(required_env_vars)
    uvicorn.run(app, host="0.0.0.0", port=7860)