|
|
"""
|
|
|
Advanced Texture and Buffer Management for Virtual GPU
|
|
|
Features:
|
|
|
- Mipmapped textures with automatic generation
|
|
|
- Multiple texture formats and compression
|
|
|
- Advanced filtering modes (bilinear, trilinear, anisotropic)
|
|
|
- Modern buffer types (uniform, storage, indirect)
|
|
|
- Full framebuffer support with MSAA
|
|
|
- Memory-efficient texture streaming
|
|
|
- Cache-aware memory layout
|
|
|
"""
|
|
|
import numpy as np
|
|
|
from enum import Enum, auto
|
|
|
import cv2
|
|
|
from typing import List, Tuple, Optional, Union, Dict
|
|
|
import zlib
|
|
|
import time
|
|
|
from typing import List, Tuple, Optional, Union
|
|
|
import zlib
|
|
|
|
|
|
class TextureFormat(Enum):
    """Pixel formats that textures and framebuffer attachments can be created with.

    Member values are assigned sequentially by ``auto()``; only member
    identity is meaningful to the rest of the module.
    """

    # Unsigned 8-bit formats, one to four channels.
    R8 = auto()
    RG8 = auto()
    RGB8 = auto()
    RGBA8 = auto()

    # Half-precision float formats.
    R16F = auto()
    RGBA16F = auto()

    # Single-precision float formats.
    R32F = auto()
    RGBA32F = auto()

    # Block-compressed formats.
    BC1 = auto()
    BC3 = auto()
|
|
|
|
|
|
class FilterMode(Enum):
    """Texture filtering strategies selectable per texture."""

    NEAREST = auto()      # single texel lookup
    BILINEAR = auto()     # 2x2 texel blend
    TRILINEAR = auto()    # bilinear plus a blend between mip levels
    ANISOTROPIC = auto()  # average of several bilinear taps
|
|
|
|
|
|
class WrapMode(Enum):
    """Addressing modes for texture coordinates outside the [0, 1] range."""

    REPEAT = auto()
    CLAMP = auto()
    MIRROR = auto()
|
|
|
|
|
|
class MipLevel:
    """A single level of a mipmap chain.

    Holds the level's dimensions, its pixel array and a hit counter that
    starts at zero (this class itself never increments it).
    """

    def __init__(self, width: int, height: int, data: np.ndarray):
        self.width, self.height = width, height
        self.data = data
        self.cache_hits = 0
