# Captured from the Hugging Face Spaces file view (page status: Building).
# File size: 8,073 bytes | commit eab1374 | original listing: 228 lines
"""
File Upload Handler for Hugging Face Spaces Compatibility
This module provides utilities for handling file uploads in a way that's compatible
with Hugging Face Spaces restrictions. It uses the /tmp directory as an intermediate
storage location to work around direct file streaming limitations.
"""
import atexit
import os
import re
import tempfile
import uuid
import zipfile
from datetime import datetime
from io import BytesIO
from pathlib import Path
from typing import Optional, Union, BinaryIO

import streamlit as st
class FileUploadHandler:
    """Handle file uploads with /tmp directory approach for HF Spaces compatibility.

    Every method is a static method, so the class is used as a namespace and
    never instantiated. Paths of the files it writes are recorded both in the
    class-level ``_temp_files`` set and in ``st.session_state.temp_files`` so
    they can be removed later.
    """

    # Directory that receives every temporary copy.
    _TEMP_DIR = "/tmp"

    # Matches the names generated by save_to_temp():
    #   [<prefix>_]<YYYYmmdd>_<HHMMSS>_<8 hex chars>_<original file name>
    # Used by cleanup_old_temp_files() so that only files following this
    # naming scheme are ever deleted from the shared temp directory.
    _TEMP_NAME_PATTERN = re.compile(r"(?:^|_)\d{8}_\d{6}_[0-9a-f]{8}_")

    # Track temporary files for cleanup
    _temp_files = set()

    @staticmethod
    def save_to_temp(uploaded_file, prefix: str = "") -> Optional[str]:
        """
        Save uploaded file to /tmp directory and return the path.

        Args:
            uploaded_file: Streamlit UploadedFile object
            prefix: Optional prefix for the temporary filename

        Returns:
            Path to saved temporary file, or None if error
        """
        temp_path = None
        try:
            # Timestamp plus a short random id keeps names unique even when
            # the same file is uploaded several times.
            unique_id = str(uuid.uuid4())[:8]
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            # Keep only the final path component of the client-supplied name.
            safe_filename = Path(uploaded_file.name).name

            name_parts = [timestamp, unique_id, safe_filename]
            if prefix:
                name_parts.insert(0, prefix)
            temp_path = os.path.join(
                FileUploadHandler._TEMP_DIR, "_".join(name_parts)
            )

            # Save to /tmp using getbuffer() which is more reliable
            with open(temp_path, 'wb') as f:
                f.write(uploaded_file.getbuffer())

            # Track for cleanup
            FileUploadHandler._temp_files.add(temp_path)

            # Store in session state for persistence across reruns
            if 'temp_files' not in st.session_state:
                st.session_state.temp_files = set()
            st.session_state.temp_files.add(temp_path)

            return temp_path
        except Exception as e:
            # The caller receives None and therefore cannot clean up, so do
            # not leave a partial or orphaned file (or a stale tracking
            # entry) behind.
            if temp_path is not None:
                FileUploadHandler._temp_files.discard(temp_path)
                try:
                    os.remove(temp_path)
                except OSError:
                    pass
            st.error(f"Failed to save uploaded file: {str(e)}")
            return None

    @staticmethod
    def read_from_temp(temp_path: str, mode: str = 'rb') -> Optional[Union[bytes, str]]:
        """
        Read file from temporary location.

        Args:
            temp_path: Path to temporary file
            mode: Read mode ('rb' for binary, 'r' for text)

        Returns:
            File content as bytes or string, or None if error
        """
        try:
            with open(temp_path, mode) as f:
                return f.read()
        except Exception as e:
            st.error(f"Failed to read temporary file: {str(e)}")
            return None

    @staticmethod
    def get_file_content(uploaded_file, as_text: bool = False, encoding: str = 'utf-8') -> Optional[Union[bytes, str]]:
        """
        Get file content using temp file approach.

        The temporary copy is not removed here; it stays tracked and is
        deleted by the cleanup methods.

        Args:
            uploaded_file: Streamlit UploadedFile object
            as_text: Whether to return content as decoded text
            encoding: Text encoding to use if as_text is True

        Returns:
            File content as bytes or string, or None if error
        """
        temp_path = FileUploadHandler.save_to_temp(uploaded_file)
        if not temp_path:
            return None

        # Always read raw bytes and decode explicitly, so that `encoding` is
        # honoured and .decode() is never called on an already-decoded str.
        content = FileUploadHandler.read_from_temp(temp_path, mode='rb')
        if content is None or not as_text:
            return content

        try:
            return content.decode(encoding)
        except (UnicodeDecodeError, LookupError) as e:
            # LookupError covers an unknown encoding name.
            st.error(f"Failed to decode file content: {str(e)}")
            return None

    @staticmethod
    def handle_zip_file(uploaded_file) -> Optional[zipfile.ZipFile]:
        """
        Handle ZIP file uploads by saving to temp and returning ZipFile object.

        The returned ZipFile is open; closing it is left to the caller.

        Args:
            uploaded_file: Streamlit UploadedFile object (should be a ZIP file)

        Returns:
            ZipFile object opened from temp location, or None if error
        """
        temp_path = FileUploadHandler.save_to_temp(uploaded_file)
        if not temp_path:
            return None

        try:
            return zipfile.ZipFile(temp_path, 'r')
        except Exception as e:
            st.error(f"Failed to open ZIP file: {str(e)}")
            # The archive is unusable, so drop the temporary copy right away.
            FileUploadHandler.cleanup_temp_file(temp_path)
            return None

    @staticmethod
    def cleanup_temp_file(temp_path: str):
        """
        Remove a temporary file.

        The path is also dropped from both tracking sets. Any error raised
        along the way is ignored.

        Args:
            temp_path: Path to temporary file to remove
        """
        try:
            if os.path.exists(temp_path):
                os.remove(temp_path)
            FileUploadHandler._temp_files.discard(temp_path)
            if 'temp_files' in st.session_state:
                st.session_state.temp_files.discard(temp_path)
        except Exception:
            # Ignore cleanup errors
            pass

    @staticmethod
    def cleanup_all_temp_files():
        """Cleanup all tracked temporary files."""
        # Iterate over copies: cleanup_temp_file() mutates both sets.
        # Clean up class-tracked files
        for temp_path in list(FileUploadHandler._temp_files):
            FileUploadHandler.cleanup_temp_file(temp_path)

        # Clean up session-tracked files
        if 'temp_files' in st.session_state:
            for temp_path in list(st.session_state.temp_files):
                FileUploadHandler.cleanup_temp_file(temp_path)

    @staticmethod
    def cleanup_old_temp_files(max_age_hours: int = 1):
        """
        Clean up old temporary files in /tmp directory.

        Only regular files whose names follow the naming scheme used by
        save_to_temp() are considered; everything else in the directory is
        left untouched. Errors are ignored.

        Args:
            max_age_hours: Maximum age of files to keep (in hours)
        """
        try:
            temp_dir = FileUploadHandler._TEMP_DIR
            # Compare epoch seconds with the files' modification times.
            now = datetime.now().timestamp()

            for filename in os.listdir(temp_dir):
                # Skip anything that was not created by save_to_temp().
                if not FileUploadHandler._TEMP_NAME_PATTERN.search(filename):
                    continue

                filepath = os.path.join(temp_dir, filename)
                try:
                    if not os.path.isfile(filepath):
                        continue
                    age_hours = (now - os.path.getmtime(filepath)) / 3600
                    if age_hours > max_age_hours:
                        os.remove(filepath)
                except OSError:
                    # The file may have vanished or be undeletable; carry on
                    # with the remaining entries instead of aborting.
                    continue
        except Exception:
            # Ignore cleanup errors
            pass

    @staticmethod
    def validate_file_size(uploaded_file, max_size_mb: int = 300) -> bool:
        """
        Validate file size before processing.

        Args:
            uploaded_file: Streamlit UploadedFile object
            max_size_mb: Maximum allowed file size in MB

        Returns:
            True if file size is valid, False otherwise
        """
        try:
            file_size_mb = uploaded_file.size / (1024 * 1024)
        except Exception:
            return True  # Allow processing if we can't determine size

        if file_size_mb > max_size_mb:
            st.error(f"File size ({file_size_mb:.1f} MB) exceeds maximum allowed size ({max_size_mb} MB)")
            return False
        return True
# Register cleanup on exit: remove every tracked temporary file when the
# interpreter shuts down.
atexit.register(FileUploadHandler.cleanup_all_temp_files)