Spaces:
Paused
Paused
| """ | |
| file_browser_manager.py - Integrated File Browser for Gradio | |
| ============================================================= | |
| Provides real-time file access and management for all generated artifacts. | |
| Works seamlessly with the existing artifact registry system. | |
| Author: AI Lab Team | |
| Version: 1.0 | |
| """ | |
| import os | |
| import json | |
| import shutil | |
| import zipfile | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import List, Dict, Tuple, Optional | |
| import mimetypes | |
| from logging_config import get_logger | |
| log = get_logger(__name__) | |
| class FileBrowserManager: | |
| """ | |
| Manages file browsing, preview, and download functionality. | |
| Integrates with artifact_registry for complete file tracking. | |
| """ | |
| def __init__(self, base_dirs: List[str] = None): | |
| """ | |
| Initialize file browser. | |
| Args: | |
| base_dirs: List of directories to monitor (default: outputs, uploads) | |
| """ | |
| self.base_dirs = base_dirs or [ | |
| "outputs", | |
| "outputs/user_artifacts", | |
| "uploads", | |
| "/tmp" | |
| ] | |
| # Ensure all directories exist | |
| for directory in self.base_dirs: | |
| os.makedirs(directory, exist_ok=True) | |
| log.info(f"π File Browser initialized: {len(self.base_dirs)} directories") | |
| def scan_all_files(self) -> List[Dict]: | |
| """ | |
| Scan all monitored directories and return file information. | |
| Returns: | |
| List of file info dicts with path, size, type, etc. | |
| """ | |
| all_files = [] # List to hold file information | |
| unique_files = set() # Set to track unique file paths | |
| for base_dir in self.base_dirs: | |
| if not os.path.exists(base_dir): | |
| continue | |
| try: | |
| for root, dirs, files in os.walk(base_dir): | |
| for filename in files: | |
| # Skip hidden files and system files | |
| if filename.startswith('.') or filename.endswith('.json'): | |
| continue | |
| filepath = os.path.join(root, filename) | |
| try: | |
| stat = os.stat(filepath) | |
| # Prepare file information | |
| file_info = { | |
| 'filename': filename, | |
| 'path': filepath, | |
| 'relative_path': os.path.relpath(filepath, base_dir), | |
| 'directory': os.path.dirname(filepath), | |
| 'size_bytes': stat.st_size, | |
| 'size_kb': round(stat.st_size / 1000, 1), | |
| 'size_mb': round(stat.st_size / (1000 * 1000), 2), | |
| 'modified': datetime.fromtimestamp(stat.st_mtime).isoformat(), | |
| 'extension': os.path.splitext(filename)[1], | |
| 'type': self._get_file_type(filename), | |
| 'base_dir': base_dir | |
| } | |
| # Check for duplicates using the correct variable | |
| if filepath not in unique_files: | |
| unique_files.add(filepath) | |
| all_files.append(file_info) # Append the full file_info dict | |
| except Exception as e: | |
| log.warning(f"Failed to stat {filepath}: {e}") | |
| except Exception as e: | |
| log.error(f"Failed to scan {base_dir}: {e}") | |
| # Sort by modified time (newest first) | |
| all_files.sort(key=lambda x: x['modified'], reverse=True) | |
| return all_files | |
| def _get_file_type(self, filename: str) -> str: | |
| """Determine file type from extension.""" | |
| ext = os.path.splitext(filename)[1].lower() | |
| type_map = { | |
| '.py': 'Python Script', | |
| '.ipynb': 'Jupyter Notebook', | |
| '.js': 'JavaScript', | |
| '.ts': 'TypeScript', | |
| '.html': 'HTML', | |
| '.css': 'CSS', | |
| '.json': 'JSON', | |
| '.txt': 'Text', | |
| '.md': 'Markdown', | |
| '.docx': 'Word Document', | |
| '.xlsx': 'Excel Spreadsheet', | |
| '.pdf': 'PDF Document', | |
| '.png': 'PNG Image', | |
| '.jpg': 'JPEG Image', | |
| '.jpeg': 'JPEG Image', | |
| '.gif': 'GIF Image', | |
| '.svg': 'SVG Image', | |
| '.csv': 'CSV Data', | |
| '.zip': 'ZIP Archive' | |
| } | |
| return type_map.get(ext, 'File') | |
| def get_file_tree_markdown(self, max_files: int = 50) -> str: | |
| """ | |
| Generate markdown representation of file tree. | |
| Args: | |
| max_files: Maximum files to display | |
| Returns: | |
| Markdown formatted file tree | |
| """ | |
| files = self.scan_all_files() | |
| if not files: | |
| return "π **No files found**\n\n*Upload files or generate artifacts to see them here.*" | |
| # Group by type | |
| by_type = {} | |
| for file in files: | |
| ftype = file['type'] | |
| if ftype not in by_type: | |
| by_type[ftype] = [] | |
| by_type[ftype].append(file) | |
| # Build markdown | |
| md = f"## π File Browser\n\n" | |
| md += f"**Total: {len(files)} files**\n\n" | |
| # Summary by type | |
| md += "### π By Type\n\n" | |
| for ftype, type_files in sorted(by_type.items()): | |
| total_size = sum(f['size_kb'] for f in type_files) | |
| md += f"- **{ftype}**: {len(type_files)} files ({total_size:.1f} KB)\n" | |
| md += "\n### π Recent Files\n\n" | |
| # Show recent files | |
| display_files = files[:max_files] | |
| for file in display_files: | |
| icon = self._get_file_icon(file['type']) | |
| md += f"{icon} **{file['filename']}**\n" | |
| md += f" - Type: {file['type']}\n" | |
| md += f" - Size: {file['size_kb']} KB\n" | |
| md += f" - Path: `{file['relative_path']}`\n" | |
| md += f" - Modified: {file['modified'][:16]}\n\n" | |
| if len(files) > max_files: | |
| md += f"\n*... and {len(files) - max_files} more files*\n" | |
| return md | |
| def _get_file_icon(self, file_type: str) -> str: | |
| """Get emoji icon for file type.""" | |
| icon_map = { | |
| 'Python Script': 'π', | |
| 'Jupyter Notebook': 'π', | |
| 'JavaScript': 'π', | |
| 'TypeScript': 'π', | |
| 'HTML': 'π', | |
| 'Word Document': 'π', | |
| 'Excel Spreadsheet': 'π', | |
| 'PDF Document': 'π', | |
| 'Markdown': 'π', | |
| 'Text': 'π', | |
| 'Image': 'πΌοΈ', | |
| 'PNG Image': 'πΌοΈ', | |
| 'JPEG Image': 'πΌοΈ', | |
| 'CSV Data': 'π', | |
| 'ZIP Archive': 'π¦' | |
| } | |
| return icon_map.get(file_type, 'π') | |
| def get_files_for_download(self) -> List[str]: | |
| """ | |
| Get list of file paths for Gradio Files component. | |
| Returns: | |
| List of absolute file paths | |
| """ | |
| files = self.scan_all_files() | |
| return [f['path'] for f in files] | |
| def create_download_package(self, session_id: str = None) -> str: | |
| """ | |
| Create a ZIP file containing all recent artifacts. | |
| Args: | |
| session_id: Optional session ID to filter files | |
| Returns: | |
| Path to created ZIP file | |
| """ | |
| files = self.scan_all_files() | |
| if not files: | |
| raise ValueError("No files to package") | |
| # Create ZIP filename | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| zip_filename = f"artifacts_{timestamp}.zip" | |
| zip_path = os.path.join("outputs", zip_filename) | |
| # Create ZIP | |
| with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: | |
| for file_info in files: | |
| # Add file with relative path | |
| arcname = os.path.join( | |
| file_info['type'].replace(' ', '_'), | |
| file_info['filename'] | |
| ) | |
| zipf.write(file_info['path'], arcname) | |
| log.info(f"π¦ Created download package: {zip_filename} ({len(files)} files)") | |
| return zip_path | |
| def get_file_stats(self) -> Dict: | |
| """ | |
| Get comprehensive file statistics. | |
| Returns: | |
| Dict with file counts, sizes, and types | |
| """ | |
| files = self.scan_all_files() | |
| total_size = sum(f['size_bytes'] for f in files) | |
| # Count by type | |
| by_type = {} | |
| for file in files: | |
| ftype = file['type'] | |
| by_type[ftype] = by_type.get(ftype, 0) + 1 | |
| # Count by directory | |
| by_dir = {} | |
| for file in files: | |
| base = file['base_dir'] | |
| by_dir[base] = by_dir.get(base, 0) + 1 | |
| return { | |
| 'total_files': len(files), | |
| 'total_size_bytes': total_size, | |
| 'total_size_mb': round(total_size / (1000 * 1000), 2), | |
| 'by_type': by_type, | |
| 'by_directory': by_dir, | |
| 'most_recent': files[0] if files else None | |
| } | |
| def delete_file(self, filepath: str) -> Tuple[bool, str]: | |
| """ | |
| Delete a specific file. | |
| Args: | |
| filepath: Absolute path to file | |
| Returns: | |
| (success, message) tuple | |
| """ | |
| try: | |
| if not os.path.exists(filepath): | |
| return False, "File not found" | |
| os.remove(filepath) | |
| log.info(f"ποΈ Deleted file: {filepath}") | |
| return True, f"Deleted {os.path.basename(filepath)}" | |
| except Exception as e: | |
| log.error(f"Failed to delete {filepath}: {e}") | |
| return False, f"Error: {e}" | |
| def clear_all_files(self) -> Tuple[int, str]: | |
| """ | |
| Clear all files from monitored directories. | |
| Returns: | |
| (count, message) tuple | |
| """ | |
| files = self.scan_all_files() | |
| deleted = 0 | |
| for file_info in files: | |
| try: | |
| os.remove(file_info['path']) | |
| deleted += 1 | |
| except Exception as e: | |
| log.warning(f"Failed to delete {file_info['path']}: {e}") | |
| log.info(f"ποΈ Cleared {deleted} files") | |
| return deleted, f"Cleared {deleted} files" | |
| def get_file_preview(self, filepath: str, max_lines: int = 50) -> str: | |
| """ | |
| Generate preview of file content. | |
| Args: | |
| filepath: Path to file | |
| max_lines: Maximum lines to show for text files | |
| Returns: | |
| Preview text or message | |
| """ | |
| if not os.path.exists(filepath): | |
| return "β File not found" | |
| ext = os.path.splitext(filepath)[1].lower() | |
| # Text-based files | |
| text_extensions = ['.txt', '.md', '.py', '.js', '.ts', '.json', '.csv', '.html', '.css'] | |
| if ext in text_extensions: | |
| try: | |
| with open(filepath, 'r', encoding='utf-8', errors='ignore') as f: | |
| lines = f.readlines() | |
| if len(lines) <= max_lines: | |
| return ''.join(lines) | |
| else: | |
| preview = ''.join(lines[:max_lines]) | |
| return f"{preview}\n\n... ({len(lines) - max_lines} more lines)" | |
| except Exception as e: | |
| return f"β Preview error: {e}" | |
| # Binary files | |
| size = os.path.getsize(filepath) | |
| return f"π¦ Binary file ({size / 1000:.1f} KB)\n\nDownload to view content." | |
# Module-wide singleton used by the Gradio convenience functions below.
file_browser = FileBrowserManager()


