RFTSystems committed on
Commit
4794088
·
verified ·
1 Parent(s): 65fb737

Create minimal_self_full.py

Browse files
Files changed (1) hide show
  1. minimal_self_full.py +327 -0
minimal_self_full.py ADDED
@@ -0,0 +1,327 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import numpy as np
2
+ import random
3
+ from typing import List, Optional
4
+ from collections import Counter
5
+ import pandas as pd
6
+ import matplotlib.pyplot as plt
7
+
8
+ # --- Classes ---
9
+
10
class SocialEntity:
    """Randomly wandering entity whose position and last move are observable.

    Every call to :meth:`move` picks one of ``actions`` uniformly at random,
    stores a copy of it in ``last_action`` and applies it to ``pos``, clamping
    each coordinate to ``bounds``.

    Args:
        start_pos: Initial position; stored as a float copy so the caller's
            array is never modified.
        actions: Candidate displacement vectors to choose from.
        bounds: ``(low, high)`` limits applied to every coordinate.
        seed: Base seed; the entity's generator is seeded with ``seed + 2``.
    """

    def __init__(self, start_pos: np.ndarray, actions: List[np.ndarray], bounds: tuple = (0, 2), seed: int = 44):
        # Use a private generator instead of reseeding the module-level
        # ``random`` / ``np.random`` state: with shared global state, any
        # object constructed afterwards that seeds the same generator silently
        # overrides this entity's seed, and every draw made here shifts the
        # random stream seen by everyone else.
        self._rng = random.Random(seed + 2)
        self.pos = start_pos.astype(float)
        self.actions = actions
        self.bounds = bounds
        self.last_action = np.array([0, 0])

    def move(self):
        """Take one random step, remember it, and stay inside ``bounds``."""
        chosen_action = self._rng.choice(self.actions)
        self.last_action = chosen_action.copy()
        low, high = self.bounds[0], self.bounds[1]
        self.pos = np.clip(self.pos + chosen_action, low, high)
23
+
24
+
25
class MovingObstacle:
    """Obstacle that performs a bounded random walk.

    Every call to :meth:`move` picks one of ``actions`` uniformly at random
    and applies it to ``pos``, clamping each coordinate to ``bounds``.

    Args:
        start_pos: Initial position; stored as a float copy so the caller's
            array is never modified.
        actions: Candidate displacement vectors to choose from.
        bounds: ``(low, high)`` limits applied to every coordinate.
        seed: Base seed; the obstacle's generator is seeded with ``seed + 1``.
    """

    def __init__(self, start_pos: np.ndarray, actions: List[np.ndarray], bounds: tuple = (0, 2), seed: int = 42):
        # A private generator keeps the walk reproducible from this object's
        # own seed. Reseeding the module-level ``random`` / ``np.random``
        # state here would be overridden by whichever object seeds it next
        # and would disturb every other consumer of the shared stream.
        self._rng = random.Random(seed + 1)
        self.pos = start_pos.astype(float)
        self.actions = actions
        self.bounds = bounds

    def move(self):
        """Take one random step and stay inside ``bounds``."""
        step = self._rng.choice(self.actions)
        low, high = self.bounds[0], self.bounds[1]
        self.pos = np.clip(self.pos + step, low, high)
