# Source: HuggingFace Space by marcosremar2
# Update to pyannote.audio 3.1 exact version (commit e846eba)
import gradio as gr
import spaces
import torch
from pyannote.audio import Pipeline
import os
import json
import time
from typing import Dict, Any
import logging
# Configure module-level logging so pipeline progress shows in Space logs.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Force restart trigger - 2025-05-31 15:35
print("🔄 Space restarting... Loading Pyannote Diarization")
class PyannoteService:
    """Lazily loads and runs the pyannote speaker-diarization pipeline.

    The model is only downloaded/loaded on the first diarization request,
    which keeps Space startup fast and avoids holding a GPU before it is
    actually needed.
    """

    def __init__(self):
        # Pipeline is created on demand by load_model().
        self.pipeline = None
        self.model_loaded = False

    def load_model(self):
        """Load the Pyannote pipeline and move it to GPU when available.

        Idempotent: returns immediately if the model is already loaded.

        Raises:
            Exception: re-raised from ``Pipeline.from_pretrained`` — e.g.
                authentication failures when HUGGINGFACE_TOKEN is missing
                or invalid (the model repository is gated).
        """
        if self.model_loaded:
            return
        try:
            logger.info("Carregando modelo Pyannote...")
            # The diarization model is gated on the Hub; a token is required.
            hf_token = os.getenv("HUGGINGFACE_TOKEN")
            if not hf_token:
                logger.warning("Token do HuggingFace não encontrado")
            # Download/instantiate the diarization pipeline.
            self.pipeline = Pipeline.from_pretrained(
                "pyannote/speaker-diarization-3.1",
                use_auth_token=hf_token
            )
            # Prefer GPU when one is visible to this process.
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
            logger.info(f"Usando device: {device}")
            self.pipeline = self.pipeline.to(device)
            self.model_loaded = True
            logger.info("Modelo Pyannote carregado com sucesso!")
        except Exception as e:
            logger.error(f"Erro ao carregar modelo: {str(e)}")
            # Give a more actionable hint for the common failure mode.
            if "authentication" in str(e).lower() or "token" in str(e).lower():
                logger.error("Erro de autenticação. Verifique se o HUGGINGFACE_TOKEN está configurado corretamente.")
            raise

    @spaces.GPU(duration=120)
    def diarize_audio(self, audio_path: str) -> Dict[str, Any]:
        """Run speaker diarization on an audio file.

        Args:
            audio_path: Path to the audio file to diarize.

        Returns:
            On success, a dict with ``status="success"``, a list of
            per-speaker ``segments`` (start/end/duration/speaker), the
            speaker count and labels, total timeline duration, processing
            time and model name. On failure, a dict with
            ``status="error"``, the error message and elapsed time.
        """
        start_time = time.time()
        try:
            # Lazy-load the model on first call (inside the GPU context).
            if not self.model_loaded:
                self.load_model()
            logger.info(f"Iniciando diarization: {audio_path}")
            # Run the full diarization pipeline on the file.
            diarization = self.pipeline(audio_path)
            # Flatten the pyannote Annotation into JSON-friendly segments.
            segments = []
            speakers = set()
            for turn, _, speaker in diarization.itertracks(yield_label=True):
                speakers.add(speaker)
                segments.append({
                    "start": round(turn.start, 2),
                    "end": round(turn.end, 2),
                    "duration": round(turn.end - turn.start, 2),
                    "speaker": speaker
                })
            processing_time = time.time() - start_time
            result = {
                "status": "success",
                "segments": segments,
                "num_speakers": len(speakers),
                "speakers": sorted(list(speakers)),
                "total_duration": round(diarization.get_timeline().duration(), 2),
                "processing_time": round(processing_time, 2),
                "model": "pyannote/speaker-diarization-3.1"
            }
            logger.info(f"Diarization concluído: {len(segments)} segmentos, {len(speakers)} speakers")
            return result
        except Exception as e:
            # Return a structured error instead of raising, so the UI
            # handler can always render something.
            logger.error(f"Erro na diarization: {str(e)}")
            return {
                "status": "error",
                "error": str(e),
                "processing_time": time.time() - start_time
            }
