| |
| """ |
| 🚀 Servidor WebRTC Otimizado com GPU + vLLM |
| ============================================ |
| Servidor WebRTC real usando a configuração otimizada: |
| - Whisper na GPU |
| - vLLM com Qwen2-0.5B |
| - Latência target: <500ms |
| """ |
|
|
import os

# Point the Hugging Face cache at a scratch directory and expose only CUDA
# device 0 to this process. These assignments sit ahead of the torch /
# whisper / vLLM imports in this file, so the variables are already in the
# environment when those libraries are loaded.
os.environ.update({
    'HF_HOME': '/tmp/hf_cache',
    'CUDA_VISIBLE_DEVICES': '0',
})
|
|
| import asyncio |
| import json |
| import time |
| import torch |
| import numpy as np |
| import whisper |
| from aiohttp import web |
| from aiortc import RTCPeerConnection, RTCSessionDescription |
| from aiortc.contrib.media import MediaPlayer, MediaRecorder |
| from vllm import LLM, SamplingParams |
| import tempfile |
| import soundfile as sf |
| from gtts import gTTS |
| import librosa |
| import logging |
|
|
# Log at INFO so the per-request timing breakdown is visible.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Sample rate (Hz) used by the audio pipeline; TTS output is resampled to it.
SAMPLE_RATE = 16_000