36
+
37
+
38
class MinimalSelf:
    """Tabular Q-learning agent on a 3x3 grid with a toy predictive self-model.

    On every :meth:`step` the agent picks an epsilon-greedy action from its
    Q-table, imagines the sensation that action should produce, moves, lets
    the optional social entity and obstacle move, and compares the imagined
    position with the observed one. The resulting predictive rate reinforces
    a scalar ``body_bit_strength`` (which also decays every step), and both
    quantities are combined into the reward used for the Q-update.

    Args:
        seed: Seed for the agent's private random generator.
        error_window: Number of most recent prediction errors that are
            averaged.
        uncertainty_factor: Upper bound of the uniform noise used when
            computing ``C_min``.
        initial_body_bit_strength: Starting value of ``body_bit_strength``.
        body_bit_decay_rate: Amount subtracted from ``body_bit_strength``
            every step.
        body_bit_reinforce_factor: Scale of the per-step reinforcement.
        learning_rate: Q-learning step size.
        discount_factor: Q-learning discount applied to the next state's best
            value.
        epsilon: Probability of choosing a uniformly random action.
        reward_type: Label stored on the instance.
            NOTE(review): :meth:`step` never reads it, so every value
            currently produces the same reward; confirm the intended
            per-type reward formulas before relying on it.
    """

    # Largest distance between two points of the [0, 2] x [0, 2] grid
    # (its diagonal); used to normalise the mean prediction error.
    _MAX_POSITION_ERROR = float(np.sqrt(8.0))

    def __init__(self, seed: int = 42, error_window: int = 5, uncertainty_factor: float = 0.2,
                 initial_body_bit_strength: float = 1.0, body_bit_decay_rate: float = 0.01,
                 body_bit_reinforce_factor: float = 0.1,
                 learning_rate: float = 0.1, discount_factor: float = 0.9, epsilon: float = 0.2,
                 reward_type: str = "original"):
        # Private generator: seeding the module-level ``random`` /
        # ``np.random`` state here would override the seed of every
        # previously constructed object that shares it, and draws made by
        # those objects would in turn perturb this agent's action stream.
        self._rng = random.Random(seed)

        # Embodied state
        self.pos = np.array([1, 1]).astype(float)
        self.body_bit_strength = initial_body_bit_strength
        self.body_bit_decay_rate = body_bit_decay_rate
        self.body_bit_reinforce_factor = body_bit_reinforce_factor

        # Exploration bookkeeping (initialised here; not updated by step()).
        self.visited_positions = set()
        self.previous_body_bit_strength = initial_body_bit_strength

        # Actions
        self.actions = [
            np.array([0, 1]),   # N
            np.array([1, 0]),   # E
            np.array([0, -1]),  # S
            np.array([-1, 0]),  # W
        ]
        self.action_map = {tuple(a.astype(int)): i for i, a in enumerate(self.actions)}
        self.reverse_action_map = dict(enumerate(self.actions))

        self.last_action = np.array([0, 0])

        # Error tracking
        self.errors_history: List[float] = []
        self.error_window = error_window
        self.uncertainty_factor = uncertainty_factor

        # Environment
        self.env_bounds = (0, 2)
        self.obstacle = None
        self.social_entity = None
        self.previous_social_entity_action = np.array([0, 0])

        # Q-learning: one value per (x, y, action).
        side = self.env_bounds[1] + 1
        self.q_table = np.zeros((side, side, len(self.actions)))
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.epsilon = epsilon
        self.prev_state = None
        self.prev_action_idx = None
        self.reward_type = reward_type

    def set_obstacle(self, obstacle: "MovingObstacle"):
        """Attach an obstacle; its position becomes part of the sensation."""
        self.obstacle = obstacle

    def set_social_entity(self, social_entity: "SocialEntity"):
        """Attach a social entity; its position and last action are sensed."""
        self.social_entity = social_entity

    def _sensation_at(self, position: np.ndarray) -> np.ndarray:
        """Build the sensation vector the agent would have at ``position``."""
        values = [position[0], position[1], self.body_bit_strength]
        if self.obstacle is not None:
            values.extend([self.obstacle.pos[0], self.obstacle.pos[1]])
        if self.social_entity is not None:
            values.extend([self.social_entity.pos[0], self.social_entity.pos[1],
                           self.social_entity.last_action[0], self.social_entity.last_action[1]])
        return np.array(values, dtype=float)

    def sensory_input(self) -> np.ndarray:
        """Clamp ``pos`` to the grid and return the current sensation vector.

        The vector is ``[x, y, body_bit_strength]``, followed by the
        obstacle's ``x, y`` when an obstacle is attached, followed by the
        social entity's ``x, y`` and last action when one is attached.
        """
        self.pos = np.clip(self.pos, self.env_bounds[0], self.env_bounds[1])
        return self._sensation_at(self.pos)

    def counterfactual_sensory(self, action: np.ndarray) -> np.ndarray:
        """Return the sensation expected after ``action`` without moving."""
        imagined_pos = np.clip(self.pos + action, self.env_bounds[0], self.env_bounds[1])
        return self._sensation_at(imagined_pos)

    def choose_action(self) -> np.ndarray:
        """Pick an epsilon-greedy action and remember the (state, action) pair.

        Returns:
            A copy of the chosen displacement vector.
        """
        state = tuple(self.pos.astype(int))
        if self._rng.random() < self.epsilon:
            action_idx = self._rng.randrange(len(self.actions))
        else:
            # Plain int so the stored index is an ordinary Python value.
            action_idx = int(np.argmax(self.q_table[state]))
        self.prev_state = state
        self.prev_action_idx = action_idx
        return self.reverse_action_map[action_idx].copy()

    def step(self) -> dict:
        """Advance the agent (and any attached entities) by one time step.

        Returns:
            A dict with keys ``sensation``, ``action``, ``error``,
            ``position``, ``predictive_rate``, ``C_min``,
            ``body_bit_strength`` and ``reward``.
        """
        action = self.choose_action()
        predicted = self.counterfactual_sensory(action)
        self.pos = np.clip(self.pos + action, self.env_bounds[0], self.env_bounds[1])

        if self.social_entity is not None:
            self.social_entity.move()
        if self.obstacle is not None:
            self.obstacle.move()

        actual = self.sensory_input()

        # Prediction error over the position components only.
        # NOTE(review): both sides hold the same clamped destination, so this
        # evaluates to 0.0 as the code stands.
        prediction_error = float(np.linalg.norm(predicted[:2] - actual[:2]))
        self.errors_history.append(prediction_error)
        if len(self.errors_history) > self.error_window:
            self.errors_history.pop(0)
        mean_abs_error = float(np.mean(self.errors_history)) if self.errors_history else 0.0

        max_total_error = self._MAX_POSITION_ERROR
        predictive_rate = 100.0 * (1.0 - mean_abs_error / max_total_error)
        predictive_rate = float(np.clip(predictive_rate, 0.0, 100.0))
        simulated_internal_uncertainty = self._rng.uniform(0.0, self.uncertainty_factor)
        c_min = float((max_total_error - mean_abs_error) * (1.0 - simulated_internal_uncertainty))

        self.last_action = action.copy()
        reinforcement = (predictive_rate / 100.0) * self.body_bit_reinforce_factor
        self.body_bit_strength = float(np.clip(
            self.body_bit_strength + (reinforcement - self.body_bit_decay_rate), 0.0, 2.0))

        # Q-learning update for the (state, action) pair chosen above.
        reward = (predictive_rate / 100.0) + (self.body_bit_strength / 2.0)
        if self.prev_state is not None and self.prev_action_idx is not None:
            new_state = tuple(self.pos.astype(int))
            old_q = self.q_table[self.prev_state][self.prev_action_idx]
            target = reward + self.discount_factor * np.max(self.q_table[new_state])
            self.q_table[self.prev_state][self.prev_action_idx] = (
                old_q + self.learning_rate * (target - old_q))

        return {
            "sensation": actual,
            "action": action.copy(),
            "error": prediction_error,
            "position": self.pos.copy(),
            "predictive_rate": predictive_rate,
            "C_min": c_min,
            "body_bit_strength": self.body_bit_strength,
            "reward": reward,
        }
