Spaces:
Sleeping
Sleeping
| """FFmpeg service implementation.""" | |
| import asyncio | |
| import subprocess | |
| from pathlib import Path | |
| from typing import Dict, Optional, Any | |
| import logging | |
| import json | |
| from dataclasses import dataclass | |
| logger = logging.getLogger(__name__) | |
| class FFmpegResult: | |
| """Result from FFmpeg operation.""" | |
| success: bool | |
| output_path: Optional[str] = None | |
| duration: Optional[float] = None | |
| error: Optional[str] = None | |
| metadata: Dict[str, Any] = None | |
| class FFmpegService: | |
| """Service for audio extraction using FFmpeg.""" | |
| def __init__(self, ffmpeg_path: str, quality_presets: Dict[str, Dict[str, Dict]], | |
| timeout_seconds: int = 1800): | |
| self.ffmpeg_path = ffmpeg_path | |
| self.quality_presets = quality_presets | |
| self.timeout_seconds = timeout_seconds | |
| async def extract_audio(self, input_path: str, output_path: str, | |
| format: str, quality: str) -> FFmpegResult: | |
| """Extract audio from video file.""" | |
| try: | |
| # Get quality settings | |
| preset = self.quality_presets.get(format, {}).get(quality, {}) | |
| # Build FFmpeg command | |
| cmd = self._build_command(input_path, output_path, format, preset) | |
| logger.info(f"Running FFmpeg command: {' '.join(cmd)}") | |
| # Run FFmpeg | |
| process = await asyncio.create_subprocess_exec( | |
| *cmd, | |
| stdout=asyncio.subprocess.PIPE, | |
| stderr=asyncio.subprocess.PIPE | |
| ) | |
| try: | |
| stdout, stderr = await asyncio.wait_for( | |
| process.communicate(), | |
| timeout=self.timeout_seconds | |
| ) | |
| except asyncio.TimeoutError: | |
| process.kill() | |
| await process.wait() | |
| return FFmpegResult( | |
| success=False, | |
| error=f"FFmpeg timeout after {self.timeout_seconds} seconds" | |
| ) | |
| if process.returncode == 0: | |
| # Extract duration from stderr | |
| duration = self._extract_duration(stderr.decode()) | |
| return FFmpegResult( | |
| success=True, | |
| output_path=output_path, | |
| duration=duration, | |
| metadata={ | |
| "format": format, | |
| "quality": quality, | |
| "preset": preset | |
| } | |
| ) | |
| else: | |
| return FFmpegResult( | |
| success=False, | |
| error=f"FFmpeg failed: {stderr.decode()}" | |
| ) | |
| except Exception as e: | |
| logger.error(f"FFmpeg error: {str(e)}") | |
| return FFmpegResult( | |
| success=False, | |
| error=str(e) | |
| ) | |
| async def trim_audio(self, input_path: str, output_path: str, | |
| start_seconds: Optional[float] = None, | |
| end_seconds: Optional[float] = None) -> FFmpegResult: | |
| """Trim audio file to specified time range. | |
| Args: | |
| input_path: Path to input audio file | |
| output_path: Path for output trimmed file | |
| start_seconds: Start time in seconds (None = start from beginning) | |
| end_seconds: End time in seconds (None = continue to end) | |
| Returns: | |
| FFmpegResult with success status and details | |
| """ | |
| try: | |
| # Build trim command | |
| cmd = self._build_trim_command(input_path, output_path, start_seconds, end_seconds) | |
| logger.info(f"Running FFmpeg trim command: {' '.join(cmd)}") | |
| # Run FFmpeg | |
| process = await asyncio.create_subprocess_exec( | |
| *cmd, | |
| stdout=asyncio.subprocess.PIPE, | |
| stderr=asyncio.subprocess.PIPE | |
| ) | |
| try: | |
| stdout, stderr = await asyncio.wait_for( | |
| process.communicate(), | |
| timeout=self.timeout_seconds | |
| ) | |
| except asyncio.TimeoutError: | |
| process.kill() | |
| await process.wait() | |
| return FFmpegResult( | |
| success=False, | |
| error=f"FFmpeg trim timeout after {self.timeout_seconds} seconds" | |
| ) | |
| if process.returncode == 0: | |
| # Extract duration from stderr | |
| duration = self._extract_duration(stderr.decode()) | |
| return FFmpegResult( | |
| success=True, | |
| output_path=output_path, | |
| duration=duration, | |
| metadata={ | |
| "operation": "trim", | |
| "start_seconds": start_seconds, | |
| "end_seconds": end_seconds | |
| } | |
| ) | |
| else: | |
| return FFmpegResult( | |
| success=False, | |
| error=f"FFmpeg trim failed: {stderr.decode()}" | |
| ) | |
| except Exception as e: | |
| logger.error(f"FFmpeg trim error: {str(e)}") | |
| return FFmpegResult( | |
| success=False, | |
| error=str(e) | |
| ) | |
| async def get_media_info(self, file_path: str) -> Dict[str, Any]: | |
| """Get media file information using ffprobe.""" | |
| try: | |
| cmd = [ | |
| 'ffprobe', | |
| '-v', 'quiet', | |
| '-print_format', 'json', | |
| '-show_format', | |
| '-show_streams', | |
| file_path | |
| ] | |
| process = await asyncio.create_subprocess_exec( | |
| *cmd, | |
| stdout=asyncio.subprocess.PIPE, | |
| stderr=asyncio.subprocess.PIPE | |
| ) | |
| stdout, _ = await process.communicate() | |
| if process.returncode == 0: | |
| return json.loads(stdout.decode()) | |
| else: | |
| return {} | |
| except Exception as e: | |
| logger.error(f"ffprobe error: {str(e)}") | |
| return {} | |
| def _build_command(self, input_path: str, output_path: str, | |
| format: str, preset: Dict) -> list: | |
| """Build FFmpeg command based on format and preset.""" | |
| cmd = [ | |
| self.ffmpeg_path, | |
| '-i', input_path, | |
| '-vn', # No video | |
| '-y' # Overwrite output | |
| ] | |
| # Add codec | |
| if 'codec' in preset: | |
| cmd.extend(['-acodec', preset['codec']]) | |
| # Add bitrate | |
| if 'bitrate' in preset: | |
| cmd.extend(['-b:a', preset['bitrate']]) | |
| # Add compression level (for FLAC) | |
| if 'compression_level' in preset: | |
| cmd.extend(['-compression_level', str(preset['compression_level'])]) | |
| # Add output file | |
| cmd.append(output_path) | |
| return cmd | |
| def _build_trim_command(self, input_path: str, output_path: str, | |
| start_seconds: Optional[float], | |
| end_seconds: Optional[float]) -> list: | |
| """Build FFmpeg command for audio trimming.""" | |
| cmd = [ | |
| self.ffmpeg_path, | |
| '-i', input_path | |
| ] | |
| # Add start time if specified | |
| if start_seconds is not None: | |
| cmd.extend(['-ss', str(start_seconds)]) | |
| # Add end time or duration if specified | |
| if end_seconds is not None: | |
| if start_seconds is not None: | |
| # Calculate duration | |
| duration = end_seconds - start_seconds | |
| cmd.extend(['-t', str(duration)]) | |
| else: | |
| # Use -to for end time from beginning | |
| cmd.extend(['-to', str(end_seconds)]) | |
| # Copy streams without re-encoding for faster processing | |
| cmd.extend([ | |
| '-c', 'copy', # Copy codec (no re-encoding) | |
| '-y', # Overwrite output | |
| output_path | |
| ]) | |
| return cmd | |
| def _extract_duration(self, stderr_output: str) -> Optional[float]: | |
| """Extract duration from FFmpeg stderr output.""" | |
| import re | |
| # Look for Duration: HH:MM:SS.ms | |
| duration_match = re.search(r'Duration: (\d{2}):(\d{2}):(\d{2})\.(\d{2})', stderr_output) | |
| if duration_match: | |
| hours = int(duration_match.group(1)) | |
| minutes = int(duration_match.group(2)) | |
| seconds = int(duration_match.group(3)) | |
| centiseconds = int(duration_match.group(4)) | |
| total_seconds = hours * 3600 + minutes * 60 + seconds + centiseconds / 100 | |
| return total_seconds | |
| return None |