File size: 5,648 Bytes
4b36911
 
 
 
 
 
 
 
 
 
 
 
e7279e4
4b36911
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e7279e4
 
4b36911
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e7279e4
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
"""
Memory-based File Handler for Hugging Face Spaces Compatibility

This module provides an alternative to disk-based file handling by keeping
files in memory, avoiding 403 errors from filesystem restrictions.
"""

import streamlit as st
from io import BytesIO, StringIO
from typing import Optional, Union, Dict, Any
import pandas as pd
import zipfile
import csv


class MemoryFileHandler:
    """Handle uploaded files entirely in memory to avoid filesystem restrictions.

    Every method is a ``@staticmethod``; the class only serves as a namespace.
    Read/parse helpers report failures through ``st.error`` and return ``None``
    instead of raising, so callers must check for ``None``.
    """

    @staticmethod
    def process_uploaded_file(uploaded_file, as_text: bool = False, encoding: str = 'utf-8') -> Optional[Union[bytes, str]]:
        """
        Read the full content of an uploaded file.

        Args:
            uploaded_file: Streamlit UploadedFile object (any seekable
                file-like object with ``seek``/``read`` works)
            as_text: Whether to return content as decoded text
            encoding: Text encoding to use if as_text is True

        Returns:
            File content as bytes or string, or None if error
        """
        try:
            # Always start from the beginning: the buffer may have been
            # consumed by an earlier read.
            uploaded_file.seek(0)
            content = uploaded_file.read()

            # Only bytes need decoding; a text-mode buffer already yields str.
            if as_text and isinstance(content, bytes):
                return content.decode(encoding)
            return content

        except Exception as e:
            st.error(f"Failed to read file: {str(e)}")
            return None

    @staticmethod
    def _detect_delimiter(uploaded_file, sample_size: int = 1024) -> str:
        """
        Guess the column delimiter from the start of the file.

        Reads up to ``sample_size`` units from the beginning of the buffer and
        rewinds it afterwards. A tab anywhere in the sample selects ``'\\t'``;
        otherwise ``','`` is returned.
        """
        uploaded_file.seek(0)
        sample = uploaded_file.read(sample_size)
        uploaded_file.seek(0)

        # Binary buffers return bytes, text buffers return str; only the
        # former needs decoding. errors='ignore' tolerates a multi-byte
        # character cut in half at the sample boundary.
        if isinstance(sample, (bytes, bytearray)):
            sample = sample.decode('utf-8', errors='ignore')

        return '\t' if '\t' in sample else ','

    @staticmethod
    def process_csv_tsv_file(uploaded_file, delimiter: Optional[str] = None) -> Optional[pd.DataFrame]:
        """
        Process CSV/TSV file directly into pandas DataFrame.

        Args:
            uploaded_file: Streamlit UploadedFile object (binary or text
                file-like objects are both accepted)
            delimiter: Column delimiter (auto-detected if None)

        Returns:
            DataFrame or None if error
        """
        try:
            if delimiter is None:
                delimiter = MemoryFileHandler._detect_delimiter(uploaded_file)

            # Rewind so pandas parses from the first byte regardless of any
            # earlier reads on this buffer.
            uploaded_file.seek(0)
            return pd.read_csv(
                uploaded_file,
                delimiter=delimiter,
                encoding='utf-8',
                quoting=csv.QUOTE_MINIMAL,
                quotechar='"',
            )

        except Exception as e:
            st.error(f"Failed to process CSV/TSV file: {str(e)}")
            return None

    @staticmethod
    def handle_zip_file(uploaded_file) -> Optional[Dict[str, bytes]]:
        """
        Handle ZIP file uploads by extracting contents to memory.

        Args:
            uploaded_file: Streamlit UploadedFile object (should be a ZIP file)

        Returns:
            Dictionary mapping archive member names to their raw bytes
            (directory entries are skipped), or None if error
        """
        try:
            uploaded_file.seek(0)

            # Copy the upload into a private buffer so ZipFile can seek freely.
            zip_bytes = BytesIO(uploaded_file.read())

            with zipfile.ZipFile(zip_bytes, 'r') as zip_file:
                return {
                    info.filename: zip_file.read(info.filename)
                    for info in zip_file.infolist()
                    if not info.is_dir()  # Skip directories
                }

        except Exception as e:
            st.error(f"Failed to process ZIP file: {str(e)}")
            return None

    @staticmethod
    def create_download_content(content: Union[str, bytes], filename: str) -> bytes:
        """
        Prepare content for download.

        Args:
            content: Content to download (string or bytes)
            filename: Suggested filename for download (currently unused;
                kept for interface compatibility)

        Returns:
            Bytes ready for download; strings are encoded as UTF-8, bytes are
            returned unchanged
        """
        if isinstance(content, str):
            return content.encode('utf-8')
        return content

    @staticmethod
    def store_in_session(key: str, content: Any):
        """
        Store content in session state for persistence across reruns.

        Args:
            key: Session state key
            content: Content to store
        """
        st.session_state[key] = content

    @staticmethod
    def retrieve_from_session(key: str) -> Optional[Any]:
        """
        Retrieve content from session state.

        Args:
            key: Session state key

        Returns:
            Stored content or None if the key is absent
        """
        return st.session_state.get(key, None)

    @staticmethod
    def clear_session_storage(prefix: str = ""):
        """
        Clear session storage.

        Args:
            prefix: Only clear keys starting with this prefix. An empty
                prefix clears the entire session state.
        """
        if not prefix:
            st.session_state.clear()
            return

        # Snapshot the matching keys first: deleting while iterating the live
        # key view is unsafe. Non-string keys cannot match a string prefix.
        keys_to_remove = [
            k for k in st.session_state.keys()
            if isinstance(k, str) and k.startswith(prefix)
        ]
        for key in keys_to_remove:
            del st.session_state[key]