Tadeas Kosek committed
Commit 37e59a0 · 1 Parent(s): d369cf2

add r2 file repository

app.py CHANGED
@@ -34,6 +34,7 @@ async def lifespan(app: FastAPI):
 
     # Startup
     logger.info(f"Starting {settings.app_name} v{settings.app_version}")
+    logger.info(f"Storage type: {settings.storage_type}")
 
     # Initialize containers
     service_container = ServiceContainer.get_instance()
application/use_cases/extract_audio_async.py CHANGED
@@ -91,6 +91,49 @@ class ExtractAudioAsyncUseCase:
             file_size_mb=job.file_size.megabytes
         )
 
+    async def execute_with_job(self, job: Job, request: ExtractionRequestDTO,
+                               background_tasks: BackgroundTaskRunner) -> JobCreationDTO:
+        """Execute with a pre-created job."""
+
+        # Create domain objects for validation
+        video = Video(
+            filename=request.video_filename,
+            file_path=request.video_file_path,
+            size=FileSize(request.video_file_size),
+            content_type=request.content_type
+        )
+
+        # Validate request
+        self.validation_service.validate_extraction_request(
+            video, request.output_format, request.quality
+        )
+
+        # Save job to repository (job already created)
+        await self.job_repository.create(
+            job_id=job.id,
+            filename=job.video_filename,
+            file_size_mb=job.file_size.megabytes,
+            output_format=job.output_format.value,
+            quality=job.quality.value
+        )
+
+        # Queue background processing
+        background_tasks.add_task(
+            self._process_job_background,
+            job.id,
+            request
+        )
+
+        logger.info(f"Created async job {job.id} for {video.filename}")
+
+        return JobCreationDTO(
+            job_id=job.id,
+            status=job.status.value,
+            message=f"Processing large file ({job.file_size.megabytes:.1f} MB)",
+            check_url=f"/api/v1/jobs/{job.id}",
+            file_size_mb=job.file_size.megabytes
+        )
+
     async def _process_job_background(self, job_id: str, request: ExtractionRequestDTO):
         """Process job in background."""
         try:
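
Note: execute_with_job expects the caller to have created the Job and stored the upload under its ID before calling in. A minimal sketch of that call order, assuming FastAPI's BackgroundTasks satisfies the BackgroundTaskRunner protocol (names mirror the route change further down in this commit):

    # Hypothetical caller sketch, not part of the commit
    job = Job.create_new(
        video_filename=video.filename,
        file_size_bytes=file_size,
        output_format=params.output_format,
        quality=params.quality,
    )
    file_path = await services.file_repository.save_stream(video, video.filename, job_id=job.id)
    dto = ExtractionRequestDTO(
        video_filename=video.filename,
        video_file_path=file_path,
        video_file_size=file_size,
        output_format=params.output_format,
        quality=params.quality,
        content_type=video.content_type,
    )
    result = await use_cases.extract_audio_async.execute_with_job(job, dto, background_tasks)
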
application/use_cases/process_job.py CHANGED
@@ -8,6 +8,7 @@ from domain.value_objects.audio_quality import AudioQuality
 from domain.value_objects.file_size import FileSize
 from domain.entities.video import Video
 from domain.entities.audio import Audio
+from infrastructure.services.local_file_processor import LocalFileProcessor
 
 from ..dto.extraction_request import ExtractionRequestDTO
 
@@ -38,76 +39,56 @@ class FileRepository(Protocol):
 class ProcessJobUseCase:
     """Use case for processing a queued extraction job."""
 
-    def __init__(self,
-                 job_repository: JobRepository,
-                 ffmpeg_service: FFmpegService,
-                 file_repository: FileRepository):
-        self.job_repository = job_repository
-        self.ffmpeg_service = ffmpeg_service
+    def __init__(self, file_repository, ffmpeg_service, job_repository):
         self.file_repository = file_repository
+        self.ffmpeg_service = ffmpeg_service
+        self.job_repository = job_repository
+        self.file_processor = LocalFileProcessor(file_repository)
 
     async def execute(self, job_id: str, request: ExtractionRequestDTO):
-        """Process the extraction job."""
+        """Execute job processing with local file handling."""
         start_time = time.time()
-        output_path = None
 
         try:
-            # Update job status to processing
+            # Update job status
            await self.job_repository.update_status(job_id, "processing")
 
-            # Create domain objects
-            video = Video(
-                filename=request.video_filename,
-                file_path=request.video_file_path,
-                size=FileSize(request.video_file_size),
-                content_type=request.content_type
-            )
-
-            audio_format = AudioFormat(request.output_format)
-            audio_quality = AudioQuality(request.quality)
-
             # Create output path
-            output_path = await self.file_repository.create_output_path(
-                job_id, audio_format.value
+            output_key = await self.file_repository.create_output_path(
+                job_id,
+                request.output_format
             )
 
-            # Extract audio
-            logger.info(f"Processing job {job_id}: {video.filename} -> {audio_format.value}")
-
-            result = await self.ffmpeg_service.extract_audio(
-                video.file_path,
-                output_path,
-                audio_format.value,
-                audio_quality.value
+            # Process with local files
+            await self.file_processor.process_with_ffmpeg(
+                input_storage_key=request.video_file_path,
+                output_storage_key=output_key,
+                ffmpeg_func=self.ffmpeg_service.extract_audio,  # Remove _async
+                format=request.output_format,  # Change from output_format to format
+                quality=request.quality
             )
 
-            if not result.success:
-                raise Exception(f"FFmpeg extraction failed: {result.error}")
-
             # Calculate processing time
             processing_time = time.time() - start_time
 
             # Update job as completed
             await self.job_repository.update_status(
-                job_id=job_id,
-                status="completed",
-                output_path=output_path,
+                job_id,
+                "completed",
+                output_path=output_key,
                 processing_time=processing_time
             )
 
-            logger.info(f"Job {job_id} completed in {processing_time:.2f}s")
+            logger.info(f"Job {job_id} completed in {processing_time:.2f} seconds")
 
         except Exception as e:
-            # Clean up output file on error
-            if output_path:
-                await self.file_repository.delete_file(output_path)
+            processing_time = time.time() - start_time
+            logger.error(f"Job {job_id} failed after {processing_time:.2f} seconds: {str(e)}")
 
-            # Update job as failed
             await self.job_repository.update_status(
-                job_id=job_id,
-                status="failed",
-                error=str(e)
+                job_id,
+                "failed",
+                error=str(e),
+                processing_time=processing_time
            )
-
-            logger.error(f"Job {job_id} failed: {str(e)}")
            raise
infrastructure/config/settings.py CHANGED
@@ -2,7 +2,7 @@
 from pydantic_settings import BaseSettings
 from pydantic import Field
 from pathlib import Path
-from typing import List, Dict, Any
+from typing import List, Dict, Any, Optional
 import os
 
 class Settings(BaseSettings):
@@ -13,12 +13,21 @@ class Settings(BaseSettings):
     app_version: str = "1.0.0"
     debug: bool = Field(default=False, env="DEBUG")
 
+    # Storage configuration
+    storage_type: str = Field(default="filesystem", env="STORAGE_TYPE")  # "filesystem" or "r2"
+
     # File processing settings
     temp_dir: Path = Field(default=Path("/tmp/audio_extractor"), env="TEMP_DIR")
     max_direct_file_size_mb: float = Field(default=10.0, env="MAX_DIRECT_FILE_SIZE_MB")
     cleanup_interval_seconds: int = Field(default=3600, env="CLEANUP_INTERVAL_SECONDS")
     file_retention_hours: int = Field(default=2, env="FILE_RETENTION_HOURS")
 
+    # Cloudflare R2 storage settings (optional, only needed if storage_type == "r2")
+    cloudflare_r2_account_id: Optional[str] = Field(default=None, env="CLOUDFLARE_R2_ACCOUNT_ID")
+    cloudflare_r2_access_key_id: Optional[str] = Field(default=None, env="CLOUDFLARE_R2_ACCESS_KEY_ID")
+    cloudflare_r2_secret_access_key: Optional[str] = Field(default=None, env="CLOUDFLARE_R2_SECRET_ACCESS_KEY")
+    cloudflare_r2_bucket_name: Optional[str] = Field(default=None, env="CLOUDFLARE_R2_BUCKET_NAME")
+
     # FFmpeg settings
     ffmpeg_path: str = Field(default="/usr/bin/ffmpeg", env="FFMPEG_PATH")
     ffmpeg_timeout_seconds: int = Field(default=1800, env="FFMPEG_TIMEOUT_SECONDS")  # 30 minutes
@@ -84,8 +93,34 @@ class Settings(BaseSettings):
 
     def __init__(self, **kwargs):
         super().__init__(**kwargs)
-        # Ensure temp directory exists
-        self.temp_dir.mkdir(parents=True, exist_ok=True)
+        # Ensure temp directory exists (only for filesystem storage)
+        if self.storage_type.lower() == "filesystem":
+            self.temp_dir.mkdir(parents=True, exist_ok=True)
+
+        # Validate R2 configuration if R2 storage is selected
+        if self.storage_type.lower() == "r2":
+            self._validate_r2_config()
+
+    def _validate_r2_config(self):
+        """Validate R2 configuration when R2 storage is selected."""
+        required_fields = [
+            'cloudflare_r2_account_id',
+            'cloudflare_r2_access_key_id',
+            'cloudflare_r2_secret_access_key',
+            'cloudflare_r2_bucket_name'
+        ]
+
+        missing_fields = []
+        for field in required_fields:
+            value = getattr(self, field)
+            if not value:
+                missing_fields.append(field.upper())
+
+        if missing_fields:
+            raise ValueError(
+                f"R2 storage selected but missing required environment variables: "
+                f"{', '.join(missing_fields)}"
+            )
 
 # Singleton instance
 settings = Settings()
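
Note: with this change, Settings() only creates temp_dir for filesystem storage and fails fast when STORAGE_TYPE=r2 is missing credentials. A minimal sketch of the two configurations (all values below are placeholders):

    import os

    # Filesystem storage (default)
    os.environ["STORAGE_TYPE"] = "filesystem"
    os.environ["TEMP_DIR"] = "/tmp/audio_extractor"

    # R2 storage: all four variables are required, otherwise Settings() raises ValueError
    os.environ["STORAGE_TYPE"] = "r2"
    os.environ["CLOUDFLARE_R2_ACCOUNT_ID"] = "example-account-id"
    os.environ["CLOUDFLARE_R2_ACCESS_KEY_ID"] = "example-access-key-id"
    os.environ["CLOUDFLARE_R2_SECRET_ACCESS_KEY"] = "example-secret"
    os.environ["CLOUDFLARE_R2_BUCKET_NAME"] = "example-bucket"

    from infrastructure.config.settings import Settings
    settings = Settings()
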
infrastructure/providers/__init__.py ADDED
File without changes
infrastructure/providers/file_storage_provider.py ADDED
@@ -0,0 +1,167 @@
+"""File storage provider factory."""
+from abc import ABC, abstractmethod
+from pathlib import Path
+from typing import Protocol, Union
+from enum import Enum
+
+from ..repositories.file_repository import FileSystemRepository
+from ..repositories.r2_file_repository import R2Repository
+
+
+class StorageType(Enum):
+    """Available storage types."""
+    FILESYSTEM = "filesystem"
+    R2 = "r2"
+
+
+class FileStorageRepository(Protocol):
+    """Protocol defining the file storage interface."""
+
+    async def save_uploaded_file(self, content: bytes, original_filename: str, job_id: str = None) -> str:
+        ...
+
+    async def save_stream(self, stream, original_filename: str, job_id: str = None, chunk_size: int = 1024 * 1024) -> str:
+        ...
+
+    async def create_output_path(self, job_id: str, format: str) -> str:
+        ...
+
+    async def create_temp_path(self, prefix: str, extension: str) -> str:
+        ...
+
+    async def create_deterministic_temp_path(self, job_id: str,
+                                             start_seconds: float = None,
+                                             end_seconds: float = None,
+                                             extension: str = None) -> str:
+        ...
+
+    async def read_file(self, file_path: str) -> bytes:
+        ...
+
+    async def file_exists(self, file_path: str) -> bool:
+        ...
+
+    async def get_file_size(self, file_path: str) -> int:
+        ...
+
+    async def delete_file(self, file_path: str) -> bool:
+        ...
+
+    async def cleanup_old_files(self, older_than_hours: int) -> int:
+        ...
+
+    # New methods for local file access
+    async def get_local_path(self, file_key: str) -> str:
+        """Get a local file path for processing. Downloads if remote storage."""
+        ...
+
+    async def save_local_file_to_storage(self, local_path: str, storage_key: str) -> str:
+        """Save a local file to storage and return the storage key."""
+        ...
+
+    async def cleanup_local_path(self, local_path: str, storage_key: str) -> bool:
+        """Clean up local temp file. No-op for filesystem storage."""
+        ...
+
+
+class FileStorageConfig(ABC):
+    """Base configuration for file storage."""
+    pass
+
+
+class FileSystemConfig(FileStorageConfig):
+    """Configuration for filesystem storage."""
+
+    def __init__(self, base_path: Union[str, Path]):
+        self.base_path = Path(base_path)
+
+
+class R2Config(FileStorageConfig):
+    """Configuration for Cloudflare R2 storage."""
+
+    def __init__(self, account_id: str, access_key_id: str, secret_access_key: str, bucket_name: str):
+        self.account_id = account_id
+        self.access_key_id = access_key_id
+        self.secret_access_key = secret_access_key
+        self.bucket_name = bucket_name
+
+        # Validate required fields
+        if not all([account_id, access_key_id, secret_access_key, bucket_name]):
+            raise ValueError("All R2 configuration fields are required")
+
+
+class FileStorageProvider:
+    """Factory for creating file storage repositories."""
+
+    @staticmethod
+    def create_filesystem_storage(config: FileSystemConfig) -> FileSystemRepository:
+        """Create a filesystem storage repository."""
+        return FileSystemRepository(base_path=config.base_path)
+
+    @staticmethod
+    def create_r2_storage(config: R2Config) -> R2Repository:
+        """Create an R2 storage repository."""
+        return R2Repository(
+            account_id=config.account_id,
+            access_key_id=config.access_key_id,
+            secret_access_key=config.secret_access_key,
+            bucket_name=config.bucket_name
+        )
+
+    @staticmethod
+    def create_storage(storage_type: StorageType, config: FileStorageConfig) -> FileStorageRepository:
+        """Create a file storage repository based on type and configuration."""
+        if storage_type == StorageType.FILESYSTEM:
+            if not isinstance(config, FileSystemConfig):
+                raise ValueError("FileSystemConfig required for filesystem storage")
+            return FileStorageProvider.create_filesystem_storage(config)
+
+        elif storage_type == StorageType.R2:
+            if not isinstance(config, R2Config):
+                raise ValueError("R2Config required for R2 storage")
+            return FileStorageProvider.create_r2_storage(config)
+
+        else:
+            raise ValueError(f"Unsupported storage type: {storage_type}")
+
+
+# Convenience factory functions
+def create_filesystem_storage(base_path: Union[str, Path]) -> FileSystemRepository:
+    """Convenience function to create filesystem storage."""
+    config = FileSystemConfig(base_path)
+    return FileStorageProvider.create_filesystem_storage(config)
+
+
+def create_r2_storage(account_id: str, access_key_id: str,
+                      secret_access_key: str, bucket_name: str) -> R2Repository:
+    """Convenience function to create R2 storage."""
+    config = R2Config(account_id, access_key_id, secret_access_key, bucket_name)
+    return FileStorageProvider.create_r2_storage(config)
+
+
+def create_storage_from_settings(storage_type: str, **kwargs) -> FileStorageRepository:
+    """Create storage from string type and keyword arguments."""
+    try:
+        storage_enum = StorageType(storage_type.lower())
+    except ValueError:
+        raise ValueError(f"Invalid storage type: {storage_type}. Must be one of: {[t.value for t in StorageType]}")
+
+    if storage_enum == StorageType.FILESYSTEM:
+        if 'base_path' not in kwargs:
+            raise ValueError("base_path required for filesystem storage")
+        config = FileSystemConfig(kwargs['base_path'])
+
+    elif storage_enum == StorageType.R2:
+        required_fields = ['account_id', 'access_key_id', 'secret_access_key', 'bucket_name']
+        missing_fields = [field for field in required_fields if field not in kwargs]
+        if missing_fields:
+            raise ValueError(f"Missing required R2 configuration fields: {missing_fields}")
+
+        config = R2Config(
+            account_id=kwargs['account_id'],
+            access_key_id=kwargs['access_key_id'],
+            secret_access_key=kwargs['secret_access_key'],
+            bucket_name=kwargs['bucket_name']
+        )
+
+    return FileStorageProvider.create_storage(storage_enum, config)
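
Note: callers are expected to go through create_storage_from_settings rather than instantiate repositories directly; both backends then satisfy the same FileStorageRepository protocol. A minimal usage sketch (credentials are placeholders):

    from infrastructure.providers.file_storage_provider import create_storage_from_settings

    # Filesystem backend
    fs_repo = create_storage_from_settings("filesystem", base_path="/tmp/audio_extractor")

    # R2 backend
    r2_repo = create_storage_from_settings(
        "r2",
        account_id="example-account-id",
        access_key_id="example-access-key-id",
        secret_access_key="example-secret",
        bucket_name="example-bucket",
    )
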
infrastructure/repositories/file_repository.py CHANGED
@@ -1,4 +1,4 @@
-"""File system repository implementation."""
+"""File system repository implementation with local path support."""
 from pathlib import Path
 from typing import Optional, List, Tuple
 import aiofiles
@@ -12,18 +12,19 @@ import asyncio
 logger = logging.getLogger(__name__)
 
 class FileSystemRepository:
-    """Repository for managing temporary files."""
+    """Repository for managing temporary files with local path support."""
 
     def __init__(self, base_path: Path):
         self.base_path = Path(base_path)
         self.base_path.mkdir(parents=True, exist_ok=True)
 
-    async def save_uploaded_file(self, content: bytes, original_filename: str) -> str:
+    async def save_uploaded_file(self, content: bytes, original_filename: str, job_id: str = None) -> str:
         """Save uploaded file and return the path."""
         # Generate unique filename
-        file_id = str(uuid.uuid4())
+        if job_id is None:
+            job_id = str(uuid.uuid4())
         extension = Path(original_filename).suffix
-        filename = f"{file_id}_input{extension}"
+        filename = f"{job_id}_input{extension}"
         file_path = self.base_path / filename
 
         async with aiofiles.open(file_path, 'wb') as f:
@@ -32,11 +33,12 @@ class FileSystemRepository:
         logger.info(f"Saved uploaded file: {file_path}")
         return str(file_path)
 
-    async def save_stream(self, stream, original_filename: str, chunk_size: int = 1024 * 1024) -> str:
+    async def save_stream(self, stream, original_filename: str, job_id: str = None, chunk_size: int = 1024 * 1024) -> str:
         """Save file from stream, handling both async and sync streams."""
-        file_id = str(uuid.uuid4())
+        if job_id is None:
+            job_id = str(uuid.uuid4())
         extension = Path(original_filename).suffix
-        filename = f"{file_id}_input{extension}"
+        filename = f"{job_id}_input{extension}"
         file_path = self.base_path / filename
 
         async with aiofiles.open(file_path, 'wb') as f:
@@ -94,15 +96,6 @@ class FileSystemRepository:
 
         This ensures that the same trimming parameters always result in the same filename,
         allowing for efficient reuse of previously trimmed files.
-
-        Args:
-            job_id: The job ID
-            start_seconds: Start time in seconds (None for beginning)
-            end_seconds: End time in seconds (None for end)
-            extension: File extension (e.g., 'mp3')
-
-        Returns:
-            str: Deterministic file path
         """
         import hashlib
 
@@ -175,4 +168,34 @@ class FileSystemRepository:
         if deleted_count > 0:
             logger.info(f"Cleaned up {deleted_count} old files")
 
-        return deleted_count
+        return deleted_count
+
+    # Local file access methods (no-op for filesystem storage)
+    async def get_local_path(self, file_path: str) -> str:
+        """Return the file path directly since it's already local."""
+        if not await self.file_exists(file_path):
+            raise FileNotFoundError(f"File not found: {file_path}")
+        return file_path
+
+    async def save_local_file_to_storage(self, local_path: str, storage_path: str) -> str:
+        """Copy local file to storage path (filesystem to filesystem)."""
+        try:
+            # Read source file
+            content = await self.read_file(local_path)
+
+            # Write to destination
+            async with aiofiles.open(storage_path, 'wb') as f:
+                await f.write(content)
+
+            logger.debug(f"Copied local file {local_path} to {storage_path}")
+            return storage_path
+
+        except Exception as e:
+            logger.error(f"Failed to copy local file {local_path} to {storage_path}: {e}")
+            raise
+
+    async def cleanup_local_path(self, local_path: str, storage_path: str) -> bool:
+        """No-op for filesystem storage since local_path == storage_path."""
+        # For filesystem storage, the local path and storage path are the same,
+        # so we don't need to clean up anything extra
+        return True
infrastructure/repositories/r2_file_repository.py ADDED
@@ -0,0 +1,411 @@
+"""Cloudflare R2 repository implementation."""
+import asyncio
+import hashlib
+import inspect
+import json
+import logging
+import os
+import tempfile
+import uuid
+from datetime import datetime, timedelta
+from io import BytesIO
+from pathlib import Path
+from typing import List, Optional, Tuple
+
+import aioboto3
+from botocore.config import Config
+
+logger = logging.getLogger(__name__)
+
+
+class R2Repository:
+    """Repository for managing files in Cloudflare R2."""
+
+    def __init__(self, account_id: str, access_key_id: str, secret_access_key: str, bucket_name: str):
+        self.account_id = account_id
+        self.access_key_id = access_key_id
+        self.secret_access_key = secret_access_key
+        self.bucket_name = bucket_name
+        self.endpoint_url = f"https://{account_id}.r2.cloudflarestorage.com"
+
+        # Create session configuration
+        self.config = Config(
+            signature_version='s3v4',
+            retries={'max_attempts': 3, 'mode': 'adaptive'}
+        )
+
+    def _get_client(self):
+        """Get R2 client using aioboto3."""
+        session = aioboto3.Session()
+        return session.client(
+            's3',
+            endpoint_url=self.endpoint_url,
+            aws_access_key_id=self.access_key_id,
+            aws_secret_access_key=self.secret_access_key,
+            config=self.config,
+            region_name='auto'
+        )
+
+    def _generate_job_key(self, job_id: str, filename: str) -> str:
+        """Generate R2 object key for job-specific file."""
+        return f"jobs/{job_id}/{filename}"
+
+    async def save_uploaded_file(self, content: bytes, original_filename: str, job_id: str = None) -> str:
+        """Save uploaded file and return the R2 object key."""
+        # Generate unique job ID if not provided
+        if job_id is None:
+            job_id = str(uuid.uuid4())
+        extension = Path(original_filename).suffix
+        filename = f"input{extension}"
+        object_key = self._generate_job_key(job_id, filename)
+
+        async with self._get_client() as s3:
+            await s3.put_object(
+                Bucket=self.bucket_name,
+                Key=object_key,
+                Body=content,
+                Metadata={
+                    'original_filename': original_filename,
+                    'upload_time': datetime.utcnow().isoformat(),
+                    'job_id': job_id
+                }
+            )
+
+        logger.info(f"Saved uploaded file to R2: {object_key}")
+        return object_key
+
+    async def save_stream(self, stream, original_filename: str, job_id: str = None, chunk_size: int = 1024 * 1024) -> str:
+        """Save file from stream to R2."""
+        if job_id is None:
+            job_id = str(uuid.uuid4())
+        extension = Path(original_filename).suffix
+        filename = f"input{extension}"
+        object_key = self._generate_job_key(job_id, filename)
+
+        # Buffer the stream content
+        buffer = BytesIO()
+        read_method = getattr(stream, 'read', None)
+
+        if read_method is None:
+            raise ValueError("Provided stream does not have a .read() method")
+
+        is_async = inspect.iscoroutinefunction(read_method)
+
+        # Read stream into buffer
+        while True:
+            if is_async:
+                chunk = await read_method(chunk_size)
+            else:
+                chunk = await asyncio.to_thread(read_method, chunk_size)
+
+            if not chunk:
+                break
+
+            buffer.write(chunk)
+
+        # Upload to R2
+        buffer.seek(0)
+        async with self._get_client() as s3:
+            await s3.put_object(
+                Bucket=self.bucket_name,
+                Key=object_key,
+                Body=buffer.getvalue(),
+                Metadata={
+                    'original_filename': original_filename,
+                    'upload_time': datetime.utcnow().isoformat(),
+                    'job_id': job_id
+                }
+            )
+
+        logger.info(f"Saved streamed file to R2: {object_key}")
+        return object_key
+
+    async def create_output_path(self, job_id: str, format: str) -> str:
+        """Create an R2 object key for output file."""
+        filename = f"output.{format}"
+        return self._generate_job_key(job_id, filename)
+
+    async def create_temp_path(self, prefix: str, extension: str) -> str:
+        """Create a unique temporary R2 object key."""
+        # Generate unique identifier
+        temp_id = str(uuid.uuid4())[:8]
+        job_id = str(uuid.uuid4())
+
+        # Clean the prefix to be filename-safe
+        safe_prefix = "".join(c for c in prefix if c.isalnum() or c in ('-', '_'))
+
+        # Ensure extension doesn't start with a dot
+        clean_extension = extension.lstrip('.')
+
+        # Create filename
+        filename = f"{safe_prefix}_{temp_id}.{clean_extension}"
+        return self._generate_job_key(job_id, filename)
+
+    async def create_deterministic_temp_path(self, job_id: str,
+                                             start_seconds: Optional[float],
+                                             end_seconds: Optional[float],
+                                             extension: str) -> str:
+        """Create a deterministic R2 object key based on parameters."""
+        # Create a unique string from the parameters
+        start_str = str(start_seconds) if start_seconds is not None else "start"
+        end_str = str(end_seconds) if end_seconds is not None else "end"
+
+        # Create hash input
+        hash_input = f"{job_id}_{start_str}_{end_str}"
+
+        # Generate MD5 hash
+        hash_suffix = hashlib.md5(hash_input.encode()).hexdigest()[:8]
+
+        # Clean extension
+        clean_extension = extension.lstrip('.')
+
+        # Create deterministic filename
+        filename = f"trim_{hash_suffix}.{clean_extension}"
+        object_key = self._generate_job_key(job_id, filename)
+
+        logger.debug(f"Created deterministic R2 key: {object_key} for params: {hash_input}")
+        return object_key
+
+    async def read_file(self, object_key: str) -> bytes:
+        """Read file content from R2."""
+        async with self._get_client() as s3:
+            try:
+                response = await s3.get_object(Bucket=self.bucket_name, Key=object_key)
+                content = await response['Body'].read()
+                return content
+            except Exception as e:
+                logger.error(f"Failed to read file from R2: {object_key}, error: {e}")
+                raise
+
+    async def file_exists(self, object_key: str) -> bool:
+        """Check if object exists in R2."""
+        async with self._get_client() as s3:
+            try:
+                await s3.head_object(Bucket=self.bucket_name, Key=object_key)
+                return True
+            except s3.exceptions.NoSuchKey:
+                return False
+            except Exception as e:
+                logger.error(f"Error checking if file exists: {object_key}, error: {e}")
+                return False
+
+    async def get_file_size(self, object_key: str) -> int:
+        """Get file size in bytes from R2."""
+        async with self._get_client() as s3:
+            try:
+                response = await s3.head_object(Bucket=self.bucket_name, Key=object_key)
+                return response['ContentLength']
+            except Exception as e:
+                logger.error(f"Failed to get file size: {object_key}, error: {e}")
+                raise
+
+    async def delete_file(self, object_key: str) -> bool:
+        """Delete an object from R2."""
+        async with self._get_client() as s3:
+            try:
+                await s3.delete_object(Bucket=self.bucket_name, Key=object_key)
+                logger.info(f"Deleted file from R2: {object_key}")
+                return True
+            except Exception as e:
+                logger.error(f"Failed to delete file from R2: {object_key}, error: {e}")
+                return False
+
+    async def delete_job_files(self, job_id: str) -> int:
+        """Delete all files for a specific job."""
+        prefix = f"jobs/{job_id}/"
+        deleted_count = 0
+
+        async with self._get_client() as s3:
+            try:
+                # List all objects with the job prefix
+                paginator = s3.get_paginator('list_objects_v2')
+                async for page in paginator.paginate(Bucket=self.bucket_name, Prefix=prefix):
+                    if 'Contents' in page:
+                        # Delete objects in batches
+                        objects_to_delete = [{'Key': obj['Key']} for obj in page['Contents']]
+
+                        if objects_to_delete:
+                            delete_response = await s3.delete_objects(
+                                Bucket=self.bucket_name,
+                                Delete={'Objects': objects_to_delete}
+                            )
+                            deleted_count += len(delete_response.get('Deleted', []))
+
+                if deleted_count > 0:
+                    logger.info(f"Deleted {deleted_count} files for job {job_id}")
+
+            except Exception as e:
+                logger.error(f"Failed to delete job files for {job_id}: {e}")
+
+        return deleted_count
+
+    async def save_job_metadata(self, job_id: str, metadata: dict) -> str:
+        """Save job metadata as JSON."""
+        metadata_key = self._generate_job_key(job_id, "metadata.json")
+        metadata_with_timestamp = {
+            **metadata,
+            'created_at': datetime.utcnow().isoformat(),
+            'job_id': job_id
+        }
+
+        async with self._get_client() as s3:
+            await s3.put_object(
+                Bucket=self.bucket_name,
+                Key=metadata_key,
+                Body=json.dumps(metadata_with_timestamp, indent=2),
+                ContentType='application/json'
+            )
+
+        logger.debug(f"Saved metadata for job {job_id}")
+        return metadata_key
+
+    async def get_job_metadata(self, job_id: str) -> Optional[dict]:
+        """Get job metadata."""
+        metadata_key = self._generate_job_key(job_id, "metadata.json")
+
+        try:
+            content = await self.read_file(metadata_key)
+            return json.loads(content.decode('utf-8'))
+        except Exception as e:
+            logger.debug(f"No metadata found for job {job_id}: {e}")
+            return None
+
+    async def list_old_files(self, older_than_hours: int) -> List[Tuple[str, datetime]]:
+        """List jobs older than specified hours by checking metadata."""
+        cutoff_time = datetime.utcnow() - timedelta(hours=older_than_hours)
+        old_jobs = []
+
+        async with self._get_client() as s3:
+            try:
+                # List all metadata files
+                paginator = s3.get_paginator('list_objects_v2')
+                async for page in paginator.paginate(
+                    Bucket=self.bucket_name,
+                    Prefix="jobs/",
+                    Delimiter="/"
+                ):
+                    # Get job directories
+                    if 'CommonPrefixes' in page:
+                        for prefix_info in page['CommonPrefixes']:
+                            job_prefix = prefix_info['Prefix']
+                            # Extract job_id from prefix like "jobs/uuid/"
+                            job_id = job_prefix.split('/')[1]
+
+                            # Check if this job has metadata and if it's old
+                            metadata = await self.get_job_metadata(job_id)
+                            if metadata and 'created_at' in metadata:
+                                created_time = datetime.fromisoformat(metadata['created_at'])
+                                if created_time < cutoff_time:
+                                    old_jobs.append((job_id, created_time))
+
+            except Exception as e:
+                logger.error(f"Failed to list old files: {e}")
+
+        return old_jobs
+
+    async def cleanup_old_files(self, older_than_hours: int) -> int:
+        """Clean up jobs older than specified hours."""
+        old_jobs = await self.list_old_files(older_than_hours)
+        total_deleted = 0
+
+        for job_id, created_time in old_jobs:
+            deleted_count = await self.delete_job_files(job_id)
+            total_deleted += deleted_count
+
+        if total_deleted > 0:
+            logger.info(f"Cleaned up {total_deleted} files from {len(old_jobs)} old jobs")
+
+        return total_deleted
+
+    # Additional utility methods for job management
+    async def list_job_files(self, job_id: str) -> List[str]:
+        """List all files for a specific job."""
+        prefix = f"jobs/{job_id}/"
+        files = []
+
+        async with self._get_client() as s3:
+            try:
+                paginator = s3.get_paginator('list_objects_v2')
+                async for page in paginator.paginate(Bucket=self.bucket_name, Prefix=prefix):
+                    if 'Contents' in page:
+                        files.extend([obj['Key'] for obj in page['Contents']])
+            except Exception as e:
+                logger.error(f"Failed to list job files for {job_id}: {e}")
+
+        return files
+
+    async def get_job_id_from_key(self, object_key: str) -> Optional[str]:
+        """Extract job ID from R2 object key."""
+        try:
+            # Expecting format: jobs/{job_id}/filename
+            parts = object_key.split('/')
+            if len(parts) >= 3 and parts[0] == 'jobs':
+                return parts[1]
+        except Exception:
+            pass
+        return None
+
+    # Local file access methods for FFmpeg processing
+    async def get_local_path(self, storage_key: str) -> str:
+        """Download file from R2 to local temp file and return local path."""
+        # Create temp file with appropriate extension
+        _, ext = os.path.splitext(storage_key.split('/')[-1])
+
+        with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as temp_file:
+            temp_path = temp_file.name
+
+        try:
+            # Download content from R2
+            content = await self.read_file(storage_key)
+
+            # Write to temp file
+            with open(temp_path, 'wb') as f:
+                f.write(content)
+
+            logger.debug(f"Downloaded {storage_key} to local temp file: {temp_path}")
+            return temp_path
+
+        except Exception as e:
+            # Clean up temp file if download failed
+            if os.path.exists(temp_path):
+                os.unlink(temp_path)
+            logger.error(f"Failed to download {storage_key} to local temp: {e}")
+            raise
+
+    async def save_local_file_to_storage(self, local_path: str, storage_key: str) -> str:
+        """Upload local file to R2 storage."""
+        try:
+            # Read local file
+            with open(local_path, 'rb') as f:
+                content = f.read()
+
+            # Upload to R2
+            async with self._get_client() as s3:
+                await s3.put_object(
+                    Bucket=self.bucket_name,
+                    Key=storage_key,
+                    Body=content,
+                    Metadata={
+                        'upload_time': datetime.utcnow().isoformat(),
+                        'source': 'local_file'
+                    }
+                )
+
+            logger.debug(f"Uploaded local file {local_path} to R2: {storage_key}")
+            return storage_key
+
+        except Exception as e:
+            logger.error(f"Failed to upload local file {local_path} to R2: {e}")
+            raise
+
+    async def cleanup_local_path(self, local_path: str, storage_key: str) -> bool:
+        """Clean up local temp file."""
+        try:
+            if os.path.exists(local_path):
+                os.unlink(local_path)
+                logger.debug(f"Cleaned up local temp file: {local_path}")
+                return True
+            return False
+        except Exception as e:
+            logger.error(f"Failed to clean up local temp file {local_path}: {e}")
+            return False
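
Note: object keys follow the jobs/{job_id}/{filename} layout, so a whole job can be removed with one prefix delete. A minimal async round trip through R2Repository (placeholder credentials and a fake payload; running it requires a real bucket):

    import asyncio
    from infrastructure.repositories.r2_file_repository import R2Repository

    async def demo():
        repo = R2Repository(
            account_id="example-account-id",
            access_key_id="example-access-key-id",
            secret_access_key="example-secret",
            bucket_name="example-bucket",
        )
        key = await repo.save_uploaded_file(b"fake bytes", "clip.mp4", job_id="job-123")
        local = await repo.get_local_path(key)       # downloads to a local temp file
        # ... run FFmpeg against `local` here ...
        await repo.cleanup_local_path(local, key)    # removes the temp copy
        await repo.delete_job_files("job-123")       # deletes everything under jobs/job-123/

    asyncio.run(demo())
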
infrastructure/services/container.py CHANGED
@@ -4,7 +4,7 @@ from pathlib import Path
 
 from ..config.settings import settings
 from ..repositories.job_repository import InMemoryJobRepository
-from ..repositories.file_repository import FileSystemRepository
+from ..providers.file_storage_provider import create_storage_from_settings
 from .ffmpeg_service import FFmpegService
 from .file_cleanup_service import FileCleanupService
 
@@ -16,7 +16,9 @@ class ServiceContainer:
     def __init__(self):
         # Repositories
         self.job_repository = InMemoryJobRepository()
-        self.file_repository = FileSystemRepository(settings.temp_dir)
+
+        # Create file repository based on settings
+        self.file_repository = self._create_file_repository()
 
         # Services
         self.ffmpeg_service = FFmpegService(
@@ -32,6 +34,24 @@ class ServiceContainer:
             retention_hours=settings.file_retention_hours
         )
 
+    def _create_file_repository(self):
+        """Create file repository based on settings."""
+        if settings.storage_type.lower() == "filesystem":
+            return create_storage_from_settings(
+                storage_type="filesystem",
+                base_path=settings.temp_dir
+            )
+        elif settings.storage_type.lower() == "r2":
+            return create_storage_from_settings(
+                storage_type="r2",
+                account_id=settings.cloudflare_r2_account_id,
+                access_key_id=settings.cloudflare_r2_access_key_id,
+                secret_access_key=settings.cloudflare_r2_secret_access_key,
+                bucket_name=settings.cloudflare_r2_bucket_name
+            )
+        else:
+            raise ValueError(f"Unsupported storage type: {settings.storage_type}")
+
     @classmethod
     def get_instance(cls) -> 'ServiceContainer':
         """Get singleton instance of service container."""
infrastructure/services/local_file_processor.py ADDED
@@ -0,0 +1,112 @@
+"""Local file processor for handling FFmpeg operations with remote storage."""
+import logging
+import os
+from contextlib import asynccontextmanager
+from typing import AsyncGenerator, Tuple
+
+logger = logging.getLogger(__name__)
+
+
+class LocalFileProcessor:
+    """Handles local file operations for FFmpeg processing with remote storage."""
+
+    def __init__(self, file_repository):
+        self.file_repository = file_repository
+
+    @asynccontextmanager
+    async def get_local_files(self, input_storage_key: str, output_storage_key: str) -> AsyncGenerator[Tuple[str, str], None]:
+        """
+        Context manager that provides local file paths for input and output.
+
+        For filesystem storage: Returns paths directly
+        For R2 storage: Downloads input to temp, provides temp output path
+
+        Usage:
+            async with processor.get_local_files(input_key, output_key) as (local_input, local_output):
+                # Run FFmpeg with local_input and local_output
+                ffmpeg_command(local_input, local_output)
+                # Files are automatically uploaded and cleaned up
+
+        Args:
+            input_storage_key: Storage key/path for input file
+            output_storage_key: Storage key/path for output file
+
+        Yields:
+            Tuple[str, str]: (local_input_path, local_output_path)
+        """
+        local_input_path = None
+        local_output_path = None
+
+        try:
+            # Get local input path (downloads if remote)
+            local_input_path = await self.file_repository.get_local_path(input_storage_key)
+            logger.debug(f"Got local input path: {local_input_path}")
+
+            # Create local output path
+            if hasattr(self.file_repository, 'base_path'):
+                # Filesystem storage - create output path directly
+                local_output_path = output_storage_key
+            else:
+                # Remote storage - create temp file for output
+                import tempfile
+                _, ext = os.path.splitext(output_storage_key.split('/')[-1])
+                with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as temp_file:
+                    local_output_path = temp_file.name
+
+            logger.debug(f"Created local output path: {local_output_path}")
+
+            # Yield the local paths for processing
+            yield local_input_path, local_output_path
+
+            # Upload output file to storage if it was created
+            if os.path.exists(local_output_path) and os.path.getsize(local_output_path) > 0:
+                await self.file_repository.save_local_file_to_storage(
+                    local_output_path,
+                    output_storage_key
+                )
+                logger.debug(f"Uploaded output file to storage: {output_storage_key}")
+            else:
+                logger.warning(f"Output file was not created or is empty: {local_output_path}")
+
+        except Exception as e:
+            logger.error(f"Error in local file processing: {e}")
+            raise
+
+        finally:
+            # Clean up local files
+            cleanup_tasks = []
+
+            if local_input_path:
+                cleanup_tasks.append(
+                    self.file_repository.cleanup_local_path(local_input_path, input_storage_key)
+                )
+
+            if local_output_path and hasattr(self.file_repository, '_get_client'):
+                # Only clean up output file for remote storage (R2)
+                cleanup_tasks.append(
+                    self.file_repository.cleanup_local_path(local_output_path, output_storage_key)
+                )
+
+            # Execute cleanup tasks
+            for task in cleanup_tasks:
+                try:
+                    await task
+                except Exception as e:
+                    logger.error(f"Error during cleanup: {e}")
+
+    async def process_with_ffmpeg(self, input_storage_key: str, output_storage_key: str, ffmpeg_func, *args, **kwargs):
+        """
+        Helper method to process files with FFmpeg.
+
+        Args:
+            input_storage_key: Storage key for input file
+            output_storage_key: Storage key for output file
+            ffmpeg_func: Async function that takes (input_path, output_path, *args, **kwargs)
+            *args, **kwargs: Additional arguments for ffmpeg_func
+
+        Returns:
+            Result from ffmpeg_func
+        """
+        async with self.get_local_files(input_storage_key, output_storage_key) as (local_input, local_output):
+            logger.info(f"Processing {input_storage_key} -> {output_storage_key}")
+            return await ffmpeg_func(local_input, local_output, *args, **kwargs)
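
Note: process_with_ffmpeg is the piece ProcessJobUseCase now calls; it hides the download/upload round trip so the FFmpeg service only ever sees local paths. A minimal wiring sketch (format and quality values below are placeholders):

    from infrastructure.services.local_file_processor import LocalFileProcessor

    async def run_extraction(file_repository, ffmpeg_service, input_key: str, output_key: str):
        processor = LocalFileProcessor(file_repository)
        # extract_audio is called as extract_audio(local_input, local_output, format=..., quality=...)
        return await processor.process_with_ffmpeg(
            input_storage_key=input_key,
            output_storage_key=output_key,
            ffmpeg_func=ffmpeg_service.extract_audio,
            format="mp3",
            quality="high",
        )
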
interfaces/api/routes/extraction_routes.py CHANGED
@@ -6,6 +6,7 @@ from dataclasses import asdict
 from ..dependencies import ValidatedVideo, ExtractionParams, UseCases, Services
 from ..responses import JobCreatedResponse
 from application.dto.extraction_request import ExtractionRequestDTO
+from domain.entities.job import Job
 
 router = APIRouter()
 
@@ -44,33 +45,42 @@ async def extract_audio(
     # Get file size
     file_size = _get_file_size(video)
 
+    # Create job first to get the job ID
+    job = Job.create_new(
+        video_filename=video.filename,
+        file_size_bytes=file_size,
+        output_format=params.output_format,
+        quality=params.quality
+    )
+
+    # Save uploaded file with the job ID
+    file_path = await services.file_repository.save_stream(
+        video,
+        video.filename,
+        job_id=job.id  # Pass the job ID here
+    )
+
     # Create DTO
     extraction_dto = ExtractionRequestDTO(
         video_filename=video.filename,
-        video_file_path="",  # Will be set by use case
+        video_file_path=file_path,
         video_file_size=file_size,
         output_format=params.output_format,
        quality=params.quality,
        content_type=video.content_type
    )
 
-    # Save uploaded file temporarily
-    file_path = await services.file_repository.save_stream(
-        video,
-        video.filename
-    )
-    extraction_dto.video_file_path = file_path
-
-    # Always use async processing
+    # Execute use case with pre-created job
     try:
-        result = await use_cases.extract_audio_async.execute(
+        result = await use_cases.extract_audio_async.execute_with_job(
+            job,  # Pass the job object
            extraction_dto,
            background_tasks
        )
 
        return JSONResponse(
            content=asdict(result),
-            status_code=202  # Accepted
+            status_code=202
        )
    except Exception as e:
        # Clean up input file on error
requirements.txt CHANGED
@@ -10,5 +10,9 @@ pydantic-settings==2.1.0
 aiofiles==23.2.1
 ffmpeg-python==0.2.0
 
+# Cloud Storage (R2/S3 compatible)
+aioboto3==12.3.0
+botocore==1.34.34
+
 # Utilities
 python-dotenv==1.0.0