# SPOC_V1 / file_browser_manager.py
# (Hugging Face Hub page header removed — revision 2dd6a60, 13.6 kB)
"""
file_browser_manager.py - Integrated File Browser for Gradio
=============================================================
Provides real-time file access and management for all generated artifacts.
Works seamlessly with the existing artifact registry system.
Author: AI Lab Team
Version: 1.0
"""
import os
import json
import shutil
import zipfile
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Tuple, Optional
import mimetypes
from logging_config import get_logger
log = get_logger(__name__)
class FileBrowserManager:
    """
    Manages file browsing, preview, and download functionality.

    Scans a configurable set of directories for generated artifacts,
    renders markdown summaries for the Gradio UI, and supports previews,
    deletion, and ZIP packaging of the discovered files.
    Integrates with artifact_registry for complete file tracking.
    """

    def __init__(self, base_dirs: Optional[List[str]] = None):
        """
        Initialize file browser.

        Args:
            base_dirs: List of directories to monitor
                (default: outputs, outputs/user_artifacts, uploads, /tmp)
        """
        # NOTE(review): monitoring "/tmp" means clear_all_files() will also
        # try to delete unrelated system temp files — confirm this is intended.
        self.base_dirs = base_dirs or [
            "outputs",
            "outputs/user_artifacts",
            "uploads",
            "/tmp"
        ]
        # Ensure all directories exist so later scans/writes cannot fail
        # on a missing folder.
        for directory in self.base_dirs:
            os.makedirs(directory, exist_ok=True)
        log.info(f"πŸ“‚ File Browser initialized: {len(self.base_dirs)} directories")

    def scan_all_files(self) -> List[Dict]:
        """
        Scan all monitored directories and return file information.

        Hidden files (leading '.') and '.json' files are skipped; the
        latter keeps registry/metadata JSON out of user-facing listings.
        Because base_dirs may overlap (e.g. "outputs" contains
        "outputs/user_artifacts"), results are de-duplicated by path.

        Returns:
            List of file info dicts (path, sizes, type, timestamps),
            sorted newest-first by modification time.
        """
        all_files = []        # Collected file-info dicts
        unique_files = set()  # Paths already recorded (dedup across overlapping dirs)
        for base_dir in self.base_dirs:
            if not os.path.exists(base_dir):
                continue
            try:
                for root, dirs, files in os.walk(base_dir):
                    for filename in files:
                        # Skip hidden files and metadata JSON files
                        if filename.startswith('.') or filename.endswith('.json'):
                            continue
                        filepath = os.path.join(root, filename)
                        try:
                            stat = os.stat(filepath)
                            # Sizes use decimal units (1 KB = 1000 bytes).
                            file_info = {
                                'filename': filename,
                                'path': filepath,
                                'relative_path': os.path.relpath(filepath, base_dir),
                                'directory': os.path.dirname(filepath),
                                'size_bytes': stat.st_size,
                                'size_kb': round(stat.st_size / 1000, 1),
                                'size_mb': round(stat.st_size / (1000 * 1000), 2),
                                'modified': datetime.fromtimestamp(stat.st_mtime).isoformat(),
                                'extension': os.path.splitext(filename)[1],
                                'type': self._get_file_type(filename),
                                'base_dir': base_dir
                            }
                            if filepath not in unique_files:
                                unique_files.add(filepath)
                                all_files.append(file_info)
                        except Exception as e:
                            # A file can vanish between walk() and stat();
                            # log and keep scanning.
                            log.warning(f"Failed to stat {filepath}: {e}")
            except Exception as e:
                log.error(f"Failed to scan {base_dir}: {e}")
        # ISO-8601 timestamps sort chronologically as strings; newest first.
        all_files.sort(key=lambda x: x['modified'], reverse=True)
        return all_files

    def _get_file_type(self, filename: str) -> str:
        """Determine a human-readable file type label from the extension.

        Unknown extensions fall back to the generic label 'File'.
        """
        ext = os.path.splitext(filename)[1].lower()
        type_map = {
            '.py': 'Python Script',
            '.ipynb': 'Jupyter Notebook',
            '.js': 'JavaScript',
            '.ts': 'TypeScript',
            '.html': 'HTML',
            '.css': 'CSS',
            '.json': 'JSON',
            '.txt': 'Text',
            '.md': 'Markdown',
            '.docx': 'Word Document',
            '.xlsx': 'Excel Spreadsheet',
            '.pdf': 'PDF Document',
            '.png': 'PNG Image',
            '.jpg': 'JPEG Image',
            '.jpeg': 'JPEG Image',
            '.gif': 'GIF Image',
            '.svg': 'SVG Image',
            '.csv': 'CSV Data',
            '.zip': 'ZIP Archive'
        }
        return type_map.get(ext, 'File')

    def get_file_tree_markdown(self, max_files: int = 50) -> str:
        """
        Generate markdown representation of file tree.

        Args:
            max_files: Maximum files to display in the "Recent Files"
                section; a summary note mentions any overflow.

        Returns:
            Markdown formatted file tree (type summary + recent files).
        """
        files = self.scan_all_files()
        if not files:
            return "πŸ“ **No files found**\n\n*Upload files or generate artifacts to see them here.*"
        # Group by human-readable type label
        by_type = {}
        for file in files:
            ftype = file['type']
            if ftype not in by_type:
                by_type[ftype] = []
            by_type[ftype].append(file)
        # Build markdown (fix: removed needless f-prefix on a plain literal)
        md = "## πŸ“‚ File Browser\n\n"
        md += f"**Total: {len(files)} files**\n\n"
        # Summary by type
        md += "### πŸ“Š By Type\n\n"
        for ftype, type_files in sorted(by_type.items()):
            total_size = sum(f['size_kb'] for f in type_files)
            md += f"- **{ftype}**: {len(type_files)} files ({total_size:.1f} KB)\n"
        md += "\n### πŸ“„ Recent Files\n\n"
        # Show most-recent files (scan_all_files already sorts newest-first)
        display_files = files[:max_files]
        for file in display_files:
            icon = self._get_file_icon(file['type'])
            md += f"{icon} **{file['filename']}**\n"
            md += f"  - Type: {file['type']}\n"
            md += f"  - Size: {file['size_kb']} KB\n"
            md += f"  - Path: `{file['relative_path']}`\n"
            md += f"  - Modified: {file['modified'][:16]}\n\n"
        if len(files) > max_files:
            md += f"\n*... and {len(files) - max_files} more files*\n"
        return md

    def _get_file_icon(self, file_type: str) -> str:
        """Get emoji icon for a type label; generic document icon otherwise."""
        icon_map = {
            'Python Script': '🐍',
            'Jupyter Notebook': 'πŸ““',
            'JavaScript': 'πŸ“œ',
            'TypeScript': 'πŸ“˜',
            'HTML': '🌐',
            'Word Document': 'πŸ“„',
            'Excel Spreadsheet': 'πŸ“Š',
            'PDF Document': 'πŸ“•',
            'Markdown': 'πŸ“',
            'Text': 'πŸ“ƒ',
            'Image': 'πŸ–ΌοΈ',
            'PNG Image': 'πŸ–ΌοΈ',
            'JPEG Image': 'πŸ–ΌοΈ',
            'CSV Data': 'πŸ“ˆ',
            'ZIP Archive': 'πŸ“¦'
        }
        return icon_map.get(file_type, 'πŸ“„')

    def get_files_for_download(self) -> List[str]:
        """
        Get list of file paths for Gradio Files component.

        Returns:
            List of file paths (as produced by scan_all_files).
        """
        files = self.scan_all_files()
        return [f['path'] for f in files]

    def create_download_package(self, session_id: str = None) -> str:
        """
        Create a ZIP file containing all recent artifacts.

        Files are stored under a folder named after their type label
        (spaces replaced by underscores).

        Args:
            session_id: Optional session ID to filter files.
                NOTE(review): currently unused — reserved for future
                per-session filtering; confirm callers' expectations.

        Returns:
            Path to created ZIP file.

        Raises:
            ValueError: If no files are available to package.
        """
        files = self.scan_all_files()
        if not files:
            raise ValueError("No files to package")
        # Create ZIP filename (timestamped, so repeated calls don't collide)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        zip_filename = f"artifacts_{timestamp}.zip"
        # Robustness: re-create "outputs" in case it was removed after init.
        os.makedirs("outputs", exist_ok=True)
        zip_path = os.path.join("outputs", zip_filename)
        # Fix: track archive names so same-named files of the same type from
        # different directories don't silently shadow each other in the ZIP.
        used_arcnames = set()
        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for file_info in files:
                # Add file grouped by type label
                arcname = os.path.join(
                    file_info['type'].replace(' ', '_'),
                    file_info['filename']
                )
                if arcname in used_arcnames:
                    stem, ext = os.path.splitext(arcname)
                    counter = 1
                    while f"{stem}_{counter}{ext}" in used_arcnames:
                        counter += 1
                    arcname = f"{stem}_{counter}{ext}"
                used_arcnames.add(arcname)
                zipf.write(file_info['path'], arcname)
        log.info(f"πŸ“¦ Created download package: {zip_filename} ({len(files)} files)")
        return zip_path

    def get_file_stats(self) -> Dict:
        """
        Get comprehensive file statistics.

        Returns:
            Dict with total counts/sizes, per-type and per-directory
            counts, and the most recently modified file's info (or None).
        """
        files = self.scan_all_files()
        total_size = sum(f['size_bytes'] for f in files)
        # Count by type label
        by_type = {}
        for file in files:
            ftype = file['type']
            by_type[ftype] = by_type.get(ftype, 0) + 1
        # Count by monitored base directory
        by_dir = {}
        for file in files:
            base = file['base_dir']
            by_dir[base] = by_dir.get(base, 0) + 1
        return {
            'total_files': len(files),
            'total_size_bytes': total_size,
            'total_size_mb': round(total_size / (1000 * 1000), 2),
            'by_type': by_type,
            'by_directory': by_dir,
            'most_recent': files[0] if files else None
        }

    def delete_file(self, filepath: str) -> Tuple[bool, str]:
        """
        Delete a specific file.

        Args:
            filepath: Path to the file to delete.

        Returns:
            (success, message) tuple; never raises.
        """
        # NOTE(review): filepath is not restricted to the monitored
        # directories — a caller could delete arbitrary paths; confirm
        # upstream validation before exposing this to untrusted input.
        try:
            if not os.path.exists(filepath):
                return False, "File not found"
            os.remove(filepath)
            log.info(f"πŸ—‘οΈ Deleted file: {filepath}")
            return True, f"Deleted {os.path.basename(filepath)}"
        except Exception as e:
            log.error(f"Failed to delete {filepath}: {e}")
            return False, f"Error: {e}"

    def clear_all_files(self) -> Tuple[int, str]:
        """
        Clear all files from monitored directories.

        Best-effort: individual deletion failures are logged and skipped.

        Returns:
            (count, message) tuple with the number of files deleted.
        """
        files = self.scan_all_files()
        deleted = 0
        for file_info in files:
            try:
                os.remove(file_info['path'])
                deleted += 1
            except Exception as e:
                log.warning(f"Failed to delete {file_info['path']}: {e}")
        log.info(f"πŸ—‘οΈ Cleared {deleted} files")
        return deleted, f"Cleared {deleted} files"

    def get_file_preview(self, filepath: str, max_lines: int = 50) -> str:
        """
        Generate preview of file content.

        Text files are read as UTF-8 (undecodable bytes ignored) and
        truncated to max_lines; other files get a size-only notice.

        Args:
            filepath: Path to file
            max_lines: Maximum lines to show for text files

        Returns:
            Preview text or an error/notice message.
        """
        if not os.path.exists(filepath):
            return "❌ File not found"
        ext = os.path.splitext(filepath)[1].lower()
        # Extensions treated as previewable text
        text_extensions = ['.txt', '.md', '.py', '.js', '.ts', '.json', '.csv', '.html', '.css']
        if ext in text_extensions:
            try:
                with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
                    lines = f.readlines()
                if len(lines) <= max_lines:
                    return ''.join(lines)
                else:
                    preview = ''.join(lines[:max_lines])
                    return f"{preview}\n\n... ({len(lines) - max_lines} more lines)"
            except Exception as e:
                return f"❌ Preview error: {e}"
        # Binary files: report decimal-KB size only
        size = os.path.getsize(filepath)
        return f"πŸ“¦ Binary file ({size / 1000:.1f} KB)\n\nDownload to view content."