177
+
178
+ # --- Helper Functions ---
179
+
180
def compute_phi(history: List[dict], window: int = 20, max_cells: int = 9) -> float:
    """Score recent behaviour as mean body-bit strength times position diversity.

    Only the last ``window`` entries of ``history`` are considered. Position
    diversity is the number of distinct integer grid cells visited divided by
    the largest number that could have been visited, i.e.
    ``min(len(recent), max_cells)``.

    Args:
        history: Step records; each must hold a ``"sensation"`` array whose
            first two entries are the position and whose third entry is the
            body-bit strength.
        window: How many of the most recent records to use (default 20).
        max_cells: Number of distinct cells in the environment (default 9,
            matching a 3x3 grid).

    Returns:
        The score clipped to ``[0.0, 2.0]``; ``0.0`` for an empty history.

    Raises:
        ValueError: If ``window`` or ``max_cells`` is smaller than 1.
    """
    if window < 1 or max_cells < 1:
        raise ValueError("window and max_cells must both be at least 1")
    if not history:
        return 0.0

    # Negative slicing already copes with histories shorter than the window.
    recent = history[-window:]
    visited_cells = {tuple(record["sensation"][:2].astype(int)) for record in recent}
    mean_strength = np.mean([record["sensation"][2] for record in recent])

    # ``recent`` is non-empty and max_cells >= 1, so the divisor is >= 1.
    attainable_cells = min(len(recent), max_cells)
    diversity = len(visited_cells) / attainable_cells
    return float(np.clip(mean_strength * diversity, 0.0, 2.0))
192
+
193
+
194
def run_simulation(agent_instance: MinimalSelf, num_steps: int,
                   obstacle_instance: Optional[MovingObstacle] = None,
                   social_entity_instance: Optional[SocialEntity] = None) -> List[dict]:
    """Drive an agent for ``num_steps`` steps and collect its step records.

    Args:
        agent_instance: Agent to run; its ``step()`` is called once per step.
        num_steps: Number of steps to simulate.
        obstacle_instance: Optional obstacle, attached to the agent before
            the first step.
        social_entity_instance: Optional social entity, attached to the agent
            before the first step.

    Returns:
        The dicts returned by ``step()``, each extended with its time index
        under the key ``"t"``.
    """
    if obstacle_instance:
        agent_instance.set_obstacle(obstacle_instance)
    if social_entity_instance:
        agent_instance.set_social_entity(social_entity_instance)

    def _tick(time_index: int) -> dict:
        # One simulation step, stamped with the step number.
        record = agent_instance.step()
        record["t"] = time_index
        return record

    return [_tick(time_index) for time_index in range(num_steps)]