# Registry of peer connections created by the /offer handler, keyed by a
# generated id. Everything still in it is closed on application shutdown.
pc_dict = {}
|
|
class OptimizedWebRTCServer:
    """Speech-to-speech pipeline: Whisper (GPU) -> vLLM (Qwen2-0.5B) -> gTTS.

    One instance owns the loaded models and the running latency statistics.
    ``process_audio`` runs a full request; ``get_stats`` summarises the
    latencies recorded so far.
    """

    def __init__(self):
        """Load Whisper and the vLLM engine on the GPU and warm up the LLM.

        Raises:
            RuntimeError: If no CUDA device is available.
        """
        logger.info("🚀 Inicializando Servidor WebRTC Otimizado")

        if not torch.cuda.is_available():
            raise RuntimeError("GPU não disponível!")

        logger.info(f"✅ GPU: {torch.cuda.get_device_name(0)}")

        # Per-request latency bookkeeping, reported by get_stats().
        # NOTE: ``latencies`` grows by one float per processed request.
        self.request_count = 0
        self.total_latency = 0
        self.latencies = []

        logger.info("📦 Carregando Whisper (GPU)...")
        self.whisper_model = whisper.load_model("base", device="cuda")

        logger.info("📦 Carregando vLLM + Qwen2-0.5B...")
        self.llm = LLM(
            model="Qwen/Qwen2-0.5B",
            trust_remote_code=True,
            dtype="float16",
            gpu_memory_utilization=0.80,
            max_model_len=512,
            download_dir="/tmp/hf_cache",
            disable_log_stats=True,
            enforce_eager=False,
            max_num_seqs=1
        )

        self.sampling_params = SamplingParams(
            max_tokens=20,
            temperature=0.7,
            top_p=0.9
        )

        # A few throw-away generations so the first real request does not pay
        # the one-time start-up cost.
        logger.info("🔥 Warm-up do sistema...")
        for _ in range(3):
            self.llm.generate(["teste"], self.sampling_params)

        logger.info("✅ Servidor pronto!")

    @staticmethod
    def _synthesize_speech(text: str) -> np.ndarray:
        """Render *text* to speech with gTTS and return samples at SAMPLE_RATE.

        Returns an empty float32 array when *text* is empty, because gTTS
        refuses to synthesise empty input.
        """
        if not text:
            return np.zeros(0, dtype=np.float32)

        tts = gTTS(text=text, lang='pt', slow=False)

        # gTTS writes MP3, so round-trip through a scratch directory that is
        # removed as soon as the samples have been read back (the previous
        # delete=False temp file was never cleaned up).
        with tempfile.TemporaryDirectory() as tmp_dir:
            mp3_path = os.path.join(tmp_dir, "tts.mp3")
            tts.save(mp3_path)
            audio, sr = sf.read(mp3_path)

        if sr != SAMPLE_RATE:
            audio = librosa.resample(audio, orig_sr=sr, target_sr=SAMPLE_RATE)
        return audio

    async def process_audio(self, audio_data: np.ndarray) -> tuple:
        """Transcribe *audio_data*, generate a reply and synthesise it.

        Args:
            audio_data: Input samples; cast to float32 and padded/trimmed to
                Whisper's fixed window before transcription.
                NOTE(review): assumed to be mono audio at SAMPLE_RATE —
                confirm against callers.

        Returns:
            ``(audio_response, metrics)`` where ``audio_response`` is the
            synthesised reply at SAMPLE_RATE and ``metrics`` is a
            JSON-serialisable dict with the keys ``latency_ms``,
            ``whisper_ms``, ``llm_ms``, ``tts_ms``, ``transcription`` and
            ``response``.

        NOTE: every stage below is a synchronous call, so the event loop is
        blocked for the duration of a request.
        """
        start_time = time.perf_counter()

        # --- Speech-to-text -------------------------------------------------
        # whisper.decode() runs the encoder itself, so the mel spectrogram is
        # decoded directly and the timer covers the whole transcription.
        whisper_start = time.perf_counter()
        audio_30s = whisper.pad_or_trim(audio_data.astype(np.float32))
        mel = whisper.log_mel_spectrogram(audio_30s).cuda()
        options = whisper.DecodingOptions(language="pt", fp16=True)
        result = whisper.decode(self.whisper_model, mel, options)
        transcription = result.text
        whisper_time = (time.perf_counter() - whisper_start) * 1000

        # --- LLM reply ------------------------------------------------------
        llm_start = time.perf_counter()
        prompt = f"Usuário disse: {transcription}\nAssistente responde brevemente:"
        outputs = self.llm.generate([prompt], self.sampling_params)
        response = outputs[0].outputs[0].text.strip()
        llm_time = (time.perf_counter() - llm_start) * 1000

        # --- Text-to-speech (reply capped at 100 characters) -----------------
        tts_start = time.perf_counter()
        audio_response = self._synthesize_speech(response[:100])
        tts_time = (time.perf_counter() - tts_start) * 1000

        # --- Bookkeeping ----------------------------------------------------
        total_time = (time.perf_counter() - start_time) * 1000
        self.request_count += 1
        self.total_latency += total_time
        self.latencies.append(total_time)

        logger.info(f"📊 Request #{self.request_count}:")
        logger.info(f" • Whisper: {whisper_time:.0f}ms")
        logger.info(f" • LLM: {llm_time:.0f}ms")
        logger.info(f" • TTS: {tts_time:.0f}ms")
        logger.info(f" • TOTAL: {total_time:.0f}ms")
        logger.info(f" • Transcrição: {transcription}")
        logger.info(f" • Resposta: {response[:50]}...")

        return audio_response, {
            'latency_ms': total_time,
            'whisper_ms': whisper_time,
            'llm_ms': llm_time,
            'tts_ms': tts_time,
            'transcription': transcription,
            'response': response
        }

    def get_stats(self):
        """Return aggregate latency statistics.

        Returns:
            An empty dict when no request has been recorded yet, otherwise a
            dict with ``request_count`` and the average, minimum, maximum and
            most recent latency in milliseconds.
        """
        if not self.latencies:
            return {}

        if self.request_count > 0:
            average = self.total_latency / self.request_count
        else:
            average = 0

        return {
            'request_count': self.request_count,
            'avg_latency_ms': average,
            'min_latency_ms': min(self.latencies),
            'max_latency_ms': max(self.latencies),
            'last_latency_ms': self.latencies[-1],
        }
