File size: 13,559 Bytes
d02b81b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2dd6a60
d02b81b
 
 
2dd6a60
 
 
d02b81b
 
 
2dd6a60
d02b81b
 
 
 
 
 
2dd6a60
d02b81b
2dd6a60
d02b81b
 
 
2dd6a60
d02b81b
 
 
 
 
 
 
 
 
 
 
 
 
2dd6a60
 
 
 
 
 
d02b81b
 
2dd6a60
d02b81b
 
2dd6a60
d02b81b
 
2dd6a60
d02b81b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
"""
file_browser_manager.py - Integrated File Browser for Gradio
=============================================================

Provides real-time file access and management for all generated artifacts.
Works seamlessly with the existing artifact registry system.

Author: AI Lab Team
Version: 1.0
"""

import os
import json
import shutil
import zipfile
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Tuple, Optional
import mimetypes

from logging_config import get_logger

log = get_logger(__name__)


class FileBrowserManager:
    """
    Manages file browsing, preview, and download functionality.
    Integrates with artifact_registry for complete file tracking.

    Every directory in ``base_dirs`` is created on construction and
    re-scanned on demand; no file information is cached between calls.
    """

    def __init__(self, base_dirs: Optional[List[str]] = None):
        """
        Initialize file browser.

        Args:
            base_dirs: List of directories to monitor
                (default: outputs, outputs/user_artifacts, uploads, /tmp)
        """
        self.base_dirs = base_dirs or [
            "outputs",
            "outputs/user_artifacts",
            "uploads",
            "/tmp"
        ]

        # Ensure all directories exist so later scans never trip on a
        # missing path.
        for directory in self.base_dirs:
            os.makedirs(directory, exist_ok=True)

        log.info(f"πŸ“‚ File Browser initialized: {len(self.base_dirs)} directories")

    def scan_all_files(self) -> List[Dict]:
        """
        Scan all monitored directories and return file information.

        Hidden files (leading ``.``) and ``.json`` files are skipped —
        presumably the JSON files are artifact-registry metadata; confirm
        against the registry before changing the filter. Because base_dirs
        may overlap (e.g. "outputs" and "outputs/user_artifacts"), each
        absolute path is reported at most once.

        Returns:
            List of file info dicts with path, size, type, etc., sorted by
            modification time, newest first.
        """
        all_files = []        # collected file-info dicts
        unique_files = set()  # absolute paths already seen (dedupes overlapping base_dirs)

        for base_dir in self.base_dirs:
            if not os.path.exists(base_dir):
                continue

            try:
                for root, dirs, files in os.walk(base_dir):
                    for filename in files:
                        # Skip hidden files and system files
                        if filename.startswith('.') or filename.endswith('.json'):
                            continue

                        filepath = os.path.join(root, filename)

                        try:
                            stat = os.stat(filepath)

                            # Prepare file information.
                            # NOTE: sizes use decimal units (1 KB = 1000 B).
                            file_info = {
                                'filename': filename,
                                'path': filepath,
                                'relative_path': os.path.relpath(filepath, base_dir),
                                'directory': os.path.dirname(filepath),
                                'size_bytes': stat.st_size,
                                'size_kb': round(stat.st_size / 1000, 1),
                                'size_mb': round(stat.st_size / (1000 * 1000), 2),
                                'modified': datetime.fromtimestamp(stat.st_mtime).isoformat(),
                                'extension': os.path.splitext(filename)[1],
                                'type': self._get_file_type(filename),
                                'base_dir': base_dir
                            }

                            # Report each absolute path only once even when it
                            # is reachable from several base_dirs.
                            if filepath not in unique_files:
                                unique_files.add(filepath)
                                all_files.append(file_info)

                        except Exception as e:
                            # Best-effort scan: a single unreadable file must
                            # not abort the whole listing.
                            log.warning(f"Failed to stat {filepath}: {e}")

            except Exception as e:
                log.error(f"Failed to scan {base_dir}: {e}")

        # Sort by modified time (newest first); ISO-8601 strings sort
        # chronologically, so a plain string sort is correct here.
        all_files.sort(key=lambda x: x['modified'], reverse=True)

        return all_files

    def _get_file_type(self, filename: str) -> str:
        """Determine a human-readable file type from the extension."""
        ext = os.path.splitext(filename)[1].lower()

        type_map = {
            '.py': 'Python Script',
            '.ipynb': 'Jupyter Notebook',
            '.js': 'JavaScript',
            '.ts': 'TypeScript',
            '.html': 'HTML',
            '.css': 'CSS',
            '.json': 'JSON',
            '.txt': 'Text',
            '.md': 'Markdown',
            '.docx': 'Word Document',
            '.xlsx': 'Excel Spreadsheet',
            '.pdf': 'PDF Document',
            '.png': 'PNG Image',
            '.jpg': 'JPEG Image',
            '.jpeg': 'JPEG Image',
            '.gif': 'GIF Image',
            '.svg': 'SVG Image',
            '.csv': 'CSV Data',
            '.zip': 'ZIP Archive'
        }

        # Unknown extensions fall back to the generic label.
        return type_map.get(ext, 'File')

    def get_file_tree_markdown(self, max_files: int = 50) -> str:
        """
        Generate markdown representation of file tree.

        Args:
            max_files: Maximum files to display

        Returns:
            Markdown formatted file tree
        """
        files = self.scan_all_files()

        if not files:
            return "πŸ“ **No files found**\n\n*Upload files or generate artifacts to see them here.*"

        # Group by type
        by_type = {}
        for file in files:
            ftype = file['type']
            if ftype not in by_type:
                by_type[ftype] = []
            by_type[ftype].append(file)

        # Build markdown (plain string: no placeholders needed here)
        md = "## πŸ“‚ File Browser\n\n"
        md += f"**Total: {len(files)} files**\n\n"

        # Summary by type
        md += "### πŸ“Š By Type\n\n"
        for ftype, type_files in sorted(by_type.items()):
            total_size = sum(f['size_kb'] for f in type_files)
            md += f"- **{ftype}**: {len(type_files)} files ({total_size:.1f} KB)\n"

        md += "\n### πŸ“„ Recent Files\n\n"

        # Show recent files (scan_all_files already sorts newest first)
        display_files = files[:max_files]

        for file in display_files:
            icon = self._get_file_icon(file['type'])
            md += f"{icon} **{file['filename']}**\n"
            md += f"   - Type: {file['type']}\n"
            md += f"   - Size: {file['size_kb']} KB\n"
            md += f"   - Path: `{file['relative_path']}`\n"
            # [:16] trims the ISO timestamp to "YYYY-MM-DDTHH:MM"
            md += f"   - Modified: {file['modified'][:16]}\n\n"

        if len(files) > max_files:
            md += f"\n*... and {len(files) - max_files} more files*\n"

        return md

    def _get_file_icon(self, file_type: str) -> str:
        """Get emoji icon for a file type label; generic page icon otherwise."""
        icon_map = {
            'Python Script': '🐍',
            'Jupyter Notebook': 'πŸ““',
            'JavaScript': 'πŸ“œ',
            'TypeScript': 'πŸ“˜',
            'HTML': '🌐',
            'Word Document': 'πŸ“„',
            'Excel Spreadsheet': 'πŸ“Š',
            'PDF Document': 'πŸ“•',
            'Markdown': 'πŸ“',
            'Text': 'πŸ“ƒ',
            'Image': 'πŸ–ΌοΈ',
            'PNG Image': 'πŸ–ΌοΈ',
            'JPEG Image': 'πŸ–ΌοΈ',
            'CSV Data': 'πŸ“ˆ',
            'ZIP Archive': 'πŸ“¦'
        }
        return icon_map.get(file_type, 'πŸ“„')

    def get_files_for_download(self) -> List[str]:
        """
        Get list of file paths for Gradio Files component.

        Returns:
            List of absolute file paths
        """
        files = self.scan_all_files()
        return [f['path'] for f in files]

    def create_download_package(self, session_id: str = None) -> str:
        """
        Create a ZIP file containing all recent artifacts.

        Args:
            session_id: Optional session ID; when given it is embedded in
                the archive filename so packages from different sessions
                are distinguishable.

        Returns:
            Path to created ZIP file

        Raises:
            ValueError: If there are no files to package.
        """
        files = self.scan_all_files()

        if not files:
            raise ValueError("No files to package")

        # Create ZIP filename (previously session_id was accepted but never
        # used; it now feeds into the name, keeping the old pattern when None)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        if session_id:
            zip_filename = f"artifacts_{session_id}_{timestamp}.zip"
        else:
            zip_filename = f"artifacts_{timestamp}.zip"
        zip_path = os.path.join("outputs", zip_filename)

        # Create ZIP; entries are grouped into folders named after the
        # human-readable file type.
        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for file_info in files:
                # Add file with relative path
                arcname = os.path.join(
                    file_info['type'].replace(' ', '_'),
                    file_info['filename']
                )
                zipf.write(file_info['path'], arcname)

        log.info(f"πŸ“¦ Created download package: {zip_filename} ({len(files)} files)")

        return zip_path

    def get_file_stats(self) -> Dict:
        """
        Get comprehensive file statistics.

        Returns:
            Dict with file counts, sizes, and types
        """
        files = self.scan_all_files()

        total_size = sum(f['size_bytes'] for f in files)

        # Count by type
        by_type = {}
        for file in files:
            ftype = file['type']
            by_type[ftype] = by_type.get(ftype, 0) + 1

        # Count by directory
        by_dir = {}
        for file in files:
            base = file['base_dir']
            by_dir[base] = by_dir.get(base, 0) + 1

        return {
            'total_files': len(files),
            'total_size_bytes': total_size,
            'total_size_mb': round(total_size / (1000 * 1000), 2),
            'by_type': by_type,
            'by_directory': by_dir,
            # Files are sorted newest-first, so index 0 is most recent.
            'most_recent': files[0] if files else None
        }

    def delete_file(self, filepath: str) -> Tuple[bool, str]:
        """
        Delete a specific file.

        Args:
            filepath: Absolute path to file

        Returns:
            (success, message) tuple
        """
        try:
            if not os.path.exists(filepath):
                return False, "File not found"

            os.remove(filepath)
            log.info(f"πŸ—‘οΈ Deleted file: {filepath}")
            return True, f"Deleted {os.path.basename(filepath)}"

        except Exception as e:
            log.error(f"Failed to delete {filepath}: {e}")
            return False, f"Error: {e}"

    def clear_all_files(self) -> Tuple[int, str]:
        """
        Clear all files from monitored directories.

        Returns:
            (count, message) tuple; count is the number actually deleted
            (failures are logged and skipped, not raised).
        """
        files = self.scan_all_files()
        deleted = 0

        for file_info in files:
            try:
                os.remove(file_info['path'])
                deleted += 1
            except Exception as e:
                log.warning(f"Failed to delete {file_info['path']}: {e}")

        log.info(f"πŸ—‘οΈ Cleared {deleted} files")
        return deleted, f"Cleared {deleted} files"

    def get_file_preview(self, filepath: str, max_lines: int = 50) -> str:
        """
        Generate preview of file content.

        Args:
            filepath: Path to file
            max_lines: Maximum lines to show for text files

        Returns:
            Preview text, a truncation notice, or an error/binary message.
        """
        if not os.path.exists(filepath):
            return "❌ File not found"

        ext = os.path.splitext(filepath)[1].lower()

        # Text-based files
        text_extensions = ['.txt', '.md', '.py', '.js', '.ts', '.json', '.csv', '.html', '.css']

        if ext in text_extensions:
            try:
                # errors='ignore' keeps previews working for files with
                # stray non-UTF-8 bytes.
                with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
                    lines = f.readlines()

                if len(lines) <= max_lines:
                    return ''.join(lines)
                else:
                    preview = ''.join(lines[:max_lines])
                    return f"{preview}\n\n... ({len(lines) - max_lines} more lines)"

            except Exception as e:
                return f"❌ Preview error: {e}"

        # Binary files
        size = os.path.getsize(filepath)
        return f"πŸ“¦ Binary file ({size / 1000:.1f} KB)\n\nDownload to view content."


