"""Virtual GPU device adapter for integrating the virtual GPU stack with Helium."""

from typing import Dict, Union, Optional, Any, Tuple
from queue import Queue
from pathlib import Path
import threading
import sys

import numpy as np

# Allow importing sibling modules from the parent directory.
sys.path.append(str(Path(__file__).parent.parent))

from virtual_vram import VirtualVRAM
from tensor_core import TensorCore, TensorCoreArray
from http_storage import LocalStorage


class VirtualGPUDevice:
    """Adapter for Virtual GPU integration with Helium"""

    def __init__(self, device_id: int = 0, memory_size: Optional[int] = None):
        """Initialize virtual GPU device

        Args:
            device_id: Virtual GPU device ID
            memory_size: VRAM size in GB (None for unlimited)
        """
        self.device_id = device_id

        # Backing store for tensor data.
        self.vram = VirtualVRAM(size_gb=memory_size)

        # Compute backend; the core array gets no dedicated memory pool
        # of its own (memory_size=None) and works against self.vram.
        self.tensor_cores = TensorCoreArray(
            num_tensor_cores=8000,
            memory_size=None,
            device_id=device_id
        )

        # A single daemon worker drains the command queue, so compute ops
        # submitted by the methods below run asynchronously from the caller.
        self._command_queue: Queue = Queue()
        self._worker_thread = threading.Thread(target=self._process_commands, daemon=True)
        self._worker_thread.start()

        # Maps tensor ID -> {tensor_id, shape, dtype} metadata.
        self._tensor_cache: Dict[str, Any] = {}

    def _process_commands(self):
        """Process commands from the queue until a None sentinel arrives"""
        while True:
            cmd = self._command_queue.get()
            if cmd is None:
                break

            # Commands are (op_name, args, kwargs) tuples dispatched onto
            # the tensor core array; unknown op names are silently dropped.
            op, args, kwargs = cmd
            if hasattr(self.tensor_cores, op):
                getattr(self.tensor_cores, op)(*args, **kwargs)

    def allocate(self, shape: Tuple[int, ...], dtype=np.float32) -> str:
        """Allocate memory on virtual GPU

        Returns:
            Tensor ID in virtual GPU memory
        """
        size = int(np.prod(shape)) * np.dtype(dtype).itemsize
        tensor_id = self.vram.allocate(size)
        self._tensor_cache[tensor_id] = {
            'tensor_id': tensor_id,
            'shape': shape,
            'dtype': dtype
        }
        return tensor_id

    def to_gpu(self, data: np.ndarray) -> str:
        """Copy a numpy array to virtual GPU memory"""
        tensor_id = self.allocate(data.shape, data.dtype)
        self.vram.store_tensor(tensor_id, data)
        return tensor_id

    def from_gpu(self, tensor_id: str) -> np.ndarray:
        """Copy data from virtual GPU memory back to the CPU"""
        info = self._tensor_cache[tensor_id]
        data = self.vram.load_tensor(info['tensor_id'])
        return np.asarray(data, dtype=info['dtype']).reshape(info['shape'])

    def matmul(self, a: Union[str, "HeliumTensor"], b: Union[str, "HeliumTensor"]) -> str:
        """Matrix multiplication on virtual GPU"""
        a_id = a if isinstance(a, str) else self.to_gpu(a.numpy())
        b_id = b if isinstance(b, str) else self.to_gpu(b.numpy())

        a_info = self._tensor_cache[a_id]
        b_info = self._tensor_cache[b_id]

        if a_info['shape'][1] != b_info['shape'][0]:
            raise ValueError(
                f"Incompatible shapes for matmul: {a_info['shape']} and {b_info['shape']}"
            )

        out_shape = (a_info['shape'][0], b_info['shape'][1])
        out_id = self.allocate(out_shape, a_info['dtype'])

        a_data = self.vram.load_tensor(a_info['tensor_id'])
        b_data = self.vram.load_tensor(b_info['tensor_id'])

        # The multiply runs asynchronously on the worker thread and writes
        # its result into VRAM under out_id; reading out_id back before the
        # queue has drained may race the worker.
        self._command_queue.put((
            'matmul',
            (a_data, b_data),
            {'out_id': self._tensor_cache[out_id]['tensor_id']}
        ))

        return out_id

    def add(self, a: Union[str, "HeliumTensor"], b: Union[str, "HeliumTensor"]) -> str:
        """Element-wise addition on virtual GPU"""
        a_id = a if isinstance(a, str) else self.to_gpu(a.numpy())
        b_id = b if isinstance(b, str) else self.to_gpu(b.numpy())

        a_info = self._tensor_cache[a_id]
        b_info = self._tensor_cache[b_id]

        # Element-wise ops assume matching shapes; broadcasting is not handled.
        if a_info['shape'] != b_info['shape']:
            raise ValueError(
                f"Shape mismatch for add: {a_info['shape']} and {b_info['shape']}"
            )

        out_id = self.allocate(a_info['shape'], a_info['dtype'])

        a_data = self.vram.load_tensor(a_info['tensor_id'])
        b_data = self.vram.load_tensor(b_info['tensor_id'])

        self._command_queue.put((
            'add',
            (a_data, b_data),
            {'out_id': self._tensor_cache[out_id]['tensor_id']}
        ))

        return out_id

    def mul(self, a: Union[str, "HeliumTensor"], b: Union[str, "HeliumTensor"]) -> str:
        """Element-wise multiplication on virtual GPU"""
        a_id = a if isinstance(a, str) else self.to_gpu(a.numpy())
        b_id = b if isinstance(b, str) else self.to_gpu(b.numpy())

        a_info = self._tensor_cache[a_id]
        b_info = self._tensor_cache[b_id]

        # Element-wise ops assume matching shapes; broadcasting is not handled.
        if a_info['shape'] != b_info['shape']:
            raise ValueError(
                f"Shape mismatch for mul: {a_info['shape']} and {b_info['shape']}"
            )

        out_id = self.allocate(a_info['shape'], a_info['dtype'])

        a_data = self.vram.load_tensor(a_info['tensor_id'])
        b_data = self.vram.load_tensor(b_info['tensor_id'])

        self._command_queue.put((
            'multiply',
            (a_data, b_data),
            {'out_id': self._tensor_cache[out_id]['tensor_id']}
        ))

        return out_id

    def mul_scalar(self, a: Union[str, "HeliumTensor"], scalar: float) -> str:
        """Scalar multiplication on virtual GPU"""
        a_id = a if isinstance(a, str) else self.to_gpu(a.numpy())
        a_info = self._tensor_cache[a_id]
        out_id = self.allocate(a_info['shape'], a_info['dtype'])

        a_data = self.vram.load_tensor(a_info['tensor_id'])

        self._command_queue.put((
            'scalar_multiply',
            (a_data, scalar),
            {'out_id': self._tensor_cache[out_id]['tensor_id']}
        ))

        return out_id

    def transpose(self, a: Union[str, "HeliumTensor"], axes: Optional[Tuple[int, ...]] = None) -> str:
        """Transpose a tensor on virtual GPU"""
        a_id = a if isinstance(a, str) else self.to_gpu(a.numpy())
        a_info = self._tensor_cache[a_id]

        # Default to reversing all axes, matching np.transpose.
        if axes is None:
            axes = tuple(range(len(a_info['shape']) - 1, -1, -1))

        new_shape = tuple(a_info['shape'][i] for i in axes)
        out_id = self.allocate(new_shape, a_info['dtype'])

        a_data = self.vram.load_tensor(a_info['tensor_id'])

        self._command_queue.put((
            'transpose',
            (a_data,),
            {
                'axes': axes,
                'out_id': self._tensor_cache[out_id]['tensor_id']
            }
        ))

        return out_id

    def reshape(self, a: Union[str, "HeliumTensor"], new_shape: Tuple[int, ...]) -> str:
        """Reshape a tensor on virtual GPU"""
        a_id = a if isinstance(a, str) else self.to_gpu(a.numpy())
        a_info = self._tensor_cache[a_id]

        if np.prod(new_shape) != np.prod(a_info['shape']):
            raise ValueError("New shape must have same total size as old shape")

        out_id = self.allocate(new_shape, a_info['dtype'])

        a_data = self.vram.load_tensor(a_info['tensor_id'])

        self._command_queue.put((
            'reshape',
            (a_data,),
            {
                'new_shape': new_shape,
                'out_id': self._tensor_cache[out_id]['tensor_id']
            }
        ))

        return out_id

    def softmax(self, a: Union[str, "HeliumTensor"], axis: int = -1) -> str:
        """Softmax on virtual GPU"""
        a_id = a if isinstance(a, str) else self.to_gpu(a.numpy())
        a_info = self._tensor_cache[a_id]
        out_id = self.allocate(a_info['shape'], a_info['dtype'])

        a_data = self.vram.load_tensor(a_info['tensor_id'])

        self._command_queue.put((
            'softmax',
            (a_data,),
            {
                'axis': axis,
                'out_id': self._tensor_cache[out_id]['tensor_id']
            }
        ))

        return out_id

    def get_tensor(self, tensor_id: str) -> np.ndarray:
        """Get tensor data"""
        if tensor_id not in self._tensor_cache:
            raise KeyError(f"Tensor {tensor_id} not found")
        return self.from_gpu(tensor_id)

    def tensor_exists(self, tensor_id: str) -> bool:
        """Check if tensor exists in virtual GPU memory"""
        return tensor_id in self._tensor_cache

    def delete_tensor(self, tensor_id: str):
        """Free tensor memory"""
        if tensor_id in self._tensor_cache:
            self.vram.free(self._tensor_cache[tensor_id]['tensor_id'])
            del self._tensor_cache[tensor_id]

    def __del__(self):
        """Cleanup allocated memory and stop worker"""
        # Guard with hasattr/getattr: __del__ can run even if __init__
        # failed partway through and some attributes never got set.
        if hasattr(self, '_command_queue'):
            self._command_queue.put(None)  # sentinel stops _process_commands
        if hasattr(self, '_worker_thread'):
            self._worker_thread.join(timeout=5.0)

        for tensor_id in list(getattr(self, '_tensor_cache', {})):
            self.delete_tensor(tensor_id)
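

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only; assumes VirtualVRAM round-trips
# tensor data faithfully). Note that compute ops are queued to the worker
# thread, so a result may not be present in VRAM immediately after the call.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    device = VirtualGPUDevice(device_id=0)

    # Round-trip a tensor through virtual VRAM.
    x = np.random.rand(4, 8).astype(np.float32)
    x_id = device.to_gpu(x)
    print("round-trip ok:", np.allclose(device.from_gpu(x_id), x))

    # Queue a matmul: (4, 8) @ (8, 2) -> (4, 2), written under out_id.
    y_id = device.to_gpu(np.random.rand(8, 2).astype(np.float32))
    out_id = device.matmul(x_id, y_id)
    print("queued matmul, output shape:", device._tensor_cache[out_id]['shape'])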