Spaces:
Sleeping
Sleeping
File size: 6,629 Bytes
92fd1a7 37e59a0 92fd1a7 fca9e8c 92fd1a7 fca9e8c 92fd1a7 37e59a0 fca9e8c 37e59a0 92fd1a7 37e59a0 92fd1a7 37e59a0 92fd1a7 37e59a0 92fd1a7 37e59a0 92fd1a7 37e59a0 92fd1a7 fca9e8c 3216021 92fd1a7 3216021 dbe78dd 37e59a0 92fd1a7 37e59a0 92fd1a7 37e59a0 92fd1a7 dbe78dd 3216021 dbe78dd 3216021 dbe78dd 92fd1a7 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 |
"""Use case for processing extraction jobs."""
import time
import logging
from typing import Protocol, Any
from domain.value_objects.audio_format import AudioFormat
from domain.value_objects.audio_quality import AudioQuality
from domain.value_objects.file_size import FileSize
from domain.entities.video import Video
from domain.entities.audio import Audio
from infrastructure.services.local_file_processor import LocalFileProcessor
from ..dto.extraction_request import ExtractionRequestDTO
logger = logging.getLogger(__name__)
class JobRepository(Protocol):
"""Protocol for job repository."""
async def update_status(self, job_id: str, status: str,
error: str = None, output_path: str = None,
processing_time: float = None) -> Any:
...
class FFmpegService(Protocol):
"""Protocol for FFmpeg service."""
async def extract_audio(self, input_path: str, output_path: str,
format: str, quality: str) -> Any:
...
class FileRepository(Protocol):
"""Protocol for file repository."""
async def create_output_path(self, job_id: str, format: str) -> str:
...
async def get_file_size(self, file_path: str) -> int:
...
async def delete_file(self, file_path: str) -> bool:
...
class NotificationService(Protocol):
"""Protocol for notification service."""
async def send_job_completion_notification(self,
job_id: str,
status: str,
processing_time: float) -> Any:
...
class ProcessJobUseCase:
"""Use case for processing a queued extraction job."""
def __init__(self, file_repository, ffmpeg_service, job_repository, notification_service):
self.file_repository = file_repository
self.ffmpeg_service = ffmpeg_service
self.job_repository = job_repository
self.notification_service = notification_service
self.file_processor = LocalFileProcessor(file_repository)
async def execute(self, job_id: str, request: ExtractionRequestDTO):
"""Execute job processing with local file handling."""
start_time = time.time()
try:
# Update job status
await self.job_repository.update_status(job_id, "processing")
# Create output path
output_key = await self.file_repository.create_output_path(
job_id,
request.output_format
)
# Process with local files
await self.file_processor.process_with_ffmpeg(
input_storage_key=request.video_file_path,
output_storage_key=output_key,
ffmpeg_func=self.ffmpeg_service.extract_audio, # Remove _async
format=request.output_format, # Change from output_format to format
quality=request.quality
)
# Calculate processing time
processing_time = time.time() - start_time
# Update job as completed
await self.job_repository.update_status(
job_id,
"completed",
output_path=output_key,
processing_time=processing_time
)
# Send notification if service is available and we have a bearer token
if self.notification_service:
# Get job record to retrieve bearer token and external job ID
job_record = await self.job_repository.get(job_id)
bearer_token = job_record.bearer_token if job_record else None
external_job_id = job_record.external_job_id if job_record else None
if bearer_token:
await self.notification_service.send_job_completion_notification(
job_id=job_id,
status="completed",
processing_time=processing_time,
bearer_token=bearer_token,
external_job_id=external_job_id
)
else:
logger.debug(f"Skipping N8N notification for job {job_id} - no bearer token available")
# Clear bearer token for security (regardless of notification status)
await self.job_repository.clear_bearer_token(job_id)
logger.info(f"Job {job_id} completed in {processing_time:.2f} seconds")
except Exception as e:
processing_time = time.time() - start_time
logger.error(f"Job {job_id} failed after {processing_time:.2f} seconds: {str(e)}")
await self.job_repository.update_status(
job_id,
"failed",
error=str(e),
processing_time=processing_time
)
# Send failure notification if service is available and we have a bearer token
if self.notification_service:
try:
job_record = await self.job_repository.get(job_id)
bearer_token = job_record.bearer_token if job_record else None
external_job_id = job_record.external_job_id if job_record else None
if bearer_token:
await self.notification_service.send_job_completion_notification(
job_id=job_id,
status="failed",
processing_time=processing_time,
bearer_token=bearer_token,
external_job_id=external_job_id
)
else:
logger.debug(f"Skipping N8N failure notification for job {job_id} - no bearer token available")
except Exception as notify_error:
# Don't let notification failures mask the original job failure
logger.warning(f"Failed to send failure notification for job {job_id}: {notify_error}")
# Clear bearer token for security (regardless of notification status)
try:
await self.job_repository.clear_bearer_token(job_id)
except Exception:
logger.warning(f"Failed to clear bearer token for failed job {job_id}")
raise |