File size: 4,532 Bytes
dedb5fe
 
4e316af
dedb5fe
054b7cc
ee7e520
dbcd819
4e316af
 
 
054b7cc
 
ee7e520
dedb5fe
7c08fe7
dbcd819
ee7e520
dedb5fe
054b7cc
dedb5fe
e67faa0
dedb5fe
 
 
 
 
4e316af
 
 
 
 
 
 
 
 
 
decb45c
e67faa0
 
054b7cc
 
 
 
dedb5fe
 
a5e767c
dedb5fe
dbcd819
4e316af
 
 
aab8473
ee7e520
4e316af
 
 
 
 
 
 
054b7cc
7c08fe7
dbcd819
ee7e520
7c08fe7
 
9705b58
dbcd819
 
ee7e520
4e316af
 
 
 
 
 
7c08fe7
4e316af
 
 
 
 
 
 
 
 
a18ad88
 
 
9705b58
 
 
e67faa0
9705b58
ee7e520
9705b58
dbcd819
ee7e520
a18ad88
9705b58
aab8473
ee7e520
a18ad88
 
ee7e520
a18ad88
dbcd819
dedb5fe
4e316af
054b7cc
4e316af
 
dbcd819
4e316af
9705b58
054b7cc
 
 
 
 
 
e67faa0
054b7cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#!/usr/bin/env python3
"""
Agent Session for Avurna Flow (Final Corrected Version with HTTP Session Management)
"""
import os
import json
import traceback
import aiohttp # 1. Import aiohttp
from contextlib import asynccontextmanager
from fastapi import FastAPI, BackgroundTasks, Request
from pydantic import BaseModel
import uvicorn
from fastapi.middleware.cors import CORSMiddleware

from livekit.rtc import Room
from livekit.agents import Agent, AgentSession
from livekit.agents.llm import LLM
from livekit.agents.stt.stream_adapter import StreamAdapter
from livekit.plugins.google import LLM as GoogleLLM
from livekit.plugins.groq import STT
from livekit.plugins.hume import TTS, VoiceByName, VoiceProvider
from livekit.plugins.silero import VAD

from src.agent_session.constants import SYSTEM_PROMPT, GREETING_INSTRUCTIONS
from src.utils import validate_env_vars

