"""
File Upload Handler for Hugging Face Spaces Compatibility

This module provides utilities for handling file uploads in a way that's
compatible with Hugging Face Spaces restrictions. It uses the /tmp directory
as an intermediate storage location to work around direct file streaming
limitations.
"""

import atexit
import os
import re
import tempfile
import time
import uuid
import zipfile
from datetime import datetime
from io import BytesIO
from pathlib import Path
from typing import Optional, Union, BinaryIO

import streamlit as st


class FileUploadHandler:
    """Handle file uploads with /tmp directory approach for HF Spaces compatibility.

    Every method is a ``@staticmethod``; the class only serves as a namespace
    plus a process-wide registry of the temporary files it has created.
    """

    # Directory that receives the intermediate copies of uploaded files.
    _TEMP_DIR = "/tmp"

    # Matches the "<YYYYMMDD>_<HHMMSS>_<8 hex chars>_" stamp that save_to_temp()
    # embeds in every filename (optionally preceded by "<prefix>_").
    _TEMP_NAME_RE = re.compile(r"(?:^|_)\d{8}_\d{6}_[0-9a-f]{8}_")

    # Track temporary files for cleanup. This is a class attribute, so it
    # records every file saved through this class in the current process.
    _temp_files = set()

    @staticmethod
    def save_to_temp(uploaded_file, prefix: str = "") -> Optional[str]:
        """
        Save uploaded file to /tmp directory and return the path.

        The file is written as
        ``[<prefix>_]<YYYYMMDD>_<HHMMSS>_<8 hex chars>_<original name>`` and is
        registered both on the class and in ``st.session_state.temp_files``.

        Args:
            uploaded_file: Streamlit UploadedFile object
            prefix: Optional prefix for the temporary filename

        Returns:
            Path to saved temporary file, or None if error (the error is
            reported with ``st.error`` and any partially written file is
            removed)
        """
        temp_path = None
        try:
            # Generate unique filename to avoid conflicts
            unique_id = uuid.uuid4().hex[:8]
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            # Keep only the final path component so the client-supplied name
            # cannot point outside the temp directory.
            safe_filename = Path(uploaded_file.name).name

            # Construct temporary filename
            if prefix:
                temp_filename = f"{prefix}_{timestamp}_{unique_id}_{safe_filename}"
            else:
                temp_filename = f"{timestamp}_{unique_id}_{safe_filename}"

            temp_path = os.path.join(FileUploadHandler._TEMP_DIR, temp_filename)

            # Save to /tmp using getbuffer() which is more reliable
            with open(temp_path, 'wb') as f:
                f.write(uploaded_file.getbuffer())

            # Track for cleanup
            FileUploadHandler._temp_files.add(temp_path)

            # Store in session state for persistence across reruns
            if 'temp_files' not in st.session_state:
                st.session_state.temp_files = set()
            st.session_state.temp_files.add(temp_path)

            return temp_path

        except Exception as e:
            # Do not leave a partial or half-registered file behind when the
            # caller is told that saving failed.
            if temp_path is not None:
                FileUploadHandler.cleanup_temp_file(temp_path)
            st.error(f"Failed to save uploaded file: {str(e)}")
            return None

    @staticmethod
    def read_from_temp(
        temp_path: str,
        mode: str = 'rb',
        encoding: Optional[str] = None,
    ) -> Optional[Union[bytes, str]]:
        """
        Read file from temporary location.

        Args:
            temp_path: Path to temporary file
            mode: Read mode ('rb' for binary, 'r' for text)
            encoding: Text encoding used when ``mode`` is a text mode; ``None``
                keeps ``open()``'s default. Ignored for binary modes.

        Returns:
            File content as bytes or string, or None if error (the error is
            reported with ``st.error``)
        """
        # open() rejects an encoding argument in binary mode, so only pass it
        # along for text reads.
        open_kwargs = {} if 'b' in mode else {'encoding': encoding}
        try:
            with open(temp_path, mode, **open_kwargs) as f:
                return f.read()
        except Exception as e:
            st.error(f"Failed to read temporary file: {str(e)}")
            return None

    @staticmethod
    def get_file_content(
        uploaded_file,
        as_text: bool = False,
        encoding: str = 'utf-8',
    ) -> Optional[Union[bytes, str]]:
        """
        Get file content using temp file approach.

        The upload is saved to the temp directory, read back, and the temporary
        copy is removed again before returning.

        Args:
            uploaded_file: Streamlit UploadedFile object
            as_text: Whether to return content as decoded text
            encoding: Text encoding to use if as_text is True

        Returns:
            File content as bytes or string, or None if error (saving, reading
            or decoding failed; the error is reported with ``st.error``)
        """
        temp_path = FileUploadHandler.save_to_temp(uploaded_file)
        if not temp_path:
            return None

        try:
            # Always read raw bytes and decode explicitly, so ``encoding`` is
            # honoured and .decode() is never called on an already-decoded str.
            content = FileUploadHandler.read_from_temp(temp_path, mode='rb')
            if content is None or not as_text:
                return content
            try:
                return content.decode(encoding)
            except (UnicodeDecodeError, LookupError) as e:
                st.error(f"Failed to decode uploaded file as {encoding}: {str(e)}")
                return None
        finally:
            # The path is never handed to the caller, so nothing else could
            # use or remove this copy; drop it now instead of at exit.
            FileUploadHandler.cleanup_temp_file(temp_path)

    @staticmethod
    def handle_zip_file(uploaded_file) -> Optional[zipfile.ZipFile]:
        """
        Handle ZIP file uploads by saving to temp and returning ZipFile object.

        On success the temporary copy stays on disk (and stays tracked) because
        the returned ZipFile reads from it; the caller should close the ZipFile
        when finished.

        Args:
            uploaded_file: Streamlit UploadedFile object (should be a ZIP file)

        Returns:
            ZipFile object opened from temp location, or None if error
        """
        temp_path = FileUploadHandler.save_to_temp(uploaded_file)
        if not temp_path:
            return None

        try:
            return zipfile.ZipFile(temp_path, 'r')
        except Exception as e:
            st.error(f"Failed to open ZIP file: {str(e)}")
            FileUploadHandler.cleanup_temp_file(temp_path)
            return None

    @staticmethod
    def cleanup_temp_file(temp_path: str):
        """
        Remove a temporary file.

        Best effort: errors are swallowed. The path is dropped from the
        tracking sets once the file is gone (or was already gone); if removal
        fails the path stays tracked so a later cleanup can retry.

        Args:
            temp_path: Path to temporary file to remove
        """
        try:
            os.remove(temp_path)
        except FileNotFoundError:
            # Already gone; still drop it from the tracking sets below.
            pass
        except (OSError, TypeError, ValueError):
            # Ignore cleanup errors
            return

        FileUploadHandler._temp_files.discard(temp_path)
        try:
            if 'temp_files' in st.session_state:
                st.session_state.temp_files.discard(temp_path)
        except Exception:
            # Ignore cleanup errors
            pass

    @staticmethod
    def cleanup_all_temp_files():
        """Cleanup all tracked temporary files.

        Covers both the class-level registry and ``st.session_state.temp_files``.
        Registered with ``atexit`` at the bottom of this module.
        """
        # Clean up class-tracked files (iterate a copy: cleanup mutates the set)
        for temp_path in list(FileUploadHandler._temp_files):
            FileUploadHandler.cleanup_temp_file(temp_path)

        # Clean up session-tracked files. This also runs as an exit hook, where
        # session state may not be usable, so treat a failure as "nothing to do".
        try:
            if 'temp_files' in st.session_state:
                session_files = list(st.session_state.temp_files)
            else:
                session_files = []
        except Exception:
            session_files = []

        for temp_path in session_files:
            FileUploadHandler.cleanup_temp_file(temp_path)

    @staticmethod
    def cleanup_old_temp_files(max_age_hours: int = 1):
        """
        Clean up old temporary files in /tmp directory.

        Only regular files whose name carries the timestamp/unique-id stamp
        written by ``save_to_temp`` are considered, so unrelated files in the
        shared temp directory are left alone. Best effort: filesystem errors
        are ignored.

        Args:
            max_age_hours: Maximum age of files to keep (in hours)
        """
        temp_dir = FileUploadHandler._TEMP_DIR
        max_age_seconds = max_age_hours * 3600
        # Compare epoch seconds directly; mtime is an epoch timestamp too.
        now = time.time()

        try:
            filenames = os.listdir(temp_dir)
        except OSError:
            # Ignore cleanup errors
            return

        for filename in filenames:
            # Check if it matches our naming pattern
            if not FileUploadHandler._TEMP_NAME_RE.search(filename):
                continue

            filepath = os.path.join(temp_dir, filename)
            try:
                if not os.path.isfile(filepath):
                    continue
                if now - os.path.getmtime(filepath) > max_age_seconds:
                    os.remove(filepath)
            except OSError:
                # The file may vanish or be unremovable; keep sweeping the rest.
                continue

    @staticmethod
    def validate_file_size(uploaded_file, max_size_mb: int = 300) -> bool:
        """
        Validate file size before processing.

        Args:
            uploaded_file: Streamlit UploadedFile object
            max_size_mb: Maximum allowed file size in MB

        Returns:
            True if file size is valid (or cannot be determined), False
            otherwise; an oversized file is also reported with ``st.error``
        """
        try:
            file_size_mb = uploaded_file.size / (1024 * 1024)
            too_large = file_size_mb > max_size_mb
        except Exception:
            return True  # Allow processing if we can't determine size

        # Report outside the guard above so a failure while reporting can never
        # turn an oversized file into an accepted one.
        if too_large:
            st.error(f"File size ({file_size_mb:.1f} MB) exceeds maximum allowed size ({max_size_mb} MB)")
            return False
        return True


# Register cleanup on exit
atexit.register(FileUploadHandler.cleanup_all_temp_files)