Spaces:
Building
Building
"""
File Upload Handler for Hugging Face Spaces Compatibility

This module provides utilities for handling file uploads in a way that's compatible
with Hugging Face Spaces restrictions. It uses the /tmp directory as an intermediate
storage location to work around direct file streaming limitations.
"""
import atexit
import os
import re
import tempfile
import uuid
import zipfile
from datetime import datetime
from io import BytesIO
from pathlib import Path
from typing import Optional, Union, BinaryIO

import streamlit as st
class FileUploadHandler:
    """Handle file uploads with /tmp directory approach for HF Spaces compatibility.

    Every method is a static method, so the class is used as a namespace:
    ``FileUploadHandler.save_to_temp(uploaded_file)``. Calling through an
    instance works as well.

    Saved paths are tracked in two places: the class-level ``_temp_files``
    set (shared by everything in this process) and
    ``st.session_state.temp_files``.
    """

    # Directory that receives the temporary copies.
    TEMP_DIR = "/tmp"

    # Matches the "<YYYYMMDD>_<HHMMSS>_<8 hex chars>_" core that save_to_temp()
    # puts in every filename, optionally preceded by "<prefix>_". Used so the
    # age-based sweep only touches files this handler created.
    _TEMP_NAME_PATTERN = re.compile(r"(?:^|_)\d{8}_\d{6}_[0-9a-f]{8}_")

    # Track temporary files for cleanup
    _temp_files = set()

    @staticmethod
    def save_to_temp(uploaded_file, prefix: str = "") -> Optional[str]:
        """
        Save uploaded file to the temp directory and return the path.

        Args:
            uploaded_file: Streamlit UploadedFile object (its ``name`` and
                ``getbuffer()`` are used)
            prefix: Optional prefix for the temporary filename

        Returns:
            Path to saved temporary file, or None if error (the error is
            reported through ``st.error``)
        """
        try:
            # Timestamp plus a short random id keeps names unique across uploads.
            unique_id = str(uuid.uuid4())[:8]
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            # Keep only the final path component of the client-supplied name.
            safe_filename = Path(uploaded_file.name).name

            name_parts = [timestamp, unique_id, safe_filename]
            if prefix:
                name_parts.insert(0, prefix)
            temp_path = os.path.join(FileUploadHandler.TEMP_DIR, "_".join(name_parts))

            with open(temp_path, 'wb') as f:
                f.write(uploaded_file.getbuffer())

            # Track for cleanup, both process-wide and in the session state.
            FileUploadHandler._temp_files.add(temp_path)
            if 'temp_files' not in st.session_state:
                st.session_state.temp_files = set()
            st.session_state.temp_files.add(temp_path)
            return temp_path
        except Exception as e:
            st.error(f"Failed to save uploaded file: {str(e)}")
            return None

    @staticmethod
    def read_from_temp(temp_path: str, mode: str = 'rb') -> Optional[Union[bytes, str]]:
        """
        Read file from temporary location.

        Args:
            temp_path: Path to temporary file
            mode: Read mode ('rb' for binary, 'r' for text)

        Returns:
            File content as bytes or string, or None if error (the error is
            reported through ``st.error``)
        """
        try:
            with open(temp_path, mode) as f:
                return f.read()
        except Exception as e:
            st.error(f"Failed to read temporary file: {str(e)}")
            return None

    @staticmethod
    def get_file_content(uploaded_file, as_text: bool = False, encoding: str = 'utf-8') -> Optional[Union[bytes, str]]:
        """
        Get file content using temp file approach.

        The upload is written to a temporary file, read back, and the
        temporary file is removed again before returning, because its path is
        never handed to the caller.

        Args:
            uploaded_file: Streamlit UploadedFile object
            as_text: Whether to return content as decoded text
            encoding: Text encoding to use if as_text is True

        Returns:
            File content as bytes or string, or None if error
        """
        temp_path = FileUploadHandler.save_to_temp(uploaded_file)
        if not temp_path:
            return None
        try:
            # Always read raw bytes and decode explicitly: reading in text mode
            # would ignore `encoding` and hand back a str that cannot be decoded.
            content = FileUploadHandler.read_from_temp(temp_path, mode='rb')
            if content is None or not as_text:
                return content
            try:
                return content.decode(encoding)
            except (UnicodeDecodeError, LookupError) as e:
                st.error(f"Failed to decode file as {encoding}: {str(e)}")
                return None
        finally:
            FileUploadHandler.cleanup_temp_file(temp_path)

    @staticmethod
    def handle_zip_file(uploaded_file) -> Optional[zipfile.ZipFile]:
        """
        Handle ZIP file uploads by saving to temp and returning ZipFile object.

        The temporary file is kept (and stays tracked) while the archive is in
        use; the caller is responsible for closing the returned ZipFile.

        Args:
            uploaded_file: Streamlit UploadedFile object (should be a ZIP file)

        Returns:
            ZipFile object opened from temp location, or None if error
        """
        temp_path = FileUploadHandler.save_to_temp(uploaded_file)
        if not temp_path:
            return None
        try:
            return zipfile.ZipFile(temp_path, 'r')
        except Exception as e:
            st.error(f"Failed to open ZIP file: {str(e)}")
            # The saved copy is useless if it cannot be opened as an archive.
            FileUploadHandler.cleanup_temp_file(temp_path)
            return None

    @staticmethod
    def cleanup_temp_file(temp_path: str):
        """
        Remove a temporary file and stop tracking it. Never raises.

        Args:
            temp_path: Path to temporary file to remove
        """
        try:
            os.remove(temp_path)
        except FileNotFoundError:
            # Already gone; still drop it from the registries below.
            pass
        except (OSError, TypeError, ValueError):
            # Could not delete (or not a usable path): leave it tracked so a
            # later cleanup pass can try again.
            return

        FileUploadHandler._temp_files.discard(temp_path)
        try:
            if 'temp_files' in st.session_state:
                st.session_state.temp_files.discard(temp_path)
        except Exception:
            # Best-effort: session state may be unavailable (e.g. during
            # interpreter shutdown); the file itself is already removed.
            pass

    @staticmethod
    def cleanup_all_temp_files():
        """Cleanup all tracked temporary files. Never raises."""
        # Clean up class-tracked files (copy: cleanup mutates the set).
        for temp_path in list(FileUploadHandler._temp_files):
            FileUploadHandler.cleanup_temp_file(temp_path)

        # Clean up session-tracked files. Guarded because this method is also
        # used as an exit hook, where session state may not be accessible.
        try:
            if 'temp_files' in st.session_state:
                session_paths = list(st.session_state.temp_files)
            else:
                session_paths = []
        except Exception:
            return
        for temp_path in session_paths:
            FileUploadHandler.cleanup_temp_file(temp_path)

    @staticmethod
    def cleanup_old_temp_files(max_age_hours: int = 1):
        """
        Clean up old temporary files in the temp directory. Never raises.

        Only files whose name carries the timestamp/id pattern produced by
        save_to_temp() are considered, so unrelated files that merely contain
        underscores are left alone.

        Args:
            max_age_hours: Maximum age of files to keep (in hours)
        """
        temp_dir = FileUploadHandler.TEMP_DIR
        try:
            filenames = os.listdir(temp_dir)
        except OSError:
            return

        now = datetime.now().timestamp()
        for filename in filenames:
            if not FileUploadHandler._TEMP_NAME_PATTERN.search(filename):
                continue
            filepath = os.path.join(temp_dir, filename)
            try:
                if not os.path.isfile(filepath):
                    continue
                age_hours = (now - os.path.getmtime(filepath)) / 3600
                if age_hours > max_age_hours:
                    os.remove(filepath)
                    FileUploadHandler._temp_files.discard(filepath)
            except OSError:
                # File vanished or is not removable; keep sweeping the rest.
                continue

    @staticmethod
    def validate_file_size(uploaded_file, max_size_mb: int = 300) -> bool:
        """
        Validate file size before processing.

        Args:
            uploaded_file: Streamlit UploadedFile object (its ``size`` in bytes
                is used)
            max_size_mb: Maximum allowed file size in MB

        Returns:
            True if file size is valid, False otherwise
        """
        try:
            file_size_mb = uploaded_file.size / (1024 * 1024)
        except (AttributeError, TypeError):
            return True  # Allow processing if we can't determine size

        if file_size_mb > max_size_mb:
            st.error(f"File size ({file_size_mb:.1f} MB) exceeds maximum allowed size ({max_size_mb} MB)")
            return False
        return True
# Register cleanup on exit: remove every tracked temporary file when the
# interpreter terminates normally.
atexit.register(FileUploadHandler.cleanup_all_temp_files)