# helium/tensor.py — HeliumTensor: base tensor type with virtual-GPU storage support.
import numpy as np
from typing import Optional, Tuple, Union, List
from .virtual_gpu_device import VirtualGPUDevice
from .device_utils import get_gpu_device, get_default_device
class HeliumTensor:
    """Base tensor class for the Helium framework with virtual GPU support.

    The payload in ``self.data`` is stored in one of two forms:

    * ``np.ndarray`` — host (CPU) data, or
    * ``str`` — a tensor ID referencing storage on a virtual GPU device.

    ``self.grad`` mirrors that representation when gradients are computed.
    """

    def __init__(
        self,
        data: Union[np.ndarray, "HeliumTensor", str],
        device: Optional[Union[str, "VirtualGPUDevice"]] = None,
        requires_grad: bool = False
    ):
        """Create a tensor on *device* from an array, another tensor, or a GPU tensor ID.

        Args:
            data: A numpy array (or anything ``np.asarray`` accepts), another
                ``HeliumTensor`` (copied/moved as needed), or a ``str`` tensor
                ID that must already exist on the target device.
            device: Target device, a device name string, or ``None`` for the
                framework default device.
            requires_grad: Whether ``backward()`` should record a gradient.

        Raises:
            ValueError: If *data* is a tensor ID not present on *device*.
        """
        self.device = device if device is not None else get_default_device()
        if isinstance(self.device, str):
            self.device = get_gpu_device(self.device)
        self.requires_grad = requires_grad
        # Gradient matches the data representation: ndarray on CPU, ID on GPU.
        self.grad: Optional[Union[np.ndarray, str]] = None
        # No-op until an op wires up an actual backward function.
        self._backward_fn = lambda: None

        if isinstance(data, HeliumTensor):
            if data.device == self.device:
                # Same device: share the underlying storage / tensor ID.
                self.data = data.data
            elif isinstance(data.data, str):
                # Source lives on a GPU: round-trip through host memory.
                cpu_data = data.device.from_gpu(data.data)
                self.data = self.device.to_gpu(cpu_data)
            else:
                self.data = self.device.to_gpu(data.data)
        elif isinstance(data, str):
            # Data is a tensor ID from a virtual GPU; it must already exist there.
            if hasattr(self.device, 'tensor_exists') and self.device.tensor_exists(data):
                self.data = data
            else:
                raise ValueError(f"Tensor {data} not found on device {self.device}")
        else:
            # Anything array-like: coerce to ndarray and upload to the device.
            self.data = self.device.to_gpu(np.asarray(data))

    @property
    def shape(self) -> Tuple[int, ...]:
        """Shape of the underlying data, fetched from the device for GPU tensors."""
        if isinstance(self.data, str):
            return self.device.get_tensor(self.data).shape
        return self.data.shape

    @property
    def dtype(self):
        """Dtype of the underlying data, fetched from the device for GPU tensors."""
        if isinstance(self.data, str):
            return self.device.get_tensor(self.data).dtype
        return self.data.dtype

    def numpy(self) -> np.ndarray:
        """Get tensor data as a numpy array (downloading from the GPU if needed)."""
        if isinstance(self.data, str):
            return self.device.from_gpu(self.data)
        return self.data

    def __repr__(self) -> str:
        shape = self.shape
        dtype = self.dtype
        device_str = f"gpu{self.device.device_id}" if hasattr(self.device, 'device_id') else str(self.device)
        return f"HeliumTensor(shape={shape}, dtype={dtype}, device={device_str})"

    def to(self, device) -> "HeliumTensor":
        """Move tensor to the specified device; returns ``self`` if already there."""
        if isinstance(device, str):
            device = get_gpu_device(device)
        if device == self.device:
            return self
        if isinstance(self.data, str):
            # Data is on a GPU: download to host, then upload to the new device.
            cpu_data = self.device.from_gpu(self.data)
            new_data = device.to_gpu(cpu_data)
        else:
            new_data = device.to_gpu(self.data)
        return HeliumTensor(new_data, device, requires_grad=self.requires_grad)

    def detach(self) -> "HeliumTensor":
        """Create a new tensor detached from the current compute graph."""
        return HeliumTensor(self.data, self.device, requires_grad=False)

    def backward(self, gradient: Optional[Union[np.ndarray, "HeliumTensor"]] = None):
        """Compute gradients through the backward pass.

        Args:
            gradient: Upstream gradient. Defaults to ones matching this
                tensor's shape (uploaded to the device for GPU tensors).
        """
        if not self.requires_grad:
            return
        if gradient is None:
            if isinstance(self.data, str):
                ones = np.ones(self.shape)
                gradient = self.device.to_gpu(ones)
            else:
                gradient = np.ones_like(self.data)
        elif isinstance(gradient, HeliumTensor):
            # Align devices before unwrapping to the raw representation.
            if gradient.device != self.device:
                gradient = gradient.to(self.device)
            gradient = gradient.data
        self.grad = gradient
        self._backward_fn()

    def zero_grad(self):
        """Zero out gradients.

        GPU-held gradients (str tensor IDs) are freed on the device; host
        gradients are zeroed in place.
        """
        if self.grad is not None:
            if isinstance(self.grad, str):
                self.device.delete_tensor(self.grad)
                self.grad = None
            else:
                self.grad.fill(0)