"""Local file processor for handling FFmpeg operations with remote storage.""" import logging import os from contextlib import asynccontextmanager from typing import AsyncGenerator, Tuple logger = logging.getLogger(__name__) class LocalFileProcessor: """Handles local file operations for FFmpeg processing with remote storage.""" def __init__(self, file_repository): self.file_repository = file_repository @asynccontextmanager async def get_local_files(self, input_storage_key: str, output_storage_key: str) -> AsyncGenerator[Tuple[str, str], None]: """ Context manager that provides local file paths for input and output. For filesystem storage: Returns paths directly For R2 storage: Downloads input to temp, provides temp output path Usage: async with processor.get_local_files(input_key, output_key) as (local_input, local_output): # Run FFmpeg with local_input and local_output ffmpeg_command(local_input, local_output) # Files are automatically uploaded and cleaned up Args: input_storage_key: Storage key/path for input file output_storage_key: Storage key/path for output file Yields: Tuple[str, str]: (local_input_path, local_output_path) """ local_input_path = None local_output_path = None try: # Get local input path (downloads if remote) local_input_path = await self.file_repository.get_local_path(input_storage_key) logger.debug(f"Got local input path: {local_input_path}") # Create local output path if hasattr(self.file_repository, 'base_path'): # Filesystem storage - create output path directly local_output_path = output_storage_key else: # Remote storage - create temp file for output import tempfile _, ext = os.path.splitext(output_storage_key.split('/')[-1]) with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as temp_file: local_output_path = temp_file.name logger.debug(f"Created local output path: {local_output_path}") # Yield the local paths for processing yield local_input_path, local_output_path # Upload output file to storage if it was created if os.path.exists(local_output_path) and os.path.getsize(local_output_path) > 0: await self.file_repository.save_local_file_to_storage( local_output_path, output_storage_key ) logger.debug(f"Uploaded output file to storage: {output_storage_key}") else: logger.warning(f"Output file was not created or is empty: {local_output_path}") except Exception as e: logger.error(f"Error in local file processing: {e}") raise finally: # Clean up local files cleanup_tasks = [] if local_input_path: cleanup_tasks.append( self.file_repository.cleanup_local_path(local_input_path, input_storage_key) ) if local_output_path and hasattr(self.file_repository, '_get_client'): # Only clean up output file for remote storage (R2) cleanup_tasks.append( self.file_repository.cleanup_local_path(local_output_path, output_storage_key) ) # Execute cleanup tasks for task in cleanup_tasks: try: await task except Exception as e: logger.error(f"Error during cleanup: {e}") async def process_with_ffmpeg(self, input_storage_key: str, output_storage_key: str, ffmpeg_func, *args, **kwargs): """ Helper method to process files with FFmpeg. Args: input_storage_key: Storage key for input file output_storage_key: Storage key for output file ffmpeg_func: Async function that takes (input_path, output_path, *args, **kwargs) *args, **kwargs: Additional arguments for ffmpeg_func Returns: Result from ffmpeg_func """ async with self.get_local_files(input_storage_key, output_storage_key) as (local_input, local_output): logger.info(f"Processing {input_storage_key} -> {output_storage_key}") return await ffmpeg_func(local_input, local_output, *args, **kwargs)