# Initialize the shared diarization service (the model itself loads lazily
# on the first request, not here).
service = PyannoteService()
def process_audio(audio_file):
    """Run diarization on an uploaded file and format results for the UI.

    Args:
        audio_file: Filepath from the Gradio ``Audio`` component
            (``None``/empty when nothing was uploaded).

    Returns:
        A ``(markdown_summary, json_string)`` tuple matching the two
        Gradio outputs wired to this handler (result_text, result_json).
    """
    if not audio_file:
        # Bug fix: the click handler has TWO outputs, so this branch must
        # also return two values — a lone string made Gradio error out.
        return "❌ Nenhum arquivo de áudio fornecido", ""
    try:
        result = service.diarize_audio(audio_file)
        if result["status"] == "success":
            # Build a human-readable markdown summary of the run.
            summary = f"""
✅ **Diarization Concluído**
📊 **Resumo:**
- **Speakers detectados:** {result['num_speakers']}
- **Total de segmentos:** {len(result['segments'])}
- **Duração total:** {result['total_duration']}s
- **Tempo de processamento:** {result['processing_time']}s
👥 **Speakers:** {', '.join(result['speakers'])}
📋 **Segmentos:**
"""
            # List at most the first 10 segments to keep the view short.
            for i, segment in enumerate(result['segments'][:10]):
                summary += f"\n{i+1}. {segment['start']}s - {segment['end']}s | {segment['speaker']} ({segment['duration']}s)"
            if len(result['segments']) > 10:
                summary += f"\n... e mais {len(result['segments']) - 10} segmentos"
            return summary, json.dumps(result, indent=2)
        else:
            return f"❌ Erro: {result['error']}", json.dumps(result, indent=2)
    except Exception as e:
        logger.error(f"Erro no processamento: {str(e)}")
        return f"❌ Erro interno: {str(e)}", ""
def health_check():
    """Report service health (device, GPU availability, model state) as JSON.

    Returns:
        A JSON string: ``{"status": "healthy", ...}`` on success, or
        ``{"status": "error", "error": ...}`` if inspection fails.
    """
    try:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        if torch.cuda.is_available():
            try:
                gpu_info = f"GPU: {torch.cuda.get_device_name(0)}"
            except Exception:
                # Bug fix: the original bare `except:` also swallowed
                # SystemExit/KeyboardInterrupt; catch only real exceptions.
                gpu_info = "GPU: Available but name not accessible"
        else:
            gpu_info = "CPU"
        status = {
            "status": "healthy",
            "device": str(device),
            "gpu_available": torch.cuda.is_available(),
            "gpu_info": gpu_info,
            "model_loaded": service.model_loaded
        }
        return json.dumps(status, indent=2)
    except Exception as e:
        # Never raise into the UI — report the failure as JSON instead.
        return json.dumps({"status": "error", "error": str(e)}, indent=2)
# Gradio UI: one row with the input/buttons column and the results column.
with gr.Blocks(title="🎭 Pyannote Speaker Diarization", theme=gr.themes.Soft()) as interface:
    gr.Markdown("""
# 🎭 Pyannote Speaker Diarization
Identifica e separa diferentes speakers em arquivos de áudio usando Pyannote 3.1
**Funcionalidades:**
- ✅ Detecção automática de speakers
- ✅ Segmentação temporal precisa
- ✅ Suporte a múltiplos formatos de áudio (WAV, MP3, FLAC, etc.)
- ✅ GPU dinâmica (quando disponível)
**📁 Como usar:** Faça upload do seu arquivo de áudio abaixo e clique em "Realizar Diarization"
""")
    with gr.Row():
        with gr.Column(scale=1):
            # Input widget: yields a filepath for the uploaded audio.
            audio_input = gr.Audio(
                type="filepath",
                label="📄 Arquivo de Áudio"
            )
            # Action buttons.
            process_btn = gr.Button("🎭 Realizar Diarization", variant="primary", size="lg")
            health_btn = gr.Button("🔍 Status do Serviço", variant="secondary")
        with gr.Column(scale=2):
            # Output widgets: markdown summary plus the raw JSON payload.
            result_text = gr.Markdown(label="📊 Resultado", value="Aguardando áudio...")
            result_json = gr.JSON(label="📋 Dados Completos")
    # Wire the buttons to their handlers.
    process_btn.click(
        fn=process_audio,
        inputs=[audio_input],
        outputs=[result_text, result_json]
    )
    health_btn.click(
        fn=health_check,
        outputs=[result_json]
    )
    # Example gallery (currently no bundled examples).
    gr.Examples(
        examples=[],
        inputs=audio_input,
        label="📁 Exemplos"
    )

if __name__ == "__main__":
    interface.launch()