Spaces:
Sleeping
Sleeping
| """Local file processor for handling FFmpeg operations with remote storage.""" | |
| import logging | |
| import os | |
| from contextlib import asynccontextmanager | |
| from typing import AsyncGenerator, Tuple | |
logger = logging.getLogger(__name__)


class LocalFileProcessor:
    """Stage storage-backed files on local disk so FFmpeg can work on real paths.

    The wrapped ``file_repository`` must provide three awaitable methods, all
    of which are awaited by this class:

    * ``get_local_path(storage_key)``
    * ``save_local_file_to_storage(local_path, storage_key)``
    * ``cleanup_local_path(local_path, storage_key)``

    A repository exposing a ``base_path`` attribute is treated as filesystem
    storage; any other repository is treated as remote storage.
    """

    def __init__(self, file_repository):
        """Remember the repository used to fetch, store and clean up files."""
        self.file_repository = file_repository

    # The decorator is required: a bare async generator does not implement
    # ``__aenter__``/``__aexit__`` and cannot be used with ``async with``.
    @asynccontextmanager
    async def get_local_files(
        self, input_storage_key: str, output_storage_key: str
    ) -> AsyncGenerator[Tuple[str, str], None]:
        """
        Async context manager that provides local paths for input and output.

        For filesystem storage (repository has ``base_path``) the output
        storage key is used directly as the local output path. Otherwise an
        empty temporary file is created, keeping the extension of the output
        key, and its path is handed out instead.

        When the ``async with`` body finishes without raising, a non-empty
        output file is passed to ``save_local_file_to_storage``; a missing or
        empty one only produces a warning. On every exit path the input path,
        and the temporary output file if one was created, are passed to
        ``cleanup_local_path``. Cleanup failures are logged, not raised.

        Usage:
            async with processor.get_local_files(input_key, output_key) as (local_input, local_output):
                # Run FFmpeg with local_input and local_output
                ffmpeg_command(local_input, local_output)
            # Output has been stored and local files cleaned up

        Args:
            input_storage_key: Storage key/path for input file
            output_storage_key: Storage key/path for output file

        Yields:
            Tuple[str, str]: (local_input_path, local_output_path)

        Raises:
            Exception: Anything raised while preparing the paths, inside the
                ``async with`` body, or while storing the output is logged
                and re-raised unchanged.
        """
        local_input_path = None
        local_output_path = None
        # Set only when this call created a temp file, so cleanup follows the
        # same decision that allocated the path.
        output_is_temp = False

        try:
            # Get local input path (the repository decides how to obtain it)
            local_input_path = await self.file_repository.get_local_path(input_storage_key)
            logger.debug(f"Got local input path: {local_input_path}")

            if hasattr(self.file_repository, 'base_path'):
                # Filesystem storage - write straight to the storage path
                local_output_path = output_storage_key
            else:
                # Remote storage - create temp file for output
                import tempfile

                _, ext = os.path.splitext(output_storage_key.split('/')[-1])
                # delete=False: the file must outlive this handle so the
                # caller can write to it by path.
                with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as temp_file:
                    local_output_path = temp_file.name
                output_is_temp = True
            logger.debug(f"Created local output path: {local_output_path}")

            # Hand the local paths to the caller for processing
            yield local_input_path, local_output_path

            # Reached only when the caller's body completed without raising
            if os.path.exists(local_output_path) and os.path.getsize(local_output_path) > 0:
                await self.file_repository.save_local_file_to_storage(
                    local_output_path,
                    output_storage_key,
                )
                logger.debug(f"Uploaded output file to storage: {output_storage_key}")
            else:
                logger.warning(f"Output file was not created or is empty: {local_output_path}")
        except Exception as e:
            logger.error(f"Error in local file processing: {e}")
            raise
        finally:
            # Collect (local_path, storage_key) pairs rather than coroutine
            # objects so that each cleanup call is created and awaited inside
            # its own try block.
            cleanup_targets = []
            if local_input_path:
                cleanup_targets.append((local_input_path, input_storage_key))
            if local_output_path and output_is_temp:
                # Never clean up a direct filesystem output path
                cleanup_targets.append((local_output_path, output_storage_key))

            for local_path, storage_key in cleanup_targets:
                try:
                    await self.file_repository.cleanup_local_path(local_path, storage_key)
                except Exception as e:
                    # Best effort: a failed cleanup must not mask the outcome
                    logger.error(f"Error during cleanup: {e}")

    async def process_with_ffmpeg(self, input_storage_key: str, output_storage_key: str, ffmpeg_func, *args, **kwargs):
        """
        Helper method to process files with FFmpeg.

        Opens ``get_local_files`` for the two keys, awaits ``ffmpeg_func`` on
        the resulting local paths and returns its result.

        Args:
            input_storage_key: Storage key for input file
            output_storage_key: Storage key for output file
            ffmpeg_func: Async function that takes (input_path, output_path, *args, **kwargs)
            *args, **kwargs: Additional arguments for ffmpeg_func

        Returns:
            Result from ffmpeg_func
        """
        async with self.get_local_files(input_storage_key, output_storage_key) as (local_input, local_output):
            logger.info(f"Processing {input_storage_key} -> {output_storage_key}")
            return await ffmpeg_func(local_input, local_output, *args, **kwargs)