| """FFmpeg service implementation.""" |
| import asyncio |
| import subprocess |
| from pathlib import Path |
| from typing import Dict, Optional, Any |
| import logging |
| import json |
| from dataclasses import dataclass |
|
|
| logger = logging.getLogger(__name__) |
|
|
@dataclass
class FFmpegResult:
    """Result from an FFmpeg operation.

    Only ``success`` is required; every other field defaults to ``None`` and
    is filled in by whichever code path produced the result.

    Attributes:
        success: Whether the operation completed successfully.
        output_path: Path of the produced file (set on success).
        duration: Duration in seconds parsed from FFmpeg's output, if one
            could be found.
        error: Human-readable failure description (set on failure).
        metadata: Extra details about the operation, or ``None`` when no
            details were recorded.
    """
    success: bool
    output_path: Optional[str] = None
    duration: Optional[float] = None
    error: Optional[str] = None
    # The default is None, so the annotation must admit None as well.
    metadata: Optional[Dict[str, Any]] = None
|
|
class FFmpegService:
    """Service for audio extraction and trimming using FFmpeg.

    Attributes:
        ffmpeg_path: Executable used for extraction and trimming.
        quality_presets: Mapping of format -> quality -> preset options. The
            preset keys read by this class are ``codec``, ``bitrate`` and
            ``compression_level``.
        timeout_seconds: Upper bound, in seconds, for any single subprocess
            started by this service.
        ffprobe_path: Executable used by ``get_media_info``.
    """

    def __init__(self, ffmpeg_path: str, quality_presets: Dict[str, Dict[str, Dict]],
                 timeout_seconds: int = 1800, ffprobe_path: str = 'ffprobe'):
        """Store the configuration; no subprocess is started here.

        Args:
            ffmpeg_path: Path or name of the ffmpeg executable.
            quality_presets: Mapping of format -> quality -> preset options.
            timeout_seconds: Maximum run time allowed for one subprocess.
            ffprobe_path: Path or name of the ffprobe executable. Defaults to
                ``'ffprobe'`` (resolved through ``PATH``).
        """
        self.ffmpeg_path = ffmpeg_path
        self.quality_presets = quality_presets
        self.timeout_seconds = timeout_seconds
        self.ffprobe_path = ffprobe_path

    async def extract_audio(self, input_path: str, output_path: str,
                            format: str, quality: str) -> "FFmpegResult":
        """Extract the audio track of a media file.

        Args:
            input_path: Path to the input media file.
            output_path: Path for the extracted audio file.
            format: Key into ``quality_presets`` selecting the output format.
            quality: Key into the format's presets selecting the quality.

        Returns:
            FFmpegResult with success status and details. This method does not
            raise for ordinary failures; they are reported in ``error``.
        """
        try:
            # An unknown format/quality pair yields an empty preset, so no
            # codec/bitrate/compression options are added to the command.
            preset = self.quality_presets.get(format, {}).get(quality, {})
            cmd = self._build_command(input_path, output_path, format, preset)
            return await self._run_ffmpeg(
                cmd,
                output_path,
                "",
                {
                    "format": format,
                    "quality": quality,
                    "preset": preset,
                },
            )
        except Exception as e:
            logger.error("FFmpeg error: %s", e)
            return FFmpegResult(
                success=False,
                error=str(e)
            )

    async def trim_audio(self, input_path: str, output_path: str,
                         start_seconds: Optional[float] = None,
                         end_seconds: Optional[float] = None) -> "FFmpegResult":
        """Trim audio file to specified time range.

        Args:
            input_path: Path to input audio file
            output_path: Path for output trimmed file
            start_seconds: Start time in seconds (None = start from beginning)
            end_seconds: End time in seconds (None = continue to end)

        Returns:
            FFmpegResult with success status and details. An inverted range
            (``end_seconds < start_seconds``) is reported as a failed result.
            NOTE(review): ``duration`` comes from the ``Duration:`` line in
            FFmpeg's stderr, which presumably describes the input file rather
            than the trimmed output - confirm before relying on it.
        """
        try:
            cmd = self._build_trim_command(input_path, output_path, start_seconds, end_seconds)
            return await self._run_ffmpeg(
                cmd,
                output_path,
                "trim ",
                {
                    "operation": "trim",
                    "start_seconds": start_seconds,
                    "end_seconds": end_seconds,
                },
            )
        except Exception as e:
            logger.error("FFmpeg trim error: %s", e)
            return FFmpegResult(
                success=False,
                error=str(e)
            )

    async def get_media_info(self, file_path: str) -> Dict[str, Any]:
        """Get media file information using ffprobe.

        Args:
            file_path: Path to the media file to inspect.

        Returns:
            The parsed JSON document printed by ffprobe, or an empty dict when
            ffprobe cannot be started, times out, exits non-zero, or prints
            output that is not valid JSON.
        """
        cmd = [
            self.ffprobe_path,
            '-v', 'quiet',
            '-print_format', 'json',
            '-show_format',
            '-show_streams',
            file_path
        ]
        try:
            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )

            # Bound the probe like the other subprocess calls so that a stuck
            # ffprobe cannot block the caller forever.
            try:
                stdout, _ = await asyncio.wait_for(
                    process.communicate(),
                    timeout=self.timeout_seconds
                )
            except asyncio.TimeoutError:
                await self._kill_process(process)
                logger.error("ffprobe timeout after %s seconds", self.timeout_seconds)
                return {}

            if process.returncode != 0:
                logger.warning("ffprobe exited with code %s for %s",
                               process.returncode, file_path)
                return {}

            return json.loads(stdout.decode(errors='replace'))

        except Exception as e:
            logger.error("ffprobe error: %s", e)
            return {}

    async def _run_ffmpeg(self, cmd: list, output_path: str, label: str,
                          metadata: Dict[str, Any]) -> "FFmpegResult":
        """Run one FFmpeg command and convert its outcome to an FFmpegResult.

        Args:
            cmd: Full argument vector, executable first.
            output_path: Reported as ``output_path`` on success.
            label: Operation prefix inserted into log and error messages
                (``""`` for extraction, ``"trim "`` for trimming).
            metadata: Reported as ``metadata`` on success.

        Returns:
            A successful result when the process exits with code 0, otherwise
            a failed result carrying the timeout or stderr text.
        """
        logger.info("Running FFmpeg %scommand: %s", label, ' '.join(cmd))

        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )

        try:
            _, stderr = await asyncio.wait_for(
                process.communicate(),
                timeout=self.timeout_seconds
            )
        except asyncio.TimeoutError:
            await self._kill_process(process)
            return FFmpegResult(
                success=False,
                error=f"FFmpeg {label}timeout after {self.timeout_seconds} seconds"
            )

        # stderr may contain bytes that are not valid UTF-8 (e.g. tags copied
        # from the media file); a strict decode would turn a successful run
        # into a failure, so undecodable bytes are replaced instead.
        stderr_text = stderr.decode(errors='replace')

        if process.returncode != 0:
            return FFmpegResult(
                success=False,
                error=f"FFmpeg {label}failed: {stderr_text}"
            )

        return FFmpegResult(
            success=True,
            output_path=output_path,
            duration=self._extract_duration(stderr_text),
            metadata=metadata
        )

    @staticmethod
    async def _kill_process(process: asyncio.subprocess.Process) -> None:
        """Kill a subprocess and wait for it to be reaped."""
        try:
            process.kill()
        except ProcessLookupError:
            # The process exited on its own between the timeout and the kill.
            pass
        await process.wait()

    def _build_command(self, input_path: str, output_path: str,
                       format: str, preset: Dict) -> list:
        """Build FFmpeg command based on format and preset.

        Args:
            input_path: Path to the input media file.
            output_path: Path for the extracted audio file.
            format: Requested output format (not used when building the
                arguments; kept for interface compatibility).
            preset: Options for the run; ``codec``, ``bitrate`` and
                ``compression_level`` are honoured when present.

        Returns:
            The argument vector, executable first and output path last.
        """
        cmd = [
            self.ffmpeg_path,
            '-i', input_path,
            '-vn',
            '-y'
        ]

        # Preset values may be configured as numbers; every argv element must
        # be a string, so all of them are converted uniformly.
        for key, flag in (('codec', '-acodec'),
                          ('bitrate', '-b:a'),
                          ('compression_level', '-compression_level')):
            if key in preset:
                cmd.extend([flag, str(preset[key])])

        cmd.append(output_path)
        return cmd

    def _build_trim_command(self, input_path: str, output_path: str,
                            start_seconds: Optional[float],
                            end_seconds: Optional[float]) -> list:
        """Build FFmpeg command for audio trimming.

        Args:
            input_path: Path to input audio file.
            output_path: Path for output trimmed file.
            start_seconds: Start time in seconds, or None to start at the
                beginning.
            end_seconds: End time in seconds, or None to continue to the end.

        Returns:
            The argument vector, executable first and output path last.

        Raises:
            ValueError: If both bounds are given and ``end_seconds`` is
                smaller than ``start_seconds``.
        """
        if (start_seconds is not None and end_seconds is not None
                and end_seconds < start_seconds):
            raise ValueError(
                f"end_seconds ({end_seconds}) must not be less than "
                f"start_seconds ({start_seconds})"
            )

        cmd = [
            self.ffmpeg_path,
            '-i', input_path
        ]

        if start_seconds is not None:
            cmd.extend(['-ss', str(start_seconds)])

        if end_seconds is not None:
            if start_seconds is not None:
                # With a start offset the end is expressed as a length.
                cmd.extend(['-t', str(end_seconds - start_seconds)])
            else:
                cmd.extend(['-to', str(end_seconds)])

        cmd.extend([
            '-c', 'copy',
            '-y',
            output_path
        ])

        return cmd

    def _extract_duration(self, stderr_output: str) -> Optional[float]:
        """Extract duration from FFmpeg stderr output.

        Looks for the first ``Duration: HH:MM:SS.ff`` occurrence.

        Args:
            stderr_output: Decoded stderr text of an FFmpeg run.

        Returns:
            The duration in seconds, or None when no such line is present
            (for example ``Duration: N/A``).
        """
        import re

        # Hours may exceed two digits and the fraction may have any width.
        match = re.search(r'Duration: (\d{2,}):(\d{2}):(\d{2})\.(\d+)', stderr_output)
        if match is None:
            return None

        hours, minutes, seconds = (int(match.group(i)) for i in (1, 2, 3))
        fraction_digits = match.group(4)
        fraction = int(fraction_digits) / 10 ** len(fraction_digits)

        return hours * 3600 + minutes * 60 + seconds + fraction