PerceptionLabPortable / app /nodes /anttis_crystalmaker.py
Aluode's picture
Upload folder using huggingface_hub
3bb804c verified
"""
Antti's CrystalMaker Node - A 3D polyrhythmic field generator
Based on the PolyrhythmicSea class from crystal_kingdom.py
Place this file in the 'nodes' folder
"""
import numpy as np
from PyQt6 import QtGui
import cv2
import sys
import os
# --- This is the new, correct block ---
import __main__
BaseNode = __main__.BaseNode
PA_INSTANCE = getattr(__main__, "PA_INSTANCE", None)
# ------------------------------------
try:
# --- FIX: Change import to ndimage.convolve for periodic boundaries ---
from scipy.ndimage import convolve
SCIPY_AVAILABLE = True
except ImportError:
SCIPY_AVAILABLE = False
print("Warning: CrystalMakerNode requires 'scipy'.")
print("Please run: pip install scipy")
class CrystalMakerNode(BaseNode):
NODE_CATEGORY = "Source"
NODE_COLOR = QtGui.QColor(100, 200, 250) # Crystalline blue
def __init__(self, grid_size=32, num_fields=10):
super().__init__()
self.node_title = "Antti's CrystalMaker"
self.inputs = {
'tension': 'signal',
'damping': 'signal',
'nonlinearity_a': 'signal',
'nonlinearity_b': 'signal'
}
self.outputs = {
'field_slice': 'image', # 2D slice of the 3D field
'total_energy': 'signal'
}
self.N = int(grid_size)
self.num_fields = int(num_fields)
# --- Physics Parameters from crystal_kingdom.py ---
self.dt = 0.05
self.polyrhythm_coupling = 0.1
self.nonlinearity_A = 1.0
self.nonlinearity_B = 1.0
self.damping_factor = 0.005
self.tension = 5.0
self.base_frequencies_min = 0.5
self.base_frequencies_max = 2.5
self.diffusion_coeffs_min = 0.05
self.diffusion_coeffs_max = 0.1
self.total_energy = 0.0
# --- Internal 3D State ---
self._initialize_fields_and_params()
# 3D Laplacian Kernel
self.kern = np.zeros((3,3,3), np.float32)
self.kern[1,1,1] = -6
for dx,dy,dz in [(1,1,0),(1,1,2),(1,0,1),(1,2,1),(0,1,1),(2,1,1)]:
self.kern[dx,dy,dz] = 1
if not SCIPY_AVAILABLE:
self.node_title = "CrystalMaker (No SciPy!)"
def _initialize_fields_and_params(self):
"""Initializes or re-initializes fields."""
shape = (self.N, self.N, self.N)
self.phi_fields = [(np.random.rand(*shape).astype(np.float32) - 0.5) * 0.5
for _ in range(self.num_fields)]
self.phi_o_fields = [np.copy(phi) for phi in self.phi_fields]
self.base_frequencies = np.linspace(self.base_frequencies_min, self.base_frequencies_max, self.num_fields)
self.diffusion_coeffs = np.linspace(self.diffusion_coeffs_max, self.diffusion_coeffs_min, self.num_fields)
self.field_phases = np.random.uniform(0, 2 * np.pi, self.num_fields)
self.phi = np.zeros(shape, dtype=np.float32)
self.phi_o = np.zeros(shape, dtype=np.float32)
self._update_summed_fields()
def _update_summed_fields(self):
"""Update the main summed field from individual phi fields"""
self.phi = np.sum(self.phi_fields, axis=0) / max(1, len(self.phi_fields))
self.phi_o = np.sum(self.phi_o_fields, axis=0) / max(1, len(self.phi_fields))
def _potential_deriv(self, field_k):
"""Calculate the derivative of the potential function for a field"""
return -self.nonlinearity_A * field_k + self.nonlinearity_B * (field_k**3)
def _laplacian(self, f):
"""3D Laplacian using convolution with periodic boundary ('wrap')"""
if not SCIPY_AVAILABLE:
return np.zeros_like(f)
# --- FIX: Use mode='wrap' with scipy.ndimage.convolve ---
return convolve(f, self.kern, mode='wrap')
# --- END FIX ---
def step(self):
if not SCIPY_AVAILABLE:
return
# --- Update parameters from inputs ---
# Map signals [-1, 1] to a useful range
self.tension = (self.get_blended_input('tension', 'sum') or 0.0) * 10.0 + 10.0 # Range [0, 20]
self.damping_factor = (self.get_blended_input('damping', 'sum') or 0.0) * 0.02 + 0.02 # Range [0, 0.04]
self.nonlinearity_A = (self.get_blended_input('nonlinearity_a', 'sum') or 0.0) + 1.0 # Range [0, 2]
self.nonlinearity_B = (self.get_blended_input('nonlinearity_b', 'sum') or 0.0) + 1.0 # Range [0, 2]
# --- Run simulation step (from crystal_kingdom.py) ---
new_phi_list = []
self.field_phases += self.base_frequencies * self.dt
self.field_phases %= (2 * np.pi)
for k in range(self.num_fields):
phi_k = self.phi_fields[k]
phi_o_k = self.phi_o_fields[k]
vel_k = phi_k - phi_o_k
lap_k = self._laplacian(phi_k)
potential_deriv_k = self._potential_deriv(phi_k)
other_fields_sum = (np.sum(self.phi_fields, axis=0) - phi_k)
coupling_force = self.polyrhythm_coupling * other_fields_sum / max(1, self.num_fields - 1)
driving_force_k = 0.005 * np.sin(self.field_phases[k])
c2 = 1.0 / (1.0 + self.tension * phi_k**2 + 1e-6)
acc = (c2 * self.diffusion_coeffs[k] * lap_k -
potential_deriv_k +
coupling_force +
driving_force_k)
new_phi_k = phi_k + (1 - self.damping_factor * self.dt) * vel_k + self.dt**2 * acc
new_phi_list.append(new_phi_k)
# Update fields
# Note: phi_o_fields update logic (phi_o_k = phi_k) seems missing from the original source step,
# but the physics uses phi_o_k to compute vel_k, so we need to update it here.
self.phi_o_fields = self.phi_fields # Save current as previous for the next step
self.phi_fields = new_phi_list
self._update_summed_fields()
# Calculate total energy (simplified)
self.total_energy = np.mean(self.phi**2)
def get_output(self, port_name):
if port_name == 'field_slice':
# Output the middle slice
z_mid = self.N // 2
field_slice = self.phi[z_mid, :, :]
# Normalize field for output
vmax = np.abs(field_slice).max() + 1e-9
return (field_slice / (2 * vmax)) + 0.5 # map [-v, v] to [0, 1]
elif port_name == 'total_energy':
return self.total_energy
return None
def get_display_image(self):
# Get the middle slice for the node's display
z_mid = self.N // 2
field_slice = self.phi[z_mid, :, :]
# Normalize field for display
vmax = np.abs(field_slice).max() + 1e-9
img_norm = np.clip((field_slice / (2 * vmax)) + 0.5, 0.0, 1.0)
img_u8 = (img_norm * 255).astype(np.uint8)
# Apply a colormap
img_color = cv2.applyColorMap(img_u8, cv2.COLORMAP_JET)
img_color = np.ascontiguousarray(img_color)
h, w = img_color.shape[:2]
return QtGui.QImage(img_color.data, w, h, 3*w, QtGui.QImage.Format.Format_BGR888)
def get_config_options(self):
return [
("Grid Size (3D)", "N", self.N, None),
("Num Fields", "num_fields", self.num_fields, None),
]