# Source: voice-tools / src/web/handlers.py
# Author: jcudit (HF Staff)
# Commit: 95e1515 — feat: implement cross-mode robustness fixes (phases 1-8)
"""
Event handlers for Gradio web interface.
Implements the business logic for web UI interactions including
file validation, processing, and result generation.
"""
import logging
import tempfile
import zipfile
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import gradio as gr
from src.lib.audio_io import read_audio
from src.models.error_report import ErrorReport
from src.models.processing_job import ExtractionMode, ProcessingJob
from src.services.batch_processor import BatchProcessor
logger = logging.getLogger(__name__)
# File size limits (in bytes)
MAX_FILE_SIZE = 500 * 1024 * 1024 # 500 MB
MAX_REFERENCE_DURATION = 60 # 60 seconds
def validate_files_handler(
    reference_file: Optional[str], input_files: Optional[List[str]]
) -> Tuple[bool, str]:
    """
    Validate uploaded files before any processing starts.

    Checks, in order: both uploads are present; the reference file exists,
    is non-empty, is under MAX_FILE_SIZE, and decodes to a clip between
    2 seconds and MAX_REFERENCE_DURATION; every input file exists, is
    non-empty, is under MAX_FILE_SIZE, and has a supported extension.

    Args:
        reference_file: Path to reference audio file
        input_files: List of input audio file paths

    Returns:
        Tuple of (is_valid, error_message). The message is a user-facing
        string (prefixed with an emoji) suitable for direct display in the UI.
    """
    if not reference_file:
        return False, "❌ Please upload a reference voice file"
    if not input_files:  # covers both None and an empty list
        return False, "❌ Please upload at least one audio file to process"

    # Validate reference file
    ref_path = Path(reference_file)
    if not ref_path.exists():
        return False, f"❌ Reference file not found: {ref_path}"
    ref_size = ref_path.stat().st_size  # stat once instead of once per check
    if ref_size == 0:
        return False, "❌ Reference file is empty"
    if ref_size > MAX_FILE_SIZE:
        return False, f"❌ Reference file too large (max {MAX_FILE_SIZE // (1024 * 1024)} MB)"

    # Check reference duration by actually decoding the audio; this also
    # catches corrupt/unreadable files early.
    try:
        audio, sample_rate = read_audio(str(ref_path))
        duration = len(audio) / sample_rate
        if duration > MAX_REFERENCE_DURATION:
            return (
                False,
                f"❌ Reference file too long ({duration:.1f}s). Please use a clip under {MAX_REFERENCE_DURATION}s",
            )
        if duration < 2:
            return (
                False,
                f"❌ Reference file too short ({duration:.1f}s). Please use a clip at least 2 seconds long",
            )
    except Exception as e:
        # Wrap in an ErrorReport for consistency with the other handlers.
        error_report: ErrorReport = {
            "status": "failed",
            "error": f"Could not read reference file: {e}",
            "error_type": "audio_io",
        }
        return False, f"❌ {error_report['error']}"

    # Validate input files (existence, size, and format — no decode here,
    # since decoding every input up front would be too slow for large batches)
    supported_formats = (".m4a", ".wav", ".mp3", ".flac")
    for input_file in input_files:
        input_path = Path(input_file)
        if not input_path.exists():
            return False, f"❌ Input file not found: {input_path.name}"
        file_size = input_path.stat().st_size  # stat once per file
        if file_size == 0:
            return False, f"❌ Input file is empty: {input_path.name}"
        if file_size > MAX_FILE_SIZE:
            return (
                False,
                f"❌ Input file too large: {input_path.name} (max {MAX_FILE_SIZE // (1024 * 1024)} MB)",
            )
        # Check file format by extension
        if input_path.suffix.lower() not in supported_formats:
            return False, f"❌ Unsupported file format: {input_path.name}"
    return True, "βœ… All files validated successfully"
def estimate_time_handler(
    reference_file: Optional[str],
    input_files: Optional[List[str]],
    vad_threshold: float,
    enable_vad: bool,
) -> Tuple[Dict[str, Any], str]:
    """
    Estimate processing time for uploaded files.

    Runs file validation first, then asks the batch processor for a
    per-file time estimate and aggregates the totals into a summary dict.

    Args:
        reference_file: Path to reference audio file
        input_files: List of input audio file paths
        vad_threshold: VAD threshold
        enable_vad: Whether to use VAD optimization

    Returns:
        Tuple of (estimates_dict, status_message). On validation or
        estimation failure the dict is empty and the message describes
        the error.
    """
    valid, validation_message = validate_files_handler(reference_file, input_files)
    if not valid:
        return {}, validation_message
    try:
        processor = BatchProcessor(vad_threshold=vad_threshold, enable_vad=enable_vad)

        # Accumulate totals in locals; the summary dict is built once at the end.
        total_duration = 0.0
        total_voice = 0.0
        total_processing = 0.0
        per_file_rows: List[Dict[str, str]] = []

        for file_path in input_files:
            file_estimates = processor.estimate_processing_time(
                Path(file_path), enable_vad=enable_vad
            )
            total_duration += file_estimates["total_duration"]
            total_voice += file_estimates["voice_duration"]
            total_processing += file_estimates["estimated_processing_time"]
            per_file_rows.append(
                {
                    "filename": Path(file_path).name,
                    "duration": f"{file_estimates['total_duration']:.1f}s",
                    "voice_duration": f"{file_estimates['voice_duration']:.1f}s",
                    "estimated_time": f"{file_estimates['estimated_minutes']:.1f} min",
                }
            )

        # Guard against zero-length batches when computing the voice ratio.
        voice_percentage = (total_voice / total_duration) * 100 if total_duration > 0 else 0

        total_estimates: Dict[str, Any] = {
            "num_files": len(input_files),
            "total_duration": total_duration,
            "total_voice_duration": total_voice,
            "estimated_processing_time": total_processing,
            "per_file_estimates": per_file_rows,
            "total_duration_formatted": f"{total_duration / 60:.1f} minutes",
            "total_voice_duration_formatted": f"{total_voice / 60:.1f} minutes",
            "estimated_processing_time_formatted": f"{total_processing / 60:.1f} minutes",
            "voice_percentage": voice_percentage,
        }

        status = (
            f"πŸ“Š Estimated processing time: {total_processing / 60:.1f} minutes\n"
            f"πŸ“ {len(input_files)} file(s) | "
            f"⏱️ {total_duration / 60:.1f} min total | "
            f"🎀 {voice_percentage:.1f}% voice activity"
        )
        return total_estimates, status
    except Exception as e:
        logger.exception("Estimation failed")
        error_report: ErrorReport = {
            "status": "failed",
            "error": f"Estimation failed: {str(e)}",
            "error_type": "processing",
        }
        return {}, f"❌ {error_report['error']}"
