# Source: INV/virtual_gpu_driver/src/graphics/texture_buffer_manager.py
# Uploaded by Fred808 ("Upload 256 files", commit 7a0c684, verified)
"""
Advanced Texture and Buffer Management for Virtual GPU
Features:
- Mipmapped textures with automatic generation
- Multiple texture formats and compression
- Advanced filtering modes (bilinear, trilinear, anisotropic)
- Modern buffer types (uniform, storage, indirect)
- Full framebuffer support with MSAA
- Memory-efficient texture streaming
- Cache-aware memory layout
"""
import numpy as np
from enum import Enum, auto
import cv2 # for mipmap generation
from typing import List, Tuple, Optional, Union, Dict
import zlib # for texture compression
import time # for access tracking
from typing import List, Tuple, Optional, Union
import zlib # for texture compression
class TextureFormat(Enum):
    """Pixel formats understood by textures and framebuffer attachments.

    Member values are assigned by ``auto()`` in declaration order, so the
    order of the members below must not change.
    """

    # --- 8-bit formats ---
    R8 = auto()       # red only
    RG8 = auto()      # red + green
    RGB8 = auto()     # red + green + blue
    RGBA8 = auto()    # red + green + blue + alpha

    # --- 16-bit float formats ---
    R16F = auto()     # red only
    RGBA16F = auto()  # RGBA

    # --- 32-bit float formats ---
    R32F = auto()     # red only
    RGBA32F = auto()  # RGBA

    # --- block-compressed formats ---
    BC1 = auto()      # block compression (DXT1)
    BC3 = auto()      # block compression (DXT5)
class FilterMode(Enum):
    """Texture filtering modes a sampler can be configured with.

    Values come from ``auto()`` in declaration order.
    """

    NEAREST = auto()      # single-texel lookup
    BILINEAR = auto()     # blend of the four surrounding texels
    TRILINEAR = auto()    # bilinear samples blended across two mip levels
    ANISOTROPIC = auto()  # average of several bilinear samples
class WrapMode(Enum):
    """How texture coordinates outside ``[0, 1]`` are treated when sampling.

    Values come from ``auto()`` in declaration order.
    """

    REPEAT = auto()  # keep only the fractional part of the coordinate
    CLAMP = auto()   # pin the coordinate to the [0, 1] range
    MIRROR = auto()  # presumably reflect at each integer boundary; confirm sampler support
class MipLevel:
    """A single image in a texture's mipmap chain.

    Attributes:
        width: Width of this level in texels.
        height: Height of this level in texels.
        data: Pixel storage for this level (kept by reference, not copied).
        cache_hits: Counter reserved for cache analysis; starts at zero.
    """

    def __init__(self, width: int, height: int, data: np.ndarray):
        self.width, self.height = width, height
        self.data = data
        self.cache_hits = 0
