File size: 6,629 Bytes
92fd1a7
 
 
 
 
 
 
 
 
 
37e59a0
92fd1a7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fca9e8c
 
 
 
 
 
 
 
92fd1a7
 
 
fca9e8c
92fd1a7
37e59a0
 
fca9e8c
37e59a0
92fd1a7
 
37e59a0
92fd1a7
 
 
37e59a0
92fd1a7
 
 
37e59a0
 
 
92fd1a7
 
37e59a0
 
 
 
 
 
 
92fd1a7
 
 
 
 
 
 
37e59a0
 
 
92fd1a7
 
fca9e8c
3216021
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
92fd1a7
3216021
dbe78dd
 
37e59a0
92fd1a7
 
37e59a0
 
92fd1a7
 
37e59a0
 
 
 
92fd1a7
dbe78dd
3216021
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
dbe78dd
 
3216021
 
dbe78dd
92fd1a7
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
"""Use case for processing extraction jobs."""
import time
import logging
from typing import Protocol, Any

from domain.value_objects.audio_format import AudioFormat
from domain.value_objects.audio_quality import AudioQuality
from domain.value_objects.file_size import FileSize
from domain.entities.video import Video
from domain.entities.audio import Audio
from infrastructure.services.local_file_processor import LocalFileProcessor

from ..dto.extraction_request import ExtractionRequestDTO

logger = logging.getLogger(__name__)

class JobRepository(Protocol):
    """Protocol for job repository."""
    async def update_status(self, job_id: str, status: str,
                           error: str = None, output_path: str = None,
                           processing_time: float = None) -> Any:
        ...

class FFmpegService(Protocol):
    """Protocol for FFmpeg service."""
    async def extract_audio(self, input_path: str, output_path: str,
                           format: str, quality: str) -> Any:
        ...

class FileRepository(Protocol):
    """Protocol for file repository."""
    async def create_output_path(self, job_id: str, format: str) -> str:
        ...
    async def get_file_size(self, file_path: str) -> int:
        ...
    async def delete_file(self, file_path: str) -> bool:
        ...

class NotificationService(Protocol):
    """Protocol for notification service."""
    async def send_job_completion_notification(self, 
                                             job_id: str, 
                                             status: str,
                                             processing_time: float) -> Any:
        ...

class ProcessJobUseCase:
    """Use case for processing a queued extraction job."""
    
    def __init__(self, file_repository, ffmpeg_service, job_repository, notification_service):
        self.file_repository = file_repository
        self.ffmpeg_service = ffmpeg_service
        self.job_repository = job_repository
        self.notification_service = notification_service
        self.file_processor = LocalFileProcessor(file_repository)
    
    async def execute(self, job_id: str, request: ExtractionRequestDTO):
        """Execute job processing with local file handling."""
        start_time = time.time()
        
        try:
            # Update job status
            await self.job_repository.update_status(job_id, "processing")
            
            # Create output path
            output_key = await self.file_repository.create_output_path(
                job_id, 
                request.output_format
            )
            
            # Process with local files
            await self.file_processor.process_with_ffmpeg(
                input_storage_key=request.video_file_path,
                output_storage_key=output_key,
                ffmpeg_func=self.ffmpeg_service.extract_audio,  # Remove _async
                format=request.output_format,  # Change from output_format to format
                quality=request.quality
            )
            
            # Calculate processing time
            processing_time = time.time() - start_time
            
            # Update job as completed
            await self.job_repository.update_status(
                job_id, 
                "completed", 
                output_path=output_key,
                processing_time=processing_time
            )

            # Send notification if service is available and we have a bearer token
            if self.notification_service:
                # Get job record to retrieve bearer token and external job ID
                job_record = await self.job_repository.get(job_id)
                bearer_token = job_record.bearer_token if job_record else None
                external_job_id = job_record.external_job_id if job_record else None
                
                if bearer_token:
                    await self.notification_service.send_job_completion_notification(
                        job_id=job_id,
                        status="completed",
                        processing_time=processing_time,
                        bearer_token=bearer_token,
                        external_job_id=external_job_id
                    )
                else:
                    logger.debug(f"Skipping N8N notification for job {job_id} - no bearer token available")
            
            # Clear bearer token for security (regardless of notification status)
            await self.job_repository.clear_bearer_token(job_id)
            
            logger.info(f"Job {job_id} completed in {processing_time:.2f} seconds")
            
        except Exception as e:
            processing_time = time.time() - start_time
            logger.error(f"Job {job_id} failed after {processing_time:.2f} seconds: {str(e)}")
            
            await self.job_repository.update_status(
                job_id, 
                "failed", 
                error=str(e),
                processing_time=processing_time
            )
            
            # Send failure notification if service is available and we have a bearer token
            if self.notification_service:
                try:
                    job_record = await self.job_repository.get(job_id)
                    bearer_token = job_record.bearer_token if job_record else None
                    external_job_id = job_record.external_job_id if job_record else None
                    
                    if bearer_token:
                        await self.notification_service.send_job_completion_notification(
                            job_id=job_id,
                            status="failed",
                            processing_time=processing_time,
                            bearer_token=bearer_token,
                            external_job_id=external_job_id
                        )
                    else:
                        logger.debug(f"Skipping N8N failure notification for job {job_id} - no bearer token available")
                    
                except Exception as notify_error:
                    # Don't let notification failures mask the original job failure
                    logger.warning(f"Failed to send failure notification for job {job_id}: {notify_error}")
            
            # Clear bearer token for security (regardless of notification status)
            try:
                await self.job_repository.clear_bearer_token(job_id)
            except Exception:
                logger.warning(f"Failed to clear bearer token for failed job {job_id}")
            
            raise