Tadeas Kosek
add audio splitting
8b33160
"""FFmpeg service implementation."""
import asyncio
import subprocess
from pathlib import Path
from typing import Dict, Optional, Any
import logging
import json
from dataclasses import dataclass
# Module-level logger, named after this module, shared by everything in this file.
logger = logging.getLogger(__name__)
@dataclass
class FFmpegResult:
    """Result from FFmpeg operation.

    Attributes:
        success: True when the FFmpeg process exited with return code 0.
        output_path: Path of the produced file; set only on success.
        duration: Duration in seconds parsed from FFmpeg's log, when it
            could be found.
        error: Human-readable failure description; set only on failure.
        metadata: Operation details (e.g. format/quality or trim bounds);
            ``None`` when the operation did not attach any.
    """

    success: bool
    output_path: Optional[str] = None
    duration: Optional[float] = None
    error: Optional[str] = None
    # Annotated Optional because the default really is None (not an empty
    # dict); callers must check for None before indexing.
    metadata: Optional[Dict[str, Any]] = None
class FFmpegService:
    """Service for audio extraction, trimming and probing using FFmpeg.

    Every operation runs an external process through asyncio and waits at
    most ``timeout_seconds`` for it. The public coroutines never raise for
    operational failures: ``extract_audio`` and ``trim_audio`` return a
    failed ``FFmpegResult``, ``get_media_info`` returns ``{}``.
    """

    def __init__(self, ffmpeg_path: str, quality_presets: Dict[str, Dict[str, Dict]],
                 timeout_seconds: int = 1800):
        """Store the FFmpeg configuration.

        Args:
            ffmpeg_path: Executable placed first in every FFmpeg command.
            quality_presets: Presets looked up as
                ``quality_presets[format][quality]``. The keys ``codec``,
                ``bitrate`` and ``compression_level`` are honoured.
            timeout_seconds: Maximum time to wait for a subprocess to finish.
        """
        self.ffmpeg_path = ffmpeg_path
        self.quality_presets = quality_presets
        self.timeout_seconds = timeout_seconds

    async def extract_audio(self, input_path: str, output_path: str,
                            format: str, quality: str) -> "FFmpegResult":
        """Extract audio from video file.

        Args:
            input_path: Path to the input media file.
            output_path: Path for the extracted audio file (overwritten).
            format: Preset group name, e.g. the target audio format.
            quality: Preset name inside the ``format`` group.

        Returns:
            FFmpegResult; on success it carries ``output_path``, the duration
            parsed from FFmpeg's log and the format/quality/preset used.
        """
        try:
            # An unknown format or quality falls back to an empty preset,
            # i.e. FFmpeg's own defaults for the output container.
            preset = self.quality_presets.get(format, {}).get(quality, {})
            cmd = self._build_command(input_path, output_path, format, preset)
            logger.info("Running FFmpeg command: %s", ' '.join(map(str, cmd)))

            outcome = await self._run_process(cmd)
            if outcome is None:
                return FFmpegResult(
                    success=False,
                    error=f"FFmpeg timeout after {self.timeout_seconds} seconds"
                )

            returncode, _, stderr = outcome
            log_text = self._decode(stderr)
            if returncode != 0:
                return FFmpegResult(
                    success=False,
                    error=f"FFmpeg failed: {log_text}"
                )

            return FFmpegResult(
                success=True,
                output_path=output_path,
                duration=self._extract_duration(log_text),
                metadata={
                    "format": format,
                    "quality": quality,
                    "preset": preset
                }
            )
        except Exception as e:
            logger.error(f"FFmpeg error: {str(e)}")
            return FFmpegResult(
                success=False,
                error=str(e)
            )

    async def trim_audio(self, input_path: str, output_path: str,
                         start_seconds: Optional[float] = None,
                         end_seconds: Optional[float] = None) -> "FFmpegResult":
        """Trim audio file to specified time range.

        Args:
            input_path: Path to input audio file
            output_path: Path for output trimmed file
            start_seconds: Start time in seconds (None = start from beginning)
            end_seconds: End time in seconds (None = continue to end)

        Returns:
            FFmpegResult with success status and details. An invalid range
            (negative start, or end not after start) yields a failed result.
        """
        try:
            cmd = self._build_trim_command(input_path, output_path, start_seconds, end_seconds)
            logger.info("Running FFmpeg trim command: %s", ' '.join(map(str, cmd)))

            outcome = await self._run_process(cmd)
            if outcome is None:
                return FFmpegResult(
                    success=False,
                    error=f"FFmpeg trim timeout after {self.timeout_seconds} seconds"
                )

            returncode, _, stderr = outcome
            log_text = self._decode(stderr)
            if returncode != 0:
                return FFmpegResult(
                    success=False,
                    error=f"FFmpeg trim failed: {log_text}"
                )

            return FFmpegResult(
                success=True,
                output_path=output_path,
                # The "Duration:" line describes the *input* file, so for a
                # trim we prefer the last progress timestamp instead.
                duration=self._extract_output_duration(log_text),
                metadata={
                    "operation": "trim",
                    "start_seconds": start_seconds,
                    "end_seconds": end_seconds
                }
            )
        except Exception as e:
            logger.error(f"FFmpeg trim error: {str(e)}")
            return FFmpegResult(
                success=False,
                error=str(e)
            )

    async def get_media_info(self, file_path: str) -> Dict[str, Any]:
        """Get media file information using ffprobe.

        Args:
            file_path: Path to the media file to inspect.

        Returns:
            The parsed ffprobe JSON (format and streams), or ``{}`` when
            ffprobe fails, times out, or its output cannot be parsed.
        """
        try:
            cmd = [
                'ffprobe',
                '-v', 'quiet',
                '-print_format', 'json',
                '-show_format',
                '-show_streams',
                file_path
            ]
            # Bounded by the same timeout as the FFmpeg operations so a hung
            # probe cannot block the caller forever.
            outcome = await self._run_process(cmd)
            if outcome is None:
                logger.error("ffprobe timeout after %s seconds", self.timeout_seconds)
                return {}

            returncode, stdout, _ = outcome
            if returncode != 0:
                return {}
            return json.loads(self._decode(stdout))
        except Exception as e:
            logger.error(f"ffprobe error: {str(e)}")
            return {}

    async def _run_process(self, cmd: list) -> Optional[tuple]:
        """Run ``cmd`` with captured output, bounded by ``timeout_seconds``.

        Returns:
            ``(returncode, stdout_bytes, stderr_bytes)``, or ``None`` when the
            timeout expired (the process is killed and reaped first).
        """
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        try:
            stdout, stderr = await asyncio.wait_for(
                process.communicate(),
                timeout=self.timeout_seconds
            )
        except asyncio.TimeoutError:
            process.kill()
            await process.wait()
            return None
        return process.returncode, stdout, stderr

    @staticmethod
    def _decode(raw: bytes) -> str:
        """Decode process output without ever raising.

        Tool output can echo file names or tags in arbitrary encodings; a
        strict decode would turn an otherwise successful run into an error.
        """
        return raw.decode("utf-8", errors="replace")

    def _build_command(self, input_path: str, output_path: str,
                       format: str, preset: Dict) -> list:
        """Build FFmpeg command based on format and preset.

        Args:
            input_path: Path to the input media file.
            output_path: Path of the audio file to write.
            format: Accepted for interface symmetry; not used here.
            preset: Optional ``codec``, ``bitrate`` and ``compression_level``.

        Returns:
            Argument list for the subprocess; preset values are stringified
            so numeric settings (e.g. ``bitrate=320``) are accepted.
        """
        cmd = [
            self.ffmpeg_path,
            '-i', input_path,
            '-vn',  # No video
            '-y'  # Overwrite output
        ]
        # (preset key, FFmpeg flag) pairs, emitted in this fixed order.
        option_flags = (
            ('codec', '-acodec'),
            ('bitrate', '-b:a'),
            ('compression_level', '-compression_level'),  # used for FLAC
        )
        for key, flag in option_flags:
            if key in preset:
                cmd.extend([flag, str(preset[key])])
        cmd.append(output_path)
        return cmd

    def _build_trim_command(self, input_path: str, output_path: str,
                            start_seconds: Optional[float],
                            end_seconds: Optional[float]) -> list:
        """Build FFmpeg command for audio trimming.

        Args:
            input_path: Path to the input audio file.
            output_path: Path of the trimmed file to write.
            start_seconds: Start offset, or None to start at the beginning.
            end_seconds: End offset, or None to run to the end of the input.

        Returns:
            Argument list for the subprocess.

        Raises:
            ValueError: If ``start_seconds`` is negative or ``end_seconds``
                is not strictly after the start of the range.
        """
        if start_seconds is not None and start_seconds < 0:
            raise ValueError(
                f"start_seconds must be non-negative, got {start_seconds}"
            )
        if end_seconds is not None:
            range_start = 0 if start_seconds is None else start_seconds
            if end_seconds <= range_start:
                raise ValueError(
                    f"end_seconds ({end_seconds}) must be greater than "
                    f"the start of the range ({range_start})"
                )

        cmd = [
            self.ffmpeg_path,
            '-i', input_path
        ]
        if start_seconds is not None:
            cmd.extend(['-ss', str(start_seconds)])
        if end_seconds is not None:
            if start_seconds is not None:
                # With a start offset, express the end as a duration.
                cmd.extend(['-t', str(end_seconds - start_seconds)])
            else:
                # No start offset: the end time counts from the beginning.
                cmd.extend(['-to', str(end_seconds)])
        # Copy streams without re-encoding for faster processing
        cmd.extend([
            '-c', 'copy',  # Copy codec (no re-encoding)
            '-y',  # Overwrite output
            output_path
        ])
        return cmd

    @staticmethod
    def _timestamp_to_seconds(hours: str, minutes: str, seconds: str,
                              fraction: Optional[str]) -> float:
        """Convert the textual parts of ``H:MM:SS[.fff]`` to seconds."""
        total = int(hours) * 3600 + int(minutes) * 60 + int(seconds)
        if fraction:
            # Scale by the number of digits actually present (".45" -> 0.45).
            return total + int(fraction) / 10 ** len(fraction)
        return float(total)

    def _extract_duration(self, stderr_output: str) -> Optional[float]:
        """Extract duration from FFmpeg stderr output.

        Reads the first ``Duration: H:MM:SS[.fff]`` entry of the log.

        Returns:
            Duration in seconds, or None when no such entry exists
            (for example ``Duration: N/A``).
        """
        import re
        found = re.search(r'Duration: (\d+):(\d{2}):(\d{2})(?:\.(\d+))?', stderr_output)
        if found is None:
            return None
        return FFmpegService._timestamp_to_seconds(*found.groups())

    def _extract_output_duration(self, stderr_output: str) -> Optional[float]:
        """Extract the duration of the written output from FFmpeg stderr.

        Uses the last ``time=H:MM:SS[.fff]`` progress stamp in the log and
        falls back to the ``Duration:`` entry when no progress stamp exists.

        Returns:
            Duration in seconds, or None when neither marker is present.
        """
        import re
        stamps = re.findall(r'time=(\d+):(\d{2}):(\d{2})(?:\.(\d+))?', stderr_output)
        if stamps:
            return FFmpegService._timestamp_to_seconds(*stamps[-1])
        return self._extract_duration(stderr_output)