File size: 8,073 Bytes
eab1374
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
"""
File Upload Handler for Hugging Face Spaces Compatibility

This module provides utilities for handling file uploads in a way that's compatible
with Hugging Face Spaces restrictions. It uses the /tmp directory as an intermediate
storage location to work around direct file streaming limitations.
"""

import atexit
import os
import re
import tempfile
import uuid
import zipfile
from datetime import datetime
from io import BytesIO
from pathlib import Path
from typing import Optional, Union, BinaryIO

import streamlit as st


class FileUploadHandler:
    """Handle file uploads with /tmp directory approach for HF Spaces compatibility.

    Every method is a static method; the class only serves as a namespace plus
    a process-wide registry (``_temp_files``) of the temporary files it created.
    Files are additionally tracked in ``st.session_state.temp_files`` so they
    can be found again across Streamlit reruns.
    """

    # Paths of temporary files created through save_to_temp(), kept for cleanup
    _temp_files = set()

    # Names produced by save_to_temp() look like
    # "[<prefix>_]<YYYYMMDD>_<HHMMSS>_<8 hex chars>_<original name>".
    # cleanup_old_temp_files() only touches files matching this signature so
    # that unrelated files living in the same directory are never deleted.
    _TEMP_NAME_PATTERN = re.compile(r"(?:^|_)[0-9]{8}_[0-9]{6}_[0-9a-f]{8}_")

    @staticmethod
    def _untrack(temp_path: str) -> None:
        """Forget *temp_path* in both the class registry and session state."""
        FileUploadHandler._temp_files.discard(temp_path)
        try:
            if 'temp_files' in st.session_state:
                st.session_state.temp_files.discard(temp_path)
        except Exception:
            # Session state may be unusable here (e.g. when called from the
            # atexit hook); the class-level registry is already updated and
            # cleanup stays best-effort.
            pass

    @staticmethod
    def save_to_temp(uploaded_file, prefix: str = "") -> Optional[str]:
        """
        Save uploaded file to /tmp directory and return the path.

        The file is registered for later cleanup in both the class registry
        and ``st.session_state.temp_files``. If anything fails, a partially
        written file is removed again and the error is shown via ``st.error``.

        Args:
            uploaded_file: Streamlit UploadedFile object
            prefix: Optional prefix for the temporary filename

        Returns:
            Path to saved temporary file, or None if error
        """
        temp_path = None
        try:
            # Timestamp + random id keep concurrent uploads of the same
            # file name from colliding
            unique_id = uuid.uuid4().hex[:8]
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            # Keep only the final path component of the client-supplied name
            safe_filename = Path(uploaded_file.name).name

            temp_filename = f"{timestamp}_{unique_id}_{safe_filename}"
            if prefix:
                temp_filename = f"{prefix}_{temp_filename}"

            temp_path = os.path.join("/tmp", temp_filename)

            # Save to /tmp using getbuffer() which is more reliable
            with open(temp_path, 'wb') as f:
                f.write(uploaded_file.getbuffer())

            # Track for cleanup
            FileUploadHandler._temp_files.add(temp_path)

            # Store in session state for persistence across reruns
            if 'temp_files' not in st.session_state:
                st.session_state.temp_files = set()
            st.session_state.temp_files.add(temp_path)

            return temp_path

        except Exception as e:
            # The caller receives None and could never clean the file up
            # itself, so do not leave a (possibly partial) file behind.
            if temp_path is not None:
                FileUploadHandler.cleanup_temp_file(temp_path)
            st.error(f"Failed to save uploaded file: {str(e)}")
            return None

    @staticmethod
    def read_from_temp(temp_path: str, mode: str = 'rb',
                       encoding: Optional[str] = None) -> Optional[Union[bytes, str]]:
        """
        Read file from temporary location.

        Args:
            temp_path: Path to temporary file
            mode: Read mode ('rb' for binary, 'r' for text)
            encoding: Text encoding used when *mode* is a text mode; None
                keeps Python's default. Ignored for binary modes.

        Returns:
            File content as bytes or string, or None if error
        """
        # open() rejects an encoding argument in binary mode
        open_kwargs = {} if 'b' in mode else {'encoding': encoding}
        try:
            with open(temp_path, mode, **open_kwargs) as f:
                return f.read()
        except Exception as e:
            # UI boundary: report the failure to the user instead of raising
            st.error(f"Failed to read temporary file: {str(e)}")
            return None

    @staticmethod
    def get_file_content(uploaded_file, as_text: bool = False, encoding: str = 'utf-8') -> Optional[Union[bytes, str]]:
        """
        Get file content using temp file approach.

        The upload is written to a temporary file, read back, and the
        temporary file is removed again before returning.

        Args:
            uploaded_file: Streamlit UploadedFile object
            as_text: Whether to return content as decoded text
            encoding: Text encoding to use if as_text is True

        Returns:
            File content as bytes or string, or None if error
        """
        temp_path = FileUploadHandler.save_to_temp(uploaded_file)
        if not temp_path:
            return None

        try:
            if as_text:
                return FileUploadHandler.read_from_temp(
                    temp_path, mode='r', encoding=encoding
                )
            return FileUploadHandler.read_from_temp(temp_path, mode='rb')
        finally:
            # The path is never handed to the caller, so the intermediate
            # file has no further use once its content has been read.
            FileUploadHandler.cleanup_temp_file(temp_path)

    @staticmethod
    def handle_zip_file(uploaded_file) -> Optional[zipfile.ZipFile]:
        """
        Handle ZIP file uploads by saving to temp and returning ZipFile object.

        On success the temporary file stays on disk (and tracked) because the
        returned ZipFile reads from it; the caller is responsible for closing
        the ZipFile.

        Args:
            uploaded_file: Streamlit UploadedFile object (should be a ZIP file)

        Returns:
            ZipFile object opened from temp location, or None if error
        """
        temp_path = FileUploadHandler.save_to_temp(uploaded_file)
        if not temp_path:
            return None

        try:
            return zipfile.ZipFile(temp_path, 'r')
        except Exception as e:
            st.error(f"Failed to open ZIP file: {str(e)}")
            # Nothing can use the file if it is not a readable archive
            FileUploadHandler.cleanup_temp_file(temp_path)
            return None

    @staticmethod
    def cleanup_temp_file(temp_path: str):
        """
        Remove a temporary file.

        Best-effort: errors are never raised. A path whose file is already
        gone is still dropped from tracking; a path that could not be removed
        stays tracked so a later cleanup can retry.

        Args:
            temp_path: Path to temporary file to remove
        """
        try:
            os.remove(temp_path)
        except FileNotFoundError:
            # Already deleted elsewhere - only the stale tracking entry is left
            pass
        except (OSError, TypeError, ValueError):
            # Could not delete (permissions, not a file, invalid path)
            return
        FileUploadHandler._untrack(temp_path)

    @staticmethod
    def cleanup_all_temp_files():
        """Cleanup all tracked temporary files."""
        # Clean up class-tracked files (iterate a copy: cleanup mutates the set)
        for temp_path in list(FileUploadHandler._temp_files):
            FileUploadHandler.cleanup_temp_file(temp_path)

        # Clean up session-tracked files
        try:
            if 'temp_files' in st.session_state:
                session_paths = list(st.session_state.temp_files)
            else:
                session_paths = []
        except Exception:
            # Session state may be unusable here (e.g. during interpreter exit)
            session_paths = []
        for temp_path in session_paths:
            FileUploadHandler.cleanup_temp_file(temp_path)

    @staticmethod
    def cleanup_old_temp_files(max_age_hours: int = 1, temp_dir: str = "/tmp"):
        """
        Clean up old temporary files in /tmp directory.

        Only regular files whose name carries the signature generated by
        save_to_temp() are considered. Best-effort: filesystem errors are
        skipped silently.

        Args:
            max_age_hours: Maximum age of files to keep (in hours)
            temp_dir: Directory to scan; defaults to /tmp
        """
        try:
            filenames = os.listdir(temp_dir)
        except OSError:
            return

        # Compare POSIX timestamps directly; mtime is a POSIX timestamp too
        now = datetime.now().timestamp()
        for filename in filenames:
            if not FileUploadHandler._TEMP_NAME_PATTERN.search(filename):
                continue
            filepath = os.path.join(temp_dir, filename)
            try:
                if not os.path.isfile(filepath):
                    continue
                age_hours = (now - os.path.getmtime(filepath)) / 3600
                if age_hours > max_age_hours:
                    os.remove(filepath)
                    FileUploadHandler._untrack(filepath)
            except OSError:
                # File vanished or is not removable - move on to the next one
                continue

    @staticmethod
    def validate_file_size(uploaded_file, max_size_mb: int = 300) -> bool:
        """
        Validate file size before processing.

        Shows an error via ``st.error`` when the file is too large.

        Args:
            uploaded_file: Streamlit UploadedFile object
            max_size_mb: Maximum allowed file size in MB

        Returns:
            True if file size is valid, False otherwise
        """
        try:
            file_size_mb = uploaded_file.size / (1024 * 1024)
            too_large = file_size_mb > max_size_mb
        except (AttributeError, TypeError):
            return True  # Allow processing if we can't determine size

        if too_large:
            st.error(f"File size ({file_size_mb:.1f} MB) exceeds maximum allowed size ({max_size_mb} MB)")
            return False
        return True


# Register cleanup on exit: atexit calls this handler at normal interpreter
# termination, which attempts to remove every tracked temporary file.
atexit.register(FileUploadHandler.cleanup_all_temp_files)