"""FFmpeg service implementation.""" import asyncio import subprocess from pathlib import Path from typing import Dict, Optional, Any import logging import json from dataclasses import dataclass logger = logging.getLogger(__name__) @dataclass class FFmpegResult: """Result from FFmpeg operation.""" success: bool output_path: Optional[str] = None duration: Optional[float] = None error: Optional[str] = None metadata: Dict[str, Any] = None class FFmpegService: """Service for audio extraction using FFmpeg.""" def __init__(self, ffmpeg_path: str, quality_presets: Dict[str, Dict[str, Dict]], timeout_seconds: int = 1800): self.ffmpeg_path = ffmpeg_path self.quality_presets = quality_presets self.timeout_seconds = timeout_seconds async def extract_audio(self, input_path: str, output_path: str, format: str, quality: str) -> FFmpegResult: """Extract audio from video file.""" try: # Get quality settings preset = self.quality_presets.get(format, {}).get(quality, {}) # Build FFmpeg command cmd = self._build_command(input_path, output_path, format, preset) logger.info(f"Running FFmpeg command: {' '.join(cmd)}") # Run FFmpeg process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) try: stdout, stderr = await asyncio.wait_for( process.communicate(), timeout=self.timeout_seconds ) except asyncio.TimeoutError: process.kill() await process.wait() return FFmpegResult( success=False, error=f"FFmpeg timeout after {self.timeout_seconds} seconds" ) if process.returncode == 0: # Extract duration from stderr duration = self._extract_duration(stderr.decode()) return FFmpegResult( success=True, output_path=output_path, duration=duration, metadata={ "format": format, "quality": quality, "preset": preset } ) else: return FFmpegResult( success=False, error=f"FFmpeg failed: {stderr.decode()}" ) except Exception as e: logger.error(f"FFmpeg error: {str(e)}") return FFmpegResult( success=False, error=str(e) ) async def trim_audio(self, input_path: str, output_path: str, start_seconds: Optional[float] = None, end_seconds: Optional[float] = None) -> FFmpegResult: """Trim audio file to specified time range. Args: input_path: Path to input audio file output_path: Path for output trimmed file start_seconds: Start time in seconds (None = start from beginning) end_seconds: End time in seconds (None = continue to end) Returns: FFmpegResult with success status and details """ try: # Build trim command cmd = self._build_trim_command(input_path, output_path, start_seconds, end_seconds) logger.info(f"Running FFmpeg trim command: {' '.join(cmd)}") # Run FFmpeg process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) try: stdout, stderr = await asyncio.wait_for( process.communicate(), timeout=self.timeout_seconds ) except asyncio.TimeoutError: process.kill() await process.wait() return FFmpegResult( success=False, error=f"FFmpeg trim timeout after {self.timeout_seconds} seconds" ) if process.returncode == 0: # Extract duration from stderr duration = self._extract_duration(stderr.decode()) return FFmpegResult( success=True, output_path=output_path, duration=duration, metadata={ "operation": "trim", "start_seconds": start_seconds, "end_seconds": end_seconds } ) else: return FFmpegResult( success=False, error=f"FFmpeg trim failed: {stderr.decode()}" ) except Exception as e: logger.error(f"FFmpeg trim error: {str(e)}") return FFmpegResult( success=False, error=str(e) ) async def get_media_info(self, file_path: str) -> Dict[str, Any]: """Get media file information using ffprobe.""" try: cmd = [ 'ffprobe', '-v', 'quiet', '-print_format', 'json', '-show_format', '-show_streams', file_path ] process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, _ = await process.communicate() if process.returncode == 0: return json.loads(stdout.decode()) else: return {} except Exception as e: logger.error(f"ffprobe error: {str(e)}") return {} def _build_command(self, input_path: str, output_path: str, format: str, preset: Dict) -> list: """Build FFmpeg command based on format and preset.""" cmd = [ self.ffmpeg_path, '-i', input_path, '-vn', # No video '-y' # Overwrite output ] # Add codec if 'codec' in preset: cmd.extend(['-acodec', preset['codec']]) # Add bitrate if 'bitrate' in preset: cmd.extend(['-b:a', preset['bitrate']]) # Add compression level (for FLAC) if 'compression_level' in preset: cmd.extend(['-compression_level', str(preset['compression_level'])]) # Add output file cmd.append(output_path) return cmd def _build_trim_command(self, input_path: str, output_path: str, start_seconds: Optional[float], end_seconds: Optional[float]) -> list: """Build FFmpeg command for audio trimming.""" cmd = [ self.ffmpeg_path, '-i', input_path ] # Add start time if specified if start_seconds is not None: cmd.extend(['-ss', str(start_seconds)]) # Add end time or duration if specified if end_seconds is not None: if start_seconds is not None: # Calculate duration duration = end_seconds - start_seconds cmd.extend(['-t', str(duration)]) else: # Use -to for end time from beginning cmd.extend(['-to', str(end_seconds)]) # Copy streams without re-encoding for faster processing cmd.extend([ '-c', 'copy', # Copy codec (no re-encoding) '-y', # Overwrite output output_path ]) return cmd def _extract_duration(self, stderr_output: str) -> Optional[float]: """Extract duration from FFmpeg stderr output.""" import re # Look for Duration: HH:MM:SS.ms duration_match = re.search(r'Duration: (\d{2}):(\d{2}):(\d{2})\.(\d{2})', stderr_output) if duration_match: hours = int(duration_match.group(1)) minutes = int(duration_match.group(2)) seconds = int(duration_match.group(3)) centiseconds = int(duration_match.group(4)) total_seconds = hours * 3600 + minutes * 60 + seconds + centiseconds / 100 return total_seconds return None