# WBS1 / storage_manager.py
# Factor Studios
# Upload 9 files
# ce4253e verified
from pathlib import Path
import json
import shutil
import os
from typing import Dict, Any, Optional
import numpy as np
from datetime import datetime
class StorageManager:
    """Manage on-disk storage for VRAM blocks, component state and snapshots.

    Directory layout created under ``base_path``::

        vram_blocks/{active,archived,temp}/<block_id>.npz  (+ <block_id>.meta.json)
        gpu_state/<component>/<state_id>/state_<timestamp>.json  (+ latest.json)
        cache/
        snapshots/<snapshot_id>/        (created on demand by create_snapshot)
    """

    # Component kinds that each get a directory under ``gpu_state``.
    COMPONENTS = ('cores', 'tensor_cores', 'warps', 'threads', 'memory')
    # Lifecycle buckets for VRAM blocks under ``vram_blocks``.
    VRAM_SUBDIRS = ('active', 'archived', 'temp')

    def __init__(self, base_path: Path):
        """Set up path attributes and create the directory structure.

        Args:
            base_path: Root directory of the store. A plain string is also
                accepted and converted to ``Path``.
        """
        base_path = Path(base_path)
        self.base_path = base_path

        # Storage paths
        self.vram_path = base_path / "vram_blocks"
        self.state_path = base_path / "gpu_state"
        self.cache_path = base_path / "cache"

        # Component paths
        self.component_paths = {
            name: self.state_path / name for name in self.COMPONENTS
        }

        # Create directory structure
        self._init_storage()

    def _init_storage(self):
        """Initialize storage directory structure (safe to call repeatedly)."""
        # Create main directories
        for main_dir in (self.vram_path, self.state_path, self.cache_path):
            main_dir.mkdir(parents=True, exist_ok=True)

        # Create subdirectories for different types of state
        for component in self.COMPONENTS:
            (self.state_path / component).mkdir(parents=True, exist_ok=True)

        # Create VRAM block subdirectories
        for subdir in self.VRAM_SUBDIRS:
            (self.vram_path / subdir).mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _write_block_metadata(metadata_path: Path, data: np.ndarray) -> None:
        """Write the JSON metadata sidecar describing ``data``."""
        metadata = {
            'created': datetime.now().isoformat(),
            'shape': list(data.shape),
            'dtype': str(data.dtype),
            'size_bytes': int(data.nbytes),
        }
        with open(metadata_path, 'w', encoding='utf-8') as f:
            json.dump(metadata, f)

    @staticmethod
    def _read_block(file_path: Path) -> np.ndarray:
        """Read the ``data`` array from an ``.npz`` file and close the file."""
        # np.load returns an NpzFile holding an open handle; the context
        # manager closes it once the array has been materialized.
        with np.load(file_path) as archive:
            return archive['data']

    def _move_block_files(self, block_id: str, source: str, target: str) -> None:
        """Move a block's ``.npz`` and ``.meta.json`` between VRAM subdirs."""
        source_dir = self.vram_path / source
        target_dir = self.vram_path / target
        for suffix in ('.npz', '.meta.json'):
            source_file = source_dir / f"{block_id}{suffix}"
            if source_file.exists():
                shutil.move(str(source_file), str(target_dir / f"{block_id}{suffix}"))

    def store_vram_block(self, block_id: str, data: np.ndarray, temp: bool = False) -> Path:
        """Store a VRAM block with metadata.

        Args:
            block_id: Identifier used as the file stem.
            data: Array to store (array-likes are converted with ``np.asarray``).
            temp: Store under ``temp`` instead of ``active``.

        Returns:
            Path of the written ``.npz`` file.
        """
        data = np.asarray(data)

        # Determine storage location
        target_dir = self.vram_path / ("temp" if temp else "active")
        file_path = target_dir / f"{block_id}.npz"
        metadata_path = target_dir / f"{block_id}.meta.json"

        # Save data and metadata
        np.savez_compressed(file_path, data=data)
        self._write_block_metadata(metadata_path, data)
        return file_path

    def load_vram_block(self, block_id: str) -> Optional[np.ndarray]:
        """Load a VRAM block with fallback to archived.

        A block found only in ``archived`` is moved back to ``active``
        (data and metadata) so it no longer exists in both places.

        Returns:
            The stored array, or ``None`` if the block is in neither location.
        """
        # Check active blocks first
        active_path = self.vram_path / "active" / f"{block_id}.npz"
        if active_path.exists():
            return self._read_block(active_path)

        # Check archived blocks
        archived_path = self.vram_path / "archived" / f"{block_id}.npz"
        if archived_path.exists():
            data = self._read_block(archived_path)
            # Move back to active
            self._move_block_files(block_id, "archived", "active")
            active_meta = self.vram_path / "active" / f"{block_id}.meta.json"
            if not active_meta.exists():
                # The archived block had no sidecar; recreate one.
                self._write_block_metadata(active_meta, data)
            return data

        return None

    def store_component_state(self, component: str, state_id: str, state: Dict) -> Path:
        """Store component state with versioning.

        Each call writes a new ``state_<timestamp>.json`` and repoints
        ``latest.json`` at it. Earlier versions are never overwritten: if the
        timestamped name is already taken, a numeric suffix is appended.

        Args:
            component: One of the keys of ``self.component_paths``.
            state_id: Sub-directory name for this state series.
            state: JSON-serializable state.

        Returns:
            Path of the versioned state file that was written.

        Raises:
            KeyError: If ``component`` is not a known component.
        """
        component_path = self.component_paths[component]
        state_dir = component_path / state_id
        state_dir.mkdir(parents=True, exist_ok=True)

        # Create versioned state file
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        state_file = state_dir / f"state_{timestamp}.json"
        collision = 0
        while state_file.exists():
            # Same-second save: keep the earlier version, pick a fresh name.
            collision += 1
            state_file = state_dir / f"state_{timestamp}_{collision}.json"

        with open(state_file, 'w', encoding='utf-8') as f:
            json.dump(state, f)

        # Update latest symlink. exists() is False for a dangling link, so
        # is_symlink() must be checked too or symlink_to() would fail.
        latest_link = state_dir / "latest.json"
        if latest_link.is_symlink() or latest_link.exists():
            latest_link.unlink()
        try:
            # Link by bare file name: the target is resolved relative to the
            # link's own directory, so this stays valid for a relative
            # base_path and after the tree is moved.
            latest_link.symlink_to(state_file.name)
        except (OSError, NotImplementedError):
            # Symlinks unavailable (e.g. unprivileged Windows): keep a copy.
            shutil.copyfile(str(state_file), str(latest_link))

        return state_file

    def load_component_state(self, component: str, state_id: str, version: str = "latest") -> Optional[Dict]:
        """Load component state with version support.

        Args:
            component: One of the keys of ``self.component_paths``.
            state_id: Sub-directory name of the state series.
            version: ``"latest"`` or the ``<timestamp>`` part of a
                ``state_<timestamp>.json`` file name.

        Returns:
            The decoded state, or ``None`` if no such file exists.

        Raises:
            KeyError: If ``component`` is not a known component.
        """
        state_dir = self.component_paths[component] / state_id

        if version == "latest":
            state_file = state_dir / "latest.json"
        else:
            state_file = state_dir / f"state_{version}.json"

        try:
            with open(state_file, encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError:
            return None

    def archive_vram_block(self, block_id: str):
        """Move a VRAM block (and its metadata) from active to archive."""
        self._move_block_files(block_id, "active", "archived")

    @staticmethod
    def _block_created_at(file_path: Path, meta_path: Path) -> Optional[datetime]:
        """Return a block's creation time, or ``None`` if it cannot be found.

        Uses the ``created`` field of the metadata sidecar and falls back to
        the data file's modification time when the sidecar is missing or
        unreadable.
        """
        try:
            with open(meta_path, encoding='utf-8') as f:
                return datetime.fromisoformat(json.load(f)['created'])
        except (OSError, ValueError, KeyError, TypeError):
            pass
        try:
            return datetime.fromtimestamp(file_path.stat().st_mtime)
        except OSError:
            return None

    def cleanup_temp_blocks(self, max_age_hours: int = 24):
        """Clean up old temporary VRAM blocks.

        Args:
            max_age_hours: Blocks older than this many hours are deleted.
        """
        temp_dir = self.vram_path / "temp"
        current_time = datetime.now()
        max_age_seconds = max_age_hours * 3600

        # Materialize the listing first: files are deleted while looping.
        for file_path in list(temp_dir.glob("*.npz")):
            meta_path = file_path.with_suffix('.meta.json')
            created = self._block_created_at(file_path, meta_path)
            if created is None:
                continue

            # Remove if too old
            if (current_time - created).total_seconds() > max_age_seconds:
                file_path.unlink()
                if meta_path.exists():
                    meta_path.unlink()

    def get_storage_stats(self) -> Dict[str, Any]:
        """Get storage statistics.

        Returns:
            Nested dict with block counts and byte sizes for ``vram``,
            per-component state counts/sizes under ``state``, and the size
            of ``cache``.
        """
        stats = {
            'vram': {
                'active_blocks': sum(1 for _ in self.vram_path.glob("active/*.npz")),
                'archived_blocks': sum(1 for _ in self.vram_path.glob("archived/*.npz")),
                'temp_blocks': sum(1 for _ in self.vram_path.glob("temp/*.npz")),
                'total_size_bytes': self._get_dir_size(self.vram_path)
            },
            'state': {
                'components': {},
                'total_size_bytes': self._get_dir_size(self.state_path)
            },
            'cache': {
                'size_bytes': self._get_dir_size(self.cache_path)
            }
        }

        # Add component-specific stats
        for component, component_path in self.component_paths.items():
            stats['state']['components'][component] = {
                'states': sum(1 for _ in component_path.glob("**/state_*.json")),
                'size_bytes': self._get_dir_size(component_path)
            }

        return stats

    def _get_dir_size(self, path: Path) -> int:
        """Calculate total size of a directory in bytes.

        Symlinks are skipped so a ``latest.json`` link does not count its
        target file a second time.
        """
        return sum(
            f.stat().st_size
            for f in path.glob('**/*')
            if f.is_file() and not f.is_symlink()
        )

    def create_snapshot(self, snapshot_id: str):
        """Create a snapshot of component state and active VRAM blocks.

        Raises:
            FileExistsError: If a snapshot with this id already exists.
        """
        snapshot_dir = self.base_path / "snapshots" / snapshot_id
        snapshot_dir.mkdir(parents=True)

        try:
            # Copy current state. Symlinks are copied as file contents; a
            # dangling "latest" link must not abort the whole snapshot.
            shutil.copytree(
                self.state_path,
                snapshot_dir / "gpu_state",
                ignore_dangling_symlinks=True,
            )
            shutil.copytree(self.vram_path / "active", snapshot_dir / "vram_blocks")

            # Create snapshot metadata
            metadata = {
                'created': datetime.now().isoformat(),
                'stats': self.get_storage_stats()
            }
            with open(snapshot_dir / "snapshot.json", 'w', encoding='utf-8') as f:
                json.dump(metadata, f)
        except Exception:
            # Do not leave a half-written snapshot that restore could pick up.
            shutil.rmtree(snapshot_dir, ignore_errors=True)
            raise

    def restore_snapshot(self, snapshot_id: str):
        """Restore component state and active VRAM blocks from a snapshot.

        The snapshot is validated before any live data is removed.

        Raises:
            ValueError: If the snapshot does not exist or lacks its
                ``gpu_state`` / ``vram_blocks`` directories.
        """
        snapshot_dir = self.base_path / "snapshots" / snapshot_id
        if not snapshot_dir.exists():
            raise ValueError(f"Snapshot {snapshot_id} not found")

        state_source = snapshot_dir / "gpu_state"
        vram_source = snapshot_dir / "vram_blocks"
        missing = [p.name for p in (state_source, vram_source) if not p.is_dir()]
        if missing:
            raise ValueError(
                f"Snapshot {snapshot_id} is incomplete: missing {', '.join(missing)}"
            )

        # Clear current state
        active_dir = self.vram_path / "active"
        for stale_dir in (self.state_path, active_dir):
            if stale_dir.exists():
                shutil.rmtree(stale_dir)

        # Restore from snapshot
        shutil.copytree(state_source, self.state_path)
        shutil.copytree(vram_source, active_dir)

        # Recreate any standard directory the snapshot did not contain.
        self._init_storage()