"""Use case for processing extraction jobs."""
import logging
import time
from typing import Any, Optional, Protocol

from domain.value_objects.audio_format import AudioFormat
from domain.value_objects.audio_quality import AudioQuality
from domain.value_objects.file_size import FileSize
from domain.entities.video import Video
from domain.entities.audio import Audio
from infrastructure.services.local_file_processor import LocalFileProcessor

from ..dto.extraction_request import ExtractionRequestDTO
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class JobRepository(Protocol):
    """Structural interface for the job store used by ``ProcessJobUseCase``.

    Declares every coroutine the use case awaits on its job repository:
    status updates, record lookup, and bearer-token clearing.
    """

    async def update_status(
        self,
        job_id: str,
        status: str,
        error: Optional[str] = None,
        output_path: Optional[str] = None,
        processing_time: Optional[float] = None,
    ) -> Any:
        """Record a new status for ``job_id``.

        Args:
            job_id: Identifier of the job being updated.
            status: New status; the use case passes "processing",
                "completed" or "failed".
            error: Error text, supplied by the use case on failure.
            output_path: Output location, supplied by the use case on success.
            processing_time: Elapsed seconds measured by the use case.
        """
        ...

    async def get(self, job_id: str) -> Any:
        """Look up the record for ``job_id``.

        The use case treats a falsy result as "no record" and otherwise
        reads a ``bearer_token`` attribute from the returned object.
        """
        ...

    async def clear_bearer_token(self, job_id: str) -> Any:
        """Clear the bearer token held for ``job_id``.

        The use case calls this after attempting the completion notification.
        """
        ...
class FFmpegService(Protocol):
    """Structural type for the FFmpeg service dependency."""

    async def extract_audio(
        self,
        input_path: str,
        output_path: str,
        format: str,
        quality: str,
    ) -> Any:
        """Coroutine that ``ProcessJobUseCase`` hands on as ``ffmpeg_func``.

        NOTE(review): presumably extracts audio from ``input_path`` into
        ``output_path`` using the given format and quality -- confirm against
        the concrete implementation.
        """
        ...
class FileRepository(Protocol):
    """Structural type for the file repository dependency."""

    async def create_output_path(self, job_id: str, format: str) -> str:
        """Return the output path/key for ``job_id`` in the given format.

        ``ProcessJobUseCase`` uses the result both as the output storage key
        for processing and as the job's recorded output path.
        """
        ...

    async def get_file_size(self, file_path: str) -> int:
        """Return an integer size for ``file_path``.

        NOTE(review): unit is presumably bytes -- confirm with implementations.
        """
        ...

    async def delete_file(self, file_path: str) -> bool:
        """Delete ``file_path`` and return a boolean result.

        NOTE(review): the flag presumably signals success -- confirm with
        implementations.
        """
        ...
class NotificationService(Protocol):
    """Structural interface for the notification service dependency."""

    async def send_job_completion_notification(
        self,
        job_id: str,
        status: str,
        processing_time: float,
        bearer_token: Optional[str] = None,
    ) -> Any:
        """Notify that a job has finished.

        Args:
            job_id: Identifier of the finished job.
            status: Final status; the use case passes "completed" or "failed".
            processing_time: Elapsed seconds measured by the use case.
            bearer_token: Token read from the job record, or ``None`` when
                the record is missing. Declared here because every call in
                ``ProcessJobUseCase`` passes it by keyword.
                NOTE(review): assumed to be a string -- confirm against the
                job record type.
        """
        ...
class ProcessJobUseCase:
    """Use case for processing a queued extraction job.

    Marks the job as processing, runs the extraction through a
    ``LocalFileProcessor``, records the final status on the job repository,
    then sends a completion notification and clears the job's bearer token.
    """

    def __init__(self, file_repository, ffmpeg_service, job_repository, notification_service):
        """Store the collaborators and build the local file processor.

        Args:
            file_repository: Provides ``create_output_path``; also passed to
                ``LocalFileProcessor``.
            ffmpeg_service: Provides the ``extract_audio`` coroutine.
            job_repository: Provides ``update_status``, ``get`` and
                ``clear_bearer_token``.
            notification_service: Provides
                ``send_job_completion_notification``.
        """
        self.file_repository = file_repository
        self.ffmpeg_service = ffmpeg_service
        self.job_repository = job_repository
        self.notification_service = notification_service
        self.file_processor = LocalFileProcessor(file_repository)

    async def execute(self, job_id: str, request: ExtractionRequestDTO) -> None:
        """Execute job processing with local file handling.

        Args:
            job_id: Identifier of the job to process.
            request: Extraction request; ``video_file_path``,
                ``output_format`` and ``quality`` are read from it.

        Raises:
            Exception: Any error raised while processing or while recording
                the "completed" status is re-raised after the job has been
                marked "failed". Notification and token-clearing errors are
                logged and never raised.
        """
        # Monotonic clock: elapsed time is immune to wall-clock adjustments.
        start_time = time.monotonic()
        try:
            await self.job_repository.update_status(job_id, "processing")

            output_key = await self.file_repository.create_output_path(
                job_id,
                request.output_format,
            )

            await self.file_processor.process_with_ffmpeg(
                input_storage_key=request.video_file_path,
                output_storage_key=output_key,
                ffmpeg_func=self.ffmpeg_service.extract_audio,
                format=request.output_format,
                quality=request.quality,
            )

            processing_time = time.monotonic() - start_time

            await self.job_repository.update_status(
                job_id,
                "completed",
                output_path=output_key,
                processing_time=processing_time,
            )
        except Exception as e:
            processing_time = time.monotonic() - start_time
            logger.error(f"Job {job_id} failed after {processing_time:.2f} seconds: {e}")
            try:
                await self.job_repository.update_status(
                    job_id,
                    "failed",
                    error=str(e),
                    processing_time=processing_time,
                )
            finally:
                # Notify and clear the token even if persisting the failure
                # status itself raised.
                await self._notify_and_clear_token(job_id, "failed", processing_time)
            raise
        else:
            # Deliberately outside the try body: a notification problem must
            # not turn an already-persisted "completed" job into "failed".
            await self._notify_and_clear_token(job_id, "completed", processing_time)
            logger.info(f"Job {job_id} completed in {processing_time:.2f} seconds")

    async def _notify_and_clear_token(self, job_id: str, status: str, processing_time: float) -> None:
        """Send the completion notification (best effort) and clear the token.

        Looks up the job record to obtain its bearer token, sends the
        notification, and always attempts to clear the token afterwards.
        Errors are logged as warnings and swallowed so they cannot mask the
        outcome of the job itself.
        """
        label = "failure" if status == "failed" else "completion"
        try:
            job_record = await self.job_repository.get(job_id)
            bearer_token = job_record.bearer_token if job_record else None
            await self.notification_service.send_job_completion_notification(
                job_id=job_id,
                status=status,
                processing_time=processing_time,
                bearer_token=bearer_token,
            )
        except Exception as notify_error:
            logger.warning(f"Failed to send {label} notification for job {job_id}: {notify_error}")
        finally:
            # Clear the token for security whether or not the notification
            # was delivered.
            try:
                await self.job_repository.clear_bearer_token(job_id)
            except Exception:
                logger.warning(f"Failed to clear bearer token for {status} job {job_id}")