Spaces:
Running
Running
| """ | |
| Reactive Space Node - A simplified, audio-reactive version of the | |
| earth19.py particle simulation. | |
| Does not use Pygame, Torch, or OpenGL. | |
| """ | |
| import numpy as np | |
| from PyQt6 import QtGui | |
| import cv2 | |
| import sys | |
| import os | |
| # --- This is the new, correct block --- | |
| import __main__ | |
| BaseNode = __main__.BaseNode | |
| PA_INSTANCE = getattr(__main__, "PA_INSTANCE", None) | |
| QtGui = __main__.QtGui | |
| # ------------------------------------ | |
| # --- Color Map Dictionary --- | |
| # Maps string names to OpenCV colormap constants | |
| CMAP_DICT = { | |
| "gray": None, # Special case for no colormap | |
| "plasma": cv2.COLORMAP_PLASMA, | |
| "viridis": cv2.COLORMAP_VIRIDIS, | |
| "inferno": cv2.COLORMAP_INFERNO, | |
| "magma": cv2.COLORMAP_MAGMA, | |
| "hot": cv2.COLORMAP_HOT, | |
| "jet": cv2.COLORMAP_JET | |
| } | |
| class ReactiveSpaceNode(BaseNode): | |
| NODE_CATEGORY = "Source" | |
| NODE_COLOR = QtGui.QColor(50, 80, 160) # Deep space blue | |
| def __init__(self, particle_count=200, width=160, height=120, color_scheme='plasma'): | |
| super().__init__() | |
| self.node_title = "Reactive Space" | |
| # --- Inputs for audio-reactivity --- | |
| self.inputs = { | |
| 'bass_in': 'signal', # Controls Sun/Attractor | |
| 'highs_in': 'signal' # Controls Stars/Particles | |
| } | |
| self.outputs = {'image': 'image', 'signal': 'signal'} | |
| self.w, self.h = width, height | |
| self.particle_count = int(particle_count) | |
| # --- Color scheme --- | |
| self.color_scheme = str(color_scheme) | |
| # Particle state | |
| self.positions = np.random.rand(self.particle_count, 2).astype(np.float32) * [self.w, self.h] | |
| self.velocities = (np.random.rand(self.particle_count, 2).astype(np.float32) - 0.5) * 2.0 | |
| # The "density" image | |
| self.space = np.zeros((self.h, self.w), dtype=np.float32) | |
| self.display_img = np.zeros((self.h, self.w), dtype=np.float32) | |
| # Track last dimensions to detect resizing (NEW) | |
| self._last_w = self.w | |
| self._last_h = self.h | |
| self.time = 0.0 | |
| def _check_and_resize_arrays(self): | |
| """Reinitialize arrays if dimensions changed (NEW HELPER)""" | |
| if self.w != self._last_w or self.h != self._last_h: | |
| # Dimensions changed - reinitialize arrays | |
| old_space = self.space | |
| # Create new arrays | |
| self.space = np.zeros((self.h, self.w), dtype=np.float32) | |
| self.display_img = np.zeros((self.h, self.w), dtype=np.float32) | |
| # Try to preserve old content (resize it) | |
| try: | |
| # Resize old_space content to fit the new dimensions | |
| self.space = cv2.resize(old_space, (self.w, self.h), interpolation=cv2.INTER_LINEAR) | |
| except Exception: | |
| # If resize fails (e.g., old_space was empty or invalid), just use zeros | |
| pass | |
| # Clamp all particle positions to new bounds | |
| self.positions[:, 0] = np.clip(self.positions[:, 0], 0, self.w - 1) | |
| self.positions[:, 1] = np.clip(self.positions[:, 1], 0, self.h - 1) | |
| # Update tracking | |
| self._last_w = self.w | |
| self._last_h = self.h | |
| def step(self): | |
| # FIX: Check if node was resized and update arrays | |
| self._check_and_resize_arrays() | |
| self.time += 0.01 | |
| # --- Get audio-reactive signals --- | |
| bass_energy = self.get_blended_input('bass_in', 'sum') or 0.0 | |
| highs_energy = self.get_blended_input('highs_in', 'sum') or 0.0 | |
| # Central attractor | |
| attractor_pos = np.array([ | |
| self.w / 2 + np.sin(self.time * 0.5) * self.w * 0.3, | |
| self.h / 2 + np.cos(self.time * 0.3) * self.h * 0.3 | |
| ]) | |
| # Calculate forces (simple gravity) | |
| to_attractor = attractor_pos - self.positions | |
| dist_sq = np.sum(to_attractor**2, axis=1, keepdims=True) + 1e-3 | |
| base_gravity = 5.0 | |
| sun_pulse_strength = 1.0 + (bass_energy * 5.0) | |
| force = to_attractor / dist_sq * (base_gravity * sun_pulse_strength) | |
| # Update velocities | |
| self.velocities += force * 0.1 | |
| star_jiggle = (np.random.rand(self.particle_count, 2) - 0.5) * (highs_energy * 0.5) | |
| self.velocities += star_jiggle | |
| self.velocities *= 0.98 | |
| # Update positions | |
| self.positions += self.velocities | |
| # Clamp positions to valid range | |
| self.positions[:, 0] = np.clip(self.positions[:, 0], 0, self.w - 1) | |
| self.positions[:, 1] = np.clip(self.positions[:, 1], 0, self.h - 1) | |
| # Bounce velocities when hitting walls | |
| mask_x_low = self.positions[:, 0] <= 0 | |
| mask_x_high = self.positions[:, 0] >= self.w - 1 | |
| mask_y_low = self.positions[:, 1] <= 0 | |
| mask_y_high = self.positions[:, 1] >= self.h - 1 | |
| self.velocities[mask_x_low | mask_x_high, 0] *= -0.5 | |
| self.velocities[mask_y_low | mask_y_high, 1] *= -0.5 | |
| # Update the density image | |
| self.space *= 0.9 | |
| # Get integer positions | |
| int_pos = self.positions.astype(int) | |
| # Validate positions | |
| valid = (int_pos[:, 0] >= 0) & (int_pos[:, 0] < self.w) & \ | |
| (int_pos[:, 1] >= 0) & (int_pos[:, 1] < self.h) | |
| valid_pos = int_pos[valid] | |
| # "Splat" particles onto the image | |
| if valid_pos.shape[0] > 0: | |
| y_coords = np.clip(valid_pos[:, 1], 0, self.h - 1) | |
| x_coords = np.clip(valid_pos[:, 0], 0, self.w - 1) | |
| # Use assignment to set the density at particle locations | |
| self.space[y_coords, x_coords] = 1.0 | |
| # Blur to make it look like a density field | |
| self.display_img = cv2.GaussianBlur(self.space, (5, 5), 0) | |
| def get_output(self, port_name): | |
| if port_name == 'image': | |
| return self.display_img | |
| elif port_name == 'signal': | |
| # Output mean velocity as a signal | |
| return np.mean(np.linalg.norm(self.velocities, axis=1)) | |
| return None | |
| def get_display_image(self): | |
| # FIX: Use the actual current dimensions of the arrays for QImage creation. | |
| img_u8 = (np.clip(self.display_img, 0, 1) * 255).astype(np.uint8) | |
| cmap_cv2 = CMAP_DICT.get(self.color_scheme) | |
| if cmap_cv2 is not None: | |
| # Apply CV2 colormap | |
| img_color = cv2.applyColorMap(img_u8, cmap_cv2) | |
| img_color = np.ascontiguousarray(img_color) | |
| h, w = img_color.shape[:2] | |
| return QtGui.QImage(img_color.data, w, h, 3*w, QtGui.QImage.Format.Format_BGR888) | |
| else: | |
| # Just return grayscale at ACTUAL size | |
| img_u8 = np.ascontiguousarray(img_u8) | |
| h, w = img_u8.shape | |
| return QtGui.QImage(img_u8.data, w, h, w, QtGui.QImage.Format.Format_Grayscale8) | |
| def get_config_options(self): | |
| # Create color scheme options for the dropdown | |
| color_options = [(name.title(), name) for name in CMAP_DICT.keys()] | |
| return [ | |
| ("Particle Count", "particle_count", self.particle_count, None), | |
| ("Color Scheme", "color_scheme", self.color_scheme, color_options), | |
| ] | |