""" app.py — Interface Gradio pour le Voice Agent LiveKit Démarre : 1. Le worker LiveKit agent (subprocess) 2. L'interface Gradio avec UI WebRTC """ import os import sys import json import subprocess import threading import gradio as gr import uvicorn from fastapi import FastAPI from fastapi.responses import HTMLResponse from pathlib import Path from dotenv import load_dotenv # Ajouter le dossier agent au path sys.path.insert(0, str(Path(__file__).parent / "agent")) load_dotenv() # ────────────────────────────────────────────── # Démarrage de l'agent en arrière-plan # ────────────────────────────────────────────── _agent_process: subprocess.Popen | None = None def start_agent_worker(): """Lance le worker LiveKit dans un sous-processus.""" global _agent_process agent_path = Path(__file__).parent / "agent" / "voice_agent.py" _agent_process = subprocess.Popen( [sys.executable, str(agent_path), "start"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, ) print(f"[Agent] Worker démarré — PID {_agent_process.pid}") for line in _agent_process.stdout: print(f"[Agent] {line}", end="") def launch_agent_thread(): t = threading.Thread(target=start_agent_worker, daemon=True) t.start() # ────────────────────────────────────────────── # Génération de token LiveKit # ────────────────────────────────────────────── def get_token(room_name: str = "") -> str: try: from token_server import get_connection_details details = get_connection_details(room_name=room_name or None) return json.dumps(details, indent=2) except Exception as e: return json.dumps({"success": False, "error": str(e)}) # ────────────────────────────────────────────── # HTML de l'interface LiveKit WebRTC # ────────────────────────────────────────────── def build_livekit_html() -> str: """Génère le HTML en injectant URL + token depuis le .env.""" try: from token_server import get_connection_details details = get_connection_details() injected_url = details["url"] injected_token = details["token"] auto_connect = "true" except Exception: injected_url = os.getenv("LIVEKIT_URL", "") injected_token = "" auto_connect = "false" return LIVEKIT_CLIENT_HTML_TEMPLATE.replace("__LIVEKIT_URL__", injected_url) \ .replace("__LIVEKIT_TOKEN__", injected_token) \ .replace("__AUTO_CONNECT__", auto_connect) LIVEKIT_CLIENT_HTML_TEMPLATE = """
LiveKit