theapemachine commited on
Commit
e317bd9
·
verified ·
1 Parent(s): 46410a2

Remove: tensegrity/legacy/v1/blanket.py

Browse files
Files changed (1) hide show
  1. tensegrity/legacy/v1/blanket.py +0 -218
tensegrity/legacy/v1/blanket.py DELETED
@@ -1,218 +0,0 @@
1
- """
2
- Markov Blanket: The computational boundary of the agent.
3
-
4
- In Friston's formalism, the Markov blanket separates internal states (beliefs)
5
- from external states (world). It consists of:
6
- - Sensory states (S): what flows IN from the world (observations)
7
- - Active states (A): what flows OUT to the world (actions)
8
-
9
- The blanket enforces conditional independence:
10
- Internal ⊥ External | Blanket
11
-
12
- This is not a metaphor. It's the literal statistical boundary that defines
13
- where the agent ends and the world begins. The blanket nodes are the ONLY
14
- points of contact between the agent's belief states and external reality.
15
-
16
- Implementation: The blanket manages the flow of Morton-coded observations
17
- in and action selections out. It also maintains the observation buffer
18
- that feeds into the free energy engine.
19
- """
20
-
21
- import numpy as np
22
- from typing import Optional, Dict, Any, List, Tuple
23
- from collections import deque
24
-
25
- from tensegrity.legacy.v1.morton import MortonEncoder
26
-
27
-
28
- class MarkovBlanket:
29
- """
30
- The agent's interface with the world.
31
-
32
- Sensory states receive Morton-coded observations.
33
- Active states emit discrete actions.
34
-
35
- The blanket enforces the Markov property: internal states
36
- are conditionally independent of external states given the blanket.
37
-
38
- ``n_sensory`` / ``n_active`` mirror constructor channel counts and are
39
- reserved for future multi-channel I/O; ``sense`` still ingests vectors
40
- shaped for ``encoder.n_dims``, and ``act`` consumes the full softmax over
41
- actions passed in.
42
- """
43
-
44
- def __init__(self,
45
- encoder: MortonEncoder,
46
- n_sensory_channels: int = 1,
47
- n_active_channels: int = 1,
48
- observation_buffer_size: int = 64):
49
- """
50
- Args:
51
- encoder: MortonEncoder for sensory preprocessing
52
- n_sensory_channels: Number of parallel sensory channels
53
- n_active_channels: Number of action dimensions
54
- observation_buffer_size: How many past observations to retain
55
- """
56
- self.encoder = encoder
57
- self.n_sensory = n_sensory_channels
58
- self.n_active = n_active_channels
59
-
60
- # Current blanket state
61
- self.sensory_state: Optional[np.ndarray] = None # Morton codes
62
- self.active_state: Optional[np.ndarray] = None # Action indices
63
-
64
- # Observation buffer — recent history for temporal inference
65
- self.observation_buffer: deque = deque(maxlen=observation_buffer_size)
66
-
67
- # Running stats for surprise — per-coordinate counts (variable-length obs).
68
- self._sense_timestep = 0
69
- self._obs_sum: Optional[np.ndarray] = None
70
- self._obs_sq_sum: Optional[np.ndarray] = None
71
- self._obs_elem_count: Optional[np.ndarray] = None
72
-
73
- # Blanket surprise (how unexpected was the last observation?)
74
- self.surprise: float = 0.0
75
-
76
- def sense(self, raw_observation: np.ndarray, *, allow_multi_point_1d: bool = False) -> np.ndarray:
77
- """
78
- Process a raw observation through the sensory boundary.
79
-
80
- 1. Morton-encode the raw data
81
- 2. Update the observation buffer
82
- 3. Compute surprise (deviation from running statistics)
83
-
84
- Args:
85
- raw_observation: Array shaped ``(n_points, encoder.n_dims)``, or ``(n_dims,)``
86
- for one point. One-dimensional vectors whose length is not ``n_dims``
87
- are rejected unless ``allow_multi_point_1d=True`` is set, which treats
88
- the vector as a column (``reshape(-1, 1)``) of scalar observations —
89
- callers should prefer supplying an explicit `(n_points, n_dims)` array.
90
-
91
- Returns:
92
- Morton-coded observation as integer array
93
- """
94
- # Ensure proper shape for Morton encoding
95
- if raw_observation.ndim == 1:
96
- if len(raw_observation) == self.encoder.n_dims:
97
- raw_observation = raw_observation.reshape(1, -1)
98
- elif allow_multi_point_1d:
99
- raw_observation = raw_observation.reshape(-1, 1)
100
- else:
101
- raise ValueError(
102
- f"One-dimensional sensory input length {len(raw_observation)} does not match "
103
- f"encoder.n_dims ({self.encoder.n_dims}). Pass shape "
104
- "(n_points, n_dims), a length-n_dims vector for one observation, "
105
- "or opt in with allow_multi_point_1d=True for reshape(-1, 1)."
106
- )
107
-
108
- # Morton encode
109
- morton_codes = self.encoder.encode_continuous(raw_observation)
110
- if isinstance(morton_codes, (int, np.integer)):
111
- morton_codes = np.array([morton_codes])
112
-
113
- self._sense_timestep += 1
114
-
115
- # Update running statistics for surprise computation
116
- self._update_statistics(raw_observation)
117
-
118
- # Compute surprise: -log P(observation) under running model
119
- self.surprise = self._compute_surprise(raw_observation)
120
-
121
- # Store in buffer
122
- self.sensory_state = morton_codes
123
- self.observation_buffer.append({
124
- 'morton': morton_codes.copy(),
125
- 'raw': raw_observation.copy(),
126
- 'surprise': self.surprise,
127
- 'timestamp': self._sense_timestep
128
- })
129
-
130
- return morton_codes
131
-
132
- def act(self, action_distribution: np.ndarray) -> int:
133
- """
134
- Select an action through the active boundary.
135
-
136
- The action is sampled from the distribution provided by the
137
- inference engine (policy = softmax over expected free energies).
138
-
139
- Args:
140
- action_distribution: Probability distribution over actions.
141
-
142
- Returns:
143
- Selected action index.
144
- """
145
- # Ensure valid distribution
146
- action_distribution = np.asarray(action_distribution, dtype=np.float64)
147
- action_distribution = np.maximum(action_distribution, 1e-16)
148
- action_distribution /= action_distribution.sum()
149
-
150
- # Sample action
151
- action = np.random.choice(len(action_distribution), p=action_distribution)
152
- self.active_state = np.array([action])
153
- return int(action)
154
-
155
- def _update_statistics(self, observation: np.ndarray):
156
- """Update running statistics for surprise computation."""
157
- flat = np.asarray(observation, dtype=np.float64).flatten()
158
-
159
- if self._obs_sum is None:
160
- self._obs_sum = np.zeros(len(flat), dtype=np.float64)
161
- self._obs_sq_sum = np.zeros(len(flat), dtype=np.float64)
162
- self._obs_elem_count = np.zeros(len(flat), dtype=np.float64)
163
-
164
- lf, ls = len(flat), len(self._obs_sum)
165
- if lf > ls:
166
- self._obs_sum = np.pad(self._obs_sum, (0, lf - ls), mode='constant')
167
- self._obs_sq_sum = np.pad(self._obs_sq_sum, (0, lf - ls), mode='constant')
168
- self._obs_elem_count = np.pad(self._obs_elem_count, (0, lf - ls), mode='constant')
169
-
170
- n = min(lf, len(self._obs_sum))
171
- self._obs_sum[:n] += flat[:n]
172
- self._obs_sq_sum[:n] += flat[:n] ** 2
173
- self._obs_elem_count[:n] += 1.0
174
-
175
- def _compute_surprise(self, observation: np.ndarray) -> float:
176
- """
177
- Compute Bayesian surprise: -log P(o) under running Gaussian model.
178
-
179
- This is a simple proxy — the full surprise comes from the
180
- free energy engine. But this gives a fast heuristic at the boundary.
181
- """
182
- flat = np.asarray(observation, dtype=np.float64).flatten()
183
- assert self._obs_sum is not None and self._obs_elem_count is not None
184
- n = min(len(flat), len(self._obs_sum))
185
- cnt = self._obs_elem_count[:n]
186
- if n < 1 or float(np.min(cnt)) < 2.0:
187
- return 0.0
188
-
189
- mean = self._obs_sum[:n] / np.maximum(cnt, 1e-12)
190
- var = self._obs_sq_sum[:n] / np.maximum(cnt, 1e-12) - mean ** 2
191
- var = np.maximum(var, 1e-8) # Prevent division by zero
192
-
193
- # Gaussian log-likelihood (negative = surprise)
194
- log_prob = -0.5 * np.sum(((flat[:n] - mean) ** 2) / var + np.log(2 * np.pi * var))
195
- return float(-log_prob) # Higher = more surprising
196
-
197
- def get_observation_history(self, n: Optional[int] = None) -> List[Dict[str, Any]]:
198
- """Get the last n observations from the buffer."""
199
- if n is None:
200
- return list(self.observation_buffer)
201
- return list(self.observation_buffer)[-n:]
202
-
203
- def get_surprise_trajectory(self) -> np.ndarray:
204
- """Get the surprise values over time."""
205
- return np.array([obs['surprise'] for obs in self.observation_buffer])
206
-
207
- @property
208
- def state(self) -> Dict[str, Any]:
209
- """Current blanket state summary."""
210
- return {
211
- 'sensory': self.sensory_state,
212
- 'active': self.active_state,
213
- 'surprise': self.surprise,
214
- 'sense_timestep': self._sense_timestep,
215
- 'buffer_size': len(self.observation_buffer)
216
- }
217
-
218
-