"""Filesystem-backed storage for simulated GPU state and VRAM blocks."""
from pathlib import Path
import json
import shutil
import os
from typing import Dict, Any, Optional
import numpy as np
from datetime import datetime
class StorageManager:
    """Manage on-disk storage for simulated GPU state.

    Layout under ``base_path``:
      - ``vram_blocks/{active,archived,temp}`` -- compressed numpy blocks
        (``<id>.npz``) with JSON metadata sidecars (``<id>.meta.json``)
      - ``gpu_state/<component>/<state_id>/`` -- versioned JSON state files
        plus a ``latest.json`` symlink
      - ``cache/`` -- scratch space (only sized in stats here)
    """

    def __init__(self, base_path: Path):
        self.base_path = base_path
        # Storage paths
        self.vram_path = base_path / "vram_blocks"
        self.state_path = base_path / "gpu_state"
        self.cache_path = base_path / "cache"
        # Create directory structure before anything tries to write into it
        self._init_storage()
        # Per-component state directories
        self.component_paths = {
            'cores': self.state_path / "cores",
            'tensor_cores': self.state_path / "tensor_cores",
            'warps': self.state_path / "warps",
            'threads': self.state_path / "threads",
            'memory': self.state_path / "memory"
        }

    def _init_storage(self):
        """Initialize storage directory structure (idempotent)."""
        # Main directories
        self.vram_path.mkdir(parents=True, exist_ok=True)
        self.state_path.mkdir(parents=True, exist_ok=True)
        self.cache_path.mkdir(parents=True, exist_ok=True)
        # Subdirectories for each kind of component state
        for component in ['cores', 'tensor_cores', 'warps', 'threads', 'memory']:
            (self.state_path / component).mkdir(exist_ok=True)
        # VRAM block lifecycle subdirectories
        for subdir in ['active', 'archived', 'temp']:
            (self.vram_path / subdir).mkdir(exist_ok=True)

    def store_vram_block(self, block_id: str, data: np.ndarray, temp: bool = False) -> Path:
        """Store a VRAM block (compressed) plus a JSON metadata sidecar.

        Args:
            block_id: Identifier used as the file stem.
            data: Array payload to persist.
            temp: When True, store under ``temp/`` (eligible for cleanup);
                otherwise under ``active/``.

        Returns:
            Path of the written ``.npz`` file.
        """
        target_dir = self.vram_path / ("temp" if temp else "active")
        file_path = target_dir / f"{block_id}.npz"
        metadata_path = target_dir / f"{block_id}.meta.json"
        np.savez_compressed(file_path, data=data)
        metadata = {
            'created': datetime.now().isoformat(),
            # list() so the shape round-trips through JSON unchanged
            # (json would silently turn the tuple into a list anyway)
            'shape': list(data.shape),
            'dtype': str(data.dtype),
            'size_bytes': data.nbytes
        }
        with open(metadata_path, 'w') as f:
            json.dump(metadata, f)
        return file_path

    def load_vram_block(self, block_id: str) -> Optional[np.ndarray]:
        """Load a VRAM block, promoting archived blocks back to active.

        Returns:
            The stored array, or ``None`` if the block exists in neither
            ``active`` nor ``archived``.  ``temp`` blocks are deliberately
            not consulted.
        """
        active_path = self.vram_path / "active" / f"{block_id}.npz"
        if active_path.exists():
            # Context manager closes the underlying zip handle
            # (the original leaked the NpzFile).
            with np.load(active_path) as npz:
                return npz['data']
        archived_path = self.vram_path / "archived" / f"{block_id}.npz"
        if archived_path.exists():
            with np.load(archived_path) as npz:
                data = npz['data']
            # Promote back to active, then drop the archived copy so the
            # block isn't stored (and counted in stats) twice.
            self.store_vram_block(block_id, data)
            archived_path.unlink()
            archived_meta = self.vram_path / "archived" / f"{block_id}.meta.json"
            if archived_meta.exists():
                archived_meta.unlink()
            return data
        return None

    def store_component_state(self, component: str, state_id: str, state: Dict) -> Path:
        """Store component state as a new timestamped version.

        Also points the ``latest.json`` symlink at the new file (falling
        back to a plain copy where symlinks are unavailable).

        Raises:
            KeyError: If ``component`` is not a known component name.

        Returns:
            Path of the newly written versioned state file.
        """
        component_path = self.component_paths[component]
        state_dir = component_path / state_id
        state_dir.mkdir(exist_ok=True)
        # Versioned state file (1-second resolution; a second write within
        # the same second overwrites the version, matching prior behavior)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        state_file = state_dir / f"state_{timestamp}.json"
        with open(state_file, 'w') as f:
            json.dump(state, f)
        latest_link = state_dir / "latest.json"
        # missing_ok also removes a *broken* symlink, for which exists()
        # reports False -- the original left it and then crashed on create.
        latest_link.unlink(missing_ok=True)
        try:
            # Link to the bare filename: a full path target would be
            # resolved relative to the link's own directory and dangle.
            latest_link.symlink_to(state_file.name)
        except OSError:
            # Symlinks unavailable (e.g. unprivileged Windows): copy instead.
            shutil.copyfile(state_file, latest_link)
        return state_file

    def load_component_state(self, component: str, state_id: str, version: str = "latest") -> Optional[Dict]:
        """Load component state.

        Args:
            component: One of the known component names.
            version: ``"latest"`` or a ``%Y%m%d_%H%M%S`` timestamp string.

        Returns:
            The stored state dict, or ``None`` if that version is absent.
        """
        component_path = self.component_paths[component]
        state_dir = component_path / state_id
        if version == "latest":
            state_file = state_dir / "latest.json"
        else:
            state_file = state_dir / f"state_{version}.json"
        if state_file.exists():
            with open(state_file) as f:
                return json.load(f)
        return None

    def archive_vram_block(self, block_id: str):
        """Move a VRAM block (and its metadata sidecar) from active to archived."""
        source_path = self.vram_path / "active" / f"{block_id}.npz"
        target_path = self.vram_path / "archived" / f"{block_id}.npz"
        if source_path.exists():
            shutil.move(source_path, target_path)
        # Metadata moves independently so an orphaned sidecar is still cleared
        source_meta = self.vram_path / "active" / f"{block_id}.meta.json"
        target_meta = self.vram_path / "archived" / f"{block_id}.meta.json"
        if source_meta.exists():
            shutil.move(source_meta, target_meta)

    def cleanup_temp_blocks(self, max_age_hours: int = 24):
        """Delete temporary VRAM blocks older than ``max_age_hours``.

        Age is taken from the metadata ``created`` stamp; blocks whose
        metadata is missing or unreadable fall back to the file's mtime so
        orphans are still reclaimed (the original skipped them forever).
        """
        temp_dir = self.vram_path / "temp"
        max_age_seconds = max_age_hours * 3600
        current_time = datetime.now()
        for file_path in temp_dir.glob("*.npz"):
            meta_path = file_path.with_suffix('.meta.json')
            created = None
            if meta_path.exists():
                try:
                    with open(meta_path) as f:
                        created = datetime.fromisoformat(json.load(f)['created'])
                except (OSError, ValueError, KeyError):
                    created = None  # corrupt sidecar: fall through to mtime
            if created is None:
                created = datetime.fromtimestamp(file_path.stat().st_mtime)
            if (current_time - created).total_seconds() > max_age_seconds:
                file_path.unlink()
                meta_path.unlink(missing_ok=True)

    def get_storage_stats(self) -> Dict[str, Any]:
        """Return block counts and on-disk byte sizes per storage area."""
        stats = {
            'vram': {
                'active_blocks': len(list(self.vram_path.glob("active/*.npz"))),
                'archived_blocks': len(list(self.vram_path.glob("archived/*.npz"))),
                'temp_blocks': len(list(self.vram_path.glob("temp/*.npz"))),
                'total_size_bytes': self._get_dir_size(self.vram_path)
            },
            'state': {
                'components': {},
                'total_size_bytes': self._get_dir_size(self.state_path)
            },
            'cache': {
                'size_bytes': self._get_dir_size(self.cache_path)
            }
        }
        # Per-component breakdown (counts versioned files, not "latest" links)
        for component, component_path in self.component_paths.items():
            stats['state']['components'][component] = {
                'states': len(list(component_path.glob("**/state_*.json"))),
                'size_bytes': self._get_dir_size(component_path)
            }
        return stats

    def _get_dir_size(self, path: Path) -> int:
        """Total size in bytes of all regular files under ``path`` (recursive)."""
        return sum(f.stat().st_size for f in path.glob('**/*') if f.is_file())

    def create_snapshot(self, snapshot_id: str):
        """Snapshot current state and active VRAM under ``snapshots/<id>``.

        Raises:
            FileExistsError: If ``snapshot_id`` was already used
                (``mkdir`` without ``exist_ok`` fails loudly on purpose).
        """
        snapshot_dir = self.base_path / "snapshots" / snapshot_id
        snapshot_dir.mkdir(parents=True)
        # Copy current state; copytree follows the relative "latest" symlinks
        shutil.copytree(self.state_path, snapshot_dir / "gpu_state")
        shutil.copytree(self.vram_path / "active", snapshot_dir / "vram_blocks")
        metadata = {
            'created': datetime.now().isoformat(),
            'stats': self.get_storage_stats()
        }
        with open(snapshot_dir / "snapshot.json", 'w') as f:
            json.dump(metadata, f)

    def restore_snapshot(self, snapshot_id: str):
        """Replace current state and active VRAM with a snapshot's contents.

        Raises:
            ValueError: If the snapshot or its payload directories are
                missing.  Validated *before* the current state is deleted, so
                a bad id can no longer wipe live data (the original deleted
                first and validated only the top-level directory).
        """
        snapshot_dir = self.base_path / "snapshots" / snapshot_id
        if not snapshot_dir.exists():
            raise ValueError(f"Snapshot {snapshot_id} not found")
        state_src = snapshot_dir / "gpu_state"
        vram_src = snapshot_dir / "vram_blocks"
        if not state_src.exists() or not vram_src.exists():
            raise ValueError(f"Snapshot {snapshot_id} not found")
        # Clear current state only after the snapshot is known to be complete
        shutil.rmtree(self.state_path)
        shutil.rmtree(self.vram_path / "active")
        shutil.copytree(state_src, self.state_path)
        shutil.copytree(vram_src, self.vram_path / "active")