Spaces:
Sleeping
Sleeping
| """Use case for processing extraction jobs.""" | |
| import time | |
| import logging | |
| from typing import Protocol, Any | |
| from domain.value_objects.audio_format import AudioFormat | |
| from domain.value_objects.audio_quality import AudioQuality | |
| from domain.value_objects.file_size import FileSize | |
| from domain.entities.video import Video | |
| from domain.entities.audio import Audio | |
| from infrastructure.services.local_file_processor import LocalFileProcessor | |
| from ..dto.extraction_request import ExtractionRequestDTO | |
| logger = logging.getLogger(__name__) | |
| class JobRepository(Protocol): | |
| """Protocol for job repository.""" | |
| async def update_status(self, job_id: str, status: str, | |
| error: str = None, output_path: str = None, | |
| processing_time: float = None) -> Any: | |
| ... | |
| class FFmpegService(Protocol): | |
| """Protocol for FFmpeg service.""" | |
| async def extract_audio(self, input_path: str, output_path: str, | |
| format: str, quality: str) -> Any: | |
| ... | |
| class FileRepository(Protocol): | |
| """Protocol for file repository.""" | |
| async def create_output_path(self, job_id: str, format: str) -> str: | |
| ... | |
| async def get_file_size(self, file_path: str) -> int: | |
| ... | |
| async def delete_file(self, file_path: str) -> bool: | |
| ... | |
| class NotificationService(Protocol): | |
| """Protocol for notification service.""" | |
| async def send_job_completion_notification(self, | |
| job_id: str, | |
| status: str, | |
| processing_time: float) -> Any: | |
| ... | |
| class ProcessJobUseCase: | |
| """Use case for processing a queued extraction job.""" | |
| def __init__(self, file_repository, ffmpeg_service, job_repository, notification_service): | |
| self.file_repository = file_repository | |
| self.ffmpeg_service = ffmpeg_service | |
| self.job_repository = job_repository | |
| self.notification_service = notification_service | |
| self.file_processor = LocalFileProcessor(file_repository) | |
| async def execute(self, job_id: str, request: ExtractionRequestDTO): | |
| """Execute job processing with local file handling.""" | |
| start_time = time.time() | |
| try: | |
| # Update job status | |
| await self.job_repository.update_status(job_id, "processing") | |
| # Create output path | |
| output_key = await self.file_repository.create_output_path( | |
| job_id, | |
| request.output_format | |
| ) | |
| # Process with local files | |
| await self.file_processor.process_with_ffmpeg( | |
| input_storage_key=request.video_file_path, | |
| output_storage_key=output_key, | |
| ffmpeg_func=self.ffmpeg_service.extract_audio, # Remove _async | |
| format=request.output_format, # Change from output_format to format | |
| quality=request.quality | |
| ) | |
| # Calculate processing time | |
| processing_time = time.time() - start_time | |
| # Update job as completed | |
| await self.job_repository.update_status( | |
| job_id, | |
| "completed", | |
| output_path=output_key, | |
| processing_time=processing_time | |
| ) | |
| # Send notification if service is available and we have a bearer token | |
| if self.notification_service: | |
| # Get job record to retrieve bearer token and external job ID | |
| job_record = await self.job_repository.get(job_id) | |
| bearer_token = job_record.bearer_token if job_record else None | |
| external_job_id = job_record.external_job_id if job_record else None | |
| if bearer_token: | |
| await self.notification_service.send_job_completion_notification( | |
| job_id=job_id, | |
| status="completed", | |
| processing_time=processing_time, | |
| bearer_token=bearer_token, | |
| external_job_id=external_job_id | |
| ) | |
| else: | |
| logger.debug(f"Skipping N8N notification for job {job_id} - no bearer token available") | |
| # Clear bearer token for security (regardless of notification status) | |
| await self.job_repository.clear_bearer_token(job_id) | |
| logger.info(f"Job {job_id} completed in {processing_time:.2f} seconds") | |
| except Exception as e: | |
| processing_time = time.time() - start_time | |
| logger.error(f"Job {job_id} failed after {processing_time:.2f} seconds: {str(e)}") | |
| await self.job_repository.update_status( | |
| job_id, | |
| "failed", | |
| error=str(e), | |
| processing_time=processing_time | |
| ) | |
| # Send failure notification if service is available and we have a bearer token | |
| if self.notification_service: | |
| try: | |
| job_record = await self.job_repository.get(job_id) | |
| bearer_token = job_record.bearer_token if job_record else None | |
| external_job_id = job_record.external_job_id if job_record else None | |
| if bearer_token: | |
| await self.notification_service.send_job_completion_notification( | |
| job_id=job_id, | |
| status="failed", | |
| processing_time=processing_time, | |
| bearer_token=bearer_token, | |
| external_job_id=external_job_id | |
| ) | |
| else: | |
| logger.debug(f"Skipping N8N failure notification for job {job_id} - no bearer token available") | |
| except Exception as notify_error: | |
| # Don't let notification failures mask the original job failure | |
| logger.warning(f"Failed to send failure notification for job {job_id}: {notify_error}") | |
| # Clear bearer token for security (regardless of notification status) | |
| try: | |
| await self.job_repository.clear_bearer_token(job_id) | |
| except Exception: | |
| logger.warning(f"Failed to clear bearer token for failed job {job_id}") | |
| raise |