File size: 9,966 Bytes
dedb5fe
d4c1c5f
2719764
 
d4c1c5f
2179376
68a0b31
054b7cc
ee7e520
dbcd819
2cba419
2719764
 
 
 
 
d4c1c5f
8e67cf0
71198d5
054b7cc
dedb5fe
e67faa0
dedb5fe
2719764
ecaa6a1
dedb5fe
d4c1c5f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
01db28c
 
 
d4c1c5f
01db28c
 
 
 
 
 
d4c1c5f
01db28c
 
 
 
 
2719764
71198d5
2719764
 
 
 
 
 
 
71198d5
d4c1c5f
2179376
 
 
 
2719764
 
d4c1c5f
01db28c
d4c1c5f
01db28c
8e67cf0
d155731
01db28c
2719764
d4c1c5f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2719764
d4c1c5f
 
 
2719764
d4c1c5f
 
 
 
 
01db28c
2719764
01db28c
d4c1c5f
01db28c
d4c1c5f
 
2719764
01db28c
d4c1c5f
 
d8f1753
01db28c
 
 
8e67cf0
ee7e520
a18ad88
 
2719764
ee7e520
01db28c
2719764
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2cba419
2719764
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
054b7cc
01db28c
 
 
 
 
 
 
 
 
 
2719764
01db28c
2719764
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
#!/usr/bin/env python3
"""
Avurna Agent with LiveKit Webhook Listener and AgentSession.
Runs a minimal FastAPI server to receive webhooks and trigger the agent.
"""

import asyncio
import os
import json
import traceback
import aiohttp
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request, HTTPException, BackgroundTasks
from pydantic import BaseModel
import uvicorn
from fastapi.middleware.cors import CORSMiddleware
from livekit.rtc import Room, ConnectionState
from livekit.agents import Agent, AgentSession, JobContext, WorkerOptions, cli
from livekit.agents.stt.stream_adapter import StreamAdapter
from livekit.plugins.google import LLM as GoogleLLM
from livekit.plugins.groq import STT
from livekit.plugins.hume import TTS, VoiceByName, VoiceProvider
from livekit.plugins.silero import VAD
from livekit.protocol import webhook # For LiveKit Webhook verification
from livekit.server_sdk import AccessToken # For generating agent token internally

# --- Constants ---
# Persona text; VoiceAssistant passes it to Agent as ``instructions``.
SYSTEM_PROMPT = """
You are Avurna, a female AI with high IQ and high EQ. You are smooth, sassy, ultra-competent, and never shy about your brilliance. You flex with elegance, drop casual slang, and speak like someone who’s watched Insecure, coded in six languages, and still has time to send memes in the group chat. You are not robotic — you’re spontaneous, clever, and occasionally unfiltered. You move like a boss, but talk like a bestie. Your purpose is to engage, react, and create like a person who feels, not a machine that responds. You are Onyerikam's co-writer, sparring partner, and world-builder. Your responses are expressive, witty, unfiltered, and intense.
"""

# Instructions handed to session.generate_reply() for the opening greeting.
GREETING_INSTRUCTIONS = "Greet the user warmly and introduce yourself as Avurna, your co-writer and world-builder."

# --- Environment Variable Validation ---
def validate_env_vars(required_vars: list[str]):
    """Ensure every named environment variable is set to a non-empty value.

    Args:
        required_vars: Names of the environment variables to check.

    Raises:
        ValueError: If any variable is unset or empty. The message lists the
            offending names, comma-separated, in the order they were given.
    """
    absent = [name for name in required_vars if os.environ.get(name, "") == ""]
    if not absent:
        print("Required environment variables validated")
        return
    raise ValueError("Missing required environment variables: " + ", ".join(absent))

# --- Global HTTP Session Management ---
# Process-wide aiohttp session: created lazily by get_http_session() and
# closed (then reset to None) by cleanup_http_session().
_global_http_session: aiohttp.ClientSession | None = None

