"""Use case for asynchronous audio extraction.""" import asyncio from typing import Protocol, Any import logging from application.use_cases.process_job import ProcessJobUseCase from domain.entities.video import Video from domain.entities.job import Job from domain.value_objects.file_size import FileSize from domain.services.validation_service import ValidationService from domain.exceptions.domain_exceptions import DuplicateExternalJobIdError from ..dto.extraction_request import ExtractionRequestDTO from ..dto.extraction_response import JobCreationDTO logger = logging.getLogger(__name__) class JobRepository(Protocol): """Protocol for job repository.""" async def create(self, job_id: str, filename: str, file_size_mb: float, output_format: str, quality: str) -> Any: ... async def update_status(self, job_id: str, status: str, error: str = None, output_path: str = None, processing_time: float = None) -> Any: ... class BackgroundTaskRunner(Protocol): """Protocol for background task execution.""" def add_task(self, func, *args, **kwargs): ... class ExtractAudioAsyncUseCase: """Use case for asynchronous audio extraction (large files).""" def __init__(self, job_repository: JobRepository, validation_service: ValidationService, process_job_use_case: ProcessJobUseCase): self.job_repository = job_repository self.validation_service = validation_service self.process_job_use_case = process_job_use_case async def execute(self, request: ExtractionRequestDTO, background_tasks: BackgroundTaskRunner) -> JobCreationDTO: """Create and queue an async extraction job.""" # Create domain objects for validation video = Video( filename=request.video_filename, file_path=request.video_file_path, size=FileSize(request.video_file_size), content_type=request.content_type ) # Validate request self.validation_service.validate_extraction_request( video, request.output_format, request.quality ) # Create job job = Job.create_new( video_filename=request.video_filename, file_size_bytes=request.video_file_size, output_format=request.output_format, quality=request.quality ) # Save job to repository await self.job_repository.create( job_id=job.id, filename=job.video_filename, file_size_mb=job.file_size.megabytes, output_format=job.output_format.value, quality=job.quality.value ) # Queue background processing background_tasks.add_task( self._process_job_background, job.id, request ) logger.info(f"Created async job {job.id} for {video.filename}") return JobCreationDTO( job_id=job.id, external_job_id=job.external_job_id, status=job.status.value, message=f"Processing large file ({job.file_size.megabytes:.1f} MB)", check_url=f"/api/v1/jobs/{job.id}", file_size_mb=job.file_size.megabytes ) async def execute_with_job(self, job: Job, request: ExtractionRequestDTO, background_tasks: BackgroundTaskRunner) -> JobCreationDTO: """Execute with a pre-created job.""" # Create domain objects for validation video = Video( filename=request.video_filename, file_path=request.video_file_path, size=FileSize(request.video_file_size), content_type=request.content_type ) # Validate request self.validation_service.validate_extraction_request( video, request.output_format, request.quality ) # Save job to repository (job already created) try: await self.job_repository.create( job_id=job.id, filename=job.video_filename, file_size_mb=job.file_size.megabytes, output_format=job.output_format.value, quality=job.quality.value, external_job_id=job.external_job_id, bearer_token=job.bearer_token ) except DuplicateExternalJobIdError: # This should not happen since we validate uniqueness before creating the job # But handle it gracefully just in case raise # Queue background processing background_tasks.add_task( self._process_job_background, job.id, request ) logger.info(f"Created async job {job.id} for {video.filename}") return JobCreationDTO( job_id=job.id, external_job_id=job.external_job_id, status=job.status.value, message=f"Processing large file ({job.file_size.megabytes:.1f} MB)", check_url=f"/api/v1/jobs/{job.id}", file_size_mb=job.file_size.megabytes ) async def _process_job_background(self, job_id: str, request: ExtractionRequestDTO): """Process job in background.""" try: await self.process_job_use_case.execute(job_id, request) except Exception as e: logger.error(f"Background job {job_id} failed: {str(e)}") await self.job_repository.update_status( job_id=job_id, status="failed", error=str(e) )