""" LOGOS Playback Window - UI Shell (SPCW Cake/Bake Protocol) Displays interpreter output with fixed viewport and bicubic interpolation META: Geometric/Grayscale structure DELTA: Thermal color palette (Blue->Red) """ import numpy as np from PyQt5.QtWidgets import QApplication, QMainWindow, QWidget, QVBoxLayout, QLabel from PyQt5.QtCore import Qt, QTimer, QThread, pyqtSignal from PyQt5.QtGui import QImage, QPixmap from PIL import Image import logging from collections import deque class StreamRenderer(QThread): """ Worker thread for rendering stream data (Bake Renderer) Converts interpreter output to RGB image buffer """ frame_ready = pyqtSignal(np.ndarray, int, int, str) # frame_data, width, height, heat_signature def __init__(self, interpreter_output): super().__init__() self.interpreter_output = interpreter_output self.logger = logging.getLogger('StreamRenderer') def run(self): """Render interpreter output to RGB buffer""" wave_payload = self.interpreter_output['wave_payload'] chunk_type = self.interpreter_output['chunk_type'] render_buffer_size = self.interpreter_output['render_buffer_size'] heat_signature = self.interpreter_output['heat_signature'] # Create image from wave payload image_data = self._render_chunk(wave_payload, render_buffer_size, chunk_type) # Emit rendered frame self.frame_ready.emit(image_data, render_buffer_size, render_buffer_size, heat_signature) def _render_chunk(self, wave_payload, size, chunk_type): """ Render chunk based on type (META or DELTA) Args: wave_payload: bytes (508 bytes) size: Target image size (width/height) chunk_type: ChunkType.META or ChunkType.DELTA Returns: numpy array of shape (size, size, 3) with RGB values """ if chunk_type.value == "META": # META: Render as Structure (Geometric/Grayscale) return self._render_meta_structure(wave_payload, size) else: # DELTA: Render as Heat (Thermal color palette) return self._render_delta_heat(wave_payload, size) def _render_meta_structure(self, wave_payload, size): """ Render META chunk as Structure (Geometric/Grayscale) Maps byte values to geometric grid coordinates or grayscale intensity """ image = np.zeros((size, size, 3), dtype=np.uint8) if not wave_payload or len(wave_payload) == 0: return image payload_array = np.frombuffer(wave_payload, dtype=np.uint8) # Create geometric structure mapping # Strategy: Map 508 bytes to 2D grid with grayscale intensity # Calculate grid dimensions (close to square) grid_size = int(np.sqrt(len(payload_array))) + 1 grid_size = min(grid_size, size) # Don't exceed render size # Map payload bytes to grid coordinates for i, byte_value in enumerate(payload_array): if i >= grid_size * grid_size: break y = (i // grid_size) % size x = (i % grid_size) % size # Grayscale intensity from byte value gray = byte_value image[y, x] = [gray, gray, gray] # For remaining pixels, fill with geometric patterns # Create wave-like structures from byte patterns if len(payload_array) < size * size: remaining_start = len(payload_array) for i in range(remaining_start, size * size): y = i // size x = i % size # Geometric pattern based on position and payload pattern_idx = (y * size + x) % len(payload_array) if len(payload_array) > 0 else 0 base_value = payload_array[pattern_idx] if len(payload_array) > 0 else 128 # Add geometric structure (wave patterns) wave_pattern = int(127 * np.sin(x * 0.1) * np.cos(y * 0.1)) + 128 final_value = (base_value + wave_pattern) // 2 final_value = max(0, min(255, final_value)) image[y, x] = [final_value, final_value, final_value] return image def 
    def _render_delta_heat(self, wave_payload, size):
        """
        Render DELTA chunk as Heat (thermal color palette: Blue -> Red).
        Maps byte values to a thermal color visualization.
        """
        image = np.zeros((size, size, 3), dtype=np.uint8)

        if not wave_payload or len(wave_payload) == 0:
            return image

        payload_array = np.frombuffer(wave_payload, dtype=np.uint8)

        # Normalize payload to [0, 1] for thermal mapping
        if payload_array.max() != payload_array.min():
            normalized = (payload_array.astype(np.float32) - payload_array.min()) / (
                payload_array.max() - payload_array.min()
            )
        else:
            normalized = np.full(len(payload_array), 0.5, dtype=np.float32)

        # Thermal color palette: Blue (cold) -> Cyan -> Yellow -> Orange -> Red (hot)
        # Map normalized [0, 1] to RGB thermal colors
        for i, heat_value in enumerate(normalized):
            if i >= size * size:
                break
            y = i // size
            x = i % size
            # Thermal color mapping
            r, g, b = self._thermal_color(heat_value)
            image[y, x] = [r, g, b]

        # Fill remaining pixels with heat gradient
        if len(payload_array) < size * size:
            remaining_start = len(payload_array)
            for i in range(remaining_start, size * size):
                y = i // size
                x = i % size
                # Create heat gradient from payload pattern
                pattern_idx = (y * size + x) % len(payload_array) if len(payload_array) > 0 else 0
                base_heat = normalized[pattern_idx] if len(payload_array) > 0 else 0.5
                # Add phase hole noise effect
                noise = ((x + y) % 256) / 255.0 * 0.2
                heat_value = np.clip(base_heat + noise, 0.0, 1.0)
                r, g, b = self._thermal_color(heat_value)
                image[y, x] = [r, g, b]

        return image

    def _thermal_color(self, heat_value):
        """
        Convert heat value [0, 1] to thermal RGB color.
        Blue (cold, 0.0) -> Cyan -> Yellow -> Orange -> Red (hot, 1.0)

        Args:
            heat_value: Float in [0, 1]

        Returns:
            (r, g, b) tuple
        """
        heat_value = np.clip(heat_value, 0.0, 1.0)

        if heat_value < 0.25:
            # Blue to Cyan
            t = heat_value / 0.25
            r = 0
            g = int(255 * t)
            b = 255
        elif heat_value < 0.5:
            # Cyan to Yellow
            t = (heat_value - 0.25) / 0.25
            r = int(255 * t)
            g = 255
            b = int(255 * (1 - t))
        elif heat_value < 0.75:
            # Yellow to Orange
            t = (heat_value - 0.5) / 0.25
            r = 255
            g = int(255 * (1 - t * 0.5))
            b = 0
        else:
            # Orange to Red
            t = (heat_value - 0.75) / 0.25
            r = 255
            g = int(255 * (1 - t) * 0.5)
            b = 0

        return (r, g, b)


class PlaybackWindow(QMainWindow):
    """
    Playback Window with a fixed viewport that displays state-based reconstruction.
    Uses LogosDisplayInterpreter for persistent canvas state updates.
    """

    def __init__(self, display_interpreter, window_width=None, window_height=None, parent=None):
        super().__init__(parent)
        self.display_interpreter = display_interpreter
        self.window_width = window_width
        self.window_height = window_height
        self.logger = logging.getLogger('PlaybackWindow')

        # Setup UI
        self.init_ui()

    def init_ui(self):
        """Initialize the user interface"""
        self.setWindowTitle("LOGOS Playback Interpreter - State Saturation Engine")
        if self.window_width and self.window_height:
            self.setGeometry(100, 100, self.window_width, self.window_height)
        else:
            self.setGeometry(100, 100, 1024, 768)

        # Central widget
        central_widget = QWidget()
        self.setCentralWidget(central_widget)

        # Layout
        layout = QVBoxLayout()
        central_widget.setLayout(layout)

        # Viewport label for displaying frames
        self.viewport = QLabel()
        self.viewport.setAlignment(Qt.AlignCenter)
        self.viewport.setStyleSheet("background-color: black;")
        if self.window_width and self.window_height:
            self.viewport.setMinimumSize(self.window_width, self.window_height)
        layout.addWidget(self.viewport)

        # Status label
        self.status_label = QLabel("Waiting for stream data...")
        self.status_label.setAlignment(Qt.AlignCenter)
        layout.addWidget(self.status_label)
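    # Illustrative note (assumption, not part of the original module):
    # update_display() below is a pull-style refresh and is typically driven
    # by a periodic QTimer wired up by the caller, e.g.:
    #
    #     self.refresh_timer = QTimer(self)
    #     self.refresh_timer.timeout.connect(self.update_display)
    #     self.refresh_timer.start(33)  # ~30 FPS; the interval is a guess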
    def update_display(self):
        """Update viewport from display interpreter state"""
        # Get viewport frame (scaled with saturation overlay)
        target_size = (
            self.window_width if self.window_width else self.display_interpreter.resolution[0],
            self.window_height if self.window_height else self.display_interpreter.resolution[1],
        )
        viewport_frame = self.display_interpreter.get_viewport_frame(target_size)

        # Convert PIL Image to QPixmap.
        # Pass an explicit stride (width * 3 bytes per RGB888 row); without it,
        # QImage assumes 32-bit-aligned scanlines and widths that are not a
        # multiple of 4 render skewed.
        qimage = QImage(
            viewport_frame.tobytes(),
            target_size[0],
            target_size[1],
            target_size[0] * 3,
            QImage.Format_RGB888
        )
        pixmap = QPixmap.fromImage(qimage)

        # Display in viewport
        self.viewport.setPixmap(pixmap)

        # Update status with saturation info
        stats = self.display_interpreter.get_saturation_stats()
        if self.display_interpreter.resolution:
            res = self.display_interpreter.resolution
            vp_w = self.window_width if self.window_width else res[0]
            vp_h = self.window_height if self.window_height else res[1]
            self.status_label.setText(
                f"Stage: {stats['stage']} | "
                f"Saturation: {stats['percent']:.1f}% ({stats['saturated']}/{stats['total']}) | "
                f"Resolution: {res[0]}x{res[1]} | "
                f"Viewport: {vp_w}x{vp_h}"
            )
        else:
            self.status_label.setText(
                f"Stage: {stats['stage']} | Waiting for first META chunk..."
            )


class StreamHarmonizer:
    """
    Handles buffer synchronization for audio/video/data alignment.
    Based on META markers from StreamInterpreter.
    """

    def __init__(self):
        # deque is imported at module level
        self.audio_buffer = deque()
        self.video_buffer = deque()
        self.data_buffer = deque()
        self.meta_sequence = []
        self.logger = logging.getLogger('StreamHarmonizer')

    def register_meta_marker(self, marker_data):
        """
        Register a META marker for synchronization

        Args:
            marker_data: Marker data from StreamInterpreter
        """
        self.meta_sequence.append(marker_data)
        self.logger.debug(f"META marker registered: Heat={marker_data.get('heat_signature', 'N/A')}")

    def synchronize_buffers(self, audio_chunk, video_chunk, data_chunk, meta_markers):
        """
        Synchronize buffers based on META markers

        Args:
            audio_chunk: Audio data chunk
            video_chunk: Video data chunk
            data_chunk: Data chunk
            meta_markers: List of META markers from interpreter

        Returns:
            synchronized: Dictionary with aligned buffers
        """
        # Add chunks to respective buffers
        if audio_chunk is not None:
            self.audio_buffer.append(audio_chunk)
        if video_chunk is not None:
            self.video_buffer.append(video_chunk)
        if data_chunk is not None:
            self.data_buffer.append(data_chunk)

        # Align buffers based on META marker positions
        if meta_markers:
            # Use latest META marker as sync point
            sync_point = meta_markers[-1]

            # Ensure all buffers are aligned to this marker
            min_buffer_size = min(
                len(self.audio_buffer),
                len(self.video_buffer),
                len(self.data_buffer)
            )

            # Trim buffers to sync point if needed
            while len(self.audio_buffer) > min_buffer_size:
                self.audio_buffer.popleft()
            while len(self.video_buffer) > min_buffer_size:
                self.video_buffer.popleft()
            while len(self.data_buffer) > min_buffer_size:
                self.data_buffer.popleft()

            heat_sig = sync_point.get('heat_signature', 'unknown')
            self.logger.info(
                f"Buffers synchronized at META marker Heat={heat_sig}, "
                f"Buffer sizes: Audio={len(self.audio_buffer)}, "
                f"Video={len(self.video_buffer)}, Data={len(self.data_buffer)}"
            )

        return {
            'audio': list(self.audio_buffer),
            'video': list(self.video_buffer),
            'data': list(self.data_buffer),
            'sync_markers': meta_markers
        }
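

# Minimal launch sketch (illustrative assumption; the LogosDisplayInterpreter
# import path is project-specific and not shown in this module, so the import
# below is hypothetical):
#
#     import sys
#     from logos.display_interpreter import LogosDisplayInterpreter  # hypothetical path
#
#     app = QApplication(sys.argv)
#     interpreter = LogosDisplayInterpreter()
#     window = PlaybackWindow(interpreter, window_width=1024, window_height=768)
#     window.show()
#     sys.exit(app.exec_())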