# 2. Create a lifespan manager for the http session
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Own the application-wide aiohttp session for the app's lifetime.

    A single ``aiohttp.ClientSession`` is created before the app starts
    serving, stored on ``app.state.http_session`` so request handlers can
    reach it, and closed again once the lifespan context exits.
    """
    shared_session = aiohttp.ClientSession()
    try:
        app.state.http_session = shared_session
        yield
    finally:
        # Mirrors what leaving an ``async with ClientSession()`` block does.
        await shared_session.close()

# The lifespan handler is attached here so the shared HTTP session exists
# before the first request is served.
app = FastAPI(lifespan=lifespan)

# NOTE(review): wildcard origins combined with allow_credentials=True is the
# most permissive CORS setup possible - confirm this is intended for
# production, or restrict `origins` to the known front-end hosts.
origins = ["*"]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

class JoinRoomRequest(BaseModel):
    # Request body accepted by the POST /join-room endpoint.
    # room_name: forwarded to run_agent_session, where it appears in log output.
    room_name: str
    # agent_token: forwarded to run_agent_session, which passes it to Room.connect.
    agent_token: str

class VoiceAssistant(Agent):
    """Agent whose instructions are the module-level ``SYSTEM_PROMPT``."""

    def __init__(self) -> None:
        # SYSTEM_PROMPT is imported from src.agent_session.constants.
        super().__init__(instructions=SYSTEM_PROMPT)

async def send_agent_state(room: Room, state: str) -> None:
    """Placeholder for reporting the agent's state to a room.

    The implementation is not included in this file: the body is a bare
    ``pass``, so as written the call does nothing. ``run_agent_session``
    calls it with the states ``"listening"`` and ``"speaking"``.

    Args:
        room: The room the state update relates to.
        state: State label supplied by the caller.
    """
    # NOTE(review): the original comment says the real function is correct but
    # it was elided here - restore the actual body before deploying this file.
    # ...
    pass # Omitted for brevity

class LLMStateWrapper(LLM):
    """Placeholder for an ``LLM`` subclass; the real body is elided here.

    ``run_agent_session`` constructs it as ``LLMStateWrapper(llm=..., room=...)``.
    """
    # NOTE(review): with only ``pass`` this class defines no ``__init__`` of its
    # own, so whether the ``llm=`` / ``room=`` keywords are accepted depends
    # entirely on the base class - restore the real implementation and confirm.
    # ...
    pass # Omitted for brevity


# 3. Modify run_agent_session to accept the http_session
async def run_agent_session(room_name: str, agent_token: str, http_session: aiohttp.ClientSession) -> None:
    """Run one voice-agent session in a LiveKit room until the call ends.

    Connects to the room, builds the VAD/STT/LLM/TTS pipeline, starts an
    ``AgentSession`` and speaks the initial greeting. It then stays connected
    until the room reports "disconnected" or the session reports "close";
    only after that are the session and the room connection torn down.
    (Previously cleanup ran immediately after the greeting, so the agent left
    the room right after announcing itself.)

    Errors are printed rather than raised because this coroutine is run as a
    fire-and-forget background task with nobody awaiting its result.

    Args:
        room_name: Name of the room; used here only in log output.
        agent_token: Access token handed to ``Room.connect``.
        http_session: Shared aiohttp session passed to the plugins. It is
            owned by the caller and is not closed here.
    """
    import asyncio  # local import: asyncio is not in the module's import block

    room = Room()
    session = None
    finished = asyncio.Event()

    def _mark_finished(*_event_args) -> None:
        # Shared by the room and session emitters; any event payload is ignored.
        finished.set()

    try:
        # Environment validation only happens under ``__main__``, so guard
        # here as well for deployments that import ``app`` directly.
        livekit_url = os.getenv("LIVEKIT_URL")
        if not livekit_url:
            raise RuntimeError("LIVEKIT_URL environment variable is not set")

        # Register before connecting so an early disconnect is not missed.
        room.on("disconnected", _mark_finished)

        print(f"DEBUG: 1. Connecting to LiveKit room '{room_name}'...")
        await room.connect(livekit_url, agent_token)
        print("DEBUG: 2. Connection successful.")

        await send_agent_state(room, "listening")

        print("DEBUG: 3. Initializing plugins with shared http_session...")
        # 4. Pass the session to every plugin that makes an external HTTP call
        # NOTE(review): confirm each constructor below really accepts a
        # ``session`` keyword for the installed plugin versions; a wrong
        # keyword raises TypeError here and is reported as a fatal error.
        llm_wrapper = LLMStateWrapper(
            llm=GoogleLLM(model="gemini-1.5-flash", session=http_session),
            room=room
        )
        vad = VAD.load(min_speech_duration=0.1)
        stt = StreamAdapter(
            stt=STT(model="whisper-large-v3-turbo", session=http_session),
            vad=vad
        )
        tts = TTS(
            voice=VoiceByName(name="Tiktok Fashion Influencer"),
            instant_mode=True,
            session=http_session
        )
        print("DEBUG: 4. Plugins initialized.")

        print("DEBUG: 5. Creating AgentSession...")
        session = AgentSession(vad=vad, stt=stt, llm=llm_wrapper, tts=tts)
        # The session may close on its own (e.g. when the call ends) while
        # the room connection is still up; treat that as the end as well.
        session.on("close", _mark_finished)
        print("DEBUG: 6. AgentSession created.")

        print("DEBUG: 7. Starting session...")
        await session.start(agent=VoiceAssistant(), room=room)

        print("DEBUG: 8. Session started. Generating initial greeting...")
        await send_agent_state(room, "speaking")
        await session.generate_reply(instructions=GREETING_INSTRUCTIONS)

        print("DEBUG: 9. Initial greeting complete. Agent is now fully operational.")

        # Keep the agent in the room for the rest of the conversation.
        await finished.wait()

    except Exception as e:
        print(f"FATAL ERROR in agent session: {e}")
        print(traceback.format_exc())
    finally:
        print(f"DEBUG: Agent session for room {room_name} is ending. Cleaning up.")
        if session is not None:
            try:
                await session.aclose()
            except Exception as e:
                # Best effort: a failed close must not prevent the disconnect.
                print(f"ERROR while closing agent session: {e}")
        await room.disconnect()

# 5. Modify the endpoint to retrieve the session from the app state
@app.post("/join-room")
async def join_room(req: JoinRoomRequest, request: Request, background_tasks: BackgroundTasks):
    """Schedule an agent session for the requested room and return at once.

    The shared aiohttp session is read from ``request.app.state`` and handed
    to ``run_agent_session``, which is queued as a background task so the
    HTTP response does not wait for the agent.
    """
    shared_session = request.app.state.http_session
    print(f"DEBUG: Received POST request to /join-room for: {req.room_name}")
    background_tasks.add_task(
        run_agent_session,
        room_name=req.room_name,
        agent_token=req.agent_token,
        http_session=shared_session,
    )
    return {"status": "agent_triggered"}

@app.get("/")
async def root():
    """Return a fixed status payload for GET /."""
    payload = {"status": "avurna_agent_server_online"}
    return payload

if __name__ == "__main__":
    # Script entry point: run the environment check from src.utils on the
    # variables listed below, then serve the app on all interfaces, port 7860.
    required_env_vars = [
        "HUME_API_KEY",
        "LIVEKIT_URL",
        "LIVEKIT_API_KEY",
        "LIVEKIT_API_SECRET",
        "GROQ_API_KEY",
        "GOOGLE_API_KEY",
    ]
    validate_env_vars(required_env_vars)
    uvicorn.run(app, host="0.0.0.0", port=7860)