"""Advanced texture and buffer management for a virtual GPU.

Features:
- Mipmapped textures with automatic generation
- Multiple texture formats and compression
- Advanced filtering modes (bilinear, trilinear, anisotropic)
- Modern buffer types (uniform, storage, indirect)
- Full framebuffer support with MSAA
- Memory-efficient texture streaming
- Cache-aware memory layout
"""

import time  # for access tracking
import zlib  # for texture compression
from enum import Enum, auto
from typing import Dict, List, Optional, Tuple, Union

import numpy as np

try:
    # Mipmaps are now generated with NumPy, so OpenCV is optional here.
    # The name stays bound for any other code in this file that uses it.
    import cv2  # noqa: F401
except ImportError:
    cv2 = None


class TextureFormat(Enum):
    """Pixel storage formats understood by textures and attachments."""

    R8 = auto()        # 8-bit red channel
    RG8 = auto()       # 8-bit red-green
    RGB8 = auto()      # 8-bit RGB
    RGBA8 = auto()     # 8-bit RGBA
    R16F = auto()      # 16-bit float red
    RGBA16F = auto()   # 16-bit float RGBA
    R32F = auto()      # 32-bit float red
    RGBA32F = auto()   # 32-bit float RGBA
    BC1 = auto()       # Block compression (DXT1)
    BC3 = auto()       # Block compression (DXT5)


class FilterMode(Enum):
    """Texture filtering modes selectable per texture."""

    NEAREST = auto()
    BILINEAR = auto()
    TRILINEAR = auto()
    ANISOTROPIC = auto()


class WrapMode(Enum):
    """How texture coordinates outside [0, 1] are mapped back into range."""

    REPEAT = auto()
    CLAMP = auto()
    MIRROR = auto()


# Channel count per format, shared by Texture and Attachment so the two
# always agree on storage layout.
_FORMAT_CHANNELS: Dict[TextureFormat, int] = {
    TextureFormat.R8: 1,
    TextureFormat.RG8: 2,
    TextureFormat.RGB8: 3,
    TextureFormat.RGBA8: 4,
    TextureFormat.R16F: 1,
    TextureFormat.RGBA16F: 4,
    TextureFormat.R32F: 1,
    TextureFormat.RGBA32F: 4,
    TextureFormat.BC1: 3,
    TextureFormat.BC3: 4,
}

_UINT8_FORMATS = frozenset({
    TextureFormat.R8,
    TextureFormat.RG8,
    TextureFormat.RGB8,
    TextureFormat.RGBA8,
})

_FLOAT16_FORMATS = frozenset({
    TextureFormat.R16F,
    TextureFormat.RGBA16F,
})


def _format_channel_count(fmt: TextureFormat) -> int:
    """Return the number of channels stored per texel for *fmt*."""
    return _FORMAT_CHANNELS[fmt]


def _format_dtype(fmt: TextureFormat) -> type:
    """Return the NumPy scalar type used to store texels of *fmt*.

    NOTE(review): BC1/BC3 fall through to float32, as in the original
    code; confirm whether 8-bit storage was intended for them.
    """
    if fmt in _UINT8_FORMATS:
        return np.uint8
    if fmt in _FLOAT16_FORMATS:
        return np.float16
    return np.float32


class MipLevel:
    """One reduced-resolution level of a texture's mipmap chain."""

    def __init__(self, width: int, height: int, data: np.ndarray):
        self.width = width
        self.height = height
        self.data = data
        self.cache_hits = 0  # For cache analysis


