NEWM / virtual_gpu /vram.py
Factor Studios
Upload 167 files
684cc60 verified
import numpy as np
from collections import OrderedDict
from typing import Dict, Any, Optional, Tuple, Union
from dataclasses import dataclass
import time
@dataclass
class MemoryBlock:
"""Represents a block of memory in the symbolic VRAM."""
address: int
size: int
data: Optional[Any]
allocated_time: float
last_accessed: float
class Framebuffer:
"""Represents a 2D drawing surface in VRAM."""
def __init__(self, width: int, height: int, channels: int = 3, dtype=np.uint8):
self.width = width
self.height = height
self.channels = channels
self.dtype = dtype
# Create the pixel buffer symbolically to avoid large allocations
# The actual pixel data will be managed by the MemoryManager
self.pixel_buffer_address: Optional[int] = None
self.pixel_buffer_size: int = width * height * channels * np.dtype(dtype).itemsize
self.pixel_buffer = np.zeros((height, width, channels), dtype=dtype)
self.vram_address: Optional[int] = None # This is the address in the MemoryManager
def resize(self, new_width: int, new_height: int) -> None:
# No actual data to resize, just update symbolic size
self.width = new_width
self.height = new_height
self.pixel_buffer_size = new_width * new_height * self.channels * np.dtype(self.dtype).itemsize
def clear(self, color: Tuple[int, int, int]) -> None:
self.pixel_buffer[:, :] = color
def get_pixel(self, x: int, y: int) -> np.ndarray:
if 0 <= x < self.width and 0 <= y < self.height:
return self.pixel_buffer[y, x]
return np.zeros(self.channels, dtype=self.dtype)
def set_pixel(self, x: int, y: int, color: Tuple[int, int, int]) -> None:
if 0 <= x < self.width and 0 <= y < self.height:
self.pixel_buffer[y, x] = color[:self.channels]
def get_memory_usage(self) -> int:
"""Get the memory usage of this framebuffer in bytes."""
return self.pixel_buffer_size
class MemoryManager:
"""Manages the symbolic 500GB GDDR7 memory space."""
def __init__(self, total_memory_gb: int = 500, block_size_kb: int = 4):
self.total_memory_bytes = total_memory_gb * 1024 * 1024 * 1024 # 500GB
self.block_size_bytes = block_size_kb * 1024 # 4KB blocks
self.total_blocks = self.total_memory_bytes // self.block_size_bytes
# Symbolic memory space - only allocated blocks are stored
self.memory_blocks: Dict[int, MemoryBlock] = {}
# Free block tracking - use a list of free block ranges instead of a set of all blocks
self.free_block_ranges = [(0, self.total_blocks - 1)] # (start_block_id, end_block_id)
self.allocated_blocks = set() # Still track allocated blocks for quick lookup
# Address allocation counter
self.next_address = 0
def allocate_block(self, size_bytes: int) -> Optional[int]:
"""Allocate a block of memory and return its address."""
blocks_needed = (size_bytes + self.block_size_bytes - 1) // self.block_size_bytes
# Find a suitable contiguous block range
for i, (start, end) in enumerate(self.free_block_ranges):
available_blocks = end - start + 1
if available_blocks >= blocks_needed:
# Found a suitable range
base_block_id = start
# Update free_block_ranges
new_start = start + blocks_needed
if new_start <= end:
self.free_block_ranges[i] = (new_start, end)
else:
self.free_block_ranges.pop(i)
# Add to allocated_blocks
for j in range(blocks_needed):
self.allocated_blocks.add(base_block_id + j)
# Create memory block
base_address = base_block_id * self.block_size_bytes
memory_block = MemoryBlock(
address=base_address,
size=size_bytes,
data=bytearray(size_bytes), # Allocate actual bytearray for data
allocated_time=time.time(),
last_accessed=time.time()
)
self.memory_blocks[base_address] = memory_block
return base_address
return None # Out of memory
def deallocate_block(self, address: int) -> bool:
"""Deallocate a block of memory."""
if address in self.memory_blocks:
memory_block = self.memory_blocks[address]
blocks_to_free = (memory_block.size + self.block_size_bytes - 1) // self.block_size_bytes
base_block_id = address // self.block_size_bytes
for i in range(blocks_to_free):
block_id = base_block_id + i
if block_id in self.allocated_blocks:
self.allocated_blocks.remove(block_id)
# Add back to free_block_ranges (simple merge for now)
self.free_block_ranges.append((block_id, block_id))
self.free_block_ranges.sort() # Keep sorted for efficient merging
del self.memory_blocks[address]
return True
return False
def read_data(self, address: int, size: int) -> Optional[np.ndarray]:
"""Read data from memory."""
if address in self.memory_blocks:
memory_block = self.memory_blocks[address]
if memory_block.data is not None and size <= memory_block.size:
return np.frombuffer(memory_block.data[:size], dtype=np.uint8) # Return as numpy array
return None
def write_data(self, address: int, data: Union[np.ndarray, bytes]) -> bool:
"""Write data to memory."""
if address in self.memory_blocks:
memory_block = self.memory_blocks[address]
if memory_block.data is not None:
if isinstance(data, np.ndarray):
data_bytes = data.tobytes()
elif isinstance(data, bytes):
data_bytes = data
else:
raise TypeError("Data must be a NumPy array or bytes.")
if len(data_bytes) <= memory_block.size:
memory_block.data[:len(data_bytes)] = data_bytes
return True
return False
def get_memory_stats(self) -> Dict[str, Any]:
"""Get memory usage statistics."""
allocated_bytes = sum(block.size for block in self.memory_blocks.values())
free_bytes = self.total_memory_bytes - allocated_bytes
return {
"total_memory_gb": self.total_memory_bytes / (1024**3),
"allocated_bytes": allocated_bytes,
"free_bytes": free_bytes,
"allocated_blocks_count": len(self.allocated_blocks),
"free_block_ranges_count": len(self.free_block_ranges),
"utilization_percent": (allocated_bytes / self.total_memory_bytes) * 100 if self.total_memory_bytes > 0 else 0
}
class VRAM:
"""
Main VRAM class that provides the interface for the 500GB GDDR7 memory.
This class combines the MemoryManager for low-level memory operations
with higher-level abstractions like Framebuffers.
"""
def __init__(self, memory_size_gb: int = 500):
self.memory_manager = MemoryManager(memory_size_gb)
# Cache for frequently accessed data (simulates L1/L2 cache)
self.cache_size = 1000 # Number of cache entries
self.cache = OrderedDict()
# Framebuffer registry
self.framebuffers: Dict[str, Framebuffer] = {}
self.framebuffer_counter = 0
# Texture registry
self.textures: Dict[str, np.ndarray] = {}
self.texture_counter = 0
def create_framebuffer(self, width: int, height: int, channels: int = 3,
name: Optional[str] = None) -> str:
"""Create a new framebuffer and return its ID."""
if name is None:
name = f"framebuffer_{self.framebuffer_counter}"
self.framebuffer_counter += 1
framebuffer = Framebuffer(width, height, channels)
# Allocate memory for the framebuffer
memory_size = framebuffer.get_memory_usage()
address = self.memory_manager.allocate_block(memory_size)
if address is not None:
framebuffer.vram_address = address
self.framebuffers[name] = framebuffer
return name
else:
raise MemoryError("Failed to allocate memory for framebuffer")
def get_framebuffer(self, name: str) -> Optional[Framebuffer]:
"""Get a framebuffer by name."""
return self.framebuffers.get(name)
def delete_framebuffer(self, name: str) -> bool:
"""Delete a framebuffer and free its memory."""
if name in self.framebuffers:
framebuffer = self.framebuffers[name]
if framebuffer.vram_address is not None:
self.memory_manager.deallocate_block(framebuffer.vram_address)
del self.framebuffers[name]
return True
return False
def load_texture(self, texture_data: Union[np.ndarray, bytes], name: Optional[str] = None) -> str:
"""Load texture data into VRAM and return its ID."""
if name is None:
name = f"texture_{self.texture_counter}"
self.texture_counter += 1
size_bytes = 0
if isinstance(texture_data, np.ndarray):
size_bytes = texture_data.nbytes
elif isinstance(texture_data, bytes):
size_bytes = len(texture_data)
else:
raise TypeError("Texture data must be a NumPy array or bytes.")
# Allocate memory for the texture
address = self.memory_manager.allocate_block(size_bytes)
if address is not None:
self.memory_manager.write_data(address, texture_data) # Write actual data
self.textures[name] = texture_data # Store actual data for reference
return name
else:
raise MemoryError("Failed to allocate memory for texture")
def get_texture(self, name: str) -> Optional[np.ndarray]:
"""Get texture data by name."""
return self.textures.get(name)
def cache_read(self, address: int, size: int) -> Optional[np.ndarray]:
"""Read data with caching support."""
cache_key = (address, size)
# Check cache first
if cache_key in self.cache:
# Move to end (most recently used)
data = self.cache.pop(cache_key)
self.cache[cache_key] = data
return data.copy()
# Read from memory
data = self.memory_manager.read_data(address, size)
if data is not None:
# Add to cache
if len(self.cache) >= self.cache_size:
# Remove least recently used item
self.cache.popitem(last=False)
self.cache[cache_key] = data.copy()
return data
def transfer_from_ram(self, name: str, data: Union[np.ndarray, bytes],
delay_ms: float = 0.0) -> Optional[str]:
"""Transfer a block of data from RAM to VRAM."""
if isinstance(data, np.ndarray):
size_bytes = data.nbytes
data_to_store = data.flatten()
elif isinstance(data, bytes):
size_bytes = len(data)
data_to_store = np.frombuffer(data, dtype=np.uint8)
else:
raise TypeError("Data must be a NumPy array or bytes.")
# Simulate delay
if delay_ms > 0:
time.sleep(delay_ms / 1000.0)
# Allocate memory in VRAM
address = self.memory_manager.allocate_block(size_bytes)
if address is not None:
# Store data in VRAM
self.memory_manager.write_data(address, data_to_store)
# Register the transferred data as a texture/buffer in VRAM
# For simplicity, we\"ll register it as a texture for now
texture_id = f"ram_transfer_{self.texture_counter}"
self.texture_counter += 1
self.textures[texture_id] = data # Store actual data for reference
print(f"Transferred {size_bytes} bytes from RAM to VRAM at address {address} as {texture_id}")
return texture_id
else:
print(f"Failed to transfer {size_bytes} bytes from RAM to VRAM: Out of VRAM memory.")
return None
def get_stats(self) -> Dict[str, Any]:
"""Get comprehensive VRAM statistics."""
memory_stats = self.memory_manager.get_memory_stats()
framebuffer_memory = sum(fb.get_memory_usage() for fb in self.framebuffers.values())
texture_memory = sum(tex.nbytes for tex in self.textures.values())
return {
**memory_stats,
"framebuffers_count": len(self.framebuffers),
"textures_count": len(self.textures),
"framebuffer_memory_bytes": framebuffer_memory,
"texture_memory_bytes": texture_memory,
"cache_entries": len(self.cache),
"cache_hit_ratio": 0.0 # TODO: Implement cache hit tracking
}
if __name__ == "__main__":
# Test the VRAM module
vram = VRAM(memory_size_gb=1) # Use 1GB for testing
# Create a framebuffer
fb_id = vram.create_framebuffer(1920, 1080, 3)
print(f"Created framebuffer: {fb_id}")
# Get the framebuffer and modify it
fb = vram.get_framebuffer(fb_id)
if fb:
fb.clear((255, 0, 0)) # Clear to red
fb.set_pixel(100, 100, (0, 255, 0)) # Set a green pixel
print(f"Framebuffer size: {fb.width}x{fb.height}")
print(f"Pixel at (100, 100): {fb.get_pixel(100, 100)}")
# Load a test texture
test_texture = np.random.randint(0, 256, (256, 256, 3), dtype=np.uint8)
tex_id = vram.load_texture(test_texture)
print(f"Loaded texture: {tex_id}")
# Test transfer_from_ram
ram_data = b"\x01\x02\x03\x04\x05\x06\x07\x08"
transferred_id = vram.transfer_from_ram("test_ram_data", ram_data, delay_ms=10)
print(f"Transferred RAM data ID: {transferred_id}")
# Print statistics
stats = vram.get_stats()
print(f"VRAM Stats: {stats}")