# audio-processor/application/use_cases/extract_audio_async.py
# (provenance: n8n-improvements PR #1, commit dbe78dd)
"""Use case for asynchronous audio extraction."""
import asyncio
import logging
from typing import Any, Optional, Protocol

from application.use_cases.process_job import ProcessJobUseCase
from domain.entities.job import Job
from domain.entities.video import Video
from domain.exceptions.domain_exceptions import DuplicateExternalJobIdError
from domain.services.validation_service import ValidationService
from domain.value_objects.file_size import FileSize

from ..dto.extraction_request import ExtractionRequestDTO
from ..dto.extraction_response import JobCreationDTO
logger = logging.getLogger(__name__)
class JobRepository(Protocol):
    """Protocol for the job persistence layer.

    Structural interface satisfied by whatever repository implementation is
    injected into the use case (database, in-memory store, ...).
    """

    async def create(self, job_id: str, filename: str, file_size_mb: float,
                     output_format: str, quality: str,
                     external_job_id: Optional[str] = None,
                     bearer_token: Optional[str] = None) -> Any:
        """Persist a new job record.

        ``external_job_id`` and ``bearer_token`` are optional so plain
        ``execute`` calls can omit them, while ``execute_with_job`` passes
        both (see the call in ``ExtractAudioAsyncUseCase.execute_with_job``).
        May raise DuplicateExternalJobIdError when ``external_job_id`` is
        already taken.
        """
        ...

    async def update_status(self, job_id: str, status: str,
                            error: Optional[str] = None,
                            output_path: Optional[str] = None,
                            processing_time: Optional[float] = None) -> Any:
        """Update a job's status plus optional failure/result metadata."""
        ...
class BackgroundTaskRunner(Protocol):
    """Protocol for background task execution.

    ``add_task`` registers *func* to be invoked later with the given
    positional and keyword arguments. The shape matches FastAPI's
    ``BackgroundTasks.add_task`` — presumably that is the concrete runner;
    confirm against the caller.
    """
    def add_task(self, func, *args, **kwargs):
        ...
class ExtractAudioAsyncUseCase:
    """Use case for asynchronous audio extraction (large files).

    Validates the request, persists a job record, and queues the actual
    extraction to run in the background so the caller gets an immediate
    job-creation response instead of waiting for processing.
    """

    def __init__(self,
                 job_repository: JobRepository,
                 validation_service: ValidationService,
                 process_job_use_case: ProcessJobUseCase):
        self.job_repository = job_repository
        self.validation_service = validation_service
        self.process_job_use_case = process_job_use_case

    def _validate_request(self, request: ExtractionRequestDTO) -> Video:
        """Build the Video domain object from *request* and validate it.

        Returns the validated Video. Propagates whatever the validation
        service raises for an unacceptable request.
        """
        video = Video(
            filename=request.video_filename,
            file_path=request.video_file_path,
            size=FileSize(request.video_file_size),
            content_type=request.content_type
        )
        self.validation_service.validate_extraction_request(
            video, request.output_format, request.quality
        )
        return video

    def _queue_and_respond(self, job: Job, video: Video,
                           request: ExtractionRequestDTO,
                           background_tasks: BackgroundTaskRunner) -> JobCreationDTO:
        """Queue background processing for *job* and build the creation DTO."""
        background_tasks.add_task(
            self._process_job_background,
            job.id,
            request
        )
        # Lazy %-args: no string formatting when INFO is disabled.
        logger.info("Created async job %s for %s", job.id, video.filename)
        return JobCreationDTO(
            job_id=job.id,
            external_job_id=job.external_job_id,
            status=job.status.value,
            message=f"Processing large file ({job.file_size.megabytes:.1f} MB)",
            check_url=f"/api/v1/jobs/{job.id}",
            file_size_mb=job.file_size.megabytes
        )

    async def execute(self, request: ExtractionRequestDTO,
                      background_tasks: BackgroundTaskRunner) -> JobCreationDTO:
        """Create and queue an async extraction job.

        Raises whatever the validation service raises for a bad request.
        """
        video = self._validate_request(request)
        # Create the job entity, then persist it before queueing work so the
        # background task always finds a record to update.
        job = Job.create_new(
            video_filename=request.video_filename,
            file_size_bytes=request.video_file_size,
            output_format=request.output_format,
            quality=request.quality
        )
        await self.job_repository.create(
            job_id=job.id,
            filename=job.video_filename,
            file_size_mb=job.file_size.megabytes,
            output_format=job.output_format.value,
            quality=job.quality.value
        )
        return self._queue_and_respond(job, video, request, background_tasks)

    async def execute_with_job(self, job: Job, request: ExtractionRequestDTO,
                               background_tasks: BackgroundTaskRunner) -> JobCreationDTO:
        """Execute with a pre-created job (carries external id / bearer token).

        Raises DuplicateExternalJobIdError if the repository reports the
        external job id as already taken.
        """
        video = self._validate_request(request)
        try:
            await self.job_repository.create(
                job_id=job.id,
                filename=job.video_filename,
                file_size_mb=job.file_size.megabytes,
                output_format=job.output_format.value,
                quality=job.quality.value,
                external_job_id=job.external_job_id,
                bearer_token=job.bearer_token
            )
        except DuplicateExternalJobIdError:
            # Uniqueness is validated before the job is created, so hitting
            # this indicates a race; record it with traceback, then propagate.
            logger.exception(
                "Duplicate external job id for job %s (external_job_id=%s)",
                job.id, job.external_job_id
            )
            raise
        return self._queue_and_respond(job, video, request, background_tasks)

    async def _process_job_background(self, job_id: str, request: ExtractionRequestDTO):
        """Run the extraction; on any failure, mark the job as failed."""
        try:
            await self.process_job_use_case.execute(job_id, request)
        except Exception as e:
            # logger.exception keeps the full traceback, not just the message.
            logger.exception("Background job %s failed: %s", job_id, e)
            await self.job_repository.update_status(
                job_id=job_id,
                status="failed",
                error=str(e)
            )