class Texture:
    """2D texture with an optional mipmap chain, wrap modes and filtering.

    Mip levels are numbered the usual way: level 0 is the full-resolution
    image in ``self.data`` and level ``k`` (``k >= 1``) is
    ``self.mipmaps[k - 1]``.  Requests beyond the end of the chain are
    clamped to the smallest available level.

    Attributes:
        width, height: Size of the base level in texels.
        format: Storage format (see ``TextureFormat``).
        filter_mode: Filtering used by ``sample``.
        wrap_mode: Treatment of coordinates outside ``[0, 1]``.
        aniso_level: Number of taps for anisotropic filtering, clamped to 1..16.
        channels: Channel count derived from ``format``.
        dtype: NumPy dtype derived from ``format``.
        data: Base level, shape ``(height, width, channels)``.
        mipmaps: Down-sampled levels, largest first.
        compressed / compressed_data: State maintained by ``compress`` /
            ``decompress``.
        sample_cache: Memo of bilinear results keyed by ``(u, v, mip_level)``.
        cache_size_limit: Maximum number of entries kept in ``sample_cache``.
    """

    def __init__(self, width: int, height: int, format: TextureFormat = TextureFormat.RGBA8,
                 filter_mode: FilterMode = FilterMode.BILINEAR,
                 wrap_mode: WrapMode = WrapMode.REPEAT,
                 generate_mipmaps: bool = True,
                 aniso_level: int = 1):
        """Allocate a zero-filled texture and (optionally) its mip chain.

        Args:
            width: Base-level width in texels.
            height: Base-level height in texels.
            format: Storage format; determines channel count and dtype.
            filter_mode: Filtering applied by ``sample``.
            wrap_mode: Out-of-range coordinate handling.
            generate_mipmaps: Build the mip chain immediately when True.
            aniso_level: Requested anisotropic tap count (clamped to 1..16).
        """
        self.width = width
        self.height = height
        self.format = format
        self.filter_mode = filter_mode
        self.wrap_mode = wrap_mode
        self.aniso_level = min(max(1, aniso_level), 16)

        # Base-level storage
        self.channels = self._get_channel_count()
        self.dtype = self._get_data_type()
        self.data = np.zeros((height, width, self.channels), dtype=self.dtype)

        # Compression state
        self.compressed = False
        self.compressed_data: Optional[bytes] = None

        # Cache for sampling coordinates
        self.sample_cache = {}
        self.cache_size_limit = 1024  # Adjust based on memory constraints

        # Mipmap chain
        self.mipmaps: List[MipLevel] = []
        if generate_mipmaps:
            self._generate_mipchain()

    def _get_channel_count(self) -> int:
        """Return the number of channels stored per texel for ``self.format``."""
        return {
            TextureFormat.R8: 1,
            TextureFormat.RG8: 2,
            TextureFormat.RGB8: 3,
            TextureFormat.RGBA8: 4,
            TextureFormat.R16F: 1,
            TextureFormat.RGBA16F: 4,
            TextureFormat.R32F: 1,
            TextureFormat.RGBA32F: 4,
            TextureFormat.BC1: 3,
            TextureFormat.BC3: 4,
        }[self.format]

    def _get_data_type(self) -> np.dtype:
        """Return the NumPy dtype used to store ``self.format``.

        8-bit formats map to ``uint8`` and 16-bit float formats to
        ``float16``; everything else (including BC1/BC3) is held as
        ``float32``.
        """
        if self.format in (TextureFormat.R8, TextureFormat.RG8,
                           TextureFormat.RGB8, TextureFormat.RGBA8):
            return np.uint8
        if self.format in (TextureFormat.R16F, TextureFormat.RGBA16F):
            return np.float16
        return np.float32

    @staticmethod
    def _downsample_box(src: np.ndarray, new_h: int, new_w: int) -> np.ndarray:
        """Box-filter ``src`` (H, W, C) down to ``(new_h, new_w, C)``.

        Every source texel contributes to exactly one destination texel, so
        odd dimensions are handled without dropping the last row/column.
        The result keeps the source dtype and always keeps the channel axis.
        """
        src_h, src_w = src.shape[:2]
        row_starts = (np.arange(new_h) * src_h) // new_h
        col_starts = (np.arange(new_w) * src_w) // new_w

        # Sum each footprint in float64 so 8-bit data cannot overflow.
        sums = np.add.reduceat(src.astype(np.float64), row_starts, axis=0)
        sums = np.add.reduceat(sums, col_starts, axis=1)

        row_counts = np.diff(np.append(row_starts, src_h))
        col_counts = np.diff(np.append(col_starts, src_w))
        mean = sums / (row_counts[:, None, None] * col_counts[None, :, None])

        if np.issubdtype(src.dtype, np.integer):
            mean = np.rint(mean)  # round instead of truncating integer formats
        return mean.astype(src.dtype)

    def _generate_mipchain(self):
        """Rebuild the complete mipmap chain from ``self.data`` by box filtering.

        Each level halves both dimensions (never going below 1) and the chain
        continues until the 1x1 level, so non-square textures also receive a
        full chain.
        """
        self.mipmaps.clear()
        level = self.data
        h, w = level.shape[:2]
        while h > 1 or w > 1:
            h, w = max(1, h // 2), max(1, w // 2)
            level = self._downsample_box(level, h, w)
            self.mipmaps.append(MipLevel(w, h, level))

    def compress(self):
        """Compress the base level for block-compressed formats.

        Only BC1/BC3 textures are affected.  Real block compression is not
        implemented; zlib is used as a stand-in.
        """
        if self.format in (TextureFormat.BC1, TextureFormat.BC3):
            # Block compression would go here; for now use basic zlib compression
            self.compressed_data = zlib.compress(self.data.tobytes())
            self.compressed = True

    def decompress(self):
        """Restore ``self.data`` from ``self.compressed_data`` if compressed."""
        if self.compressed and self.compressed_data:
            raw = zlib.decompress(self.compressed_data)
            # frombuffer yields a read-only view of ``raw``; copy so later
            # uploads can still write into self.data.
            self.data = np.frombuffer(raw, dtype=self.dtype).reshape(
                self.height, self.width, self.channels).copy()
            self.compressed = False

    def upload(self, img: np.ndarray, generate_mipmaps: bool = True):
        """Replace the base level with ``img`` and optionally rebuild mipmaps.

        Args:
            img: New texel data; must have exactly the shape of ``self.data``.
            generate_mipmaps: Regenerate the mip chain from the new data.

        Raises:
            AssertionError: If ``img`` has a different shape.
        """
        assert img.shape == self.data.shape, f"Shape mismatch: expected {self.data.shape}, got {img.shape}"
        self.data[:] = img

        # The new pixels supersede any compressed payload; otherwise the next
        # sample() would decompress stale data over this upload.
        self.compressed = False
        self.compressed_data = None

        if generate_mipmaps:
            self._generate_mipchain()

        # Cached samples refer to the old contents
        self.sample_cache.clear()

    def _level_data(self, mip_level: int) -> np.ndarray:
        """Return the pixel array for ``mip_level`` (0 = base), clamped to the chain."""
        level = min(max(int(mip_level), 0), len(self.mipmaps))
        if level == 0:
            return self.data
        return self.mipmaps[level - 1].data

    def _wrap(self, coord: float) -> float:
        """Map one texture coordinate into ``[0, 1]`` according to ``wrap_mode``."""
        if self.wrap_mode == WrapMode.REPEAT:
            return coord % 1.0
        if self.wrap_mode == WrapMode.MIRROR:
            folded = coord % 2.0
            return 2.0 - folded if folded > 1.0 else folded
        return min(max(coord, 0), 1)  # CLAMP

    def _estimate_lod(self, u: float, v: float) -> float:
        """Placeholder level-of-detail estimate, ``-log2(max(u, v))``.

        A real GPU would use screen-space derivatives.  Guards keep the
        heuristic finite: a non-positive ``max(u, v)`` cannot be passed to
        ``log2``; exactly zero selects the smallest level (the limit of the
        formula) and negative values select level 0.
        """
        extent = max(u, v)
        if extent > 0:
            return max(0.0, float(-np.log2(extent)))
        if extent == 0:
            return float(len(self.mipmaps))
        return 0.0

    def _sample_nearest(self, u: float, v: float, mip_level: int = 0) -> np.ndarray:
        """Return the single texel addressed by ``(u, v)`` at ``mip_level``."""
        data = self._level_data(mip_level)
        u, v = self._wrap(u), self._wrap(v)
        rows, cols = data.shape[:2]
        x = min(max(int(u * (cols - 1)), 0), cols - 1)
        y = min(max(int(v * (rows - 1)), 0), rows - 1)
        return data[y, x]

    def _sample_bilinear(self, u: float, v: float, mip_level: int = 0) -> np.ndarray:
        """Return the bilinear blend of the four texels around ``(u, v)``.

        Results are memoised in ``sample_cache`` until it holds
        ``cache_size_limit`` entries.
        """
        cache_key = (u, v, mip_level)
        cached = self.sample_cache.get(cache_key)
        if cached is not None:
            return cached

        data = self._level_data(mip_level)
        u, v = self._wrap(u), self._wrap(v)
        rows, cols = data.shape[:2]

        # Continuous texel-space position and its integer corners
        x = u * (cols - 1)
        y = v * (rows - 1)
        x0, y0 = int(x), int(y)
        x1, y1 = min(x0 + 1, cols - 1), min(y0 + 1, rows - 1)

        # Interpolation weights
        wx = x - x0
        wy = y - y0

        result = (data[y0, x0] * (1 - wx) * (1 - wy) +
                  data[y0, x1] * wx * (1 - wy) +
                  data[y1, x0] * (1 - wx) * wy +
                  data[y1, x1] * wx * wy)

        if len(self.sample_cache) < self.cache_size_limit:
            self.sample_cache[cache_key] = result
        return result

    def sample(self, u: float, v: float) -> np.ndarray:
        """Sample the texture at ``(u, v)`` using the configured filter mode.

        A compressed texture is decompressed first.  NEAREST and BILINEAR
        read the base level; TRILINEAR and ANISOTROPIC pick a mip level from
        ``_estimate_lod``.

        Returns:
            The texel value (NEAREST) or a filtered per-channel array.
        """
        if self.compressed:
            self.decompress()

        mode = self.filter_mode
        if mode == FilterMode.NEAREST:
            return self._sample_nearest(u, v, 0)
        if mode == FilterMode.BILINEAR:
            return self._sample_bilinear(u, v, 0)

        max_level = len(self.mipmaps)
        lod = min(self._estimate_lod(u, v), float(max_level))
        base_level = int(lod)

        if mode == FilterMode.TRILINEAR:
            fine = self._sample_bilinear(u, v, base_level)
            if base_level >= max_level:
                return fine  # no coarser level to blend with
            coarse = self._sample_bilinear(u, v, base_level + 1)
            blend = lod - base_level  # in [0, 1) because lod is clamped
            return fine * (1 - blend) + coarse * blend

        # ANISOTROPIC: simple approximation - average several taps along u
        taps = self.aniso_level
        if taps <= 1:
            # A single tap has no spread (and the spacing formula would divide by zero)
            return self._sample_bilinear(u, v, base_level)
        samples = [
            self._sample_bilinear(u + (i / (taps - 1) - 0.5) * 0.001, v, base_level)
            for i in range(taps)
        ]
        return np.mean(samples, axis=0)
class BufferType(Enum):
    """Kinds of GPU buffer a ``Buffer`` can represent.

    Values come from ``auto()`` in declaration order.
    """

    VERTEX = auto()    # vertex data
    INDEX = auto()     # index data
    UNIFORM = auto()   # uniform buffer
    STORAGE = auto()   # storage buffer
    INDIRECT = auto()  # indirect draw commands
class Buffer:
    """Typed data buffer with optional CPU mapping and access tracking.

    Attributes:
        buffer_type: Kind of buffer (vertex, index, uniform, ...).
        dynamic: Caller hint that the buffer is updated frequently.
        map_write: Whether ``map`` / ``unmap`` are permitted.
        mapped: True between ``map`` and ``unmap``.
        data: Private copy of the buffer contents.
        shadow_buffer: Staging copy handed out by ``map`` (None unless
            ``map_write`` is set).
        stride: Byte size of the last axis rounded up to 16 bytes.
        aligned_size: Total byte size rounded up to 256 bytes.
        access_count: Number of uploads, unmaps and reads so far.
        last_access: ``time.time()`` of the latest access (0 before any).
    """

    def __init__(self, data: np.ndarray, buffer_type: BufferType,
                 dynamic: bool = False, map_write: bool = False):
        """
        Initialize buffer with specific type and usage flags

        Args:
            data: Initial buffer data (copied)
            buffer_type: Type of buffer (vertex, index, uniform, etc)
            dynamic: Whether buffer will be frequently updated
            map_write: Whether buffer should be mappable for CPU writes
        """
        self.buffer_type = buffer_type
        self.dynamic = dynamic
        self.map_write = map_write
        self.mapped = False

        # Main storage
        self.data = np.array(data)
        # Shadow buffer for mapped writes
        self.shadow_buffer = self.data.copy() if map_write else None

        # Cache alignment and striding
        self.stride = self._calculate_stride()
        self.aligned_size = self._align_size(self.data.nbytes)

        # Usage tracking
        self.access_count = 0
        self.last_access = 0

    def _calculate_stride(self) -> int:
        """Return the byte size of the last axis, rounded up to 16 bytes."""
        # A 0-d array has no last axis; treat it as a single element.
        elements = self.data.shape[-1] if self.data.ndim else 1
        base_stride = self.data.itemsize * elements
        return ((base_stride + 15) // 16) * 16

    def _align_size(self, size: int) -> int:
        """Round ``size`` up to the next multiple of 256 bytes."""
        return ((size + 255) // 256) * 256

    def _touch(self):
        """Record one access for usage tracking."""
        self.access_count += 1
        self.last_access = time.time()

    def upload(self, data: np.ndarray):
        """Overwrite the buffer contents with ``data``.

        Args:
            data: Array (or array-like) with exactly the buffer's shape.

        Raises:
            AssertionError: If the shape differs from the buffer's shape.
            RuntimeError: If the buffer is currently mapped.
        """
        incoming = np.asarray(data)
        assert incoming.shape == self.data.shape, f"Shape mismatch: expected {self.data.shape}, got {incoming.shape}"
        if self.mapped:
            raise RuntimeError("Cannot upload to mapped buffer")
        self.data[...] = incoming  # Ellipsis also works for 0-d buffers
        self._touch()

    def map(self) -> Optional[np.ndarray]:
        """Map the buffer for CPU access and return the staging array.

        Writes made to the returned array are applied by ``unmap``.

        Raises:
            RuntimeError: If the buffer was not created with ``map_write``
                or is already mapped.
        """
        if not self.map_write:
            raise RuntimeError("Buffer not created with map_write flag")
        if self.mapped:
            raise RuntimeError("Buffer already mapped")
        self.mapped = True
        self.shadow_buffer[...] = self.data
        return self.shadow_buffer

    def unmap(self):
        """Unmap the buffer and copy staged writes into the main storage.

        Raises:
            RuntimeError: If the buffer is not mapped.
        """
        if not self.mapped:
            raise RuntimeError("Buffer not mapped")
        self.data[...] = self.shadow_buffer
        self.mapped = False
        self._touch()

    def get(self) -> np.ndarray:
        """Return the buffer's storage array (not a copy).

        Raises:
            RuntimeError: If the buffer is currently mapped.
        """
        if self.mapped:
            raise RuntimeError("Cannot read from mapped buffer")
        self._touch()
        return self.data
class AttachmentType(Enum):
    """Roles a framebuffer attachment can play.

    Values come from ``auto()`` in declaration order.
    """

    COLOR = auto()          # color attachment
    DEPTH = auto()          # depth attachment
    STENCIL = auto()        # stencil attachment
    DEPTH_STENCIL = auto()  # combined depth-stencil
class MSAASamples(Enum):
    """Supported multisample counts; each member's value is the sample count."""

    MSAA_1X = 1  # no multisampling
    MSAA_2X = 2  # 2 samples per pixel
    MSAA_4X = 4  # 4 samples per pixel
    MSAA_8X = 8  # 8 samples per pixel
class Attachment:
    """Pixel storage for one framebuffer attachment.

    ``data`` has shape ``(height, width, channels)`` without multisampling
    and ``(height, width, samples, channels)`` with it.

    Attributes:
        width, height: Size in pixels.
        attachment_type: Role of the attachment (color, depth, ...).
        format: Storage format; determines channel count and dtype.
        samples: Multisample setting.
        channels: Channel count derived from ``format``.
        data: Zero-initialised pixel storage.
    """

    def __init__(self, width: int, height: int,
                 attachment_type: AttachmentType,
                 format: TextureFormat = TextureFormat.RGBA8,
                 samples: MSAASamples = MSAASamples.MSAA_1X):
        """Allocate zero-filled storage for the given size, format and sample count."""
        self.width = width
        self.height = height
        self.attachment_type = attachment_type
        self.format = format
        self.samples = samples

        # Storage layout depends on format and sample count
        self.channels = self._get_channel_count()
        if samples == MSAASamples.MSAA_1X:
            shape = (height, width, self.channels)
        else:
            shape = (height, width, samples.value, self.channels)
        self.data = np.zeros(shape, dtype=self._get_dtype())

    def _get_channel_count(self) -> int:
        """Return the channels per pixel for ``self.format``.

        Two- and three-channel formats get their real channel count rather
        than collapsing to one; any unlisted format is single-channel.
        """
        if self.format in (TextureFormat.RGBA8, TextureFormat.RGBA16F,
                           TextureFormat.RGBA32F, TextureFormat.BC3):
            return 4
        if self.format in (TextureFormat.RGB8, TextureFormat.BC1):
            return 3
        if self.format == TextureFormat.RG8:
            return 2
        return 1

    def _get_dtype(self) -> np.dtype:
        """Return the NumPy dtype for ``self.format``.

        All 8-bit formats use ``uint8``, 16-bit float formats use
        ``float16`` and everything else uses ``float32``.
        """
        if self.format in (TextureFormat.R8, TextureFormat.RG8,
                           TextureFormat.RGB8, TextureFormat.RGBA8):
            return np.uint8
        if self.format in (TextureFormat.R16F, TextureFormat.RGBA16F):
            return np.float16
        return np.float32
class Framebuffer:
    """Render target made of color attachments plus optional depth and stencil.

    All attachments share the framebuffer's size and MSAA sample count.

    Attributes:
        width, height: Size in pixels.
        samples: Multisample setting shared by every attachment.
        color_attachments: One ``Attachment`` per requested color format.
        depth_attachment: Depth ``Attachment`` or None.
        stencil_attachment: Stencil ``Attachment`` or None.
        active_color: Index of the color attachment ``write_color`` targets.
        clear_values: Per-``AttachmentType`` values used by ``clear``.
    """

    def __init__(self, width: int, height: int,
                 color_formats: Optional[List[TextureFormat]] = None,
                 depth_format: Optional[TextureFormat] = TextureFormat.R32F,
                 stencil_format: Optional[TextureFormat] = TextureFormat.R8,
                 samples: MSAASamples = MSAASamples.MSAA_1X):
        """
        Initialize framebuffer with multiple render targets, depth, and stencil

        Args:
            width: Framebuffer width
            height: Framebuffer height
            color_formats: List of formats for color attachments
                (defaults to a single RGBA8 attachment)
            depth_format: Format for depth attachment (None to disable)
            stencil_format: Format for stencil attachment (None to disable)
            samples: MSAA sample count
        """
        if color_formats is None:
            # Built per call so no list object is shared between instances
            color_formats = [TextureFormat.RGBA8]

        self.width = width
        self.height = height
        self.samples = samples

        # Create color attachments
        self.color_attachments = [
            Attachment(width, height, AttachmentType.COLOR, fmt, samples)
            for fmt in color_formats
        ]

        # Create depth attachment
        self.depth_attachment = (
            Attachment(width, height, AttachmentType.DEPTH, depth_format, samples)
            if depth_format else None
        )

        # Create stencil attachment
        self.stencil_attachment = (
            Attachment(width, height, AttachmentType.STENCIL, stencil_format, samples)
            if stencil_format else None
        )

        # Active color attachment
        self.active_color = 0

        # State tracking
        self.clear_values = {
            AttachmentType.COLOR: np.zeros(4),
            AttachmentType.DEPTH: 1.0,
            AttachmentType.STENCIL: 0,
        }

    def _texel_index(self, x: int, y: int, sample: int) -> tuple:
        """Return the index tuple for pixel ``(x, y)``; adds ``sample`` under MSAA."""
        if self.samples == MSAASamples.MSAA_1X:
            return (y, x)
        return (y, x, sample)

    def bind(self, color_attachment: int = 0):
        """Select which color attachment subsequent ``write_color`` calls target.

        Raises:
            AssertionError: If the index is outside ``color_attachments``.
        """
        assert 0 <= color_attachment < len(self.color_attachments)
        self.active_color = color_attachment

    def set_clear_values(self, color=None, depth=None, stencil=None):
        """Update the stored clear values; arguments left as None are unchanged."""
        if color is not None:
            self.clear_values[AttachmentType.COLOR] = np.array(color)
        if depth is not None:
            self.clear_values[AttachmentType.DEPTH] = depth
        if stencil is not None:
            self.clear_values[AttachmentType.STENCIL] = stencil

    def clear(self, color=True, depth=True, stencil=True):
        """Fill the selected attachments with their stored clear values.

        Args:
            color: Clear every color attachment.
            depth: Clear the depth attachment, if present.
            stencil: Clear the stencil attachment, if present.
        """
        if color:
            clear_color = np.asarray(self.clear_values[AttachmentType.COLOR]).ravel()
            for target in self.color_attachments:
                # ndarray.fill only accepts a scalar, so broadcast-assign the
                # per-channel color instead; it is trimmed to the attachment's
                # channel count so narrower formats clear as well.  The same
                # assignment covers both the MSAA and non-MSAA layouts.
                channels = target.data.shape[-1]
                target.data[...] = clear_color[:channels]

        if depth and self.depth_attachment:
            self.depth_attachment.data[...] = self.clear_values[AttachmentType.DEPTH]

        if stencil and self.stencil_attachment:
            self.stencil_attachment.data[...] = self.clear_values[AttachmentType.STENCIL]

    def write_color(self, x: int, y: int, color: np.ndarray, sample: int = 0):
        """Write ``color`` to pixel ``(x, y)`` of the active color attachment.

        ``sample`` selects the MSAA sample and is ignored without multisampling.
        """
        target = self.color_attachments[self.active_color]
        target.data[self._texel_index(x, y, sample)] = color

    def write_depth(self, x: int, y: int, depth: float, sample: int = 0):
        """Write a depth value; does nothing when there is no depth attachment."""
        if self.depth_attachment:
            self.depth_attachment.data[self._texel_index(x, y, sample)] = depth

    def write_stencil(self, x: int, y: int, stencil: int, sample: int = 0):
        """Write a stencil value; does nothing when there is no stencil attachment."""
        if self.stencil_attachment:
            self.stencil_attachment.data[self._texel_index(x, y, sample)] = stencil

    def read_color(self, x: int, y: int, attachment: int = 0) -> np.ndarray:
        """Read the color at ``(x, y)`` from color attachment ``attachment``.

        Under MSAA the samples are averaged.
        """
        texel = self.color_attachments[attachment].data[y, x]
        if self.samples == MSAASamples.MSAA_1X:
            return texel
        # Resolve MSAA by averaging samples
        return np.mean(texel, axis=0)

    def read_depth(self, x: int, y: int) -> float:
        """Read the depth at ``(x, y)`` as a scalar.

        Returns 1.0 when there is no depth attachment; under MSAA the
        samples are averaged.
        """
        if not self.depth_attachment:
            return 1.0
        texel = self.depth_attachment.data[y, x]
        if self.samples == MSAASamples.MSAA_1X:
            # Depth lives in channel 0; return a scalar as annotated
            return float(texel[0])
        return float(np.mean(texel))

    def read_stencil(self, x: int, y: int) -> int:
        """Read the stencil value at ``(x, y)`` as a scalar.

        Returns 0 when there is no stencil attachment; under MSAA the first
        sample is used.
        """
        if not self.stencil_attachment:
            return 0
        texel = self.stencil_attachment.data[y, x]
        if self.samples == MSAASamples.MSAA_1X:
            return int(texel[0])
        return int(texel[0, 0])

    def resolve_msaa(self) -> List[np.ndarray]:
        """Return one non-multisampled image per color attachment.

        Without MSAA the attachments' own arrays are returned (not copies);
        with MSAA each image is the mean over the sample axis.
        """
        if self.samples == MSAASamples.MSAA_1X:
            return [target.data for target in self.color_attachments]
        # Average across samples
        return [np.mean(target.data, axis=2) for target in self.color_attachments]