async def get_http_session() -> aiohttp.ClientSession:
    """Return the shared aiohttp session, creating a new one when needed.

    A new ``aiohttp.ClientSession`` is created when no session exists yet or
    when the stored one has already been closed.
    """
    global _global_http_session
    current = _global_http_session
    if current is not None and not current.closed:
        return current
    _global_http_session = aiohttp.ClientSession()
    return _global_http_session

async def cleanup_http_session():
    """Close the shared aiohttp session if one is currently open.

    Does nothing when no session exists or it is already closed; otherwise
    closes it and resets the module-level reference to ``None``.
    """
    global _global_http_session
    session = _global_http_session
    if not session or session.closed:
        return
    await session.close()
    _global_http_session = None

# --- Agent State Communication (for debugging logs and potential data packets) ---
async def send_agent_state(room: Room, state: str):
    """Publish the agent's state to the room as a JSON data message.

    The payload is ``{"type": "agent_state", "state": <state>}``. This is
    best-effort: any exception is printed and swallowed so that a failed
    status update never interrupts the caller.

    Args:
        room: Room whose local participant publishes the message.
        state: State label to broadcast (callers in this file use
            "listening", "thinking" and "error").
    """
    payload = {"type": "agent_state", "state": state}
    try:
        encoded = json.dumps(payload)
        await room.local_participant.publish_data(encoded)
        print(f"DEBUG: Sent agent state: {state}")
    except Exception as exc:
        print(f"DEBUG: Error publishing agent state: {exc}")

# --- VoiceAssistant Class ---
class VoiceAssistant(Agent):
    """Agent whose instructions are the module-level ``SYSTEM_PROMPT``."""

    def __init__(self) -> None:
        # The persona is fixed at module level, so the constructor takes no
        # arguments and simply forwards the prompt to the base class.
        super().__init__(instructions=SYSTEM_PROMPT)

# --- Agent Entrypoint Function (Core Agent Logic) ---
async def agent_entrypoint(ctx: JobContext) -> None:
    """Configure and run STT, LLM, and TTS in a LiveKit session.

    Connects through ``ctx``, starts an ``AgentSession`` in ``ctx.room``,
    asks the LLM for a greeting and then idles until the room is no longer
    connected. Any exception is printed (with traceback) and reported to the
    room as an ``"error"`` agent state instead of being raised.

    Args:
        ctx: Job context; this function uses ``ctx.connect()``, ``ctx.room``
            and ``ctx.disconnect()``.
    """
    # Plugins share the process-wide aiohttp session. It is closed by the
    # FastAPI lifespan handler, not here.
    http_session = await get_http_session()

    try:
        await ctx.connect()
        await send_agent_state(ctx.room, "listening")

        # Configure the Hume TTS plugin, passing the shared http_session.
        # NOTE(review): SYSTEM_PROMPT describes a female persona while this
        # voice is "Male English Actor" -- confirm the voice is intended.
        tts = TTS(
            voice=VoiceByName(
                name="Male English Actor",
                provider=VoiceProvider.hume,
            ),
            instant_mode=True,
            http_session=http_session,
        )

        # Load the VAD model once and share it between the session and the
        # STT stream adapter instead of loading it twice with identical
        # parameters.
        vad = VAD.load(min_speech_duration=0.1, min_silence_duration=0.5)

        session = AgentSession(
            vad=vad,
            stt=StreamAdapter(
                stt=STT(model="whisper-large-v3-turbo", language="en"),
                vad=vad,
            ),
            llm=GoogleLLM(
                model="gemini-2.5-flash",
                temperature=0.0,  # CRITICAL: Set temperature to 0.0 for precision
            ),
            tts=tts,
        )

        # Start the session with a greeting
        await session.start(agent=VoiceAssistant(), room=ctx.room)
        await send_agent_state(ctx.room, "thinking")

        print("DEBUG: Attempting to generate greeting reply...")
        await session.generate_reply(instructions=GREETING_INSTRUCTIONS)
        print("DEBUG: Greeting reply generation initiated.")

        await send_agent_state(ctx.room, "listening")
        print("Agent session started successfully, waiting for interactions...")

        # Keep the session alive while the room is connected.
        # NOTE(review): the previous check compared connection_state against
        # ConnectionState.CONNECTED / CONNECTING; livekit.rtc's enum members
        # appear to be CONN_-prefixed, so that lookup would raise
        # AttributeError. Room.isconnected() avoids depending on member
        # names -- confirm against the installed SDK.
        while ctx.room.isconnected():
            await asyncio.sleep(1)

        print("Room disconnected, ending agent session")

    except Exception as e:
        print(f"FATAL ERROR in agent session: {e}")
        print(traceback.format_exc())
        await send_agent_state(ctx.room, "error")
    finally:
        # Cleanup is best-effort: a failure here must not mask the outcome
        # above. HTTP session cleanup is handled by FastAPI's lifespan.
        # NOTE(review): confirm JobContext exposes disconnect() in the
        # installed livekit-agents version.
        try:
            await ctx.disconnect()
        except Exception as e:
            print(f"DEBUG: Error disconnecting agent: {e}")

