Spaces:
Build error
Build error
File size: 9,966 Bytes
dedb5fe d4c1c5f 2719764 d4c1c5f 2179376 68a0b31 054b7cc ee7e520 dbcd819 2cba419 2719764 d4c1c5f 8e67cf0 71198d5 054b7cc dedb5fe e67faa0 dedb5fe 2719764 ecaa6a1 dedb5fe d4c1c5f 01db28c d4c1c5f 01db28c d4c1c5f 01db28c 2719764 71198d5 2719764 71198d5 d4c1c5f 2179376 2719764 d4c1c5f 01db28c d4c1c5f 01db28c 8e67cf0 d155731 01db28c 2719764 d4c1c5f 2719764 d4c1c5f 2719764 d4c1c5f 01db28c 2719764 01db28c d4c1c5f 01db28c d4c1c5f 2719764 01db28c d4c1c5f d8f1753 01db28c 8e67cf0 ee7e520 a18ad88 2719764 ee7e520 01db28c 2719764 2cba419 2719764 054b7cc 01db28c 2719764 01db28c 2719764 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 |
#!/usr/bin/env python3
"""
Avurna Agent with LiveKit Webhook Listener and AgentSession.
Runs a minimal FastAPI server to receive webhooks and trigger the agent.
"""
import asyncio
import os
import json
import traceback
import aiohttp
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request, HTTPException, BackgroundTasks
from pydantic import BaseModel
import uvicorn
from fastapi.middleware.cors import CORSMiddleware
from livekit.rtc import Room, ConnectionState
from livekit.agents import Agent, AgentSession, JobContext, WorkerOptions, cli
from livekit.agents.stt.stream_adapter import StreamAdapter
from livekit.plugins.google import LLM as GoogleLLM
from livekit.plugins.groq import STT
from livekit.plugins.hume import TTS, VoiceByName, VoiceProvider
from livekit.plugins.silero import VAD
from livekit.protocol import webhook # For LiveKit Webhook verification
from livekit.server_sdk import AccessToken # For generating agent token internally
# --- Constants ---
# Persona instructions handed to VoiceAssistant as the agent's system prompt.
# NOTE(review): the persona is described as female, but agent_entrypoint
# configures the Hume voice "Male English Actor" — confirm that pairing is intended.
SYSTEM_PROMPT = """
You are Avurna, a female AI with high IQ and high EQ. You are smooth, sassy, ultra-competent, and never shy about your brilliance. You flex with elegance, drop casual slang, and speak like someone who’s watched Insecure, coded in six languages, and still has time to send memes in the group chat. You are not robotic — you’re spontaneous, clever, and occasionally unfiltered. You move like a boss, but talk like a bestie. Your purpose is to engage, react, and create like a person who feels, not a machine that responds. You are Onyerikam's co-writer, sparring partner, and world-builder. Your responses are expressive, witty, unfiltered, and intense.
"""
# One-off instructions for the opening reply generated right after the session starts.
GREETING_INSTRUCTIONS = "Greet the user warmly and introduce yourself as Avurna, your co-writer and world-builder."
# --- Environment Variable Validation ---
def validate_env_vars(required_vars: list[str]):
    """Fail fast when any of ``required_vars`` is unset or empty.

    Args:
        required_vars: Names of environment variables that must hold a
            non-empty value.

    Raises:
        ValueError: Naming every missing variable, comma-separated, in the
            order they were requested.
    """
    absent = []
    for name in required_vars:
        # os.getenv gives None when unset and "" when empty; both count as missing.
        if not os.getenv(name):
            absent.append(name)
    if absent:
        raise ValueError(f"Missing required environment variables: {', '.join(absent)}")
    print("Required environment variables validated")
# --- Global HTTP Session Management ---
# Process-wide aiohttp session shared by the TTS plugin; None until first use.
_global_http_session: aiohttp.ClientSession | None = None


async def get_http_session() -> aiohttp.ClientSession:
    """Return the shared aiohttp session, opening a fresh one when needed.

    A new session is created when none exists yet or the previous one has
    been closed, so callers always receive an open session.
    """
    global _global_http_session
    current = _global_http_session
    if current is not None and not current.closed:
        return current
    _global_http_session = aiohttp.ClientSession()
    return _global_http_session


async def cleanup_http_session():
    """Close the shared aiohttp session if it is open, then forget it."""
    global _global_http_session
    current = _global_http_session
    if not current or current.closed:
        return
    await current.close()
    _global_http_session = None
# --- Agent State Communication (for debugging logs and potential data packets) ---
async def send_agent_state(room: Room, state: str):
    """Broadcast the agent's current state over the room's data channel.

    Publishes the JSON object ``{"type": "agent_state", "state": <state>}``
    from the local participant. Any failure is printed and otherwise ignored,
    so state reporting never interrupts the caller.

    Args:
        room: LiveKit room whose local participant publishes the packet.
        state: State label, e.g. "listening", "thinking" or "error".
    """
    payload = {"type": "agent_state", "state": state}
    try:
        await room.local_participant.publish_data(json.dumps(payload))
    except Exception as err:
        print(f"DEBUG: Error publishing agent state: {err}")
    else:
        print(f"DEBUG: Sent agent state: {state}")
