| """Space 2: Extract Audio |
| |
| Uploads videos -> extracts audio -> cleans/segments -> saves to Hub. |
| GPU: T4 medium (no ML model needed, pure signal processing) |
| """ |
| import logging |
| import os |
| import shutil |
| import subprocess |
| import traceback |
| from pathlib import Path |
|
|
| import gradio as gr |
| import numpy as np |
| import soundfile as sf |
|
|
| from hub_utils import upload_step |
|
|
# Root logging configuration: timestamp, level and logger name on every record.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger(__name__)

# The presence of the SPACE_ID environment variable is used as the marker for
# running inside a Hugging Face Space.
IS_HF_SPACE = "SPACE_ID" in os.environ

# /data is only used inside a Space, and only when it exists and is writable;
# in every other case a relative "data" directory is used instead.
_data_path = Path("/data")
BASE_DIR = (
    _data_path
    if IS_HF_SPACE and _data_path.exists() and os.access(_data_path, os.W_OK)
    else Path("data")
)

AUDIO_DIR = BASE_DIR / "audio"  # final segments and the concatenated WAV
TEMP_DIR = BASE_DIR / "temp"    # scratch space for raw ffmpeg output

# Create the working directories at import time so later code can rely on them.
for d in (AUDIO_DIR, TEMP_DIR):
    d.mkdir(parents=True, exist_ok=True)

AUDIO_SAMPLE_RATE = 16000       # Hz; rate requested from ffmpeg and used for writes
TARGET_AUDIO_DURATION_MIN = 15  # default value of the target-duration slider
MAX_AUDIO_DURATION_MIN = 30     # hard cap applied when selecting VAD segments
VAD_AGGRESSIVENESS = 2          # forwarded to _apply_vad

APP_VERSION = "1.0.0"
|
|
|
|
| |
|
|
def _ffmpeg_extract_audio(video_path, output_path, sample_rate=16000):
    """Extract the audio track of a video into a mono 16-bit PCM WAV file.

    Runs the ``ffmpeg`` executable found on ``PATH`` as a subprocess.

    Args:
        video_path: Path of the input video.
        output_path: Path of the WAV file to write; an existing file is
            overwritten (``-y``).
        sample_rate: Output sample rate in Hz.

    Raises:
        RuntimeError: If ffmpeg exits with a non-zero status. The message
            carries the last 500 characters of ffmpeg's stderr.
    """
    cmd = [
        # -nostdin: never let ffmpeg wait for or consume the parent's stdin.
        "ffmpeg", "-nostdin", "-y", "-i", video_path,
        "-vn", "-acodec", "pcm_s16le",
        "-ar", str(sample_rate), "-ac", "1",
        output_path,
    ]
    result = subprocess.run(
        cmd,
        stdin=subprocess.DEVNULL,
        capture_output=True,
        text=True,
        # ffmpeg echoes file names and metadata in arbitrary encodings; a
        # decoding failure must not mask the real outcome of the conversion.
        errors="replace",
    )
    if result.returncode != 0:
        raise RuntimeError(f"FFmpeg failed: {result.stderr[-500:]}")
|
|
|
|
| |
|
|
def _apply_vad(audio, sr, aggressiveness=2):
    """Detect voiced regions with a frame-energy voice activity detector.

    The signal is cut into consecutive 30 ms frames. The RMS energy of every
    full frame is compared with an adaptive threshold (a percentile of the
    non-zero frame energies, never below 0.002). Runs of frames above the
    threshold that last at least one second are reported.

    Args:
        audio: Array of samples; the first axis is time.
        sr: Sample rate in Hz.
        aggressiveness: Raises the percentile used as threshold
            (``15 + 10 * aggressiveness``, clamped to 0..100), so higher
            values classify fewer frames as speech.

    Returns:
        A list of dicts with ``start_sample``, ``end_sample`` and
        ``duration_s`` keys, in chronological order. Empty when the signal is
        shorter than one frame or no run is long enough.

    Raises:
        ValueError: If ``sr`` is too small to yield a frame of at least one
            sample.
    """
    frame_duration_ms = 30
    min_segment_s = 1.0

    frame_size = int(sr * frame_duration_ms / 1000)
    if frame_size < 1:
        raise ValueError(f"sample rate {sr} is too low for {frame_duration_ms} ms frames")

    # Every complete frame is analysed, including the last one when the length
    # is an exact multiple of the frame size.
    n_frames = len(audio) // frame_size
    if n_frames == 0:
        return []

    # float64 keeps the squaring below from overflowing on integer PCM input.
    samples = np.asarray(audio, dtype=np.float64)
    frames = samples[:n_frames * frame_size].reshape(n_frames, -1)
    energies = np.sqrt(np.mean(frames ** 2, axis=1))

    nonzero = energies[energies > 0]
    if nonzero.size:
        percentile = min(max(15 + aggressiveness * 10, 0), 100)
        threshold = np.percentile(nonzero, percentile)
    else:
        threshold = 0.005
    threshold = max(threshold, 0.002)

    segments = []

    def close_run(start, end):
        # Runs shorter than the minimum duration are discarded.
        duration = (end - start) / sr
        if duration >= min_segment_s:
            segments.append({"start_sample": start, "end_sample": end, "duration_s": duration})

    run_start = None
    for i, voiced in enumerate(energies > threshold):
        sample_pos = i * frame_size
        if voiced:
            if run_start is None:
                run_start = sample_pos
        elif run_start is not None:
            close_run(run_start, sample_pos)
            run_start = None

    # A run still open at the end extends to the last sample, which also
    # covers any trailing partial frame.
    if run_start is not None:
        close_run(run_start, len(audio))
    return segments
