simple-text-analyzer / web_app /utils /file_upload_handler.py
egumasa's picture
file upload handle by temp folder
eab1374
"""
File Upload Handler for Hugging Face Spaces Compatibility
This module provides utilities for handling file uploads in a way that's compatible
with Hugging Face Spaces restrictions. It uses the /tmp directory as an intermediate
storage location to work around direct file streaming limitations.
"""
import atexit
import os
import re
import tempfile
import uuid
import zipfile
from datetime import datetime
from io import BytesIO
from pathlib import Path
from typing import Optional, Union, BinaryIO

import streamlit as st
class FileUploadHandler:
    """Handle file uploads with /tmp directory approach for HF Spaces compatibility.

    Uploaded files are copied into ``TEMP_DIR`` under a unique name and are
    then read back from disk. Every path created here is tracked twice: in
    the class-level ``_temp_files`` set and in ``st.session_state.temp_files``,
    so the files can be removed again by the cleanup helpers.

    All methods are static; the class is only used as a namespace.
    """

    # Directory that receives the temporary copies of uploaded files.
    TEMP_DIR = "/tmp"

    # Names produced by save_to_temp():
    #   [<prefix>_]<YYYYmmdd>_<HHMMSS>_<8 hex chars>_<original file name>
    # Used by cleanup_old_temp_files() so that it only ever deletes files
    # created by this handler, never unrelated files in the same directory.
    _TEMP_NAME_PATTERN = re.compile(
        r"(?:.+_)?\d{8}_\d{6}_[0-9a-f]{8}_.*", re.DOTALL
    )

    # Track temporary files for cleanup
    _temp_files = set()

    @staticmethod
    def _session_tracked_paths() -> list:
        """Return a snapshot of the paths tracked in the Streamlit session state.

        Returns an empty list when nothing is tracked or when the session
        state cannot be accessed.
        """
        try:
            if 'temp_files' in st.session_state:
                return list(st.session_state.temp_files)
        except Exception:
            # Best effort: session state may be unavailable when no script
            # run is active (for example during interpreter shutdown).
            pass
        return []

    @staticmethod
    def _untrack(temp_path: str) -> None:
        """Forget *temp_path* in both the class-level and the session tracking sets."""
        FileUploadHandler._temp_files.discard(temp_path)
        try:
            if 'temp_files' in st.session_state:
                st.session_state.temp_files.discard(temp_path)
        except Exception:
            # Best effort: the class-level set has already been updated and a
            # missing session state must not break cleanup.
            pass

    @staticmethod
    def save_to_temp(uploaded_file, prefix: str = "") -> Optional[str]:
        """
        Save uploaded file to the temp directory and return the path.

        Args:
            uploaded_file: Streamlit UploadedFile object
            prefix: Optional prefix for the temporary filename

        Returns:
            Path to saved temporary file, or None if error (the error is
            reported through ``st.error``)
        """
        temp_path = None
        try:
            # Generate unique filename to avoid conflicts
            unique_id = str(uuid.uuid4())[:8]
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            # Keep only the final path component of the client-supplied name
            safe_filename = Path(uploaded_file.name).name

            name_parts = [timestamp, unique_id, safe_filename]
            if prefix:
                name_parts.insert(0, prefix)
            temp_path = os.path.join(
                FileUploadHandler.TEMP_DIR, "_".join(name_parts)
            )

            # Save using getbuffer() which is more reliable
            with open(temp_path, 'wb') as f:
                f.write(uploaded_file.getbuffer())

            # Track for cleanup
            FileUploadHandler._temp_files.add(temp_path)

            # Store in session state for persistence across reruns
            if 'temp_files' not in st.session_state:
                st.session_state.temp_files = set()
            st.session_state.temp_files.add(temp_path)
            return temp_path
        except Exception as e:
            # Do not leave a partially written or half-tracked file behind
            # when the caller is told that saving failed.
            if temp_path is not None:
                FileUploadHandler.cleanup_temp_file(temp_path)
            st.error(f"Failed to save uploaded file: {str(e)}")
            return None

    @staticmethod
    def read_from_temp(
        temp_path: str, mode: str = 'rb', encoding: Optional[str] = None
    ) -> Optional[Union[bytes, str]]:
        """
        Read file from temporary location.

        Args:
            temp_path: Path to temporary file
            mode: Read mode ('rb' for binary, 'r' for text)
            encoding: Text encoding for text modes; ignored for binary modes.
                None keeps ``open()``'s default.

        Returns:
            File content as bytes or string, or None if error (the error is
            reported through ``st.error``)
        """
        try:
            # open() rejects an encoding argument in binary mode.
            open_kwargs = {} if 'b' in mode else {'encoding': encoding}
            with open(temp_path, mode, **open_kwargs) as f:
                return f.read()
        except Exception as e:
            st.error(f"Failed to read temporary file: {str(e)}")
            return None

    @staticmethod
    def get_file_content(
        uploaded_file, as_text: bool = False, encoding: str = 'utf-8'
    ) -> Optional[Union[bytes, str]]:
        """
        Get file content using temp file approach.

        The temporary copy is NOT removed here; it stays tracked until one of
        the cleanup methods deletes it.

        Args:
            uploaded_file: Streamlit UploadedFile object
            as_text: Whether to return content as decoded text
            encoding: Text encoding to use if as_text is True

        Returns:
            File content as bytes or string, or None if error
        """
        temp_path = FileUploadHandler.save_to_temp(uploaded_file)
        if not temp_path:
            return None

        # Always read raw bytes and decode explicitly: this applies the
        # requested encoding and avoids calling .decode() on a str.
        content = FileUploadHandler.read_from_temp(temp_path, mode='rb')
        if content is None or not as_text:
            return content

        try:
            return content.decode(encoding)
        except (UnicodeDecodeError, LookupError) as e:
            # LookupError covers an unknown encoding name.
            st.error(f"Failed to decode file content as {encoding}: {str(e)}")
            return None

    @staticmethod
    def handle_zip_file(uploaded_file) -> Optional[zipfile.ZipFile]:
        """
        Handle ZIP file uploads by saving to temp and returning ZipFile object.

        The caller owns the returned ZipFile and is responsible for closing
        it. The temporary file stays on disk (and tracked) while it is in use.

        Args:
            uploaded_file: Streamlit UploadedFile object (should be a ZIP file)

        Returns:
            ZipFile object opened from temp location, or None if error
        """
        temp_path = FileUploadHandler.save_to_temp(uploaded_file)
        if not temp_path:
            return None
        try:
            return zipfile.ZipFile(temp_path, 'r')
        except Exception as e:
            st.error(f"Failed to open ZIP file: {str(e)}")
            # The saved copy is useless if it cannot be opened as a ZIP.
            FileUploadHandler.cleanup_temp_file(temp_path)
            return None

    @staticmethod
    def cleanup_temp_file(temp_path: str):
        """
        Remove a temporary file (best effort, never raises).

        A file that is already gone is simply dropped from tracking. If the
        file exists but cannot be removed it stays tracked so that a later
        cleanup can try again.

        Args:
            temp_path: Path to temporary file to remove
        """
        try:
            os.remove(temp_path)
        except FileNotFoundError:
            pass  # Already gone; still drop it from the tracking sets below.
        except (OSError, TypeError, ValueError):
            # Ignore cleanup errors
            return
        FileUploadHandler._untrack(temp_path)

    @staticmethod
    def cleanup_all_temp_files():
        """Cleanup all tracked temporary files (class-tracked and session-tracked)."""
        tracked = set(FileUploadHandler._temp_files)
        tracked.update(FileUploadHandler._session_tracked_paths())
        for temp_path in tracked:
            FileUploadHandler.cleanup_temp_file(temp_path)

    @staticmethod
    def cleanup_old_temp_files(max_age_hours: int = 1):
        """
        Clean up old temporary files in the temp directory.

        Only regular files whose name matches the naming scheme used by
        ``save_to_temp`` are considered; anything else is left alone.
        Errors on individual files are ignored so that one problematic file
        does not stop the sweep.

        Args:
            max_age_hours: Maximum age of files to keep (in hours), based on
                the file's modification time
        """
        temp_dir = FileUploadHandler.TEMP_DIR
        now = datetime.now().timestamp()

        try:
            filenames = os.listdir(temp_dir)
        except OSError:
            # Ignore cleanup errors (directory missing or unreadable)
            return

        for filename in filenames:
            # Check if it matches our naming pattern
            if not FileUploadHandler._TEMP_NAME_PATTERN.fullmatch(filename):
                continue
            filepath = os.path.join(temp_dir, filename)
            try:
                if not os.path.isfile(filepath):
                    continue
                age_hours = (now - os.path.getmtime(filepath)) / 3600
                if age_hours > max_age_hours:
                    os.remove(filepath)
                    FileUploadHandler._untrack(filepath)
            except OSError:
                # The file may have vanished or be undeletable; keep going.
                continue

    @staticmethod
    def validate_file_size(uploaded_file, max_size_mb: int = 300) -> bool:
        """
        Validate file size before processing.

        Args:
            uploaded_file: Streamlit UploadedFile object
            max_size_mb: Maximum allowed file size in MB

        Returns:
            True if file size is valid (or cannot be determined), False otherwise
        """
        try:
            file_size_mb = uploaded_file.size / (1024 * 1024)
            too_large = file_size_mb > max_size_mb
        except (AttributeError, TypeError):
            return True  # Allow processing if we can't determine size

        if too_large:
            st.error(f"File size ({file_size_mb:.1f} MB) exceeds maximum allowed size ({max_size_mb} MB)")
            return False
        return True
# Register cleanup on exit: at normal interpreter termination, remove every
# temporary file that FileUploadHandler is still tracking.
atexit.register(FileUploadHandler.cleanup_all_temp_files)