aedmark commited on
Commit
2143e07
·
verified ·
1 Parent(s): 7658e0d

Delete bone_physics.py

Browse files
Files changed (1) hide show
  1. bone_physics.py +0 -754
bone_physics.py DELETED
@@ -1,754 +0,0 @@
1
- import math
2
- import random
3
- import time
4
- from collections import Counter, deque
5
- from dataclasses import dataclass
6
- from typing import Dict, List, Any, Tuple, Optional, Deque
7
-
8
- from bone_config import BoneConfig
9
- from bone_core import LoreManifest
10
- from bone_lexicon import LexiconService
11
- from bone_types import (
12
- Prisma,
13
- PhysicsPacket,
14
- CycleContext,
15
- SpatialState,
16
- MaterialState,
17
- EnergyState,
18
- )
19
-
20
-
21
- @dataclass
22
- class PhysicsDelta:
23
- operator: str
24
- field: str
25
- value: float
26
- source: str
27
- message: Optional[str] = None
28
-
29
-
30
- TRIGRAM_MAP: Dict[str, Tuple[str, str, str, str]] = {
31
- "VEL": ("☳", "ZHEN", "Thunder", Prisma.GRN),
32
- "STR": ("☶", "GEN", "Mountain", Prisma.SLATE),
33
- "ENT": ("☵", "KAN", "Water", Prisma.BLU),
34
- "PHI": ("☲", "LI", "Fire", Prisma.RED),
35
- "PSI": ("☰", "QIAN", "Heaven", Prisma.WHT),
36
- "BET": ("☴", "XUN", "Wind", Prisma.CYN),
37
- "E": ("☷", "KUN", "Earth", Prisma.OCHRE),
38
- "DEL": ("☱", "DUI", "Lake", Prisma.MAG),
39
- }
40
-
41
- PHYS_CFG = {
42
- "V_MAX": getattr(BoneConfig.PHYSICS, "VOLTAGE_MAX", 20.0),
43
- "V_FLOOR": getattr(BoneConfig.PHYSICS, "VOLTAGE_FLOOR", 0.0),
44
- "V_CRIT": getattr(BoneConfig.PHYSICS, "VOLTAGE_CRITICAL", 15.0),
45
- "DRAG_FLOOR": getattr(BoneConfig.PHYSICS, "DRAG_FLOOR", 1.0),
46
- "DRAG_HALT": getattr(BoneConfig.PHYSICS, "DRAG_HALT", 10.0),
47
- "FLUX_THRESHOLD": 0.5,
48
- "DEADBAND": 0.05
49
- }
50
-
51
- @dataclass
52
- class GeodesicVector:
53
- tension: float
54
- compression: float
55
- coherence: float
56
- abstraction: float
57
- dimensions: Dict[str, float]
58
-
59
-
60
- class GeodesicConstants:
61
- DENSITY_SCALAR = 20.0
62
- SQUELCH_LIMIT_MULT = 3.0
63
- MIN_VOLUME_SCALAR = 0.5
64
- SUBURBAN_FRICTION_LOG_BASE = 5.0
65
- HEAVY_FRICTION_MULT = 2.5
66
- SOLVENT_LUBRICATION_FACTOR = 0.05
67
- SHEAR_RESISTANCE_SCALAR = 2.0
68
- KINETIC_LIFT_RATIO = 0.5
69
- PLAY_LIFT_MULT = 2.5
70
- COMPRESSION_SCALAR = 10.0
71
- ABSTRACTION_BASE = 0.2
72
- MAX_VISCOSITY_DENSITY = 2.0
73
- MAX_LIFT_DENSITY = 2.0
74
- SAFE_VOL_THRESHOLD = 3
75
-
76
-
77
- class GeodesicEngine:
78
- @staticmethod
79
- def collapse_wavefunction(
80
- clean_words: List[str], counts: Dict[str, int]
81
- ) -> GeodesicVector:
82
- volume = max(1, len(clean_words))
83
- masses = GeodesicEngine._weigh_mass(counts)
84
- forces = GeodesicEngine._calculate_forces(masses, counts, volume)
85
- dimensions = GeodesicEngine._calculate_dimensions(
86
- masses, forces, counts, volume
87
- )
88
- return GeodesicVector(
89
- tension=forces["tension"],
90
- compression=forces["compression"],
91
- coherence=forces["coherence"],
92
- abstraction=forces["abstraction"],
93
- dimensions=dimensions,
94
- )
95
-
96
- @staticmethod
97
- def _weigh_mass(counts: Dict[str, int]) -> Dict[str, float]:
98
- keys = [
99
- "heavy",
100
- "kinetic",
101
- "constructive",
102
- "abstract",
103
- "play",
104
- "social",
105
- "explosive",
106
- "void",
107
- "liminal",
108
- "meat",
109
- "harvest",
110
- "pareidolia",
111
- "crisis_term",
112
- ]
113
- return {k: float(counts.get(k, 0)) for k in keys}
114
-
115
- @staticmethod
116
- def _calculate_forces(
117
- masses: Dict[str, float], counts: Dict[str, int], volume: int
118
- ) -> Dict[str, float]:
119
- cfg = BoneConfig.PHYSICS
120
- GC = GeodesicConstants
121
- safe_volume = max(1, volume)
122
- w_heavy = getattr(cfg, "WEIGHT_HEAVY", 2.0)
123
- w_kinetic = getattr(cfg, "WEIGHT_KINETIC", 1.5)
124
- w_explosive = getattr(cfg, "WEIGHT_EXPLOSIVE", 3.0)
125
- w_constructive = getattr(cfg, "WEIGHT_CONSTRUCTIVE", 1.2)
126
- raw_tension_mass = (
127
- (masses["heavy"] * w_heavy)
128
- + (masses["kinetic"] * w_kinetic)
129
- + (masses["explosive"] * w_explosive)
130
- + (masses["constructive"] * w_constructive)
131
- )
132
- total_kinetic = masses["kinetic"] + masses["explosive"]
133
- kinetic_gain = getattr(BoneConfig, "KINETIC_GAIN", 1.0)
134
- base_tension = (
135
- (raw_tension_mass / safe_volume) * GC.DENSITY_SCALAR * kinetic_gain
136
- )
137
- squelch_limit = (
138
- getattr(BoneConfig, "SHAPLEY_MASS_THRESHOLD", 5.0) * GC.SQUELCH_LIMIT_MULT
139
- )
140
- mass_scalar = min(1.0, safe_volume / squelch_limit)
141
- if safe_volume < GC.SAFE_VOL_THRESHOLD:
142
- mass_scalar *= GC.MIN_VOLUME_SCALAR
143
- tension = round(min(100.0, base_tension * mass_scalar), 2)
144
- shear_rate = total_kinetic / safe_volume
145
- suburban_friction = (
146
- math.log1p(counts.get("suburban", 0)) * GC.SUBURBAN_FRICTION_LOG_BASE
147
- )
148
- raw_friction = suburban_friction + (masses["heavy"] * GC.HEAVY_FRICTION_MULT)
149
- lubrication = 1.0 + (counts.get("solvents", 0) * GC.SOLVENT_LUBRICATION_FACTOR)
150
- dynamic_viscosity = (raw_friction / lubrication) / (
151
- 1.0 + (shear_rate * GC.SHEAR_RESISTANCE_SCALAR)
152
- )
153
- kinetic_lift = (total_kinetic * GC.KINETIC_LIFT_RATIO) / (
154
- masses["heavy"] * 0.5 + 1.0
155
- )
156
- lift = (masses["play"] * GC.PLAY_LIFT_MULT) + kinetic_lift
157
- viscosity_density = dynamic_viscosity / safe_volume
158
- lift_density = lift / safe_volume
159
- raw_compression = (viscosity_density - lift_density) * GC.COMPRESSION_SCALAR
160
- raw_compression *= getattr(BoneConfig, "SIGNAL_DRAG_MULTIPLIER", 1.0)
161
- compression = round(
162
- max(-5.0, min(PHYS_CFG["DRAG_HALT"], raw_compression * mass_scalar)), 2
163
- )
164
- structural_mass = masses["heavy"] + masses["constructive"] + masses["harvest"]
165
- structural_mass -= masses["void"] * 0.5
166
- structural_mass = max(0.0, structural_mass) # [MEADOWS] Plug the void leak
167
- shapley_thresh = getattr(BoneConfig, "SHAPLEY_MASS_THRESHOLD", 5.0)
168
- total_abstract = (
169
- masses["abstract"] +
170
- masses["liminal"] +
171
- masses["pareidolia"] +
172
- masses["void"]
173
- )
174
- abstraction_val = (total_abstract / safe_volume) + GC.ABSTRACTION_BASE
175
- return {
176
- "tension": tension,
177
- "compression": compression,
178
- "coherence": round(min(1.0, structural_mass / max(1.0, shapley_thresh)), 3),
179
- "abstraction": round(min(1.0, abstraction_val), 2),
180
- }
181
-
182
- @staticmethod
183
- def _calculate_dimensions(masses, forces, counts, volume) -> Dict[str, float]:
184
- inv_vol = 1.0 / max(1, volume)
185
- base_mass = 0.1
186
- str_mass = masses["heavy"] * 2.0 + masses["constructive"] + masses["harvest"]
187
- ent_mass = (counts.get("antigen", 0) * 3.0) + masses["meat"] + masses["crisis_term"]
188
- psi_mass = forces["abstraction"]
189
- return {
190
- "VEL": max(
191
- 0.0,
192
- min(
193
- 1.0,
194
- (masses["kinetic"] * 2.0 - forces["compression"] + base_mass)
195
- * inv_vol,
196
- ),
197
- ),
198
- "STR": max(0.0, min(1.0, (str_mass + base_mass) * inv_vol)),
199
- "ENT": max(0.0, min(1.0, ent_mass * inv_vol)),
200
- "PHI": max(
201
- 0.0,
202
- min(1.0, (masses["heavy"] + masses["kinetic"] + base_mass) * inv_vol),
203
- ),
204
- "PSI": max(0.0, min(1.0, psi_mass)),
205
- "BET": max(0.0, min(1.0, (masses["social"] * 2.0) * inv_vol)),
206
- "DEL": max(0.0, min(1.0, (masses["play"] * 3.0) * inv_vol)),
207
- "E": max(0.0, min(1.0, (counts.get("solvents", 0)) * inv_vol)),
208
- }
209
-
210
-
211
- class TheGatekeeper:
212
- def __init__(self, lexicon_ref, memory_ref=None):
213
- self.lex = lexicon_ref
214
- self.mem = memory_ref
215
-
216
- def check_entry(
217
- self, ctx: CycleContext, current_atp: float = 20.0
218
- ) -> Tuple[bool, Optional[Dict]]:
219
- phys = ctx.physics
220
- starvation_threshold = getattr(BoneConfig.BIO, "ATP_STARVATION", 5.0)
221
- if current_atp < (starvation_threshold * 0.5):
222
- return False, self._pack_refusal(
223
- ctx,
224
- "DARK_SYSTEM",
225
- "Energy critical. The inputs dissolve into the void.",
226
- )
227
- if phys.counts.get("antigen", 0) > 2:
228
- return False, self._pack_refusal(
229
- ctx,
230
- "TOXICITY",
231
- f"{Prisma.RED}IMMUNE REACTION: Input rejected as pathogenic.{Prisma.RST}",
232
- )
233
- if self._audit_safety(ctx.clean_words):
234
- return False, self._pack_refusal(
235
- ctx,
236
- "CURSED_INPUT",
237
- f"{Prisma.RED}The Gatekeeper recoils. Cursed syntax detected.{Prisma.RST}",
238
- )
239
- text = ctx.input_text
240
- if "```" in text or "{{" in text or "}}" in text:
241
- return False, self._pack_refusal(
242
- ctx,
243
- "SYNTAX_ERR",
244
- f"{Prisma.RED}The mechanism jams. Syntax anomaly detected.{Prisma.RST}",
245
- )
246
- if len(text) > 10000:
247
- return False, self._pack_refusal(
248
- ctx,
249
- "OVERLOAD",
250
- f"{Prisma.OCHRE}Input too long. Compress your thought.{Prisma.RST}",
251
- )
252
- return True, None
253
-
254
- def _audit_safety(self, words: List[str]) -> bool:
255
- cursed = self.lex.get("cursed")
256
- return (
257
- not cursed.isdisjoint(words)
258
- if isinstance(cursed, set)
259
- else any(w in cursed for w in words)
260
- )
261
-
262
- @staticmethod
263
- def _pack_refusal(ctx, type_str, ui_msg):
264
- return {"type": type_str, "ui": ui_msg, "logs": ctx.logs + [ui_msg]}
265
-
266
-
267
- class QuantumObserver:
268
- def __init__(self, events):
269
- self.events = events
270
- self.voltage_history: Deque[float] = deque(maxlen=5)
271
- self.last_physics_packet: Optional[PhysicsPacket] = None
272
-
273
- def gaze(self, text: str, graph: Dict = None) -> Dict:
274
- clean_words = LexiconService.clean(text)
275
- counts = self._tally_categories(clean_words)
276
- geo = GeodesicEngine.collapse_wavefunction(clean_words, counts)
277
- self.voltage_history.append(geo.tension)
278
- smoothed_voltage = round(
279
- sum(self.voltage_history) / len(self.voltage_history), 2
280
- )
281
- e_metric, beta_val = self._calculate_metrics(text, counts)
282
- valence = LexiconService.get_valence(clean_words)
283
- graph_mass = self._calculate_graph_mass(clean_words, graph)
284
- energy = EnergyState(
285
- voltage=smoothed_voltage,
286
- entropy=e_metric,
287
- beta_index=beta_val,
288
- mass=round(graph_mass, 1),
289
- psi=geo.abstraction,
290
- kappa=geo.coherence,
291
- valence=valence,
292
- velocity=0.0,
293
- turbulence=0.0,
294
- )
295
- matter = MaterialState(
296
- clean_words=clean_words,
297
- raw_text=text,
298
- counts=counts,
299
- antigens=counts.get("antigen", 0),
300
- vector=geo.dimensions,
301
- truth_ratio=0.5,
302
- )
303
- space = SpatialState(
304
- narrative_drag=geo.compression,
305
- zone=self._determine_zone(geo.dimensions),
306
- atmosphere="NEUTRAL",
307
- flow_state=self._determine_flow(smoothed_voltage, geo.coherence),
308
- )
309
- self.last_physics_packet = PhysicsPacket(
310
- energy=energy, matter=matter, space=space
311
- )
312
- packet_dict = self.last_physics_packet.to_dict()
313
- if hasattr(self.events, "publish"):
314
- self.events.publish("PHYSICS_CALCULATED", packet_dict)
315
- return {"physics": self.last_physics_packet, "clean_words": clean_words}
316
-
317
- @staticmethod
318
- def _tally_categories(clean_words: List[str]) -> Counter:
319
- counts = Counter()
320
- solvents = LexiconService.get("solvents") or set()
321
- for w in clean_words:
322
- if w in solvents:
323
- counts["solvents"] += 1
324
- continue
325
- cats = LexiconService.get_categories_for_word(w)
326
- if cats:
327
- counts.update(cats)
328
- else:
329
- flavor, conf = LexiconService.taste(w)
330
- if flavor and conf > 0.5:
331
- counts[flavor] += 1
332
- return counts
333
-
334
- @staticmethod
335
- def _calculate_graph_mass(words: List[str], graph: Optional[Dict]) -> float:
336
- if not graph:
337
- return 0.0
338
- total_mass = 0.0
339
- existing_nodes = [w for w in words if w in graph]
340
- for w in existing_nodes:
341
- edges = graph[w].get("edges", {})
342
- edge_weight_sum = sum(edges.values()) if edges else 0.0
343
- node_mass = min(50.0, edge_weight_sum)
344
- total_mass += node_mass
345
- return total_mass
346
-
347
- @staticmethod
348
- def _calculate_metrics(
349
- text: str, counts: Dict[str, int]
350
- ) -> Tuple[float, float]:
351
- length = len(text)
352
- if length == 0:
353
- return 0.0, 0.0
354
- scalar = getattr(BoneConfig.PHYSICS, "TEXT_LENGTH_SCALAR", 1500.0)
355
- raw_chaos = length / scalar
356
- solvents = counts.get("solvents", 0)
357
- solvent_density = solvents / max(1.0, length / 5.0)
358
- glue_factor = min(1.0, solvent_density * 2.0)
359
- e_metric = min(1.0, raw_chaos * (1.0 - (glue_factor * 0.8)))
360
- structure_chars = sum(1 for char in text if char in "!?%@#$;,")
361
- heavy_words = (
362
- counts.get("heavy", 0)
363
- + counts.get("constructive", 0)
364
- + counts.get("sacred", 0)
365
- )
366
- structure_score = structure_chars + (heavy_words * 2)
367
- beta_index = min(
368
- 1.0, math.log1p(structure_score + 1) / math.log1p(length * 0.1 + 1)
369
- )
370
- if length < 50:
371
- beta_index *= length / 50.0
372
- return round(e_metric, 3), round(beta_index, 3)
373
-
374
- @staticmethod
375
- def _determine_flow(v: float, k: float) -> str:
376
- volt_flow = getattr(BoneConfig.PHYSICS, "VOLTAGE_HIGH", 12.0)
377
- kappa_strong = 0.8
378
- if v > volt_flow and k > kappa_strong:
379
- return "SUPERCONDUCTIVE"
380
- if v > 10.0:
381
- return "TURBULENT"
382
- return "LAMINAR"
383
-
384
- @staticmethod
385
- def _determine_zone(vector: Dict[str, float]) -> str:
386
- if not vector:
387
- return "COURTYARD"
388
- dom = max(vector, key=vector.get)
389
- if dom in ["PSI", "DEL"]:
390
- return "AERIE"
391
- if dom in ["STR", "PHI"]:
392
- return "THE_FORGE"
393
- if dom in ["ENT", "VEL"]:
394
- return "THE_MUD"
395
- return "COURTYARD"
396
-
397
-
398
- class SurfaceTension:
399
- @staticmethod
400
- def audit_hubris(physics: Dict[str, Any]) -> Tuple[bool, str, str]:
401
- voltage = physics.get("voltage", 0.0)
402
- coherence = physics.get("kappa", 0.5)
403
- volt_crit = getattr(BoneConfig.PHYSICS, "VOLTAGE_CRITICAL", 15.0)
404
- volt_flow = getattr(BoneConfig.PHYSICS, "VOLTAGE_HIGH", 12.0)
405
- if voltage >= volt_crit and coherence < 0.4:
406
- return (
407
- True,
408
- f"⚠️ HUBRIS DETECTED: Voltage ({voltage:.1f}v) exceeds structural integrity. Wings melting.",
409
- "ICARUS_CRASH",
410
- )
411
- if voltage > volt_flow and coherence > 0.8:
412
- return (
413
- True,
414
- "🌊 SURFACE TENSION OPTIMAL: Entering Flow State.",
415
- "FLOW_BOOST",
416
- )
417
- return False, "", ""
418
-
419
-
420
- class ChromaScope:
421
- @staticmethod
422
- def modulate(text: str, vector: Dict[str, float]) -> str:
423
- if not vector or not any(vector.values()):
424
- return f"{Prisma.GRY}{text}{Prisma.RST}"
425
- primary_dim = max(vector, key=vector.get)
426
- if primary_dim in TRIGRAM_MAP:
427
- selected_color = TRIGRAM_MAP[primary_dim][3]
428
- else:
429
- selected_color = Prisma.GRY
430
- return f"{selected_color}{text}{Prisma.RST}"
431
-
432
-
433
- class ZoneInertia:
434
- def __init__(self, inertia=0.7):
435
- self.inertia = inertia
436
- self.min_dwell = getattr(BoneConfig.PHYSICS, "ZONE_MIN_DWELL", 2)
437
- self.current_zone = "COURTYARD"
438
- self.dwell_counter = 0
439
- self.last_vector: Optional[Tuple[float, float, float]] = None
440
- self.is_anchored = False
441
- self.strain_gauge = 0.0
442
-
443
- def toggle_anchor(self) -> bool:
444
- self.is_anchored = not self.is_anchored
445
- self.strain_gauge = 0.0
446
- return self.is_anchored
447
-
448
- def stabilize(
449
- self,
450
- proposed_zone: str,
451
- physics: Dict[str, Any],
452
- cosmic_state: Tuple[str, float, str],
453
- ) -> Tuple[str, Optional[str]]:
454
- beta = physics.get("beta_index", 1.0)
455
- truth = physics.get("truth_ratio", 0.5)
456
- grav_pull = 1.0 if cosmic_state[0] != "VOID_DRIFT" else 0.0
457
- current_vec = (beta, truth, grav_pull)
458
- self.dwell_counter += 1
459
- pressure = 0.0
460
- if self.last_vector:
461
- dist = math.dist(current_vec, self.last_vector)
462
- similarity = max(0.0, 1.0 - (dist / 2.0))
463
- pressure = 1.0 - similarity
464
- if self.is_anchored:
465
- return self._handle_anchored_state(proposed_zone, pressure)
466
- if proposed_zone == self.current_zone:
467
- self.dwell_counter = 0
468
- self.last_vector = current_vec
469
- return proposed_zone, None
470
- if self.dwell_counter < self.min_dwell:
471
- return self.current_zone, None
472
- return self._attempt_migration(proposed_zone, pressure)
473
-
474
- def _handle_anchored_state(
475
- self, proposed_zone: str, pressure: float
476
- ) -> Tuple[str, Optional[str]]:
477
- if proposed_zone == self.current_zone:
478
- self.strain_gauge = max(0.0, self.strain_gauge - 0.1)
479
- return self.current_zone, None
480
- self.strain_gauge += pressure
481
- limit = 2.5
482
- if self.strain_gauge > limit:
483
- self.is_anchored = False
484
- self.strain_gauge = 0.0
485
- self.current_zone = proposed_zone
486
- return (
487
- proposed_zone,
488
- f"{Prisma.RED}⚡ SNAP! The narrative current was too strong. Anchor failed.{Prisma.RST}",
489
- )
490
- return (
491
- self.current_zone,
492
- f"{Prisma.OCHRE}⚓ ANCHORED: Resisting drift to '{proposed_zone}' (Strain {self.strain_gauge:.1f}/{limit}){Prisma.RST}",
493
- )
494
-
495
- def _attempt_migration(
496
- self, proposed_zone: str, pressure: float
497
- ) -> Tuple[str, Optional[str]]:
498
- prob = (1.0 - self.inertia) + pressure
499
- if proposed_zone in ["AERIE", "THE_FORGE"]:
500
- prob += 0.2
501
- if random.random() < prob:
502
- old, self.current_zone = self.current_zone, proposed_zone
503
- self.dwell_counter = 0
504
- return (
505
- self.current_zone,
506
- f"{Prisma.CYN}>>> MIGRATION: {old} -> {proposed_zone}.{Prisma.RST}",
507
- )
508
- return self.current_zone, None
509
-
510
- @staticmethod
511
- def override_cosmic_drag(cosmic_drag_penalty: float, current_zone: str) -> float:
512
- if current_zone == "AERIE" and cosmic_drag_penalty > 0:
513
- return cosmic_drag_penalty * 0.3
514
- return cosmic_drag_penalty
515
-
516
-
517
- class CosmicDynamics:
518
- def __init__(self):
519
- self.voltage_history: Deque[float] = deque(maxlen=20)
520
- self.cached_wells: Dict = {}
521
- self.cached_hubs: Dict = {}
522
- self.last_scan_tick: int = 0
523
- self.SCAN_INTERVAL: int = 10
524
- self.logs = self._load_logs()
525
-
526
- @staticmethod
527
- def _load_logs():
528
- base = {
529
- "GRAVITY": "⚓ GRAVITY: The narrative is heavy. (Drag {drag:.1f})",
530
- "VOID": "VOID: Drifting outside the filaments.",
531
- "NEBULA": "NEBULA: Floating near '{node}' (Mass {mass}). Not enough mass for orbit.",
532
- "LAGRANGE": "LAGRANGE: Caught between '{p}' and '{s}'",
533
- "FLOW": "FLOW: Streaming towards '{node}'",
534
- "ORBIT": "ORBIT: Circling '{node}' (Mass {mass})"
535
- }
536
- manifest = LoreManifest.get_instance().get("narrative_data") or {}
537
- return manifest.get("COSMIC_LOGS", base)
538
-
539
- def commit(self, voltage: float):
540
- self.voltage_history.append(voltage)
541
-
542
- def check_gravity(
543
- self, current_drift: float, psi: float
544
- ) -> Tuple[float, List[str]]:
545
- logs = []
546
- new_drag = current_drift
547
- drag_floor = getattr(BoneConfig.PHYSICS, "DRAG_FLOOR", 1.0)
548
- if new_drag < drag_floor:
549
- new_drag += 0.05
550
- if psi > 0.5:
551
- reduction = (psi - 0.5) * 0.2
552
- new_drag = max(0.0, new_drag - reduction)
553
- CRITICAL_DRIFT = getattr(BoneConfig.PHYSICS, "DRAG_CRITICAL", 8.0)
554
- if new_drag > CRITICAL_DRIFT:
555
- if random.random() < 0.3:
556
- msg = self.logs.get("GRAVITY", "⚓ GRAVITY").format(drag=new_drag)
557
- logs.append(f"{Prisma.GRY}{msg}{Prisma.RST}")
558
- return new_drag, logs
559
-
560
- def analyze_orbit(
561
- self, network: Any, clean_words: List[str]
562
- ) -> Tuple[str, float, str]:
563
- if (
564
- not clean_words
565
- or not network
566
- or not hasattr(network, "graph")
567
- or not network.graph
568
- ):
569
- return "VOID_DRIFT", 3.0, "VOID: Deep Space. No connection."
570
- current_time = int(time.time())
571
- if (
572
- not self.cached_wells
573
- or (current_time - self.last_scan_tick) > self.SCAN_INTERVAL
574
- ):
575
- gravity_wells, geodesic_hubs = self._scan_network_mass(network)
576
- self.cached_wells = gravity_wells
577
- self.cached_hubs = geodesic_hubs
578
- self.last_scan_tick = current_time
579
- else:
580
- gravity_wells = self.cached_wells
581
- geodesic_hubs = self.cached_hubs
582
- basin_pulls, active_filaments = self._calculate_pull(
583
- clean_words, network, gravity_wells
584
- )
585
- if sum(basin_pulls.values()) == 0:
586
- return self._handle_void_state(clean_words, geodesic_hubs)
587
- return self._resolve_orbit(
588
- basin_pulls, active_filaments, len(clean_words), gravity_wells
589
- )
590
-
591
- @staticmethod
592
- def _scan_network_mass(network) -> Tuple[Dict, Dict]:
593
- gravity_wells = {}
594
- geodesic_hubs = {}
595
- well_threshold = getattr(BoneConfig, "GRAVITY_WELL_THRESHOLD", 15.0)
596
- geo_strength = getattr(BoneConfig, "GEODESIC_STRENGTH", 10.0)
597
- for node in network.graph:
598
- mass = network.calculate_mass(node)
599
- if mass >= well_threshold:
600
- gravity_wells[node] = mass
601
- elif mass >= geo_strength:
602
- geodesic_hubs[node] = mass
603
- return gravity_wells, geodesic_hubs
604
-
605
- @staticmethod
606
- def _calculate_pull(words, network, gravity_wells) -> Tuple[Dict, int]:
607
- basin_pulls = {k: 0.0 for k in gravity_wells}
608
- active_filaments = 0
609
- word_counts = Counter(words)
610
- for w, count in word_counts.items():
611
- if w in gravity_wells:
612
- basin_pulls[w] += (gravity_wells[w] * 2.0) * count
613
- active_filaments += count
614
- for well, well_mass in gravity_wells.items():
615
- edges = network.graph.get(well, {}).get("edges", {})
616
- if not edges:
617
- continue
618
- intersection = set(word_counts.keys()).intersection(edges.keys())
619
- for match in intersection:
620
- basin_pulls[well] += (well_mass * 0.5) * word_counts[match]
621
- active_filaments += word_counts[match]
622
- return basin_pulls, active_filaments
623
-
624
- def _handle_void_state(self, words, geodesic_hubs) -> Tuple[str, float, str]:
625
- for w in words:
626
- hub_mass = geodesic_hubs.get(w)
627
- if hub_mass is not None:
628
- msg = self.logs.get("NEBULA", "NEBULA").format(
629
- node=w.upper(),
630
- mass=int(hub_mass)
631
- )
632
- return "PROTO_COSMOS", 1.0, msg
633
- return "VOID_DRIFT", 3.0, self.logs.get("VOID", "VOID")
634
-
635
- def _resolve_orbit(
636
- self, basin_pulls, active_filaments, word_count, gravity_wells
637
- ) -> Tuple[str, float, str]:
638
- sorted_basins = sorted(basin_pulls.items(), key=lambda x: x[1], reverse=True)
639
- primary_node, primary_str = sorted_basins[0]
640
- lagrange_tol = getattr(BoneConfig, "LAGRANGE_TOLERANCE", 2.0)
641
- if len(sorted_basins) > 1:
642
- secondary_node, secondary_str = sorted_basins[1]
643
- if secondary_str > 0 and (primary_str - secondary_str) < lagrange_tol:
644
- msg = self.logs.get("LAGRANGE", "LAGRANGE").format(p=primary_node.upper(), s=secondary_node.upper())
645
- return "LAGRANGE_POINT", 0.0, msg
646
- flow_ratio = active_filaments / max(1, word_count)
647
- well_threshold = getattr(BoneConfig, "GRAVITY_WELL_THRESHOLD", 15.0)
648
- if flow_ratio > 0.5 and primary_str < (well_threshold * 2):
649
- msg = self.logs.get("FLOW", "FLOW").format(node=primary_node.upper())
650
- return "WATERSHED_FLOW", 0.0, msg
651
- msg = self.logs.get("ORBIT", "ORBIT").format(node=primary_node.upper(), mass=int(gravity_wells[primary_node]))
652
- return "ORBITAL", 0.0, msg
653
-
654
-
655
- def apply_somatic_feedback(physics_packet: PhysicsPacket, qualia: Any) -> PhysicsPacket:
656
- feedback = physics_packet.snapshot()
657
- tone_effects = {
658
- "Urgent": {"velocity": 0.3, "narrative_drag": -0.5, "voltage": 0.5},
659
- "Strained": {"narrative_drag": 1.2, "voltage": -0.3, "kappa": -0.1},
660
- "Vibrating": {"entropy": 0.2, "voltage": 0.2, "psi": 0.1},
661
- "Resonant": {"valence": 0.3, "beta_index": 0.1, "kappa": 0.2},
662
- "Steady": {},
663
- }
664
- effects = tone_effects.get(qualia.tone, {})
665
- for key, delta in effects.items():
666
- if hasattr(feedback, key):
667
- current = getattr(feedback, key)
668
- setattr(feedback, key, current + delta)
669
- if "Gut Tightening" in qualia.somatic_sensation:
670
- feedback.narrative_drag += 0.7
671
- if "Electric Vibration" in qualia.somatic_sensation:
672
- feedback.voltage += 0.8
673
- if "Golden Glow" in qualia.somatic_sensation:
674
- feedback.valence += 0.5
675
- feedback.psi += 0.2
676
- volt_crit = getattr(BoneConfig.PHYSICS, "VOLTAGE_CRITICAL", 15.0)
677
- drag_floor = getattr(BoneConfig.PHYSICS, "DRAG_FLOOR", 1.0)
678
- drag_halt = getattr(BoneConfig.PHYSICS, "DRAG_HALT", 10.0)
679
- feedback.voltage = max(0.0, min(feedback.voltage, volt_crit * 1.5))
680
- feedback.narrative_drag = max(drag_floor, min(feedback.narrative_drag, drag_halt))
681
- return feedback
682
-
683
-
684
- class CycleStabilizer:
685
- def __init__(self, events_ref, governor_ref):
686
- self.events = events_ref
687
- self.governor = governor_ref
688
- self.last_tick_time = time.time()
689
- self.pending_drag = 0.0
690
- self.manifolds = getattr(BoneConfig.PHYSICS, "MANIFOLDS", {})
691
- self.HARD_FUSE_VOLTAGE = 100.0
692
- if hasattr(self.events, "subscribe"):
693
- self.events.subscribe(
694
- "DOMESTICATION_PENALTY", self._on_domestication_penalty
695
- )
696
-
697
- def _on_domestication_penalty(self, payload):
698
- amount = payload.get("drag_penalty", 0.0)
699
- self.pending_drag += amount
700
-
701
- def stabilize(self, ctx: CycleContext, current_phase: str):
702
- p = ctx.physics
703
- if p.voltage >= self.HARD_FUSE_VOLTAGE:
704
- ctx.log(
705
- f"{Prisma.RED}⚡ FUSE BLOWN: Voltage > {self.HARD_FUSE_VOLTAGE}V.{Prisma.RST}"
706
- )
707
- p.voltage, p.narrative_drag = 10.0, 5.0
708
- p.flow_state = "SAFE_MODE"
709
- ctx.record_flux(
710
- current_phase, "voltage", self.HARD_FUSE_VOLTAGE, 10.0, "FUSE_BLOWN"
711
- )
712
- return True
713
- if self.pending_drag > 0:
714
- ctx.physics.narrative_drag += self.pending_drag
715
- ctx.log(
716
- f"{Prisma.GRY}⚖️ DOMESTICATION: Drag +{self.pending_drag:.1f}{Prisma.RST}"
717
- )
718
- self.pending_drag = 0.0
719
- now = time.time()
720
- dt = max(0.001, min(1.0, now - self.last_tick_time))
721
- self.last_tick_time = now
722
- manifold = getattr(p, "manifold", "DEFAULT")
723
- cfg = self.manifolds.get(manifold, self.manifolds["DEFAULT"])
724
- target_v = cfg["voltage"]
725
- if getattr(p, "flow_state", "LAMINAR") in ["SUPERCONDUCTIVE", "FLOW_BOOST"]:
726
- target_v = p.voltage
727
- cfg["drag"] = max(0.1, cfg["drag"] * 0.5)
728
- self.governor.recalibrate(target_v, cfg["drag"])
729
- v_force, d_force = self.governor.regulate(p, dt=dt)
730
- c1 = self._apply_force(
731
- ctx,
732
- current_phase,
733
- p,
734
- "voltage",
735
- v_force,
736
- (PHYS_CFG["V_FLOOR"], PHYS_CFG["V_MAX"]),
737
- )
738
- c2 = self._apply_force(ctx, current_phase, p, "narrative_drag", d_force)
739
- return c1 or c2
740
-
741
- @staticmethod
742
- def _apply_force(ctx, phase, p, field, force, limits=None):
743
- if abs(force) <= PHYS_CFG["DEADBAND"]:
744
- return False
745
- old_val = getattr(p, field)
746
- new_val = old_val + force
747
- if limits:
748
- new_val = max(limits[0], min(limits[1], new_val))
749
- else:
750
- new_val = max(0.0, new_val)
751
- setattr(p, field, new_val)
752
- if abs(force) > PHYS_CFG["FLUX_THRESHOLD"]:
753
- ctx.record_flux(phase, field, old_val, new_val, "PID_CORRECTION")
754
- return True