def get_file_browser() -> FileBrowserManager:
    """Return the shared FileBrowserManager instance for this module."""
    return file_browser
# Convenience functions for Gradio integration
def refresh_file_list() -> str:
    """Re-scan the monitored directories and render the file tree as markdown."""
    browser = get_file_browser()
    return browser.get_file_tree_markdown()
def get_download_files() -> List[str]:
    """Collect the paths of every browsable file for a download widget."""
    return get_file_browser().get_files_for_download()
def create_zip_package() -> str:
    """Build a ZIP of all artifacts; return its path, or "" on failure."""
    try:
        return file_browser.create_download_package()
    except Exception as exc:
        # Best-effort: packaging problems are logged, not raised to the UI.
        log.error(f"Failed to create ZIP: {exc}")
        return ""
def get_stats_markdown() -> str:
    """Render the current file statistics as a short markdown summary."""
    stats = file_browser.get_file_stats()
    parts = [
        "### π Statistics\n\n",
        f"- **Total Files**: {stats['total_files']}\n",
        f"- **Total Size**: {stats['total_size_mb']} MB\n\n",
    ]
    if stats['by_type']:
        parts.append("**By Type:**\n")
        for ftype, count in sorted(stats['by_type'].items()):
            parts.append(f"- {ftype}: {count}\n")
    return "".join(parts)
# Public API of this module: the manager class, its shared instance, and the
# Gradio-facing convenience functions.
__all__ = [
    'FileBrowserManager',
    'file_browser',
    'get_file_browser',
    'refresh_file_list',
    'get_download_files',
    'create_zip_package',
    'get_stats_markdown'
]