PerceptionLabPortable / app /nodes /ReactiveSpaceNode.py
Aluode's picture
Upload folder using huggingface_hub
3bb804c verified
"""
Reactive Space Node - A simplified, audio-reactive version of the
earth19.py particle simulation.
Does not use Pygame, Torch, or OpenGL.
"""
import numpy as np
from PyQt6 import QtGui
import cv2
import sys
import os
# --- This is the new, correct block ---
import __main__
BaseNode = __main__.BaseNode
PA_INSTANCE = getattr(__main__, "PA_INSTANCE", None)
QtGui = __main__.QtGui
# ------------------------------------
# --- Color Map Dictionary ---
# Maps string names to OpenCV colormap constants
CMAP_DICT = {
"gray": None, # Special case for no colormap
"plasma": cv2.COLORMAP_PLASMA,
"viridis": cv2.COLORMAP_VIRIDIS,
"inferno": cv2.COLORMAP_INFERNO,
"magma": cv2.COLORMAP_MAGMA,
"hot": cv2.COLORMAP_HOT,
"jet": cv2.COLORMAP_JET
}
class ReactiveSpaceNode(BaseNode):
NODE_CATEGORY = "Source"
NODE_COLOR = QtGui.QColor(50, 80, 160) # Deep space blue
def __init__(self, particle_count=200, width=160, height=120, color_scheme='plasma'):
super().__init__()
self.node_title = "Reactive Space"
# --- Inputs for audio-reactivity ---
self.inputs = {
'bass_in': 'signal', # Controls Sun/Attractor
'highs_in': 'signal' # Controls Stars/Particles
}
self.outputs = {'image': 'image', 'signal': 'signal'}
self.w, self.h = width, height
self.particle_count = int(particle_count)
# --- Color scheme ---
self.color_scheme = str(color_scheme)
# Particle state
self.positions = np.random.rand(self.particle_count, 2).astype(np.float32) * [self.w, self.h]
self.velocities = (np.random.rand(self.particle_count, 2).astype(np.float32) - 0.5) * 2.0
# The "density" image
self.space = np.zeros((self.h, self.w), dtype=np.float32)
self.display_img = np.zeros((self.h, self.w), dtype=np.float32)
# Track last dimensions to detect resizing (NEW)
self._last_w = self.w
self._last_h = self.h
self.time = 0.0
def _check_and_resize_arrays(self):
"""Reinitialize arrays if dimensions changed (NEW HELPER)"""
if self.w != self._last_w or self.h != self._last_h:
# Dimensions changed - reinitialize arrays
old_space = self.space
# Create new arrays
self.space = np.zeros((self.h, self.w), dtype=np.float32)
self.display_img = np.zeros((self.h, self.w), dtype=np.float32)
# Try to preserve old content (resize it)
try:
# Resize old_space content to fit the new dimensions
self.space = cv2.resize(old_space, (self.w, self.h), interpolation=cv2.INTER_LINEAR)
except Exception:
# If resize fails (e.g., old_space was empty or invalid), just use zeros
pass
# Clamp all particle positions to new bounds
self.positions[:, 0] = np.clip(self.positions[:, 0], 0, self.w - 1)
self.positions[:, 1] = np.clip(self.positions[:, 1], 0, self.h - 1)
# Update tracking
self._last_w = self.w
self._last_h = self.h
def step(self):
# FIX: Check if node was resized and update arrays
self._check_and_resize_arrays()
self.time += 0.01
# --- Get audio-reactive signals ---
bass_energy = self.get_blended_input('bass_in', 'sum') or 0.0
highs_energy = self.get_blended_input('highs_in', 'sum') or 0.0
# Central attractor
attractor_pos = np.array([
self.w / 2 + np.sin(self.time * 0.5) * self.w * 0.3,
self.h / 2 + np.cos(self.time * 0.3) * self.h * 0.3
])
# Calculate forces (simple gravity)
to_attractor = attractor_pos - self.positions
dist_sq = np.sum(to_attractor**2, axis=1, keepdims=True) + 1e-3
base_gravity = 5.0
sun_pulse_strength = 1.0 + (bass_energy * 5.0)
force = to_attractor / dist_sq * (base_gravity * sun_pulse_strength)
# Update velocities
self.velocities += force * 0.1
star_jiggle = (np.random.rand(self.particle_count, 2) - 0.5) * (highs_energy * 0.5)
self.velocities += star_jiggle
self.velocities *= 0.98
# Update positions
self.positions += self.velocities
# Clamp positions to valid range
self.positions[:, 0] = np.clip(self.positions[:, 0], 0, self.w - 1)
self.positions[:, 1] = np.clip(self.positions[:, 1], 0, self.h - 1)
# Bounce velocities when hitting walls
mask_x_low = self.positions[:, 0] <= 0
mask_x_high = self.positions[:, 0] >= self.w - 1
mask_y_low = self.positions[:, 1] <= 0
mask_y_high = self.positions[:, 1] >= self.h - 1
self.velocities[mask_x_low | mask_x_high, 0] *= -0.5
self.velocities[mask_y_low | mask_y_high, 1] *= -0.5
# Update the density image
self.space *= 0.9
# Get integer positions
int_pos = self.positions.astype(int)
# Validate positions
valid = (int_pos[:, 0] >= 0) & (int_pos[:, 0] < self.w) & \
(int_pos[:, 1] >= 0) & (int_pos[:, 1] < self.h)
valid_pos = int_pos[valid]
# "Splat" particles onto the image
if valid_pos.shape[0] > 0:
y_coords = np.clip(valid_pos[:, 1], 0, self.h - 1)
x_coords = np.clip(valid_pos[:, 0], 0, self.w - 1)
# Use assignment to set the density at particle locations
self.space[y_coords, x_coords] = 1.0
# Blur to make it look like a density field
self.display_img = cv2.GaussianBlur(self.space, (5, 5), 0)
def get_output(self, port_name):
if port_name == 'image':
return self.display_img
elif port_name == 'signal':
# Output mean velocity as a signal
return np.mean(np.linalg.norm(self.velocities, axis=1))
return None
def get_display_image(self):
# FIX: Use the actual current dimensions of the arrays for QImage creation.
img_u8 = (np.clip(self.display_img, 0, 1) * 255).astype(np.uint8)
cmap_cv2 = CMAP_DICT.get(self.color_scheme)
if cmap_cv2 is not None:
# Apply CV2 colormap
img_color = cv2.applyColorMap(img_u8, cmap_cv2)
img_color = np.ascontiguousarray(img_color)
h, w = img_color.shape[:2]
return QtGui.QImage(img_color.data, w, h, 3*w, QtGui.QImage.Format.Format_BGR888)
else:
# Just return grayscale at ACTUAL size
img_u8 = np.ascontiguousarray(img_u8)
h, w = img_u8.shape
return QtGui.QImage(img_u8.data, w, h, w, QtGui.QImage.Format.Format_Grayscale8)
def get_config_options(self):
# Create color scheme options for the dropdown
color_options = [(name.title(), name) for name in CMAP_DICT.keys()]
return [
("Particle Count", "particle_count", self.particle_count, None),
("Color Scheme", "color_scheme", self.color_scheme, color_options),
]