Spaces:
Runtime error
Runtime error
| import numpy as np | |
| from collections import OrderedDict | |
| from typing import Dict, Any, Optional, Tuple, Union | |
| from dataclasses import dataclass | |
| import time | |
| class MemoryBlock: | |
| """Represents a block of memory in the symbolic VRAM.""" | |
| address: int | |
| size: int | |
| data: Optional[Any] | |
| allocated_time: float | |
| last_accessed: float | |
| class Framebuffer: | |
| """Represents a 2D drawing surface in VRAM.""" | |
| def __init__(self, width: int, height: int, channels: int = 3, dtype=np.uint8): | |
| self.width = width | |
| self.height = height | |
| self.channels = channels | |
| self.dtype = dtype | |
| # Create the pixel buffer symbolically to avoid large allocations | |
| # The actual pixel data will be managed by the MemoryManager | |
| self.pixel_buffer_address: Optional[int] = None | |
| self.pixel_buffer_size: int = width * height * channels * np.dtype(dtype).itemsize | |
| self.pixel_buffer = np.zeros((height, width, channels), dtype=dtype) | |
| self.vram_address: Optional[int] = None # This is the address in the MemoryManager | |
| def resize(self, new_width: int, new_height: int) -> None: | |
| # No actual data to resize, just update symbolic size | |
| self.width = new_width | |
| self.height = new_height | |
| self.pixel_buffer_size = new_width * new_height * self.channels * np.dtype(self.dtype).itemsize | |
| def clear(self, color: Tuple[int, int, int]) -> None: | |
| self.pixel_buffer[:, :] = color | |
| def get_pixel(self, x: int, y: int) -> np.ndarray: | |
| if 0 <= x < self.width and 0 <= y < self.height: | |
| return self.pixel_buffer[y, x] | |
| return np.zeros(self.channels, dtype=self.dtype) | |
| def set_pixel(self, x: int, y: int, color: Tuple[int, int, int]) -> None: | |
| if 0 <= x < self.width and 0 <= y < self.height: | |
| self.pixel_buffer[y, x] = color[:self.channels] | |
| def get_memory_usage(self) -> int: | |
| """Get the memory usage of this framebuffer in bytes.""" | |
| return self.pixel_buffer_size | |
| class MemoryManager: | |
| """Manages the symbolic 500GB GDDR7 memory space.""" | |
| def __init__(self, total_memory_gb: int = 500, block_size_kb: int = 4): | |
| self.total_memory_bytes = total_memory_gb * 1024 * 1024 * 1024 # 500GB | |
| self.block_size_bytes = block_size_kb * 1024 # 4KB blocks | |
| self.total_blocks = self.total_memory_bytes // self.block_size_bytes | |
| # Symbolic memory space - only allocated blocks are stored | |
| self.memory_blocks: Dict[int, MemoryBlock] = {} | |
| # Free block tracking - use a list of free block ranges instead of a set of all blocks | |
| self.free_block_ranges = [(0, self.total_blocks - 1)] # (start_block_id, end_block_id) | |
| self.allocated_blocks = set() # Still track allocated blocks for quick lookup | |
| # Address allocation counter | |
| self.next_address = 0 | |
| def allocate_block(self, size_bytes: int) -> Optional[int]: | |
| """Allocate a block of memory and return its address.""" | |
| blocks_needed = (size_bytes + self.block_size_bytes - 1) // self.block_size_bytes | |
| # Find a suitable contiguous block range | |
| for i, (start, end) in enumerate(self.free_block_ranges): | |
| available_blocks = end - start + 1 | |
| if available_blocks >= blocks_needed: | |
| # Found a suitable range | |
| base_block_id = start | |
| # Update free_block_ranges | |
| new_start = start + blocks_needed | |
| if new_start <= end: | |
| self.free_block_ranges[i] = (new_start, end) | |
| else: | |
| self.free_block_ranges.pop(i) | |
| # Add to allocated_blocks | |
| for j in range(blocks_needed): | |
| self.allocated_blocks.add(base_block_id + j) | |
| # Create memory block | |
| base_address = base_block_id * self.block_size_bytes | |
| memory_block = MemoryBlock( | |
| address=base_address, | |
| size=size_bytes, | |
| data=bytearray(size_bytes), # Allocate actual bytearray for data | |
| allocated_time=time.time(), | |
| last_accessed=time.time() | |
| ) | |
| self.memory_blocks[base_address] = memory_block | |
| return base_address | |
| return None # Out of memory | |
| def deallocate_block(self, address: int) -> bool: | |
| """Deallocate a block of memory.""" | |
| if address in self.memory_blocks: | |
| memory_block = self.memory_blocks[address] | |
| blocks_to_free = (memory_block.size + self.block_size_bytes - 1) // self.block_size_bytes | |
| base_block_id = address // self.block_size_bytes | |
| for i in range(blocks_to_free): | |
| block_id = base_block_id + i | |
| if block_id in self.allocated_blocks: | |
| self.allocated_blocks.remove(block_id) | |
| # Add back to free_block_ranges (simple merge for now) | |
| self.free_block_ranges.append((block_id, block_id)) | |
| self.free_block_ranges.sort() # Keep sorted for efficient merging | |
| del self.memory_blocks[address] | |
| return True | |
| return False | |
| def read_data(self, address: int, size: int) -> Optional[np.ndarray]: | |
| """Read data from memory.""" | |
| if address in self.memory_blocks: | |
| memory_block = self.memory_blocks[address] | |
| if memory_block.data is not None and size <= memory_block.size: | |
| return np.frombuffer(memory_block.data[:size], dtype=np.uint8) # Return as numpy array | |
| return None | |
| def write_data(self, address: int, data: Union[np.ndarray, bytes]) -> bool: | |
| """Write data to memory.""" | |
| if address in self.memory_blocks: | |
| memory_block = self.memory_blocks[address] | |
| if memory_block.data is not None: | |
| if isinstance(data, np.ndarray): | |
| data_bytes = data.tobytes() | |
| elif isinstance(data, bytes): | |
| data_bytes = data | |
| else: | |
| raise TypeError("Data must be a NumPy array or bytes.") | |
| if len(data_bytes) <= memory_block.size: | |
| memory_block.data[:len(data_bytes)] = data_bytes | |
| return True | |
| return False | |
| def get_memory_stats(self) -> Dict[str, Any]: | |
| """Get memory usage statistics.""" | |
| allocated_bytes = sum(block.size for block in self.memory_blocks.values()) | |
| free_bytes = self.total_memory_bytes - allocated_bytes | |
| return { | |
| "total_memory_gb": self.total_memory_bytes / (1024**3), | |
| "allocated_bytes": allocated_bytes, | |
| "free_bytes": free_bytes, | |
| "allocated_blocks_count": len(self.allocated_blocks), | |
| "free_block_ranges_count": len(self.free_block_ranges), | |
| "utilization_percent": (allocated_bytes / self.total_memory_bytes) * 100 if self.total_memory_bytes > 0 else 0 | |
| } | |
| class VRAM: | |
| """ | |
| Main VRAM class that provides the interface for the 500GB GDDR7 memory. | |
| This class combines the MemoryManager for low-level memory operations | |
| with higher-level abstractions like Framebuffers. | |
| """ | |
| def __init__(self, memory_size_gb: int = 500): | |
| self.memory_manager = MemoryManager(memory_size_gb) | |
| # Cache for frequently accessed data (simulates L1/L2 cache) | |
| self.cache_size = 1000 # Number of cache entries | |
| self.cache = OrderedDict() | |
| # Framebuffer registry | |
| self.framebuffers: Dict[str, Framebuffer] = {} | |
| self.framebuffer_counter = 0 | |
| # Texture registry | |
| self.textures: Dict[str, np.ndarray] = {} | |
| self.texture_counter = 0 | |
| def create_framebuffer(self, width: int, height: int, channels: int = 3, | |
| name: Optional[str] = None) -> str: | |
| """Create a new framebuffer and return its ID.""" | |
| if name is None: | |
| name = f"framebuffer_{self.framebuffer_counter}" | |
| self.framebuffer_counter += 1 | |
| framebuffer = Framebuffer(width, height, channels) | |
| # Allocate memory for the framebuffer | |
| memory_size = framebuffer.get_memory_usage() | |
| address = self.memory_manager.allocate_block(memory_size) | |
| if address is not None: | |
| framebuffer.vram_address = address | |
| self.framebuffers[name] = framebuffer | |
| return name | |
| else: | |
| raise MemoryError("Failed to allocate memory for framebuffer") | |
| def get_framebuffer(self, name: str) -> Optional[Framebuffer]: | |
| """Get a framebuffer by name.""" | |
| return self.framebuffers.get(name) | |
| def delete_framebuffer(self, name: str) -> bool: | |
| """Delete a framebuffer and free its memory.""" | |
| if name in self.framebuffers: | |
| framebuffer = self.framebuffers[name] | |
| if framebuffer.vram_address is not None: | |
| self.memory_manager.deallocate_block(framebuffer.vram_address) | |
| del self.framebuffers[name] | |
| return True | |
| return False | |
| def load_texture(self, texture_data: Union[np.ndarray, bytes], name: Optional[str] = None) -> str: | |
| """Load texture data into VRAM and return its ID.""" | |
| if name is None: | |
| name = f"texture_{self.texture_counter}" | |
| self.texture_counter += 1 | |
| size_bytes = 0 | |
| if isinstance(texture_data, np.ndarray): | |
| size_bytes = texture_data.nbytes | |
| elif isinstance(texture_data, bytes): | |
| size_bytes = len(texture_data) | |
| else: | |
| raise TypeError("Texture data must be a NumPy array or bytes.") | |
| # Allocate memory for the texture | |
| address = self.memory_manager.allocate_block(size_bytes) | |
| if address is not None: | |
| self.memory_manager.write_data(address, texture_data) # Write actual data | |
| self.textures[name] = texture_data # Store actual data for reference | |
| return name | |
| else: | |
| raise MemoryError("Failed to allocate memory for texture") | |
| def get_texture(self, name: str) -> Optional[np.ndarray]: | |
| """Get texture data by name.""" | |
| return self.textures.get(name) | |
| def cache_read(self, address: int, size: int) -> Optional[np.ndarray]: | |
| """Read data with caching support.""" | |
| cache_key = (address, size) | |
| # Check cache first | |
| if cache_key in self.cache: | |
| # Move to end (most recently used) | |
| data = self.cache.pop(cache_key) | |
| self.cache[cache_key] = data | |
| return data.copy() | |
| # Read from memory | |
| data = self.memory_manager.read_data(address, size) | |
| if data is not None: | |
| # Add to cache | |
| if len(self.cache) >= self.cache_size: | |
| # Remove least recently used item | |
| self.cache.popitem(last=False) | |
| self.cache[cache_key] = data.copy() | |
| return data | |
| def transfer_from_ram(self, name: str, data: Union[np.ndarray, bytes], | |
| delay_ms: float = 0.0) -> Optional[str]: | |
| """Transfer a block of data from RAM to VRAM.""" | |
| if isinstance(data, np.ndarray): | |
| size_bytes = data.nbytes | |
| data_to_store = data.flatten() | |
| elif isinstance(data, bytes): | |
| size_bytes = len(data) | |
| data_to_store = np.frombuffer(data, dtype=np.uint8) | |
| else: | |
| raise TypeError("Data must be a NumPy array or bytes.") | |
| # Simulate delay | |
| if delay_ms > 0: | |
| time.sleep(delay_ms / 1000.0) | |
| # Allocate memory in VRAM | |
| address = self.memory_manager.allocate_block(size_bytes) | |
| if address is not None: | |
| # Store data in VRAM | |
| self.memory_manager.write_data(address, data_to_store) | |
| # Register the transferred data as a texture/buffer in VRAM | |
| # For simplicity, we\"ll register it as a texture for now | |
| texture_id = f"ram_transfer_{self.texture_counter}" | |
| self.texture_counter += 1 | |
| self.textures[texture_id] = data # Store actual data for reference | |
| print(f"Transferred {size_bytes} bytes from RAM to VRAM at address {address} as {texture_id}") | |
| return texture_id | |
| else: | |
| print(f"Failed to transfer {size_bytes} bytes from RAM to VRAM: Out of VRAM memory.") | |
| return None | |
| def get_stats(self) -> Dict[str, Any]: | |
| """Get comprehensive VRAM statistics.""" | |
| memory_stats = self.memory_manager.get_memory_stats() | |
| framebuffer_memory = sum(fb.get_memory_usage() for fb in self.framebuffers.values()) | |
| texture_memory = sum(tex.nbytes for tex in self.textures.values()) | |
| return { | |
| **memory_stats, | |
| "framebuffers_count": len(self.framebuffers), | |
| "textures_count": len(self.textures), | |
| "framebuffer_memory_bytes": framebuffer_memory, | |
| "texture_memory_bytes": texture_memory, | |
| "cache_entries": len(self.cache), | |
| "cache_hit_ratio": 0.0 # TODO: Implement cache hit tracking | |
| } | |
| if __name__ == "__main__": | |
| # Test the VRAM module | |
| vram = VRAM(memory_size_gb=1) # Use 1GB for testing | |
| # Create a framebuffer | |
| fb_id = vram.create_framebuffer(1920, 1080, 3) | |
| print(f"Created framebuffer: {fb_id}") | |
| # Get the framebuffer and modify it | |
| fb = vram.get_framebuffer(fb_id) | |
| if fb: | |
| fb.clear((255, 0, 0)) # Clear to red | |
| fb.set_pixel(100, 100, (0, 255, 0)) # Set a green pixel | |
| print(f"Framebuffer size: {fb.width}x{fb.height}") | |
| print(f"Pixel at (100, 100): {fb.get_pixel(100, 100)}") | |
| # Load a test texture | |
| test_texture = np.random.randint(0, 256, (256, 256, 3), dtype=np.uint8) | |
| tex_id = vram.load_texture(test_texture) | |
| print(f"Loaded texture: {tex_id}") | |
| # Test transfer_from_ram | |
| ram_data = b"\x01\x02\x03\x04\x05\x06\x07\x08" | |
| transferred_id = vram.transfer_from_ram("test_ram_data", ram_data, delay_ms=10) | |
| print(f"Transferred RAM data ID: {transferred_id}") | |
| # Print statistics | |
| stats = vram.get_stats() | |
| print(f"VRAM Stats: {stats}") | |