File size: 4,837 Bytes
37e59a0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
"""Local file processor for handling FFmpeg operations with remote storage."""
import logging
import os
from contextlib import asynccontextmanager
from typing import AsyncGenerator, Tuple

logger = logging.getLogger(__name__)


class LocalFileProcessor:
    """Stage storage-backed files on local disk so FFmpeg can work on them.

    The wrapped ``file_repository`` is expected to provide the coroutines
    ``get_local_path``, ``save_local_file_to_storage`` and
    ``cleanup_local_path``. A repository that exposes a ``base_path``
    attribute is treated as filesystem storage; any other repository is
    treated as remote storage and gets a temporary local output file.
    """

    def __init__(self, file_repository):
        """Keep a reference to the repository used to fetch, upload and clean up files."""
        self.file_repository = file_repository

    @asynccontextmanager
    async def get_local_files(self, input_storage_key: str, output_storage_key: str) -> AsyncGenerator[Tuple[str, str], None]:
        """
        Context manager that provides local file paths for input and output.

        For filesystem storage (repository has ``base_path``): the output
        storage key is used directly as the local output path.
        For remote storage: the input is obtained through the repository's
        ``get_local_path`` and an empty temporary file, carrying the output
        key's extension, is created for the output.

        When the ``async with`` body finishes without raising, a non-empty
        output file is handed to ``save_local_file_to_storage``; a missing or
        empty output only logs a warning. On every exit path the input is
        passed to ``cleanup_local_path``, and a temporary output file created
        here is cleaned up and removed.

        NOTE(review): for remote storage the temporary output file already
        exists (empty) when the paths are yielded, so the FFmpeg invocation
        presumably has to allow overwriting its output - confirm with callers.

        Usage:
            async with processor.get_local_files(input_key, output_key) as (local_input, local_output):
                # Run FFmpeg with local_input and local_output
                ffmpeg_command(local_input, local_output)
                # Files are automatically uploaded and cleaned up

        Args:
            input_storage_key: Storage key/path for input file
            output_storage_key: Storage key/path for output file

        Yields:
            Tuple[str, str]: (local_input_path, local_output_path)

        Raises:
            Exception: Any error raised while preparing the paths, inside the
                ``async with`` body, or while uploading is logged and re-raised.
        """
        local_input_path = None
        local_output_path = None
        # Set only when this call created a temp file for the output. Cleanup
        # is keyed on this flag so it always matches the decision made at
        # creation time, whatever other attributes the repository exposes.
        output_is_temp = False

        try:
            # Get local input path (downloads if remote)
            local_input_path = await self.file_repository.get_local_path(input_storage_key)
            logger.debug("Got local input path: %s", local_input_path)

            if hasattr(self.file_repository, 'base_path'):
                # Filesystem storage - the storage key is the output path
                local_output_path = output_storage_key
            else:
                # Remote storage - create a temp file that keeps the extension
                import tempfile
                ext = os.path.splitext(output_storage_key.split('/')[-1])[1]
                with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as temp_file:
                    local_output_path = temp_file.name
                    output_is_temp = True

            logger.debug("Created local output path: %s", local_output_path)

            # Hand the local paths to the caller for processing
            yield local_input_path, local_output_path

            # Upload only when processing actually produced data; a missing
            # file is treated the same as an empty one.
            try:
                has_output = os.path.getsize(local_output_path) > 0
            except OSError:
                has_output = False

            if has_output:
                await self.file_repository.save_local_file_to_storage(
                    local_output_path,
                    output_storage_key,
                )
                logger.debug("Uploaded output file to storage: %s", output_storage_key)
            else:
                logger.warning("Output file was not created or is empty: %s", local_output_path)

        except Exception as e:
            logger.error("Error in local file processing: %s", e)
            raise

        finally:
            # Collect (local path, storage key) pairs instead of coroutine
            # objects, so no coroutine is created unless it is awaited.
            cleanup_targets = []
            if local_input_path:
                cleanup_targets.append((local_input_path, input_storage_key))
            if output_is_temp:
                cleanup_targets.append((local_output_path, output_storage_key))

            for path, storage_key in cleanup_targets:
                try:
                    await self.file_repository.cleanup_local_path(path, storage_key)
                except Exception as e:
                    # Best effort: a failed cleanup must not mask the real outcome
                    logger.error("Error during cleanup: %s", e)

            # The temp file was created here, so make sure it does not outlive
            # this call even if the repository's cleanup left it in place.
            if output_is_temp and os.path.exists(local_output_path):
                try:
                    os.remove(local_output_path)
                except OSError as e:
                    logger.error("Error during cleanup: %s", e)

    async def process_with_ffmpeg(self, input_storage_key: str, output_storage_key: str, ffmpeg_func, *args, **kwargs):
        """
        Helper method to process files with FFmpeg.

        Opens ``get_local_files`` for the two storage keys and awaits
        ``ffmpeg_func`` on the resulting local paths; upload and cleanup are
        performed by that context manager when it exits.

        Args:
            input_storage_key: Storage key for input file
            output_storage_key: Storage key for output file
            ffmpeg_func: Async function that takes (input_path, output_path, *args, **kwargs)
            *args, **kwargs: Additional arguments for ffmpeg_func

        Returns:
            Result from ffmpeg_func
        """
        async with self.get_local_files(input_storage_key, output_storage_key) as (local_input, local_output):
            logger.info("Processing %s -> %s", input_storage_key, output_storage_key)
            return await ffmpeg_func(local_input, local_output, *args, **kwargs)