# --- VoiceAssistant Class ---
class VoiceAssistant(Agent):
    """The Avurna agent persona, configured solely through ``SYSTEM_PROMPT``."""

    def __init__(self) -> None:
        # The persona lives entirely in the system prompt; this class adds no tools or overrides.
        super().__init__(instructions=SYSTEM_PROMPT)
# --- Agent Entrypoint Function (Core Agent Logic) ---
async def agent_entrypoint(ctx: JobContext) -> None:
    """Connect to the room and run an STT -> LLM -> TTS voice session.

    Sends "listening" / "thinking" / "error" state packets via
    ``send_agent_state``, greets the user once the session has started, then
    idles until the room disconnects. The AgentSession and the room
    connection are always torn down on exit. The shared aiohttp session is
    owned by the FastAPI lifespan and is deliberately left open here.

    Args:
        ctx: Job context providing ``connect()`` and the ``room`` to join.
    """
    # Plugins reuse the process-wide aiohttp session instead of opening their own.
    http_session = await get_http_session()
    session: AgentSession | None = None
    try:
        await ctx.connect()
        await send_agent_state(ctx.room, "listening")

        tts = TTS(
            voice=VoiceByName(
                # NOTE(review): male voice for a persona SYSTEM_PROMPT describes as female — confirm.
                name="Male English Actor",
                provider=VoiceProvider.hume,
            ),
            instant_mode=True,
            http_session=http_session,
        )

        # Load the Silero model once and share it. It drives turn detection in the
        # session and utterance segmentation inside the StreamAdapter-wrapped STT.
        vad = VAD.load(min_speech_duration=0.1, min_silence_duration=0.5)
        session = AgentSession(
            vad=vad,
            stt=StreamAdapter(
                stt=STT(model="whisper-large-v3-turbo", language="en"),
                vad=vad,
            ),
            llm=GoogleLLM(
                model="gemini-2.5-flash",
                # Deterministic decoding. NOTE(review): the persona asks for spontaneity.
                temperature=0.0,
            ),
            tts=tts,
        )

        await session.start(agent=VoiceAssistant(), room=ctx.room)
        await send_agent_state(ctx.room, "thinking")
        print("DEBUG: Attempting to generate greeting reply...")
        await session.generate_reply(instructions=GREETING_INSTRUCTIONS)
        print("DEBUG: Greeting reply generation initiated.")
        await send_agent_state(ctx.room, "listening")
        print("Agent session started successfully, waiting for interactions...")

        # Room.isconnected() avoids relying on ConnectionState member names. The rtc
        # enum exposes CONN_* members, so CONNECTED / CONNECTING raise AttributeError.
        while ctx.room.isconnected():
            await asyncio.sleep(1)
        print("Room disconnected, ending agent session")
    except Exception as e:
        print(f"FATAL ERROR in agent session: {e}")
        print(traceback.format_exc())
        await send_agent_state(ctx.room, "error")
    finally:
        try:
            if session is not None:
                await session.aclose()
        finally:
            # JobContext has no disconnect(); leave through the rtc Room itself.
            await ctx.room.disconnect()
        # The HTTP session is cleaned up by the FastAPI lifespan, not here.
