from pathlib import Path
import json
import shutil
import os
from typing import Dict, Any, Optional, Tuple, Union
import numpy as np
from datetime import datetime


class StorageManager:
    """Filesystem-backed store for VRAM blocks, component state and snapshots.

    Layout created under ``base_path``::

        vram_blocks/{active,archived,temp}/<block_id>.npz  (+ <block_id>.meta.json)
        gpu_state/<component>/<state_id>/state_<YYYYmmdd_HHMMSS>.json  (+ latest.json)
        cache/
        snapshots/<snapshot_id>/{gpu_state/, vram_blocks/, snapshot.json}
    """

    # Component state categories; each gets its own directory under gpu_state/.
    _COMPONENTS = ('cores', 'tensor_cores', 'warps', 'threads', 'memory')

    def __init__(self, base_path: Union[str, Path]):
        """Create (if needed) the storage tree rooted at *base_path*.

        Args:
            base_path: Root directory. A ``str`` is accepted and converted to
                ``Path`` so the ``/`` joins below work.
        """
        self.base_path = Path(base_path)

        # Storage paths
        self.vram_path = self.base_path / "vram_blocks"
        self.state_path = self.base_path / "gpu_state"
        self.cache_path = self.base_path / "cache"

        # Create directory structure
        self._init_storage()

        # Component name -> directory holding that component's states
        self.component_paths = {
            component: self.state_path / component for component in self._COMPONENTS
        }

    def _init_storage(self):
        """Create the directory tree and set ``self.storage_config``.

        Safe to call repeatedly: every ``mkdir`` uses ``exist_ok=True``.
        """
        for root in (self.vram_path, self.state_path, self.cache_path):
            root.mkdir(parents=True, exist_ok=True)

        # One directory per component state category
        for component in self._COMPONENTS:
            (self.state_path / component).mkdir(exist_ok=True)

        # VRAM block lifecycle directories
        for subdir in ('active', 'archived', 'temp'):
            (self.vram_path / subdir).mkdir(exist_ok=True)

        # Informational flags; no method of this class reads them.
        # NOTE(review): "auto_compress": False disagrees with store_vram_block,
        # which always writes with np.savez_compressed.
        self.storage_config = {
            "unlimited_storage": True,
            "dynamic_scaling": True,
            "auto_compress": False,
        }

    def _block_paths(self, subdir: str, block_id: str) -> Tuple[Path, Path]:
        """Return the ``(data .npz, metadata .json)`` paths of *block_id* in *subdir*."""
        directory = self.vram_path / subdir
        return directory / f"{block_id}.npz", directory / f"{block_id}.meta.json"

    @staticmethod
    def _write_metadata(metadata_path: Path, data: np.ndarray) -> None:
        """Write the JSON sidecar describing *data* (creation time, shape, dtype, size)."""
        metadata = {
            'created': datetime.now().isoformat(),
            'shape': list(data.shape),
            'dtype': str(data.dtype),
            'size_bytes': data.nbytes,
        }
        with open(metadata_path, 'w') as f:
            json.dump(metadata, f)

    @staticmethod
    def _read_block(file_path: Path) -> np.ndarray:
        """Load the ``data`` array from an .npz file and close the archive handle."""
        # NpzFile keeps the zip open until closed; the context manager closes it.
        with np.load(file_path) as archive:
            return archive['data']

    def store_vram_block(self, block_id: str, data: np.ndarray, temp: bool = False) -> Path:
        """Store a VRAM block together with a JSON metadata sidecar.

        Args:
            block_id: Block identifier, used as the file stem.
            data: Array to store (anything ``np.asarray`` accepts).
            temp: Store under ``temp/`` instead of ``active/``.

        Returns:
            Path of the written ``.npz`` file.
        """
        data = np.asarray(data)
        file_path, metadata_path = self._block_paths("temp" if temp else "active", block_id)
        np.savez_compressed(file_path, data=data)
        self._write_metadata(metadata_path, data)
        return file_path

    def load_vram_block(self, block_id: str) -> Optional[np.ndarray]:
        """Load a VRAM block from ``active/``, falling back to ``archived/``.

        A block found only in ``archived/`` is moved (data and metadata) back
        to ``active/``, so it lives in exactly one place afterwards and keeps
        its original metadata. ``temp/`` is not searched.

        Returns:
            The stored array, or ``None`` if the block is in neither location.
        """
        active_path, active_meta = self._block_paths("active", block_id)
        if active_path.exists():
            return self._read_block(active_path)

        archived_path, archived_meta = self._block_paths("archived", block_id)
        if not archived_path.exists():
            return None

        data = self._read_block(archived_path)
        shutil.move(str(archived_path), str(active_path))
        if archived_meta.exists():
            shutil.move(str(archived_meta), str(active_meta))
        else:
            # Archived block had no sidecar; create one so active/ stays uniform.
            self._write_metadata(active_meta, data)
        return data

    def store_component_state(self, component: str, state_id: str, state: Dict) -> Path:
        """Write a timestamped version of a component state and repoint ``latest.json``.

        Args:
            component: One of the keys of ``self.component_paths``
                (``KeyError`` otherwise).
            state_id: Sub-directory name for this state's versions.
            state: JSON-serialisable state.

        Returns:
            Path of the versioned ``state_<timestamp>.json`` file. Timestamps
            have one-second resolution, so two stores within the same second
            share (overwrite) one version file.
        """
        state_dir = self.component_paths[component] / state_id
        state_dir.mkdir(exist_ok=True)

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        state_file = state_dir / f"state_{timestamp}.json"
        with open(state_file, 'w') as f:
            json.dump(state, f)

        latest_link = state_dir / "latest.json"
        # exists() is False for a dangling link, so also check is_symlink().
        if latest_link.is_symlink() or latest_link.exists():
            latest_link.unlink()
        try:
            # Bare filename target: resolves relative to the link's own
            # directory, independent of the cwd or of how base_path was given.
            latest_link.symlink_to(state_file.name)
        except OSError:
            # Symlinks unavailable (e.g. unprivileged Windows): keep a copy instead.
            shutil.copyfile(state_file, latest_link)

        return state_file

    def load_component_state(self, component: str, state_id: str,
                             version: str = "latest") -> Optional[Dict]:
        """Load a component state.

        Args:
            component: One of the keys of ``self.component_paths``.
            state_id: State sub-directory name.
            version: ``"latest"`` or a ``YYYYmmdd_HHMMSS`` timestamp string.

        Returns:
            The decoded JSON state, or ``None`` if that version does not exist.
        """
        state_dir = self.component_paths[component] / state_id
        if version == "latest":
            state_file = state_dir / "latest.json"
        else:
            state_file = state_dir / f"state_{version}.json"

        if state_file.exists():
            with open(state_file) as f:
                return json.load(f)
        return None

    def archive_vram_block(self, block_id: str):
        """Move an active VRAM block (and its metadata, if present) to ``archived/``.

        Does nothing if the block is not in ``active/``.
        """
        source_path, source_meta = self._block_paths("active", block_id)
        target_path, target_meta = self._block_paths("archived", block_id)
        if not source_path.exists():
            return
        shutil.move(str(source_path), str(target_path))
        if source_meta.exists():
            shutil.move(str(source_meta), str(target_meta))

    @staticmethod
    def _block_created_time(block_path: Path, meta_path: Path) -> datetime:
        """Return a block's creation time from its metadata, else its file mtime."""
        if meta_path.exists():
            try:
                with open(meta_path) as f:
                    return datetime.fromisoformat(json.load(f)['created'])
            except (OSError, ValueError, KeyError, TypeError):
                pass  # unreadable/corrupt metadata: fall back to the file mtime
        return datetime.fromtimestamp(block_path.stat().st_mtime)

    def cleanup_temp_blocks(self, max_age_hours: int = 24):
        """Delete temporary VRAM blocks older than *max_age_hours*.

        Age comes from the block's metadata ``created`` field. When the sidecar
        is missing or unreadable, the ``.npz`` modification time is used, so
        orphaned blocks are still reclaimed.
        """
        temp_dir = self.vram_path / "temp"
        current_time = datetime.now()
        max_age_seconds = max_age_hours * 3600

        # Materialise the listing first because files are deleted inside the loop.
        for file_path in sorted(temp_dir.glob("*.npz")):
            meta_path = file_path.with_name(f"{file_path.stem}.meta.json")
            created = self._block_created_time(file_path, meta_path)
            if (current_time - created).total_seconds() > max_age_seconds:
                file_path.unlink()
                if meta_path.exists():
                    meta_path.unlink()

    def get_storage_stats(self) -> Dict[str, Any]:
        """Return block counts and on-disk sizes for VRAM, state and cache."""
        stats = {
            'vram': {
                'active_blocks': len(list(self.vram_path.glob("active/*.npz"))),
                'archived_blocks': len(list(self.vram_path.glob("archived/*.npz"))),
                'temp_blocks': len(list(self.vram_path.glob("temp/*.npz"))),
                'total_size_bytes': self._get_dir_size(self.vram_path),
            },
            'state': {
                'components': {},
                'total_size_bytes': self._get_dir_size(self.state_path),
            },
            'cache': {
                'size_bytes': self._get_dir_size(self.cache_path),
            },
        }

        for component, component_path in self.component_paths.items():
            stats['state']['components'][component] = {
                'states': len(list(component_path.glob("**/state_*.json"))),
                'size_bytes': self._get_dir_size(component_path),
            }

        return stats

    def _get_dir_size(self, path: Path) -> int:
        """Total size of regular files under *path*.

        Symlinks are skipped so that ``latest.json`` links are not counted a
        second time on top of the state file they point to.
        """
        return sum(
            f.stat().st_size
            for f in path.glob('**/*')
            if f.is_file() and not f.is_symlink()
        )

    def create_snapshot(self, snapshot_id: str):
        """Copy the component state tree and active VRAM blocks into ``snapshots/<snapshot_id>``.

        Raises:
            FileExistsError: if a snapshot with this id already exists.
        """
        snapshot_dir = self.base_path / "snapshots" / snapshot_id
        snapshot_dir.mkdir(parents=True)

        # Symlinks (latest.json) are followed and copied as regular files;
        # dangling ones are skipped instead of aborting the copy.
        shutil.copytree(self.state_path, snapshot_dir / "gpu_state",
                        ignore_dangling_symlinks=True)
        shutil.copytree(self.vram_path / "active", snapshot_dir / "vram_blocks")

        metadata = {
            'created': datetime.now().isoformat(),
            'stats': self.get_storage_stats(),
        }
        with open(snapshot_dir / "snapshot.json", 'w') as f:
            json.dump(metadata, f)

    def restore_snapshot(self, snapshot_id: str):
        """Replace the state tree and ``active/`` VRAM blocks with a snapshot's copy.

        The snapshot is validated before any live data is removed.

        Raises:
            ValueError: if the snapshot does not exist or is missing its
                ``gpu_state`` / ``vram_blocks`` directories.
        """
        snapshot_dir = self.base_path / "snapshots" / snapshot_id
        if not snapshot_dir.exists():
            raise ValueError(f"Snapshot {snapshot_id} not found")

        snapshot_state = snapshot_dir / "gpu_state"
        snapshot_vram = snapshot_dir / "vram_blocks"
        for required in (snapshot_state, snapshot_vram):
            if not required.is_dir():
                raise ValueError(
                    f"Snapshot {snapshot_id} is incomplete: missing {required.name}"
                )

        active_dir = self.vram_path / "active"
        for live_dir in (self.state_path, active_dir):
            if live_dir.exists():
                shutil.rmtree(live_dir)

        shutil.copytree(snapshot_state, self.state_path)
        shutil.copytree(snapshot_vram, active_dir)