|
|
"""Use case for processing extraction jobs.""" |
|
|
import logging
import time
from typing import Any, Optional, Protocol

from domain.value_objects.audio_format import AudioFormat
from domain.value_objects.audio_quality import AudioQuality
from domain.value_objects.file_size import FileSize
from domain.entities.video import Video
from domain.entities.audio import Audio
from infrastructure.services.local_file_processor import LocalFileProcessor

from ..dto.extraction_request import ExtractionRequestDTO
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
class JobRepository(Protocol):
    """Protocol for job repository.

    Lists every repository call ``ProcessJobUseCase`` makes: status updates,
    reading the stored job record, and clearing the job's bearer token.
    """

    async def update_status(
        self,
        job_id: str,
        status: str,
        error: Optional[str] = None,
        output_path: Optional[str] = None,
        processing_time: Optional[float] = None,
    ) -> Any:
        """Record a new status for ``job_id``.

        Args:
            job_id: Identifier of the job being updated.
            status: New status; the use case sends ``"processing"``,
                ``"completed"`` or ``"failed"``.
            error: Error text, supplied by the use case only on failure.
            output_path: Output location, supplied only on completion.
            processing_time: Elapsed processing time in seconds.
        """
        ...

    async def get(self, job_id: str) -> Any:
        """Return the stored record for ``job_id``.

        The use case treats a falsy result as "no record" and otherwise reads
        the record's ``bearer_token`` attribute.
        """
        ...

    async def clear_bearer_token(self, job_id: str) -> Any:
        """Clear the bearer token stored for ``job_id``."""
        ...
|
|
|
|
|
class FFmpegService(Protocol):
    """Structural interface for the FFmpeg-backed audio extractor.

    ``ProcessJobUseCase`` does not call ``extract_audio`` itself; it hands the
    bound method to its file processor as the ``ffmpeg_func`` callback.
    """

    async def extract_audio(
        self,
        input_path: str,
        output_path: str,
        format: str,
        quality: str,
    ) -> Any:
        """Extract audio from ``input_path`` into ``output_path``.

        Args:
            input_path: Location of the source media file.
            output_path: Location the result should be written to.
            format: Requested output format; the use case forwards the
                request's ``output_format`` here.
            quality: Requested quality; the use case forwards the request's
                ``quality`` here.
        """
        ...
|
|
|
|
|
class FileRepository(Protocol):
    """Structural interface for the file storage used during job processing."""

    async def create_output_path(
        self,
        job_id: str,
        format: str,
    ) -> str:
        """Produce the output location for ``job_id`` in the given ``format``.

        The use case passes the returned string on as the output storage key
        and later records it as the job's output path.
        """
        ...

    async def get_file_size(
        self,
        file_path: str,
    ) -> int:
        """Report the size of ``file_path``.

        NOTE(review): unit is presumably bytes -- confirm with implementers.
        """
        ...

    async def delete_file(
        self,
        file_path: str,
    ) -> bool:
        """Delete ``file_path``.

        NOTE(review): the bool presumably signals whether the delete
        succeeded -- confirm with implementers.
        """
        ...
|
|
|
|
|
class NotificationService(Protocol):
    """Protocol for notification service."""

    async def send_job_completion_notification(
        self,
        job_id: str,
        status: str,
        processing_time: float,
        bearer_token: Optional[str] = None,
    ) -> Any:
        """Notify that ``job_id`` finished with ``status``.

        Args:
            job_id: Identifier of the finished job.
            status: Final status; the use case sends ``"completed"`` or
                ``"failed"``.
            processing_time: Elapsed processing time in seconds.
            bearer_token: Token read from the stored job record, or ``None``
                when no record is available. The use case always passes this
                keyword, so it is part of the contract.
        """
        ...
|
|
|
|
|
class ProcessJobUseCase:
    """Use case for processing a queued extraction job.

    Marks the job as processing, runs the FFmpeg extraction through the local
    file processor, records the final status, and then sends a completion
    notification and clears the job's stored bearer token.

    Notification and token clearing are best-effort on both the success and
    the failure path: a problem there is logged as a warning and never changes
    the job's recorded status.
    """

    def __init__(self, file_repository, ffmpeg_service, job_repository, notification_service):
        """Store the collaborators and build the local file processor.

        Args:
            file_repository: Storage used to create output paths; also handed
                to ``LocalFileProcessor``.
            ffmpeg_service: Provides the ``extract_audio`` callback.
            job_repository: Persists job status and holds the bearer token.
            notification_service: Sends the job completion notification.
        """
        self.file_repository = file_repository
        self.ffmpeg_service = ffmpeg_service
        self.job_repository = job_repository
        self.notification_service = notification_service
        self.file_processor = LocalFileProcessor(file_repository)

    async def execute(self, job_id: str, request: ExtractionRequestDTO):
        """Execute job processing with local file handling.

        Args:
            job_id: Identifier of the job to process.
            request: Extraction request; ``video_file_path``,
                ``output_format`` and ``quality`` are read from it.

        Raises:
            Exception: Any error raised while updating the status to
                processing/completed, creating the output path, or running
                the extraction is re-raised after the job has been marked
                ``"failed"``. An error raised while recording the
                ``"failed"`` status itself also propagates.
        """
        start_time = time.time()

        try:
            await self.job_repository.update_status(job_id, "processing")

            output_key = await self.file_repository.create_output_path(
                job_id,
                request.output_format,
            )

            await self.file_processor.process_with_ffmpeg(
                input_storage_key=request.video_file_path,
                output_storage_key=output_key,
                ffmpeg_func=self.ffmpeg_service.extract_audio,
                format=request.output_format,
                quality=request.quality,
            )

            processing_time = time.time() - start_time

            await self.job_repository.update_status(
                job_id,
                "completed",
                output_path=output_key,
                processing_time=processing_time,
            )
        except Exception as e:
            processing_time = time.time() - start_time
            logger.error(f"Job {job_id} failed after {processing_time:.2f} seconds: {str(e)}")

            await self.job_repository.update_status(
                job_id,
                "failed",
                error=str(e),
                processing_time=processing_time,
            )

            await self._notify_and_clear_token(job_id, "failed", processing_time)

            raise
        else:
            # Runs outside the try body on purpose: once the job is recorded
            # as completed, a notification problem must not send control into
            # the handler above and overwrite the status with "failed".
            await self._notify_and_clear_token(job_id, "completed", processing_time)

            logger.info(f"Job {job_id} completed in {processing_time:.2f} seconds")

    async def _notify_and_clear_token(self, job_id: str, status: str, processing_time: float) -> None:
        """Send the completion notification, then clear the bearer token; logs instead of raising."""
        try:
            job_record = await self.job_repository.get(job_id)
            bearer_token = job_record.bearer_token if job_record else None

            await self.notification_service.send_job_completion_notification(
                job_id=job_id,
                status=status,
                processing_time=processing_time,
                bearer_token=bearer_token,
            )
        except Exception as notify_error:
            kind = "failure" if status == "failed" else "completion"
            logger.warning(f"Failed to send {kind} notification for job {job_id}: {notify_error}")

        # Always attempt the clear, even when the lookup or notification
        # failed, so the token does not stay stored after the job is done.
        try:
            await self.job_repository.clear_bearer_token(job_id)
        except Exception:
            logger.warning(f"Failed to clear bearer token for {status} job {job_id}")