File size: 5,796 Bytes
92fd1a7
 
 
 
 
 
 
 
 
 
37e59a0
92fd1a7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fca9e8c
 
 
 
 
 
 
 
92fd1a7
 
 
fca9e8c
92fd1a7
37e59a0
 
fca9e8c
37e59a0
92fd1a7
 
37e59a0
92fd1a7
 
 
37e59a0
92fd1a7
 
 
37e59a0
 
 
92fd1a7
 
37e59a0
 
 
 
 
 
 
92fd1a7
 
 
 
 
 
 
37e59a0
 
 
92fd1a7
 
fca9e8c
dbe78dd
 
 
 
fca9e8c
 
 
dbe78dd
 
fca9e8c
92fd1a7
dbe78dd
 
 
37e59a0
92fd1a7
 
37e59a0
 
92fd1a7
 
37e59a0
 
 
 
92fd1a7
dbe78dd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
92fd1a7
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
"""Use case for processing extraction jobs."""
import logging
import time
from typing import Any, Optional, Protocol

from domain.entities.audio import Audio
from domain.entities.video import Video
from domain.value_objects.audio_format import AudioFormat
from domain.value_objects.audio_quality import AudioQuality
from domain.value_objects.file_size import FileSize
from infrastructure.services.local_file_processor import LocalFileProcessor

from ..dto.extraction_request import ExtractionRequestDTO

logger = logging.getLogger(__name__)

class JobRepository(Protocol):
    """Protocol for job repository.

    Declares every method the use case actually calls; the previous
    version omitted ``get`` and ``clear_bearer_token`` even though
    ``ProcessJobUseCase.execute`` invokes both.
    """

    async def update_status(self, job_id: str, status: str,
                            error: Optional[str] = None,
                            output_path: Optional[str] = None,
                            processing_time: Optional[float] = None) -> Any:
        """Persist a status transition (and optional metadata) for a job."""
        ...

    async def get(self, job_id: str) -> Any:
        """Return the stored job record for *job_id*, or a falsy value if absent."""
        ...

    async def clear_bearer_token(self, job_id: str) -> Any:
        """Remove the stored bearer token for *job_id* (security hygiene)."""
        ...

class FFmpegService(Protocol):
    """Protocol for FFmpeg service.

    Structural interface for the audio-extraction backend injected into
    ``ProcessJobUseCase``.
    """

    async def extract_audio(self, input_path: str, output_path: str,
                            format: str, quality: str) -> Any:
        """Extract the audio track of *input_path* into *output_path*."""
        ...

class FileRepository(Protocol):
    """Protocol for file repository.

    Structural interface over local/remote file storage used by the
    extraction use case.
    """

    async def create_output_path(self, job_id: str, format: str) -> str:
        """Return a storage key/path for the job's output in *format*."""
        ...

    async def get_file_size(self, file_path: str) -> int:
        """Return the size of *file_path* in bytes."""
        ...

    async def delete_file(self, file_path: str) -> bool:
        """Delete *file_path*; report whether deletion succeeded."""
        ...

class NotificationService(Protocol):
    """Protocol for notification service.

    The previous declaration lacked ``bearer_token``, yet both call
    sites in ``ProcessJobUseCase`` pass it as a keyword argument; the
    signature now matches actual usage (backward-compatible: the new
    parameter defaults to ``None``).
    """

    async def send_job_completion_notification(self,
                                               job_id: str,
                                               status: str,
                                               processing_time: float,
                                               bearer_token: Optional[str] = None) -> Any:
        """Notify interested parties that *job_id* finished with *status*."""
        ...

class ProcessJobUseCase:
    """Use case for processing a queued extraction job.

    Orchestrates one job end-to-end: mark it processing, run the FFmpeg
    extraction via the local file processor, record the terminal status,
    then send a best-effort completion notification and clear the stored
    bearer token.
    """

    def __init__(self, file_repository, ffmpeg_service, job_repository, notification_service):
        self.file_repository = file_repository
        self.ffmpeg_service = ffmpeg_service
        self.job_repository = job_repository
        self.notification_service = notification_service
        # Bridges storage keys to local paths around the FFmpeg call.
        self.file_processor = LocalFileProcessor(file_repository)

    async def execute(self, job_id: str, request: ExtractionRequestDTO):
        """Execute job processing with local file handling.

        Args:
            job_id: Identifier of the queued job to process.
            request: Extraction parameters (source path, output format, quality).

        Raises:
            Exception: re-raises whatever the extraction pipeline raised,
                after the job has been marked "failed".
        """
        start_time = time.time()

        try:
            await self.job_repository.update_status(job_id, "processing")

            output_key = await self.file_repository.create_output_path(
                job_id,
                request.output_format
            )

            # Download input / upload output around the FFmpeg call.
            await self.file_processor.process_with_ffmpeg(
                input_storage_key=request.video_file_path,
                output_storage_key=output_key,
                ffmpeg_func=self.ffmpeg_service.extract_audio,
                format=request.output_format,
                quality=request.quality
            )

            processing_time = time.time() - start_time

            await self.job_repository.update_status(
                job_id,
                "completed",
                output_path=output_key,
                processing_time=processing_time
            )

            # Notification is best-effort: previously a failure here fell
            # through to the outer except and overwrote the job's
            # "completed" status with "failed".
            await self._notify_and_clear_token(job_id, "completed", processing_time)

            logger.info(f"Job {job_id} completed in {processing_time:.2f} seconds")

        except Exception as e:
            processing_time = time.time() - start_time
            logger.error(f"Job {job_id} failed after {processing_time:.2f} seconds: {str(e)}")

            await self.job_repository.update_status(
                job_id,
                "failed",
                error=str(e),
                processing_time=processing_time
            )

            # Best-effort failure notification; must not mask the original error.
            await self._notify_and_clear_token(job_id, "failed", processing_time)

            raise

    async def _notify_and_clear_token(self, job_id: str, status: str,
                                      processing_time: float) -> None:
        """Send the completion notification and clear the stored bearer token.

        Best-effort: notification or token-clearing failures are logged
        and swallowed so they never change the job's recorded outcome.
        The token is cleared even when the notification itself fails.
        """
        try:
            # The bearer token is kept on the job record until the
            # notification has been delivered.
            job_record = await self.job_repository.get(job_id)
            bearer_token = job_record.bearer_token if job_record else None

            await self.notification_service.send_job_completion_notification(
                job_id=job_id,
                status=status,
                processing_time=processing_time,
                bearer_token=bearer_token
            )

            # Clear bearer token for security after notification is sent.
            await self.job_repository.clear_bearer_token(job_id)

        except Exception as notify_error:
            logger.warning(f"Failed to send {status} notification for job {job_id}: {notify_error}")

            # Still clear the token even if the notification failed.
            try:
                await self.job_repository.clear_bearer_token(job_id)
            except Exception:
                logger.warning(f"Failed to clear bearer token for job {job_id}")