|
|
|
|
def _reduce_noise(audio, sr):
    """Return ``audio`` after noise reduction with the ``noisereduce`` package.

    Args:
        audio: Samples to clean.
        sr: Sample rate in Hz.

    Returns:
        The result of ``noisereduce.reduce_noise`` called with
        ``prop_decrease=0.7``.
    """
    # Function-scope import: noisereduce is only loaded when this runs.
    from noisereduce import reduce_noise

    denoised = reduce_noise(y=audio, sr=sr, prop_decrease=0.7)
    return denoised
|
|
|
|
def _normalize_audio(audio):
    """Scale ``audio`` so that its absolute peak becomes 0.95.

    Args:
        audio: Array of samples.

    Returns:
        A scaled copy of ``audio``. Empty or all-zero input is returned
        unchanged, because there is no peak to scale by.
    """
    # np.max has no identity for zero-size arrays and would raise.
    if np.size(audio) == 0:
        return audio
    peak = np.max(np.abs(audio))
    if peak > 0:
        audio = audio / peak * 0.95
    return audio
|
|
|
|
def _split_into_segments(audio, sr, segment_sec=10.0, min_segment_sec=2.0):
    """Cut ``audio`` into consecutive fixed-length chunks.

    Args:
        audio: Array of samples; the first axis is time.
        sr: Sample rate in Hz.
        segment_sec: Length of each chunk in seconds.
        min_segment_sec: Chunks shorter than this many seconds are dropped. In
            practice only the final remainder can be that short. The value is
            capped at ``segment_sec`` so that short segment lengths do not
            discard every chunk.

    Returns:
        A list of array slices in their original order.

    Raises:
        ValueError: If ``segment_sec * sr`` is less than one sample.
    """
    seg_samples = int(segment_sec * sr)
    if seg_samples < 1:
        raise ValueError("segment_sec * sr must be at least one sample")
    min_samples = min(int(min_segment_sec * sr), seg_samples)

    chunks = (audio[start:start + seg_samples] for start in range(0, len(audio), seg_samples))
    return [chunk for chunk in chunks if len(chunk) >= min_samples]
|
|
|
|
def _select_speech_parts(full_audio, target_duration_min):
    """Return voiced slices of ``full_audio`` adding up to about the target duration."""
    segments = _apply_vad(full_audio, AUDIO_SAMPLE_RATE, VAD_AGGRESSIVENESS)
    target_samples = int(target_duration_min * 60 * AUDIO_SAMPLE_RATE)
    max_samples = int(MAX_AUDIO_DURATION_MIN * 60 * AUDIO_SAMPLE_RATE)

    chosen = []
    total_samples = 0
    # Longest regions are preferred; a region that would push the total past
    # the hard cap is skipped so that shorter ones can still be added.
    for seg in sorted(segments, key=lambda s: s["duration_s"], reverse=True):
        if total_samples >= target_samples:
            break
        length = seg["end_sample"] - seg["start_sample"]
        if total_samples + length > max_samples:
            continue
        chosen.append(seg)
        total_samples += length

    # Restore chronological order so the concatenated output is not shuffled
    # by the longest-first selection above.
    chosen.sort(key=lambda s: s["start_sample"])
    return [full_audio[s["start_sample"]:s["end_sample"]] for s in chosen]