|
|
| |
# Process-wide pipeline instance. It stays None until main() constructs the
# server; the HTTP and WebRTC handlers read it afterwards.
server = None
|
|
def _frame_to_mono_float32(frame):
    """Convert a received audio frame to a 1-D float32 mono signal.

    ``frame`` is the object returned by ``track.recv()``; it is expected to
    expose ``to_ndarray()``, ``format.is_planar`` and ``layout.channels``
    (the PyAV ``AudioFrame`` interface). Channels are averaged, and signed
    integer PCM is scaled into the [-1, 1) range.
    """
    samples = frame.to_ndarray()
    channels = max(1, len(frame.layout.channels))

    if frame.format.is_planar:
        # Planar layout: one row per channel.
        mono = samples.astype(np.float32).mean(axis=0)
    else:
        # Packed layout: a single row with the channels interleaved.
        mono = samples.astype(np.float32).reshape(-1, channels).mean(axis=1)

    if np.issubdtype(samples.dtype, np.signedinteger):
        full_scale = float(2 ** (8 * samples.dtype.itemsize - 1))
        mono = mono / full_scale

    return mono.astype(np.float32)


async def offer(request):
    """Handle a WebRTC offer and return the SDP answer as JSON.

    The request body must be a JSON object with ``sdp`` and ``type``. A peer
    connection is created and registered in ``pc_dict``; it accepts audio
    either as raw float32 bytes over a data channel (the synthesised reply
    and the metrics are sent back on the same channel) or as an audio track
    (processed in one-second windows, result only logged).

    Raises:
        web.HTTPBadRequest: If the body is not valid JSON or lacks a usable
            ``sdp`` / ``type`` pair.
    """
    try:
        params = await request.json()
        remote_description = RTCSessionDescription(
            sdp=params["sdp"], type=params["type"]
        )
    except (ValueError, KeyError, TypeError) as exc:
        raise web.HTTPBadRequest(text=f"Invalid offer: {exc}") from exc

    pc = RTCPeerConnection()
    # id() is unique among live objects, so keys cannot collide even after
    # earlier connections have been removed from the registry.
    pc_id = f"PeerConnection_{id(pc)}"
    pc_dict[pc_id] = pc

    @pc.on("connectionstatechange")
    async def on_connectionstatechange():
        logger.info(f"Connection state ({pc_id}): {pc.connectionState}")
        if pc.connectionState == "failed":
            # Release dead connections instead of keeping them until shutdown.
            await pc.close()
            pc_dict.pop(pc_id, None)

    @pc.on("datachannel")
    def on_datachannel(channel):
        logger.info(f"Data channel established: {channel.label}")

        @channel.on("message")
        async def on_message(message):
            if not isinstance(message, bytes):
                return

            # The payload is interpreted as raw float32 samples; anything
            # that is empty or not a whole number of samples cannot be decoded.
            sample_size = np.dtype(np.float32).itemsize
            if not message or len(message) % sample_size:
                logger.warning(
                    "Ignoring audio message with invalid length: %d bytes",
                    len(message),
                )
                return

            audio_data = np.frombuffer(message, dtype=np.float32)
            audio_response, metrics = await server.process_audio(audio_data)

            # Reply audio first (binary), then the metrics (JSON text).
            channel.send(audio_response.astype(np.float32).tobytes())
            channel.send(json.dumps(metrics))

    @pc.on("track")
    async def on_track(track):
        logger.info(f"Track received: {track.kind}")

        if track.kind != "audio":
            return

        pending = []          # mono float32 chunks not yet processed
        pending_samples = 0   # total number of samples held in ``pending``

        while True:
            try:
                frame = await track.recv()

                chunk = _frame_to_mono_float32(frame)
                pending.append(chunk)
                pending_samples += chunk.size

                # Collect one second of audio, measured at the track's own
                # sample rate, then resample that window for the pipeline.
                source_rate = frame.sample_rate
                if pending_samples >= source_rate:
                    buffered = np.concatenate(pending)
                    window = buffered[:source_rate]
                    leftover = buffered[source_rate:]
                    pending = [leftover] if leftover.size else []
                    pending_samples = leftover.size

                    if source_rate != SAMPLE_RATE:
                        window = librosa.resample(
                            window,
                            orig_sr=source_rate,
                            target_sr=SAMPLE_RATE
                        )

                    audio_response, metrics = await server.process_audio(window)

                    logger.info(f"✅ Processado: {metrics['latency_ms']:.0f}ms")

            except Exception as e:
                # Also reached when the track ends; stop consuming either way.
                logger.error(f"Erro no track: {e}")
                break

    try:
        await pc.setRemoteDescription(remote_description)
        answer = await pc.createAnswer()
        await pc.setLocalDescription(answer)
    except Exception:
        # Do not leave a half-negotiated connection in the registry.
        pc_dict.pop(pc_id, None)
        await pc.close()
        raise

    return web.Response(
        content_type="application/json",
        text=json.dumps({
            "sdp": pc.localDescription.sdp,
            "type": pc.localDescription.type
        })
    )
