Spaces:
Sleeping
Sleeping
File size: 5,827 Bytes
92fd1a7 d369cf2 92fd1a7 dbe78dd 92fd1a7 d369cf2 92fd1a7 37e59a0 dbe78dd 37e59a0 dbe78dd 37e59a0 92fd1a7 dbe78dd 92fd1a7 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 |
"""Use case for asynchronous audio extraction."""
import asyncio
import logging
from typing import Any, Optional, Protocol

from application.use_cases.process_job import ProcessJobUseCase
from domain.entities.job import Job
from domain.entities.video import Video
from domain.exceptions.domain_exceptions import DuplicateExternalJobIdError
from domain.services.validation_service import ValidationService
from domain.value_objects.file_size import FileSize

from ..dto.extraction_request import ExtractionRequestDTO
from ..dto.extraction_response import JobCreationDTO
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class JobRepository(Protocol):
    """Protocol for the job repository used by the extraction use case.

    The method signatures mirror the keyword arguments that
    ``ExtractAudioAsyncUseCase`` passes when it calls the repository.
    """

    async def create(
        self,
        job_id: str,
        filename: str,
        file_size_mb: float,
        output_format: str,
        quality: str,
        # NOTE(review): the two optional fields are presumably strings;
        # confirm against the Job entity.
        external_job_id: Optional[str] = None,
        bearer_token: Optional[str] = None,
    ) -> Any:
        """Persist a new job record.

        Args:
            job_id: Identifier of the job being stored.
            filename: Name of the source video file.
            file_size_mb: Size of the source file in megabytes.
            output_format: Requested output format value.
            quality: Requested quality value.
            external_job_id: Optional caller-supplied job identifier; only
                passed when a pre-created job is saved.
            bearer_token: Optional token taken from the pre-created job.

        Returns:
            Whatever the concrete repository returns; callers in this
            module ignore it.
        """
        ...

    async def update_status(
        self,
        job_id: str,
        status: str,
        error: Optional[str] = None,
        output_path: Optional[str] = None,
        processing_time: Optional[float] = None,
    ) -> Any:
        """Update the stored status of a job.

        Args:
            job_id: Identifier of the job to update.
            status: New status value (this module writes ``"failed"``).
            error: Optional error description.
            output_path: Optional path of the produced output.
            processing_time: Optional processing duration.

        Returns:
            Whatever the concrete repository returns; callers in this
            module ignore it.
        """
        ...
class BackgroundTaskRunner(Protocol):
    """Structural interface for anything that can schedule background work."""

    def add_task(self, func, *args, **kwargs):
        """Register ``func`` to be run later with ``args`` and ``kwargs``."""
class ExtractAudioAsyncUseCase:
    """Use case for asynchronous audio extraction (large files).

    Validates an extraction request, saves a job record through the job
    repository, hands the processing to a background task runner, and
    returns a ``JobCreationDTO`` describing the queued job.
    """

    def __init__(self,
                 job_repository: JobRepository,
                 validation_service: ValidationService,
                 process_job_use_case: ProcessJobUseCase) -> None:
        """Store the collaborators used by this use case.

        Args:
            job_repository: Repository used to create and update job records.
            validation_service: Service that validates extraction requests.
            process_job_use_case: Use case executed by the background task.
        """
        self.job_repository = job_repository
        self.validation_service = validation_service
        self.process_job_use_case = process_job_use_case

    async def execute(self, request: ExtractionRequestDTO,
                      background_tasks: BackgroundTaskRunner) -> JobCreationDTO:
        """Create and queue an async extraction job.

        Args:
            request: Extraction request describing the video and the
                desired output format and quality.
            background_tasks: Runner that will execute the processing.

        Returns:
            A ``JobCreationDTO`` for the newly created job.

        Raises:
            Any exception raised by the validation service, by
            ``Job.create_new`` or by the repository is propagated unchanged.
        """
        video = self._validate_request(request)

        job = Job.create_new(
            video_filename=request.video_filename,
            file_size_bytes=request.video_file_size,
            output_format=request.output_format,
            quality=request.quality,
        )

        await self.job_repository.create(
            job_id=job.id,
            filename=job.video_filename,
            file_size_mb=job.file_size.megabytes,
            output_format=job.output_format.value,
            quality=job.quality.value,
        )

        return self._enqueue_and_respond(job, video, request, background_tasks)

    async def execute_with_job(self, job: Job, request: ExtractionRequestDTO,
                               background_tasks: BackgroundTaskRunner) -> JobCreationDTO:
        """Queue an extraction for a job the caller has already created.

        Unlike ``execute``, the job's ``external_job_id`` and
        ``bearer_token`` are also passed to the repository.

        Args:
            job: Pre-created job entity to save and process.
            request: Extraction request describing the video and the
                desired output format and quality.
            background_tasks: Runner that will execute the processing.

        Returns:
            A ``JobCreationDTO`` for ``job``.

        Raises:
            DuplicateExternalJobIdError: Re-raised when the repository
                reports a duplicate external job id.
            Any other exception from the validation service or the
            repository is propagated unchanged.
        """
        video = self._validate_request(request)

        try:
            await self.job_repository.create(
                job_id=job.id,
                filename=job.video_filename,
                file_size_mb=job.file_size.megabytes,
                output_format=job.output_format.value,
                quality=job.quality.value,
                external_job_id=job.external_job_id,
                bearer_token=job.bearer_token,
            )
        except DuplicateExternalJobIdError:
            # Uniqueness is expected to be validated before the job is
            # created, so reaching this point is unexpected. Record it and
            # let the caller handle the original exception.
            logger.warning(
                "Duplicate external job id %s while saving job %s",
                job.external_job_id, job.id,
            )
            raise

        return self._enqueue_and_respond(job, video, request, background_tasks)

    def _validate_request(self, request: ExtractionRequestDTO) -> Video:
        """Build the ``Video`` for ``request`` and run request validation."""
        video = Video(
            filename=request.video_filename,
            file_path=request.video_file_path,
            size=FileSize(request.video_file_size),
            content_type=request.content_type,
        )
        self.validation_service.validate_extraction_request(
            video, request.output_format, request.quality
        )
        return video

    def _enqueue_and_respond(self, job: Job, video: Video,
                             request: ExtractionRequestDTO,
                             background_tasks: BackgroundTaskRunner) -> JobCreationDTO:
        """Queue background processing for ``job`` and build the response DTO."""
        background_tasks.add_task(
            self._process_job_background,
            job.id,
            request,
        )
        logger.info("Created async job %s for %s", job.id, video.filename)

        size_mb = job.file_size.megabytes
        return JobCreationDTO(
            job_id=job.id,
            external_job_id=job.external_job_id,
            status=job.status.value,
            message=f"Processing large file ({size_mb:.1f} MB)",
            check_url=f"/api/v1/jobs/{job.id}",
            file_size_mb=size_mb,
        )

    async def _process_job_background(self, job_id: str,
                                      request: ExtractionRequestDTO) -> None:
        """Process a job in the background and mark it failed on error.

        Args:
            job_id: Identifier of the job to process.
            request: The extraction request the job was created from.
        """
        try:
            await self.process_job_use_case.execute(job_id, request)
        except Exception as exc:
            # This is the outermost frame of the background task, so a broad
            # catch is intentional. logger.exception keeps the traceback.
            logger.exception("Background job %s failed: %s", job_id, exc)
            await self.job_repository.update_status(
                job_id=job_id,
                status="failed",
                # Some exceptions carry no message; fall back to the class
                # name so the stored error is never an empty string.
                error=str(exc) or type(exc).__name__,
            )