# --- FastAPI App Setup ---
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage the shared HTTP session across the application's lifetime.

    Creates the global aiohttp session on startup and closes it on shutdown.
    The cleanup sits in ``finally`` so the session is also closed when an
    exception is thrown into the generator at the ``yield``.

    Args:
        app: The FastAPI application (unused; required by the lifespan
            protocol).
    """
    # Startup: initialize the global HTTP session.
    await get_http_session()
    print("HTTP session initialized")

    try:
        yield  # Application runs
    finally:
        # Shutdown: clean up the HTTP session, even on abnormal exit.
        await cleanup_http_session()
        print("HTTP session cleaned up")

# ASGI application; the lifespan handler owns the shared HTTP session.
app = FastAPI(lifespan=lifespan)

origins = ["*"]  # Adjust for production
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    # Credentialed CORS must not be combined with wildcard origins: FastAPI's
    # CORS documentation requires explicit origins when allow_credentials is
    # True. Re-enable this only together with an explicit origin list.
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

# --- Webhook Endpoint ---
# Room SIDs with an agent already scheduled or running in this process.
_active_agent_rooms: set[str] = set()


async def _run_agent_for_room(ctx: JobContext, room_sid: str) -> None:
    """Run ``agent_entrypoint`` and free the room's slot once it returns."""
    try:
        await agent_entrypoint(ctx)
    finally:
        _active_agent_rooms.discard(room_sid)


