import json
import hashlib
import logging
import os
import shutil
import threading
import time
from pathlib import Path
from typing import Any, Dict, Optional

import numpy as np

class LocalStorage:
    """
    Local storage implementation for GPU memory management.
    Maintains backward compatibility with previous storage interfaces.
    """

    # Singleton instance
    _instance = None
    _lock = threading.Lock()

    def __new__(cls, storage_path: str = "storage"):
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
                # Convert to absolute path if relative
                if not os.path.isabs(storage_path):
                    storage_path = os.path.abspath(storage_path)
                cls._instance._init_singleton(storage_path)
            return cls._instance
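
    # Note: LocalStorage is a process-wide singleton. Every call to
    # LocalStorage(...) returns the same instance, so the storage_path passed
    # to the first construction wins; later arguments are ignored. For example
    # (hypothetical paths):
    #   a = LocalStorage("storage")
    #   b = LocalStorage("elsewhere")  # same object as `a`; "elsewhere" is ignored
    #   assert a is b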

    def _init_singleton(self, storage_path: str):
        """Initialize the singleton instance with local storage"""
        if hasattr(self, 'initialized'):
            return

        # Setup storage paths
        self.base_path = Path(storage_path)
        self.vram_path = self.base_path / "vram_blocks"
        self.models_path = self.base_path / "models"
        self.cache_path = self.base_path / "cache"
        self.state_path = self.base_path / "states"

        # Create directories
        for path in [self.vram_path, self.models_path, self.cache_path, self.state_path]:
            path.mkdir(parents=True, exist_ok=True)

        # Basic state management
        self.lock = threading.Lock()
        self._closing = False
        self._connected = True

        # Resource monitoring
        self.resource_monitor = {
            'vram_used': 0,
            'active_tensors': 0,
            'loaded_models': set(),
            'last_updated': time.time()
        }

        # Storage statistics
        self.stats = {
            'total_size': 0,
            'available_size': float('inf'),
            'model_count': 0,
            'tensor_count': 0
        }

        # Initialize registries
        self.model_registry = {}
        self.tensor_registry = {}

        self.initialized = True

    def is_connected(self) -> bool:
        """Check if storage is connected (true while local storage is open and accessible)"""
        return self._connected and not self._closing and self.ping()

    def close(self):
        """Close the storage connection and mark the instance as unavailable"""
        self._closing = True
        self._connected = False

    def is_model_loaded(self, model_id: str) -> bool:
        """Check if a model is loaded in local storage"""
        if not model_id:
            return False
        # Check if model directory exists
        model_dir = self.models_path / model_id.replace('/', '_')
        if not model_dir.exists():
            return False
        # Check for model files
        model_file = model_dir / "model.bin"
        config_file = model_dir / "config.json"
        return model_file.exists() and config_file.exists()

    def wait_for_connection(self, timeout: float = 30.0) -> bool:
        """
        Simulates a connection wait for compatibility with previous interfaces.
        The timeout is accepted but ignored; local storage needs no connection,
        so this simply reports whether the storage directories are accessible.
        """
        return self.ping()

    def __init__(self, storage_path: str = "storage"):
        """This just returns the singleton instance; the actual
        initialization happens in __new__ and _init_singleton."""
        pass

    def _check_storage_ready(self) -> bool:
        """Check if local storage is ready for use"""
        try:
            # Verify all required directories exist and are accessible
            for path in [self.vram_path, self.models_path, self.cache_path, self.state_path]:
                if not path.exists() or not os.access(str(path), os.R_OK | os.W_OK):
                    return False
            # Update storage statistics
            self.stats.update({
                'total_size': sum(f.stat().st_size for f in self.base_path.rglob('*') if f.is_file()),
                'model_count': len(list(self.models_path.glob('*'))),
                'tensor_count': len(list(self.vram_path.glob('*.npy')))
            })
            return True
        except Exception as e:
            logging.error(f"Storage check failed: {e}")
            return False

    def _check_storage(self) -> Dict[str, Any]:
        """Check local storage status and usage"""
        try:
            # Refresh storage statistics and the monitoring timestamp
            with self.lock:
                self.stats['total_size'] = sum(
                    f.stat().st_size for f in self.base_path.rglob('*') if f.is_file()
                )
                self.resource_monitor['last_updated'] = time.time()
            return {"status": "ok", "stats": self.stats, "monitor": self.resource_monitor}
        except Exception as e:
            logging.error(f"Error checking storage: {e}")
            return {"status": "error", "message": str(e)}

    def store_tensor(self, tensor_id: str, data: np.ndarray, model_size: Optional[int] = None) -> bool:
        """Store tensor data in local storage"""
        try:
            if data is None:
                raise ValueError("Cannot store None tensor")
            # Calculate tensor metadata
            tensor_shape = list(data.shape)
            tensor_dtype = str(data.dtype)
            tensor_size = data.nbytes
            # Save tensor data
            tensor_path = self.vram_path / f"{tensor_id}.npy"
            np.save(str(tensor_path), data)
            # Save metadata
            metadata = {
                'shape': tensor_shape,
                'dtype': tensor_dtype,
                'size': tensor_size,
                'timestamp': time.time(),
                'model_size': model_size if model_size is not None else -1
            }
            metadata_path = self.vram_path / f"{tensor_id}_meta.json"
            with open(metadata_path, 'w') as f:
                json.dump(metadata, f)
            # Update tensor registry
            with self.lock:
                self.tensor_registry[tensor_id] = metadata
                self.resource_monitor['vram_used'] += tensor_size
                self.resource_monitor['active_tensors'] += 1
            return True
        except Exception as e:
            logging.error(f"Error storing tensor {tensor_id}: {str(e)}")
            return False

    def load_tensor(self, tensor_id: str) -> Optional[np.ndarray]:
        """Load tensor data from local storage"""
        try:
            tensor_path = self.vram_path / f"{tensor_id}.npy"
            metadata_path = self.vram_path / f"{tensor_id}_meta.json"
            # Check if tensor files exist
            if not tensor_path.exists() or not metadata_path.exists():
                logging.warning(f"Tensor {tensor_id} not found in local storage")
                return None
            # Load metadata
            with open(metadata_path, 'r') as f:
                metadata = json.load(f)
            # Load tensor data
            arr = np.load(str(tensor_path))
            # Update registry if not present
            if tensor_id not in self.tensor_registry:
                with self.lock:
                    self.tensor_registry[tensor_id] = metadata
            return arr
        except Exception as e:
            logging.error(f"Error loading tensor {tensor_id}: {str(e)}")
            return None
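
    # Example round-trip (hypothetical tensor id; any np.ndarray works):
    #   storage.store_tensor("layer0.weight", np.zeros((4, 4), dtype=np.float32))
    #   arr = storage.load_tensor("layer0.weight")   # -> 4x4 float32 array
    #   storage.load_tensor("missing")               # -> None, with a warning logged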

    def store_state(self, component: str, state_id: str, state_data: Dict[str, Any]) -> bool:
        """Store component state in local storage"""
        try:
            # Create component directory if needed
            component_dir = self.state_path / component
            component_dir.mkdir(parents=True, exist_ok=True)
            # Save state data with timestamp
            state_file = component_dir / f"{state_id}.json"
            data_to_save = {
                "data": state_data,
                "timestamp": time.time()
            }
            with open(state_file, 'w') as f:
                json.dump(data_to_save, f, indent=2)
            return True
        except Exception as e:
            logging.error(f"Error storing state for {component}/{state_id}: {str(e)}")
            return False

    def load_state(self, component: str, state_id: str) -> Optional[Dict[str, Any]]:
        """Load component state from local storage"""
        try:
            state_file = self.state_path / component / f"{state_id}.json"
            if not state_file.exists():
                logging.warning(f"State file not found for {component}/{state_id}")
                return None
            with open(state_file, 'r') as f:
                saved_data = json.load(f)
            return saved_data.get('data')
        except Exception as e:
            logging.error(f"Error loading state for {component}/{state_id}: {str(e)}")
            return None

    def cache_data(self, key: str, data: Any) -> bool:
        """Cache JSON-serializable data in local storage"""
        try:
            # Sanitize the key the same way model ids are sanitized
            cache_file = self.cache_path / f"{key.replace('/', '_')}.json"
            with open(cache_file, 'w') as f:
                json.dump({"data": data, "timestamp": time.time()}, f)
            return True
        except Exception as e:
            logging.error(f"Error caching data for key {key}: {str(e)}")
            return False

    def get_cached_data(self, key: str) -> Optional[Any]:
        """Get cached data from local storage"""
        try:
            cache_file = self.cache_path / f"{key.replace('/', '_')}.json"
            if not cache_file.exists():
                return None
            with open(cache_file, 'r') as f:
                cached = json.load(f)
            return cached.get('data')
        except Exception as e:
            logging.error(f"Error getting cached data for key {key}: {str(e)}")
            return None

    def load_model(self, model_name: str, model_path: Optional[str] = None, model_data: Optional[Dict] = None) -> bool:
        """Load a model into local storage"""
        try:
            # Check if model is already loaded
            if self.is_model_loaded(model_name):
                logging.info(f"Model {model_name} already loaded")
                return True
            # Generate model directory path
            model_dir = self.models_path / model_name.replace('/', '_')
            model_dir.mkdir(parents=True, exist_ok=True)
            # Clean up any existing files
            for existing_file in model_dir.glob('*'):
                try:
                    if existing_file.is_file():
                        existing_file.unlink()
                except Exception as e:
                    logging.warning(f"Could not remove existing file {existing_file}: {e}")
            # Save model config if provided
            if model_data:
                model_config_path = model_dir / "config.json"
                with open(model_config_path, 'w') as f:
                    json.dump(model_data, f, indent=2)
            # Update model registry
            with self.lock:
                self.model_registry[model_name] = {
                    'path': str(model_dir),
                    'config': model_data,
                    'loaded_at': time.time(),
                    'hash': self._calculate_model_hash(model_path) if model_path else None
                }
                self.resource_monitor['loaded_models'].add(model_name)
            # Copy model weights if a source path is provided
            if model_path and os.path.exists(model_path):
                model_file_path = model_dir / "model.bin"
                shutil.copy2(model_path, model_file_path)
            logging.info(f"Successfully loaded model {model_name} to local storage")
            return True
        except Exception as e:
            logging.error(f"Error loading model {model_name}: {str(e)}")
            return False
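
    # Example (hypothetical names and paths): copy weights and register a
    # config; is_model_loaded() reports True once model.bin and config.json
    # both exist in the model's directory:
    #   storage.load_model("org/model", model_path="/tmp/weights.bin",
    #                      model_data={"dtype": "float16"})
    #   storage.is_model_loaded("org/model")  # -> True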

    def _calculate_model_hash(self, model_path: str) -> str:
        """Calculate the SHA-256 hash of a model file"""
        try:
            sha256_hash = hashlib.sha256()
            with open(model_path, "rb") as f:
                for byte_block in iter(lambda: f.read(4096), b""):
                    sha256_hash.update(byte_block)
            return sha256_hash.hexdigest()
        except Exception as e:
            logging.error(f"Error calculating model hash: {str(e)}")
            return ""

    def ping(self) -> bool:
        """Check if local storage is accessible"""
        if self._closing:
            return False
        return self._check_storage_ready()

# Compatibility aliases for existing code
HTTPGPUStorage = LocalStorage
WebSocketGPUStorage = LocalStorage
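
# Minimal usage sketch (hypothetical ids and values, not part of the API):
# exercises the tensor, state, and cache round-trips against a local
# "storage" directory created under the current working directory.
if __name__ == "__main__":
    storage = LocalStorage("storage")
    assert storage.ping(), "storage directories should exist and be writable"

    # Tensor round-trip: written to storage/vram_blocks/demo.npy plus metadata
    storage.store_tensor("demo", np.arange(16, dtype=np.float32).reshape(4, 4))
    restored = storage.load_tensor("demo")
    assert restored is not None and restored.shape == (4, 4)

    # Component state round-trip: written under storage/states/scheduler/
    storage.store_state("scheduler", "session", {"step": 42})
    assert storage.load_state("scheduler", "session") == {"step": 42}

    # Cache round-trip: written under storage/cache/
    storage.cache_data("last_prompt", {"text": "hello"})
    assert storage.get_cached_data("last_prompt") == {"text": "hello"}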