| | """ |
| | Audio Processing Module |
| | Handles audio extraction, processing, and integration with FFmpeg operations |
| | """ |
| |
|
| | import os |
| | import subprocess |
| | import tempfile |
| | import logging |
| | import time |
| | from pathlib import Path |
| | from typing import Optional, Dict, Any, List, Tuple |
| |
|
| | |
| | from core.exceptions import AudioProcessingError |
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
| | class AudioProcessor: |
| | """ |
| | Comprehensive audio processing for video background replacement |
| | """ |
| | |
| | def __init__(self, temp_dir: Optional[str] = None): |
| | self.temp_dir = temp_dir or tempfile.gettempdir() |
| | self.ffmpeg_available = self._check_ffmpeg_availability() |
| | self.ffprobe_available = self._check_ffprobe_availability() |
| | |
| | |
| | self.stats = { |
| | 'audio_extractions': 0, |
| | 'audio_merges': 0, |
| | 'total_processing_time': 0.0, |
| | 'failed_operations': 0 |
| | } |
| | |
| | if not self.ffmpeg_available: |
| | logger.warning("FFmpeg not available - audio processing will be limited") |
| | |
| | logger.info(f"AudioProcessor initialized (FFmpeg: {self.ffmpeg_available}, FFprobe: {self.ffprobe_available})") |
| | |
| | def _check_ffmpeg_availability(self) -> bool: |
| | """Check if FFmpeg is available on the system""" |
| | try: |
| | result = subprocess.run( |
| | ['ffmpeg', '-version'], |
| | capture_output=True, |
| | text=True, |
| | timeout=10 |
| | ) |
| | return result.returncode == 0 |
| | except (subprocess.TimeoutExpired, FileNotFoundError, subprocess.SubprocessError): |
| | return False |
| | |
| | def _check_ffprobe_availability(self) -> bool: |
| | """Check if FFprobe is available on the system""" |
| | try: |
| | result = subprocess.run( |
| | ['ffprobe', '-version'], |
| | capture_output=True, |
| | text=True, |
| | timeout=10 |
| | ) |
| | return result.returncode == 0 |
| | except (subprocess.TimeoutExpired, FileNotFoundError, subprocess.SubprocessError): |
| | return False |
| | |
| | def get_audio_info(self, video_path: str) -> Dict[str, Any]: |
| | """ |
| | Get comprehensive audio information from video file |
| | |
| | Args: |
| | video_path: Path to the video file |
| | |
| | Returns: |
| | Dictionary containing audio information |
| | """ |
| | if not self.ffprobe_available: |
| | return {'has_audio': False, 'error': 'FFprobe not available'} |
| | |
| | try: |
| | |
| | result = subprocess.run([ |
| | 'ffprobe', '-v', 'quiet', '-select_streams', 'a:0', |
| | '-show_entries', 'stream=codec_name,sample_rate,channels,duration,bit_rate', |
| | '-of', 'csv=p=0', video_path |
| | ], capture_output=True, text=True, timeout=30) |
| | |
| | if result.returncode != 0: |
| | return { |
| | 'has_audio': False, |
| | 'error': 'No audio stream found', |
| | 'ffprobe_error': result.stderr |
| | } |
| | |
| | |
| | audio_data = result.stdout.strip().split(',') |
| | |
| | if len(audio_data) >= 1 and audio_data[0]: |
| | info = { |
| | 'has_audio': True, |
| | 'codec': audio_data[0] if len(audio_data) > 0 else 'unknown', |
| | 'sample_rate': audio_data[1] if len(audio_data) > 1 else 'unknown', |
| | 'channels': audio_data[2] if len(audio_data) > 2 else 'unknown', |
| | 'duration': audio_data[3] if len(audio_data) > 3 else 'unknown', |
| | 'bit_rate': audio_data[4] if len(audio_data) > 4 else 'unknown' |
| | } |
| | |
| | |
| | try: |
| | if info['sample_rate'] != 'unknown': |
| | info['sample_rate'] = int(info['sample_rate']) |
| | if info['channels'] != 'unknown': |
| | info['channels'] = int(info['channels']) |
| | if info['duration'] != 'unknown': |
| | info['duration'] = float(info['duration']) |
| | if info['bit_rate'] != 'unknown': |
| | info['bit_rate'] = int(info['bit_rate']) |
| | except ValueError: |
| | pass |
| | |
| | return info |
| | else: |
| | return {'has_audio': False, 'error': 'Audio stream data empty'} |
| | |
| | except subprocess.TimeoutExpired: |
| | return {'has_audio': False, 'error': 'FFprobe timeout'} |
| | except Exception as e: |
| | logger.error(f"Error getting audio info: {e}") |
| | return {'has_audio': False, 'error': str(e)} |
| | |
| | def extract_audio(self, video_path: str, output_path: Optional[str] = None, |
| | audio_format: str = 'aac', quality: str = 'high') -> Optional[str]: |
| | """ |
| | Extract audio from video file |
| | |
| | Args: |
| | video_path: Path to input video |
| | output_path: Output path for audio (auto-generated if None) |
| | audio_format: Output audio format (aac, mp3, wav) |
| | quality: Audio quality (low, medium, high) |
| | |
| | Returns: |
| | Path to extracted audio file or None if failed |
| | """ |
| | if not self.ffmpeg_available: |
| | raise AudioProcessingError("extract", "FFmpeg not available", video_path) |
| | |
| | start_time = time.time() |
| | |
| | try: |
| | |
| | audio_info = self.get_audio_info(video_path) |
| | if not audio_info.get('has_audio', False): |
| | logger.info(f"No audio found in {video_path}") |
| | return None |
| | |
| | |
| | if output_path is None: |
| | timestamp = int(time.time()) |
| | output_path = os.path.join( |
| | self.temp_dir, |
| | f"extracted_audio_{timestamp}.{audio_format}" |
| | ) |
| | |
| | |
| | quality_settings = { |
| | 'low': {'aac': ['-b:a', '96k'], 'mp3': ['-b:a', '128k'], 'wav': []}, |
| | 'medium': {'aac': ['-b:a', '192k'], 'mp3': ['-b:a', '192k'], 'wav': []}, |
| | 'high': {'aac': ['-b:a', '320k'], 'mp3': ['-b:a', '320k'], 'wav': []} |
| | } |
| | |
| | codec_settings = { |
| | 'aac': ['-c:a', 'aac'], |
| | 'mp3': ['-c:a', 'libmp3lame'], |
| | 'wav': ['-c:a', 'pcm_s16le'] |
| | } |
| | |
| | |
| | cmd = ['ffmpeg', '-y', '-i', video_path] |
| | cmd.extend(codec_settings.get(audio_format, ['-c:a', 'aac'])) |
| | cmd.extend(quality_settings.get(quality, {}).get(audio_format, [])) |
| | cmd.extend(['-vn', output_path]) |
| | |
| | |
| | result = subprocess.run( |
| | cmd, |
| | capture_output=True, |
| | text=True, |
| | timeout=300 |
| | ) |
| | |
| | if result.returncode != 0: |
| | raise AudioProcessingError( |
| | "extract", |
| | f"FFmpeg failed: {result.stderr}", |
| | video_path, |
| | output_path |
| | ) |
| | |
| | if not os.path.exists(output_path): |
| | raise AudioProcessingError( |
| | "extract", |
| | "Output audio file was not created", |
| | video_path, |
| | output_path |
| | ) |
| | |
| | |
| | processing_time = time.time() - start_time |
| | self.stats['audio_extractions'] += 1 |
| | self.stats['total_processing_time'] += processing_time |
| | |
| | logger.info(f"Audio extracted successfully in {processing_time:.1f}s: {output_path}") |
| | return output_path |
| | |
| | except subprocess.TimeoutExpired: |
| | self.stats['failed_operations'] += 1 |
| | raise AudioProcessingError("extract", "FFmpeg timeout during extraction", video_path) |
| | except Exception as e: |
| | self.stats['failed_operations'] += 1 |
| | if isinstance(e, AudioProcessingError): |
| | raise |
| | else: |
| | raise AudioProcessingError("extract", f"Unexpected error: {str(e)}", video_path) |
| | |
| | def add_audio_to_video(self, original_video: str, processed_video: str, |
| | output_path: Optional[str] = None, |
| | audio_quality: str = 'high') -> str: |
| | """ |
| | Add audio from original video to processed video |
| | |
| | Args: |
| | original_video: Path to original video with audio |
| | processed_video: Path to processed video without audio |
| | output_path: Output path (auto-generated if None) |
| | audio_quality: Audio quality setting |
| | |
| | Returns: |
| | Path to final video with audio |
| | """ |
| | if not self.ffmpeg_available: |
| | logger.warning("FFmpeg not available - returning processed video without audio") |
| | return processed_video |
| | |
| | start_time = time.time() |
| | |
| | try: |
| | |
| | audio_info = self.get_audio_info(original_video) |
| | if not audio_info.get('has_audio', False): |
| | logger.info("Original video has no audio - returning processed video") |
| | return processed_video |
| | |
| | |
| | if output_path is None: |
| | timestamp = int(time.time()) |
| | output_path = os.path.join( |
| | self.temp_dir, |
| | f"final_with_audio_{timestamp}.mp4" |
| | ) |
| | |
| | |
| | quality_settings = { |
| | 'low': ['-b:a', '96k'], |
| | 'medium': ['-b:a', '192k'], |
| | 'high': ['-b:a', '320k'] |
| | } |
| | |
| | |
| | cmd = [ |
| | 'ffmpeg', '-y', |
| | '-i', processed_video, |
| | '-i', original_video, |
| | '-c:v', 'copy', |
| | '-c:a', 'aac', |
| | ] |
| | |
| | |
| | cmd.extend(quality_settings.get(audio_quality, quality_settings['high'])) |
| | |
| | |
| | cmd.extend([ |
| | '-map', '0:v:0', |
| | '-map', '1:a:0', |
| | '-shortest', |
| | output_path |
| | ]) |
| | |
| | |
| | result = subprocess.run( |
| | cmd, |
| | capture_output=True, |
| | text=True, |
| | timeout=600 |
| | ) |
| | |
| | if result.returncode != 0: |
| | logger.warning(f"Audio merge failed: {result.stderr}") |
| | logger.warning("Returning processed video without audio") |
| | return processed_video |
| | |
| | if not os.path.exists(output_path): |
| | logger.warning("Output video with audio was not created") |
| | return processed_video |
| | |
| | |
| | if os.path.getsize(output_path) == 0: |
| | logger.warning("Output video file is empty") |
| | try: |
| | os.remove(output_path) |
| | except: |
| | pass |
| | return processed_video |
| | |
| | |
| | try: |
| | if output_path != processed_video: |
| | os.remove(processed_video) |
| | logger.debug("Cleaned up intermediate processed video") |
| | except Exception as e: |
| | logger.warning(f"Could not clean up intermediate file: {e}") |
| | |
| | |
| | processing_time = time.time() - start_time |
| | self.stats['audio_merges'] += 1 |
| | self.stats['total_processing_time'] += processing_time |
| | |
| | logger.info(f"Audio merged successfully in {processing_time:.1f}s: {output_path}") |
| | return output_path |
| | |
| | except subprocess.TimeoutExpired: |
| | self.stats['failed_operations'] += 1 |
| | logger.warning("Audio merge timeout - returning processed video without audio") |
| | return processed_video |
| | except Exception as e: |
| | self.stats['failed_operations'] += 1 |
| | logger.warning(f"Audio merge error: {e} - returning processed video without audio") |
| | return processed_video |
| | |
| | def sync_audio_video(self, video_path: str, audio_path: str, |
| | output_path: str, offset_ms: float = 0.0) -> bool: |
| | """ |
| | Synchronize separate audio and video files |
| | |
| | Args: |
| | video_path: Path to video file |
| | audio_path: Path to audio file |
| | output_path: Output path for synchronized file |
| | offset_ms: Audio offset in milliseconds (positive = delay audio) |
| | |
| | Returns: |
| | True if successful, False otherwise |
| | """ |
| | if not self.ffmpeg_available: |
| | raise AudioProcessingError("sync", "FFmpeg not available") |
| | |
| | try: |
| | cmd = ['ffmpeg', '-y', '-i', video_path, '-i', audio_path] |
| | |
| | |
| | if offset_ms != 0.0: |
| | offset_seconds = offset_ms / 1000.0 |
| | cmd.extend(['-itsoffset', str(offset_seconds)]) |
| | |
| | cmd.extend([ |
| | '-c:v', 'copy', |
| | '-c:a', 'aac', |
| | '-b:a', '192k', |
| | '-shortest', |
| | output_path |
| | ]) |
| | |
| | result = subprocess.run( |
| | cmd, |
| | capture_output=True, |
| | text=True, |
| | timeout=600 |
| | ) |
| | |
| | if result.returncode != 0: |
| | raise AudioProcessingError( |
| | "sync", |
| | f"Synchronization failed: {result.stderr}", |
| | video_path |
| | ) |
| | |
| | return os.path.exists(output_path) and os.path.getsize(output_path) > 0 |
| | |
| | except subprocess.TimeoutExpired: |
| | raise AudioProcessingError("sync", "Synchronization timeout", video_path) |
| | except Exception as e: |
| | if isinstance(e, AudioProcessingError): |
| | raise |
| | else: |
| | raise AudioProcessingError("sync", f"Unexpected error: {str(e)}", video_path) |
| | |
| | def adjust_audio_levels(self, input_path: str, output_path: str, |
| | volume_factor: float = 1.0, normalize: bool = False) -> bool: |
| | """ |
| | Adjust audio levels in a video file |
| | |
| | Args: |
| | input_path: Input video path |
| | output_path: Output video path |
| | volume_factor: Volume multiplication factor (1.0 = no change) |
| | normalize: Whether to normalize audio levels |
| | |
| | Returns: |
| | True if successful, False otherwise |
| | """ |
| | if not self.ffmpeg_available: |
| | raise AudioProcessingError("adjust_levels", "FFmpeg not available") |
| | |
| | try: |
| | cmd = ['ffmpeg', '-y', '-i', input_path, '-c:v', 'copy'] |
| | |
| | |
| | audio_filters = [] |
| | |
| | if volume_factor != 1.0: |
| | audio_filters.append(f"volume={volume_factor}") |
| | |
| | if normalize: |
| | audio_filters.append("loudnorm") |
| | |
| | if audio_filters: |
| | cmd.extend(['-af', ','.join(audio_filters)]) |
| | |
| | cmd.extend(['-c:a', 'aac', '-b:a', '192k', output_path]) |
| | |
| | result = subprocess.run( |
| | cmd, |
| | capture_output=True, |
| | text=True, |
| | timeout=600 |
| | ) |
| | |
| | if result.returncode != 0: |
| | raise AudioProcessingError( |
| | "adjust_levels", |
| | f"Level adjustment failed: {result.stderr}", |
| | input_path |
| | ) |
| | |
| | return os.path.exists(output_path) and os.path.getsize(output_path) > 0 |
| | |
| | except Exception as e: |
| | if isinstance(e, AudioProcessingError): |
| | raise |
| | else: |
| | raise AudioProcessingError("adjust_levels", f"Unexpected error: {str(e)}", input_path) |
| | |
| | def get_supported_formats(self) -> Dict[str, List[str]]: |
| | """Get supported audio and video formats""" |
| | if not self.ffmpeg_available: |
| | return {'audio': [], 'video': []} |
| | |
| | try: |
| | |
| | result = subprocess.run( |
| | ['ffmpeg', '-formats'], |
| | capture_output=True, |
| | text=True, |
| | timeout=30 |
| | ) |
| | |
| | if result.returncode != 0: |
| | return {'audio': ['aac', 'mp3', 'wav'], 'video': ['mp4', 'avi', 'mov']} |
| | |
| | |
| | lines = result.stdout.split('\n') |
| | audio_formats = [] |
| | video_formats = [] |
| | |
| | for line in lines: |
| | if 'aac' in line.lower(): |
| | audio_formats.append('aac') |
| | elif 'mp3' in line.lower(): |
| | audio_formats.append('mp3') |
| | elif 'wav' in line.lower(): |
| | audio_formats.append('wav') |
| | elif 'mp4' in line.lower(): |
| | video_formats.append('mp4') |
| | elif 'avi' in line.lower(): |
| | video_formats.append('avi') |
| | elif 'mov' in line.lower(): |
| | video_formats.append('mov') |
| | |
| | return { |
| | 'audio': list(set(audio_formats)) or ['aac', 'mp3', 'wav'], |
| | 'video': list(set(video_formats)) or ['mp4', 'avi', 'mov'] |
| | } |
| | |
| | except Exception as e: |
| | logger.warning(f"Could not get supported formats: {e}") |
| | return {'audio': ['aac', 'mp3', 'wav'], 'video': ['mp4', 'avi', 'mov']} |
| | |
| | def validate_audio_video_compatibility(self, video_path: str, audio_path: str) -> Dict[str, Any]: |
| | """ |
| | Validate compatibility between video and audio files |
| | |
| | Returns: |
| | Dictionary with compatibility information |
| | """ |
| | if not self.ffprobe_available: |
| | return {'compatible': False, 'error': 'FFprobe not available'} |
| | |
| | try: |
| | |
| | video_result = subprocess.run([ |
| | 'ffprobe', '-v', 'quiet', '-select_streams', 'v:0', |
| | '-show_entries', 'stream=duration', '-of', 'csv=p=0', video_path |
| | ], capture_output=True, text=True, timeout=30) |
| | |
| | |
| | audio_result = subprocess.run([ |
| | 'ffprobe', '-v', 'quiet', '-select_streams', 'a:0', |
| | '-show_entries', 'stream=duration', '-of', 'csv=p=0', audio_path |
| | ], capture_output=True, text=True, timeout=30) |
| | |
| | if video_result.returncode != 0 or audio_result.returncode != 0: |
| | return {'compatible': False, 'error': 'Could not read file information'} |
| | |
| | try: |
| | video_duration = float(video_result.stdout.strip()) |
| | audio_duration = float(audio_result.stdout.strip()) |
| | |
| | duration_diff = abs(video_duration - audio_duration) |
| | duration_diff_percent = (duration_diff / max(video_duration, audio_duration)) * 100 |
| | |
| | return { |
| | 'compatible': duration_diff_percent < 5.0, |
| | 'video_duration': video_duration, |
| | 'audio_duration': audio_duration, |
| | 'duration_difference': duration_diff, |
| | 'duration_difference_percent': duration_diff_percent, |
| | 'recommendation': ( |
| | 'Compatible' if duration_diff_percent < 5.0 |
| | else 'Duration mismatch - consider trimming/extending' |
| | ) |
| | } |
| | |
| | except ValueError: |
| | return {'compatible': False, 'error': 'Invalid duration values'} |
| | |
| | except Exception as e: |
| | return {'compatible': False, 'error': str(e)} |
| | |
| | def get_stats(self) -> Dict[str, Any]: |
| | """Get audio processing statistics""" |
| | return { |
| | 'ffmpeg_available': self.ffmpeg_available, |
| | 'ffprobe_available': self.ffprobe_available, |
| | 'audio_extractions': self.stats['audio_extractions'], |
| | 'audio_merges': self.stats['audio_merges'], |
| | 'total_processing_time': self.stats['total_processing_time'], |
| | 'failed_operations': self.stats['failed_operations'], |
| | 'success_rate': ( |
| | (self.stats['audio_extractions'] + self.stats['audio_merges']) / |
| | max(1, self.stats['audio_extractions'] + self.stats['audio_merges'] + self.stats['failed_operations']) |
| | ) * 100 |
| | } |
| | |
| | def cleanup_temp_files(self, max_age_hours: int = 24): |
| | """Clean up temporary audio files older than specified age""" |
| | try: |
| | temp_path = Path(self.temp_dir) |
| | current_time = time.time() |
| | cutoff_time = current_time - (max_age_hours * 3600) |
| | |
| | cleaned_files = 0 |
| | for file_path in temp_path.glob("*audio*.{aac,mp3,wav,mp4}"): |
| | if file_path.stat().st_mtime < cutoff_time: |
| | try: |
| | file_path.unlink() |
| | cleaned_files += 1 |
| | except Exception as e: |
| | logger.warning(f"Could not delete temp file {file_path}: {e}") |
| | |
| | if cleaned_files > 0: |
| | logger.info(f"Cleaned up {cleaned_files} temporary audio files") |
| | |
| | except Exception as e: |
| | logger.warning(f"Error during temp file cleanup: {e}") |