#!/usr/bin/env python3 """ 🚀 Servidor WebRTC Otimizado com GPU + vLLM ============================================ Servidor WebRTC real usando a configuração otimizada: - Whisper na GPU - vLLM com Qwen2-0.5B - Latência target: <500ms """ import os os.environ['HF_HOME'] = '/tmp/hf_cache' os.environ['CUDA_VISIBLE_DEVICES'] = '0' import asyncio import json import time import torch import numpy as np import whisper from aiohttp import web from aiortc import RTCPeerConnection, RTCSessionDescription from aiortc.contrib.media import MediaPlayer, MediaRecorder from vllm import LLM, SamplingParams import tempfile import soundfile as sf from gtts import gTTS import librosa import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Configuração global SAMPLE_RATE = 16000 pc_dict = {} class OptimizedWebRTCServer: def __init__(self): """Inicializa servidor com GPU + vLLM""" logger.info("🚀 Inicializando Servidor WebRTC Otimizado") # Verificar GPU if not torch.cuda.is_available(): raise RuntimeError("GPU não disponÃvel!") logger.info(f"✅ GPU: {torch.cuda.get_device_name(0)}") # Whisper na GPU logger.info("📦 Carregando Whisper (GPU)...") self.whisper_model = whisper.load_model("base", device="cuda") # vLLM com Qwen2-0.5B logger.info("📦 Carregando vLLM + Qwen2-0.5B...") self.llm = LLM( model="Qwen/Qwen2-0.5B", trust_remote_code=True, dtype="float16", gpu_memory_utilization=0.80, max_model_len=512, download_dir="/tmp/hf_cache", disable_log_stats=True, enforce_eager=False, max_num_seqs=1 ) self.sampling_params = SamplingParams( max_tokens=20, temperature=0.7, top_p=0.9 ) # Warm-up logger.info("🔥 Warm-up do sistema...") for i in range(3): _ = self.llm.generate(["teste"], self.sampling_params) logger.info("✅ Servidor pronto!") # Métricas self.request_count = 0 self.total_latency = 0 self.latencies = [] async def process_audio(self, audio_data: np.ndarray) -> tuple: """Processa áudio com pipeline otimizado""" start_time = time.perf_counter() # 1. Whisper Encoder (GPU) whisper_start = time.perf_counter() audio_30s = whisper.pad_or_trim(audio_data.astype(np.float32)) mel = whisper.log_mel_spectrogram(audio_30s).cuda() with torch.no_grad(): embeddings = self.whisper_model.encoder(mel.unsqueeze(0)) whisper_time = (time.perf_counter() - whisper_start) * 1000 # 2. Speech Projector (simulado) # Na implementação real, aqui entraria o speech projector # Por enquanto, vamos usar transcrição para teste options = whisper.DecodingOptions(language="pt", fp16=True) result = whisper.decode(self.whisper_model, mel, options) transcription = result.text # 3. LLM com vLLM llm_start = time.perf_counter() prompt = f"Usuário disse: {transcription}\nAssistente responde brevemente:" outputs = self.llm.generate([prompt], self.sampling_params) response = outputs[0].outputs[0].text.strip() llm_time = (time.perf_counter() - llm_start) * 1000 # 4. TTS tts_start = time.perf_counter() tts = gTTS(text=response[:100], lang='pt', slow=False) with tempfile.NamedTemporaryFile(suffix='.mp3', delete=False) as f: tts.save(f.name) audio_response, sr = sf.read(f.name) if sr != SAMPLE_RATE: audio_response = librosa.resample( audio_response, orig_sr=sr, target_sr=SAMPLE_RATE ) tts_time = (time.perf_counter() - tts_start) * 1000 # Métricas total_time = (time.perf_counter() - start_time) * 1000 self.request_count += 1 self.total_latency += total_time self.latencies.append(total_time) logger.info(f"📊 Request #{self.request_count}:") logger.info(f" • Whisper: {whisper_time:.0f}ms") logger.info(f" • LLM: {llm_time:.0f}ms") logger.info(f" • TTS: {tts_time:.0f}ms") logger.info(f" • TOTAL: {total_time:.0f}ms") logger.info(f" • Transcrição: {transcription}") logger.info(f" • Resposta: {response[:50]}...") return audio_response, { 'latency_ms': total_time, 'whisper_ms': whisper_time, 'llm_ms': llm_time, 'tts_ms': tts_time, 'transcription': transcription, 'response': response } def get_stats(self): """Retorna estatÃsticas do servidor""" if not self.latencies: return {} return { 'request_count': self.request_count, 'avg_latency_ms': self.total_latency / self.request_count if self.request_count > 0 else 0, 'min_latency_ms': min(self.latencies), 'max_latency_ms': max(self.latencies), 'last_latency_ms': self.latencies[-1] if self.latencies else 0 } # Instância global do servidor server = None async def offer(request): """Handle WebRTC offer""" params = await request.json() offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"]) pc = RTCPeerConnection() pc_id = f"PeerConnection_{len(pc_dict)}" pc_dict[pc_id] = pc @pc.on("datachannel") def on_datachannel(channel): logger.info(f"Data channel established: {channel.label}") @channel.on("message") async def on_message(message): if isinstance(message, bytes): # Processar áudio recebido audio_data = np.frombuffer(message, dtype=np.float32) # Processar com pipeline otimizado audio_response, metrics = await server.process_audio(audio_data) # Enviar resposta channel.send(audio_response.astype(np.float32).tobytes()) # Enviar métricas como JSON channel.send(json.dumps(metrics)) @pc.on("track") async def on_track(track): logger.info(f"Track received: {track.kind}") if track.kind == "audio": # Buffer para acumular áudio audio_buffer = [] while True: try: frame = await track.recv() # Converter frame para numpy array audio_chunk = frame.to_ndarray() audio_buffer.extend(audio_chunk.flatten()) # Processar quando tiver 1 segundo de áudio if len(audio_buffer) >= SAMPLE_RATE: audio_data = np.array(audio_buffer[:SAMPLE_RATE]) audio_buffer = audio_buffer[SAMPLE_RATE:] # Processar audio_response, metrics = await server.process_audio(audio_data) # Aqui você enviaria de volta via WebRTC # Por simplicidade, apenas logamos logger.info(f"✅ Processado: {metrics['latency_ms']:.0f}ms") except Exception as e: logger.error(f"Erro no track: {e}") break await pc.setRemoteDescription(offer) answer = await pc.createAnswer() await pc.setLocalDescription(answer) return web.Response( content_type="application/json", text=json.dumps({ "sdp": pc.localDescription.sdp, "type": pc.localDescription.type }) ) async def stats(request): """Endpoint de estatÃsticas""" return web.Response( content_type="application/json", text=json.dumps(server.get_stats()) ) async def health(request): """Health check""" return web.Response(text="OK") async def test_audio(request): """Endpoint de teste direto (sem WebRTC)""" # Pegar texto da query ou usar padrão query_text = request.query.get('text', None) if query_text: test_text = query_text else: # Lista de perguntas para variar import random test_questions = [ "Olá, como você está?", "Qual é a capital do Brasil?", "Quanto é dois mais dois?", "Como está o tempo hoje?", "Qual seu nome?", "O que é Python?" ] test_text = random.choice(test_questions) # Criar áudio da pergunta tts = gTTS(text=test_text, lang='pt', slow=False) with tempfile.NamedTemporaryFile(suffix='.mp3', delete=False) as f: tts.save(f.name) audio_data, sr = sf.read(f.name) if sr != SAMPLE_RATE: audio_data = librosa.resample(audio_data, orig_sr=sr, target_sr=SAMPLE_RATE) # Processar audio_response, metrics = await server.process_audio(audio_data) metrics['question'] = test_text # Adicionar pergunta original return web.Response( content_type="application/json", text=json.dumps(metrics) ) async def on_shutdown(app): """Cleanup on shutdown""" for pc in pc_dict.values(): await pc.close() def main(): """Inicia servidor WebRTC""" global server # Criar servidor otimizado server = OptimizedWebRTCServer() # Criar aplicação web app = web.Application() app.router.add_post("/offer", offer) app.router.add_get("/stats", stats) app.router.add_get("/health", health) app.router.add_get("/test", test_audio) app.on_shutdown.append(on_shutdown) # Página HTML de teste app.router.add_get("/", lambda r: web.Response(text="""