Spaces:
Sleeping
Sleeping
File size: 4,837 Bytes
37e59a0 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
"""Local file processor for handling FFmpeg operations with remote storage."""
import logging
import os
from contextlib import asynccontextmanager
from typing import AsyncGenerator, Tuple
logger = logging.getLogger(__name__)
class LocalFileProcessor:
"""Handles local file operations for FFmpeg processing with remote storage."""
def __init__(self, file_repository):
self.file_repository = file_repository
@asynccontextmanager
async def get_local_files(self, input_storage_key: str, output_storage_key: str) -> AsyncGenerator[Tuple[str, str], None]:
"""
Context manager that provides local file paths for input and output.
For filesystem storage: Returns paths directly
For R2 storage: Downloads input to temp, provides temp output path
Usage:
async with processor.get_local_files(input_key, output_key) as (local_input, local_output):
# Run FFmpeg with local_input and local_output
ffmpeg_command(local_input, local_output)
# Files are automatically uploaded and cleaned up
Args:
input_storage_key: Storage key/path for input file
output_storage_key: Storage key/path for output file
Yields:
Tuple[str, str]: (local_input_path, local_output_path)
"""
local_input_path = None
local_output_path = None
try:
# Get local input path (downloads if remote)
local_input_path = await self.file_repository.get_local_path(input_storage_key)
logger.debug(f"Got local input path: {local_input_path}")
# Create local output path
if hasattr(self.file_repository, 'base_path'):
# Filesystem storage - create output path directly
local_output_path = output_storage_key
else:
# Remote storage - create temp file for output
import tempfile
_, ext = os.path.splitext(output_storage_key.split('/')[-1])
with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as temp_file:
local_output_path = temp_file.name
logger.debug(f"Created local output path: {local_output_path}")
# Yield the local paths for processing
yield local_input_path, local_output_path
# Upload output file to storage if it was created
if os.path.exists(local_output_path) and os.path.getsize(local_output_path) > 0:
await self.file_repository.save_local_file_to_storage(
local_output_path,
output_storage_key
)
logger.debug(f"Uploaded output file to storage: {output_storage_key}")
else:
logger.warning(f"Output file was not created or is empty: {local_output_path}")
except Exception as e:
logger.error(f"Error in local file processing: {e}")
raise
finally:
# Clean up local files
cleanup_tasks = []
if local_input_path:
cleanup_tasks.append(
self.file_repository.cleanup_local_path(local_input_path, input_storage_key)
)
if local_output_path and hasattr(self.file_repository, '_get_client'):
# Only clean up output file for remote storage (R2)
cleanup_tasks.append(
self.file_repository.cleanup_local_path(local_output_path, output_storage_key)
)
# Execute cleanup tasks
for task in cleanup_tasks:
try:
await task
except Exception as e:
logger.error(f"Error during cleanup: {e}")
async def process_with_ffmpeg(self, input_storage_key: str, output_storage_key: str, ffmpeg_func, *args, **kwargs):
"""
Helper method to process files with FFmpeg.
Args:
input_storage_key: Storage key for input file
output_storage_key: Storage key for output file
ffmpeg_func: Async function that takes (input_path, output_path, *args, **kwargs)
*args, **kwargs: Additional arguments for ffmpeg_func
Returns:
Result from ffmpeg_func
"""
async with self.get_local_files(input_storage_key, output_storage_key) as (local_input, local_output):
logger.info(f"Processing {input_storage_key} -> {output_storage_key}")
return await ffmpeg_func(local_input, local_output, *args, **kwargs) |