"""Use case for processing extraction jobs.""" import time import logging from typing import Protocol, Any from domain.value_objects.audio_format import AudioFormat from domain.value_objects.audio_quality import AudioQuality from domain.value_objects.file_size import FileSize from domain.entities.video import Video from domain.entities.audio import Audio from infrastructure.services.local_file_processor import LocalFileProcessor from ..dto.extraction_request import ExtractionRequestDTO logger = logging.getLogger(__name__) class JobRepository(Protocol): """Protocol for job repository.""" async def update_status(self, job_id: str, status: str, error: str = None, output_path: str = None, processing_time: float = None) -> Any: ... class FFmpegService(Protocol): """Protocol for FFmpeg service.""" async def extract_audio(self, input_path: str, output_path: str, format: str, quality: str) -> Any: ... class FileRepository(Protocol): """Protocol for file repository.""" async def create_output_path(self, job_id: str, format: str) -> str: ... async def get_file_size(self, file_path: str) -> int: ... async def delete_file(self, file_path: str) -> bool: ... class NotificationService(Protocol): """Protocol for notification service.""" async def send_job_completion_notification(self, job_id: str, status: str, processing_time: float) -> Any: ... 
class ProcessJobUseCase:
    """Use case for processing a queued extraction job.

    Orchestrates the full lifecycle of one job: status transitions,
    local FFmpeg processing, best-effort completion/failure notification,
    and bearer-token cleanup.
    """

    def __init__(self, file_repository, ffmpeg_service, job_repository,
                 notification_service):
        self.file_repository = file_repository
        self.ffmpeg_service = ffmpeg_service
        self.job_repository = job_repository
        self.notification_service = notification_service
        # Wraps download -> ffmpeg -> upload around local temp files.
        self.file_processor = LocalFileProcessor(file_repository)

    async def execute(self, job_id: str, request: ExtractionRequestDTO):
        """Execute job processing with local file handling.

        Args:
            job_id: Internal identifier of the queued job.
            request: Extraction parameters (input path, output format, quality).

        Raises:
            Exception: Re-raises any processing error after recording the
                failure and attempting a failure notification.
        """
        start_time = time.time()
        try:
            await self.job_repository.update_status(job_id, "processing")

            output_key = await self.file_repository.create_output_path(
                job_id, request.output_format
            )

            # Run extraction against local copies of the stored files.
            await self.file_processor.process_with_ffmpeg(
                input_storage_key=request.video_file_path,
                output_storage_key=output_key,
                ffmpeg_func=self.ffmpeg_service.extract_audio,
                format=request.output_format,
                quality=request.quality
            )

            processing_time = time.time() - start_time

            await self.job_repository.update_status(
                job_id,
                "completed",
                output_path=output_key,
                processing_time=processing_time
            )

            # Best-effort steps: a notification or token-cleanup failure
            # must NOT propagate into the outer except clause, which would
            # flip an already-completed job to "failed".
            await self._notify_safely(job_id, "completed", processing_time)
            await self._clear_bearer_token_safely(job_id)

            logger.info(f"Job {job_id} completed in {processing_time:.2f} seconds")

        except Exception as e:
            processing_time = time.time() - start_time
            logger.error(f"Job {job_id} failed after {processing_time:.2f} seconds: {str(e)}")

            await self.job_repository.update_status(
                job_id,
                "failed",
                error=str(e),
                processing_time=processing_time
            )

            await self._notify_safely(job_id, "failed", processing_time)
            await self._clear_bearer_token_safely(job_id)
            # Re-raise so the queue/worker layer sees the original failure.
            raise

    async def _notify_safely(self, job_id: str, status: str,
                             processing_time: float) -> None:
        """Send a completion/failure notification; swallow (but log) errors.

        Skips silently when no notification service is configured, and
        skips with a debug log when the job has no bearer token on record.
        """
        if not self.notification_service:
            return
        try:
            # Retrieve bearer token and external job ID from the job record.
            job_record = await self.job_repository.get(job_id)
            bearer_token = job_record.bearer_token if job_record else None
            external_job_id = job_record.external_job_id if job_record else None
            if bearer_token:
                await self.notification_service.send_job_completion_notification(
                    job_id=job_id,
                    status=status,
                    processing_time=processing_time,
                    bearer_token=bearer_token,
                    external_job_id=external_job_id
                )
            else:
                logger.debug(f"Skipping N8N notification for job {job_id} - no bearer token available")
        except Exception as notify_error:
            # Don't let notification failures mask the job outcome.
            logger.warning(f"Failed to send {status} notification for job {job_id}: {notify_error}")

    async def _clear_bearer_token_safely(self, job_id: str) -> None:
        """Clear the stored bearer token for security; swallow (but log) errors."""
        try:
            await self.job_repository.clear_bearer_token(job_id)
        except Exception:
            logger.warning(f"Failed to clear bearer token for job {job_id}")