Tadeas Kosek commited on
Commit
b8ef32f
·
1 Parent(s): b7187f4

fix download job

Browse files
application/dto/extraction_response.py CHANGED
@@ -34,6 +34,7 @@ class DownloadResultDTO:
34
  media_type: str
35
  filename: str
36
  processing_time: float
 
37
 
38
  @dataclass
39
  class JobCreationDTO:
 
34
  media_type: str
35
  filename: str
36
  processing_time: float
37
+ storage_key: str = None
38
 
39
  @dataclass
40
  class JobCreationDTO:
application/use_cases/download_audio_result.py CHANGED
@@ -6,6 +6,7 @@ from pathlib import Path
6
 
7
  from domain.exceptions.domain_exceptions import JobNotFoundError, JobNotCompletedError, ValidationError
8
  from domain.services.validation_service import ValidationService
 
9
  from ..dto.extraction_response import DownloadResultDTO
10
 
11
  logger = logging.getLogger(__name__)
@@ -53,6 +54,7 @@ class DownloadAudioResultUseCase:
53
  self.validation_service = validation_service
54
  self.ffmpeg_service = ffmpeg_service
55
  self.audio_mime_types = audio_mime_types
 
56
 
57
  async def execute(self, job_id: str,
58
  start_seconds: Optional[float] = None,
@@ -77,50 +79,58 @@ class DownloadAudioResultUseCase:
77
 
78
  # Determine if trimming is needed
79
  needs_trimming = start_seconds is not None or end_seconds is not None
80
-
81
  if needs_trimming:
82
- # Get audio duration for validation
83
- media_info = await self.ffmpeg_service.get_media_info(job_record.output_path)
84
- audio_duration = self._extract_duration_from_media_info(media_info)
85
-
86
- if audio_duration is None:
87
- raise RuntimeError(f"Could not determine audio duration for job {job_id}")
88
 
89
- # Validate time range
90
- self.validation_service.validate_time_range(
91
- start_seconds, end_seconds, audio_duration
92
- )
93
-
94
- # Create trimmed audio
95
- file_path = await self._create_trimmed_audio(
96
- job_record, start_seconds, end_seconds
97
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
98
  else:
99
- # Use original file
100
- file_path = job_record.output_path
101
-
102
- # Get MIME type
103
- mime_type = self.audio_mime_types.get(
104
- job_record.output_format,
105
- 'application/octet-stream'
106
- )
107
 
108
- # Create filename
 
109
  filename = self._create_filename(job_record, start_seconds, end_seconds)
110
 
111
  return DownloadResultDTO(
112
  file_path=file_path,
113
  media_type=mime_type,
114
  filename=filename,
115
- processing_time=job_record.processing_time or 0
 
116
  )
117
 
118
  async def _create_trimmed_audio(self, job_record: Any,
119
  start_seconds: Optional[float],
120
  end_seconds: Optional[float]) -> str:
121
- """Create trimmed audio file and return its path."""
122
  # Create deterministic output path
123
- temp_path = await self.file_repository.create_deterministic_temp_path(
124
  job_id=job_record.id,
125
  start_seconds=start_seconds,
126
  end_seconds=end_seconds,
@@ -128,26 +138,24 @@ class DownloadAudioResultUseCase:
128
  )
129
 
130
  # Check if trimmed file already exists
131
- if await self.file_repository.file_exists(temp_path):
132
  logger.info(f"Reusing existing trimmed audio for job {job_record.id}: "
133
  f"start={start_seconds}, end={end_seconds}")
134
- return temp_path
135
 
136
  logger.info(f"Creating new trimmed audio for job {job_record.id}: "
137
- f"start={start_seconds}, end={end_seconds}")
138
 
139
- # Trim audio using FFmpeg
140
- result = await self.ffmpeg_service.trim_audio(
141
- input_path=job_record.output_path,
142
- output_path=temp_path,
 
143
  start_seconds=start_seconds,
144
  end_seconds=end_seconds
145
  )
146
 
147
- if not result.success:
148
- raise RuntimeError(f"Audio trimming failed: {result.error}")
149
-
150
- return temp_path
151
 
152
  def _extract_duration_from_media_info(self, media_info: dict) -> Optional[float]:
153
  """Extract duration from ffprobe media info."""
 
6
 
7
  from domain.exceptions.domain_exceptions import JobNotFoundError, JobNotCompletedError, ValidationError
8
  from domain.services.validation_service import ValidationService
9
+ from infrastructure.services.local_file_processor import LocalFileProcessor
10
  from ..dto.extraction_response import DownloadResultDTO
11
 
12
  logger = logging.getLogger(__name__)
 
54
  self.validation_service = validation_service
55
  self.ffmpeg_service = ffmpeg_service
56
  self.audio_mime_types = audio_mime_types
57
+
58
 
59
  async def execute(self, job_id: str,
60
  start_seconds: Optional[float] = None,
 
79
 
80
  # Determine if trimming is needed
81
  needs_trimming = start_seconds is not None or end_seconds is not None
82
+
83
  if needs_trimming:
84
+ # Get local path for media info analysis
85
+ local_original_path = await self.file_repository.get_local_path(job_record.output_path)
 
 
 
 
86
 
87
+ try:
88
+ # Get audio duration for validation
89
+ media_info = await self.ffmpeg_service.get_media_info(local_original_path)
90
+ audio_duration = self._extract_duration_from_media_info(media_info)
91
+
92
+ if audio_duration is None:
93
+ raise RuntimeError(f"Could not determine audio duration for job {job_id}")
94
+
95
+ # Validate time range
96
+ self.validation_service.validate_time_range(
97
+ start_seconds, end_seconds, audio_duration
98
+ )
99
+
100
+ # Create trimmed audio
101
+ storage_key = await self._create_trimmed_audio(
102
+ job_record, start_seconds, end_seconds
103
+ )
104
+
105
+ # Get local path for download
106
+ file_path = await self.file_repository.get_local_path(storage_key)
107
+
108
+ finally:
109
+ # Clean up the original local file
110
+ await self.file_repository.cleanup_local_path(local_original_path, job_record.output_path)
111
  else:
112
+ # Use original file - get local path for download
113
+ file_path = await self.file_repository.get_local_path(job_record.output_path)
114
+ storage_key = job_record.output_path
 
 
 
 
 
115
 
116
+ # Get MIME type and filename
117
+ mime_type = self.audio_mime_types.get(job_record.output_format, 'application/octet-stream')
118
  filename = self._create_filename(job_record, start_seconds, end_seconds)
119
 
120
  return DownloadResultDTO(
121
  file_path=file_path,
122
  media_type=mime_type,
123
  filename=filename,
124
+ processing_time=job_record.processing_time or 0,
125
+ storage_key=storage_key # Add this for cleanup later
126
  )
127
 
128
  async def _create_trimmed_audio(self, job_record: Any,
129
  start_seconds: Optional[float],
130
  end_seconds: Optional[float]) -> str:
131
+ """Create trimmed audio file and return its storage key."""
132
  # Create deterministic output path
133
+ output_storage_key = await self.file_repository.create_deterministic_temp_path(
134
  job_id=job_record.id,
135
  start_seconds=start_seconds,
136
  end_seconds=end_seconds,
 
138
  )
139
 
140
  # Check if trimmed file already exists
141
+ if await self.file_repository.file_exists(output_storage_key):
142
  logger.info(f"Reusing existing trimmed audio for job {job_record.id}: "
143
  f"start={start_seconds}, end={end_seconds}")
144
+ return output_storage_key
145
 
146
  logger.info(f"Creating new trimmed audio for job {job_record.id}: "
147
+ f"start={start_seconds}, end={end_seconds}")
148
 
149
+ # Use file processor for trimming with local files
150
+ await self.file_processor.process_with_ffmpeg(
151
+ input_storage_key=job_record.output_path,
152
+ output_storage_key=output_storage_key,
153
+ ffmpeg_func=self.ffmpeg_service.trim_audio,
154
  start_seconds=start_seconds,
155
  end_seconds=end_seconds
156
  )
157
 
158
+ return output_storage_key
 
 
 
159
 
160
  def _extract_duration_from_media_info(self, media_info: dict) -> Optional[float]:
161
  """Extract duration from ffprobe media info."""
interfaces/api/routes/job_routes.py CHANGED
@@ -1,10 +1,10 @@
1
  """Job management API routes."""
2
- from fastapi import APIRouter, Depends, HTTPException, Request, Path, Query
3
  from fastapi.responses import FileResponse
4
  from typing import Any, Optional
5
 
6
- from ..dependencies import UseCases
7
- from ..responses import JobStatusResponse, ErrorResponse
8
  from domain.exceptions.domain_exceptions import JobNotFoundError, JobNotCompletedError, ValidationError
9
 
10
  router = APIRouter()
@@ -65,7 +65,9 @@ async def get_job_status(
65
  404: {"description": "Job not found"}
66
  })
67
  async def download_job_result(
 
68
  use_cases: UseCases,
 
69
  job_id: str = Path(..., description="The job ID of the completed extraction"),
70
  start: Optional[str] = Query(
71
  None,
@@ -94,7 +96,8 @@ async def download_job_result(
94
  job_id, start_seconds, end_seconds
95
  )
96
 
97
- return FileResponse(
 
98
  path=result.file_path,
99
  media_type=result.media_type,
100
  filename=result.filename,
@@ -103,6 +106,19 @@ async def download_job_result(
103
  "X-Processing-Time": str(result.processing_time)
104
  }
105
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
106
  except JobNotFoundError as e:
107
  raise HTTPException(404, str(e))
108
  except JobNotCompletedError as e:
 
1
  """Job management API routes."""
2
+ from fastapi import APIRouter, HTTPException, Path, Query, BackgroundTasks
3
  from fastapi.responses import FileResponse
4
  from typing import Any, Optional
5
 
6
+ from ..dependencies import Services, UseCases
7
+ from ..responses import JobStatusResponse
8
  from domain.exceptions.domain_exceptions import JobNotFoundError, JobNotCompletedError, ValidationError
9
 
10
  router = APIRouter()
 
65
  404: {"description": "Job not found"}
66
  })
67
  async def download_job_result(
68
+ background_tasks: BackgroundTasks,
69
  use_cases: UseCases,
70
+ services: Services,
71
  job_id: str = Path(..., description="The job ID of the completed extraction"),
72
  start: Optional[str] = Query(
73
  None,
 
96
  job_id, start_seconds, end_seconds
97
  )
98
 
99
+ # Create file response
100
+ response = FileResponse(
101
  path=result.file_path,
102
  media_type=result.media_type,
103
  filename=result.filename,
 
106
  "X-Processing-Time": str(result.processing_time)
107
  }
108
  )
109
+
110
+ # Schedule cleanup for remote storage (R2)
111
+ # Check if this is remote storage by seeing if the repository has R2-specific methods
112
+ if hasattr(services.file_repository, '_get_client') and result.storage_key:
113
+ # This is R2 storage, schedule cleanup of the local temp file
114
+ background_tasks.add_task(
115
+ services.file_repository.cleanup_local_path,
116
+ result.file_path,
117
+ result.storage_key
118
+ )
119
+
120
+ return response
121
+
122
  except JobNotFoundError as e:
123
  raise HTTPException(404, str(e))
124
  except JobNotCompletedError as e: