LOGOS-SPCW-Matroska / logos /playback_window.py
GitHub Copilot
Refactor: Restructure into Machine Shop protocol (logos package, gradio ui)
ac73ca8
"""
LOGOS Playback Window - UI Shell (SPCW Cake/Bake Protocol)
Displays interpreter output with fixed viewport and bicubic interpolation
META: Geometric/Grayscale structure
DELTA: Thermal color palette (Blue->Red)
"""
import numpy as np
from PyQt5.QtWidgets import QApplication, QMainWindow, QWidget, QVBoxLayout, QLabel
from PyQt5.QtCore import Qt, QTimer, QThread, pyqtSignal
from PyQt5.QtGui import QImage, QPixmap
from PIL import Image
import logging
from collections import deque
class StreamRenderer(QThread):
"""
Worker thread for rendering stream data (Bake Renderer)
Converts interpreter output to RGB image buffer
"""
frame_ready = pyqtSignal(np.ndarray, int, int, str) # frame_data, width, height, heat_signature
def __init__(self, interpreter_output):
super().__init__()
self.interpreter_output = interpreter_output
self.logger = logging.getLogger('StreamRenderer')
def run(self):
"""Render interpreter output to RGB buffer"""
wave_payload = self.interpreter_output['wave_payload']
chunk_type = self.interpreter_output['chunk_type']
render_buffer_size = self.interpreter_output['render_buffer_size']
heat_signature = self.interpreter_output['heat_signature']
# Create image from wave payload
image_data = self._render_chunk(wave_payload, render_buffer_size, chunk_type)
# Emit rendered frame
self.frame_ready.emit(image_data, render_buffer_size, render_buffer_size, heat_signature)
def _render_chunk(self, wave_payload, size, chunk_type):
"""
Render chunk based on type (META or DELTA)
Args:
wave_payload: bytes (508 bytes)
size: Target image size (width/height)
chunk_type: ChunkType.META or ChunkType.DELTA
Returns:
numpy array of shape (size, size, 3) with RGB values
"""
if chunk_type.value == "META":
# META: Render as Structure (Geometric/Grayscale)
return self._render_meta_structure(wave_payload, size)
else:
# DELTA: Render as Heat (Thermal color palette)
return self._render_delta_heat(wave_payload, size)
def _render_meta_structure(self, wave_payload, size):
"""
Render META chunk as Structure (Geometric/Grayscale)
Maps byte values to geometric grid coordinates or grayscale intensity
"""
image = np.zeros((size, size, 3), dtype=np.uint8)
if not wave_payload or len(wave_payload) == 0:
return image
payload_array = np.frombuffer(wave_payload, dtype=np.uint8)
# Create geometric structure mapping
# Strategy: Map 508 bytes to 2D grid with grayscale intensity
# Calculate grid dimensions (close to square)
grid_size = int(np.sqrt(len(payload_array))) + 1
grid_size = min(grid_size, size) # Don't exceed render size
# Map payload bytes to grid coordinates
for i, byte_value in enumerate(payload_array):
if i >= grid_size * grid_size:
break
y = (i // grid_size) % size
x = (i % grid_size) % size
# Grayscale intensity from byte value
gray = byte_value
image[y, x] = [gray, gray, gray]
# For remaining pixels, fill with geometric patterns
# Create wave-like structures from byte patterns
if len(payload_array) < size * size:
remaining_start = len(payload_array)
for i in range(remaining_start, size * size):
y = i // size
x = i % size
# Geometric pattern based on position and payload
pattern_idx = (y * size + x) % len(payload_array) if len(payload_array) > 0 else 0
base_value = payload_array[pattern_idx] if len(payload_array) > 0 else 128
# Add geometric structure (wave patterns)
wave_pattern = int(127 * np.sin(x * 0.1) * np.cos(y * 0.1)) + 128
final_value = (base_value + wave_pattern) // 2
final_value = max(0, min(255, final_value))
image[y, x] = [final_value, final_value, final_value]
return image
def _render_delta_heat(self, wave_payload, size):
"""
Render DELTA chunk as Heat (Thermal color palette: Blue->Red)
Maps byte values to thermal color visualization
"""
image = np.zeros((size, size, 3), dtype=np.uint8)
if not wave_payload or len(wave_payload) == 0:
return image
payload_array = np.frombuffer(wave_payload, dtype=np.uint8)
# Normalize payload to [0, 1] for thermal mapping
if payload_array.max() != payload_array.min():
normalized = (payload_array.astype(np.float32) - payload_array.min()) / (
payload_array.max() - payload_array.min()
)
else:
normalized = np.full(len(payload_array), 0.5, dtype=np.float32)
# Thermal color palette: Blue (cold) -> Cyan -> Yellow -> Red (hot)
# Map normalized [0, 1] to RGB thermal colors
for i, heat_value in enumerate(normalized):
if i >= size * size:
break
y = i // size
x = i % size
# Thermal color mapping
r, g, b = self._thermal_color(heat_value)
image[y, x] = [r, g, b]
# Fill remaining pixels with heat gradient
if len(payload_array) < size * size:
remaining_start = len(payload_array)
for i in range(remaining_start, size * size):
y = i // size
x = i % size
# Create heat gradient from payload pattern
pattern_idx = (y * size + x) % len(payload_array) if len(payload_array) > 0 else 0
base_heat = normalized[pattern_idx] if len(payload_array) > 0 else 0.5
# Add phase hole noise effect
noise = ((x + y) % 256) / 255.0 * 0.2
heat_value = np.clip(base_heat + noise, 0.0, 1.0)
r, g, b = self._thermal_color(heat_value)
image[y, x] = [r, g, b]
return image
def _thermal_color(self, heat_value):
"""
Convert heat value [0, 1] to thermal RGB color
Blue (cold, 0.0) -> Cyan -> Yellow -> Red (hot, 1.0)
Args:
heat_value: Float in [0, 1]
Returns:
(r, g, b) tuple
"""
heat_value = np.clip(heat_value, 0.0, 1.0)
if heat_value < 0.25:
# Blue to Cyan
t = heat_value / 0.25
r = 0
g = int(255 * t)
b = 255
elif heat_value < 0.5:
# Cyan to Yellow
t = (heat_value - 0.25) / 0.25
r = int(255 * t)
g = 255
b = int(255 * (1 - t))
elif heat_value < 0.75:
# Yellow to Orange
t = (heat_value - 0.5) / 0.25
r = 255
g = int(255 * (1 - t * 0.5))
b = 0
else:
# Orange to Red
t = (heat_value - 0.75) / 0.25
r = 255
g = int(255 * (1 - t) * 0.5)
b = 0
return (r, g, b)
class PlaybackWindow(QMainWindow):
"""
Playback Window with fixed viewport that displays state-based reconstruction
Uses LogosDisplayInterpreter for persistent canvas state updates
"""
def __init__(self, display_interpreter, window_width=None, window_height=None, parent=None):
super().__init__(parent)
self.display_interpreter = display_interpreter
self.window_width = window_width
self.window_height = window_height
self.logger = logging.getLogger('PlaybackWindow')
# Setup UI
self.init_ui()
def init_ui(self):
"""Initialize the user interface"""
self.setWindowTitle("LOGOS Playback Interpreter - State Saturation Engine")
if self.window_width and self.window_height:
self.setGeometry(100, 100, self.window_width, self.window_height)
else:
self.setGeometry(100, 100, 1024, 768)
# Central widget
central_widget = QWidget()
self.setCentralWidget(central_widget)
# Layout
layout = QVBoxLayout()
central_widget.setLayout(layout)
# Viewport label for displaying frames
self.viewport = QLabel()
self.viewport.setAlignment(Qt.AlignCenter)
self.viewport.setStyleSheet("background-color: black;")
if self.window_width and self.window_height:
self.viewport.setMinimumSize(self.window_width, self.window_height)
layout.addWidget(self.viewport)
# Status label
self.status_label = QLabel("Waiting for stream data...")
self.status_label.setAlignment(Qt.AlignCenter)
layout.addWidget(self.status_label)
def update_display(self):
"""Update viewport from display interpreter state"""
# Get viewport frame (scaled with saturation overlay)
target_size = (
self.window_width if self.window_width else self.display_interpreter.resolution[0],
self.window_height if self.window_height else self.display_interpreter.resolution[1],
)
viewport_frame = self.display_interpreter.get_viewport_frame(target_size)
# Convert PIL Image to QPixmap
qimage = QImage(
viewport_frame.tobytes(),
target_size[0],
target_size[1],
QImage.Format_RGB888
)
pixmap = QPixmap.fromImage(qimage)
# Display in viewport
self.viewport.setPixmap(pixmap)
# Update status with saturation info
stats = self.display_interpreter.get_saturation_stats()
if self.display_interpreter.resolution:
res = self.display_interpreter.resolution
vp_w = self.window_width if self.window_width else res[0]
vp_h = self.window_height if self.window_height else res[1]
self.status_label.setText(
f"Stage: {stats['stage']} | "
f"Saturation: {stats['percent']:.1f}% ({stats['saturated']}/{stats['total']}) | "
f"Resolution: {res[0]}x{res[1]} | "
f"Viewport: {vp_w}x{vp_h}"
)
else:
self.status_label.setText(
f"Stage: {stats['stage']} | Waiting for first META chunk..."
)
class StreamHarmonizer:
"""
Handles buffer synchronization for audio/video/data alignment
Based on META markers from StreamInterpreter
"""
def __init__(self):
from collections import deque
self.audio_buffer = deque()
self.video_buffer = deque()
self.data_buffer = deque()
self.meta_sequence = []
self.logger = logging.getLogger('StreamHarmonizer')
def register_meta_marker(self, marker_data):
"""
Register a META marker for synchronization
Args:
marker_data: Marker data from StreamInterpreter
"""
self.meta_sequence.append(marker_data)
self.logger.debug(f"META marker registered: Heat={marker_data.get('heat_signature', 'N/A')}")
def synchronize_buffers(self, audio_chunk, video_chunk, data_chunk, meta_markers):
"""
Synchronize buffers based on META markers
Args:
audio_chunk: Audio data chunk
video_chunk: Video data chunk
data_chunk: Data chunk
meta_markers: List of META markers from interpreter
Returns:
synchronized: Dictionary with aligned buffers
"""
# Add chunks to respective buffers
if audio_chunk is not None:
self.audio_buffer.append(audio_chunk)
if video_chunk is not None:
self.video_buffer.append(video_chunk)
if data_chunk is not None:
self.data_buffer.append(data_chunk)
# Align buffers based on META marker positions
if meta_markers:
# Use latest META marker as sync point
sync_point = meta_markers[-1]
# Ensure all buffers are aligned to this marker
min_buffer_size = min(
len(self.audio_buffer),
len(self.video_buffer),
len(self.data_buffer)
)
# Trim buffers to sync point if needed
while len(self.audio_buffer) > min_buffer_size:
self.audio_buffer.popleft()
while len(self.video_buffer) > min_buffer_size:
self.video_buffer.popleft()
while len(self.data_buffer) > min_buffer_size:
self.data_buffer.popleft()
heat_sig = sync_point.get('heat_signature', 'unknown')
self.logger.info(
f"Buffers synchronized at META marker Heat={heat_sig}, "
f"Buffer sizes: Audio={len(self.audio_buffer)}, "
f"Video={len(self.video_buffer)}, Data={len(self.data_buffer)}"
)
return {
'audio': list(self.audio_buffer),
'video': list(self.video_buffer),
'data': list(self.data_buffer),
'sync_markers': meta_markers
}