class Texture:
    """A 2-D texture with optional mipmaps, compression and filtering.

    ``data`` holds the full-resolution image as a (height, width, channels)
    array. ``mipmaps[k]`` holds mip level ``k + 1``; level 0 is ``data``.
    """

    def __init__(self, width: int, height: int,
                 format: TextureFormat = TextureFormat.RGBA8,
                 filter_mode: FilterMode = FilterMode.BILINEAR,
                 wrap_mode: WrapMode = WrapMode.REPEAT,
                 generate_mipmaps: bool = True,
                 aniso_level: int = 1):
        """Create a zero-filled texture.

        Args:
            width: Texture width in texels.
            height: Texture height in texels.
            format: Storage format; determines channel count and dtype.
            filter_mode: Filter used by :meth:`sample`.
            wrap_mode: Handling of coordinates outside [0, 1].
            generate_mipmaps: Build the mip chain immediately.
            aniso_level: Anisotropic sample count, clamped to [1, 16].
        """
        self.width = width
        self.height = height
        self.format = format
        self.filter_mode = filter_mode
        self.wrap_mode = wrap_mode
        self.aniso_level = min(max(1, aniso_level), 16)

        # Initialize main texture data
        self.channels = self._get_channel_count()
        self.dtype = self._get_data_type()
        self.data = np.zeros((height, width, self.channels), dtype=self.dtype)

        # Compression state
        self.compressed = False
        self.compressed_data = None

        # Cache for sampling coordinates
        self.sample_cache = {}
        self.cache_size_limit = 1024  # Adjust based on memory constraints

        # Mipmap chain
        self.mipmaps: List[MipLevel] = []
        if generate_mipmaps:
            self._generate_mipchain()

    def _get_channel_count(self) -> int:
        """Return the channel count implied by ``self.format``."""
        return _format_channel_count(self.format)

    def _get_data_type(self) -> type:
        """Return the NumPy scalar type implied by ``self.format``."""
        return _format_dtype(self.format)

    @staticmethod
    def _downsample_box(level: np.ndarray) -> np.ndarray:
        """Halve *level* in both dimensions with a 2x2 box filter.

        Works for every dtype and channel count. A trailing odd row or
        column is dropped before averaging.
        """
        new_h, new_w = level.shape[0] // 2, level.shape[1] // 2
        blocks = level[:new_h * 2, :new_w * 2].reshape(
            new_h, 2, new_w, 2, level.shape[2])
        averaged = blocks.mean(axis=(1, 3), dtype=np.float64)
        if np.issubdtype(level.dtype, np.integer):
            # Round instead of truncating so integer formats don't darken.
            averaged = np.rint(averaged)
        return averaged.astype(level.dtype)

    def _generate_mipchain(self):
        """Generate the mipmap chain using box filtering.

        Levels are produced until either dimension reaches one texel.
        """
        self.mipmaps.clear()
        current = self.data
        while current.shape[0] > 1 and current.shape[1] > 1:
            current = self._downsample_box(current)
            self.mipmaps.append(
                MipLevel(current.shape[1], current.shape[0], current))

    def compress(self):
        """Compress texture data using format-specific compression.

        Only BC1/BC3 textures are compressed; other formats are untouched.
        """
        if self.format in (TextureFormat.BC1, TextureFormat.BC3):
            # Block compression would go here
            # For now, use basic zlib compression
            self.compressed_data = zlib.compress(self.data.tobytes())
            self.compressed = True

    def decompress(self):
        """Decompress texture data back into ``self.data``."""
        if self.compressed and self.compressed_data:
            raw_data = zlib.decompress(self.compressed_data)
            # np.frombuffer over bytes is read-only; copy so that later
            # upload() calls can write into the array.
            self.data = np.frombuffer(raw_data, dtype=self.dtype).reshape(
                self.height, self.width, self.channels).copy()
            self.compressed = False

    def upload(self, img: np.ndarray, generate_mipmaps: bool = True):
        """Upload new texture data and optionally regenerate mipmaps.

        Args:
            img: Array with exactly the shape of ``self.data``.
            generate_mipmaps: Rebuild the mip chain from the new data.
        """
        assert img.shape == self.data.shape, f"Shape mismatch: expected {self.data.shape}, got {img.shape}"
        self.data[:] = img
        # Any compressed copy is now stale; drop it so a later sample()
        # does not decompress old pixels over the new upload.
        self.compressed = False
        self.compressed_data = None
        if generate_mipmaps:
            self._generate_mipchain()
        # Clear sample cache
        self.sample_cache.clear()

    def _get_level_data(self, mip_level: int) -> np.ndarray:
        """Return the pixel array for *mip_level*.

        Level 0 is the full-resolution image. Levels past the end of the
        chain clamp to the smallest available level.
        """
        if mip_level <= 0 or not self.mipmaps:
            return self.data
        return self.mipmaps[min(mip_level, len(self.mipmaps)) - 1].data

    def _wrap_coord(self, t: float) -> float:
        """Map a texture coordinate into [0, 1] per ``self.wrap_mode``."""
        if self.wrap_mode == WrapMode.REPEAT:
            return t % 1.0
        if self.wrap_mode == WrapMode.MIRROR:
            t = t % 2.0
            return 2.0 - t if t > 1.0 else t
        return min(max(t, 0), 1)

    def _sample_nearest(self, u: float, v: float, mip_level: int = 0) -> np.ndarray:
        """Nearest neighbor sampling"""
        data = self._get_level_data(mip_level)
        u = self._wrap_coord(u)
        v = self._wrap_coord(v)

        x = min(max(int(u * (data.shape[1] - 1)), 0), data.shape[1] - 1)
        y = min(max(int(v * (data.shape[0] - 1)), 0), data.shape[0] - 1)
        return data[y, x]

    def _sample_bilinear(self, u: float, v: float, mip_level: int = 0) -> np.ndarray:
        """Bilinear texture sampling"""
        # Check cache first
        cache_key = (u, v, mip_level)
        if cache_key in self.sample_cache:
            return self.sample_cache[cache_key]

        data = self._get_level_data(mip_level)
        u = self._wrap_coord(u)
        v = self._wrap_coord(v)

        # Calculate sample coordinates
        x = u * (data.shape[1] - 1)
        y = v * (data.shape[0] - 1)
        x0, y0 = int(x), int(y)
        x1 = min(x0 + 1, data.shape[1] - 1)
        y1 = min(y0 + 1, data.shape[0] - 1)

        # Calculate interpolation weights
        wx = x - x0
        wy = y - y0

        # Sample four nearest texels
        c00 = data[y0, x0]
        c10 = data[y0, x1]
        c01 = data[y1, x0]
        c11 = data[y1, x1]

        # Bilinear interpolation
        result = (c00 * (1 - wx) * (1 - wy) +
                  c10 * wx * (1 - wy) +
                  c01 * (1 - wx) * wy +
                  c11 * wx * wy)

        # Cache result
        if len(self.sample_cache) < self.cache_size_limit:
            self.sample_cache[cache_key] = result
        return result

    def _compute_lod(self, u: float, v: float) -> float:
        """Return a level of detail clamped to [0, number of mip levels].

        Placeholder heuristic derived from the coordinates themselves.
        In a real GPU, this would use screen-space derivatives.
        """
        max_level = len(self.mipmaps)
        if max_level == 0:
            return 0.0
        extent = max(u, v)
        if extent <= 0:
            # -log2(extent) tends to +inf as extent -> 0+, so use the
            # coarsest level instead of evaluating log2 of a non-positive.
            return float(max_level)
        return float(min(max(-np.log2(extent), 0.0), max_level))

    def sample(self, u: float, v: float) -> np.ndarray:
        """Sample texture with current filter mode.

        Decompresses the texture first if it is currently compressed.
        NEAREST and BILINEAR read the base level; TRILINEAR and
        ANISOTROPIC pick a mip level via :meth:`_compute_lod`.
        """
        if self.compressed:
            self.decompress()

        if self.filter_mode == FilterMode.NEAREST:
            return self._sample_nearest(u, v, 0)
        if self.filter_mode == FilterMode.BILINEAR:
            return self._sample_bilinear(u, v, 0)

        lod = self._compute_lod(u, v)
        level = int(lod)

        if self.filter_mode == FilterMode.TRILINEAR:
            # Interpolate between two adjacent mip levels
            low_mip = self._sample_bilinear(u, v, level)
            upper = min(level + 1, len(self.mipmaps))
            if upper == level:
                return low_mip
            high_mip = self._sample_bilinear(u, v, upper)
            factor = lod - level
            return low_mip * (1 - factor) + high_mip * factor

        # ANISOTROPIC
        # Simple anisotropic approximation - average multiple samples.
        # With a single tap there is no spread, so sample the centre only
        # (this also avoids dividing by aniso_level - 1 == 0).
        if self.aniso_level == 1:
            offsets = [0.0]
        else:
            offsets = [i / (self.aniso_level - 1) - 0.5
                       for i in range(self.aniso_level)]
        samples = [self._sample_bilinear(u + offset * 0.001, v, level)
                   for offset in offsets]
        return np.mean(samples, axis=0)


