|
|
import json
|
|
|
import numpy as np
|
|
|
from typing import Dict, Any, Optional, Union
|
|
|
import threading
|
|
|
import time
|
|
|
import hashlib
|
|
|
import logging
|
|
|
import uuid
|
|
|
import duckdb
|
|
|
import os
|
|
|
from datetime import datetime
|
|
|
from huggingface_hub import HfApi, HfFileSystem
|
|
|
from config import get_hf_token_cached
|
|
|
|
|
|
|
|
|
|
|
|
class LocalStorage:
    """
    Remote storage implementation using DuckDB and HuggingFace.

    Provides storage and querying of tensors, models and component state in
    DuckDB tables. An ``hf://datasets/<owner>/<dataset>/<path>`` URL is mapped
    onto an S3-style path served by the HuggingFace endpoint; any other URL is
    handed to ``duckdb.connect`` unchanged (e.g. a local database file).

    The class is a process-wide singleton: the first construction connects and
    creates the schema, later constructions return the same instance and
    ignore their ``db_url``. ``close()`` releases the singleton so the next
    construction reconnects.
    """

    _instance = None
    _lock = threading.Lock()

    # Primary-key column of each table created in _init_database. Deriving it
    # by stripping a trailing "s" from the table name is wrong for most tables
    # (vram_blocks -> block_id, tensor_ops -> op_id, cache -> cache_id, ...).
    _ID_COLUMNS = {
        'vram_blocks': 'block_id',
        'models': 'model_id',
        'cache': 'cache_id',
        'states': 'state_id',
        'tensor_ops': 'op_id',
        'tensor_core_states': 'core_id',
        'core_communication': 'comm_id',
    }

    # Seconds between HuggingFace sync attempts in _sync_to_huggingface.
    _SYNC_INTERVAL_S = 300

    def __new__(cls, db_url: str = "hf://datasets/Fred808/helium/storage.json"):
        """Return the shared instance, creating and initializing it on first use.

        The instance is published only after ``_init_singleton`` succeeds, so a
        failed connection attempt does not leave a half-initialized singleton
        behind for every later caller (and its connection is closed).
        """
        with cls._lock:
            if cls._instance is None:
                instance = super().__new__(cls)
                try:
                    instance._init_singleton(db_url)
                except Exception:
                    instance._close_connection()
                    raise
                cls._instance = instance
            return cls._instance

    def __init__(self, db_url: Optional[str] = None):
        """This will actually just return the singleton instance.

        The actual initialization happens in __new__ and _init_singleton; doing
        nothing here keeps repeated ``LocalStorage()`` calls from resetting the
        shared state.
        """
        pass

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _parse_hf_url(db_url: str) -> tuple:
        """Split an ``hf://`` URL into ``(owner, dataset, path_in_repo)``.

        Accepts ``hf://datasets/<owner>/<dataset>/<path>`` and, for leniency,
        ``hf://<owner>/<dataset>/<path>``. ``<path>`` may contain slashes.

        Raises:
            ValueError: if the URL is not ``hf://`` or lacks any component.
        """
        prefix = 'hf://'
        if not db_url.startswith(prefix):
            raise ValueError(f"Not a HuggingFace URL: {db_url}")
        remainder = db_url[len(prefix):]
        if remainder.startswith('datasets/'):
            remainder = remainder[len('datasets/'):]
        parts = remainder.split('/', 2)
        if len(parts) != 3 or not all(parts):
            raise ValueError(f"Malformed HuggingFace URL: {db_url}")
        return parts[0], parts[1], parts[2]

    @classmethod
    def _id_column(cls, table: str) -> str:
        """Return the primary-key column name for ``table``.

        Known tables use the explicit mapping; unknown tables fall back to the
        historical ``<table minus trailing s>_id`` / ``id`` rule.
        """
        if table in cls._ID_COLUMNS:
            return cls._ID_COLUMNS[table]
        return f"{table[:-1]}_id" if table.endswith('s') else 'id'

    @staticmethod
    def _quote_sql_string(value: Any) -> str:
        """Return ``value`` as a single-quoted SQL string literal (quotes doubled)."""
        return "'" + str(value).replace("'", "''") + "'"

    @staticmethod
    def _fresh_resource_monitor() -> Dict[str, Any]:
        """Return a zeroed resource-monitor dict."""
        return {
            'vram_used': 0,
            'active_tensors': 0,
            'loaded_models': set(),
            'last_updated': time.time()
        }

    def _configure_hf_s3(self):
        """Load httpfs and point S3 access at the HuggingFace endpoint.

        Credentials are only set when ``self.hf_token`` is truthy, and are
        quoted so a token containing ``'`` cannot break the statement.
        """
        self.conn.execute("""
            INSTALL httpfs;
            LOAD httpfs;
            SET s3_endpoint='hf.co';
            SET s3_use_ssl=true;
            SET s3_url_style='path';
        """)
        token = getattr(self, 'hf_token', None)
        if token:
            quoted = self._quote_sql_string(token)
            self.conn.execute(f"SET s3_access_key_id={quoted};")
            self.conn.execute(f"SET s3_secret_access_key={quoted};")

    def _close_connection(self):
        """Best-effort close of the DuckDB connection, if one was opened; errors are logged."""
        conn = getattr(self, 'conn', None)
        if conn is None:
            return
        try:
            conn.close()
        except Exception as e:
            logging.error(f"Error closing database connection: {e}")

    def _register_loaded_model(self, model_name: str, model_id: str):
        """Record ``model_name`` as loaded in the in-memory registry and monitor."""
        with self.lock:
            self.model_registry[model_name] = {
                'id': model_id,
                'loaded': True,
                'last_access': time.time()
            }
            self.resource_monitor['loaded_models'].add(model_name)

    # ----------------------------------------------------------- initialization

    def _init_singleton(self, db_url: str):
        """Open the DuckDB connection, set up in-memory bookkeeping and the schema.

        Args:
            db_url: ``hf://datasets/<owner>/<dataset>/<path>`` to use the
                HuggingFace-backed S3 path, or anything else to pass straight
                to ``duckdb.connect``.
        """
        if hasattr(self, 'initialized'):
            return

        # Short identifier derived from the URL.
        self.storage_id = hashlib.md5(db_url.encode()).hexdigest()[:8]
        self.db_url = db_url

        if db_url.startswith('hf://'):
            owner, dataset, db_file = self._parse_hf_url(db_url)
            db_path = f"s3://datasets-cached/{owner}/{dataset}/{db_file}"
            print(f"Connecting to database at: {db_path}")

            self.hf_token = get_hf_token_cached()

            # NOTE(review): the connection is opened on the S3 path before
            # httpfs is loaded/configured on it; confirm DuckDB can open this
            # path as a (writable) database at all.
            self.conn = duckdb.connect(db_path)
            self._configure_hf_s3()
            self.dataset_path = db_path
        else:
            db_path = db_url
            print(f"Connecting to database at: {db_url}")
            self.conn = duckdb.connect(db_url)

        # Remembered so _sync_to_huggingface can reopen the same database.
        self.db_path = db_path

        self.lock = threading.Lock()
        self._closing = False

        self.resource_monitor = self._fresh_resource_monitor()

        self.stats = {
            'total_size': 0,
            'available_size': float('inf'),
            'model_count': 0,
            'tensor_count': 0
        }

        # Filled in by _check_storage.
        self.storage_monitor = {}

        self._init_database()

        self.model_registry = {}
        self.tensor_registry = {}
        self._connected = True
        self.initialized = True

    def _init_database(self):
        """Initialize DuckDB database with required tables and indexes.

        Every statement uses IF NOT EXISTS, so this can run against an
        existing database.
        """
        self.conn.execute("""
            INSTALL json;
            LOAD json;
            INSTALL httpfs;
            LOAD httpfs;
        """)

        table_ddl = (
            """
            CREATE TABLE IF NOT EXISTS vram_blocks (
                block_id VARCHAR PRIMARY KEY,
                size BIGINT,
                data BLOB, -- Store tensor data directly
                metadata JSON,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                last_accessed TIMESTAMP,
                is_pinned BOOLEAN DEFAULT FALSE,
                device_id VARCHAR
            )
            """,
            """
            CREATE TABLE IF NOT EXISTS models (
                model_id VARCHAR PRIMARY KEY,
                name VARCHAR,
                version VARCHAR,
                data BLOB, -- Store model data directly
                metadata JSON,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                last_accessed TIMESTAMP,
                is_loaded BOOLEAN DEFAULT FALSE,
                config JSON
            )
            """,
            """
            CREATE TABLE IF NOT EXISTS cache (
                cache_id VARCHAR PRIMARY KEY,
                key VARCHAR UNIQUE,
                data BLOB, -- Store cached data directly
                metadata JSON,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                accessed_at TIMESTAMP,
                expires_at TIMESTAMP,
                size BIGINT
            )
            """,
            """
            CREATE TABLE IF NOT EXISTS states (
                state_id VARCHAR PRIMARY KEY,
                name VARCHAR,
                data BLOB, -- Store state data directly
                metadata JSON,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP,
                parent_id VARCHAR,
                is_checkpoint BOOLEAN DEFAULT FALSE
            )
            """,
            """
            CREATE TABLE IF NOT EXISTS tensor_ops (
                op_id VARCHAR PRIMARY KEY,
                core_id VARCHAR,
                operation_type VARCHAR,
                input_tensors JSON,
                output_tensors JSON,
                metadata JSON,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                completed_at TIMESTAMP,
                status VARCHAR,
                execution_time_ns BIGINT
            )
            """,
            """
            CREATE TABLE IF NOT EXISTS tensor_core_states (
                core_id VARCHAR PRIMARY KEY,
                array_id VARCHAR,
                current_op VARCHAR,
                register_state JSON,
                shared_memory_state JSON,
                metadata JSON,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP,
                status VARCHAR,
                is_active BOOLEAN
            )
            """,
            """
            CREATE TABLE IF NOT EXISTS core_communication (
                comm_id VARCHAR PRIMARY KEY,
                source_core_id VARCHAR,
                target_core_id VARCHAR,
                data_id VARCHAR,
                metadata JSON,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                completed_at TIMESTAMP,
                status VARCHAR,
                transfer_size_bytes BIGINT
            )
            """,
        )
        for ddl in table_ddl:
            self.conn.execute(ddl)

        # (index name, table, column)
        indexes = (
            ('idx_vram_blocks_device', 'vram_blocks', 'device_id'),
            ('idx_vram_blocks_accessed', 'vram_blocks', 'last_accessed'),
            ('idx_models_name', 'models', 'name'),
            ('idx_models_loaded', 'models', 'is_loaded'),
            ('idx_cache_accessed', 'cache', 'accessed_at'),
            ('idx_cache_expires', 'cache', 'expires_at'),
            ('idx_states_parent', 'states', 'parent_id'),
            ('idx_states_updated', 'states', 'updated_at'),
            ('idx_tensor_ops_core', 'tensor_ops', 'core_id'),
            ('idx_tensor_ops_status', 'tensor_ops', 'status'),
            ('idx_core_states_array', 'tensor_core_states', 'array_id'),
            ('idx_core_states_active', 'tensor_core_states', 'is_active'),
            ('idx_comm_source', 'core_communication', 'source_core_id'),
            ('idx_comm_target', 'core_communication', 'target_core_id'),
            ('idx_comm_status', 'core_communication', 'status'),
        )
        for index_name, table, column in indexes:
            self.conn.execute(f"CREATE INDEX IF NOT EXISTS {index_name} ON {table}({column})")

        self.conn.commit()

    def _sync_to_huggingface(self):
        """Periodically sync the database to HuggingFace if using HF storage.

        Loops until ``close()`` sets ``_closing``, sleeping ``_SYNC_INTERVAL_S``
        seconds between attempts. Work is only done when an ``hf_api``
        attribute has been attached (nothing in this class sets it) and
        ``db_url`` is an ``hf://`` URL. The connection is closed for the
        upload and always reopened afterwards, even if the upload fails.
        """
        while not self._closing:
            time.sleep(self._SYNC_INTERVAL_S)
            if self._closing or not hasattr(self, 'hf_api'):
                continue
            try:
                # Parse before closing so a bad URL never leaves the connection closed.
                owner, dataset, db_file = self._parse_hf_url(self.db_url)
                db_path = str(self.db_path)

                self.conn.close()
                try:
                    # NOTE(review): upload_file expects a local path or file
                    # object; for hf:// storage db_path is an s3:// URL — confirm.
                    self.hf_api.upload_file(
                        path_or_fileobj=db_path,
                        path_in_repo=db_file,
                        repo_id=f"{owner}/{dataset}",
                        repo_type="dataset"
                    )
                finally:
                    self.conn = duckdb.connect(db_path)
                    self._configure_hf_s3()
            except Exception as e:
                logging.error(f"Failed to sync database to HuggingFace: {e}")

    # ------------------------------------------------------------ generic rows

    def _store_in_db(self, table: str, data_id: str, data: Union[bytes, np.ndarray], metadata: Optional[Dict] = None, **kwargs):
        """Replace the row ``data_id`` of ``table`` with new ``data``/``metadata``.

        Runs DELETE + INSERT in one transaction (rolled back on failure).

        Args:
            table: Target table; must have ``data`` and ``metadata`` columns.
            data_id: Primary-key value (column chosen by ``_id_column``).
            data: Raw bytes, or an ndarray stored via ``tobytes()``.
            metadata: Optional dict stored as JSON.
            **kwargs: Extra column/value pairs to insert.

        Raises:
            Exception: any database error, re-raised after rollback.
        """
        metadata_json = json.dumps(metadata) if metadata is not None else None

        if isinstance(data, np.ndarray):
            data = data.tobytes()

        fields = ['data', 'metadata', *kwargs.keys()]
        values = [data, metadata_json, *kwargs.values()]

        id_column = self._id_column(table)
        columns = ','.join([id_column] + fields)
        placeholders = ','.join(['?'] * (len(fields) + 1))

        self.conn.execute("BEGIN TRANSACTION")
        try:
            # Two separate statements: DuckDB cannot prepare a multi-statement
            # query with parameters.
            self.conn.execute(f"DELETE FROM {table} WHERE {id_column} = ?", [data_id])
            self.conn.execute(
                f"INSERT INTO {table} ({columns}) VALUES ({placeholders})",
                [data_id] + values
            )
            self.conn.commit()
        except Exception:
            self.conn.rollback()
            raise

    def _get_from_db(self, table: str, data_id: str) -> Optional[Dict]:
        """Retrieve the row ``data_id`` of ``table`` as a column->value dict.

        A JSON ``metadata`` column is decoded into Python objects.

        Returns:
            The row dict, or None if no row matches.
        """
        id_column = self._id_column(table)
        cursor = self.conn.execute(f"SELECT * FROM {table} WHERE {id_column} = ?", [data_id])
        row = cursor.fetchone()
        if not row:
            return None

        column_names = [desc[0] for desc in cursor.description]
        record = dict(zip(column_names, row))

        raw_metadata = record.get('metadata')
        if raw_metadata and isinstance(raw_metadata, (str, bytes)):
            record['metadata'] = json.loads(raw_metadata)
        return record

    # --------------------------------------------------------- connection state

    def is_connected(self) -> bool:
        """Return True if the storage is open and answers a ``ping()``."""
        return self._connected and not self._closing and self.ping()

    def close(self):
        """Close storage connection and release the singleton.

        Marks the instance closed, closes the DuckDB connection, resets the
        in-memory resource monitor and model registry, and clears the class
        singleton so the next ``LocalStorage()`` reconnects.
        """
        self._closing = True
        self._connected = False

        self._close_connection()

        self.resource_monitor = self._fresh_resource_monitor()
        self.model_registry = {}

        cls = type(self)
        with cls._lock:
            if cls._instance is self:
                cls._instance = None

    def wait_for_connection(self, timeout: float = 30.0) -> bool:
        """Poll ``_check_storage_ready`` every 0.5 s until it succeeds or ``timeout`` expires.

        Always checks at least once. Uses a monotonic clock so wall-clock
        adjustments do not shorten or extend the wait.
        """
        deadline = time.monotonic() + timeout
        while True:
            if self._check_storage_ready():
                return True
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return False
            time.sleep(min(0.5, remaining))

    def _check_storage_ready(self) -> bool:
        """Return True if the connection answers queries, refreshing ``self.stats``.

        Any database error is logged and reported as "not ready".
        """
        try:
            probe = self.conn.execute("SELECT 1").fetchone()
            if not probe or probe[0] != 1:
                return False

            self.stats.update({
                'model_count': self.conn.execute("SELECT COUNT(*) FROM models").fetchone()[0],
                'tensor_count': self.conn.execute("SELECT COUNT(*) FROM vram_blocks").fetchone()[0],
                'total_size': self.conn.execute(
                    "SELECT COALESCE(SUM(size), 0) FROM vram_blocks"
                ).fetchone()[0]
            })
            return True
        except Exception as e:
            logging.error(f"Storage check failed: {e}")
            return False

    def _check_storage(self) -> Dict[str, Any]:
        """Check storage status and usage.

        Returns:
            ``{"status": "ok", "monitor": {...}}`` with total block size, block
            count and last-access time, or ``{"status": "error", "message": ...}``.
        """
        try:
            total_size, block_count = self.conn.execute("""
                SELECT
                    COALESCE(SUM(size), 0) as total_size,
                    COUNT(*) as block_count
                FROM vram_blocks
            """).fetchone()

            # storage_monitor may be missing on instances not built by _init_singleton.
            monitor = getattr(self, 'storage_monitor', None)
            if monitor is None:
                monitor = self.storage_monitor = {}
            monitor.update({
                'total_size': total_size,
                'last_access': time.time(),
                'block_count': block_count
            })
            return {"status": "ok", "monitor": monitor}
        except Exception as e:
            logging.error(f"Error checking storage: {e}")
            return {"status": "error", "message": str(e)}

    def ping(self) -> bool:
        """Check if storage is accessible (False once ``close()`` has been called)."""
        if self._closing:
            return False
        return self._check_storage_ready()

    # ------------------------------------------------------------------ tensors

    def store_tensor(self, tensor_id: str, data: np.ndarray, model_size: Optional[int] = None) -> bool:
        """Store tensor data in the ``vram_blocks`` table (insert or overwrite).

        The raw bytes go in ``data``; shape, dtype, byte size, timestamp and
        ``model_size`` (-1 when not given) go in the JSON ``metadata``.
        ``resource_monitor`` counts a tensor once and tracks the size delta on
        overwrite.

        Returns:
            True on success, False on any error (logged), including a ``None``
            or object-dtype tensor.
        """
        try:
            if data is None:
                raise ValueError("Cannot store None tensor")
            data = np.asarray(data)
            if data.dtype.hasobject:
                # tobytes() of object arrays is raw pointers; it cannot be reloaded.
                raise ValueError(f"Cannot store object-dtype tensor {tensor_id}")

            tensor_size = data.nbytes
            tensor_bytes = data.tobytes()

            metadata = {
                'shape': data.shape,
                'dtype': str(data.dtype),
                'size': tensor_size,
                'timestamp': time.time(),
                'model_size': model_size if model_size is not None else -1
            }

            previous = self.conn.execute(
                "SELECT size FROM vram_blocks WHERE block_id = ?", [tensor_id]
            ).fetchone()

            self.conn.execute("""
                INSERT INTO vram_blocks (
                    block_id, data, metadata, size, created_at, last_accessed
                ) VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
                ON CONFLICT (block_id) DO UPDATE SET
                    data = excluded.data,
                    metadata = excluded.metadata,
                    size = excluded.size,
                    last_accessed = CURRENT_TIMESTAMP
            """, [tensor_id, tensor_bytes, json.dumps(metadata), tensor_size])

            with self.lock:
                if previous is None:
                    self.resource_monitor['vram_used'] += tensor_size
                    self.resource_monitor['active_tensors'] += 1
                else:
                    # Overwrite: adjust by the size difference, do not recount.
                    self.resource_monitor['vram_used'] += tensor_size - (previous[0] or 0)
            return True

        except Exception as e:
            logging.error(f"Error storing tensor {tensor_id}: {str(e)}")
            return False

    def load_tensor(self, tensor_id: str) -> Optional[np.ndarray]:
        """Load tensor data from the ``vram_blocks`` table.

        Rebuilds the array from the stored bytes using the dtype/shape in its
        metadata, bumps ``last_accessed`` and caches the metadata in
        ``tensor_registry``. The returned array is a read-only view over the
        fetched bytes (``np.frombuffer``); copy it before mutating.

        Returns:
            The array, or None if missing or on error (logged).
        """
        try:
            result = self.conn.execute("""
                SELECT data, metadata
                FROM vram_blocks
                WHERE block_id = ?
            """, [tensor_id]).fetchone()

            if not result:
                logging.warning(f"Tensor {tensor_id} not found in database")
                return None

            tensor_bytes, metadata_str = result
            metadata = json.loads(metadata_str)

            arr = np.frombuffer(tensor_bytes, dtype=metadata['dtype']).reshape(metadata['shape'])

            self.conn.execute("""
                UPDATE vram_blocks
                SET last_accessed = CURRENT_TIMESTAMP
                WHERE block_id = ?
            """, [tensor_id])

            with self.lock:
                # Always refresh: the tensor may have been overwritten since last load.
                self.tensor_registry[tensor_id] = metadata

            return arr

        except Exception as e:
            logging.error(f"Error loading tensor {tensor_id}: {str(e)}")
            return None

    # ------------------------------------------------------------------- states

    def store_state(self, component: str, state_id: str, state_data: Dict[str, Any]) -> bool:
        """Store component state in the ``states`` table (insert or overwrite).

        ``state_data`` is stored as UTF-8 JSON bytes; ``component`` goes in the
        ``name`` column.

        Returns:
            True on success, False on any error (logged), e.g. non-JSON data.
        """
        try:
            metadata = {
                'component': component,
                'timestamp': time.time()
            }

            # Pass bytes, not str: a str would go through DuckDB's
            # VARCHAR->BLOB cast, which interprets backslash escapes that JSON
            # text routinely contains.
            state_blob = json.dumps(state_data).encode('utf-8')

            self.conn.execute("""
                INSERT INTO states (
                    state_id, name, data, metadata, created_at, updated_at
                ) VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
                ON CONFLICT (state_id) DO UPDATE SET
                    data = excluded.data,
                    metadata = excluded.metadata,
                    updated_at = CURRENT_TIMESTAMP
            """, [state_id, component, state_blob, json.dumps(metadata)])

            self.conn.commit()
            return True

        except Exception as e:
            logging.error(f"Error storing state for {component}/{state_id}: {str(e)}")
            return False

    def load_state(self, component: str, state_id: str) -> Optional[Dict[str, Any]]:
        """Load component state from the ``states`` table.

        Note: a successful load also sets the row's ``updated_at`` to now.

        Returns:
            The decoded JSON state, or None if missing or on error (logged).
        """
        try:
            result = self.conn.execute("""
                SELECT data
                FROM states
                WHERE state_id = ? AND name = ?
            """, [state_id, component]).fetchone()

            if not result:
                logging.warning(f"State not found for {component}/{state_id}")
                return None

            self.conn.execute("""
                UPDATE states
                SET updated_at = CURRENT_TIMESTAMP
                WHERE state_id = ?
            """, [state_id])

            # json.loads accepts both the bytes returned for BLOB and str.
            return json.loads(result[0])

        except Exception as e:
            logging.error(f"Error loading state for {component}/{state_id}: {str(e)}")
            return None

    # ------------------------------------------------------------------- models

    def is_model_loaded(self, model_id: str) -> bool:
        """Return True if a ``models`` row marked loaded matches ``model_id``.

        ``model_id`` is matched against both the ``model_id`` key (the MD5
        digest written by ``load_model``) and the human-readable ``name``.
        An empty/None id returns False without querying.
        """
        if not model_id:
            return False

        result = self.conn.execute(
            "SELECT 1 FROM models WHERE (model_id = ? OR name = ?) AND is_loaded LIMIT 1",
            [model_id, model_id]
        ).fetchone()
        return result is not None

    def load_model(self, model_name: str, model_data: Optional[Union[bytes, Dict]] = None, model_config: Optional[Dict] = None) -> bool:
        """Insert (or re-mark) a model row as loaded and register it in memory.

        The row key is the MD5 hex digest of ``model_name``. If that row is
        already marked loaded, nothing is written and True is returned.

        Args:
            model_name: Model name, stored in ``models.name``.
            model_data: Raw bytes, a JSON-serialisable dict (stored as UTF-8
                JSON) or a str (stored UTF-8 encoded). ``None`` stores b"".
            model_config: Optional dict stored as JSON in ``models.config``.

        Returns:
            True on success, False on any error (logged).
        """
        try:
            model_id = hashlib.md5(model_name.encode()).hexdigest()

            # Check by the key actually written below; checking by the raw
            # name against model_id never matched, so the row was always rewritten.
            if self.is_model_loaded(model_id):
                logging.info(f"Model {model_name} already loaded")
                self._register_loaded_model(model_name, model_id)
                return True

            if isinstance(model_data, dict):
                model_data = json.dumps(model_data).encode()
            elif isinstance(model_data, str):
                model_data = model_data.encode()

            self.conn.execute("""
                INSERT INTO models (
                    model_id, name, version, data, metadata, config,
                    created_at, last_accessed, is_loaded
                ) VALUES (?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, TRUE)
                ON CONFLICT (model_id) DO UPDATE SET
                    data = excluded.data,
                    metadata = excluded.metadata,
                    config = excluded.config,
                    last_accessed = CURRENT_TIMESTAMP,
                    is_loaded = TRUE
            """, [
                model_id,
                model_name,
                "1.0",
                model_data or b"",
                json.dumps({"source": "direct_load"}),
                json.dumps(model_config) if model_config else "{}"
            ])

            self._register_loaded_model(model_name, model_id)

            logging.info(f"Successfully loaded model {model_name}")
            return True

        except Exception as e:
            logging.error(f"Error loading model {model_name}: {str(e)}")
            return False
|
|
|
|