|
|
|
|
|
|
class Texture:
    """2D texture with an optional mip chain, wrap modes and several filters.

    Mip level 0 always denotes the full-resolution image in ``self.data``;
    level ``k >= 1`` is ``self.mipmaps[k - 1]``. Requests beyond the end of
    the chain are clamped to the smallest available level.
    """

    def __init__(self, width: int, height: int, format: TextureFormat = TextureFormat.RGBA8,
                 filter_mode: FilterMode = FilterMode.BILINEAR,
                 wrap_mode: WrapMode = WrapMode.REPEAT,
                 generate_mipmaps: bool = True,
                 aniso_level: int = 1):
        """Allocate a zero-filled texture.

        Args:
            width: Texture width in texels.
            height: Texture height in texels.
            format: Pixel format; decides channel count and storage dtype.
            filter_mode: Filter used by :meth:`sample`.
            wrap_mode: Addressing mode for coordinates outside [0, 1].
            generate_mipmaps: Build the mip chain immediately when true.
            aniso_level: Number of anisotropic taps, clamped to [1, 16].
        """
        self.width = width
        self.height = height
        self.format = format
        self.filter_mode = filter_mode
        self.wrap_mode = wrap_mode
        self.aniso_level = min(max(1, aniso_level), 16)

        self.channels = self._get_channel_count()
        self.dtype = self._get_data_type()
        self.data = np.zeros((height, width, self.channels), dtype=self.dtype)

        # Compression state.
        self.compressed = False
        self.compressed_data = None

        # Memoised bilinear samples; bounded by cache_size_limit entries.
        self.sample_cache = {}
        self.cache_size_limit = 1024

        self.mipmaps: List[MipLevel] = []
        if generate_mipmaps:
            self._generate_mipchain()

    def _get_channel_count(self) -> int:
        """Return the number of channels stored per texel for ``self.format``."""
        return {
            TextureFormat.R8: 1,
            TextureFormat.RG8: 2,
            TextureFormat.RGB8: 3,
            TextureFormat.RGBA8: 4,
            TextureFormat.R16F: 1,
            TextureFormat.RGBA16F: 4,
            TextureFormat.R32F: 1,
            TextureFormat.RGBA32F: 4,
            TextureFormat.BC1: 3,
            TextureFormat.BC3: 4,
        }[self.format]

    def _get_data_type(self) -> np.dtype:
        """Return the numpy dtype used to store texels of ``self.format``.

        Any format that is neither 8-bit nor 16-bit float (including the
        BC formats) is stored as float32.
        """
        if self.format in (TextureFormat.R8, TextureFormat.RG8,
                           TextureFormat.RGB8, TextureFormat.RGBA8):
            return np.uint8
        if self.format in (TextureFormat.R16F, TextureFormat.RGBA16F):
            return np.float16
        return np.float32

    def _generate_mipchain(self):
        """Rebuild ``self.mipmaps`` by repeatedly halving the base image.

        Each dimension is halved (never below 1) until the image is 1x1, so
        non-square textures also get a complete chain.
        """
        self.mipmaps.clear()
        current = self.data
        height, width = current.shape[:2]

        # Nothing to build for an empty texture; stop once we reach 1x1.
        while min(height, width) >= 1 and max(height, width) > 1:
            height, width = max(1, height // 2), max(1, width // 2)

            # Resize in float32 when the storage type is float16, since
            # cv2.resize is not guaranteed to accept half-precision input.
            source = current.astype(np.float32) if current.dtype == np.float16 else current
            reduced = cv2.resize(source, (width, height), interpolation=cv2.INTER_LINEAR)

            # cv2 drops the channel axis of single-channel images; restore it
            # so every level is (height, width, channels) like the base image.
            reduced = reduced.astype(self.dtype, copy=False).reshape(height, width, self.channels)

            self.mipmaps.append(MipLevel(width, height, reduced))
            current = reduced

    def compress(self):
        """Pack the base image with zlib; only applies to the BC1/BC3 formats."""
        if self.format in (TextureFormat.BC1, TextureFormat.BC3):
            self.compressed_data = zlib.compress(self.data.tobytes())
            self.compressed = True

    def decompress(self):
        """Restore the base image from the zlib payload, if one is pending."""
        if self.compressed and self.compressed_data:
            raw_data = zlib.decompress(self.compressed_data)
            # np.frombuffer yields a read-only view of the bytes object; copy
            # it so later in-place uploads keep working.
            self.data = np.frombuffer(raw_data, dtype=self.dtype).reshape(
                self.height, self.width, self.channels).copy()
            self.compressed = False

    def upload(self, img: np.ndarray, generate_mipmaps: bool = True):
        """Replace the texel data and optionally regenerate the mip chain.

        Args:
            img: Array with exactly the shape of ``self.data``.
            generate_mipmaps: Rebuild the mip chain from the new data.

        Raises:
            AssertionError: If ``img`` does not match the texture's shape.
        """
        assert img.shape == self.data.shape, f"Shape mismatch: expected {self.data.shape}, got {img.shape}"
        self.data[:] = img

        # A pending compressed payload describes the old image; drop it so the
        # next sample() does not decompress over the freshly uploaded data.
        self.compressed = False
        self.compressed_data = None

        if generate_mipmaps:
            self._generate_mipchain()

        self.sample_cache.clear()

    def _level_data(self, mip_level: int) -> np.ndarray:
        """Return the pixel array for ``mip_level`` (0 is the base image)."""
        if mip_level <= 0 or not self.mipmaps:
            return self.data
        return self.mipmaps[min(mip_level, len(self.mipmaps)) - 1].data

    def _wrap_coord(self, t: float) -> float:
        """Map a texture coordinate into [0, 1] according to ``self.wrap_mode``."""
        if self.wrap_mode == WrapMode.REPEAT:
            return t % 1.0
        if self.wrap_mode == WrapMode.MIRROR:
            # Period-2 triangle wave: 0..1 rises, 1..2 falls back to 0.
            t = t % 2.0
            return 2.0 - t if t > 1.0 else t
        # CLAMP, and the fallback for anything unexpected.
        return min(max(t, 0), 1)

    def _sample_nearest(self, u: float, v: float, mip_level: int = 0) -> np.ndarray:
        """Return the texel nearest to (u, v) on the given mip level."""
        data = self._level_data(mip_level)
        u = self._wrap_coord(u)
        v = self._wrap_coord(v)

        last_x = data.shape[1] - 1
        last_y = data.shape[0] - 1
        x = min(max(int(u * last_x), 0), last_x)
        y = min(max(int(v * last_y), 0), last_y)
        return data[y, x]

    def _sample_bilinear(self, u: float, v: float, mip_level: int = 0) -> np.ndarray:
        """Return the bilinear blend of the four texels around (u, v).

        Results are memoised in ``self.sample_cache`` until the cache holds
        ``self.cache_size_limit`` entries.
        """
        # The wrap mode is part of the key so changing it cannot serve stale values.
        cache_key = (u, v, mip_level, self.wrap_mode)
        cached = self.sample_cache.get(cache_key)
        if cached is not None:
            return cached

        data = self._level_data(mip_level)
        u = self._wrap_coord(u)
        v = self._wrap_coord(v)

        last_x = data.shape[1] - 1
        last_y = data.shape[0] - 1
        x = u * last_x
        y = v * last_y
        x0, y0 = int(x), int(y)
        x1, y1 = min(x0 + 1, last_x), min(y0 + 1, last_y)

        # Fractional position inside the texel cell.
        wx = x - x0
        wy = y - y0

        result = (data[y0, x0] * (1 - wx) * (1 - wy) +
                  data[y0, x1] * wx * (1 - wy) +
                  data[y1, x0] * (1 - wx) * wy +
                  data[y1, x1] * wx * wy)

        if len(self.sample_cache) < self.cache_size_limit:
            self.sample_cache[cache_key] = result

        return result

    def _estimate_lod(self, u: float, v: float) -> float:
        """Return the level of detail for (u, v), clamped to the mip chain.

        This keeps the module's placeholder heuristic ``-log2(max(u, v))``.
        Non-positive (or NaN) coordinates, for which the logarithm is
        undefined, map to the coarsest level, which is the limit of the
        heuristic as the coordinate approaches zero.
        """
        max_level = float(len(self.mipmaps))
        extent = max(u, v)
        if not extent > 0:
            return max_level
        return float(min(max(0.0, -np.log2(extent)), max_level))

    def sample(self, u: float, v: float) -> np.ndarray:
        """Sample the texture at (u, v) using the current filter mode.

        A compressed texture is decompressed first.
        """
        if self.compressed:
            self.decompress()

        max_level = len(self.mipmaps)
        lod = 0.0
        if self.filter_mode in (FilterMode.TRILINEAR, FilterMode.ANISOTROPIC):
            lod = self._estimate_lod(u, v)
        mip_level = min(int(lod), max_level)

        if self.filter_mode == FilterMode.NEAREST:
            return self._sample_nearest(u, v, mip_level)

        if self.filter_mode == FilterMode.BILINEAR:
            return self._sample_bilinear(u, v, mip_level)

        if self.filter_mode == FilterMode.TRILINEAR:
            # No coarser level to blend with: plain bilinear on the last one.
            if mip_level >= max_level:
                return self._sample_bilinear(u, v, mip_level)
            low_mip = self._sample_bilinear(u, v, mip_level)
            high_mip = self._sample_bilinear(u, v, mip_level + 1)
            factor = min(max(lod - mip_level, 0.0), 1.0)
            return low_mip * (1 - factor) + high_mip * factor

        # Anisotropic: average several bilinear taps spread along u.
        taps = self.aniso_level
        samples = []
        for i in range(taps):
            # A single tap sits at the centre; avoids dividing by zero.
            offset = i / (taps - 1) - 0.5 if taps > 1 else 0.0
            samples.append(self._sample_bilinear(u + offset * 0.001, v, mip_level))
        return np.mean(samples, axis=0)
|
|
|
|
|
|
class BufferType(Enum):
    """Kinds of GPU buffer that a ``Buffer`` can be tagged as."""

    VERTEX = auto()
    INDEX = auto()
    UNIFORM = auto()
    STORAGE = auto()
    INDIRECT = auto()
|
|
|
|
|
|
class Buffer:
    """Array-backed buffer with optional CPU mapping and access bookkeeping."""

    def __init__(self, data: np.ndarray, buffer_type: "BufferType",
                 dynamic: bool = False, map_write: bool = False):
        """
        Initialize buffer with specific type and usage flags

        Args:
            data: Initial buffer data (copied; scalars and 0-d arrays are accepted)
            buffer_type: Type of buffer (vertex, index, uniform, etc)
            dynamic: Whether buffer will be frequently updated
            map_write: Whether buffer should be mappable for CPU writes
        """
        self.buffer_type = buffer_type
        self.dynamic = dynamic
        self.map_write = map_write
        self.mapped = False

        # Private copy of the caller's data.
        self.data = np.array(data)

        # Staging copy handed out by map(); only allocated for mappable buffers.
        self.shadow_buffer = np.array(data) if map_write else None

        self.stride = self._calculate_stride()
        self.aligned_size = self._align_size(self.data.nbytes)

        # Usage statistics updated by upload(), unmap() and get().
        self.access_count = 0
        self.last_access = 0

    def _calculate_stride(self) -> int:
        """Return the byte size of the last axis, rounded up to a multiple of 16.

        A 0-d buffer has no last axis, so its stride is based on a single item.
        """
        if self.data.ndim == 0:
            base_stride = self.data.itemsize
        else:
            base_stride = self.data.itemsize * self.data.shape[-1]
        return ((base_stride + 15) // 16) * 16

    def _align_size(self, size: int) -> int:
        """Round ``size`` up to the next multiple of 256 bytes."""
        return ((size + 255) // 256) * 256

    def upload(self, data: np.ndarray):
        """Overwrite the buffer contents in place.

        Args:
            data: Array-like with exactly the buffer's shape.

        Raises:
            AssertionError: If the shape differs from the buffer's shape.
            RuntimeError: If the buffer is currently mapped.
        """
        data = np.asarray(data)
        assert data.shape == self.data.shape, f"Shape mismatch: expected {self.data.shape}, got {data.shape}"
        if self.mapped:
            raise RuntimeError("Cannot upload to mapped buffer")

        # Ellipsis indexing also works for 0-d arrays, unlike [:].
        self.data[...] = data
        self.access_count += 1
        self.last_access = time.time()

    def map(self) -> Optional[np.ndarray]:
        """Map the buffer and return a writable staging copy of its contents.

        Raises:
            RuntimeError: If the buffer was not created with ``map_write`` or
                is already mapped.
        """
        if not self.map_write:
            raise RuntimeError("Buffer not created with map_write flag")

        if self.mapped:
            raise RuntimeError("Buffer already mapped")

        self.mapped = True
        # Refresh the staging copy so the caller sees the current contents.
        self.shadow_buffer[...] = self.data
        return self.shadow_buffer

    def unmap(self):
        """Copy the staging buffer back into the buffer and end the mapping.

        Raises:
            RuntimeError: If the buffer is not mapped.
        """
        if not self.mapped:
            raise RuntimeError("Buffer not mapped")

        self.data[...] = self.shadow_buffer
        self.mapped = False
        self.access_count += 1
        self.last_access = time.time()

    def get(self) -> np.ndarray:
        """Return the buffer's backing array (not a copy).

        Raises:
            RuntimeError: If the buffer is currently mapped.
        """
        if self.mapped:
            raise RuntimeError("Cannot read from mapped buffer")
        self.access_count += 1
        self.last_access = time.time()
        return self.data
|
|
|
|
|
|
class AttachmentType(Enum):
    """Roles a framebuffer attachment can play."""

    COLOR = auto()
    DEPTH = auto()
    STENCIL = auto()
    DEPTH_STENCIL = auto()
|
|
|
|
|
|
class MSAASamples(Enum):
    """Supported multisample counts; each member's value is the sample count."""

    MSAA_1X = 1  # no multisampling
    MSAA_2X = 2
    MSAA_4X = 4
    MSAA_8X = 8
|
|
|
|
|
|
class Attachment:
    """Zero-initialised storage for one framebuffer attachment.

    ``data`` has shape ``(height, width, channels)`` without multisampling
    and ``(height, width, samples, channels)`` otherwise.
    """

    def __init__(self, width: int, height: int,
                 attachment_type: AttachmentType,
                 format: TextureFormat = TextureFormat.RGBA8,
                 samples: MSAASamples = MSAASamples.MSAA_1X):
        """Allocate the attachment.

        Args:
            width: Width in pixels.
            height: Height in pixels.
            attachment_type: Role of the attachment (color, depth, ...).
            format: Pixel format; decides channel count and storage dtype.
            samples: MSAA sample count.
        """
        self.width = width
        self.height = height
        self.attachment_type = attachment_type
        self.format = format
        self.samples = samples

        channels = self._get_channel_count()
        if samples == MSAASamples.MSAA_1X:
            shape = (height, width, channels)
        else:
            shape = (height, width, samples.value, channels)
        self.data = np.zeros(shape, dtype=self._get_dtype())

    def _get_channel_count(self) -> int:
        """Return the channel count for ``self.format``.

        Two- and three-channel 8-bit formats get their real channel count;
        every format not listed here is treated as single-channel.
        """
        return {
            TextureFormat.RG8: 2,
            TextureFormat.RGB8: 3,
            TextureFormat.RGBA8: 4,
            TextureFormat.RGBA16F: 4,
            TextureFormat.RGBA32F: 4,
        }.get(self.format, 1)

    def _get_dtype(self) -> np.dtype:
        """Return the numpy dtype used to store ``self.format``.

        All 8-bit formats use uint8, 16F formats use float16 and everything
        else falls back to float32.
        """
        if self.format in (TextureFormat.R8, TextureFormat.RG8,
                           TextureFormat.RGB8, TextureFormat.RGBA8):
            return np.uint8
        if self.format in (TextureFormat.R16F, TextureFormat.RGBA16F):
            return np.float16
        return np.float32
|
|
|
|
|
|
class Framebuffer:
    """Render target made of color attachments plus optional depth and stencil."""

    def __init__(self, width: int, height: int,
                 color_formats: Optional[List[TextureFormat]] = None,
                 depth_format: Optional[TextureFormat] = TextureFormat.R32F,
                 stencil_format: Optional[TextureFormat] = TextureFormat.R8,
                 samples: MSAASamples = MSAASamples.MSAA_1X):
        """
        Initialize framebuffer with multiple render targets, depth, and stencil

        Args:
            width: Framebuffer width
            height: Framebuffer height
            color_formats: List of formats for color attachments
                (defaults to a single RGBA8 attachment)
            depth_format: Format for depth attachment (None to disable)
            stencil_format: Format for stencil attachment (None to disable)
            samples: MSAA sample count
        """
        self.width = width
        self.height = height
        self.samples = samples

        # None stands in for the default so no list object is shared between calls.
        if color_formats is None:
            color_formats = [TextureFormat.RGBA8]

        self.color_attachments = [
            Attachment(width, height, AttachmentType.COLOR, color_format, samples)
            for color_format in color_formats
        ]

        self.depth_attachment = (
            Attachment(width, height, AttachmentType.DEPTH, depth_format, samples)
            if depth_format else None
        )

        self.stencil_attachment = (
            Attachment(width, height, AttachmentType.STENCIL, stencil_format, samples)
            if stencil_format else None
        )

        # Index of the color attachment targeted by write_color().
        self.active_color = 0

        self.clear_values = {
            AttachmentType.COLOR: np.zeros(4),
            AttachmentType.DEPTH: 1.0,
            AttachmentType.STENCIL: 0,
        }

    def bind(self, color_attachment: int = 0):
        """Select which color attachment subsequent color writes go to.

        Raises:
            AssertionError: If the index is out of range.
        """
        assert 0 <= color_attachment < len(self.color_attachments)
        self.active_color = color_attachment

    def set_clear_values(self, color=None, depth=None, stencil=None):
        """Update the clear values; arguments left as None are unchanged."""
        if color is not None:
            self.clear_values[AttachmentType.COLOR] = np.array(color)
        if depth is not None:
            self.clear_values[AttachmentType.DEPTH] = depth
        if stencil is not None:
            self.clear_values[AttachmentType.STENCIL] = stencil

    def _fill_scalar(self, attachment, value):
        """Write a scalar clear value into a depth or stencil attachment."""
        if self.samples == MSAASamples.MSAA_1X:
            attachment.data.fill(value)
        else:
            # Channel 0 of every sample of every pixel.
            attachment.data[..., 0] = value

    def clear(self, color=True, depth=True, stencil=True):
        """Reset the selected attachments to their configured clear values.

        The clear color is truncated to each attachment's channel count; a
        single-component clear color is broadcast to all channels. A clear
        color with fewer components than the attachment has channels (other
        than one) makes numpy raise a broadcasting error.
        """
        if color:
            clear_color = np.asarray(self.clear_values[AttachmentType.COLOR]).reshape(-1)
            for target in self.color_attachments:
                channels = target.data.shape[-1]
                value = clear_color if clear_color.size == 1 else clear_color[:channels]
                # ndarray.fill only takes a scalar, so assign with broadcasting
                # over the trailing channel axis (and the sample axis, if any).
                target.data[...] = value

        if depth and self.depth_attachment is not None:
            self._fill_scalar(self.depth_attachment, self.clear_values[AttachmentType.DEPTH])

        if stencil and self.stencil_attachment is not None:
            self._fill_scalar(self.stencil_attachment, self.clear_values[AttachmentType.STENCIL])

    def write_color(self, x: int, y: int, color: np.ndarray, sample: int = 0):
        """Write a color to pixel (x, y) of the bound color attachment.

        ``sample`` is only used when the framebuffer is multisampled.
        """
        target = self.color_attachments[self.active_color]
        if self.samples == MSAASamples.MSAA_1X:
            target.data[y, x] = color
        else:
            target.data[y, x, sample] = color

    def write_depth(self, x: int, y: int, depth: float, sample: int = 0):
        """Write a depth value; does nothing when there is no depth attachment."""
        if self.depth_attachment is None:
            return
        if self.samples == MSAASamples.MSAA_1X:
            self.depth_attachment.data[y, x] = depth
        else:
            self.depth_attachment.data[y, x, sample] = depth

    def write_stencil(self, x: int, y: int, stencil: int, sample: int = 0):
        """Write a stencil value; does nothing when there is no stencil attachment."""
        if self.stencil_attachment is None:
            return
        if self.samples == MSAASamples.MSAA_1X:
            self.stencil_attachment.data[y, x] = stencil
        else:
            self.stencil_attachment.data[y, x, sample] = stencil

    def read_color(self, x: int, y: int, attachment: int = 0) -> np.ndarray:
        """Read the color at (x, y); multisampled pixels are averaged over samples."""
        source = self.color_attachments[attachment]
        if self.samples == MSAASamples.MSAA_1X:
            return source.data[y, x]
        return np.mean(source.data[y, x], axis=0)

    def read_depth(self, x: int, y: int) -> float:
        """Read the depth at (x, y) as a Python float.

        Returns 1.0 when there is no depth attachment. Multisampled pixels
        return the mean over all samples.
        """
        if self.depth_attachment is None:
            return 1.0
        if self.samples == MSAASamples.MSAA_1X:
            return float(self.depth_attachment.data[y, x, 0])
        return float(np.mean(self.depth_attachment.data[y, x]))

    def read_stencil(self, x: int, y: int) -> int:
        """Read the stencil at (x, y) as a Python int.

        Returns 0 when there is no stencil attachment. Multisampled pixels
        report the first sample.
        """
        if self.stencil_attachment is None:
            return 0
        if self.samples == MSAASamples.MSAA_1X:
            return int(self.stencil_attachment.data[y, x, 0])
        return int(self.stencil_attachment.data[y, x, 0, 0])

    def resolve_msaa(self) -> List[np.ndarray]:
        """Return one array per color attachment with the samples averaged out.

        Without multisampling the attachments' own arrays are returned
        (not copies); otherwise each result is the mean over the sample axis.
        """
        if self.samples == MSAASamples.MSAA_1X:
            return [target.data for target in self.color_attachments]

        # Axis 2 is the sample axis of (height, width, samples, channels).
        return [np.mean(target.data, axis=2) for target in self.color_attachments]
|
|
|
|