|
|
async def stats(request):
    """Serve the server's aggregate latency statistics as a JSON document."""
    payload = json.dumps(server.get_stats())
    return web.Response(text=payload, content_type="application/json")
|
|
async def health(request):
    """Liveness probe: always answers with the plain-text body ``OK``."""
    ok_response = web.Response(text="OK")
    return ok_response
|
|
async def test_audio(request):
    """Run the full pipeline on a synthesised question, without WebRTC.

    The question comes from the ``text`` query parameter; when it is missing
    or blank, one of a few canned questions is picked at random. The question
    is synthesised with gTTS, fed through ``server.process_audio`` and the
    resulting metrics (plus the ``question`` used) are returned as JSON.
    """
    test_text = (request.query.get('text') or '').strip()

    if not test_text:
        import random
        test_questions = (
            "Olá, como você está?",
            "Qual é a capital do Brasil?",
            "Quanto é dois mais dois?",
            "Como está o tempo hoje?",
            "Qual seu nome?",
            "O que é Python?",
        )
        test_text = random.choice(test_questions)

    tts = gTTS(text=test_text, lang='pt', slow=False)

    # gTTS writes MP3, so round-trip through a scratch directory that is
    # removed once the samples are loaded (the previous delete=False temp
    # file was left behind on every call).
    with tempfile.TemporaryDirectory() as tmp_dir:
        mp3_path = os.path.join(tmp_dir, "question.mp3")
        tts.save(mp3_path)
        audio_data, sr = sf.read(mp3_path)

    if sr != SAMPLE_RATE:
        audio_data = librosa.resample(audio_data, orig_sr=sr, target_sr=SAMPLE_RATE)

    # Only the metrics are returned; the synthesised reply audio is not used.
    _, metrics = await server.process_audio(audio_data)
    metrics['question'] = test_text

    return web.Response(
        content_type="application/json",
        text=json.dumps(metrics)
    )
|
|
async def on_shutdown(app):
    """Close every registered peer connection when the application stops.

    All connections are closed concurrently, and a failure while closing one
    of them is logged rather than aborting the others. The registry is
    emptied so no closed connection stays referenced.
    """
    # Snapshot and clear first: closing awaits, and the registry must not be
    # iterated while other handlers are able to modify it.
    connections = list(pc_dict.values())
    pc_dict.clear()

    results = await asyncio.gather(
        *(pc.close() for pc in connections),
        return_exceptions=True
    )

    for result in results:
        if isinstance(result, BaseException):
            logger.error(f"Erro ao fechar PeerConnection: {result}")
