""" file_browser_manager.py - Integrated File Browser for Gradio ============================================================= Provides real-time file access and management for all generated artifacts. Works seamlessly with the existing artifact registry system. Author: AI Lab Team Version: 1.0 """ import os import json import shutil import zipfile from datetime import datetime from pathlib import Path from typing import List, Dict, Tuple, Optional import mimetypes from logging_config import get_logger log = get_logger(__name__) class FileBrowserManager: """ Manages file browsing, preview, and download functionality. Integrates with artifact_registry for complete file tracking. """ def __init__(self, base_dirs: List[str] = None): """ Initialize file browser. Args: base_dirs: List of directories to monitor (default: outputs, uploads) """ self.base_dirs = base_dirs or [ "outputs", "outputs/user_artifacts", "uploads", "/tmp" ] # Ensure all directories exist for directory in self.base_dirs: os.makedirs(directory, exist_ok=True) log.info(f"📂 File Browser initialized: {len(self.base_dirs)} directories") def scan_all_files(self) -> List[Dict]: """ Scan all monitored directories and return file information. Returns: List of file info dicts with path, size, type, etc. 
""" all_files = [] # List to hold file information unique_files = set() # Set to track unique file paths for base_dir in self.base_dirs: if not os.path.exists(base_dir): continue try: for root, dirs, files in os.walk(base_dir): for filename in files: # Skip hidden files and system files if filename.startswith('.') or filename.endswith('.json'): continue filepath = os.path.join(root, filename) try: stat = os.stat(filepath) # Prepare file information file_info = { 'filename': filename, 'path': filepath, 'relative_path': os.path.relpath(filepath, base_dir), 'directory': os.path.dirname(filepath), 'size_bytes': stat.st_size, 'size_kb': round(stat.st_size / 1000, 1), 'size_mb': round(stat.st_size / (1000 * 1000), 2), 'modified': datetime.fromtimestamp(stat.st_mtime).isoformat(), 'extension': os.path.splitext(filename)[1], 'type': self._get_file_type(filename), 'base_dir': base_dir } # Check for duplicates using the correct variable if filepath not in unique_files: unique_files.add(filepath) all_files.append(file_info) # Append the full file_info dict except Exception as e: log.warning(f"Failed to stat {filepath}: {e}") except Exception as e: log.error(f"Failed to scan {base_dir}: {e}") # Sort by modified time (newest first) all_files.sort(key=lambda x: x['modified'], reverse=True) return all_files def _get_file_type(self, filename: str) -> str: """Determine file type from extension.""" ext = os.path.splitext(filename)[1].lower() type_map = { '.py': 'Python Script', '.ipynb': 'Jupyter Notebook', '.js': 'JavaScript', '.ts': 'TypeScript', '.html': 'HTML', '.css': 'CSS', '.json': 'JSON', '.txt': 'Text', '.md': 'Markdown', '.docx': 'Word Document', '.xlsx': 'Excel Spreadsheet', '.pdf': 'PDF Document', '.png': 'PNG Image', '.jpg': 'JPEG Image', '.jpeg': 'JPEG Image', '.gif': 'GIF Image', '.svg': 'SVG Image', '.csv': 'CSV Data', '.zip': 'ZIP Archive' } return type_map.get(ext, 'File') def get_file_tree_markdown(self, max_files: int = 50) -> str: """ Generate markdown 
representation of file tree. Args: max_files: Maximum files to display Returns: Markdown formatted file tree """ files = self.scan_all_files() if not files: return "📁 **No files found**\n\n*Upload files or generate artifacts to see them here.*" # Group by type by_type = {} for file in files: ftype = file['type'] if ftype not in by_type: by_type[ftype] = [] by_type[ftype].append(file) # Build markdown md = f"## 📂 File Browser\n\n" md += f"**Total: {len(files)} files**\n\n" # Summary by type md += "### 📊 By Type\n\n" for ftype, type_files in sorted(by_type.items()): total_size = sum(f['size_kb'] for f in type_files) md += f"- **{ftype}**: {len(type_files)} files ({total_size:.1f} KB)\n" md += "\n### 📄 Recent Files\n\n" # Show recent files display_files = files[:max_files] for file in display_files: icon = self._get_file_icon(file['type']) md += f"{icon} **{file['filename']}**\n" md += f" - Type: {file['type']}\n" md += f" - Size: {file['size_kb']} KB\n" md += f" - Path: `{file['relative_path']}`\n" md += f" - Modified: {file['modified'][:16]}\n\n" if len(files) > max_files: md += f"\n*... and {len(files) - max_files} more files*\n" return md def _get_file_icon(self, file_type: str) -> str: """Get emoji icon for file type.""" icon_map = { 'Python Script': '🐍', 'Jupyter Notebook': '📓', 'JavaScript': '📜', 'TypeScript': '📘', 'HTML': '🌐', 'Word Document': '📄', 'Excel Spreadsheet': '📊', 'PDF Document': '📕', 'Markdown': '📝', 'Text': '📃', 'Image': '🖼️', 'PNG Image': '🖼️', 'JPEG Image': '🖼️', 'CSV Data': '📈', 'ZIP Archive': '📦' } return icon_map.get(file_type, '📄') def get_files_for_download(self) -> List[str]: """ Get list of file paths for Gradio Files component. Returns: List of absolute file paths """ files = self.scan_all_files() return [f['path'] for f in files] def create_download_package(self, session_id: str = None) -> str: """ Create a ZIP file containing all recent artifacts. 
Args: session_id: Optional session ID to filter files Returns: Path to created ZIP file """ files = self.scan_all_files() if not files: raise ValueError("No files to package") # Create ZIP filename timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") zip_filename = f"artifacts_{timestamp}.zip" zip_path = os.path.join("outputs", zip_filename) # Create ZIP with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: for file_info in files: # Add file with relative path arcname = os.path.join( file_info['type'].replace(' ', '_'), file_info['filename'] ) zipf.write(file_info['path'], arcname) log.info(f"📦 Created download package: {zip_filename} ({len(files)} files)") return zip_path def get_file_stats(self) -> Dict: """ Get comprehensive file statistics. Returns: Dict with file counts, sizes, and types """ files = self.scan_all_files() total_size = sum(f['size_bytes'] for f in files) # Count by type by_type = {} for file in files: ftype = file['type'] by_type[ftype] = by_type.get(ftype, 0) + 1 # Count by directory by_dir = {} for file in files: base = file['base_dir'] by_dir[base] = by_dir.get(base, 0) + 1 return { 'total_files': len(files), 'total_size_bytes': total_size, 'total_size_mb': round(total_size / (1000 * 1000), 2), 'by_type': by_type, 'by_directory': by_dir, 'most_recent': files[0] if files else None } def delete_file(self, filepath: str) -> Tuple[bool, str]: """ Delete a specific file. Args: filepath: Absolute path to file Returns: (success, message) tuple """ try: if not os.path.exists(filepath): return False, "File not found" os.remove(filepath) log.info(f"🗑️ Deleted file: {filepath}") return True, f"Deleted {os.path.basename(filepath)}" except Exception as e: log.error(f"Failed to delete {filepath}: {e}") return False, f"Error: {e}" def clear_all_files(self) -> Tuple[int, str]: """ Clear all files from monitored directories. 
Returns: (count, message) tuple """ files = self.scan_all_files() deleted = 0 for file_info in files: try: os.remove(file_info['path']) deleted += 1 except Exception as e: log.warning(f"Failed to delete {file_info['path']}: {e}") log.info(f"🗑️ Cleared {deleted} files") return deleted, f"Cleared {deleted} files" def get_file_preview(self, filepath: str, max_lines: int = 50) -> str: """ Generate preview of file content. Args: filepath: Path to file max_lines: Maximum lines to show for text files Returns: Preview text or message """ if not os.path.exists(filepath): return "❌ File not found" ext = os.path.splitext(filepath)[1].lower() # Text-based files text_extensions = ['.txt', '.md', '.py', '.js', '.ts', '.json', '.csv', '.html', '.css'] if ext in text_extensions: try: with open(filepath, 'r', encoding='utf-8', errors='ignore') as f: lines = f.readlines() if len(lines) <= max_lines: return ''.join(lines) else: preview = ''.join(lines[:max_lines]) return f"{preview}\n\n... ({len(lines) - max_lines} more lines)" except Exception as e: return f"❌ Preview error: {e}" # Binary files size = os.path.getsize(filepath) return f"📦 Binary file ({size / 1000:.1f} KB)\n\nDownload to view content." 
# Module-level singleton shared by all Gradio callbacks
file_browser = FileBrowserManager()


def get_file_browser() -> FileBrowserManager:
    """Get global file browser instance."""
    return file_browser


# ---------------------------------------------------------------------------
# Convenience wrappers for Gradio event handlers
# ---------------------------------------------------------------------------

def refresh_file_list() -> str:
    """Refresh and return file tree markdown."""
    return file_browser.get_file_tree_markdown()


def get_download_files() -> List[str]:
    """Get file paths for download."""
    return file_browser.get_files_for_download()


def create_zip_package() -> str:
    """Create and return path to ZIP package."""
    try:
        zip_path = file_browser.create_download_package()
    except Exception as e:
        # Gradio-facing: swallow the error and hand back an empty path
        log.error(f"Failed to create ZIP: {e}")
        return ""
    return zip_path


def get_stats_markdown() -> str:
    """Get file statistics as markdown."""
    stats = file_browser.get_file_stats()

    # Assemble the report as a list of fragments, then join once.
    parts = [
        "### 📊 Statistics\n\n",
        f"- **Total Files**: {stats['total_files']}\n",
        f"- **Total Size**: {stats['total_size_mb']} MB\n\n",
    ]

    type_counts = stats['by_type']
    if type_counts:
        parts.append("**By Type:**\n")
        parts.extend(
            f"- {ftype}: {count}\n" for ftype, count in sorted(type_counts.items())
        )

    return "".join(parts)


# Public API of this module
__all__ = [
    'FileBrowserManager',
    'file_browser',
    'get_file_browser',
    'refresh_file_list',
    'get_download_files',
    'create_zip_package',
    'get_stats_markdown'
]