| import gradio as gr |
| import spaces |
| import torch |
| from pyannote.audio import Pipeline |
| import os |
| import json |
| import time |
| from typing import Dict, Any |
| import logging |
|
|
| |
# Configure application-wide logging at INFO level.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Startup banner visible in the Space build/restart logs.
print("🔄 Space restarting... Loading Pyannote Diarization")
|
|
class PyannoteService:
    """Wraps the pyannote speaker-diarization pipeline with lazy loading."""

    def __init__(self):
        # Pipeline is loaded on demand so the Space can start quickly.
        self.pipeline = None
        self.model_loaded = False

    def load_model(self):
        """Load the pyannote diarization pipeline (idempotent).

        Raises:
            Exception: re-raises any loading failure after logging it; an
                authentication hint is logged when the token looks wrong.
        """
        if self.model_loaded:
            return

        try:
            logger.info("Carregando modelo Pyannote...")

            # The gated pyannote models require a HuggingFace access token.
            hf_token = os.getenv("HUGGINGFACE_TOKEN")
            if not hf_token:
                logger.warning("Token do HuggingFace não encontrado")

            # NOTE(review): `use_auth_token` is deprecated in newer
            # huggingface_hub releases in favor of `token=` — confirm the
            # pinned pyannote.audio version before changing this kwarg.
            self.pipeline = Pipeline.from_pretrained(
                "pyannote/speaker-diarization-3.1",
                use_auth_token=hf_token
            )

            # Move inference to GPU when one is available.
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
            logger.info(f"Usando device: {device}")
            self.pipeline = self.pipeline.to(device)

            self.model_loaded = True
            logger.info("Modelo Pyannote carregado com sucesso!")

        except Exception as e:
            logger.error(f"Erro ao carregar modelo: {str(e)}")
            if "authentication" in str(e).lower() or "token" in str(e).lower():
                logger.error("Erro de autenticação. Verifique se o HUGGINGFACE_TOKEN está configurado corretamente.")
            raise

    @spaces.GPU(duration=120)
    def diarize_audio(self, audio_path: str) -> Dict[str, Any]:
        """Run speaker diarization on an audio file.

        Args:
            audio_path: Filesystem path of the audio to process.

        Returns:
            On success, a dict with "segments", "speakers"/"num_speakers",
            durations, timing metadata and "status": "success". On any
            failure, a dict with "status": "error" and the error message —
            this method never raises.
        """
        start_time = time.time()

        try:
            # Lazy-load inside the GPU-allocated context so the model lands
            # on the device granted by @spaces.GPU.
            if not self.model_loaded:
                self.load_model()

            logger.info(f"Iniciando diarization: {audio_path}")

            diarization = self.pipeline(audio_path)

            segments = []
            speakers = set()

            for turn, _, speaker in diarization.itertracks(yield_label=True):
                speakers.add(speaker)
                segments.append({
                    "start": round(turn.start, 2),
                    "end": round(turn.end, 2),
                    "duration": round(turn.end - turn.start, 2),
                    "speaker": speaker
                })

            processing_time = time.time() - start_time

            result = {
                "status": "success",
                "segments": segments,
                "num_speakers": len(speakers),
                # sorted() accepts any iterable; no intermediate list needed.
                "speakers": sorted(speakers),
                "total_duration": round(diarization.get_timeline().duration(), 2),
                "processing_time": round(processing_time, 2),
                "model": "pyannote/speaker-diarization-3.1"
            }

            logger.info(f"Diarization concluído: {len(segments)} segmentos, {len(speakers)} speakers")
            return result

        except Exception as e:
            logger.error(f"Erro na diarization: {str(e)}")
            return {
                "status": "error",
                "error": str(e),
                # Rounded for consistency with the success payload.
                "processing_time": round(time.time() - start_time, 2)
            }