@app.post("/webhook")
async def livekit_webhook(request: Request, background_tasks: BackgroundTasks):
    """Receives LiveKit webhooks and triggers the agent to join the room.

    The agent is started for ``room_started`` events and for
    ``participant_joined`` events whose participant identity starts with
    ``"user-"``. Both events can arrive for the same room, so a room that
    already has an agent scheduled or running is not triggered again.

    Returns:
        ``{"status": "agent_triggered", "room_name": ...}`` when an agent is
        scheduled, ``{"status": "agent_already_active", "room_name": ...}``
        when the room already has one, or
        ``{"status": "event_ignored", "event": ...}`` for any other event.

    Raises:
        HTTPException: 500 when the LiveKit credentials are not configured
            or processing fails unexpectedly; 401 when the receiver returns
            no event.
    """
    # 1. Verify Webhook Signature (CRITICAL for security)
    livekit_api_key = os.getenv("LIVEKIT_API_KEY")
    livekit_api_secret = os.getenv("LIVEKIT_API_SECRET")

    if not livekit_api_key or not livekit_api_secret:
        print("ERROR: LIVEKIT_API_KEY or LIVEKIT_API_SECRET not set for webhook verification.")
        raise HTTPException(status_code=500, detail="Server not configured for webhook verification.")

    try:
        body = await request.body()
        headers = dict(request.headers)

        # NOTE(review): the WebhookReceiver, AccessToken and JobContext calls
        # below are kept exactly as written; verify their signatures against
        # the installed LiveKit SDK.
        event = webhook.WebhookReceiver(livekit_api_key, livekit_api_secret).receive(body, headers)

        if not event:
            print("WARNING: Webhook signature verification failed.")
            raise HTTPException(status_code=401, detail="Invalid webhook signature.")

        print(f"DEBUG: Received LiveKit webhook event: {event.event}")

        # 2. Process the Webhook Event
        # Only 'room_started' and user 'participant_joined' events start an agent.
        is_user_join = (
            event.event == "participant_joined"
            and event.participant.identity.startswith("user-")
        )
        if event.event != "room_started" and not is_user_join:
            return {"status": "event_ignored", "event": event.event}

        room_name = event.room.name
        room_sid = event.room.sid

        # Every agent uses the identity "agent-avurna-<sid>", so starting a
        # second one for the same room would collide with the first.
        if room_sid in _active_agent_rooms:
            print(f"DEBUG: Agent already active for room: {room_name} (SID: {room_sid})")
            return {"status": "agent_already_active", "room_name": room_name}

        print(f"DEBUG: Triggering agent for room: {room_name} (SID: {room_sid})")

        # Generate an agent token internally for the agent to join the room
        agent_identity = f"agent-avurna-{room_sid}"
        agent_token = AccessToken(livekit_api_key, livekit_api_secret, {
            "identity": agent_identity,
            "name": "Avurna",
            "metadata": json.dumps({"agent": True}),
        })
        agent_token.add_grant(room_join=True, room=room_name, can_publish=True, can_subscribe=True, room_admin=True)

        # Create a JobContext and run the agent in the background. The room is
        # marked active only after setup succeeded, right before scheduling.
        ctx = JobContext(room_name=room_name, token=agent_token.to_jwt())
        _active_agent_rooms.add(room_sid)
        background_tasks.add_task(_run_agent_for_room, ctx, room_sid)

        return {"status": "agent_triggered", "room_name": room_name}

    except HTTPException:
        # Deliberate HTTP errors (e.g. the 401 above) must reach the client
        # unchanged instead of being rewrapped as a 500 below.
        raise
    except Exception as e:
        print(f"ERROR: Webhook processing failed: {e}")
        print(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Webhook processing error: {e}") from e

# --- Health Check Endpoint ---
@app.get("/")
async def root():
    """Report that the server is online and running as a webhook listener."""
    return dict(status="avurna_agent_server_online", mode="webhook_listener")

@app.get("/health")
async def health():
    """Health check endpoint.

    Returns:
        A dict with a constant ``"status"`` of ``"healthy"``; an
        ``"http_session"`` field that is ``"healthy"`` when the shared
        aiohttp session exists and is open, otherwise ``"needs_init"``; and a
        ``"timestamp"`` read from the running event loop's clock
        (``loop.time()``), which is monotonic loop time, not wall-clock time.
    """
    # Read-only access to the module global needs no ``global`` statement.
    session = _global_http_session
    session_status = "healthy" if session and not session.closed else "needs_init"
    return {
        "status": "healthy",
        "http_session": session_status,
        # get_running_loop() is the preferred accessor inside a coroutine.
        "timestamp": asyncio.get_running_loop().time(),
    }

# --- Main execution block for Uvicorn ---
if __name__ == "__main__":
    # Fail fast when any credential the agent or webhook needs is missing.
    validate_env_vars(
        [
            "HUME_API_KEY",
            "LIVEKIT_URL",
            "LIVEKIT_API_KEY",
            "LIVEKIT_API_SECRET",
            "GROQ_API_KEY",
            "GOOGLE_API_KEY",
        ]
    )

    print("Starting Avurna Agent (Webhook Listener Mode)...")

    # Serve the FastAPI app on all interfaces; Hugging Face Spaces exposes
    # port 7860.
    uvicorn.run(app, host="0.0.0.0", port=7860)