# audio-processor/infrastructure/services/local_file_processor.py
# Tadeas Kosek: "add r2 file repository" (commit 37e59a0)
"""Local file processor for handling FFmpeg operations with remote storage."""
import logging
import os
from contextlib import asynccontextmanager
from typing import AsyncGenerator, Tuple
logger = logging.getLogger(__name__)
class LocalFileProcessor:
    """Handles local file operations for FFmpeg processing with remote storage.

    FFmpeg needs real local paths. This class asks the file repository for a
    local copy of the input, chooses a local output path, saves the output
    back to storage after processing, and cleans up local copies.
    """

    def __init__(self, file_repository):
        """
        Args:
            file_repository: Storage backend exposing the async methods
                ``get_local_path(key)``,
                ``save_local_file_to_storage(local_path, key)`` and
                ``cleanup_local_path(local_path, key)``. A ``base_path``
                attribute marks it as filesystem storage.
        """
        self.file_repository = file_repository

    def _is_filesystem_storage(self) -> bool:
        """Return True if the repository is filesystem storage (has ``base_path``).

        This single check decides both whether a temp output file is created
        and whether it is cleaned up, so the two decisions cannot disagree.
        """
        return hasattr(self.file_repository, 'base_path')

    @staticmethod
    def _create_temp_output_path(output_storage_key: str) -> str:
        """Create an empty temp file with the output key's extension and return its path."""
        import tempfile
        _, ext = os.path.splitext(output_storage_key.split('/')[-1])
        # delete=False: the file must outlive this `with` so FFmpeg can write to it.
        # NOTE(review): the file already exists (empty) when handed out; an FFmpeg
        # invocation without `-y` may refuse to overwrite it — confirm ffmpeg_func passes `-y`.
        with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as temp_file:
            return temp_file.name

    async def _cleanup(self, local_path: str, storage_key: str) -> None:
        """Ask the repository to clean up a local path; log failures instead of raising."""
        try:
            await self.file_repository.cleanup_local_path(local_path, storage_key)
        except Exception as e:
            logger.error(f"Error during cleanup: {e}")

    @staticmethod
    def _remove_leftover_temp(path: str) -> None:
        """Delete a temp output file we created if the repository's cleanup left it behind."""
        try:
            if os.path.exists(path):
                os.remove(path)
        except OSError as e:
            logger.error(f"Error during cleanup: {e}")

    @asynccontextmanager
    async def get_local_files(self, input_storage_key: str, output_storage_key: str) -> AsyncGenerator[Tuple[str, str], None]:
        """
        Context manager that provides local file paths for input and output.

        For filesystem storage (repository has ``base_path``): the output
        storage key itself is used as the local output path.
        For remote storage (e.g. R2): the input comes from the repository's
        ``get_local_path`` (downloaded if remote) and the output goes to a
        temp file that is cleaned up afterwards.

        Usage:
            async with processor.get_local_files(input_key, output_key) as (local_input, local_output):
                # Run FFmpeg with local_input and local_output
                ffmpeg_command(local_input, local_output)
            # On normal exit a non-empty output is saved to storage;
            # local copies are cleaned up on both success and failure.

        Args:
            input_storage_key: Storage key/path for input file
            output_storage_key: Storage key/path for output file

        Yields:
            Tuple[str, str]: (local_input_path, local_output_path)

        Raises:
            Any exception from the repository or from the ``async with`` body
            is logged and re-raised; the output is not saved in that case.
        """
        local_input_path = None
        local_output_path = None
        # Decided once so that creation and cleanup of the output path always agree.
        output_is_temp = not self._is_filesystem_storage()
        try:
            # Get local input path (downloads if remote)
            local_input_path = await self.file_repository.get_local_path(input_storage_key)
            logger.debug(f"Got local input path: {local_input_path}")

            if output_is_temp:
                local_output_path = self._create_temp_output_path(output_storage_key)
            else:
                # NOTE(review): the raw storage key is used as the path (not joined
                # with base_path) — confirm callers pass a usable filesystem path.
                local_output_path = output_storage_key
            logger.debug(f"Created local output path: {local_output_path}")

            yield local_input_path, local_output_path

            # Only reached when the `async with` body finished without raising.
            if os.path.exists(local_output_path) and os.path.getsize(local_output_path) > 0:
                await self.file_repository.save_local_file_to_storage(
                    local_output_path,
                    output_storage_key
                )
                logger.debug(f"Uploaded output file to storage: {output_storage_key}")
            else:
                logger.warning(f"Output file was not created or is empty: {local_output_path}")
        except Exception as e:
            logger.error(f"Error in local file processing: {e}")
            raise
        finally:
            if local_input_path:
                await self._cleanup(local_input_path, input_storage_key)
            # Clean up the output only if it is a temp file we created; in filesystem
            # mode the output path IS the final stored file and must be kept.
            if output_is_temp and local_output_path:
                await self._cleanup(local_output_path, output_storage_key)
                self._remove_leftover_temp(local_output_path)

    async def process_with_ffmpeg(self, input_storage_key: str, output_storage_key: str, ffmpeg_func, *args, **kwargs):
        """
        Helper method to process files with FFmpeg.

        Runs ``ffmpeg_func`` inside ``get_local_files``; the output is saved
        to storage after ``ffmpeg_func`` returns, when the context exits.

        Args:
            input_storage_key: Storage key for input file
            output_storage_key: Storage key for output file
            ffmpeg_func: Async function that takes (input_path, output_path, *args, **kwargs)
            *args, **kwargs: Additional arguments for ffmpeg_func

        Returns:
            Result from ffmpeg_func
        """
        async with self.get_local_files(input_storage_key, output_storage_key) as (local_input, local_output):
            logger.info(f"Processing {input_storage_key} -> {output_storage_key}")
            return await ffmpeg_func(local_input, local_output, *args, **kwargs)