|
|
import os |
|
|
import whisperx |
|
|
import torch |
|
|
import re |
|
|
import json |
|
|
import difflib |
|
|
from docx import Document |
|
|
import gc |
|
|
|
|
|
class TranscriptionProcessor:
    """Pipeline wrapper: WhisperX transcription, DOCX script parsing, and
    script-driven correction of word-level timestamps for the WebApp."""

    def __init__(self, device="cpu", model_name="large-v3", compute_type="int8"):
        """Store configuration and prepare the local model cache directory.

        Args:
            device: Torch device string ("cpu" or "cuda").
            model_name: Whisper checkpoint name to load (default "large-v3").
            compute_type: ctranslate2 compute type (e.g. "int8", "float16").
        """
        self.device = device
        self.model_name = model_name
        self.compute_type = compute_type
        self.model = None  # lazily loaded by load_models()
        self.align_model_cache = {}  # language code -> (align_model, metadata)

        # Keep downloaded models next to this file so repeated runs reuse them.
        self.base_dir = os.path.dirname(os.path.abspath(__file__))
        self.model_dir = os.path.join(self.base_dir, "models")
        os.makedirs(self.model_dir, exist_ok=True)

    def load_models(self):
        """Load the Whisper model once, freeing memory first (idempotent).

        Raises:
            Exception: re-raised from whisperx.load_model on failure.
        """
        if self.model is not None:
            return
        print(f"[PROCESSOR] Carregando modelo Whisper: {self.model_name} em {self.device}")

        # Release unreferenced memory before the large model allocation.
        gc.collect()
        if self.device == "cuda":
            torch.cuda.empty_cache()

        try:
            self.model = whisperx.load_model(
                self.model_name,
                self.device,
                compute_type=self.compute_type,
                download_root=self.model_dir,
            )
            print("[PROCESSOR] Modelo Whisper carregado com sucesso.")
        except Exception as e:
            print(f"[PROCESSOR ERROR] Falha ao carregar modelo: {e}")
            raise  # bare raise keeps the original traceback intact

    def process_docx(self, file_path):
        """Extract the script text from a DOCX file as a single string.

        Strips non-printable characters and normalizes dash-plus-space runs
        into commas so the text aligns better against the transcription.

        Args:
            file_path: Path to the .docx script; falsy values yield "".

        Returns:
            The concatenated paragraph text, or "" on any read/parse error.
        """
        if not file_path:
            return ""
        print(f"[DOCX] Processando roteiro: {file_path}")
        try:
            doc = Document(file_path)
            paragraphs = []
            for para in doc.paragraphs:
                text = para.text.strip()
                if not text:
                    continue
                # Drop control / non-printable characters (keep newlines).
                text = "".join(ch for ch in text if ch.isprintable() or ch == "\n")
                # Dashes followed by whitespace read as pauses -> commas;
                # then collapse any space left hanging before a comma.
                text = re.sub(r'[—–-]\s+', ', ', text)
                text = re.sub(r'\s+,\s+', ', ', text)
                paragraphs.append(text)
            return " ".join(paragraphs)
        except Exception as e:
            print(f"[DOCX ERROR] {e}")
            return ""

    def transcribe(self, audio_path, language="pt"):
        """Transcribe an audio file into word-level timestamps.

        Runs WhisperX transcription, then attempts word alignment; if the
        alignment step fails it falls back to the base transcription output.

        Args:
            audio_path: Path readable by whisperx.load_audio.
            language: ISO language code passed to the model (default "pt").

        Returns:
            List of dicts with "start"/"end" (seconds, 3 decimals) and "word".

        Raises:
            Exception: re-raised from model loading or transcription failures.
        """
        try:
            self.load_models()
            audio = whisperx.load_audio(audio_path)

            print("[WHISPER] Transcrevendo...")
            result = self.model.transcribe(audio, batch_size=8, language=language)

            print("[WHISPER] Alinhando...")
            try:
                # Alignment models are cached per language across calls.
                if language not in self.align_model_cache:
                    self.align_model_cache[language] = whisperx.load_align_model(
                        language_code=language, device=self.device
                    )
                model_a, metadata = self.align_model_cache[language]
                result = whisperx.align(
                    result["segments"], model_a, metadata, audio, self.device
                )
            except Exception as align_err:
                # Alignment is best-effort: keep the unaligned transcription.
                print(f"[WHISPER WARNING] Falha no alinhamento: {align_err}. Seguindo com transcrição base.")

            # Flatten segments into one word list, keeping only entries that
            # carry usable timestamps.
            words = []
            for segment in result["segments"]:
                # Aligned output exposes "words"; some fallback paths expose
                # "word_segments" instead.
                word_list = segment.get("words", segment.get("word_segments", []))
                for w in word_list:
                    if "start" in w and "end" in w:
                        words.append({
                            "start": round(w["start"], 3),
                            "end": round(w["end"], 3),
                            "word": w.get("word", w.get("text", "")).strip(),
                        })
            return words

        except Exception as e:
            print(f"[WHISPER ERROR] {str(e)}")
            raise  # preserve the original traceback for the caller

    def align_with_script(self, audio_words, script_text):
        """Correct transcribed words against the original script.

        Aligns the normalized transcription with the normalized script via
        difflib.SequenceMatcher, then takes spelling/punctuation from the
        script while preserving the audio timestamps.

        Args:
            audio_words: List of {"start", "end", "word"} dicts from transcribe().
            script_text: Plain-text script (e.g. from process_docx). Falsy
                values return audio_words unchanged.

        Returns:
            A list of word dicts with script spelling and audio timing.
        """
        if not script_text:
            return audio_words

        print("[REFINE] Iniciando correção inteligente baseada no roteiro...")

        # Normalize both sides (strip punctuation, lowercase) for matching;
        # the raw script tokens keep their punctuation for the output text.
        script_raw = script_text.split()
        script_clean = [re.sub(r'[^\w]', '', w).lower() for w in script_raw]
        audio_clean = [re.sub(r'[^\w]', '', w['word']).lower() for w in audio_words]

        matcher = difflib.SequenceMatcher(None, audio_clean, script_clean)

        refined_words = []

        for tag, i1, i2, j1, j2 in matcher.get_opcodes():
            if tag == 'equal':
                # Same word: adopt the script token (restores casing/punctuation).
                for k in range(i2 - i1):
                    word_obj = audio_words[i1 + k].copy()
                    word_obj['word'] = script_raw[j1 + k]
                    refined_words.append(word_obj)

            elif tag == 'replace':
                if (i2 - i1) == (j2 - j1):
                    # One-for-one mismatch: substitute each script word in place.
                    for k in range(i2 - i1):
                        word_obj = audio_words[i1 + k].copy()
                        word_obj['word'] = script_raw[j1 + k]
                        refined_words.append(word_obj)
                else:
                    # Unequal span: merge the script words into a single entry
                    # spanning the full audio interval of the replaced words.
                    refined_words.append({
                        "start": audio_words[i1]["start"],
                        "end": audio_words[i2 - 1]["end"],
                        "word": " ".join(script_raw[j1:j2]),
                    })

            elif tag == 'delete':
                # Spoken but absent from the script: keep the audio words.
                # Copy so later in-place edits don't mutate the caller's input.
                for k in range(i1, i2):
                    refined_words.append(audio_words[k].copy())

            elif tag == 'insert':
                # In the script but never spoken: no timing exists, so skip.
                pass

        print(f"[REFINE] Concluído. {len(refined_words)} palavras na saída final.")
        return refined_words

    def correct_orthography(self, words):
        """Apply quick punctuation fixes in place and return the same list."""
        for w in words:
            # Remove stray space before commas and collapse doubled commas.
            w["word"] = w["word"].replace(" ,", ",").replace(",,", ",")
        return words

    def generate_json(self, words):
        """Wrap the word list in the stable JSON shape the WebApp expects."""
        return {"words": words}
|
|