File size: 5,827 Bytes
92fd1a7
 
 
 
 
d369cf2
92fd1a7
 
 
 
dbe78dd
92fd1a7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d369cf2
92fd1a7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
37e59a0
 
 
 
 
 
 
 
 
 
 
dbe78dd
37e59a0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
dbe78dd
 
 
 
 
 
 
 
 
 
 
 
 
 
37e59a0
92fd1a7
 
 
 
 
 
 
 
 
 
 
dbe78dd
92fd1a7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
"""Use case for asynchronous audio extraction."""
import asyncio
import logging
from typing import Any, Optional, Protocol

from application.use_cases.process_job import ProcessJobUseCase
from domain.entities.job import Job
from domain.entities.video import Video
from domain.exceptions.domain_exceptions import DuplicateExternalJobIdError
from domain.services.validation_service import ValidationService
from domain.value_objects.file_size import FileSize

from ..dto.extraction_request import ExtractionRequestDTO
from ..dto.extraction_response import JobCreationDTO

logger = logging.getLogger(__name__)

class JobRepository(Protocol):
    """Protocol for job repository.

    Structural interface for persisting extraction jobs; implemented by the
    infrastructure layer and injected into the use case.
    """

    async def create(self, job_id: str, filename: str, file_size_mb: float,
                     output_format: str, quality: str,
                     external_job_id: Optional[str] = None,
                     bearer_token: Optional[str] = None) -> Any:
        """Persist a new job record.

        ``external_job_id`` and ``bearer_token`` are optional because only
        ``execute_with_job`` supplies them; ``execute`` creates jobs without.
        (Previously the Protocol omitted them even though the use case passed
        them as keyword arguments.)
        """
        ...

    async def update_status(self, job_id: str, status: str,
                            error: Optional[str] = None,
                            output_path: Optional[str] = None,
                            processing_time: Optional[float] = None) -> Any:
        """Update an existing job's status and optional result metadata."""
        ...

class BackgroundTaskRunner(Protocol):
    """Protocol for background task execution.

    Structural interface for scheduling a callable to run after the current
    request finishes.
    NOTE(review): shape matches e.g. FastAPI's ``BackgroundTasks.add_task`` —
    confirm against the actual wiring.
    """
    def add_task(self, func: Any, *args: Any, **kwargs: Any):
        """Schedule ``func(*args, **kwargs)`` for later execution."""
        ...

class ExtractAudioAsyncUseCase:
    """Use case for asynchronous audio extraction (large files).

    Validates the request, persists a job record, and queues the actual
    extraction to run in the background; callers poll the returned
    ``check_url`` for progress.
    """

    def __init__(self,
                 job_repository: JobRepository,
                 validation_service: ValidationService,
                 process_job_use_case: ProcessJobUseCase):
        """Store injected collaborators (repository, validator, processor)."""
        self.job_repository = job_repository
        self.validation_service = validation_service
        self.process_job_use_case = process_job_use_case

    async def execute(self, request: ExtractionRequestDTO,
                      background_tasks: BackgroundTaskRunner) -> JobCreationDTO:
        """Create and queue an async extraction job.

        Args:
            request: Upload metadata plus desired output format/quality.
            background_tasks: Runner used to schedule the actual processing.

        Returns:
            JobCreationDTO describing the queued job.

        Raises:
            Whatever ``ValidationService.validate_extraction_request`` raises
            for an invalid request.
        """
        video = self._build_and_validate(request)

        # Create a brand-new job for this request.
        job = Job.create_new(
            video_filename=request.video_filename,
            file_size_bytes=request.video_file_size,
            output_format=request.output_format,
            quality=request.quality
        )

        await self.job_repository.create(
            job_id=job.id,
            filename=job.video_filename,
            file_size_mb=job.file_size.megabytes,
            output_format=job.output_format.value,
            quality=job.quality.value
        )

        return self._queue_and_build_response(job, video, request, background_tasks)

    async def execute_with_job(self, job: Job, request: ExtractionRequestDTO,
                               background_tasks: BackgroundTaskRunner) -> JobCreationDTO:
        """Execute with a pre-created job (carries external id / bearer token).

        ``DuplicateExternalJobIdError`` from the repository propagates to the
        caller; uniqueness is expected to have been validated before the job
        was created, so the repository check is a defensive last line.
        """
        video = self._build_and_validate(request)

        # Persist the pre-created job, including its externally-supplied
        # identity and credentials.
        await self.job_repository.create(
            job_id=job.id,
            filename=job.video_filename,
            file_size_mb=job.file_size.megabytes,
            output_format=job.output_format.value,
            quality=job.quality.value,
            external_job_id=job.external_job_id,
            bearer_token=job.bearer_token
        )

        return self._queue_and_build_response(job, video, request, background_tasks)

    def _build_and_validate(self, request: ExtractionRequestDTO) -> Video:
        """Build the Video domain object and validate the extraction request."""
        video = Video(
            filename=request.video_filename,
            file_path=request.video_file_path,
            size=FileSize(request.video_file_size),
            content_type=request.content_type
        )
        self.validation_service.validate_extraction_request(
            video, request.output_format, request.quality
        )
        return video

    def _queue_and_build_response(self, job: Job, video: Video,
                                  request: ExtractionRequestDTO,
                                  background_tasks: BackgroundTaskRunner) -> JobCreationDTO:
        """Queue background processing and build the job-creation response."""
        background_tasks.add_task(
            self._process_job_background,
            job.id,
            request
        )

        logger.info("Created async job %s for %s", job.id, video.filename)

        return JobCreationDTO(
            job_id=job.id,
            external_job_id=job.external_job_id,
            status=job.status.value,
            message=f"Processing large file ({job.file_size.megabytes:.1f} MB)",
            check_url=f"/api/v1/jobs/{job.id}",
            file_size_mb=job.file_size.megabytes
        )

    async def _process_job_background(self, job_id: str, request: ExtractionRequestDTO):
        """Run the extraction; on failure mark the job as failed.

        Errors are logged rather than raised so a failure inside this
        coroutine cannot crash the background task host.
        """
        try:
            await self.process_job_use_case.execute(job_id, request)
        except Exception as e:
            logger.exception("Background job %s failed", job_id)
            try:
                await self.job_repository.update_status(
                    job_id=job_id,
                    status="failed",
                    error=str(e)
                )
            except Exception:
                # Best effort: the original failure is already logged above;
                # don't let a broken status update escape the background task.
                logger.exception("Could not mark job %s as failed", job_id)