Spaces:
Sleeping
Sleeping
| """Use case for asynchronous audio extraction.""" | |
import asyncio
import logging
from typing import Any, Optional, Protocol

from application.use_cases.process_job import ProcessJobUseCase
from domain.entities.job import Job
from domain.entities.video import Video
from domain.exceptions.domain_exceptions import DuplicateExternalJobIdError
from domain.services.validation_service import ValidationService
from domain.value_objects.file_size import FileSize

from ..dto.extraction_request import ExtractionRequestDTO
from ..dto.extraction_response import JobCreationDTO
| logger = logging.getLogger(__name__) | |
class JobRepository(Protocol):
    """Protocol for job repository.

    Declares the two persistence operations that the use case in this module
    awaits: creating a job record and updating its status.
    """

    async def create(self, job_id: str, filename: str, file_size_mb: float,
                     output_format: str, quality: str,
                     external_job_id: Optional[str] = None,
                     bearer_token: Optional[str] = None) -> Any:
        """Persist a new job record.

        Args:
            job_id: Identifier of the job being stored.
            filename: Name of the source video file.
            file_size_mb: Size of the source file in megabytes.
            output_format: Requested output format, as a plain value.
            quality: Requested quality, as a plain value.
            external_job_id: Optional caller-supplied identifier; passed by
                the pre-created-job flow of the use case.
                NOTE(review): assumed to be a string - confirm against ``Job``.
            bearer_token: Optional token stored alongside the job; passed by
                the pre-created-job flow of the use case.

        Returns:
            Implementation-defined; the use case ignores the result.
        """
        ...

    async def update_status(self, job_id: str, status: str,
                            error: Optional[str] = None,
                            output_path: Optional[str] = None,
                            processing_time: Optional[float] = None) -> Any:
        """Update the stored status of an existing job.

        Args:
            job_id: Identifier of the job to update.
            status: New status value (the use case writes ``"failed"``).
            error: Optional error message to record with the status.
            output_path: Optional location of the produced output.
            processing_time: Optional processing duration.
                NOTE(review): unit is not visible here - confirm.

        Returns:
            Implementation-defined; the use case ignores the result.
        """
        ...
class BackgroundTaskRunner(Protocol):
    """Protocol for background task execution.

    Any object exposing a compatible ``add_task`` method satisfies it.
    """

    def add_task(self, func, *args, **kwargs):
        """Register ``func`` together with the arguments it should receive.

        Implementations are expected to invoke ``func(*args, **kwargs)``
        later; this protocol method itself does nothing.
        """
class ExtractAudioAsyncUseCase:
    """Use case for asynchronous audio extraction (large files).

    Validates an extraction request, stores a job record through the job
    repository, registers the processing step with a background task runner
    and returns a ``JobCreationDTO`` describing the queued job.
    """

    def __init__(self,
                 job_repository: JobRepository,
                 validation_service: ValidationService,
                 process_job_use_case: ProcessJobUseCase) -> None:
        """Store the collaborators used by this use case.

        Args:
            job_repository: Persists job records and status updates.
            validation_service: Validates the extraction request.
            process_job_use_case: Performs the processing in the background.
        """
        self.job_repository = job_repository
        self.validation_service = validation_service
        self.process_job_use_case = process_job_use_case

    async def execute(self, request: ExtractionRequestDTO,
                      background_tasks: BackgroundTaskRunner) -> JobCreationDTO:
        """Create and queue an async extraction job.

        Args:
            request: Extraction request describing the video and output.
            background_tasks: Runner that the processing step is added to.

        Returns:
            A ``JobCreationDTO`` for the newly created job.

        Raises:
            Whatever ``validate_extraction_request`` or the repository's
            ``create`` raises; nothing is caught here.
        """
        video = self._validate_request(request)

        job = Job.create_new(
            video_filename=request.video_filename,
            file_size_bytes=request.video_file_size,
            output_format=request.output_format,
            quality=request.quality
        )

        await self.job_repository.create(
            job_id=job.id,
            filename=job.video_filename,
            file_size_mb=job.file_size.megabytes,
            output_format=job.output_format.value,
            quality=job.quality.value
        )

        return self._queue_processing(job, video, request, background_tasks)

    async def execute_with_job(self, job: Job, request: ExtractionRequestDTO,
                               background_tasks: BackgroundTaskRunner) -> JobCreationDTO:
        """Execute with a pre-created job.

        Same flow as ``execute``, except that the job is supplied by the
        caller and its ``external_job_id`` and ``bearer_token`` are stored
        with the record.

        Args:
            job: Job that has already been created by the caller.
            request: Extraction request describing the video and output.
            background_tasks: Runner that the processing step is added to.

        Returns:
            A ``JobCreationDTO`` for ``job``.

        Raises:
            DuplicateExternalJobIdError: Re-raised unchanged when the
                repository reports a duplicate external job id.
        """
        video = self._validate_request(request)

        try:
            await self.job_repository.create(
                job_id=job.id,
                filename=job.video_filename,
                file_size_mb=job.file_size.megabytes,
                output_format=job.output_format.value,
                quality=job.quality.value,
                external_job_id=job.external_job_id,
                bearer_token=job.bearer_token
            )
        except DuplicateExternalJobIdError:
            # Uniqueness is expected to be validated before the job is
            # created, so reaching this branch is unexpected. Leave a trace
            # for diagnosis and let the caller decide how to respond.
            logger.warning(
                "Duplicate external job id %s while saving job %s",
                job.external_job_id, job.id
            )
            raise

        return self._queue_processing(job, video, request, background_tasks)

    def _validate_request(self, request: ExtractionRequestDTO) -> Video:
        """Build the ``Video`` for ``request`` and run request validation."""
        video = Video(
            filename=request.video_filename,
            file_path=request.video_file_path,
            size=FileSize(request.video_file_size),
            content_type=request.content_type
        )
        self.validation_service.validate_extraction_request(
            video, request.output_format, request.quality
        )
        return video

    def _queue_processing(self, job: Job, video: Video,
                          request: ExtractionRequestDTO,
                          background_tasks: BackgroundTaskRunner) -> JobCreationDTO:
        """Register background processing for ``job`` and build the response."""
        background_tasks.add_task(
            self._process_job_background,
            job.id,
            request
        )
        logger.info("Created async job %s for %s", job.id, video.filename)
        return JobCreationDTO(
            job_id=job.id,
            external_job_id=job.external_job_id,
            status=job.status.value,
            message=f"Processing large file ({job.file_size.megabytes:.1f} MB)",
            check_url=f"/api/v1/jobs/{job.id}",
            file_size_mb=job.file_size.megabytes
        )

    async def _process_job_background(self, job_id: str,
                                      request: ExtractionRequestDTO) -> None:
        """Process job in background.

        Runs the processing use case. On failure the error is logged with its
        traceback and the job is marked ``"failed"`` in the repository. If
        that status update fails as well, the secondary error is logged
        instead of propagating, so the original failure stays the one on
        record.

        Args:
            job_id: Identifier of the job to process.
            request: The extraction request the job was created from.
        """
        try:
            await self.process_job_use_case.execute(job_id, request)
        except Exception as exc:
            # logger.exception keeps the traceback that logger.error dropped.
            logger.exception("Background job %s failed: %s", job_id, exc)
            try:
                await self.job_repository.update_status(
                    job_id=job_id,
                    status="failed",
                    error=str(exc)
                )
            except Exception:
                # This coroutine is the outermost frame of the background
                # task; nothing upstream could handle the error.
                logger.exception(
                    "Could not mark background job %s as failed", job_id
                )