Spaces:
Runtime error
Runtime error
File size: 13,887 Bytes
ac73ca8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 |
"""
LOGOS Playback Window - UI Shell (SPCW Cake/Bake Protocol)
Displays interpreter output with fixed viewport and bicubic interpolation
META: Geometric/Grayscale structure
DELTA: Thermal color palette (Blue->Red)
"""
import numpy as np
from PyQt5.QtWidgets import QApplication, QMainWindow, QWidget, QVBoxLayout, QLabel
from PyQt5.QtCore import Qt, QTimer, QThread, pyqtSignal
from PyQt5.QtGui import QImage, QPixmap
from PIL import Image
import logging
from collections import deque
class StreamRenderer(QThread):
"""
Worker thread for rendering stream data (Bake Renderer)
Converts interpreter output to RGB image buffer
"""
frame_ready = pyqtSignal(np.ndarray, int, int, str) # frame_data, width, height, heat_signature
def __init__(self, interpreter_output):
super().__init__()
self.interpreter_output = interpreter_output
self.logger = logging.getLogger('StreamRenderer')
def run(self):
"""Render interpreter output to RGB buffer"""
wave_payload = self.interpreter_output['wave_payload']
chunk_type = self.interpreter_output['chunk_type']
render_buffer_size = self.interpreter_output['render_buffer_size']
heat_signature = self.interpreter_output['heat_signature']
# Create image from wave payload
image_data = self._render_chunk(wave_payload, render_buffer_size, chunk_type)
# Emit rendered frame
self.frame_ready.emit(image_data, render_buffer_size, render_buffer_size, heat_signature)
def _render_chunk(self, wave_payload, size, chunk_type):
"""
Render chunk based on type (META or DELTA)
Args:
wave_payload: bytes (508 bytes)
size: Target image size (width/height)
chunk_type: ChunkType.META or ChunkType.DELTA
Returns:
numpy array of shape (size, size, 3) with RGB values
"""
if chunk_type.value == "META":
# META: Render as Structure (Geometric/Grayscale)
return self._render_meta_structure(wave_payload, size)
else:
# DELTA: Render as Heat (Thermal color palette)
return self._render_delta_heat(wave_payload, size)
def _render_meta_structure(self, wave_payload, size):
"""
Render META chunk as Structure (Geometric/Grayscale)
Maps byte values to geometric grid coordinates or grayscale intensity
"""
image = np.zeros((size, size, 3), dtype=np.uint8)
if not wave_payload or len(wave_payload) == 0:
return image
payload_array = np.frombuffer(wave_payload, dtype=np.uint8)
# Create geometric structure mapping
# Strategy: Map 508 bytes to 2D grid with grayscale intensity
# Calculate grid dimensions (close to square)
grid_size = int(np.sqrt(len(payload_array))) + 1
grid_size = min(grid_size, size) # Don't exceed render size
# Map payload bytes to grid coordinates
for i, byte_value in enumerate(payload_array):
if i >= grid_size * grid_size:
break
y = (i // grid_size) % size
x = (i % grid_size) % size
# Grayscale intensity from byte value
gray = byte_value
image[y, x] = [gray, gray, gray]
# For remaining pixels, fill with geometric patterns
# Create wave-like structures from byte patterns
if len(payload_array) < size * size:
remaining_start = len(payload_array)
for i in range(remaining_start, size * size):
y = i // size
x = i % size
# Geometric pattern based on position and payload
pattern_idx = (y * size + x) % len(payload_array) if len(payload_array) > 0 else 0
base_value = payload_array[pattern_idx] if len(payload_array) > 0 else 128
# Add geometric structure (wave patterns)
wave_pattern = int(127 * np.sin(x * 0.1) * np.cos(y * 0.1)) + 128
final_value = (base_value + wave_pattern) // 2
final_value = max(0, min(255, final_value))
image[y, x] = [final_value, final_value, final_value]
return image
def _render_delta_heat(self, wave_payload, size):
"""
Render DELTA chunk as Heat (Thermal color palette: Blue->Red)
Maps byte values to thermal color visualization
"""
image = np.zeros((size, size, 3), dtype=np.uint8)
if not wave_payload or len(wave_payload) == 0:
return image
payload_array = np.frombuffer(wave_payload, dtype=np.uint8)
# Normalize payload to [0, 1] for thermal mapping
if payload_array.max() != payload_array.min():
normalized = (payload_array.astype(np.float32) - payload_array.min()) / (
payload_array.max() - payload_array.min()
)
else:
normalized = np.full(len(payload_array), 0.5, dtype=np.float32)
# Thermal color palette: Blue (cold) -> Cyan -> Yellow -> Red (hot)
# Map normalized [0, 1] to RGB thermal colors
for i, heat_value in enumerate(normalized):
if i >= size * size:
break
y = i // size
x = i % size
# Thermal color mapping
r, g, b = self._thermal_color(heat_value)
image[y, x] = [r, g, b]
# Fill remaining pixels with heat gradient
if len(payload_array) < size * size:
remaining_start = len(payload_array)
for i in range(remaining_start, size * size):
y = i // size
x = i % size
# Create heat gradient from payload pattern
pattern_idx = (y * size + x) % len(payload_array) if len(payload_array) > 0 else 0
base_heat = normalized[pattern_idx] if len(payload_array) > 0 else 0.5
# Add phase hole noise effect
noise = ((x + y) % 256) / 255.0 * 0.2
heat_value = np.clip(base_heat + noise, 0.0, 1.0)
r, g, b = self._thermal_color(heat_value)
image[y, x] = [r, g, b]
return image
def _thermal_color(self, heat_value):
"""
Convert heat value [0, 1] to thermal RGB color
Blue (cold, 0.0) -> Cyan -> Yellow -> Red (hot, 1.0)
Args:
heat_value: Float in [0, 1]
Returns:
(r, g, b) tuple
"""
heat_value = np.clip(heat_value, 0.0, 1.0)
if heat_value < 0.25:
# Blue to Cyan
t = heat_value / 0.25
r = 0
g = int(255 * t)
b = 255
elif heat_value < 0.5:
# Cyan to Yellow
t = (heat_value - 0.25) / 0.25
r = int(255 * t)
g = 255
b = int(255 * (1 - t))
elif heat_value < 0.75:
# Yellow to Orange
t = (heat_value - 0.5) / 0.25
r = 255
g = int(255 * (1 - t * 0.5))
b = 0
else:
# Orange to Red
t = (heat_value - 0.75) / 0.25
r = 255
g = int(255 * (1 - t) * 0.5)
b = 0
return (r, g, b)
class PlaybackWindow(QMainWindow):
"""
Playback Window with fixed viewport that displays state-based reconstruction
Uses LogosDisplayInterpreter for persistent canvas state updates
"""
def __init__(self, display_interpreter, window_width=None, window_height=None, parent=None):
super().__init__(parent)
self.display_interpreter = display_interpreter
self.window_width = window_width
self.window_height = window_height
self.logger = logging.getLogger('PlaybackWindow')
# Setup UI
self.init_ui()
def init_ui(self):
"""Initialize the user interface"""
self.setWindowTitle("LOGOS Playback Interpreter - State Saturation Engine")
if self.window_width and self.window_height:
self.setGeometry(100, 100, self.window_width, self.window_height)
else:
self.setGeometry(100, 100, 1024, 768)
# Central widget
central_widget = QWidget()
self.setCentralWidget(central_widget)
# Layout
layout = QVBoxLayout()
central_widget.setLayout(layout)
# Viewport label for displaying frames
self.viewport = QLabel()
self.viewport.setAlignment(Qt.AlignCenter)
self.viewport.setStyleSheet("background-color: black;")
if self.window_width and self.window_height:
self.viewport.setMinimumSize(self.window_width, self.window_height)
layout.addWidget(self.viewport)
# Status label
self.status_label = QLabel("Waiting for stream data...")
self.status_label.setAlignment(Qt.AlignCenter)
layout.addWidget(self.status_label)
def update_display(self):
"""Update viewport from display interpreter state"""
# Get viewport frame (scaled with saturation overlay)
target_size = (
self.window_width if self.window_width else self.display_interpreter.resolution[0],
self.window_height if self.window_height else self.display_interpreter.resolution[1],
)
viewport_frame = self.display_interpreter.get_viewport_frame(target_size)
# Convert PIL Image to QPixmap
qimage = QImage(
viewport_frame.tobytes(),
target_size[0],
target_size[1],
QImage.Format_RGB888
)
pixmap = QPixmap.fromImage(qimage)
# Display in viewport
self.viewport.setPixmap(pixmap)
# Update status with saturation info
stats = self.display_interpreter.get_saturation_stats()
if self.display_interpreter.resolution:
res = self.display_interpreter.resolution
vp_w = self.window_width if self.window_width else res[0]
vp_h = self.window_height if self.window_height else res[1]
self.status_label.setText(
f"Stage: {stats['stage']} | "
f"Saturation: {stats['percent']:.1f}% ({stats['saturated']}/{stats['total']}) | "
f"Resolution: {res[0]}x{res[1]} | "
f"Viewport: {vp_w}x{vp_h}"
)
else:
self.status_label.setText(
f"Stage: {stats['stage']} | Waiting for first META chunk..."
)
class StreamHarmonizer:
"""
Handles buffer synchronization for audio/video/data alignment
Based on META markers from StreamInterpreter
"""
def __init__(self):
from collections import deque
self.audio_buffer = deque()
self.video_buffer = deque()
self.data_buffer = deque()
self.meta_sequence = []
self.logger = logging.getLogger('StreamHarmonizer')
def register_meta_marker(self, marker_data):
"""
Register a META marker for synchronization
Args:
marker_data: Marker data from StreamInterpreter
"""
self.meta_sequence.append(marker_data)
self.logger.debug(f"META marker registered: Heat={marker_data.get('heat_signature', 'N/A')}")
def synchronize_buffers(self, audio_chunk, video_chunk, data_chunk, meta_markers):
"""
Synchronize buffers based on META markers
Args:
audio_chunk: Audio data chunk
video_chunk: Video data chunk
data_chunk: Data chunk
meta_markers: List of META markers from interpreter
Returns:
synchronized: Dictionary with aligned buffers
"""
# Add chunks to respective buffers
if audio_chunk is not None:
self.audio_buffer.append(audio_chunk)
if video_chunk is not None:
self.video_buffer.append(video_chunk)
if data_chunk is not None:
self.data_buffer.append(data_chunk)
# Align buffers based on META marker positions
if meta_markers:
# Use latest META marker as sync point
sync_point = meta_markers[-1]
# Ensure all buffers are aligned to this marker
min_buffer_size = min(
len(self.audio_buffer),
len(self.video_buffer),
len(self.data_buffer)
)
# Trim buffers to sync point if needed
while len(self.audio_buffer) > min_buffer_size:
self.audio_buffer.popleft()
while len(self.video_buffer) > min_buffer_size:
self.video_buffer.popleft()
while len(self.data_buffer) > min_buffer_size:
self.data_buffer.popleft()
heat_sig = sync_point.get('heat_signature', 'unknown')
self.logger.info(
f"Buffers synchronized at META marker Heat={heat_sig}, "
f"Buffer sizes: Audio={len(self.audio_buffer)}, "
f"Video={len(self.video_buffer)}, Data={len(self.data_buffer)}"
)
return {
'audio': list(self.audio_buffer),
'video': list(self.video_buffer),
'data': list(self.data_buffer),
'sync_markers': meta_markers
}
|