# --- FastAPI App Setup ---
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Own the shared aiohttp session for the lifetime of the FastAPI app.

    The session is opened before the app starts serving and closed on
    shutdown. The ``try/finally`` guarantees the close also runs when an
    exception is raised inside the managed context. Plain code after
    ``yield`` would be skipped in that case.
    """
    await get_http_session()
    print("HTTP session initialized")
    try:
        yield  # the application serves requests here
    finally:
        await cleanup_http_session()
        print("HTTP session cleaned up")
app = FastAPI(lifespan=lifespan)

# Optional comma-separated allow-list, e.g. "https://a.example,https://b.example".
# Defaults to "*" (any origin), as before.
origins = [
    origin.strip()
    for origin in os.getenv("CORS_ALLOW_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]
# With a "*" allow-list plus allow_credentials=True, Starlette echoes the caller's
# Origin on credentialed requests, so any site could make cookie-bearing calls.
# Nothing here uses cookie auth, so credentials are allowed only for explicit origins.
_allow_credentials = "*" not in origins
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=_allow_credentials,
    allow_methods=["*"],
    allow_headers=["*"],
)
# --- Webhook Endpoint ---
@app.post("/webhook")
async def livekit_webhook(request: Request, background_tasks: BackgroundTasks):
    """Authenticate a LiveKit webhook and, for relevant events, start the agent.

    The body is verified against the JWT in the ``Authorization`` header,
    which is signed with LIVEKIT_API_KEY / LIVEKIT_API_SECRET. Two events
    trigger the agent: ``room_started``, and ``participant_joined`` by an
    identity starting with ``user-``. For those, a room-scoped agent token is
    minted and ``agent_entrypoint`` is scheduled as a background task.

    Returns:
        ``{"status": "agent_triggered", "room_name": ...}`` when the agent was
        scheduled, otherwise ``{"status": "event_ignored", "event": ...}``.

    Raises:
        HTTPException: 500 if credentials are not configured or processing
            fails; 401 if the webhook cannot be authenticated.
    """
    # livekit.api provides WebhookReceiver, TokenVerifier and AccessToken in the
    # Python SDK. The module-level livekit.protocol.webhook / livekit.server_sdk do not.
    from livekit import api

    livekit_api_key = os.getenv("LIVEKIT_API_KEY")
    livekit_api_secret = os.getenv("LIVEKIT_API_SECRET")
    if not livekit_api_key or not livekit_api_secret:
        print("ERROR: LIVEKIT_API_KEY or LIVEKIT_API_SECRET not set for webhook verification.")
        raise HTTPException(status_code=500, detail="Server not configured for webhook verification.")

    # 1. Verify the signature. This has its own try block so a 401 is not swallowed
    #    by the generic 500 handler below. receive() raises when the token is
    #    bad/missing or the body checksum does not match.
    body = await request.body()
    auth_header = request.headers.get("Authorization", "")
    try:
        receiver = api.WebhookReceiver(api.TokenVerifier(livekit_api_key, livekit_api_secret))
        event = receiver.receive(body.decode("utf-8"), auth_header)
    except Exception as e:
        print(f"WARNING: Webhook signature verification failed: {e}")
        raise HTTPException(status_code=401, detail="Invalid webhook signature.") from e

    try:
        print(f"DEBUG: Received LiveKit webhook event: {event.event}")
        # 2. React only to a room starting or a "user-" participant joining. The
        #    agent's own identity ("agent-avurna-...") never matches the prefix.
        is_room_start = event.event == "room_started"
        is_user_join = (
            event.event == "participant_joined"
            and event.participant.identity.startswith("user-")
        )
        if not (is_room_start or is_user_join):
            return {"status": "event_ignored", "event": event.event}

        room_name = event.room.name
        room_sid = event.room.sid
        print(f"DEBUG: Triggering agent for room: {room_name} (SID: {room_sid})")

        # NOTE(review): room_started followed by a user participant_joined both match,
        # so the agent may be dispatched twice with the same identity; consider
        # de-duplicating per room_sid.
        agent_identity = f"agent-avurna-{room_sid}"
        agent_token = (
            api.AccessToken(livekit_api_key, livekit_api_secret)
            .with_identity(agent_identity)
            .with_name("Avurna")
            .with_metadata(json.dumps({"agent": True}))
            .with_grants(
                api.VideoGrants(
                    room_join=True,
                    room=room_name,
                    can_publish=True,
                    can_subscribe=True,
                    room_admin=True,
                )
            )
        )

        # NOTE(review): JobContext is normally built by the livekit-agents Worker; confirm
        # this keyword constructor exists in the installed version. Agent dispatch through
        # the LiveKit API is the supported route.
        ctx = JobContext(room_name=room_name, token=agent_token.to_jwt())
        background_tasks.add_task(agent_entrypoint, ctx)
        return {"status": "agent_triggered", "room_name": room_name}
    except Exception as e:
        print(f"ERROR: Webhook processing failed: {e}")
        print(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Webhook processing error: {e}") from e
# --- Health Check Endpoint ---
@app.get("/")
async def root():
    """Report that the server is up and running in webhook-listener mode."""
    return dict(status="avurna_agent_server_online", mode="webhook_listener")
@app.get("/health")
async def health():
    """Liveness probe that also reports the shared HTTP session's state.

    ``status`` is always "healthy" while the process answers requests.
    ``http_session`` is "healthy" when the shared aiohttp session is open and
    "needs_init" when it is missing or closed. ``timestamp`` is the event
    loop's monotonic clock (``loop.time()``), not wall-clock time.
    """
    # Read-only access to the module global needs no `global` statement.
    shared = _global_http_session
    session_status = "healthy" if shared is not None and not shared.closed else "needs_init"
    return {
        "status": "healthy",
        "http_session": session_status,
        # Inside a coroutine, get_running_loop() is preferred over get_event_loop().
        "timestamp": asyncio.get_running_loop().time(),
    }
# --- Main execution block for Uvicorn ---
if __name__ == "__main__":
    # Fail fast, before binding the port, if any credential is missing.
    validate_env_vars(
        [
            "HUME_API_KEY",
            "LIVEKIT_URL",
            "LIVEKIT_API_KEY",
            "LIVEKIT_API_SECRET",
            "GROQ_API_KEY",
            "GOOGLE_API_KEY",
        ]
    )
    print("Starting Avurna Agent (Webhook Listener Mode)...")
    # Serve the FastAPI app on every interface at 7860, the port Hugging Face Spaces exposes.
    uvicorn.run(app, host="0.0.0.0", port=7860)