def process_batch_handler(
    reference_file: Optional[str],
    input_files: Optional[List[str]],
    extraction_mode: str,
    vad_threshold: float,
    voice_threshold: float,
    speech_threshold: float,
    enable_vad: bool,
    progress=gr.Progress(),
) -> Tuple[str, Dict[str, Any], List[str], Optional[str], Optional[str]]:
    """
    Process batch of audio files.

    Validates the uploads, runs the batch processor against a temporary
    output directory, then packages the results (a ZIP when there is more
    than one output segment, plus a plain-text report).

    Args:
        reference_file: Path to reference audio file
        input_files: List of input audio file paths
        extraction_mode: Extraction mode (Speech/Nonverbal/Both)
        vad_threshold: VAD threshold
        voice_threshold: Voice similarity threshold
        speech_threshold: Speech classification threshold
        enable_vad: Whether to use VAD optimization
        progress: Gradio progress tracker

    Returns:
        Tuple of (status_message, statistics_dict, output_files_list, zip_path, report_path).
        zip_path is None when there is at most one output segment; on any
        failure the statistics dict and file lists are empty.
    """
    # Validate files
    is_valid, message = validate_files_handler(reference_file, input_files)
    if not is_valid:
        return message, {}, [], None, None
    try:
        # Update progress
        progress(0, desc="Initializing...")
        # Create temporary output directory (left on disk so Gradio can
        # serve the result files to the browser after this handler returns)
        output_dir = Path(tempfile.mkdtemp(prefix="voice_profiler_"))
        # Map extraction mode string to enum; unknown values fall back to SPEECH
        mode_map = {
            "Speech": ExtractionMode.SPEECH,
            "Nonverbal": ExtractionMode.NONVERBAL,
            "Both": ExtractionMode.BOTH,
        }
        extraction_mode_enum = mode_map.get(extraction_mode, ExtractionMode.SPEECH)
        # Create processing job
        job = ProcessingJob(
            reference_file=reference_file,
            input_files=[str(f) for f in input_files],
            output_dir=str(output_dir),
            extraction_mode=extraction_mode_enum,
            vad_threshold=vad_threshold,
            apply_denoising=False,
        )
        # Initialize processor
        processor = BatchProcessor(
            vad_threshold=vad_threshold,
            voice_similarity_threshold=voice_threshold,
            speech_confidence_threshold=speech_threshold,
            enable_vad=enable_vad,
        )
        # Process batch with progress updates
        progress(0.1, desc="Extracting voice profile...")
        # TODO: Add progress callbacks for file-by-file updates
        # For now, just process the batch
        job = processor.process_batch(job)
        progress(0.9, desc="Finalizing results...")
        # Get results
        summary = job.get_summary()
        # Collect output files; sorted() makes the listing and ZIP member
        # order deterministic (glob order is filesystem-dependent)
        output_files = sorted(output_dir.glob("*.m4a"))
        output_file_paths = [str(f) for f in output_files]
        # Create ZIP file if multiple outputs
        zip_path = None
        if len(output_files) > 1:
            zip_path = output_dir / "voice_extraction_results.zip"
            with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
                for output_file in output_files:
                    zipf.write(output_file, output_file.name)
            zip_path = str(zip_path)
        # Generate report; write as UTF-8 explicitly — the report may contain
        # non-ASCII characters, and the platform default encoding can fail on them
        report_content = job.generate_report()
        report_path = output_dir / "extraction_report.txt"
        report_path.write_text(report_content, encoding="utf-8")
        report_path = str(report_path)
        progress(1.0, desc="Complete!")
        # Format status message
        status = (
            f"βœ… Extraction Complete!\n\n"
            f"πŸ“Š Processed: {summary['files_processed']} files\n"
            f"βœ… Success: {summary['files_processed'] - summary['files_failed']}\n"
            f"❌ Failed: {summary['files_failed']}\n"
            f"πŸ“ Segments: {len(output_files)}\n"
            f"⏱️ Time: {summary['total_processing_time'] / 60:.1f} minutes\n"
            f"πŸ“ˆ Yield: {summary['extraction_yield']:.1f}%"
        )
        if summary["files_failed"] > 0:
            # Plain string: no placeholders, so no f-prefix needed
            status += "\n\n⚠️ Some files failed to process. Check the report for details."
        return status, summary, output_file_paths, zip_path, report_path
    except Exception as e:
        logger.exception("Processing failed")
        error_report: ErrorReport = {
            "status": "failed",
            "error": f"Processing failed: {str(e)}",
            "error_type": "processing",
        }
        error_msg = f"❌ {error_report['error']}\n\nCheck the logs for more details."
        return error_msg, {}, [], None, None