209
+
210
+
211
def plot_time_series(df, title, metrics):
    """Draw one stacked subplot per metric against the ``'t'`` column.

    Args:
        df: Table with a ``'t'`` column and, ideally, one column per metric.
        title: Figure title.
        metrics: Column names to plot, one panel each. A metric that is not a
            column of ``df`` gets a placeholder panel saying so.

    Returns:
        The created matplotlib figure.
    """
    panel_count = len(metrics)
    fig, axes = plt.subplots(panel_count, 1, figsize=(12, 3 * panel_count), sharex=True)
    # With a single row ``subplots`` hands back one Axes rather than a
    # sequence, so wrap it to keep the code below uniform.
    panels = [axes] if panel_count == 1 else axes

    for panel, metric in zip(panels, metrics):
        if metric not in df.columns:
            panel.set_ylabel(metric + ' (N/A)')
            panel.text(0.5, 0.5, f'{metric} not available', ha='center', va='center',
                       transform=panel.transAxes)
        else:
            panel.plot(df['t'], df[metric], label=metric)
            panel.set_ylabel(metric)
            panel.legend()
        panel.grid(True)

    panels[-1].set_xlabel("Time Step")
    fig.suptitle(title, fontsize=16)
    # Leave room at the top for the suptitle.
    plt.tight_layout(rect=[0, 0.03, 1, 0.96])
    return fig
232
+
233
+ # --- Simulation Execution ---
234
+
235
if __name__ == "__main__":
    NUM_STEPS = 5000
    all_histories = {}

    # Re-usable actions
    entity_actions = [np.array([0, 1]), np.array([1, 0]), np.array([0, -1]), np.array([-1, 0])]

    def _make_learner(reward_type):
        # Every learning scenario shares these hyper-parameters.
        return MinimalSelf(seed=123, epsilon=0.2, learning_rate=0.1,
                           body_bit_reinforce_factor=0.1, body_bit_decay_rate=0.01,
                           reward_type=reward_type)

    def _make_obstacle():
        return MovingObstacle(start_pos=np.array([0, 0]), actions=entity_actions, seed=43)

    def _make_social_entity():
        return SocialEntity(start_pos=np.array([2, 2]), actions=entity_actions, seed=44)

    # 1. No Learning Baseline
    print(f"\nRunning 'No Learning' Baseline for {NUM_STEPS} steps...")
    no_learning_agent = MinimalSelf(seed=123, initial_body_bit_strength=1.0,
                                    body_bit_decay_rate=0.0, body_bit_reinforce_factor=0.0,
                                    epsilon=0.0, learning_rate=0.0, reward_type="original")
    all_histories['no_learning'] = run_simulation(no_learning_agent, NUM_STEPS)
    print("Baseline completed.")

    # 2.-7. Learning scenarios: (name, reward type, with obstacle, with social entity).
    # "simple" runs have no obstacle; "complex" runs add a moving obstacle.
    # NOTE(review): MinimalSelf.step() computes its reward without reading
    # reward_type, so the reward label does not change the reward formula.
    learning_scenarios = [
        ('q_original_simple', "original", False, False),
        ('q_original_complex', "original", True, False),
        ('explore_grow_simple', "explore_grow", False, False),
        ('explore_grow_complex', "explore_grow", True, False),
        ('q_social_simple', "social", False, True),
        ('q_social_complex', "social", True, True),
    ]
    for scenario_name, reward_type, use_obstacle, use_social in learning_scenarios:
        # Construction order (social entity, obstacle, agent) is kept as in
        # the hand-written scenarios.
        social_entity = _make_social_entity() if use_social else None
        obstacle = _make_obstacle() if use_obstacle else None
        learner = _make_learner(reward_type)
        all_histories[scenario_name] = run_simulation(
            learner, NUM_STEPS,
            obstacle_instance=obstacle,
            social_entity_instance=social_entity)

    # Convert histories to DataFrames
    all_dataframes = {f'df_{name}': pd.DataFrame(records)
                      for name, records in all_histories.items()}

    # Print average metrics
    print("\n--- Average Metrics Comparison ---")
    metrics_for_avg = ['predictive_rate', 'C_min', 'body_bit_strength', 'reward']
    for name, df in all_dataframes.items():
        print(f"\n{name}:")
        available = [metric for metric in metrics_for_avg if metric in df.columns]
        print(df[available].mean())

    # Final Phi values
    print("\n--- Final Phi Values ---")
    for name, records in all_histories.items():
        print(f"{name}: {compute_phi(records):.2f}")

    # Example plot for one run
    metrics_for_plot = ['predictive_rate', 'C_min', 'body_bit_strength', 'reward']
    plot_time_series(all_dataframes['df_q_original_simple'],
                     "Q-Learning Original Reward Simple Environment", metrics_for_plot)
    plt.show()
327
+