Update app.py
app.py CHANGED

@@ -1,4 +1,4 @@
-# app.py — veureu/asr (Aina faster-whisper Catalan · ZeroGPU) — compatible
 from __future__ import annotations
 import os, json, tempfile
 from typing import Dict, Any, List, Tuple, Optional
@@ -10,20 +10,17 @@ import torch
 # faster-whisper (CTranslate2)
 from faster_whisper import WhisperModel

-
-
-
 # =========================
-# Config
 # =========================
-#
-#
 MODEL_ID = os.environ.get("MODEL_ID", "projecte-aina/faster-whisper-large-v3-ca-3catparla")

-#
 HAS_CUDA = os.environ.get("CUDA_VISIBLE_DEVICES") not in (None, "", "-1")
 DEVICE = "cuda" if HAS_CUDA else "cpu"
-COMPUTE_TYPE = "float16" if HAS_CUDA else "int8"  # "int8_float16"

 _model: Optional[WhisperModel] = None

@@ -34,7 +31,7 @@ def _lazy_model() -> WhisperModel:
             MODEL_ID,
             device=DEVICE,
             compute_type=COMPUTE_TYPE,
-            download_root=os.environ.get("HF_HOME") or None,  #
         )
     return _model

@@ -49,11 +46,11 @@ def _lazy_load_whisper():
     global _model_whis, _processor_whis
     if _model_whis is None or _processor_whis is None:
         model_name = "projecte-aina/whisper-large-v3-ca-3catparla"
-
-        #
         _processor_whis = WhisperProcessor.from_pretrained(model_name)

-        #
         m = WhisperForConditionalGeneration.from_pretrained(
             model_name,
             low_cpu_mem_usage=True,
@@ -67,7 +64,7 @@ def _lazy_load_whisper():
     return _processor_whis, _model_whis

 # ==================================
-#
 # ==================================
 @spaces.GPU
 def _transcribe_core(
@@ -80,9 +77,9 @@ def _transcribe_core(
     word_timestamps: bool = False,
 ) -> Dict[str, Any]:
     """
-
     {
-      "text": "
       "segments": [
         {"start": 0.10, "end": 1.92, "text": "…"},
         ...
@@ -93,7 +90,7 @@ def _transcribe_core(
     """
     model = _lazy_model()

-    # faster-whisper
    segments, info = model.transcribe(
        audio_path,
        language=language or "ca",
@@ -204,33 +201,33 @@ import base64
 import soundfile as sf

 def diarize_audio(
-
     min_segment_duration: float = 0.5,
     max_segment_duration: float = 50.0,
 ) -> Tuple[List[str], List[Dict[str, Any]]]:
     """
-
-    -
-    -
-    -
     """
-    #
-    audio = AudioSegment.from_wav(
     duration = len(audio) / 1000.0

-    #
     pipeline = Pipeline.from_pretrained(
         "pyannote/speaker-diarization-3.1",
         use_auth_token=os.getenv('HF_TOKEN')
     )
-    diarization = pipeline(

     clip_buffers: List[Tuple[str, BytesIO]] = []
     segments: List[Dict[str, Any]] = []
     spk_map: Dict[str, int] = {}
     prev_end = 0.0

-    #
     for i, (turn, _, speaker) in enumerate(diarization.itertracks(yield_label=True)):
         start, end = max(0.0, float(turn.start)), min(duration, float(turn.end))

@@ -245,7 +242,7 @@ def diarize_audio(
         if seg_dur < min_segment_duration:
             continue

-        #
         if seg_dur > max_segment_duration:
             n = int(math.ceil(seg_dur / max_segment_duration))
             sub_d = seg_dur / n
@@ -276,27 +273,27 @@ def diarize_audio(
         segments.append({"start": start, "end": end, "speaker": f"SPEAKER_{spk_map[speaker]:02d}"})
         prev_end = end

-    #
     if not segments:
         buf = BytesIO()
         audio.export(buf, format="wav")
         buf.seek(0)
         return [{"name": "segment_000.wav", "data": base64.b64encode(buf.read()).decode("utf-8")}], [{"start": 0.0, "end": duration, "speaker": "SPEAKER_00"}]

-    #
     print("Clip buffers:")
     print(clip_buffers)

     gr_clips = []
     for i, (name, buf) in enumerate(clip_buffers, start=1):
         buf.seek(0)
-        #
         tmp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".wav")
         tmp_file.write(buf.read())
         tmp_file.close()

-        #
-        new_name = f"
         new_path = os.path.join(tempfile.gettempdir(), new_name)
         os.rename(tmp_file.name, new_path)

@@ -313,7 +310,7 @@ from typing import List
 import torchaudio
 import torch

-def voice_embedder(
     print("======================================================")
     model = SpeakerRecognition.from_hparams(
         source="pretrained_models/spkrec-ecapa-voxceleb",
@@ -321,23 +318,26 @@ def voice_embedder(wav_archivo: str) -> List[float]:
     )
     model.eval()
     print("======================================================")
-
-
-    waveform, sr = torchaudio.load(
     target_sr = 16000

     if sr != target_sr:
         waveform = T.Resample(orig_freq=sr, new_freq=target_sr)(waveform)
     if waveform.shape[0] > 1:
         waveform = waveform.mean(dim=0, keepdim=True)

-    #
-
     min_samples = int(0.2 * target_sr)
     if waveform.shape[1] < min_samples:
         pad = min_samples - waveform.shape[1]
         waveform = torch.cat([waveform, torch.zeros((1, pad))], dim=1)

     with torch.no_grad():
         emb = (
             model.encode_batch(waveform)
@@ -347,13 +347,14 @@ def voice_embedder(wav_archivo: str) -> List[float]:
             .astype(float)
         )

     emb = emb / np.linalg.norm(emb)
     print(len(emb))
     print(emb.tolist())
-    return

-def identify_speaker(
-    voice_embedding = voice_embedder(
     voice_col = json.loads(voice_col)

     identity = "Desconegut"
@@ -374,7 +375,7 @@ def identify_speaker(wav_archivo: str, voice_col: List[Dict[str, Any]]) -> Dict[

         distances_embedding = []

-        # Compute Euclidean distance between the detected
         for voice_base_datos in voice_col:
             voice_base_datos_embedding = np.array(voice_base_datos["embedding"])
             distance = np.linalg.norm(voice_embedding - voice_base_datos_embedding)
@@ -387,18 +388,14 @@ def identify_speaker(wav_archivo: str, voice_col: List[Dict[str, Any]]) -> Dict[
         distances_embedding = sorted(distances_embedding, key=lambda x: x["distance"])
         knn = distances_embedding[:n_results]

-        # Assign identity if closest match
-
-        identity = knn[0]["identity"]
-        else:
-        identity = "Desconegut"'''
-        if knn:  # the maximum distance can reach 1.414
             identity = knn[0]["identity"]
         else:
             identity = "Desconegut"

     except Exception as e:
-        print(f"
         knn = []
         identity = "Desconegut"

@@ -411,22 +408,22 @@ import os
 import shutil
 import tempfile

-def
     """
-
-
     """
     if not os.path.exists(original_file):
-        raise FileNotFoundError(f"{original_file}

-    #
     temp_fd, temp_path = tempfile.mkstemp(suffix=os.path.splitext(original_file)[1])
-    os.close(temp_fd)  #

-    #
     shutil.copy2(original_file, temp_path)

-    #
     os.remove(original_file)

     return temp_path
@@ -450,17 +447,23 @@ def extract_audio_ffmpeg(video_file, sr: int = 16000, mono: bool = True):
     str
         Filepath to the extracted WAV audio file.
     """
-    if video_file is None:
-
     base_name = os.path.splitext(os.path.basename(video_file))[0]
-
     audio_out = f"./{base_name}.wav"
-
-
-
     extract_audio(input_path=video_file, output_path=audio_out)
-
-

 import torch
 import torchaudio
@@ -480,32 +483,36 @@ def transcribe_wav(wav_path: str) -> str:
     dev = device
     if dev == "cuda" and not torch.cuda.is_available():
         dev = "cpu"
     processor, model = _lazy_load_whisper()
     device = dev
-
     waveform, sr = torchaudio.load(wav_path)

     target_sr = 16000
     if sr != target_sr:
         waveform = torchaudio.transforms.Resample(orig_freq=sr, new_freq=target_sr)(waveform)
         sr = target_sr
-
     inputs = processor(
         waveform.numpy(), sampling_rate=sr, return_tensors="pt"
     ).input_features.to(model.device)

-    #
     with torch.no_grad():
         ids = model.generate(inputs, max_new_tokens=440)[0]

-    #
     txt = processor.decode(ids)

-    #
     norm = getattr(processor.tokenizer, "_normalize", None)
     return norm(txt) if callable(norm) else txt

-
 def transcribe_long_audio(
     wav_path: str,
     chunk_length_s: int = 20,
@@ -516,17 +523,21 @@ def transcribe_long_audio(
     dev = device
     if dev == "cuda" and not torch.cuda.is_available():
         dev = "cpu"
     processor, model = _lazy_load_whisper()
     device = dev
-
     waveform, sr = torchaudio.load(wav_path)
     target_sr = 16000
     if sr != target_sr:
         waveform = torchaudio.transforms.Resample(orig_freq=sr, new_freq=target_sr)(waveform)
         sr = target_sr
     total_samples = waveform.shape[1]

-    #
     chunk_size = chunk_length_s * sr
     overlap_size = overlap_s * sr

@@ -535,14 +546,16 @@ def transcribe_long_audio(

     while start < total_samples:
         end = min(start + chunk_size, total_samples)
-        chunk = waveform[:, start:end]

         input_features = processor(
             chunk.numpy(),
             sampling_rate=sr,
             return_tensors="pt"
         ).input_features.to(model.device)

         with torch.no_grad():
             predicted_ids = model.generate(
                 input_features,
@@ -550,15 +563,16 @@ def transcribe_long_audio(
                 num_beams=1,
             )[0]

         text = processor.decode(predicted_ids, skip_special_tokens=True)
         transcriptions.append(text.strip())

-        #
         start += chunk_size - overlap_size

     return " ".join(transcriptions).strip()

-
 """
 # ==============================================================================
 # UI & Endpoints
@@ -593,45 +607,20 @@ h2 {
 }
 """
 with gr.Blocks(title="Aina faster-whisper (Català) · ZeroGPU", css=custom_css,theme=gr.themes.Soft()) as demo:
-    # Main transcription section
-    gr.Markdown('<h2 style="text-align:center">Aina faster-whisper (Català) · ZeroGPU - Reconeixement de veu en català finetune projecte-aina</h2>')
-    with gr.Row():
-        with gr.Column():
-            inp = gr.Audio(sources=["upload", "microphone"], type="filepath", label="Àudio (WAV/MP3/MP4, etc.)")
-            lang = gr.Textbox(label="Idioma", value="ca")
-            ts = gr.Checkbox(label="Marques de temps", value=True)
-            vad = gr.Checkbox(label="Filtre VAD", value=True)
-        with gr.Column():
-            out = gr.JSON(label="Sortida /predict")
-    with gr.Row():
-        btn = gr.Button("Transcriure (ENGINE /predict)", variant="primary")

-    #
-
-
-    # Advanced transcription section
-    gr.Markdown('<h2 style="text-align:center">Avançat (/transcribe)</h2>')
     with gr.Row():
-
-        inp2 = gr.Audio(sources=["upload", "microphone"], type="filepath", label="Àudio")
-        lang2 = gr.Textbox(label="Idioma", value="ca")
-        task2 = gr.Dropdown(["transcribe", "translate"], value="transcribe", label="Tasques")
-        vad2 = gr.Checkbox(label="Filtre VAD", value=True)
-        beam2 = gr.Slider(1, 10, value=5, step=1, label="Mida del feix")
-        temp2 = gr.Slider(0.0, 1.5, value=0.0, step=0.1, label="Temperatura")
-        wts2 = gr.Checkbox(label="Marques de temps per paraula", value=False)
-        with gr.Column():
-            out2 = gr.JSON(label="Sortida /transcribe")
     with gr.Row():
-

-
-
-
-
-        out2,
-        api_name="transcribe",
-        concurrency_limit=1
     )

     # Diarization section
@@ -692,21 +681,6 @@ with gr.Blocks(title="Aina faster-whisper (Català) · ZeroGPU", css=custom_css,
         concurrency_limit=1
     )

-    # Extract audio from video
-    gr.Markdown('<h2 style="text-align:center">Extreure àudio d\'un vídeo</h2>')
-    with gr.Row():
-        video_input = gr.Video(label="Puja un vídeo")
-    with gr.Row():
-        extract_btn = gr.Button("Extreure àudio", variant="primary")
-    with gr.Row():
-        audio_output = gr.Audio(label="Àudio extret (WAV)", type="filepath")
-
-    extract_btn.click(
-        fn=extract_audio_ffmpeg,
-        inputs=video_input,
-        outputs=audio_output
-    )
-
     # Short audio transcription
     gr.Markdown('<h2 style="text-align:center">Aina faster-whisper (Català) Àudio curt → text</h2>')
     with gr.Row():
@@ -736,5 +710,46 @@ with gr.Blocks(title="Aina faster-whisper (Català) · ZeroGPU", css=custom_css,
         inputs=audio_input,
         outputs=output_text
     )

 demo.queue(max_size=8).launch(share=True,show_error=True)
Updated app.py (lines added by this commit are marked with +):

+# app.py — veureu/asr (Aina faster-whisper Catalan · ZeroGPU) — compatible with ENGINE
 from __future__ import annotations
 import os, json, tempfile
 from typing import Dict, Any, List, Tuple, Optional

 # faster-whisper (CTranslate2)
 from faster_whisper import WhisperModel

 # =========================
+# Config and lazy loading
 # =========================
+# By default we use the Catalan finetune from projecte-aina on HF.
+# Change MODEL_ID to the exact repo you are using (e.g.: "projecte-aina/faster-whisper-large-v3-ca-3catparla")
 MODEL_ID = os.environ.get("MODEL_ID", "projecte-aina/faster-whisper-large-v3-ca-3catparla")

+# Detect if there is a GPU (ZeroGPU) -> fp16, otherwise INT8
 HAS_CUDA = os.environ.get("CUDA_VISIBLE_DEVICES") not in (None, "", "-1")
 DEVICE = "cuda" if HAS_CUDA else "cpu"
+COMPUTE_TYPE = "float16" if HAS_CUDA else "int8"  # "int8_float16" also works on low-end GPUs

 _model: Optional[WhisperModel] = None

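For context, a minimal sketch of how this configuration block is typically driven from the environment when running the Space locally. Only the variable names (MODEL_ID, HF_HOME, CUDA_VISIBLE_DEVICES) come from the code above; the values and the module name are illustrative.

    import os
    # Prepare the environment before app.py is imported (the config block runs at import time).
    os.environ.setdefault("MODEL_ID", "projecte-aina/faster-whisper-large-v3-ca-3catparla")
    os.environ.setdefault("HF_HOME", "/data/hf-cache")    # illustrative cache dir, later passed as download_root
    os.environ.setdefault("CUDA_VISIBLE_DEVICES", "-1")   # unset or "-1" -> CPU device and int8 compute type
    import app  # hypothetical module name for app.py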
             MODEL_ID,
             device=DEVICE,
             compute_type=COMPUTE_TYPE,
+            download_root=os.environ.get("HF_HOME") or None,  # optional
         )
     return _model

     global _model_whis, _processor_whis
     if _model_whis is None or _processor_whis is None:
         model_name = "projecte-aina/whisper-large-v3-ca-3catparla"
+
+        # processor
         _processor_whis = WhisperProcessor.from_pretrained(model_name)

+        # model
         m = WhisperForConditionalGeneration.from_pretrained(
             model_name,
             low_cpu_mem_usage=True,
     return _processor_whis, _model_whis

 # ==================================
+# Transcription core (Catalan)
 # ==================================
 @spaces.GPU
 def _transcribe_core(
     word_timestamps: bool = False,
 ) -> Dict[str, Any]:
     """
+    Returns:
     {
+      "text": "transcription…",
       "segments": [
         {"start": 0.10, "end": 1.92, "text": "…"},
         ...

     """
     model = _lazy_model()

+    # faster-whisper produces a generator of segments + info
     segments, info = model.transcribe(
         audio_path,
         language=language or "ca",
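For reference, a minimal sketch of how the faster-whisper segment generator is usually folded into the {"text", "segments"} shape documented in the docstring above; the rest of _transcribe_core is not shown in this hunk, so the exact extra keys are assumptions.

    segs = []
    parts = []
    for s in segments:  # iterating the generator performs the actual decoding
        segs.append({"start": round(s.start, 2), "end": round(s.end, 2), "text": s.text.strip()})
        parts.append(s.text.strip())

    result = {
        "text": " ".join(parts),
        "segments": segs,
        "language": info.language,   # TranscriptionInfo fields provided by faster-whisper
        "duration": info.duration,
    }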
 import soundfile as sf

 def diarize_audio(
+    wav_file: str,
     min_segment_duration: float = 0.5,
     max_segment_duration: float = 50.0,
 ) -> Tuple[List[str], List[Dict[str, Any]]]:
     """
+    Audio diarization that:
+    - Reads a WAV file
+    - Returns clips in memory as dicts for Gradio (without saving files)
+    - Returns the list of segments [{'start','end','speaker'}]
     """
+    # Load audio and calculate duration
+    audio = AudioSegment.from_wav(wav_file)
     duration = len(audio) / 1000.0

+    # Diarization pipeline
     pipeline = Pipeline.from_pretrained(
         "pyannote/speaker-diarization-3.1",
         use_auth_token=os.getenv('HF_TOKEN')
     )
+    diarization = pipeline(wav_file)

     clip_buffers: List[Tuple[str, BytesIO]] = []
     segments: List[Dict[str, Any]] = []
     spk_map: Dict[str, int] = {}
     prev_end = 0.0

+    # Process each segment
     for i, (turn, _, speaker) in enumerate(diarization.itertracks(yield_label=True)):
         start, end = max(0.0, float(turn.start)), min(duration, float(turn.end))

         if seg_dur < min_segment_duration:
             continue

+        # Split very long segments
         if seg_dur > max_segment_duration:
             n = int(math.ceil(seg_dur / max_segment_duration))
             sub_d = seg_dur / n

         segments.append({"start": start, "end": end, "speaker": f"SPEAKER_{spk_map[speaker]:02d}"})
         prev_end = end

+    # If no segments, use the entire audio
     if not segments:
         buf = BytesIO()
         audio.export(buf, format="wav")
         buf.seek(0)
         return [{"name": "segment_000.wav", "data": base64.b64encode(buf.read()).decode("utf-8")}], [{"start": 0.0, "end": duration, "speaker": "SPEAKER_00"}]

+    # Convert all clips to dicts for Gradio
     print("Clip buffers:")
     print(clip_buffers)

     gr_clips = []
     for i, (name, buf) in enumerate(clip_buffers, start=1):
         buf.seek(0)
+        # Create temporary file but with friendly name
         tmp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".wav")
         tmp_file.write(buf.read())
         tmp_file.close()

+        # Rename to something like "clip1.wav", "clip2.wav", ...
+        new_name = f"clip{i}.wav"
         new_path = os.path.join(tempfile.gettempdir(), new_name)
         os.rename(tmp_file.name, new_path)

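A usage sketch for the diarization helper above; the file name and the pairing with _transcribe_core are illustrative. In the normal path the first return value holds temporary WAV clip paths, while the no-segment fallback returns base64 dicts instead.

    clips, segs = diarize_audio("meeting.wav")
    for clip_path, seg in zip(clips, segs):
        # One clip per diarized segment, e.g. "clip1.wav" in the temp dir
        text = _transcribe_core(clip_path, language="ca")["text"]
        print(f'{seg["speaker"]} [{seg["start"]:.2f}-{seg["end"]:.2f}]: {text}')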
 import torchaudio
 import torch

+def voice_embedder(wav_file: str) -> List[float]:
     print("======================================================")
     model = SpeakerRecognition.from_hparams(
         source="pretrained_models/spkrec-ecapa-voxceleb",
     )
     model.eval()
     print("======================================================")
+
+    # Audio preprocessing
+    waveform, sr = torchaudio.load(wav_file)
     target_sr = 16000

+    # Resample if needed
     if sr != target_sr:
         waveform = T.Resample(orig_freq=sr, new_freq=target_sr)(waveform)
+
+    # Convert to mono if stereo
     if waveform.shape[0] > 1:
         waveform = waveform.mean(dim=0, keepdim=True)

+    # Minimum duration of 0.2 seconds
     min_samples = int(0.2 * target_sr)
     if waveform.shape[1] < min_samples:
         pad = min_samples - waveform.shape[1]
         waveform = torch.cat([waveform, torch.zeros((1, pad))], dim=1)

+    # Compute speaker embedding
     with torch.no_grad():
         emb = (
             model.encode_batch(waveform)
             .astype(float)
         )

+    # Normalize embedding
     emb = emb / np.linalg.norm(emb)
     print(len(emb))
     print(emb.tolist())
+    return emb.tolist()

+def identify_speaker(wav_file: str, voice_col: List[Dict[str, Any]]) -> Dict[str, Any]:
+    voice_embedding = voice_embedder(wav_file)
     voice_col = json.loads(voice_col)

     identity = "Desconegut"

         distances_embedding = []

+        # Compute Euclidean distance between the detected voice and each stored embedding
         for voice_base_datos in voice_col:
             voice_base_datos_embedding = np.array(voice_base_datos["embedding"])
             distance = np.linalg.norm(voice_embedding - voice_base_datos_embedding)

         distances_embedding = sorted(distances_embedding, key=lambda x: x["distance"])
         knn = distances_embedding[:n_results]

+        # Assign identity if closest match exists
+        if knn:
             identity = knn[0]["identity"]
         else:
             identity = "Desconegut"

     except Exception as e:
+        print(f"Voice KNN failed: {e}")
         knn = []
         identity = "Desconegut"

|
| 408 |
import shutil
|
| 409 |
import tempfile
|
| 410 |
|
| 411 |
+
def convert_to_temporary(original_file):
|
| 412 |
"""
|
| 413 |
+
Converts a file to a temporary file, deletes the original, and returns
|
| 414 |
+
the path of the temporary file.
|
| 415 |
"""
|
| 416 |
if not os.path.exists(original_file):
|
| 417 |
+
raise FileNotFoundError(f"{original_file} does not exist")
|
| 418 |
|
| 419 |
+
# Create a temporary file in persistent mode
|
| 420 |
temp_fd, temp_path = tempfile.mkstemp(suffix=os.path.splitext(original_file)[1])
|
| 421 |
+
os.close(temp_fd) # Close the file descriptor; we'll use it as a normal file
|
| 422 |
|
| 423 |
+
# Copy the content to the temporary file
|
| 424 |
shutil.copy2(original_file, temp_path)
|
| 425 |
|
| 426 |
+
# Delete the original file
|
| 427 |
os.remove(original_file)
|
| 428 |
|
| 429 |
return temp_path
|
|
|
|
| 447 |
str
|
| 448 |
Filepath to the extracted WAV audio file.
|
| 449 |
"""
|
| 450 |
+
if video_file is None:
|
| 451 |
+
return None
|
| 452 |
+
|
| 453 |
+
# Extract the file name without extension
|
| 454 |
base_name = os.path.splitext(os.path.basename(video_file))[0]
|
| 455 |
+
|
| 456 |
+
# Build the output path with .wav extension
|
| 457 |
audio_out = f"./{base_name}.wav"
|
| 458 |
+
|
| 459 |
+
# If the file already exists, return it directly
|
| 460 |
+
if os.path.exists(audio_out+".mp3"):
|
| 461 |
+
return audio_out
|
| 462 |
+
|
| 463 |
+
# Call the function that performs the extraction
|
| 464 |
extract_audio(input_path=video_file, output_path=audio_out)
|
| 465 |
+
|
| 466 |
+
return convert_to_temporary(audio_out+".mp3")
|
| 467 |
|
| 468 |
import torch
|
| 469 |
import torchaudio
|
|
|
|
| 483 |
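extract_audio_ffmpeg above delegates the conversion to an extract_audio helper that is not shown in this diff. Purely as an illustration (not the project's helper), an equivalent call to the ffmpeg binary could look like this, assuming ffmpeg is installed on the Space:

    import subprocess

    def extract_audio_sketch(input_path: str, output_path: str, sr: int = 16000, mono: bool = True) -> str:
        # Hypothetical stand-in for the external extract_audio() used above
        cmd = ["ffmpeg", "-y", "-i", input_path, "-vn", "-ar", str(sr)]
        if mono:
            cmd += ["-ac", "1"]
        cmd.append(output_path)
        subprocess.run(cmd, check=True)
        return output_path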
     dev = device
     if dev == "cuda" and not torch.cuda.is_available():
         dev = "cpu"
+
+    # Lazy-load the Whisper processor and model
     processor, model = _lazy_load_whisper()
     device = dev
+
+    # Load the WAV file
     waveform, sr = torchaudio.load(wav_path)

     target_sr = 16000
     if sr != target_sr:
+        # Resample audio if sample rate differs
         waveform = torchaudio.transforms.Resample(orig_freq=sr, new_freq=target_sr)(waveform)
         sr = target_sr
+
+    # Preprocess the audio
     inputs = processor(
         waveform.numpy(), sampling_rate=sr, return_tensors="pt"
     ).input_features.to(model.device)

+    # Generate transcription with the model
     with torch.no_grad():
         ids = model.generate(inputs, max_new_tokens=440)[0]

+    # Decode the transcription
     txt = processor.decode(ids)

+    # Normalize text if necessary
     norm = getattr(processor.tokenizer, "_normalize", None)
     return norm(txt) if callable(norm) else txt

 def transcribe_long_audio(
     wav_path: str,
     chunk_length_s: int = 20,

     dev = device
     if dev == "cuda" and not torch.cuda.is_available():
         dev = "cpu"
+
+    # Lazy-load the Whisper processor and model
     processor, model = _lazy_load_whisper()
     device = dev
+
+    # Load the full WAV file
     waveform, sr = torchaudio.load(wav_path)
     target_sr = 16000
     if sr != target_sr:
+        # Resample if sample rate differs
         waveform = torchaudio.transforms.Resample(orig_freq=sr, new_freq=target_sr)(waveform)
         sr = target_sr
     total_samples = waveform.shape[1]

+    # Calculate chunk size and overlap in samples
     chunk_size = chunk_length_s * sr
     overlap_size = overlap_s * sr

     while start < total_samples:
         end = min(start + chunk_size, total_samples)
+        chunk = waveform[:, start:end]  # Transcribe in small fragments

+        # Preprocess the chunk
         input_features = processor(
             chunk.numpy(),
             sampling_rate=sr,
             return_tensors="pt"
         ).input_features.to(model.device)

+        # Generate transcription for the chunk
         with torch.no_grad():
             predicted_ids = model.generate(
                 input_features,
                 num_beams=1,
             )[0]

+        # Decode and store the chunk transcription
         text = processor.decode(predicted_ids, skip_special_tokens=True)
         transcriptions.append(text.strip())

+        # Move to the next chunk with overlap
         start += chunk_size - overlap_size

+    # Join all chunks into a single string
     return " ".join(transcriptions).strip()

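To make the chunking arithmetic above concrete (only chunk_length_s = 20 is visible in the diff; the overlap_s value below is an assumed example): at 16 kHz each window is 320 000 samples and the start index advances by chunk_size - overlap_size, i.e. a new window every 15 s.

    sr = 16000
    chunk_length_s, overlap_s = 20, 5                  # overlap_s = 5 is an assumed example value
    chunk_size = chunk_length_s * sr                   # 320_000 samples per window
    step = chunk_size - overlap_s * sr                 # 240_000 samples between window starts
    starts = [s / sr for s in range(0, 60 * sr, step)]
    print(starts)                                      # [0.0, 15.0, 30.0, 45.0] for a 60 s file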
"""
|
| 577 |
# ==============================================================================
|
| 578 |
# UI & Endpoints
|
|
|
|
| 607 |
}
|
| 608 |
"""
|
| 609 |
with gr.Blocks(title="Aina faster-whisper (Català) · ZeroGPU", css=custom_css,theme=gr.themes.Soft()) as demo:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 610 |
|
| 611 |
+
# Extract audio from video
|
| 612 |
+
gr.Markdown('<h2 style="text-align:center">Extreure àudio d\'un vídeo</h2>')
|
|
|
|
|
|
|
|
|
|
| 613 |
with gr.Row():
|
| 614 |
+
video_input = gr.Video(label="Puja un vídeo")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 615 |
with gr.Row():
|
| 616 |
+
extract_btn = gr.Button("Extreure àudio", variant="primary")
|
| 617 |
+
with gr.Row():
|
| 618 |
+
audio_output = gr.Audio(label="Àudio extret (WAV)", type="filepath")
|
| 619 |
|
| 620 |
+
extract_btn.click(
|
| 621 |
+
fn=extract_audio_ffmpeg,
|
| 622 |
+
inputs=video_input,
|
| 623 |
+
outputs=audio_output
|
|
|
|
|
|
|
|
|
|
| 624 |
)
|
| 625 |
|
| 626 |
# Diarization section
|
|
|
|
| 681 |
concurrency_limit=1
|
| 682 |
)
|
| 683 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 684 |
# Short audio transcription
|
| 685 |
gr.Markdown('<h2 style="text-align:center">Aina faster-whisper (Català) Àudio curt → text</h2>')
|
| 686 |
with gr.Row():
|
|
|
|
         inputs=audio_input,
         outputs=output_text
     )
+
+    # Main transcription section
+    gr.Markdown('<h2 style="text-align:center">Aina faster-whisper (Català) · ZeroGPU - Reconeixement de veu en català finetune projecte-aina</h2>')
+    with gr.Row():
+        with gr.Column():
+            inp = gr.Audio(sources=["upload", "microphone"], type="filepath", label="Àudio (WAV/MP3/MP4, etc.)")
+            lang = gr.Textbox(label="Idioma", value="ca")
+            ts = gr.Checkbox(label="Marques de temps", value=True)
+            vad = gr.Checkbox(label="Filtre VAD", value=True)
+        with gr.Column():
+            out = gr.JSON(label="Sortida /predict")
+    with gr.Row():
+        btn = gr.Button("Transcriure (ENGINE /predict)", variant="primary")
+
+    # Button callback
+    btn.click(predict_for_engine, [inp, lang, ts, vad], out, api_name="predict", concurrency_limit=1)
+
+    # Advanced transcription section
+    gr.Markdown('<h2 style="text-align:center">Avançat (/transcribe)</h2>')
+    with gr.Row():
+        with gr.Column():
+            inp2 = gr.Audio(sources=["upload", "microphone"], type="filepath", label="Àudio")
+            lang2 = gr.Textbox(label="Idioma", value="ca")
+            task2 = gr.Dropdown(["transcribe", "translate"], value="transcribe", label="Tasques")
+            vad2 = gr.Checkbox(label="Filtre VAD", value=True)
+            beam2 = gr.Slider(1, 10, value=5, step=1, label="Mida del feix")
+            temp2 = gr.Slider(0.0, 1.5, value=0.0, step=0.1, label="Temperatura")
+            wts2 = gr.Checkbox(label="Marques de temps per paraula", value=False)
+        with gr.Column():
+            out2 = gr.JSON(label="Sortida /transcribe")
+    with gr.Row():
+        btn2 = gr.Button("Transcriure (avançat)", variant="primary")
+
+    # Button callback advanced
+    btn2.click(
+        transcribe_advanced,
+        [inp2, lang2, task2, vad2, beam2, temp2, wts2],
+        out2,
+        api_name="transcribe",
+        concurrency_limit=1
+    )

 demo.queue(max_size=8).launch(share=True,show_error=True)
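Because the callbacks are registered with api_name="predict" and api_name="transcribe", the Space can also be called programmatically. A sketch with gradio_client follows; the Space id and file name are illustrative, and the positional arguments mirror the [inp, lang, ts, vad] inputs wired above.

    from gradio_client import Client, handle_file

    client = Client("veureu/asr")          # illustrative Space id
    result = client.predict(
        handle_file("sample_ca.wav"),      # inp: audio file
        "ca",                              # lang
        True,                              # ts: timestamps
        True,                              # vad: VAD filter
        api_name="/predict",
    )
    print(result)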