import os import tempfile from pathlib import Path from typing import Tuple, Optional import ffmpeg from pydub import AudioSegment import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class AudioProcessor: """Handles audio extraction, conversion, and chunking""" SUPPORTED_FORMATS = { 'audio': ['.mp3', '.wav', '.m4a', '.flac', '.aac', '.ogg', '.wma'], 'video': ['.mp4', '.avi', '.mkv', '.mov', '.wmv', '.flv', '.webm'] } CHUNK_DURATION_MS = 30 * 60 * 1000 # 30 minutes in milliseconds OVERLAP_MS = 2000 # 2 second overlap between chunks @staticmethod def is_supported_file(file_path: str) -> bool: """Check if file format is supported""" ext = Path(file_path).suffix.lower() all_formats = AudioProcessor.SUPPORTED_FORMATS['audio'] + AudioProcessor.SUPPORTED_FORMATS['video'] return ext in all_formats @staticmethod def extract_audio(input_file: str, output_format: str = 'wav', progress_callback=None) -> str: """ Extract audio from video or convert audio to desired format Args: input_file: Path to input file output_format: Desired output format (wav, mp3) progress_callback: Optional callback for progress updates Returns: Path to extracted/converted audio file """ if progress_callback: progress_callback("Extracting audio from file...") output_file = tempfile.NamedTemporaryFile( delete=False, suffix=f'.{output_format}' ).name try: # Use ffmpeg to extract audio stream = ffmpeg.input(input_file) stream = ffmpeg.output( stream, output_file, acodec='pcm_s16le' if output_format == 'wav' else 'libmp3lame', ar='16000', # 16kHz sample rate (Whisper's preference) ac=1 # Mono channel ) ffmpeg.run(stream, overwrite_output=True, capture_stdout=True, capture_stderr=True) if progress_callback: progress_callback("Audio extraction complete") logger.info(f"Audio extracted to: {output_file}") return output_file except ffmpeg.Error as e: logger.error(f"FFmpeg error: {e.stderr.decode()}") raise Exception(f"Failed to extract audio: {e.stderr.decode()}") @staticmethod def get_audio_duration(file_path: str) -> float: """Get audio duration in seconds""" try: probe = ffmpeg.probe(file_path) duration = float(probe['streams'][0]['duration']) return duration except Exception as e: logger.error(f"Failed to get duration: {e}") # Fallback to pydub audio = AudioSegment.from_file(file_path) return len(audio) / 1000.0 @staticmethod def chunk_audio(file_path: str, progress_callback=None) -> list: """ Split audio into chunks for processing large files Args: file_path: Path to audio file progress_callback: Optional callback for progress updates Returns: List of tuples: [(chunk_file_path, start_time_offset), ...] """ if progress_callback: progress_callback("Loading audio file for chunking...") audio = AudioSegment.from_file(file_path) duration_ms = len(audio) # If audio is shorter than chunk duration, return as single chunk if duration_ms <= AudioProcessor.CHUNK_DURATION_MS: if progress_callback: progress_callback("File is small enough, no chunking needed") return [(file_path, 0.0)] chunks = [] chunk_index = 0 start_ms = 0 total_chunks = (duration_ms // AudioProcessor.CHUNK_DURATION_MS) + 1 while start_ms < duration_ms: if progress_callback: progress_callback(f"Creating chunk {chunk_index + 1}/{total_chunks}...") # Calculate end position end_ms = min(start_ms + AudioProcessor.CHUNK_DURATION_MS, duration_ms) # Extract chunk chunk = audio[start_ms:end_ms] # Save chunk to temporary file chunk_file = tempfile.NamedTemporaryFile( delete=False, suffix='.wav', prefix=f'chunk_{chunk_index}_' ).name chunk.export(chunk_file, format='wav') # Store chunk with its time offset in seconds chunks.append((chunk_file, start_ms / 1000.0)) logger.info(f"Created chunk {chunk_index}: {start_ms/1000:.2f}s - {end_ms/1000:.2f}s") # Move to next chunk with overlap start_ms += AudioProcessor.CHUNK_DURATION_MS - AudioProcessor.OVERLAP_MS chunk_index += 1 if progress_callback: progress_callback(f"Created {len(chunks)} chunks for processing") return chunks @staticmethod def cleanup_temp_files(*file_paths): """Clean up temporary files""" for file_path in file_paths: try: if file_path and os.path.exists(file_path): os.remove(file_path) logger.info(f"Cleaned up: {file_path}") except Exception as e: logger.warning(f"Failed to clean up {file_path}: {e}") @staticmethod def get_file_size_mb(file_path: str) -> float: """Get file size in MB""" size_bytes = os.path.getsize(file_path) return size_bytes / (1024 * 1024)