Whisper_2 / app.py
RenanOF's picture
Update app.py
cfbffcc verified
import gradio as gr
from transformers import pipeline
from pydub import AudioSegment
from pydub.utils import make_chunks
import tempfile
import os
import math
# --- Configurações ---
MODEL_NAME = "openai/whisper-small" # Ou "base", "small" - Cuidado com RAM/Tempo na CPU
CHUNK_LENGTH_MS = 30_000 # 30 segundos por chunk
MAX_FILE_SIZE_MB = 250 # Aumentado para ~120 min (ajuste conforme necessário)
TARGET_SAMPLE_RATE = 16000
# ---------------------
print(f"Carregando modelo Whisper: {MODEL_NAME}...")
# Inicialize o modelo Whisper
transcriber = pipeline(
"automatic-speech-recognition",
model=MODEL_NAME,
device="cpu" # Mantendo CPU conforme original
)
print("Modelo carregado.")
# Função para dividir áudios longos
def split_audio(audio_path, chunk_length=CHUNK_LENGTH_MS):
try:
audio = AudioSegment.from_file(audio_path)
print(f"Áudio carregado: Duração={audio.duration_seconds:.2f}s, Canais={audio.channels}, Taxa={audio.frame_rate}Hz")
chunks = make_chunks(audio, chunk_length)
print(f"Áudio dividido em {len(chunks)} chunks de ~{chunk_length/1000}s")
return chunks
except Exception as e:
print(f"Erro ao carregar ou dividir áudio: {e}")
raise # Re-lança a exceção para ser pega na função principal
# Função para comprimir/preparar áudio para Whisper
def prepare_audio(audio_path):
try:
audio = AudioSegment.from_file(audio_path)
# Converter para mono, taxa de amostragem alvo, profundidade de bits padrão (16)
prepared_audio = audio.set_frame_rate(TARGET_SAMPLE_RATE).set_channels(1).set_sample_width(2)
# Usar um arquivo temporário gerenciado pelo 'with' se possível
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_f:
prepared_path = temp_f.name
prepared_audio.export(prepared_path, format="wav")
print(f"Áudio preparado e salvo em: {prepared_path}")
return prepared_path
except Exception as e:
print(f"Erro ao preparar áudio: {e}")
raise
# Função para transcrever o áudio (agora como gerador para feedback)
def transcribe_audio_generator(audio_filepath):
if audio_filepath is None:
yield "Erro: Nenhum arquivo de áudio enviado."
return
try:
file_size_bytes = os.path.getsize(audio_filepath)
file_size_mb = file_size_bytes / (1024 * 1024)
print(f"Arquivo recebido: {audio_filepath}, Tamanho: {file_size_mb:.2f} MB")
# Verificar o tamanho do arquivo
if file_size_bytes > MAX_FILE_SIZE_MB * 1024 * 1024:
yield f"Erro: O arquivo excede o limite de {MAX_FILE_SIZE_MB} MB. Tamanho atual: {file_size_mb:.2f} MB."
return
yield f"Iniciando pré-processamento (pode levar um tempo)... Tamanho: {file_size_mb:.2f} MB"
prepared_audio_path = None
try:
prepared_audio_path = prepare_audio(audio_filepath)
yield f"Pré-processamento concluído. Dividindo em chunks..."
chunks = split_audio(prepared_audio_path)
total_chunks = len(chunks)
if total_chunks == 0:
yield "Erro: Não foi possível dividir o áudio em chunks."
return
full_transcription = []
yield f"Iniciando transcrição de {total_chunks} chunks (Modelo: {MODEL_NAME}). Isso pode demorar bastante..."
# Processar cada parte separadamente
for i, chunk in enumerate(chunks):
# Usar arquivo temporário para o chunk
with tempfile.NamedTemporaryFile(suffix=".wav", delete=True) as temp_chunk_file:
chunk.export(temp_chunk_file.name, format="wav")
# Transcrever o chunk
# Usar chunk_length para o pipeline pode ajudar em alguns casos
result = transcriber(
temp_chunk_file.name,
chunk_length_s=math.ceil(CHUNK_LENGTH_MS / 1000), # Whisper espera em segundos
return_timestamps=False # Mantido como False
)
transcription = result["text"]
# Adicionar ao resultado geral
chunk_label = f"[Chunk {i+1}/{total_chunks}]"
full_transcription.append(f"{chunk_label}: {transcription}")
# Atualizar a interface a cada chunk (ou a cada N chunks)
progress_update = f"Processando: {i+1}/{total_chunks} chunks...\n\n" + "\n".join(full_transcription)
yield progress_update
yield "Transcrição completa!\n\n" + "\n".join(full_transcription)
except Exception as e:
yield f"Erro durante o processamento: {str(e)}"
# Log detalhado do erro no console do servidor
import traceback
print("Erro detalhado:")
traceback.print_exc()
finally:
# Limpar o arquivo preparado se ele foi criado
if prepared_audio_path and os.path.exists(prepared_audio_path):
try:
os.remove(prepared_audio_path)
print(f"Arquivo temporário preparado removido: {prepared_audio_path}")
except OSError as e:
print(f"Erro ao remover arquivo temporário {prepared_audio_path}: {e}")
# O arquivo original (audio_filepath) é gerenciado pelo Gradio
# Os arquivos de chunk são gerenciados pelo 'with tempfile.NamedTemporaryFile'
except Exception as e:
yield f"Erro inesperado ao processar áudio: {str(e)}"
import traceback
print("Erro detalhado:")
traceback.print_exc()
# Interface gráfica com Gradio
with gr.Blocks() as demo:
gr.Markdown(f"# 🎙️ Whisper Transcription - Áudios Longos (até {MAX_FILE_SIZE_MB} MB)")
gr.Markdown(f"**Atenção:** Áudios muito longos podem levar **muito tempo** para processar (potencialmente horas), especialmente com o modelo `{MODEL_NAME}` na CPU.")
with gr.Row():
with gr.Column(scale=1):
gr.Markdown(f"### 1️⃣ Envie seu áudio (máx. {MAX_FILE_SIZE_MB} MB)")
audio_input = gr.Audio(type="filepath", label="Envie um arquivo de áudio")
with gr.Column(scale=1):
gr.Markdown("### 2️⃣ Resultado da transcrição (atualizado durante o processo)")
transcription_output = gr.Textbox(label="Transcrição", lines=20, interactive=False)
transcribe_button = gr.Button("🚀 Transcrever Áudio")
# Vincular ação ao botão - Usando a função geradora
transcribe_button.click(
fn=transcribe_audio_generator,
inputs=[audio_input],
outputs=[transcription_output]
)
# Rodar a aplicação
print("Iniciando interface Gradio...")
# share=True pode não funcionar bem com processos muito longos em ambientes gratuitos
demo.launch(share=False) # Recomendo testar localmente primeiro (share=False)
print("Interface disponível.")