audio-processor / domain /services /audio_extraction_service.py
Tadeas Kosek
apply generated DDD abstraction
92fd1a7
"""Domain service for audio extraction."""
from abc import ABC, abstractmethod
from typing import Protocol, Optional
from dataclasses import dataclass
from ..entities.video import Video
from ..entities.audio import Audio
from ..entities.job import Job
from ..value_objects.audio_format import AudioFormat
from ..value_objects.audio_quality import AudioQuality
@dataclass
class ExtractionResult:
    """Outcome of one audio extraction.

    Bundles the extracted audio with the recorded processing time and an
    optional dictionary of extra metadata.
    """

    # The extracted audio entity.
    audio: Audio
    # Time the extraction took (presumably seconds -- confirm with producers).
    processing_time: float
    # Optional extra information about the run; None when not supplied.
    metadata: Optional[dict] = None
class AudioExtractionService(ABC):
    """Abstract contract for domain services that extract audio from video.

    Every method is an abstract coroutine, so a concrete subclass has to
    override all of them before it can be instantiated.
    """

    @abstractmethod
    async def extract_audio(
        self,
        video: Video,
        format: AudioFormat,
        quality: AudioQuality,
    ) -> ExtractionResult:
        """Extract audio from ``video`` in the requested format and quality.

        Args:
            video: Source video to extract from.
            format: Target audio format (the name shadows the builtin but is
                part of the public signature).
            quality: Target audio quality.

        Returns:
            An ``ExtractionResult`` describing the extraction.
        """

    @abstractmethod
    async def supports_format(
        self,
        format: AudioFormat,
    ) -> bool:
        """Report whether this service can handle ``format``.

        Args:
            format: Audio format to check.

        Returns:
            True if the format is supported, otherwise False.
        """

    @abstractmethod
    async def estimate_processing_time(
        self,
        video: Video,
        format: AudioFormat,
    ) -> float:
        """Estimate how long extracting ``video`` into ``format`` will take.

        Args:
            video: Video that would be processed.
            format: Audio format that would be produced.

        Returns:
            Estimated processing time in seconds.
        """
class JobService(Protocol):
    """Structural interface for job-related operations.

    Any object providing these three coroutines with compatible signatures
    satisfies the protocol; inheriting from it is not required.
    """

    async def create_job(
        self,
        video: Video,
        format: AudioFormat,
        quality: AudioQuality,
    ) -> Job:
        """Create a new extraction job.

        Args:
            video: Video the job refers to.
            format: Requested audio format.
            quality: Requested audio quality.

        Returns:
            The newly created ``Job``.
        """
        ...

    async def get_job(
        self,
        job_id: str,
    ) -> Optional[Job]:
        """Look up a job by its ID.

        Args:
            job_id: Identifier of the job to fetch.

        Returns:
            The matching ``Job``, or None (presumably when no job has that
            ID -- confirm with implementations).
        """
        ...

    async def update_job_status(
        self,
        job: Job,
    ) -> None:
        """Update the status of the given job.

        Args:
            job: Job whose status should be updated.
        """
        ...