import hashlib
import json
import logging
import os
import shutil
import threading
import time
import uuid
from pathlib import Path
from typing import Any, Dict, Optional, Union

import numpy as np


class LocalStorage:
    """
    Local storage implementation for GPU memory management.
    Maintains backward compatibility with previous storage interfaces.

    The class is a process-wide singleton: the first construction fixes the
    storage directory and later constructions return that same instance,
    ignoring their ``storage_path`` argument.

    On-disk layout under ``base_path``:
        vram_blocks/  ``<tensor_id>.npy`` plus ``<tensor_id>_meta.json``
        models/       one directory per model (``config.json``, ``model.bin``)
        cache/        one JSON file per cached key
        states/       ``<component>/<state_id>.json``
    """

    # Singleton instance and the lock guarding its creation
    _instance = None
    _lock = threading.Lock()

    def __new__(cls, storage_path: str = "storage"):
        """Return the shared instance, creating it on first use."""
        with cls._lock:
            if cls._instance is None:
                instance = super().__new__(cls)
                # Convert to absolute path if relative
                if not os.path.isabs(storage_path):
                    storage_path = os.path.abspath(storage_path)
                instance._init_singleton(storage_path)
                # Publish only after initialisation succeeded, so a failed
                # setup (e.g. mkdir error) does not leave a half-built
                # singleton behind for later callers.
                cls._instance = instance
            return cls._instance

    def __init__(self, storage_path: str = "storage"):
        """This will actually just return the singleton instance.
        The actual initialization happens in __new__ and _init_singleton"""
        pass

    def _init_singleton(self, storage_path: str):
        """Initialize the singleton instance with local storage"""
        if hasattr(self, 'initialized'):
            return

        # Setup storage paths
        self.base_path = Path(storage_path)
        self.vram_path = self.base_path / "vram_blocks"
        self.models_path = self.base_path / "models"
        self.cache_path = self.base_path / "cache"
        self.state_path = self.base_path / "states"

        # Create directories
        for path in self._storage_dirs():
            path.mkdir(parents=True, exist_ok=True)

        # Basic state management
        self.lock = threading.Lock()
        self._closing = False
        self._connected = True

        # Resource monitoring
        self.resource_monitor = self._new_resource_monitor()

        # Storage statistics
        self.stats = {
            'total_size': 0,
            'available_size': float('inf'),
            'model_count': 0,
            'tensor_count': 0
        }

        # Snapshot refreshed by _check_storage()
        self.storage_monitor = {
            'total_size': 0,
            'last_access': time.time(),
            'disk_usage': 0
        }

        # Initialize registries
        self.model_registry = {}
        self.tensor_registry = {}
        # Bytes already counted in resource_monitor per tensor id, so that
        # re-storing an id adjusts the totals instead of double counting.
        self._tensor_sizes = {}

        self.initialized = True

    @staticmethod
    def _new_resource_monitor() -> Dict[str, Any]:
        """Build an empty resource-monitor record."""
        return {
            'vram_used': 0,
            'active_tensors': 0,
            'loaded_models': set(),
            'last_updated': time.time()
        }

    def _storage_dirs(self):
        """Return the directories that must exist for storage to work."""
        return [self.vram_path, self.models_path, self.cache_path, self.state_path]

    def _tree_size(self) -> int:
        """Return the total size in bytes of all files under base_path."""
        return sum(f.stat().st_size for f in self.base_path.rglob('*') if f.is_file())

    def _cache_file(self, key: str) -> Path:
        """Map a cache key to its file; hashing keeps '/' and '..' out of the name."""
        digest = hashlib.sha256(str(key).encode('utf-8')).hexdigest()
        return self.cache_path / f"{digest}.json"

    def is_connected(self) -> bool:
        """Check if storage is usable: not closed and its directories are accessible"""
        return self._connected and not self._closing and self.ping()

    def close(self):
        """Close storage connection.

        Marks the instance as closed and clears the in-memory resource
        monitor and model registry. Files on disk are left untouched.
        Nothing in this class re-opens a closed instance, so the singleton
        stays closed afterwards.
        """
        self._closing = True
        self._connected = False
        self.resource_monitor = self._new_resource_monitor()
        self.model_registry = {}

    def is_model_loaded(self, model_id: str) -> bool:
        """Check if a model is loaded in local storage"""
        if not model_id:
            return False

        # Check if model directory exists
        model_dir = self.models_path / model_id.replace('/', '_')
        if not model_dir.exists():
            return False

        # A model counts as loaded only when both files are present
        model_file = model_dir / "model.bin"
        config_file = model_dir / "config.json"
        return model_file.exists() and config_file.exists()

    def wait_for_connection(self, timeout: float = 30.0) -> bool:
        """
        Simulates connection wait for compatibility with previous interfaces.
        Does not wait: ``timeout`` is accepted but unused, and the result is
        simply that of ping().
        """
        return self.ping()

    def _check_storage_ready(self) -> bool:
        """Check if local storage is ready for use"""
        try:
            # Verify all required directories exist and are accessible
            for path in self._storage_dirs():
                if not path.exists() or not os.access(str(path), os.R_OK | os.W_OK):
                    return False

            # Update storage statistics
            self.stats.update({
                'total_size': self._tree_size(),
                'model_count': sum(1 for _ in self.models_path.glob('*')),
                'tensor_count': sum(1 for _ in self.vram_path.glob('*.npy'))
            })
            return True
        except Exception as e:
            logging.error(f"Storage check failed: {e}")
            return False

    def _check_storage(self) -> Dict[str, Any]:
        """Check local storage status and usage"""
        try:
            total_size = self._tree_size()
            # os.path.getsize() on a directory does not measure its contents,
            # so the summed file size is reported as the usage figure.
            self.storage_monitor.update({
                'total_size': total_size,
                'last_access': time.time(),
                'disk_usage': total_size
            })
            return {"status": "ok", "monitor": self.storage_monitor}
        except Exception as e:
            logging.error(f"Error checking storage: {e}")
            return {"status": "error", "message": str(e)}

    def store_tensor(self, tensor_id: str, data: np.ndarray, model_size: Optional[int] = None) -> bool:
        """Store tensor data in local storage.

        Writes ``<tensor_id>.npy`` and ``<tensor_id>_meta.json`` into the
        vram directory and updates the in-memory registry and resource
        monitor. Returns True on success; any failure is logged and
        reported as False.
        """
        try:
            if data is None:
                raise ValueError("Cannot store None tensor")

            # Calculate tensor metadata
            tensor_size = data.nbytes
            metadata = {
                'shape': data.shape,
                'dtype': str(data.dtype),
                'size': tensor_size,
                'timestamp': time.time(),
                'model_size': model_size if model_size is not None else -1
            }

            # Save tensor data
            tensor_path = self.vram_path / f"{tensor_id}.npy"
            np.save(str(tensor_path), data)

            # Save metadata
            metadata_path = self.vram_path / f"{tensor_id}_meta.json"
            with open(metadata_path, 'w') as f:
                json.dump(metadata, f)

            # Update tensor registry
            with self.lock:
                self.tensor_registry[tensor_id] = metadata
                previous_size = self._tensor_sizes.get(tensor_id)
                if previous_size is None:
                    # First time this id is counted
                    self.resource_monitor['active_tensors'] += 1
                    previous_size = 0
                # Overwriting an id replaces its contribution
                self.resource_monitor['vram_used'] += tensor_size - previous_size
                self._tensor_sizes[tensor_id] = tensor_size

            return True
        except Exception as e:
            logging.error(f"Error storing tensor {tensor_id}: {str(e)}")
            return False

    def load_tensor(self, tensor_id: str) -> Optional[np.ndarray]:
        """Load tensor data from local storage.

        Returns the array, or None when either the data or metadata file is
        missing or loading fails.
        """
        try:
            tensor_path = self.vram_path / f"{tensor_id}.npy"
            metadata_path = self.vram_path / f"{tensor_id}_meta.json"

            # Check if tensor files exist
            if not tensor_path.exists() or not metadata_path.exists():
                logging.warning(f"Tensor {tensor_id} not found in local storage")
                return None

            # Load metadata
            with open(metadata_path, 'r') as f:
                metadata = json.load(f)

            # Load tensor data
            arr = np.load(str(tensor_path))

            # Register if not present; check and insert under the same lock
            with self.lock:
                self.tensor_registry.setdefault(tensor_id, metadata)

            return arr
        except Exception as e:
            logging.error(f"Error loading tensor {tensor_id}: {str(e)}")
            return None

    def store_state(self, component: str, state_id: str, state_data: Dict[str, Any]) -> bool:
        """Store component state in local storage"""
        try:
            # Create component directory if needed
            component_dir = self.state_path / component
            component_dir.mkdir(parents=True, exist_ok=True)

            # Save state data with timestamp
            state_file = component_dir / f"{state_id}.json"
            data_to_save = {
                "data": state_data,
                "timestamp": time.time()
            }
            with open(state_file, 'w') as f:
                json.dump(data_to_save, f, indent=2)

            return True
        except Exception as e:
            logging.error(f"Error storing state for {component}/{state_id}: {str(e)}")
            return False

    def load_state(self, component: str, state_id: str) -> Optional[Dict[str, Any]]:
        """Load component state from local storage"""
        try:
            state_file = self.state_path / component / f"{state_id}.json"
            if not state_file.exists():
                logging.warning(f"State file not found for {component}/{state_id}")
                return None

            with open(state_file, 'r') as f:
                saved_data = json.load(f)

            return saved_data.get('data')
        except Exception as e:
            logging.error(f"Error loading state for {component}/{state_id}: {str(e)}")
            return None

    def cache_data(self, key: str, data: Any) -> bool:
        """Cache JSON-serializable data under ``key`` in the local cache directory.

        Returns True on success; failures (including data that cannot be
        serialized to JSON) are logged and reported as False.
        """
        try:
            # Serialize first so a failure cannot leave a truncated file
            payload = json.dumps({"key": key, "data": data, "timestamp": time.time()})
            with open(self._cache_file(key), 'w') as f:
                f.write(payload)
            return True
        except Exception as e:
            logging.error(f"Error caching data for key {key}: {str(e)}")
            return False

    def get_cached_data(self, key: str) -> Optional[Any]:
        """Return the data cached under ``key``, or None if absent or unreadable"""
        try:
            cache_file = self._cache_file(key)
            if not cache_file.exists():
                return None
            with open(cache_file, 'r') as f:
                return json.load(f).get('data')
        except Exception as e:
            logging.error(f"Error getting cached data for key {key}: {str(e)}")
            return None

    def load_model(self, model_name: str, model_path: Optional[str] = None, model_data: Optional[Dict] = None) -> bool:
        """Load a model into local storage.

        Writes ``model_data`` as ``config.json`` and copies ``model_path`` to
        ``model.bin`` inside the model's directory, then records the model in
        the registry. Returns True on success (or if the model was already
        present), False if any step raised.
        """
        try:
            # Check if model is already loaded
            if self.is_model_loaded(model_name):
                logging.info(f"Model {model_name} already loaded")
                return True

            # Generate model directory path
            model_dir = self.models_path / model_name.replace('/', '_')
            model_dir.mkdir(parents=True, exist_ok=True)

            # Clean up any existing files (best effort)
            for existing_file in model_dir.glob('*'):
                try:
                    if existing_file.is_file():
                        existing_file.unlink()
                except OSError as e:
                    logging.warning(f"Could not remove existing file {existing_file}: {e}")

            # Save model data if provided
            if model_data:
                with open(model_dir / "config.json", 'w') as f:
                    json.dump(model_data, f, indent=2)

            # Copy and hash the model file before touching the registry, so
            # a failed copy does not leave a registered-but-missing model and
            # no file I/O happens while the lock is held.
            model_hash = None
            if model_path:
                if os.path.exists(model_path):
                    shutil.copy2(model_path, model_dir / "model.bin")
                    model_hash = self._calculate_model_hash(model_path)
                else:
                    logging.warning(f"Model file {model_path} not found; no weights copied for {model_name}")
                    model_hash = ""

            # Update model registry
            with self.lock:
                self.model_registry[model_name] = {
                    'path': str(model_dir),
                    'config': model_data,
                    'loaded_at': time.time(),
                    'hash': model_hash
                }
                self.resource_monitor['loaded_models'].add(model_name)

            logging.info(f"Successfully loaded model {model_name} to local storage")
            return True
        except Exception as e:
            logging.error(f"Error loading model {model_name}: {str(e)}")
            return False

    def _calculate_model_hash(self, model_path: str) -> str:
        """Calculate SHA256 hash of model file; returns "" if the file cannot be read"""
        try:
            sha256_hash = hashlib.sha256()
            with open(model_path, "rb") as f:
                # Read in fixed-size chunks so large files are not held in memory
                for byte_block in iter(lambda: f.read(4096), b""):
                    sha256_hash.update(byte_block)
            return sha256_hash.hexdigest()
        except Exception as e:
            logging.error(f"Error calculating model hash: {str(e)}")
            return ""

    def ping(self) -> bool:
        """Check if local storage is accessible"""
        if self._closing:
            return False
        return self._check_storage_ready()


# Compatibility aliases for existing code
HTTPGPUStorage = LocalStorage
WebSocketGPUStorage = LocalStorage