class BufferType(Enum):
    """Kinds of GPU buffer."""

    VERTEX = auto()    # Vertex data
    INDEX = auto()     # Index data
    UNIFORM = auto()   # Uniform buffer
    STORAGE = auto()   # Storage buffer
    INDIRECT = auto()  # Indirect draw commands


class Buffer:
    """A typed data buffer with optional CPU mapping via a shadow copy."""

    def __init__(self, data: np.ndarray, buffer_type: BufferType,
                 dynamic: bool = False, map_write: bool = False):
        """
        Initialize buffer with specific type and usage flags

        Args:
            data: Initial buffer data
            buffer_type: Type of buffer (vertex, index, uniform, etc)
            dynamic: Whether buffer will be frequently updated
            map_write: Whether buffer should be mappable for CPU writes
        """
        self.buffer_type = buffer_type
        self.dynamic = dynamic
        self.map_write = map_write
        self.mapped = False

        # Main storage
        self.data = np.array(data)

        # Shadow buffer for mapped writes
        self.shadow_buffer = None if not map_write else np.array(data)

        # Cache alignment and striding
        self.stride = self._calculate_stride()
        self.aligned_size = self._align_size(self.data.nbytes)

        # Usage tracking
        self.access_count = 0
        self.last_access = 0

    def _calculate_stride(self) -> int:
        """Calculate the 16-byte-aligned stride of one row of the buffer.

        A row is the last axis of ``self.data``. A 0-d array is treated
        as a single element.
        """
        row_length = self.data.shape[-1] if self.data.ndim else 1
        base_stride = self.data.itemsize * row_length
        # Align to 16 bytes for modern GPUs
        return ((base_stride + 15) // 16) * 16

    def _align_size(self, size: int) -> int:
        """Round *size* up to the next multiple of 256 bytes."""
        return ((size + 255) // 256) * 256

    def upload(self, data: np.ndarray):
        """Upload new data to buffer.

        Raises:
            RuntimeError: If the buffer is currently mapped.
        """
        assert data.shape == self.data.shape, f"Shape mismatch: expected {self.data.shape}, got {data.shape}"
        if self.mapped:
            raise RuntimeError("Cannot upload to mapped buffer")
        self.data[:] = data
        self.access_count += 1
        self.last_access = time.time()

    def map(self) -> Optional[np.ndarray]:
        """Map buffer for CPU access and return the writable shadow copy.

        Raises:
            RuntimeError: If the buffer was not created with ``map_write``
                or is already mapped.
        """
        if not self.map_write:
            raise RuntimeError("Buffer not created with map_write flag")
        if self.mapped:
            raise RuntimeError("Buffer already mapped")
        self.mapped = True
        self.shadow_buffer[:] = self.data
        return self.shadow_buffer

    def unmap(self):
        """Unmap buffer and copy the shadow contents into the main storage.

        Raises:
            RuntimeError: If the buffer is not mapped.
        """
        if not self.mapped:
            raise RuntimeError("Buffer not mapped")
        self.data[:] = self.shadow_buffer
        self.mapped = False
        self.access_count += 1
        self.last_access = time.time()

    def get(self) -> np.ndarray:
        """Return the buffer's main storage array (not a copy).

        Raises:
            RuntimeError: If the buffer is currently mapped.
        """
        if self.mapped:
            raise RuntimeError("Cannot read from mapped buffer")
        self.access_count += 1
        self.last_access = time.time()
        return self.data


class AttachmentType(Enum):
    """Roles an attachment can play in a framebuffer."""

    COLOR = auto()          # Color attachment
    DEPTH = auto()          # Depth attachment
    STENCIL = auto()        # Stencil attachment
    DEPTH_STENCIL = auto()  # Combined depth-stencil


class MSAASamples(Enum):
    """Supported multisample counts; the value is the sample count."""

    MSAA_1X = 1  # No multisampling
    MSAA_2X = 2  # 2x multisampling
    MSAA_4X = 4  # 4x multisampling
    MSAA_8X = 8  # 8x multisampling


class Attachment:
    """Pixel storage for one framebuffer attachment.

    ``data`` has shape (height, width, channels) without multisampling and
    (height, width, samples, channels) with it.
    """

    def __init__(self, width: int, height: int,
                 attachment_type: AttachmentType,
                 format: TextureFormat = TextureFormat.RGBA8,
                 samples: MSAASamples = MSAASamples.MSAA_1X):
        self.width = width
        self.height = height
        self.attachment_type = attachment_type
        self.format = format
        self.samples = samples

        # Initialize storage based on type and format, using the same
        # channel and dtype tables as Texture.
        channels = _format_channel_count(format)
        if samples == MSAASamples.MSAA_1X:
            shape = (height, width, channels)
        else:
            shape = (height, width, samples.value, channels)
        self.data = np.zeros(shape, dtype=self._get_dtype())

    def _get_dtype(self) -> type:
        """Return the NumPy scalar type implied by ``self.format``."""
        return _format_dtype(self.format)


class Framebuffer:
    """A set of colour, depth and stencil attachments with optional MSAA."""

    def __init__(self, width: int, height: int,
                 color_formats: Optional[List[TextureFormat]] = None,
                 depth_format: Optional[TextureFormat] = TextureFormat.R32F,
                 stencil_format: Optional[TextureFormat] = TextureFormat.R8,
                 samples: MSAASamples = MSAASamples.MSAA_1X):
        """
        Initialize framebuffer with multiple render targets, depth, and stencil

        Args:
            width: Framebuffer width
            height: Framebuffer height
            color_formats: List of formats for color attachments
                (defaults to a single RGBA8 attachment)
            depth_format: Format for depth attachment (None to disable)
            stencil_format: Format for stencil attachment (None to disable)
            samples: MSAA sample count
        """
        if color_formats is None:
            color_formats = [TextureFormat.RGBA8]

        self.width = width
        self.height = height
        self.samples = samples

        # Create color attachments
        self.color_attachments = [
            Attachment(width, height, AttachmentType.COLOR, fmt, samples)
            for fmt in color_formats
        ]

        # Create depth attachment
        self.depth_attachment = (
            Attachment(width, height, AttachmentType.DEPTH,
                       depth_format, samples)
            if depth_format is not None else None
        )

        # Create stencil attachment
        self.stencil_attachment = (
            Attachment(width, height, AttachmentType.STENCIL,
                       stencil_format, samples)
            if stencil_format is not None else None
        )

        # Active color attachment
        self.active_color = 0

        # State tracking
        self.clear_values = {
            AttachmentType.COLOR: np.zeros(4),
            AttachmentType.DEPTH: 1.0,
            AttachmentType.STENCIL: 0
        }

    def bind(self, color_attachment: int = 0):
        """Bind specific color attachment for writing"""
        assert 0 <= color_attachment < len(self.color_attachments)
        self.active_color = color_attachment

    def set_clear_values(self, color=None, depth=None, stencil=None):
        """Set clear values for attachments; ``None`` leaves a value as is."""
        if color is not None:
            self.clear_values[AttachmentType.COLOR] = np.array(color)
        if depth is not None:
            self.clear_values[AttachmentType.DEPTH] = depth
        if stencil is not None:
            self.clear_values[AttachmentType.STENCIL] = stencil

    def _clear_scalar(self, attachment: Attachment, value) -> None:
        """Clear a depth or stencil attachment to a scalar value."""
        if self.samples == MSAASamples.MSAA_1X:
            attachment.data.fill(value)
        else:
            # Channel 0 of every sample
            attachment.data[..., 0] = value

    def clear(self, color=True, depth=True, stencil=True):
        """Clear specified attachments to the stored clear values."""
        if color:
            clear_color = np.atleast_1d(
                np.asarray(self.clear_values[AttachmentType.COLOR]))
            for attachment in self.color_attachments:
                channels = attachment.data.shape[-1]
                # ndarray.fill() only takes a scalar, so broadcast-assign.
                # Trim the colour to the attachment's channel count so
                # narrower formats accept the default 4-component colour.
                value = (clear_color[:channels]
                         if clear_color.size > 1 else clear_color)
                attachment.data[...] = value

        if depth and self.depth_attachment is not None:
            self._clear_scalar(self.depth_attachment,
                               self.clear_values[AttachmentType.DEPTH])

        if stencil and self.stencil_attachment is not None:
            self._clear_scalar(self.stencil_attachment,
                               self.clear_values[AttachmentType.STENCIL])

    def write_color(self, x: int, y: int, color: np.ndarray, sample: int = 0):
        """Write color value to current attachment"""
        attachment = self.color_attachments[self.active_color]
        if self.samples == MSAASamples.MSAA_1X:
            attachment.data[y, x] = color
        else:
            attachment.data[y, x, sample] = color

    def write_depth(self, x: int, y: int, depth: float, sample: int = 0):
        """Write depth value (no-op when there is no depth attachment)"""
        if self.depth_attachment is not None:
            if self.samples == MSAASamples.MSAA_1X:
                self.depth_attachment.data[y, x] = depth
            else:
                self.depth_attachment.data[y, x, sample] = depth

    def write_stencil(self, x: int, y: int, stencil: int, sample: int = 0):
        """Write stencil value (no-op when there is no stencil attachment)"""
        if self.stencil_attachment is not None:
            if self.samples == MSAASamples.MSAA_1X:
                self.stencil_attachment.data[y, x] = stencil
            else:
                self.stencil_attachment.data[y, x, sample] = stencil

    def read_color(self, x: int, y: int, attachment: int = 0) -> np.ndarray:
        """Read color value (resolves MSAA if needed)"""
        target = self.color_attachments[attachment]
        if self.samples == MSAASamples.MSAA_1X:
            return target.data[y, x]
        # Resolve MSAA by averaging samples
        return np.mean(target.data[y, x], axis=0)

    def read_depth(self, x: int, y: int) -> float:
        """Read depth value (resolves MSAA if needed).

        Returns 1.0 when the framebuffer has no depth attachment.
        """
        if self.depth_attachment is None:
            return 1.0
        if self.samples == MSAASamples.MSAA_1X:
            return float(self.depth_attachment.data[y, x, 0])
        return float(np.mean(self.depth_attachment.data[y, x]))

    def read_stencil(self, x: int, y: int) -> int:
        """Read stencil value (uses first sample in MSAA).

        Returns 0 when the framebuffer has no stencil attachment.
        """
        if self.stencil_attachment is None:
            return 0
        if self.samples == MSAASamples.MSAA_1X:
            return int(self.stencil_attachment.data[y, x, 0])
        return int(self.stencil_attachment.data[y, x, 0, 0])

    def resolve_msaa(self) -> List[np.ndarray]:
        """Resolve MSAA framebuffer to non-MSAA textures.

        Without MSAA the attachments' own arrays are returned. With MSAA
        each colour attachment is averaged over its sample axis.
        """
        if self.samples == MSAASamples.MSAA_1X:
            return [attachment.data for attachment in self.color_attachments]

        # Average across samples
        return [np.mean(attachment.data, axis=2)
                for attachment in self.color_attachments]