|
|
# Module-level singleton shared by all Gradio callbacks; the heavy model
# itself is lazy-loaded on the first diarization request.
service = PyannoteService()
|
|
def process_audio(audio_file):
    """Run diarization on an uploaded audio file.

    Args:
        audio_file: Filesystem path supplied by the gr.Audio component
            (None or empty when nothing was uploaded).

    Returns:
        A (markdown_summary, json_payload) 2-tuple matching the two Gradio
        outputs (result_text, result_json) wired to this callback.
    """
    # BUG FIX: this callback feeds two outputs (Markdown + JSON), so every
    # return path must yield a 2-tuple; the original returned a bare string
    # here, which breaks the JSON output component on empty input.
    if not audio_file:
        return "❌ Nenhum arquivo de áudio fornecido", ""

    try:
        result = service.diarize_audio(audio_file)

        # Guard clause: surface diarization failures immediately.
        if result["status"] != "success":
            return f"❌ Erro: {result['error']}", json.dumps(result, indent=2)

        summary = f"""
✅ **Diarization Concluído**

📊 **Resumo:**
- **Speakers detectados:** {result['num_speakers']}
- **Total de segmentos:** {len(result['segments'])}
- **Duração total:** {result['total_duration']}s
- **Tempo de processamento:** {result['processing_time']}s

👥 **Speakers:** {', '.join(result['speakers'])}

📋 **Segmentos:**
"""

        # Show at most the first 10 segments to keep the summary readable.
        for i, segment in enumerate(result['segments'][:10]):
            summary += f"\n{i+1}. {segment['start']}s - {segment['end']}s | {segment['speaker']} ({segment['duration']}s)"

        if len(result['segments']) > 10:
            summary += f"\n... e mais {len(result['segments']) - 10} segmentos"

        return summary, json.dumps(result, indent=2)

    except Exception as e:
        logger.error(f"Erro no processamento: {str(e)}")
        return f"❌ Erro interno: {str(e)}", ""
|
|
def health_check():
    """Report service health as a pretty-printed JSON string.

    Returns:
        JSON string with device/GPU info and whether the model is loaded;
        on any failure a JSON string with "status": "error" — never raises.
    """
    try:
        # Hoisted: query CUDA availability once instead of three times.
        cuda_available = torch.cuda.is_available()
        device = torch.device("cuda" if cuda_available else "cpu")

        if cuda_available:
            try:
                gpu_info = f"GPU: {torch.cuda.get_device_name(0)}"
            # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
            # are no longer swallowed.
            except Exception:
                gpu_info = "GPU: Available but name not accessible"
        else:
            gpu_info = "CPU"

        status = {
            "status": "healthy",
            "device": str(device),
            "gpu_available": cuda_available,
            "gpu_info": gpu_info,
            "model_loaded": service.model_loaded
        }

        return json.dumps(status, indent=2)
    except Exception as e:
        return json.dumps({"status": "error", "error": str(e)}, indent=2)
|
|
| |
# ---------------------------------------------------------------------------
# Gradio UI definition. `interface` is launched by the __main__ guard below.
# ---------------------------------------------------------------------------
with gr.Blocks(title="🎭 Pyannote Speaker Diarization", theme=gr.themes.Soft()) as interface:
    gr.Markdown("""
# 🎭 Pyannote Speaker Diarization

Identifica e separa diferentes speakers em arquivos de áudio usando Pyannote 3.1

**Funcionalidades:**
- ✅ Detecção automática de speakers
- ✅ Segmentação temporal precisa
- ✅ Suporte a múltiplos formatos de áudio (WAV, MP3, FLAC, etc.)
- ✅ GPU dinâmica (quando disponível)

**📁 Como usar:** Faça upload do seu arquivo de áudio abaixo e clique em "Realizar Diarization"
""")

    with gr.Row():
        with gr.Column(scale=1):
            # Upload widget; type="filepath" hands process_audio a temp-file path.
            audio_input = gr.Audio(
                type="filepath",
                label="📄 Arquivo de Áudio"
            )

            process_btn = gr.Button("🎭 Realizar Diarization", variant="primary", size="lg")
            health_btn = gr.Button("🔍 Status do Serviço", variant="secondary")

        with gr.Column(scale=2):
            # Human-readable summary plus the raw result payload.
            result_text = gr.Markdown(label="📊 Resultado", value="Aguardando áudio...")
            result_json = gr.JSON(label="📋 Dados Completos")

    # process_audio returns a (markdown summary, JSON payload) pair mapped
    # onto the two outputs below.
    process_btn.click(
        fn=process_audio,
        inputs=[audio_input],
        outputs=[result_text, result_json]
    )

    # health_check takes no inputs; its JSON string is shown in result_json.
    health_btn.click(
        fn=health_check,
        outputs=[result_json]
    )

    # NOTE(review): examples list is empty — either populate it or drop this
    # component; it currently renders an empty examples section.
    gr.Examples(
        examples=[],
        inputs=audio_input,
        label="📁 Exemplos"
    )
|
|
# Launch the Gradio app when this module is executed directly (HF Spaces
# runs this file as the entry point).
if __name__ == "__main__":
    interface.launch()