Spaces:
Runtime error
Runtime error
| """ | |
| LOGOS Display Interpreter - State Saturation Engine | |
| Reconstruction engine that maintains persistent canvas state (The Cake) | |
| Updates state using stream instructions (The Bake) | |
| """ | |
| import numpy as np | |
| from enum import Enum | |
| import logging | |
| from PIL import Image | |
| from .fractal_engine import LogosFractalEngine | |
class Stage(Enum):
    """Pipeline stages of the display interpreter."""

    # Stage 1: create the output buffer from the first META chunk.
    ALLOCATION = "ALLOCATION"
    # Stage 2: fill buckets with their initial data.
    SATURATION = "SATURATION"
    # Stage 3: apply heat diffs for animation.
    HARMONIC_DIFF = "HARMONIC_DIFF"
class Mode(Enum):
    """Operating modes of the display interpreter."""

    # Real-time viewport updates.
    STREAMING = "STREAMING"
    # Full resolution export.
    DOWNLOAD = "DOWNLOAD"
class LogosDisplayInterpreter:
    """
    State-based reconstruction engine (The Oven).

    Maintains a persistent RGB canvas (The Cake) and updates it one atom at
    a time from stream instructions (The Bake). The canvas is divided into
    square buckets; a boolean fidelity map records which buckets have
    received at least one atom.
    """

    BUCKET_SIZE = 512  # Bytes per bucket (one atom)
    BYTES_PER_PIXEL = 3  # RGB

    # Thermal palette (Blue -> Cyan -> Yellow -> Red) expressed as
    # piecewise-linear ramps over heat in [0, 1], one ramp per channel.
    _THERMAL_STOPS = (0.0, 0.25, 0.5, 0.75, 1.0)
    _THERMAL_RED = (0.0, 0.0, 255.0, 255.0, 255.0)
    _THERMAL_GREEN = (0.0, 255.0, 255.0, 127.5, 0.0)
    _THERMAL_BLUE = (255.0, 255.0, 0.0, 0.0, 0.0)

    def __init__(self, mode=Mode.STREAMING, use_fractal_addressing=True):
        """
        Initialize the Display Interpreter.

        Args:
            mode: STREAMING (real-time) or DOWNLOAD (full fidelity export)
            use_fractal_addressing: If True, use fractal quadtree addressing
                (default: True)
        """
        self.mode = mode
        self.stage = Stage.ALLOCATION
        self.use_fractal_addressing = use_fractal_addressing

        # Fractal engine for coordinate decoding
        if use_fractal_addressing:
            self.fractal_engine = LogosFractalEngine(min_bucket_size=64)
        else:
            self.fractal_engine = None

        # Canvas state (The Cake): numpy array (H, W, 3) uint8 once allocated
        self.canvas_state = None
        # Fidelity map: boolean array (num_buckets_y, num_buckets_x)
        self.fidelity_map = None
        # Resolution (width, height), determined by the first META chunk
        self.resolution = None

        # Bucket dimensions in pixels and bucket grid size
        self.bucket_width = None
        self.bucket_height = None
        self.num_buckets_x = 0
        self.num_buckets_y = 0

        # Statistics
        self.total_buckets = 0
        self.saturated_buckets = 0
        self.first_meta_received = False

        self.logger = logging.getLogger('LogosDisplayInterpreter')

    def decode_bucket_position(self, heat_code_hex):
        """
        Decode bucket (X, Y) coordinates from a heat code.

        When fractal addressing is enabled and the canvas is allocated, the
        fractal engine maps the code to bucket coordinates. Otherwise the
        low byte is X and the next byte is Y (linear legacy addressing).
        Once a bucket grid exists the result is wrapped into it; before
        allocation the raw 0-255 values are returned.

        Args:
            heat_code_hex: 8-character hex string (4 bytes = 32 bits)

        Returns:
            (bucket_x, bucket_y): Bucket coordinates

        Raises:
            ValueError: if heat_code_hex is not valid hexadecimal.
        """
        heat_int = int(heat_code_hex, 16)
        grid_ready = self.num_buckets_x > 0 and self.num_buckets_y > 0
        fractal = self.use_fractal_addressing and self.fractal_engine is not None

        if fractal and grid_ready:
            bucket_x, bucket_y = self.fractal_engine.fractal_to_bucket_coords(
                heat_int,
                self.num_buckets_x,
                self.num_buckets_y,
            )
        else:
            # Linear addressing: bits 0-7 are X, bits 8-15 are Y.
            bucket_x = heat_int & 0xFF
            bucket_y = (heat_int >> 8) & 0xFF

        if grid_ready:
            # Wrap to the valid bucket range
            bucket_x = bucket_x % self.num_buckets_x
            bucket_y = bucket_y % self.num_buckets_y
        return (bucket_x, bucket_y)

    def get_fractal_zone_rect(self, heat_code_hex):
        """
        Get the zone rectangle targeted by a heat code.

        Args:
            heat_code_hex: 8-character hex string (4 bytes)

        Returns:
            With fractal addressing, whatever the fractal engine's
            resolve_fractal_address returns for this code and resolution.
            Otherwise an (x, y, width, height) tuple for the decoded bucket.
            None if the canvas has not been allocated yet.
        """
        if not self.resolution:
            return None

        if self.use_fractal_addressing and self.fractal_engine is not None:
            heat_int = int(heat_code_hex, 16)
            return self.fractal_engine.resolve_fractal_address(heat_int, self.resolution)

        # Fallback: map to the bucket region
        bucket_x, bucket_y = self.decode_bucket_position(heat_code_hex)
        if self.bucket_width and self.bucket_height:
            return (
                bucket_x * self.bucket_width,
                bucket_y * self.bucket_height,
                self.bucket_width,
                self.bucket_height,
            )
        return None

    def allocate_canvas(self, resolution):
        """
        Stage 1: Allocate the output buffer and bucket grid.

        Any previous canvas, fidelity map and saturation count are discarded.

        Args:
            resolution: (width, height) tuple, both positive

        Raises:
            ValueError: if width or height is not positive.
        """
        width, height = resolution
        if width <= 0 or height <= 0:
            raise ValueError(f"Resolution must be positive, got {width}x{height}")
        self.resolution = (width, height)

        # Allocate canvas state (RGB)
        self.canvas_state = np.zeros((height, width, 3), dtype=np.uint8)

        # Bucket edge length is derived from the atom size: 512 bytes // 3
        # bytes per pixel = 170. Buckets are square with that edge.
        pixels_per_bucket = self.BUCKET_SIZE // self.BYTES_PER_PIXEL
        self.bucket_width = max(1, pixels_per_bucket)
        self.bucket_height = self.bucket_width

        # Ceiling division so partial buckets at the edges are counted
        self.num_buckets_x = (width + self.bucket_width - 1) // self.bucket_width
        self.num_buckets_y = (height + self.bucket_height - 1) // self.bucket_height
        self.total_buckets = self.num_buckets_x * self.num_buckets_y

        # Fresh fidelity map; the saturation counter must restart with it
        self.fidelity_map = np.zeros((self.num_buckets_y, self.num_buckets_x), dtype=bool)
        self.saturated_buckets = 0

        self.stage = Stage.SATURATION
        self.logger.info(
            "Canvas allocated: %sx%s, Buckets: %sx%s (%s total), Bucket size: %sx%s",
            width,
            height,
            self.num_buckets_x,
            self.num_buckets_y,
            self.total_buckets,
            self.bucket_width,
            self.bucket_height,
        )

    @staticmethod
    def _resolution_from_heat(heat_int):
        """
        Derive the canvas (width, height) from the first META heat code.

        Width uses bits 16-25 and height uses bits 26 and up, each as a
        count of 256-pixel steps above 512; both are clamped to [256, 4096].
        For a 32-bit code only 6 bits remain for the height field.
        """
        width = 512 + ((heat_int >> 16) & 0x3FF) * 256
        height = 512 + ((heat_int >> 26) & 0x3FF) * 256
        return (max(256, min(4096, width)), max(256, min(4096, height)))

    def process_atom(self, atom_data, chunk_type):
        """
        Process one atom and update the canvas state.

        The first META atom allocates the canvas (resolution is derived from
        its heat signature) and is then drawn like any other atom. Atoms
        arriving before the canvas exists are skipped with a warning.

        Args:
            atom_data: dict from StreamInterpreter with:
                - heat_signature: 8-char hex string
                - wave_payload: 508 bytes
            chunk_type: ChunkType.META or ChunkType.DELTA (compared by .value)
        """
        heat_signature = atom_data['heat_signature']
        wave_payload = atom_data['wave_payload']

        # Stage 1: First META chunk allocates canvas
        if not self.first_meta_received and chunk_type.value == "META":
            self.allocate_canvas(self._resolution_from_heat(int(heat_signature, 16)))
            self.first_meta_received = True

        # Can't process atoms until canvas is allocated
        if self.canvas_state is None:
            self.logger.warning("Canvas not allocated yet, skipping atom")
            return

        # Decode only after allocation so the coordinates are wrapped into
        # the bucket grid; decoding earlier could yield out-of-range buckets
        # for the very first atom.
        bucket_x, bucket_y = self.decode_bucket_position(heat_signature)

        self._update_bucket(bucket_x, bucket_y, wave_payload, chunk_type)

        # Mark bucket as saturated
        if not self.fidelity_map[bucket_y, bucket_x]:
            self.fidelity_map[bucket_y, bucket_x] = True
            self.saturated_buckets += 1

        # Once every bucket is saturated, move to Stage 3
        if self.stage == Stage.SATURATION and self.saturated_buckets >= self.total_buckets:
            self.stage = Stage.HARMONIC_DIFF
            self.logger.info("Saturation complete, entering Harmonic Diff stage")

    def _update_bucket(self, bucket_x, bucket_y, wave_payload, chunk_type):
        """
        Update the canvas at one bucket position.

        META payloads and all payloads outside the Harmonic Diff stage
        overwrite the region; DELTA payloads in the Harmonic Diff stage are
        blended onto the existing pixels.

        Args:
            bucket_x, bucket_y: Bucket coordinates (within the bucket grid)
            wave_payload: 508 bytes of data
            chunk_type: META or DELTA (compared by .value)
        """
        canvas_width, canvas_height = self.resolution

        # Pixel region for this bucket, clipped at the canvas edges
        px_start = bucket_x * self.bucket_width
        py_start = bucket_y * self.bucket_height
        px_end = min(px_start + self.bucket_width, canvas_width)
        py_end = min(py_start + self.bucket_height, canvas_height)
        region_width = px_end - px_start
        region_height = py_end - py_start

        is_delta = chunk_type.value != "META"
        if is_delta:
            # DELTA: Heat (thermal color)
            pixel_data = self._decode_delta_payload(wave_payload, region_width, region_height)
        else:
            # META: Structure (grayscale geometric)
            pixel_data = self._decode_meta_payload(wave_payload, region_width, region_height)

        if self.stage == Stage.HARMONIC_DIFF and chunk_type.value == "DELTA":
            # Blend DELTA (heat diffs) with existing state
            existing = self.canvas_state[py_start:py_end, px_start:px_end]
            pixel_data = self._blend_heat_diff(existing, pixel_data)
        self.canvas_state[py_start:py_end, px_start:px_end] = pixel_data

    def _decode_meta_payload(self, wave_payload, width, height):
        """
        Decode a META payload as grayscale structure.

        Payload bytes fill the region row by row; a short payload is
        repeated cyclically, a long one is truncated. An empty payload
        yields a black region.

        Returns:
            uint8 array of shape (height, width, 3)
        """
        if not wave_payload:
            return np.zeros((height, width, 3), dtype=np.uint8)
        payload_array = np.frombuffer(wave_payload, dtype=np.uint8)
        # np.resize repeats the data cyclically (or truncates) to fit
        gray_values = np.resize(payload_array, (height, width))
        return np.stack([gray_values, gray_values, gray_values], axis=2)

    def _decode_delta_payload(self, wave_payload, width, height):
        """
        Decode a DELTA payload as heat rendered with the thermal palette.

        Payload bytes are min-max normalized to [0, 1] (a constant payload
        maps to 0.5), laid out row by row with cyclic repetition or
        truncation, then colored. An empty payload yields a black region.

        Returns:
            uint8 array of shape (height, width, 3)
        """
        if not wave_payload:
            return np.zeros((height, width, 3), dtype=np.uint8)
        payload_array = np.frombuffer(wave_payload, dtype=np.uint8)

        low = int(payload_array.min())
        high = int(payload_array.max())
        if high != low:
            normalized = (payload_array.astype(np.float32) - low) / (high - low + 1e-6)
        else:
            normalized = np.full(payload_array.shape, 0.5, dtype=np.float32)

        heat_values = np.resize(normalized, (height, width))
        return self._thermal_color_map(heat_values)

    def _thermal_color_map(self, heat_values):
        """
        Map an array of heat values to thermal RGB in one vectorized pass.

        Values are clipped to [0, 1] and each channel is linearly
        interpolated between the palette stops, then truncated to uint8.

        Returns:
            uint8 array with the input's shape plus a trailing axis of 3
        """
        heat = np.clip(np.asarray(heat_values, dtype=np.float64), 0.0, 1.0)
        channels = [
            np.interp(heat, self._THERMAL_STOPS, ramp)
            for ramp in (self._THERMAL_RED, self._THERMAL_GREEN, self._THERMAL_BLUE)
        ]
        return np.stack(channels, axis=-1).astype(np.uint8)

    def _thermal_color(self, heat_value):
        """Convert one heat value in [0, 1] to a thermal (r, g, b) tuple of ints."""
        r, g, b = self._thermal_color_map(heat_value)
        return (int(r), int(g), int(b))

    def _blend_heat_diff(self, existing, heat_diff):
        """
        Blend a heat diff (DELTA) onto existing pixels.

        Additive blend: existing + 0.3 * heat_diff, clipped to [0, 255].
        """
        blended = existing.astype(np.float32) + heat_diff.astype(np.float32) * 0.3
        return np.clip(blended, 0, 255).astype(np.uint8)

    def get_viewport_frame(self, window_size):
        """
        Output Method A: Get a viewport frame for streaming playback.

        Args:
            window_size: (width, height) tuple for viewport

        Returns:
            PIL Image scaled to window_size. A black frame if the canvas is
            not allocated; a red overlay marks missing buckets until the
            canvas is fully saturated.
        """
        if self.canvas_state is None:
            return Image.new('RGB', window_size, color='black')

        # A uint8 (H, W, 3) array is interpreted as RGB without an explicit mode
        scaled = Image.fromarray(self.canvas_state).resize(
            window_size, Image.Resampling.BICUBIC
        )

        if self.total_buckets > 0:
            saturation_percent = self.saturated_buckets / self.total_buckets * 100
        else:
            saturation_percent = 0
        if saturation_percent < 100.0:
            scaled = self._overlay_saturation_map(scaled, window_size, saturation_percent)
        return scaled

    def _overlay_saturation_map(self, base_image, window_size, saturation_percent):
        """
        Overlay a semi-transparent red rectangle on every missing bucket.

        Args:
            base_image: PIL Image already scaled to window_size
            window_size: (width, height) of the viewport
            saturation_percent: current saturation; not used by the drawing

        Returns:
            RGB PIL Image with the overlay applied, or base_image unchanged
            when no fidelity map exists.
        """
        if self.fidelity_map is None:
            return base_image

        view_width, view_height = window_size
        overlay_np = np.zeros((view_height, view_width, 4), dtype=np.uint8)

        # Size of one bucket in viewport pixels
        scale_x = view_width / self.num_buckets_x
        scale_y = view_height / self.num_buckets_y

        for by, bx in np.argwhere(~self.fidelity_map):
            x1 = int(bx * scale_x)
            y1 = int(by * scale_y)
            x2 = int((bx + 1) * scale_x)
            y2 = int((by + 1) * scale_y)
            overlay_np[y1:y2, x1:x2, 0] = 255  # Red
            overlay_np[y1:y2, x1:x2, 3] = 64  # Semi-transparent

        # A uint8 (H, W, 4) array is interpreted as RGBA
        overlay = Image.fromarray(overlay_np)
        return Image.alpha_composite(base_image.convert('RGBA'), overlay).convert('RGB')

    def get_full_fidelity_frame(self):
        """
        Output Method B: Get the full fidelity frame for download.

        Returns:
            PIL Image of the canvas at native resolution (no scaling)

        Raises:
            RuntimeError: if the canvas has not been allocated.
        """
        if self.canvas_state is None:
            raise RuntimeError("Canvas state not initialized")
        return Image.fromarray(self.canvas_state)

    def get_saturation_stats(self):
        """
        Get saturation statistics.

        Returns:
            dict with 'saturated', 'total', 'percent' and 'stage' keys; all
            counts are zero while no canvas is allocated.
        """
        if self.total_buckets == 0:
            saturated, percent = 0, 0.0
        else:
            saturated = self.saturated_buckets
            percent = (self.saturated_buckets / self.total_buckets) * 100
        return {
            'saturated': saturated,
            'total': self.total_buckets,
            'percent': percent,
            'stage': self.stage.value,
        }