def extract_and_clean_audio(video_paths, target_duration_min, clean_audio, progress_callback=None):
    """Extract, clean and segment the audio tracks of the given videos.

    The audio of every video is extracted with ffmpeg, concatenated and
    peak-normalised. In clean-audio mode the result is cut into 10 second
    chunks. Otherwise it is noise-reduced and only the voiced regions found by
    the VAD are kept, up to roughly ``target_duration_min`` minutes. The
    selected parts are written to ``AUDIO_DIR`` as ``segment_NNNN.wav`` files
    plus one concatenated ``full_clean_audio.wav``.

    Any previous content of ``AUDIO_DIR`` is deleted before processing starts.

    Args:
        video_paths: Paths of the input videos.
        target_duration_min: Desired amount of audio in minutes. Only used
            when ``clean_audio`` is false.
        clean_audio: If true, skip noise reduction and VAD and keep everything.
        progress_callback: Optional ``callback(fraction, message)`` invoked as
            the work advances.

    Returns:
        A dict with ``full_audio_path`` (str), ``segments`` (list of segment
        file paths) and ``total_duration_s`` (float).

    Raises:
        ValueError: If no videos are given or no usable audio segment is found.
        RuntimeError: If ffmpeg fails on one of the videos.
    """
    video_paths = list(video_paths)
    if not video_paths:
        # Fail before touching the disk; np.concatenate would otherwise raise
        # an unhelpful error on the empty list further down.
        raise ValueError("No se han subido videos")

    def report(fraction, message):
        if progress_callback:
            progress_callback(fraction, message)

    temp_audio_dir = TEMP_DIR / "raw_audio"
    if temp_audio_dir.exists():
        shutil.rmtree(temp_audio_dir)
    temp_audio_dir.mkdir(parents=True)

    if AUDIO_DIR.exists():
        shutil.rmtree(AUDIO_DIR)
    AUDIO_DIR.mkdir(parents=True)

    try:
        all_audio = []
        for i, vpath in enumerate(video_paths):
            report(i / len(video_paths) * 0.2, f"Extrayendo audio del video {i+1}...")
            raw_path = str(temp_audio_dir / f"raw_{i}.wav")
            _ffmpeg_extract_audio(vpath, raw_path, AUDIO_SAMPLE_RATE)
            audio, _ = sf.read(raw_path)
            if audio.ndim > 1:
                # Down-mix multi-channel audio to mono.
                audio = audio.mean(axis=1)
            all_audio.append(audio)
    finally:
        # The raw WAVs are only needed until they are loaded; remove them even
        # when extraction fails so they do not pile up on disk.
        shutil.rmtree(temp_audio_dir, ignore_errors=True)

    full_audio = _normalize_audio(np.concatenate(all_audio))

    if clean_audio:
        logger.info("Clean audio mode: skipping noise reduction and VAD")
        report(0.5, "Dividiendo audio en segmentos...")
        selected_parts = _split_into_segments(full_audio, AUDIO_SAMPLE_RATE, segment_sec=10.0)
    else:
        report(0.3, "Reduccion de ruido...")
        full_audio = _normalize_audio(_reduce_noise(full_audio, AUDIO_SAMPLE_RATE))
        report(0.4, "Deteccion de actividad vocal...")
        selected_parts = _select_speech_parts(full_audio, target_duration_min)

    if not selected_parts:
        raise ValueError("No se encontraron segmentos de audio. Revisa que los videos contengan audio.")

    report(0.7, "Guardando segmentos...")

    segment_paths = []
    for i, part in enumerate(selected_parts):
        seg_path = AUDIO_DIR / f"segment_{i:04d}.wav"
        sf.write(str(seg_path), part, AUDIO_SAMPLE_RATE)
        segment_paths.append(str(seg_path))

    clean_full = np.concatenate(selected_parts)
    full_path = AUDIO_DIR / "full_clean_audio.wav"
    sf.write(str(full_path), clean_full, AUDIO_SAMPLE_RATE)

    return {
        "full_audio_path": str(full_path),
        "segments": segment_paths,
        "total_duration_s": len(clean_full) / AUDIO_SAMPLE_RATE,
    }
