Tadeas Kosek
n8n use passed token
3216021
"""Use case for processing extraction jobs."""
import time
import logging
from typing import Protocol, Any
from domain.value_objects.audio_format import AudioFormat
from domain.value_objects.audio_quality import AudioQuality
from domain.value_objects.file_size import FileSize
from domain.entities.video import Video
from domain.entities.audio import Audio
from infrastructure.services.local_file_processor import LocalFileProcessor
from ..dto.extraction_request import ExtractionRequestDTO
logger = logging.getLogger(__name__)
class JobRepository(Protocol):
"""Protocol for job repository."""
async def update_status(self, job_id: str, status: str,
error: str = None, output_path: str = None,
processing_time: float = None) -> Any:
...
class FFmpegService(Protocol):
"""Protocol for FFmpeg service."""
async def extract_audio(self, input_path: str, output_path: str,
format: str, quality: str) -> Any:
...
class FileRepository(Protocol):
"""Protocol for file repository."""
async def create_output_path(self, job_id: str, format: str) -> str:
...
async def get_file_size(self, file_path: str) -> int:
...
async def delete_file(self, file_path: str) -> bool:
...
class NotificationService(Protocol):
"""Protocol for notification service."""
async def send_job_completion_notification(self,
job_id: str,
status: str,
processing_time: float) -> Any:
...
class ProcessJobUseCase:
"""Use case for processing a queued extraction job."""
def __init__(self, file_repository, ffmpeg_service, job_repository, notification_service):
self.file_repository = file_repository
self.ffmpeg_service = ffmpeg_service
self.job_repository = job_repository
self.notification_service = notification_service
self.file_processor = LocalFileProcessor(file_repository)
async def execute(self, job_id: str, request: ExtractionRequestDTO):
"""Execute job processing with local file handling."""
start_time = time.time()
try:
# Update job status
await self.job_repository.update_status(job_id, "processing")
# Create output path
output_key = await self.file_repository.create_output_path(
job_id,
request.output_format
)
# Process with local files
await self.file_processor.process_with_ffmpeg(
input_storage_key=request.video_file_path,
output_storage_key=output_key,
ffmpeg_func=self.ffmpeg_service.extract_audio, # Remove _async
format=request.output_format, # Change from output_format to format
quality=request.quality
)
# Calculate processing time
processing_time = time.time() - start_time
# Update job as completed
await self.job_repository.update_status(
job_id,
"completed",
output_path=output_key,
processing_time=processing_time
)
# Send notification if service is available and we have a bearer token
if self.notification_service:
# Get job record to retrieve bearer token and external job ID
job_record = await self.job_repository.get(job_id)
bearer_token = job_record.bearer_token if job_record else None
external_job_id = job_record.external_job_id if job_record else None
if bearer_token:
await self.notification_service.send_job_completion_notification(
job_id=job_id,
status="completed",
processing_time=processing_time,
bearer_token=bearer_token,
external_job_id=external_job_id
)
else:
logger.debug(f"Skipping N8N notification for job {job_id} - no bearer token available")
# Clear bearer token for security (regardless of notification status)
await self.job_repository.clear_bearer_token(job_id)
logger.info(f"Job {job_id} completed in {processing_time:.2f} seconds")
except Exception as e:
processing_time = time.time() - start_time
logger.error(f"Job {job_id} failed after {processing_time:.2f} seconds: {str(e)}")
await self.job_repository.update_status(
job_id,
"failed",
error=str(e),
processing_time=processing_time
)
# Send failure notification if service is available and we have a bearer token
if self.notification_service:
try:
job_record = await self.job_repository.get(job_id)
bearer_token = job_record.bearer_token if job_record else None
external_job_id = job_record.external_job_id if job_record else None
if bearer_token:
await self.notification_service.send_job_completion_notification(
job_id=job_id,
status="failed",
processing_time=processing_time,
bearer_token=bearer_token,
external_job_id=external_job_id
)
else:
logger.debug(f"Skipping N8N failure notification for job {job_id} - no bearer token available")
except Exception as notify_error:
# Don't let notification failures mask the original job failure
logger.warning(f"Failed to send failure notification for job {job_id}: {notify_error}")
# Clear bearer token for security (regardless of notification status)
try:
await self.job_repository.clear_bearer_token(job_id)
except Exception:
logger.warning(f"Failed to clear bearer token for failed job {job_id}")
raise