PerceptionLabPortable / app /nodes /PCAscannernode.py
Aluode's picture
Upload folder using huggingface_hub
3bb804c verified
"""
PC Scanner Node - Automatically scans through all principal components
Creates a contact sheet showing what each PC controls
"""
import numpy as np
from PyQt6 import QtGui
import cv2
import __main__
BaseNode = __main__.BaseNode
QtGui = __main__.QtGui
class PCScannerNode(BaseNode):
"""
Systematically scans through all PCs to visualize their effects.
Creates a grid showing: [PC0-, PC0+, PC1-, PC1+, ...]
"""
NODE_CATEGORY = "AI / Physics"
NODE_COLOR = QtGui.QColor(220, 180, 100)
def __init__(self, scan_amplitude=2.0, grid_cols=4):
super().__init__()
self.node_title = "PC Scanner"
self.inputs = {
'latent_in': 'spectrum',
'reconstructed_image': 'image', # Feedback from iFFT
'scan_speed': 'signal', # How fast to scan
'trigger': 'signal', # Start scan
'amplitude': 'signal' # How much to modify each PC
}
self.outputs = {
'latent_out': 'spectrum', # Modified latent for current scan
'contact_sheet': 'image', # The full grid
'current_pc': 'signal', # Which PC we're scanning
'progress': 'signal' # 0-1 scan progress
}
self.scan_amplitude = float(scan_amplitude)
self.grid_cols = int(grid_cols)
# Scanning state
self.is_scanning = False
self.scan_index = 0 # Which PC we're currently scanning
self.scan_direction = 1 # 1 for +, -1 for -
self.frame_counter = 0
self.frames_per_scan = 30 # How many frames to wait per PC
# Storage
self.base_latent = None
self.current_latent = None
self.captured_images = {} # {(pc_idx, direction): image}
self.contact_sheet = None
# Dimensions
self.cell_size = 64
def step(self):
# Get inputs
latent_in = self.get_blended_input('latent_in', 'first')
reconstructed = self.get_blended_input('reconstructed_image', 'mean')
trigger = self.get_blended_input('trigger', 'sum') or 0.0
scan_speed = self.get_blended_input('scan_speed', 'sum')
amplitude_signal = self.get_blended_input('amplitude', 'sum')
if amplitude_signal is not None:
amplitude = amplitude_signal * 5.0
else:
amplitude = self.scan_amplitude
if scan_speed is not None:
self.frames_per_scan = max(5, int(30 / (scan_speed + 0.1)))
# Store base latent
if latent_in is not None and self.base_latent is None:
self.base_latent = latent_in.copy()
self.current_latent = latent_in.copy()
# Trigger scan
if trigger > 0.5 and not self.is_scanning:
self.start_scan()
# Scanning logic
if self.is_scanning and self.base_latent is not None:
self.frame_counter += 1
# Capture reconstructed image
if reconstructed is not None and self.frame_counter > 5: # Wait a few frames for stabilization
key = (self.scan_index, self.scan_direction)
if key not in self.captured_images:
# Resize and store
img_resized = cv2.resize(reconstructed, (self.cell_size, self.cell_size))
self.captured_images[key] = img_resized.copy()
# Time to move to next scan?
if self.frame_counter >= self.frames_per_scan:
self.advance_scan()
# Generate current modified latent
self.current_latent = self.base_latent.copy()
if self.scan_index < len(self.base_latent):
self.current_latent[self.scan_index] += amplitude * self.scan_direction
# Build contact sheet
if len(self.captured_images) > 0:
self.build_contact_sheet()
def start_scan(self):
"""Start a new scan"""
self.is_scanning = True
self.scan_index = 0
self.scan_direction = -1 # Start with negative
self.frame_counter = 0
self.captured_images = {}
print("PC Scanner: Starting scan...")
def advance_scan(self):
"""Move to next PC/direction"""
self.frame_counter = 0
if self.scan_direction == -1:
# Switch to positive
self.scan_direction = 1
else:
# Move to next PC
self.scan_direction = -1
self.scan_index += 1
# Check if scan complete
if self.scan_index >= len(self.base_latent):
self.is_scanning = False
self.current_latent = self.base_latent.copy()
print(f"PC Scanner: Scan complete! Captured {len(self.captured_images)} images.")
def build_contact_sheet(self):
"""Build the grid visualization"""
num_pcs = len(self.base_latent) if self.base_latent is not None else 8
# Calculate grid dimensions
# Each PC gets 2 cells (- and +)
total_cells = num_pcs * 2
rows = (total_cells + self.grid_cols - 1) // self.grid_cols
# Create canvas
canvas_width = self.grid_cols * self.cell_size
canvas_height = rows * self.cell_size
canvas = np.zeros((canvas_height, canvas_width, 3), dtype=np.float32)
# Fill grid
cell_idx = 0
for pc_idx in range(num_pcs):
for direction in [-1, 1]:
key = (pc_idx, direction)
row = cell_idx // self.grid_cols
col = cell_idx % self.grid_cols
y_start = row * self.cell_size
x_start = col * self.cell_size
if key in self.captured_images:
img = self.captured_images[key]
# Ensure correct shape
if img.ndim == 2:
img = np.stack([img, img, img], axis=-1)
elif img.shape[2] == 1:
img = np.repeat(img, 3, axis=2)
canvas[y_start:y_start+self.cell_size,
x_start:x_start+self.cell_size] = img
else:
# Empty cell - draw placeholder
cv2.rectangle(canvas, (x_start, y_start),
(x_start+self.cell_size-1, y_start+self.cell_size-1),
(0.2, 0.2, 0.2), 1)
# Label
label = f"PC{pc_idx}" + ("-" if direction == -1 else "+")
cv2.putText(canvas, label, (x_start+2, y_start+12),
cv2.FONT_HERSHEY_SIMPLEX, 0.3, (1, 1, 1), 1)
# Highlight current scan position
if self.is_scanning and pc_idx == self.scan_index and direction == self.scan_direction:
cv2.rectangle(canvas, (x_start, y_start),
(x_start+self.cell_size-1, y_start+self.cell_size-1),
(0, 1, 0), 2)
cell_idx += 1
self.contact_sheet = canvas
def get_output(self, port_name):
if port_name == 'latent_out':
return self.current_latent
elif port_name == 'contact_sheet':
return self.contact_sheet
elif port_name == 'current_pc':
return float(self.scan_index) if self.is_scanning else -1.0
elif port_name == 'progress':
if self.base_latent is not None and self.is_scanning:
total = len(self.base_latent) * 2
current = self.scan_index * 2 + (0 if self.scan_direction == -1 else 1)
return current / total
return 0.0
return None
def get_display_image(self):
if self.contact_sheet is not None:
# Display the contact sheet
img = (np.clip(self.contact_sheet, 0, 1) * 255).astype(np.uint8)
h, w = img.shape[:2]
return QtGui.QImage(img.data, w, h, w*3, QtGui.QImage.Format.Format_RGB888)
else:
# Show status
img = np.zeros((256, 256, 3), dtype=np.uint8)
if self.is_scanning:
status = f"Scanning PC{self.scan_index}{'-' if self.scan_direction == -1 else '+'}"
progress = int(self.get_output('progress') * 100)
cv2.putText(img, status, (10, 128), cv2.FONT_HERSHEY_SIMPLEX,
0.6, (0,255,0), 2)
cv2.putText(img, f"{progress}%", (10, 160), cv2.FONT_HERSHEY_SIMPLEX,
0.5, (255,255,255), 1)
# Progress bar
bar_width = int(256 * self.get_output('progress'))
cv2.rectangle(img, (0, 240), (bar_width, 256), (0,255,0), -1)
else:
cv2.putText(img, "Ready to scan", (10, 128), cv2.FONT_HERSHEY_SIMPLEX,
0.6, (255,255,255), 1)
cv2.putText(img, "Send trigger signal", (10, 160), cv2.FONT_HERSHEY_SIMPLEX,
0.4, (200,200,200), 1)
return QtGui.QImage(img.data, 256, 256, 256*3, QtGui.QImage.Format.Format_RGB888)
def get_config_options(self):
return [
("Scan Amplitude", "scan_amplitude", self.scan_amplitude, None),
("Grid Columns", "grid_cols", self.grid_cols, None)
]