Spaces:
Sleeping
Sleeping
| """ | |
| Screenshot Storage Manager | |
| Handles persistent storage, retrieval, and cleanup of screenshots | |
| """ | |
| import os | |
| import json | |
| import logging | |
| from pathlib import Path | |
| from datetime import datetime, timedelta | |
| from typing import Dict, List, Optional, Tuple | |
| from PIL import Image | |
| import shutil | |
| class ScreenshotStorage: | |
| """Manages persistent storage of screenshots with metadata and cleanup""" | |
| def __init__(self, base_dir: str = "data/screenshots"): | |
| """ | |
| Initialize storage manager | |
| Args: | |
| base_dir: Base directory for storing screenshots | |
| """ | |
| self.base_dir = Path(base_dir) | |
| self.base_dir.mkdir(parents=True, exist_ok=True) | |
| self.metadata_dir = self.base_dir / "metadata" | |
| self.metadata_dir.mkdir(exist_ok=True) | |
| self.logger = logging.getLogger(__name__) | |
| self.logger.info(f"β Screenshot storage initialized at {self.base_dir}") | |
| def save_screenshot( | |
| self, | |
| image: Image.Image, | |
| execution_id: str, | |
| viewport: str, | |
| screenshot_type: str, | |
| metadata: Optional[Dict] = None | |
| ) -> str: | |
| """ | |
| Save screenshot with metadata | |
| Args: | |
| image: PIL Image object | |
| execution_id: Unique execution identifier | |
| viewport: Viewport name (desktop, mobile, etc.) | |
| screenshot_type: Type of screenshot (figma, website, annotated, comparison) | |
| metadata: Optional metadata dictionary | |
| Returns: | |
| Path to saved screenshot | |
| """ | |
| try: | |
| # Create execution directory | |
| exec_dir = self.base_dir / execution_id | |
| exec_dir.mkdir(exist_ok=True) | |
| # Generate filename with timestamp | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| filename = f"{viewport}_{screenshot_type}_{timestamp}.png" | |
| filepath = exec_dir / filename | |
| # Save image | |
| image.save(filepath, "PNG") | |
| self.logger.info(f"β Saved screenshot: {filepath}") | |
| # Save metadata | |
| if metadata is None: | |
| metadata = {} | |
| metadata.update({ | |
| "execution_id": execution_id, | |
| "viewport": viewport, | |
| "screenshot_type": screenshot_type, | |
| "timestamp": timestamp, | |
| "filepath": str(filepath), | |
| "file_size": filepath.stat().st_size, | |
| "image_size": image.size | |
| }) | |
| self._save_metadata(execution_id, viewport, screenshot_type, metadata) | |
| return str(filepath) | |
| except Exception as e: | |
| self.logger.error(f"β Error saving screenshot: {str(e)}") | |
| raise | |
| def _save_metadata( | |
| self, | |
| execution_id: str, | |
| viewport: str, | |
| screenshot_type: str, | |
| metadata: Dict | |
| ): | |
| """Save metadata for screenshot""" | |
| try: | |
| metadata_file = self.metadata_dir / f"{execution_id}_{viewport}_{screenshot_type}.json" | |
| with open(metadata_file, 'w') as f: | |
| json.dump(metadata, f, indent=2, default=str) | |
| self.logger.debug(f"β Saved metadata: {metadata_file}") | |
| except Exception as e: | |
| self.logger.error(f"β Error saving metadata: {str(e)}") | |
| def get_screenshot( | |
| self, | |
| execution_id: str, | |
| viewport: str, | |
| screenshot_type: str | |
| ) -> Optional[Image.Image]: | |
| """ | |
| Retrieve screenshot | |
| Args: | |
| execution_id: Execution identifier | |
| viewport: Viewport name | |
| screenshot_type: Type of screenshot | |
| Returns: | |
| PIL Image object or None if not found | |
| """ | |
| try: | |
| exec_dir = self.base_dir / execution_id | |
| # Find the most recent matching screenshot | |
| pattern = f"{viewport}_{screenshot_type}_*.png" | |
| matching_files = list(exec_dir.glob(pattern)) | |
| if not matching_files: | |
| self.logger.warning(f"β οΈ No screenshots found for {execution_id}/{viewport}/{screenshot_type}") | |
| return None | |
| # Get most recent | |
| latest_file = max(matching_files, key=lambda p: p.stat().st_mtime) | |
| image = Image.open(latest_file) | |
| self.logger.info(f"β Retrieved screenshot: {latest_file}") | |
| return image | |
| except Exception as e: | |
| self.logger.error(f"β Error retrieving screenshot: {str(e)}") | |
| return None | |
| def get_execution_screenshots(self, execution_id: str) -> Dict[str, List[str]]: | |
| """ | |
| Get all screenshots for an execution | |
| Args: | |
| execution_id: Execution identifier | |
| Returns: | |
| Dictionary mapping screenshot types to file paths | |
| """ | |
| try: | |
| exec_dir = self.base_dir / execution_id | |
| if not exec_dir.exists(): | |
| self.logger.warning(f"β οΈ Execution directory not found: {execution_id}") | |
| return {} | |
| screenshots = {} | |
| for file in sorted(exec_dir.glob("*.png")): | |
| # Parse filename: viewport_type_timestamp.png | |
| parts = file.stem.split("_") | |
| if len(parts) >= 2: | |
| screenshot_type = parts[1] | |
| if screenshot_type not in screenshots: | |
| screenshots[screenshot_type] = [] | |
| screenshots[screenshot_type].append(str(file)) | |
| self.logger.info(f"β Retrieved {sum(len(v) for v in screenshots.values())} screenshots for {execution_id}") | |
| return screenshots | |
| except Exception as e: | |
| self.logger.error(f"β Error retrieving execution screenshots: {str(e)}") | |
| return {} | |
| def get_execution_metadata(self, execution_id: str) -> Dict[str, Dict]: | |
| """ | |
| Get all metadata for an execution | |
| Args: | |
| execution_id: Execution identifier | |
| Returns: | |
| Dictionary of metadata | |
| """ | |
| try: | |
| metadata = {} | |
| pattern = f"{execution_id}_*.json" | |
| for metadata_file in self.metadata_dir.glob(pattern): | |
| with open(metadata_file, 'r') as f: | |
| data = json.load(f) | |
| key = metadata_file.stem | |
| metadata[key] = data | |
| self.logger.info(f"β Retrieved metadata for {execution_id}") | |
| return metadata | |
| except Exception as e: | |
| self.logger.error(f"β Error retrieving metadata: {str(e)}") | |
| return {} | |
| def cleanup_old_screenshots(self, days: int = 7) -> Tuple[int, int]: | |
| """ | |
| Remove screenshots older than N days | |
| Args: | |
| days: Number of days to keep | |
| Returns: | |
| Tuple of (deleted_files, freed_space_mb) | |
| """ | |
| try: | |
| cutoff = datetime.now() - timedelta(days=days) | |
| deleted_count = 0 | |
| freed_space = 0 | |
| for exec_dir in self.base_dir.iterdir(): | |
| if not exec_dir.is_dir() or exec_dir.name == "metadata": | |
| continue | |
| for screenshot in exec_dir.glob("*.png"): | |
| mtime = datetime.fromtimestamp(screenshot.stat().st_mtime) | |
| if mtime < cutoff: | |
| file_size = screenshot.stat().st_size | |
| screenshot.unlink() | |
| deleted_count += 1 | |
| freed_space += file_size | |
| self.logger.info(f"ποΈ Deleted old screenshot: {screenshot}") | |
| # Remove empty execution directories | |
| if not list(exec_dir.glob("*.png")): | |
| exec_dir.rmdir() | |
| self.logger.info(f"ποΈ Removed empty directory: {exec_dir}") | |
| freed_space_mb = freed_space / (1024 * 1024) | |
| self.logger.info(f"β Cleanup complete: {deleted_count} files deleted, {freed_space_mb:.2f}MB freed") | |
| return deleted_count, freed_space_mb | |
| except Exception as e: | |
| self.logger.error(f"β Error during cleanup: {str(e)}") | |
| return 0, 0 | |
| def get_storage_stats(self) -> Dict[str, any]: | |
| """ | |
| Get storage statistics | |
| Returns: | |
| Dictionary with storage stats | |
| """ | |
| try: | |
| total_size = 0 | |
| total_files = 0 | |
| executions = {} | |
| for exec_dir in self.base_dir.iterdir(): | |
| if not exec_dir.is_dir() or exec_dir.name == "metadata": | |
| continue | |
| exec_size = 0 | |
| exec_files = 0 | |
| for screenshot in exec_dir.glob("*.png"): | |
| file_size = screenshot.stat().st_size | |
| exec_size += file_size | |
| exec_files += 1 | |
| total_size += file_size | |
| total_files += 1 | |
| executions[exec_dir.name] = { | |
| "files": exec_files, | |
| "size_mb": exec_size / (1024 * 1024) | |
| } | |
| return { | |
| "total_files": total_files, | |
| "total_size_mb": total_size / (1024 * 1024), | |
| "executions": executions, | |
| "base_dir": str(self.base_dir) | |
| } | |
| except Exception as e: | |
| self.logger.error(f"β Error getting storage stats: {str(e)}") | |
| return {} | |
| def export_execution(self, execution_id: str, export_path: str) -> bool: | |
| """ | |
| Export all screenshots and metadata for an execution | |
| Args: | |
| execution_id: Execution identifier | |
| export_path: Path to export to | |
| Returns: | |
| True if successful | |
| """ | |
| try: | |
| export_dir = Path(export_path) / execution_id | |
| export_dir.mkdir(parents=True, exist_ok=True) | |
| # Copy screenshots | |
| exec_dir = self.base_dir / execution_id | |
| if exec_dir.exists(): | |
| for screenshot in exec_dir.glob("*.png"): | |
| shutil.copy2(screenshot, export_dir / screenshot.name) | |
| # Copy metadata | |
| metadata = self.get_execution_metadata(execution_id) | |
| metadata_export = export_dir / "metadata.json" | |
| with open(metadata_export, 'w') as f: | |
| json.dump(metadata, f, indent=2, default=str) | |
| self.logger.info(f"β Exported execution {execution_id} to {export_dir}") | |
| return True | |
| except Exception as e: | |
| self.logger.error(f"β Error exporting execution: {str(e)}") | |
| return False | |
| def delete_execution(self, execution_id: str) -> bool: | |
| """ | |
| Delete all screenshots and metadata for an execution | |
| Args: | |
| execution_id: Execution identifier | |
| Returns: | |
| True if successful | |
| """ | |
| try: | |
| exec_dir = self.base_dir / execution_id | |
| if exec_dir.exists(): | |
| shutil.rmtree(exec_dir) | |
| self.logger.info(f"ποΈ Deleted execution directory: {exec_dir}") | |
| # Delete metadata files | |
| pattern = f"{execution_id}_*.json" | |
| for metadata_file in self.metadata_dir.glob(pattern): | |
| metadata_file.unlink() | |
| self.logger.info(f"ποΈ Deleted metadata: {metadata_file}") | |
| self.logger.info(f"β Deleted execution {execution_id}") | |
| return True | |
| except Exception as e: | |
| self.logger.error(f"β Error deleting execution: {str(e)}") | |
| return False | |
| def list_executions(self) -> List[Dict[str, any]]: | |
| """ | |
| List all stored executions | |
| Returns: | |
| List of execution info dictionaries | |
| """ | |
| try: | |
| executions = [] | |
| for exec_dir in sorted(self.base_dir.iterdir(), reverse=True): | |
| if not exec_dir.is_dir() or exec_dir.name == "metadata": | |
| continue | |
| screenshots = list(exec_dir.glob("*.png")) | |
| if screenshots: | |
| # Get creation time from directory | |
| mtime = datetime.fromtimestamp(exec_dir.stat().st_mtime) | |
| executions.append({ | |
| "execution_id": exec_dir.name, | |
| "timestamp": mtime.isoformat(), | |
| "screenshot_count": len(screenshots), | |
| "size_mb": sum(f.stat().st_size for f in screenshots) / (1024 * 1024) | |
| }) | |
| return executions | |
| except Exception as e: | |
| self.logger.error(f"β Error listing executions: {str(e)}") | |
| return [] | |
| # Convenience functions | |
| def get_storage_manager(base_dir: str = "data/screenshots") -> ScreenshotStorage: | |
| """Get or create storage manager instance""" | |
| return ScreenshotStorage(base_dir) | |
| def cleanup_storage(base_dir: str = "data/screenshots", days: int = 7): | |
| """Cleanup old screenshots""" | |
| storage = ScreenshotStorage(base_dir) | |
| return storage.cleanup_old_screenshots(days) | |
| def get_storage_stats(base_dir: str = "data/screenshots") -> Dict: | |
| """Get storage statistics""" | |
| storage = ScreenshotStorage(base_dir) | |
| return storage.get_storage_stats() | |