File size: 7,092 Bytes
3bb804c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
"""

Self-Organizing Observer Node (Modulatable)

-------------------------------------------

The "Ghost in the Machine" node.

It implements the Free Energy Principle to drive morphogenesis.



Features:

- Configurable Sensitivity: Tune how "neurotic" or "reactive" the observer is.

- Closed Loop Control: Drives growth, plasticity, and energy based on surprise.

- Meta-Cognition Ready: Accepts 'plasticity_mod' to allow chaining observers.



Inputs:

- Sensation: Real-time input (VAE Latent)

- Prediction: Memory expectation (Hebbian Latent)

- Field Energy: Quantum substrate activity

- Plasticity Mod: (NEW) Modulation from a higher-order observer.



Outputs:

- Growth Drive: Triggers morphogenesis

- Plasticity: Modulates learning rate

- Free Energy: The minimized quantity (Surprise + Entropy)

"""

import numpy as np
import cv2
import __main__
BaseNode = __main__.BaseNode
QtGui = __main__.QtGui

class SelfOrganizingObserverNode(BaseNode):
    NODE_CATEGORY = "Cognitive"
    NODE_COLOR = QtGui.QColor(255, 215, 0) # Gold (The Observer)

    def __init__(self, latent_dim=16, growth_sensitivity=15.0, plasticity_gain=5.0, entropy_weight=0.1):
        super().__init__()
        self.node_title = "Self-Organizing Observer"
        
        self.inputs = {
            'sensation': 'spectrum',      # From RealVAE (What is happening)
            'prediction': 'spectrum',     # From HebbianLearner (What I expect)
            'field_energy': 'signal',     # From Quantum/Phi node (System energy)
            'plasticity_mod': 'signal'    # NEW: From Meta-Observer (Force learning)
        }
        
        self.outputs = {
            'growth_drive': 'signal',     # To CorticalGrowth
            'plasticity': 'signal',       # To HebbianLearner
            'entropy_out': 'signal',      # System disorder
            'free_energy': 'signal',      # The quantity being minimized
            'attention_map': 'image'      # Visualization
        }
        
        # --- Configurable Parameters ---
        self.latent_dim = int(latent_dim)
        self.growth_sensitivity = float(growth_sensitivity) # How hard to drive growth when surprised
        self.plasticity_gain = float(plasticity_gain)       # How fast to learn when surprised
        self.entropy_weight = float(entropy_weight)         # How much to penalize pure chaos
        
        # Internal State
        self.attention_vis = np.zeros((64, 64, 3), dtype=np.float32)
        
        # Output variables
        self.growth_drive_val = 0.0
        self.plasticity_val = 0.0
        self.entropy_val = 0.0
        self.free_energy_val = 0.0

    def step(self):
        # 1. Gather Inputs
        sensation = self.get_blended_input('sensation', 'first')
        prediction = self.get_blended_input('prediction', 'first')
        energy = self.get_blended_input('field_energy', 'sum') or 0.5
        plasticity_mod = self.get_blended_input('plasticity_mod', 'sum')
        
        if sensation is None:
            return

        # Normalize sensation if needed
        if len(sensation) != self.latent_dim:
            new_sens = np.zeros(self.latent_dim, dtype=np.float32)
            min_len = min(len(sensation), self.latent_dim)
            new_sens[:min_len] = sensation[:min_len]
            sensation = new_sens
            
        if prediction is None:
            prediction = np.zeros_like(sensation)
            
        # 2. Calculate Free Energy components
        
        # A. Prediction Error (Surprise)
        error_vector = sensation - prediction
        surprise = np.mean(np.square(error_vector))
        
        # B. Entropy (Uncertainty of the input itself)
        current_entropy = np.var(sensation)
        
        # C. Variational Free Energy
        # F = Surprise + (Entropy * Weight)
        free_energy = surprise + (current_entropy * self.entropy_weight)
        
        # 3. Derive Control Signals (The "Will")
        
        # Growth Drive:
        # Peak growth happens at "moderate" surprise.
        # Too little = boredom (no growth). Too much = chaos (shutdown).
        # The sensitivity knob scales the amplitude of this drive.
        growth_drive = free_energy * np.exp(-free_energy * 2.0) * self.growth_sensitivity
        
        # Plasticity (Learning Rate):
        # Learn fast when wrong.
        base_plasticity = np.tanh(surprise * self.plasticity_gain)
        
        # Apply Modulation from Meta-Observer (if connected)
        if plasticity_mod is not None:
            # If the meta-observer is surprised, it forces this observer to learn HARDER
            plasticity = base_plasticity * (1.0 + plasticity_mod * 5.0)
        else:
            plasticity = base_plasticity
        
        # 4. Visualization (The "Mind's Eye")
        side = int(np.sqrt(self.latent_dim))
        if side * side == self.latent_dim:
            err_grid = error_vector.reshape((side, side))
            err_vis = cv2.resize(err_grid, (64, 64), interpolation=cv2.INTER_NEAREST)
            self.attention_vis = cv2.applyColorMap(
                (np.clip(np.abs(err_vis) * 5.0, 0, 1) * 255).astype(np.uint8), 
                cv2.COLORMAP_HOT
            ).astype(np.float32) / 255.0
            
        # 5. Store Outputs
        self.growth_drive_val = growth_drive
        self.plasticity_val = plasticity
        self.entropy_val = current_entropy
        self.free_energy_val = free_energy

    def get_output(self, port_name):
        if port_name == 'attention_map':
            return self.attention_vis
        elif port_name == 'growth_drive':
            return float(self.growth_drive_val)
        elif port_name == 'plasticity':
            return float(self.plasticity_val)
        elif port_name == 'entropy_out':
            return float(self.entropy_val)
        elif port_name == 'free_energy':
            return float(self.free_energy_val)
        return None

    def get_display_image(self):
        # Overlay text for feedback
        img = (self.attention_vis * 255).astype(np.uint8)
        font = cv2.FONT_HERSHEY_SIMPLEX
        cv2.putText(img, f"FE: {self.free_energy_val:.2f}", (2, 10), font, 0.3, (255, 255, 255), 1)
        cv2.putText(img, f"GR: {self.growth_drive_val:.2f}", (2, 60), font, 0.3, (0, 255, 0), 1)
        
        # Show plasticity if boosted
        if self.plasticity_val > 1.0:
             cv2.putText(img, f"PL++: {self.plasticity_val:.2f}", (2, 35), font, 0.3, (255, 0, 255), 1)
        
        return QtGui.QImage(img.data, 64, 64, 64*3, QtGui.QImage.Format.Format_RGB888)
        
    def get_config_options(self):
        return [
            ("Latent Dim", "latent_dim", self.latent_dim, None),
            ("Growth Sensitivity", "growth_sensitivity", self.growth_sensitivity, None),
            ("Plasticity Gain", "plasticity_gain", self.plasticity_gain, None),
            ("Entropy Weight", "entropy_weight", self.entropy_weight, None)
        ]