Spaces:
Building
Building
"""
Memory-based File Handler for Hugging Face Spaces Compatibility.

This module provides an alternative to disk-based file handling by keeping
files in memory, avoiding 403 errors from filesystem restrictions.
"""
| import streamlit as st | |
| from io import BytesIO, StringIO | |
| from typing import Optional, Union, Dict, Any | |
| import pandas as pd | |
| import zipfile | |
| import csv | |
class MemoryFileHandler:
    """Handle files entirely in memory to avoid filesystem restrictions.

    The class is a stateless namespace: every method is a ``@staticmethod``,
    so it can be called either on the class
    (``MemoryFileHandler.handle_zip_file(f)``) or on an instance.

    Read/parse failures are reported to the user through ``st.error`` and
    signalled to the caller by a ``None`` return value rather than an
    exception.
    """

    @staticmethod
    def process_uploaded_file(uploaded_file, as_text: bool = False,
                              encoding: str = 'utf-8') -> Optional[Union[bytes, str]]:
        """
        Process uploaded file directly from Streamlit's UploadedFile object.

        Args:
            uploaded_file: Streamlit UploadedFile object (any object with
                ``seek`` and ``read`` works)
            as_text: Whether to return content as decoded text
            encoding: Text encoding to use if as_text is True

        Returns:
            File content as bytes or string, or None if error
        """
        try:
            # Rewind first: the object may already have been read earlier.
            uploaded_file.seek(0)
            content = uploaded_file.read()
            # Only bytes need decoding; a text-mode stream already yields str.
            if as_text and isinstance(content, bytes):
                return content.decode(encoding)
            return content
        except Exception as e:
            # Best-effort UI boundary: report the problem instead of crashing.
            st.error(f"Failed to read file: {e}")
            return None

    @staticmethod
    def _detect_delimiter(sample: str) -> str:
        """Guess the column delimiter (tab or comma) from a text sample.

        The first non-blank line (the header) decides. Looking at the whole
        sample would misclassify a comma-separated file that merely contains
        a tab inside one of its cells. If the header holds neither delimiter,
        fall back to "tab if a tab appears anywhere in the sample, else comma".
        """
        header = next((line for line in sample.splitlines() if line.strip()), '')
        if '\t' in header:
            return '\t'
        if ',' in header:
            return ','
        return '\t' if '\t' in sample else ','

    @staticmethod
    def process_csv_tsv_file(uploaded_file,
                             delimiter: Optional[str] = None) -> Optional[pd.DataFrame]:
        """
        Process CSV/TSV file directly into pandas DataFrame.

        Args:
            uploaded_file: Streamlit UploadedFile object
            delimiter: Column delimiter (auto-detected from the first
                1024 bytes/characters if None)

        Returns:
            DataFrame or None if error
        """
        try:
            if delimiter is None:
                uploaded_file.seek(0)
                sample = uploaded_file.read(1024)
                if isinstance(sample, bytes):
                    # errors='ignore': the 1024-byte cut may split a
                    # multi-byte character at the end of the sample.
                    sample = sample.decode('utf-8', errors='ignore')
                delimiter = MemoryFileHandler._detect_delimiter(sample)

            # Rewind so pandas parses from the start of the file.
            uploaded_file.seek(0)
            return pd.read_csv(uploaded_file, delimiter=delimiter, encoding='utf-8',
                               quoting=csv.QUOTE_MINIMAL, quotechar='"')
        except Exception as e:
            st.error(f"Failed to process CSV/TSV file: {e}")
            return None

    @staticmethod
    def handle_zip_file(uploaded_file) -> Optional[Dict[str, bytes]]:
        """
        Handle ZIP file uploads by extracting contents to memory.

        Args:
            uploaded_file: Streamlit UploadedFile object (should be a ZIP file)

        Returns:
            Dictionary mapping archive member names to their raw bytes
            (directory entries are skipped), or None if error
        """
        try:
            uploaded_file.seek(0)
            zip_bytes = BytesIO(uploaded_file.read())
            # NOTE(review): every member is fully decompressed into memory and
            # no size limit is enforced here; consider a cap if archives come
            # from untrusted users.
            with zipfile.ZipFile(zip_bytes, 'r') as zip_file:
                return {
                    member: zip_file.read(member)
                    for member in zip_file.namelist()
                    if not member.endswith('/')  # Skip directories
                }
        except Exception as e:
            st.error(f"Failed to process ZIP file: {e}")
            return None

    @staticmethod
    def create_download_content(content: Union[str, bytes], filename: str) -> bytes:
        """
        Prepare content for download.

        Args:
            content: Content to download (string or bytes)
            filename: Suggested filename for download (currently unused;
                kept so existing callers keep working)

        Returns:
            Bytes ready for download: strings are UTF-8 encoded, anything
            else is returned unchanged
        """
        if isinstance(content, str):
            return content.encode('utf-8')
        return content

    @staticmethod
    def store_in_session(key: str, content: Any) -> None:
        """
        Store content in session state for persistence across reruns.

        Args:
            key: Session state key
            content: Content to store
        """
        st.session_state[key] = content

    @staticmethod
    def retrieve_from_session(key: str) -> Optional[Any]:
        """
        Retrieve content from session state.

        Args:
            key: Session state key

        Returns:
            Stored content or None if the key is absent
        """
        return st.session_state.get(key)

    @staticmethod
    def clear_session_storage(prefix: str = "") -> None:
        """
        Clear session storage.

        Args:
            prefix: Only clear keys starting with this prefix. With the
                default empty prefix the ENTIRE session state is cleared.
        """
        if not prefix:
            st.session_state.clear()
            return
        # Snapshot matching keys first so the mapping is not mutated while it
        # is being iterated; non-string keys can never match a string prefix.
        matching_keys = [
            k for k in st.session_state.keys()
            if isinstance(k, str) and k.startswith(prefix)
        ]
        for key in matching_keys:
            del st.session_state[key]