|
|
# Single-page test UI served at "/". Values coming back from the pipeline are
# escaped before being inserted into the page.
_INDEX_HTML = """<!DOCTYPE html>
<html>
<head>
    <title>WebRTC GPU+vLLM Test</title>
</head>
<body>
    <h1>🚀 Servidor WebRTC Otimizado</h1>
    <h2>GPU + vLLM - Latência Target: &lt;500ms</h2>

    <div id="status">Pronto para conectar...</div>

    <button onclick="testAudio()">Testar Pipeline</button>
    <button onclick="getStats()">Ver Estatísticas</button>

    <div id="results"></div>

    <script>
        function esc(value) {
            const node = document.createElement('div');
            node.textContent = String(value);
            return node.innerHTML;
        }

        async function testAudio() {
            const status = document.getElementById('status');
            status.textContent = 'Testando...';
            try {
                const response = await fetch('/test');
                if (!response.ok) {
                    throw new Error(`HTTP ${response.status}`);
                }
                const data = await response.json();
                document.getElementById('results').innerHTML = `
                    <h3>Resultado:</h3>
                    <p>Latência Total: ${data.latency_ms.toFixed(0)}ms</p>
                    <p>Whisper: ${data.whisper_ms.toFixed(0)}ms</p>
                    <p>LLM: ${data.llm_ms.toFixed(0)}ms</p>
                    <p>TTS: ${data.tts_ms.toFixed(0)}ms</p>
                    <p>Transcrição: ${esc(data.transcription)}</p>
                    <p>Resposta: ${esc(data.response)}</p>
                `;
                status.textContent = 'Teste concluído.';
            } catch (err) {
                status.textContent = `Erro: ${err.message}`;
            }
        }

        async function getStats() {
            const response = await fetch('/stats');
            const data = await response.json();
            document.getElementById('results').innerHTML = `
                <h3>Estatísticas:</h3>
                <p>Requests: ${data.request_count || 0}</p>
                <p>Latência Média: ${(data.avg_latency_ms || 0).toFixed(0)}ms</p>
                <p>Min: ${(data.min_latency_ms || 0).toFixed(0)}ms</p>
                <p>Max: ${(data.max_latency_ms || 0).toFixed(0)}ms</p>
            `;
        }
    </script>
</body>
</html>
"""


async def _index(request):
    """Serve the built-in HTML test page."""
    return web.Response(text=_INDEX_HTML, content_type='text/html')


def main(host="0.0.0.0", port=8888):
    """Load the models, register the routes and run the HTTP server.

    Args:
        host: Interface to bind to.
        port: TCP port to listen on.

    Blocks until the aiohttp application is stopped.
    """
    global server

    # Model loading happens here, before the server starts accepting requests.
    server = OptimizedWebRTCServer()

    app = web.Application()
    # Handlers are coroutine functions; aiohttp deprecates plain callables.
    app.router.add_get("/", _index)
    app.router.add_post("/offer", offer)
    app.router.add_get("/stats", stats)
    app.router.add_get("/health", health)
    app.router.add_get("/test", test_audio)
    app.on_shutdown.append(on_shutdown)

    logger.info("="*70)
    logger.info("🚀 SERVIDOR WEBRTC OTIMIZADO INICIADO")
    logger.info("="*70)
    logger.info("✅ GPU: Ativada")
    logger.info("✅ vLLM: Qwen2-0.5B")
    logger.info("✅ Target: <500ms")
    logger.info("")
    logger.info("📡 Endpoints:")
    logger.info(f" • http://localhost:{port}/ - Interface Web")
    logger.info(f" • http://localhost:{port}/offer - WebRTC Offer")
    logger.info(f" • http://localhost:{port}/test - Teste direto")
    logger.info(f" • http://localhost:{port}/stats - Estatísticas")
    logger.info("="*70)

    web.run_app(app, host=host, port=port)
|
|
if __name__ == "__main__":
    # Start the server only when run as a script, not when imported.
    main()