# Global instance, shared by the module-level convenience wrappers.
file_browser = FileBrowserManager()


def get_file_browser() -> FileBrowserManager:
    """Return the shared module-level FileBrowserManager instance."""
    return file_browser


# Convenience functions for Gradio integration
def refresh_file_list() -> str:
    """Re-scan monitored directories and render the file tree as markdown."""
    return file_browser.get_file_tree_markdown()


def get_download_files() -> List[str]:
    """Return absolute paths of all tracked files for a Gradio Files widget."""
    return file_browser.get_files_for_download()


def create_zip_package() -> str:
    """Bundle all tracked artifacts into a ZIP archive.

    Returns:
        Path to the archive, or "" when packaging fails (e.g. no files).
    """
    try:
        zip_path = file_browser.create_download_package()
    except Exception as e:
        log.error(f"Failed to create ZIP: {e}")
        return ""
    return zip_path


def get_stats_markdown() -> str:
    """Render file statistics as a markdown summary for the UI."""
    stats = file_browser.get_file_stats()

    # Assemble the pieces and join once instead of repeated concatenation.
    parts = [
        "### πŸ“Š Statistics\n\n",
        f"- **Total Files**: {stats['total_files']}\n",
        f"- **Total Size**: {stats['total_size_mb']} MB\n\n",
    ]

    type_counts = stats['by_type']
    if type_counts:
        parts.append("**By Type:**\n")
        parts.extend(
            f"- {ftype}: {count}\n"
            for ftype, count in sorted(type_counts.items())
        )

    return "".join(parts)


# Public API: names exported by `from file_browser_manager import *`.
__all__ = [
    'FileBrowserManager',
    'file_browser',
    'get_file_browser',
    'refresh_file_list',
    'get_download_files',
    'create_zip_package',
    'get_stats_markdown'
]