# Global instance
# Module-level singleton created at import time; note that its __init__
# creates the monitored directories as a side effect of importing this module.
file_browser = FileBrowserManager()
def get_file_browser() -> FileBrowserManager:
    """Get global file browser instance (the module-level singleton)."""
    return file_browser
# Convenience functions for Gradio integration
def refresh_file_list() -> str:
    """Rescan the monitored directories and return the file tree as markdown."""
    tree_markdown = file_browser.get_file_tree_markdown()
    return tree_markdown
def get_download_files() -> List[str]:
    """Return the paths of all scanned files, for a Gradio Files component."""
    download_paths = file_browser.get_files_for_download()
    return download_paths
def create_zip_package() -> str:
    """Package all artifacts into a ZIP.

    Returns the archive path, or an empty string when packaging fails
    (the error is logged).
    """
    try:
        package_path = file_browser.create_download_package()
    except Exception as e:
        log.error(f"Failed to create ZIP: {e}")
        return ""
    return package_path
def get_stats_markdown() -> str:
    """Render the current file statistics as a markdown snippet."""
    stats = file_browser.get_file_stats()
    # Assemble fragments and join once instead of repeated concatenation.
    fragments = [
        "### πŸ“Š Statistics\n\n",
        f"- **Total Files**: {stats['total_files']}\n",
        f"- **Total Size**: {stats['total_size_mb']} MB\n\n",
    ]
    if stats['by_type']:
        fragments.append("**By Type:**\n")
        for ftype, count in sorted(stats['by_type'].items()):
            fragments.append(f"- {ftype}: {count}\n")
    return "".join(fragments)
# Export all
# Public API: names available via `from file_browser_manager import *`.
__all__ = [
    'FileBrowserManager',
    'file_browser',
    'get_file_browser',
    'refresh_file_list',
    'get_download_files',
    'create_zip_package',
    'get_stats_markdown'
]