Spaces:
Runtime error
Runtime error
| """ | |
| LOGOS Playback Window - UI Shell (SPCW Cake/Bake Protocol) | |
| Displays interpreter output with fixed viewport and bicubic interpolation | |
| META: Geometric/Grayscale structure | |
| DELTA: Thermal color palette (Blue->Red) | |
| """ | |
| import numpy as np | |
| from PyQt5.QtWidgets import QApplication, QMainWindow, QWidget, QVBoxLayout, QLabel | |
| from PyQt5.QtCore import Qt, QTimer, QThread, pyqtSignal | |
| from PyQt5.QtGui import QImage, QPixmap | |
| from PIL import Image | |
| import logging | |
| from collections import deque | |
class StreamRenderer(QThread):
    """
    Worker thread for rendering stream data (Bake Renderer).

    Converts one interpreter output record into a square RGB image buffer
    and publishes it through the ``frame_ready`` signal.
    """

    # Signal payload: frame_data, width, height, heat_signature
    frame_ready = pyqtSignal(np.ndarray, int, int, str)

    def __init__(self, interpreter_output):
        """
        Args:
            interpreter_output: Mapping read by ``run``; it must provide the
                keys 'wave_payload', 'chunk_type', 'render_buffer_size' and
                'heat_signature'.
        """
        super().__init__()
        self.interpreter_output = interpreter_output
        self.logger = logging.getLogger('StreamRenderer')

    def run(self):
        """Render the interpreter output to an RGB buffer and emit it."""
        wave_payload = self.interpreter_output['wave_payload']
        chunk_type = self.interpreter_output['chunk_type']
        render_buffer_size = self.interpreter_output['render_buffer_size']
        heat_signature = self.interpreter_output['heat_signature']

        image_data = self._render_chunk(wave_payload, render_buffer_size, chunk_type)

        # The buffer is square, so the same size is sent as width and height.
        self.frame_ready.emit(
            image_data, render_buffer_size, render_buffer_size, heat_signature
        )

    def _render_chunk(self, wave_payload, size, chunk_type):
        """
        Render a chunk based on its type (META or DELTA).

        Args:
            wave_payload: Payload bytes to visualise.
            size: Target image size (used for both width and height).
            chunk_type: Object whose ``.value`` is "META" for structure
                rendering; any other value is rendered as DELTA heat.

        Returns:
            numpy array of shape (size, size, 3), dtype uint8, RGB order.
        """
        if chunk_type.value == "META":
            # META: render as structure (geometric / grayscale)
            return self._render_meta_structure(wave_payload, size)
        # DELTA: render as heat (thermal colour palette)
        return self._render_delta_heat(wave_payload, size)

    def _render_meta_structure(self, wave_payload, size):
        """
        Render a META chunk as structure (geometric / grayscale).

        The payload bytes are laid out row-major in a near-square grid in
        the top-left corner, one gray pixel per byte.  Every pixel the grid
        did not write is filled with the payload byte at its linear index
        (modulo the payload length) averaged with a sin/cos wave pattern.

        Args:
            wave_payload: Payload bytes; empty/None yields a black image.
            size: Width and height of the output image.

        Returns:
            numpy array of shape (size, size, 3), dtype uint8.
        """
        image = np.zeros((size, size, 3), dtype=np.uint8)
        if not wave_payload or size == 0:
            return image

        payload = np.frombuffer(wave_payload, dtype=np.uint8)

        # Near-square grid, never wider than the render target.
        grid_size = min(int(np.sqrt(payload.size)) + 1, size)
        grid_count = min(payload.size, grid_size * grid_size)
        grid_y, grid_x = np.divmod(np.arange(grid_count), grid_size)
        image[grid_y, grid_x] = payload[:grid_count, np.newaxis]

        # Fill only the pixels the grid left untouched.  The grid uses a
        # stride of grid_size while the image uses a stride of size, so the
        # untouched set has to be tracked explicitly; filling "linear index
        # >= len(payload)" would overwrite grid rows and leave gaps.
        covered = np.zeros((size, size), dtype=bool)
        covered[grid_y, grid_x] = True
        fill_y, fill_x = np.nonzero(~covered)
        if fill_y.size:
            # Widen to int64 before adding: uint8 + int wraps on NumPy >= 2.
            base = payload[(fill_y * size + fill_x) % payload.size].astype(np.int64)
            wave = (
                127 * np.sin(fill_x * 0.1) * np.cos(fill_y * 0.1)
            ).astype(np.int64) + 128
            fill = np.clip((base + wave) // 2, 0, 255).astype(np.uint8)
            image[fill_y, fill_x] = fill[:, np.newaxis]

        return image

    def _render_delta_heat(self, wave_payload, size):
        """
        Render a DELTA chunk as heat (thermal colour palette).

        Payload bytes are min/max normalised to [0, 1] and written row-major
        from the top-left pixel.  If the payload is shorter than the image,
        the remaining pixels repeat the normalised payload (indexed by linear
        pixel position modulo payload length) plus a small position-based
        offset, clipped to [0, 1].

        Args:
            wave_payload: Payload bytes; empty/None yields a black image.
            size: Width and height of the output image.

        Returns:
            numpy array of shape (size, size, 3), dtype uint8.
        """
        if not wave_payload or size == 0:
            return np.zeros((size, size, 3), dtype=np.uint8)

        payload = np.frombuffer(wave_payload, dtype=np.uint8)

        lowest, highest = int(payload.min()), int(payload.max())
        if highest != lowest:
            normalized = (payload.astype(np.float32) - lowest) / (highest - lowest)
        else:
            # Flat payload: no contrast to stretch, use mid-scale heat.
            normalized = np.full(payload.size, 0.5, dtype=np.float32)

        total = size * size
        head = min(payload.size, total)
        heat = np.empty(total, dtype=np.float64)
        heat[:head] = normalized[:head]

        if head < total:
            tail = np.arange(head, total)
            tail_y, tail_x = np.divmod(tail, size)
            # "Phase hole" noise: diagonal ramp worth at most 0.2 heat.
            noise = ((tail_x + tail_y) % 256) / 255.0 * 0.2
            heat[head:] = np.clip(normalized[tail % payload.size] + noise, 0.0, 1.0)

        return self._thermal_palette(heat).reshape(size, size, 3)

    @staticmethod
    def _thermal_palette(heat_values):
        """
        Vectorised counterpart of ``_thermal_color``.

        Keep the segment formulas here in sync with ``_thermal_color``.

        Args:
            heat_values: Array-like of heat values; clipped to [0, 1].

        Returns:
            uint8 array with a trailing axis of length 3 (r, g, b).
        """
        heat = np.clip(np.asarray(heat_values, dtype=np.float64), 0.0, 1.0)

        # Position inside each quarter-wide segment.
        t1 = heat / 0.25
        t2 = (heat - 0.25) / 0.25
        t3 = (heat - 0.5) / 0.25
        t4 = (heat - 0.75) / 0.25

        cold = heat < 0.25
        cool = heat < 0.5
        warm = heat < 0.75

        red = np.where(cold, 0.0, np.where(cool, 255 * t2, 255.0))
        green = np.where(
            cold,
            255 * t1,
            np.where(
                cool,
                255.0,
                np.where(warm, 255 * (1 - t3 * 0.5), 255 * (1 - t4) * 0.5),
            ),
        )
        blue = np.where(cold, 255.0, np.where(cool, 255 * (1 - t2), 0.0))

        # astype truncates toward zero, matching int() in the scalar version.
        return np.stack([red, green, blue], axis=-1).astype(np.uint8)

    def _thermal_color(self, heat_value):
        """
        Convert one heat value to a thermal RGB colour.

        Blue (0.0) -> Cyan (0.25) -> Yellow (0.5) -> Orange (0.75) -> Red (1.0)

        Args:
            heat_value: Float; values outside [0, 1] are clipped.

        Returns:
            (r, g, b) tuple of ints in 0..255.
        """
        heat_value = np.clip(heat_value, 0.0, 1.0)
        if heat_value < 0.25:
            # Blue to Cyan
            t = heat_value / 0.25
            r = 0
            g = int(255 * t)
            b = 255
        elif heat_value < 0.5:
            # Cyan to Yellow
            t = (heat_value - 0.25) / 0.25
            r = int(255 * t)
            g = 255
            b = int(255 * (1 - t))
        elif heat_value < 0.75:
            # Yellow to Orange
            t = (heat_value - 0.5) / 0.25
            r = 255
            g = int(255 * (1 - t * 0.5))
            b = 0
        else:
            # Orange to Red
            t = (heat_value - 0.75) / 0.25
            r = 255
            g = int(255 * (1 - t) * 0.5)
            b = 0
        return (r, g, b)
class PlaybackWindow(QMainWindow):
    """
    Playback window with a fixed viewport that displays the display
    interpreter's current canvas state together with a status line.
    """

    def __init__(self, display_interpreter, window_width=None, window_height=None, parent=None):
        """
        Args:
            display_interpreter: Object providing ``resolution``,
                ``get_viewport_frame(target_size)`` and
                ``get_saturation_stats()``.
            window_width: Optional fixed viewport width in pixels.
            window_height: Optional fixed viewport height in pixels.
            parent: Optional Qt parent widget.
        """
        super().__init__(parent)
        self.display_interpreter = display_interpreter
        self.window_width = window_width
        self.window_height = window_height
        self.logger = logging.getLogger('PlaybackWindow')

        # Setup UI
        self.init_ui()

    def init_ui(self):
        """Initialize the user interface (viewport label + status label)."""
        self.setWindowTitle("LOGOS Playback Interpreter - State Saturation Engine")

        if self.window_width and self.window_height:
            self.setGeometry(100, 100, self.window_width, self.window_height)
        else:
            # No explicit size requested: fall back to a default geometry.
            self.setGeometry(100, 100, 1024, 768)

        # Central widget
        central_widget = QWidget()
        self.setCentralWidget(central_widget)

        # Layout
        layout = QVBoxLayout()
        central_widget.setLayout(layout)

        # Viewport label for displaying frames
        self.viewport = QLabel()
        self.viewport.setAlignment(Qt.AlignCenter)
        self.viewport.setStyleSheet("background-color: black;")
        if self.window_width and self.window_height:
            self.viewport.setMinimumSize(self.window_width, self.window_height)
        layout.addWidget(self.viewport)

        # Status label
        self.status_label = QLabel("Waiting for stream data...")
        self.status_label.setAlignment(Qt.AlignCenter)
        layout.addWidget(self.status_label)

    def update_display(self):
        """
        Update the viewport and status line from the display interpreter.

        The frame is requested at the fixed window size when one was given,
        otherwise at the interpreter's resolution.  If neither is available
        yet, no frame is drawn and only the waiting status is shown.
        """
        resolution = self.display_interpreter.resolution

        # Resolve each dimension independently; None means "not known yet".
        if self.window_width:
            target_w = self.window_width
        else:
            target_w = resolution[0] if resolution else None
        if self.window_height:
            target_h = self.window_height
        else:
            target_h = resolution[1] if resolution else None

        if target_w is not None and target_h is not None:
            target_size = (target_w, target_h)
            viewport_frame = self.display_interpreter.get_viewport_frame(target_size)

            # Keep the byte buffer referenced while QImage reads from it.
            frame_bytes = viewport_frame.tobytes()
            # RGB888 rows are 3*width bytes and not padded to 32 bits, so the
            # stride must be passed explicitly or odd widths render sheared.
            qimage = QImage(
                frame_bytes,
                target_w,
                target_h,
                3 * target_w,
                QImage.Format_RGB888,
            )
            pixmap = QPixmap.fromImage(qimage)

            # Display in viewport
            self.viewport.setPixmap(pixmap)

        # Update status with saturation info
        stats = self.display_interpreter.get_saturation_stats()
        if self.display_interpreter.resolution:
            res = self.display_interpreter.resolution
            vp_w = self.window_width if self.window_width else res[0]
            vp_h = self.window_height if self.window_height else res[1]
            self.status_label.setText(
                f"Stage: {stats['stage']} | "
                f"Saturation: {stats['percent']:.1f}% ({stats['saturated']}/{stats['total']}) | "
                f"Resolution: {res[0]}x{res[1]} | "
                f"Viewport: {vp_w}x{vp_h}"
            )
        else:
            self.status_label.setText(
                f"Stage: {stats['stage']} | Waiting for first META chunk..."
            )
class StreamHarmonizer:
    """
    Handles buffer synchronization for audio/video/data alignment,
    using META markers as synchronization points.
    """

    def __init__(self):
        """Create empty audio/video/data buffers and an empty marker log."""
        self.audio_buffer = deque()
        self.video_buffer = deque()
        self.data_buffer = deque()
        self.meta_sequence = []
        self.logger = logging.getLogger('StreamHarmonizer')

    def register_meta_marker(self, marker_data):
        """
        Register a META marker for synchronization.

        Args:
            marker_data: Mapping describing the marker; its optional
                'heat_signature' entry is included in the debug log.
        """
        self.meta_sequence.append(marker_data)
        self.logger.debug(
            "META marker registered: Heat=%s",
            marker_data.get('heat_signature', 'N/A'),
        )

    def synchronize_buffers(self, audio_chunk, video_chunk, data_chunk, meta_markers):
        """
        Queue the given chunks and align the buffers on the latest marker.

        Each non-None chunk is appended to its buffer.  When ``meta_markers``
        is non-empty, the oldest entries are dropped from every buffer until
        all three have the length of the shortest one.

        Args:
            audio_chunk: Audio data chunk, or None to add nothing.
            video_chunk: Video data chunk, or None to add nothing.
            data_chunk: Data chunk, or None to add nothing.
            meta_markers: Sequence of marker mappings; the last one is used
                as the sync point.  Falsy means "do not trim".

        Returns:
            Dict with list snapshots of the buffers under 'audio', 'video'
            and 'data', plus 'sync_markers' (the ``meta_markers`` argument).
        """
        buffers = (self.audio_buffer, self.video_buffer, self.data_buffer)

        for buffer, chunk in zip(buffers, (audio_chunk, video_chunk, data_chunk)):
            if chunk is not None:
                buffer.append(chunk)

        if meta_markers:
            # Use the latest META marker as the sync point.
            sync_point = meta_markers[-1]

            # Drop the oldest entries so every buffer matches the shortest.
            target_len = min(len(buffer) for buffer in buffers)
            for buffer in buffers:
                while len(buffer) > target_len:
                    buffer.popleft()

            self.logger.info(
                "Buffers synchronized at META marker Heat=%s, "
                "Buffer sizes: Audio=%s, Video=%s, Data=%s",
                sync_point.get('heat_signature', 'unknown'),
                len(self.audio_buffer),
                len(self.video_buffer),
                len(self.data_buffer),
            )

        return {
            'audio': list(self.audio_buffer),
            'video': list(self.video_buffer),
            'data': list(self.data_buffer),
            'sync_markers': meta_markers,
        }