# simple-text-analyzer/web_app/utils/memory_file_handler.py
# egumasa's picture
# emuTAALES
# e7279e4
"""
Memory-based File Handler for Hugging Face Spaces Compatibility
This module provides an alternative to disk-based file handling by keeping
files in memory, avoiding 403 errors from filesystem restrictions.
"""
import streamlit as st
from io import BytesIO, StringIO
from typing import Optional, Union, Dict, Any
import pandas as pd
import zipfile
import csv
class MemoryFileHandler:
    """Handle files entirely in memory to avoid filesystem restrictions.

    Every method is a static method. The upload helpers report failures
    through ``st.error`` and return ``None`` instead of raising.
    """

    @staticmethod
    def process_uploaded_file(uploaded_file, as_text: bool = False, encoding: str = 'utf-8') -> Optional[Union[bytes, str]]:
        """
        Process uploaded file directly from Streamlit's UploadedFile object.

        Args:
            uploaded_file: Streamlit UploadedFile object (any seekable,
                readable file-like object works)
            as_text: Whether to return content as decoded text
            encoding: Text encoding to use if as_text is True

        Returns:
            File content as bytes or string, or None if error
        """
        try:
            # Rewind first: the object may already have been read on an
            # earlier pass.
            uploaded_file.seek(0)
            content = uploaded_file.read()

            if as_text and isinstance(content, bytes):
                codec = encoding
                # Decode UTF-8 with 'utf-8-sig' so that a leading byte-order
                # mark is dropped instead of leaking into the text as U+FEFF.
                # Input without a BOM decodes exactly as it does with 'utf-8'.
                if encoding.lower().replace('_', '-') in ('utf-8', 'utf8'):
                    codec = 'utf-8-sig'
                return content.decode(codec)

            # Binary mode, or the source already yielded text: return as read.
            return content
        except Exception as e:
            st.error(f"Failed to read file: {str(e)}")
            return None

    @staticmethod
    def _detect_delimiter(sample: str) -> str:
        """
        Guess whether a text sample is tab- or comma-delimited.

        The first non-blank line is parsed once with each candidate
        delimiter, honouring double-quoted fields, and the delimiter that
        yields more fields wins (tab on a tie). If neither splits that line,
        fall back to: tab if a tab occurs anywhere in the sample, else comma.

        Args:
            sample: Decoded text from the start of the file

        Returns:
            '\\t' or ','
        """
        sample = sample.lstrip('\ufeff')
        header = next((line for line in sample.splitlines() if line.strip()), '')
        try:
            tab_fields = len(next(csv.reader([header], delimiter='\t'), []))
            comma_fields = len(next(csv.reader([header], delimiter=','), []))
        except csv.Error:
            # Unparseable header (e.g. embedded NUL): use the fallback below.
            tab_fields = comma_fields = 0

        if tab_fields > 1 or comma_fields > 1:
            return '\t' if tab_fields >= comma_fields else ','
        return '\t' if '\t' in sample else ','

    @staticmethod
    def process_csv_tsv_file(uploaded_file, delimiter: Optional[str] = None) -> Optional[pd.DataFrame]:
        """
        Process CSV/TSV file directly into pandas DataFrame.

        Args:
            uploaded_file: Streamlit UploadedFile object
            delimiter: Column delimiter (auto-detected from the first 1024
                bytes if None)

        Returns:
            DataFrame or None if error
        """
        try:
            uploaded_file.seek(0)

            if delimiter is None:
                sample = uploaded_file.read(1024)
                if isinstance(sample, bytes):
                    # errors='ignore' because the 1024-byte cut may land in
                    # the middle of a multi-byte character.
                    sample = sample.decode('utf-8', errors='ignore')
                # Rewind so pandas parses from the beginning.
                uploaded_file.seek(0)
                delimiter = MemoryFileHandler._detect_delimiter(sample)

            return pd.read_csv(
                uploaded_file,
                delimiter=delimiter,
                encoding='utf-8',
                quoting=csv.QUOTE_MINIMAL,
                quotechar='"',
            )
        except Exception as e:
            st.error(f"Failed to process CSV/TSV file: {str(e)}")
            return None

    @staticmethod
    def handle_zip_file(uploaded_file) -> Optional[Dict[str, bytes]]:
        """
        Handle ZIP file uploads by extracting contents to memory.

        Args:
            uploaded_file: Streamlit UploadedFile object (should be a ZIP file)

        Returns:
            Dictionary mapping filenames to file contents, or None if error
        """
        try:
            uploaded_file.seek(0)
            zip_bytes = BytesIO(uploaded_file.read())

            # NOTE(review): every member is decompressed fully into memory
            # with no size cap, so a hostile archive could exhaust RAM.
            # Consider checking ZipInfo.file_size before reading.
            with zipfile.ZipFile(zip_bytes, 'r') as zip_file:
                return {
                    member: zip_file.read(member)
                    for member in zip_file.namelist()
                    if not member.endswith('/')  # Skip directories
                }
        except Exception as e:
            st.error(f"Failed to process ZIP file: {str(e)}")
            return None

    @staticmethod
    def create_download_content(content: Union[str, bytes], filename: str) -> bytes:
        """
        Prepare content for download.

        Args:
            content: Content to download (string or bytes)
            filename: Suggested filename for download (not used by this
                method)

        Returns:
            Bytes ready for download; strings are encoded as UTF-8, anything
            else is returned unchanged
        """
        if isinstance(content, str):
            return content.encode('utf-8')
        return content

    @staticmethod
    def store_in_session(key: str, content: Any):
        """
        Store content in session state for persistence across reruns.

        Args:
            key: Session state key
            content: Content to store
        """
        st.session_state[key] = content

    @staticmethod
    def retrieve_from_session(key: str) -> Optional[Any]:
        """
        Retrieve content from session state.

        Args:
            key: Session state key

        Returns:
            Stored content or None
        """
        return st.session_state.get(key, None)

    @staticmethod
    def clear_session_storage(prefix: str = ""):
        """
        Clear session storage.

        Args:
            prefix: Only clear keys starting with this prefix. When empty,
                the whole session state is cleared.
        """
        if prefix:
            # Collect the keys first so nothing is deleted while iterating.
            keys_to_remove = [k for k in st.session_state.keys() if k.startswith(prefix)]
            for key in keys_to_remove:
                del st.session_state[key]
        else:
            st.session_state.clear()