|
|
|
|
| |
|
|
def process_videos(project_name, videos, audio_duration_min, clean_audio, progress=gr.Progress()):
    """Gradio handler: validate the form, run the extraction and report the outcome.

    Args:
        project_name: Project name typed by the user; must not be blank.
        videos: Uploaded files. Items with a ``name`` attribute are replaced
            by that attribute; other items are used as they are.
        audio_duration_min: Target amount of audio in minutes.
        clean_audio: Forwarded to ``extract_and_clean_audio``.
        progress: Gradio progress tracker used to show status updates.

    Returns:
        A ``(audio_path, status)`` tuple. ``audio_path`` is ``None`` when
        validation or processing fails, and ``status`` then starts with
        ``"Error: "``.
    """
    if not (project_name and project_name.strip()):
        return None, "Error: Debes introducir un nombre de proyecto"
    if not videos:
        return None, "Error: No se han subido videos"

    video_paths = [getattr(item, "name", item) for item in videos]
    logger.info(f"=== Audio Extraction Started === Videos: {len(video_paths)}")

    def report(fraction, message):
        # Adapt the (fraction, message) callback to Gradio's progress API.
        return progress(fraction, desc=message)

    try:
        result = extract_and_clean_audio(
            video_paths,
            target_duration_min=audio_duration_min,
            clean_audio=clean_audio,
            progress_callback=report,
        )
        minutes = result['total_duration_s'] / 60
        segment_count = len(result['segments'])
        status = f"OK - {minutes:.1f} min audio, {segment_count} segmentos"
        logger.info(f"=== Audio Extraction Complete === {status}")
        return result["full_audio_path"], status
    except Exception as e:
        # UI boundary: log the full traceback, show only the message to the user.
        logger.error(f"=== Audio Extraction Failed ===\n{traceback.format_exc()}")
        return None, f"Error: {e}"
|
|
|
|
def save_to_hub(project_name):
    """Gradio handler: upload the extracted audio directory to the Hub.

    Args:
        project_name: Project name typed by the user; must not be blank.

    Returns:
        The value returned by ``upload_step`` on success, or a message starting
        with ``"Error: "`` when the name is blank, no segment files exist in
        ``AUDIO_DIR``, or the upload raises.
    """
    if not project_name or not project_name.strip():
        return "Error: Debes introducir un nombre de proyecto"
    name = project_name.strip()

    # Only existence matters, so stop at the first matching file.
    if not any(AUDIO_DIR.glob("segment_*.wav")):
        return "Error: No hay audio para guardar. Procesa videos primero."

    try:
        return upload_step(name, "step2_audio", str(AUDIO_DIR))
    except Exception as e:
        # UI boundary: keep the traceback in the logs, as process_videos does,
        # and show only the message to the user.
        logger.error(f"=== Hub Upload Failed ===\n{traceback.format_exc()}")
        return f"Error: {e}"
|
|
|
|
| |
|
|
# NOTE(review): the nesting of the layout below was reconstructed from the
# context-manager structure and blank-line grouping - confirm against the
# deployed UI.
with gr.Blocks(title="Talking Head - Audio", theme=gr.themes.Soft()) as demo:
    gr.Markdown(
        f"# Talking Head - Extraer Audio `v{APP_VERSION}`\n"
        "Extrae y limpia audio de videos para entrenamiento de voz"
    )

    project_name = gr.Textbox(
        label="Nombre del proyecto",
        placeholder="mi_proyecto",
        info="Obligatorio. Se usa como carpeta en el Hub.",
    )

    with gr.Row():
        # Left column: inputs and the action button.
        with gr.Column():
            video_input = gr.File(
                label="Videos (MP4/MOV/AVI/MKV)",
                file_count="multiple",
                file_types=[".mp4", ".mov", ".avi", ".mkv"],
            )
            audio_dur = gr.Slider(
                5,
                30,
                value=TARGET_AUDIO_DURATION_MIN,
                step=1,
                label="Duracion audio objetivo (min)",
            )
            # Despite its name, this checkbox is wired below as the fourth
            # input of process_videos, i.e. its clean_audio argument.
            noise_red = gr.Checkbox(
                value=True,
                label="Audio limpio / Podcast (conservar todo, sin filtrar)",
            )
            process_btn = gr.Button("Procesar Videos", variant="primary")
        # Right column: results.
        with gr.Column():
            audio_output = gr.Audio(label="Audio extraido")
            status_box = gr.Textbox(label="Estado", interactive=False)

    save_btn = gr.Button("Guardar en Hub", variant="secondary")
    save_status = gr.Textbox(label="Estado guardado", interactive=False)

    # Event wiring.
    process_btn.click(
        process_videos,
        inputs=[project_name, video_input, audio_dur, noise_red],
        outputs=[audio_output, status_box],
    )
    save_btn.click(
        save_to_hub,
        inputs=[project_name],
        outputs=[save_status],
    )


if __name__ == "__main__":
    # Enable the request queue, then serve on all interfaces at port 7860.
    demo.queue().launch(
        server_name="0.0.0.0",
        server_port=7860,
        ssr_mode=False,
    )
|
|