Spaces:
Running
Running
| import os | |
| import tempfile | |
| from pathlib import Path | |
| from typing import Tuple, Optional | |
| import ffmpeg | |
| from pydub import AudioSegment | |
| import logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
class AudioProcessor:
    """Handles audio extraction, conversion, and chunking.

    Every method is a static method, so the class works purely as a
    namespace: ``AudioProcessor.extract_audio(...)`` and
    ``AudioProcessor().extract_audio(...)`` are equivalent.
    """

    # File extensions (lower-case, with leading dot) accepted as input.
    SUPPORTED_FORMATS = {
        'audio': ['.mp3', '.wav', '.m4a', '.flac', '.aac', '.ogg', '.wma'],
        'video': ['.mp4', '.avi', '.mkv', '.mov', '.wmv', '.flv', '.webm'],
    }

    CHUNK_DURATION_MS = 30 * 60 * 1000  # 30 minutes in milliseconds
    OVERLAP_MS = 2000  # 2 second overlap between chunks

    @staticmethod
    def is_supported_file(file_path: str) -> bool:
        """Check if file format is supported.

        The comparison is case-insensitive and looks only at the final
        suffix of ``file_path``.
        """
        ext = Path(file_path).suffix.lower()
        return any(
            ext in extensions
            for extensions in AudioProcessor.SUPPORTED_FORMATS.values()
        )

    @staticmethod
    def extract_audio(input_file: str, output_format: str = 'wav', progress_callback=None) -> str:
        """
        Extract audio from video or convert audio to desired format.

        The output is written as 16 kHz mono to a new temporary file that the
        caller is responsible for deleting (see ``cleanup_temp_files``).

        Args:
            input_file: Path to input file
            output_format: Desired output format (wav, mp3). Any value other
                than 'wav' is encoded with libmp3lame.
            progress_callback: Optional callback for progress updates

        Returns:
            Path to extracted/converted audio file

        Raises:
            Exception: If ffmpeg reports an error; the message carries
                ffmpeg's stderr output and the original error is chained.
        """
        if progress_callback:
            progress_callback("Extracting audio from file...")

        # Reserve a unique path, then close our handle immediately so the
        # descriptor is not leaked while ffmpeg overwrites the file.
        with tempfile.NamedTemporaryFile(delete=False, suffix=f'.{output_format}') as tmp:
            output_file = tmp.name

        try:
            # Use ffmpeg to extract audio
            stream = ffmpeg.input(input_file)
            stream = ffmpeg.output(
                stream,
                output_file,
                acodec='pcm_s16le' if output_format == 'wav' else 'libmp3lame',
                ar='16000',  # 16kHz sample rate (Whisper's preference)
                ac=1,  # Mono channel
            )
            ffmpeg.run(stream, overwrite_output=True, capture_stdout=True, capture_stderr=True)
        except ffmpeg.Error as e:
            # Do not leave the (empty or partial) output file behind.
            AudioProcessor.cleanup_temp_files(output_file)
            detail = e.stderr.decode(errors='replace') if e.stderr else str(e)
            logger.error("FFmpeg error: %s", detail)
            raise Exception(f"Failed to extract audio: {detail}") from e
        except Exception:
            # Any other failure (e.g. the ffmpeg executable cannot be started)
            # propagates unchanged, but the temp file is still removed.
            AudioProcessor.cleanup_temp_files(output_file)
            raise

        if progress_callback:
            progress_callback("Audio extraction complete")
        logger.info("Audio extracted to: %s", output_file)
        return output_file

    @staticmethod
    def _duration_from_probe(probe: dict) -> float:
        """Pick a duration (seconds) out of an ffprobe result dictionary.

        Audio streams are consulted first, then the remaining streams in
        their original order, then the container-level ``format`` entry.
        Entries without a parseable ``duration`` value are skipped.

        Raises:
            ValueError: If no entry carries a usable duration.
        """
        streams = probe.get('streams') or []
        # Stable sort: audio streams move to the front, order otherwise kept.
        ordered = sorted(streams, key=lambda s: s.get('codec_type') != 'audio')
        for entry in [*ordered, probe.get('format') or {}]:
            raw = entry.get('duration')
            if raw is None:
                continue
            try:
                return float(raw)
            except (TypeError, ValueError):
                # Non-numeric placeholder; try the next candidate.
                continue
        raise ValueError("no duration found in probe data")

    @staticmethod
    def get_audio_duration(file_path: str) -> float:
        """Get audio duration in seconds.

        Uses ffprobe metadata when possible; if probing fails or reports no
        duration, the file is decoded with pydub and measured instead.
        """
        try:
            return AudioProcessor._duration_from_probe(ffmpeg.probe(file_path))
        except Exception as e:
            logger.error("Failed to get duration: %s", e)
            # Fallback to pydub (len() of a segment is in milliseconds)
            audio = AudioSegment.from_file(file_path)
            return len(audio) / 1000.0

    @staticmethod
    def _chunk_spans(duration_ms: int, chunk_ms: int, overlap_ms: int) -> list:
        """Compute ``(start_ms, end_ms)`` windows covering ``duration_ms``.

        Consecutive windows start ``chunk_ms - overlap_ms`` apart, and the
        last window is clipped to ``duration_ms``. Planning stops as soon as
        a window reaches the end, so no window lies entirely inside the
        overlap region of its predecessor.

        Raises:
            ValueError: If the overlap is not smaller than the chunk length
                (the window would never advance).
        """
        step_ms = chunk_ms - overlap_ms
        if chunk_ms <= 0 or step_ms <= 0:
            raise ValueError("chunk duration must be positive and larger than the overlap")

        spans = []
        start_ms = 0
        while start_ms < duration_ms:
            end_ms = min(start_ms + chunk_ms, duration_ms)
            spans.append((start_ms, end_ms))
            if end_ms >= duration_ms:
                # Everything is covered; another window would only repeat
                # audio already contained in this one.
                break
            start_ms += step_ms
        return spans

    @staticmethod
    def chunk_audio(file_path: str, progress_callback=None) -> list:
        """
        Split audio into chunks for processing large files.

        Files no longer than ``CHUNK_DURATION_MS`` are returned as-is (no new
        file is written). Longer files are cut into WAV chunks that overlap
        by ``OVERLAP_MS``; each chunk is a new temporary file the caller must
        delete. If writing any chunk fails, chunks written so far are removed
        before the error propagates.

        Args:
            file_path: Path to audio file
            progress_callback: Optional callback for progress updates

        Returns:
            List of tuples: [(chunk_file_path, start_time_offset), ...]
            where the offset is in seconds from the start of the input.
        """
        if progress_callback:
            progress_callback("Loading audio file for chunking...")

        audio = AudioSegment.from_file(file_path)
        duration_ms = len(audio)

        # If audio is shorter than chunk duration, return as single chunk
        if duration_ms <= AudioProcessor.CHUNK_DURATION_MS:
            if progress_callback:
                progress_callback("File is small enough, no chunking needed")
            return [(file_path, 0.0)]

        spans = AudioProcessor._chunk_spans(
            duration_ms,
            AudioProcessor.CHUNK_DURATION_MS,
            AudioProcessor.OVERLAP_MS,
        )
        total_chunks = len(spans)  # exact count, so progress never overshoots

        chunks = []
        try:
            for chunk_index, (start_ms, end_ms) in enumerate(spans):
                if progress_callback:
                    progress_callback(f"Creating chunk {chunk_index + 1}/{total_chunks}...")

                # Reserve a temp path and release our handle before pydub
                # opens the file itself.
                with tempfile.NamedTemporaryFile(
                    delete=False,
                    suffix='.wav',
                    prefix=f'chunk_{chunk_index}_',
                ) as tmp:
                    chunk_file = tmp.name

                # Record the chunk (offset in seconds) before exporting so a
                # failed export is also cleaned up below.
                chunks.append((chunk_file, start_ms / 1000.0))

                exported = audio[start_ms:end_ms].export(chunk_file, format='wav')
                # export() hands back the output file object; close it rather
                # than waiting for garbage collection.
                if hasattr(exported, 'close'):
                    exported.close()

                logger.info(
                    "Created chunk %d: %.2fs - %.2fs",
                    chunk_index, start_ms / 1000, end_ms / 1000,
                )
        except Exception:
            AudioProcessor.cleanup_temp_files(*(path for path, _ in chunks))
            raise

        if progress_callback:
            progress_callback(f"Created {len(chunks)} chunks for processing")
        return chunks

    @staticmethod
    def cleanup_temp_files(*file_paths):
        """Clean up temporary files.

        Best-effort: falsy entries and paths that do not exist are skipped,
        and a failure to delete one file is logged as a warning without
        stopping the remaining deletions.
        """
        for file_path in file_paths:
            try:
                if file_path and os.path.exists(file_path):
                    os.remove(file_path)
                    logger.info("Cleaned up: %s", file_path)
            except Exception as e:
                # Deliberately broad: cleanup must never mask the caller's
                # real error.
                logger.warning("Failed to clean up %s: %s", file_path, e)

    @staticmethod
    def get_file_size_mb(file_path: str) -> float:
        """Get file size in MB (1 MB = 1024 * 1024 bytes)."""
        size_bytes = os.path.getsize(file_path)
        return size_bytes / (1024 * 1024)