vsl-cryosomatic-hypervisor / bone_physics.py
aedmark's picture
Upload 50 files
f7fce63 verified
raw
history blame
29.1 kB
import math
import random
import time
from collections import Counter, deque
from dataclasses import dataclass
from typing import Dict, List, Any, Tuple, Optional, Deque
from bone_config import BoneConfig
from bone_core import LoreManifest
from bone_lexicon import LexiconService
from bone_types import (
Prisma,
PhysicsPacket,
CycleContext,
SpatialState,
MaterialState,
EnergyState,
)
@dataclass
class PhysicsDelta:
operator: str
field: str
value: float
source: str
message: Optional[str] = None
TRIGRAM_MAP: Dict[str, Tuple[str, str, str, str]] = {
"VEL": ("☳", "ZHEN", "Thunder", Prisma.GRN),
"STR": ("☶", "GEN", "Mountain", Prisma.SLATE),
"ENT": ("☵", "KAN", "Water", Prisma.BLU),
"PHI": ("☲", "LI", "Fire", Prisma.RED),
"PSI": ("☰", "QIAN", "Heaven", Prisma.WHT),
"BET": ("☴", "XUN", "Wind", Prisma.CYN),
"E": ("☷", "KUN", "Earth", Prisma.OCHRE),
"DEL": ("☱", "DUI", "Lake", Prisma.MAG),
}
PHYS_CFG = {
"V_MAX": getattr(BoneConfig.PHYSICS, "VOLTAGE_MAX", 20.0),
"V_FLOOR": getattr(BoneConfig.PHYSICS, "VOLTAGE_FLOOR", 0.0),
"V_CRIT": getattr(BoneConfig.PHYSICS, "VOLTAGE_CRITICAL", 15.0),
"DRAG_FLOOR": getattr(BoneConfig.PHYSICS, "DRAG_FLOOR", 1.0),
"DRAG_HALT": getattr(BoneConfig.PHYSICS, "DRAG_HALT", 10.0),
"FLUX_THRESHOLD": 0.5,
"DEADBAND": 0.05
}
@dataclass
class GeodesicVector:
tension: float
compression: float
coherence: float
abstraction: float
dimensions: Dict[str, float]
class GeodesicConstants:
DENSITY_SCALAR = 20.0
SQUELCH_LIMIT_MULT = 3.0
MIN_VOLUME_SCALAR = 0.5
SUBURBAN_FRICTION_LOG_BASE = 5.0
HEAVY_FRICTION_MULT = 2.5
SOLVENT_LUBRICATION_FACTOR = 0.05
SHEAR_RESISTANCE_SCALAR = 2.0
KINETIC_LIFT_RATIO = 0.5
PLAY_LIFT_MULT = 2.5
COMPRESSION_SCALAR = 10.0
ABSTRACTION_BASE = 0.2
MAX_VISCOSITY_DENSITY = 2.0
MAX_LIFT_DENSITY = 2.0
SAFE_VOL_THRESHOLD = 3
class GeodesicEngine:
@staticmethod
def collapse_wavefunction(
clean_words: List[str], counts: Dict[str, int]
) -> GeodesicVector:
volume = max(1, len(clean_words))
masses = GeodesicEngine._weigh_mass(counts)
forces = GeodesicEngine._calculate_forces(masses, counts, volume)
dimensions = GeodesicEngine._calculate_dimensions(
masses, forces, counts, volume
)
return GeodesicVector(
tension=forces["tension"],
compression=forces["compression"],
coherence=forces["coherence"],
abstraction=forces["abstraction"],
dimensions=dimensions,
)
@staticmethod
def _weigh_mass(counts: Dict[str, int]) -> Dict[str, float]:
keys = [
"heavy",
"kinetic",
"constructive",
"abstract",
"play",
"social",
"explosive",
"void",
"liminal",
"meat",
"harvest",
"pareidolia",
"crisis_term",
]
return {k: float(counts.get(k, 0)) for k in keys}
@staticmethod
def _calculate_forces(
masses: Dict[str, float], counts: Dict[str, int], volume: int
) -> Dict[str, float]:
cfg = BoneConfig.PHYSICS
GC = GeodesicConstants
safe_volume = max(1, volume)
w_heavy = getattr(cfg, "WEIGHT_HEAVY", 2.0)
w_kinetic = getattr(cfg, "WEIGHT_KINETIC", 1.5)
w_explosive = getattr(cfg, "WEIGHT_EXPLOSIVE", 3.0)
w_constructive = getattr(cfg, "WEIGHT_CONSTRUCTIVE", 1.2)
raw_tension_mass = (
(masses["heavy"] * w_heavy)
+ (masses["kinetic"] * w_kinetic)
+ (masses["explosive"] * w_explosive)
+ (masses["constructive"] * w_constructive)
)
total_kinetic = masses["kinetic"] + masses["explosive"]
kinetic_gain = getattr(BoneConfig, "KINETIC_GAIN", 1.0)
base_tension = (
(raw_tension_mass / safe_volume) * GC.DENSITY_SCALAR * kinetic_gain
)
squelch_limit = (
getattr(BoneConfig, "SHAPLEY_MASS_THRESHOLD", 5.0) * GC.SQUELCH_LIMIT_MULT
)
mass_scalar = min(1.0, safe_volume / squelch_limit)
if safe_volume < GC.SAFE_VOL_THRESHOLD:
mass_scalar *= GC.MIN_VOLUME_SCALAR
tension = round(min(100.0, base_tension * mass_scalar), 2)
shear_rate = total_kinetic / safe_volume
suburban_friction = (
math.log1p(counts.get("suburban", 0)) * GC.SUBURBAN_FRICTION_LOG_BASE
)
raw_friction = suburban_friction + (masses["heavy"] * GC.HEAVY_FRICTION_MULT)
lubrication = 1.0 + (counts.get("solvents", 0) * GC.SOLVENT_LUBRICATION_FACTOR)
dynamic_viscosity = (raw_friction / lubrication) / (
1.0 + (shear_rate * GC.SHEAR_RESISTANCE_SCALAR)
)
kinetic_lift = (total_kinetic * GC.KINETIC_LIFT_RATIO) / (
masses["heavy"] * 0.5 + 1.0
)
lift = (masses["play"] * GC.PLAY_LIFT_MULT) + kinetic_lift
viscosity_density = dynamic_viscosity / safe_volume
lift_density = lift / safe_volume
raw_compression = (viscosity_density - lift_density) * GC.COMPRESSION_SCALAR
raw_compression *= getattr(BoneConfig, "SIGNAL_DRAG_MULTIPLIER", 1.0)
compression = round(
max(-5.0, min(PHYS_CFG["DRAG_HALT"], raw_compression * mass_scalar)), 2
)
structural_mass = masses["heavy"] + masses["constructive"] + masses["harvest"]
structural_mass -= masses["void"] * 0.5
structural_mass = max(0.0, structural_mass) # [MEADOWS] Plug the void leak
shapley_thresh = getattr(BoneConfig, "SHAPLEY_MASS_THRESHOLD", 5.0)
total_abstract = (
masses["abstract"] +
masses["liminal"] +
masses["pareidolia"] +
masses["void"]
)
abstraction_val = (total_abstract / safe_volume) + GC.ABSTRACTION_BASE
return {
"tension": tension,
"compression": compression,
"coherence": round(min(1.0, structural_mass / max(1.0, shapley_thresh)), 3),
"abstraction": round(min(1.0, abstraction_val), 2),
}
@staticmethod
def _calculate_dimensions(masses, forces, counts, volume) -> Dict[str, float]:
inv_vol = 1.0 / max(1, volume)
base_mass = 0.1
str_mass = masses["heavy"] * 2.0 + masses["constructive"] + masses["harvest"]
ent_mass = (counts.get("antigen", 0) * 3.0) + masses["meat"] + masses["crisis_term"]
psi_mass = forces["abstraction"]
return {
"VEL": max(
0.0,
min(
1.0,
(masses["kinetic"] * 2.0 - forces["compression"] + base_mass)
* inv_vol,
),
),
"STR": max(0.0, min(1.0, (str_mass + base_mass) * inv_vol)),
"ENT": max(0.0, min(1.0, ent_mass * inv_vol)),
"PHI": max(
0.0,
min(1.0, (masses["heavy"] + masses["kinetic"] + base_mass) * inv_vol),
),
"PSI": max(0.0, min(1.0, psi_mass)),
"BET": max(0.0, min(1.0, (masses["social"] * 2.0) * inv_vol)),
"DEL": max(0.0, min(1.0, (masses["play"] * 3.0) * inv_vol)),
"E": max(0.0, min(1.0, (counts.get("solvents", 0)) * inv_vol)),
}
class TheGatekeeper:
def __init__(self, lexicon_ref, memory_ref=None):
self.lex = lexicon_ref
self.mem = memory_ref
def check_entry(
self, ctx: CycleContext, current_atp: float = 20.0
) -> Tuple[bool, Optional[Dict]]:
phys = ctx.physics
starvation_threshold = getattr(BoneConfig.BIO, "ATP_STARVATION", 5.0)
if current_atp < (starvation_threshold * 0.5):
return False, self._pack_refusal(
ctx,
"DARK_SYSTEM",
"Energy critical. The inputs dissolve into the void.",
)
if phys.counts.get("antigen", 0) > 2:
return False, self._pack_refusal(
ctx,
"TOXICITY",
f"{Prisma.RED}IMMUNE REACTION: Input rejected as pathogenic.{Prisma.RST}",
)
if self._audit_safety(ctx.clean_words):
return False, self._pack_refusal(
ctx,
"CURSED_INPUT",
f"{Prisma.RED}The Gatekeeper recoils. Cursed syntax detected.{Prisma.RST}",
)
text = ctx.input_text
if "```" in text or "{{" in text or "}}" in text:
return False, self._pack_refusal(
ctx,
"SYNTAX_ERR",
f"{Prisma.RED}The mechanism jams. Syntax anomaly detected.{Prisma.RST}",
)
if len(text) > 10000:
return False, self._pack_refusal(
ctx,
"OVERLOAD",
f"{Prisma.OCHRE}Input too long. Compress your thought.{Prisma.RST}",
)
return True, None
def _audit_safety(self, words: List[str]) -> bool:
cursed = self.lex.get("cursed")
return (
not cursed.isdisjoint(words)
if isinstance(cursed, set)
else any(w in cursed for w in words)
)
@staticmethod
def _pack_refusal(ctx, type_str, ui_msg):
return {"type": type_str, "ui": ui_msg, "logs": ctx.logs + [ui_msg]}
class QuantumObserver:
def __init__(self, events):
self.events = events
self.voltage_history: Deque[float] = deque(maxlen=5)
self.last_physics_packet: Optional[PhysicsPacket] = None
def gaze(self, text: str, graph: Dict = None) -> Dict:
clean_words = LexiconService.clean(text)
counts = self._tally_categories(clean_words)
geo = GeodesicEngine.collapse_wavefunction(clean_words, counts)
self.voltage_history.append(geo.tension)
smoothed_voltage = round(
sum(self.voltage_history) / len(self.voltage_history), 2
)
e_metric, beta_val = self._calculate_metrics(text, counts)
valence = LexiconService.get_valence(clean_words)
graph_mass = self._calculate_graph_mass(clean_words, graph)
energy = EnergyState(
voltage=smoothed_voltage,
entropy=e_metric,
beta_index=beta_val,
mass=round(graph_mass, 1),
psi=geo.abstraction,
kappa=geo.coherence,
valence=valence,
velocity=0.0,
turbulence=0.0,
)
matter = MaterialState(
clean_words=clean_words,
raw_text=text,
counts=counts,
antigens=counts.get("antigen", 0),
vector=geo.dimensions,
truth_ratio=0.5,
)
space = SpatialState(
narrative_drag=geo.compression,
zone=self._determine_zone(geo.dimensions),
atmosphere="NEUTRAL",
flow_state=self._determine_flow(smoothed_voltage, geo.coherence),
)
self.last_physics_packet = PhysicsPacket(
energy=energy, matter=matter, space=space
)
packet_dict = self.last_physics_packet.to_dict()
if hasattr(self.events, "publish"):
self.events.publish("PHYSICS_CALCULATED", packet_dict)
return {"physics": self.last_physics_packet, "clean_words": clean_words}
@staticmethod
def _tally_categories(clean_words: List[str]) -> Counter:
counts = Counter()
solvents = LexiconService.get("solvents") or set()
for w in clean_words:
if w in solvents:
counts["solvents"] += 1
continue
cats = LexiconService.get_categories_for_word(w)
if cats:
counts.update(cats)
else:
flavor, conf = LexiconService.taste(w)
if flavor and conf > 0.5:
counts[flavor] += 1
return counts
@staticmethod
def _calculate_graph_mass(words: List[str], graph: Optional[Dict]) -> float:
if not graph:
return 0.0
total_mass = 0.0
existing_nodes = [w for w in words if w in graph]
for w in existing_nodes:
edges = graph[w].get("edges", {})
edge_weight_sum = sum(edges.values()) if edges else 0.0
node_mass = min(50.0, edge_weight_sum)
total_mass += node_mass
return total_mass
@staticmethod
def _calculate_metrics(
text: str, counts: Dict[str, int]
) -> Tuple[float, float]:
length = len(text)
if length == 0:
return 0.0, 0.0
scalar = getattr(BoneConfig.PHYSICS, "TEXT_LENGTH_SCALAR", 1500.0)
raw_chaos = length / scalar
solvents = counts.get("solvents", 0)
solvent_density = solvents / max(1.0, length / 5.0)
glue_factor = min(1.0, solvent_density * 2.0)
e_metric = min(1.0, raw_chaos * (1.0 - (glue_factor * 0.8)))
structure_chars = sum(1 for char in text if char in "!?%@#$;,")
heavy_words = (
counts.get("heavy", 0)
+ counts.get("constructive", 0)
+ counts.get("sacred", 0)
)
structure_score = structure_chars + (heavy_words * 2)
beta_index = min(
1.0, math.log1p(structure_score + 1) / math.log1p(length * 0.1 + 1)
)
if length < 50:
beta_index *= length / 50.0
return round(e_metric, 3), round(beta_index, 3)
@staticmethod
def _determine_flow(v: float, k: float) -> str:
volt_flow = getattr(BoneConfig.PHYSICS, "VOLTAGE_HIGH", 12.0)
kappa_strong = 0.8
if v > volt_flow and k > kappa_strong:
return "SUPERCONDUCTIVE"
if v > 10.0:
return "TURBULENT"
return "LAMINAR"
@staticmethod
def _determine_zone(vector: Dict[str, float]) -> str:
if not vector:
return "COURTYARD"
dom = max(vector, key=vector.get)
if dom in ["PSI", "DEL"]:
return "AERIE"
if dom in ["STR", "PHI"]:
return "THE_FORGE"
if dom in ["ENT", "VEL"]:
return "THE_MUD"
return "COURTYARD"
class SurfaceTension:
@staticmethod
def audit_hubris(physics: Dict[str, Any]) -> Tuple[bool, str, str]:
voltage = physics.get("voltage", 0.0)
coherence = physics.get("kappa", 0.5)
volt_crit = getattr(BoneConfig.PHYSICS, "VOLTAGE_CRITICAL", 15.0)
volt_flow = getattr(BoneConfig.PHYSICS, "VOLTAGE_HIGH", 12.0)
if voltage >= volt_crit and coherence < 0.4:
return (
True,
f"⚠️ HUBRIS DETECTED: Voltage ({voltage:.1f}v) exceeds structural integrity. Wings melting.",
"ICARUS_CRASH",
)
if voltage > volt_flow and coherence > 0.8:
return (
True,
"🌊 SURFACE TENSION OPTIMAL: Entering Flow State.",
"FLOW_BOOST",
)
return False, "", ""
class ChromaScope:
@staticmethod
def modulate(text: str, vector: Dict[str, float]) -> str:
if not vector or not any(vector.values()):
return f"{Prisma.GRY}{text}{Prisma.RST}"
primary_dim = max(vector, key=vector.get)
if primary_dim in TRIGRAM_MAP:
selected_color = TRIGRAM_MAP[primary_dim][3]
else:
selected_color = Prisma.GRY
return f"{selected_color}{text}{Prisma.RST}"
class ZoneInertia:
def __init__(self, inertia=0.7):
self.inertia = inertia
self.min_dwell = getattr(BoneConfig.PHYSICS, "ZONE_MIN_DWELL", 2)
self.current_zone = "COURTYARD"
self.dwell_counter = 0
self.last_vector: Optional[Tuple[float, float, float]] = None
self.is_anchored = False
self.strain_gauge = 0.0
def toggle_anchor(self) -> bool:
self.is_anchored = not self.is_anchored
self.strain_gauge = 0.0
return self.is_anchored
def stabilize(
self,
proposed_zone: str,
physics: Dict[str, Any],
cosmic_state: Tuple[str, float, str],
) -> Tuple[str, Optional[str]]:
beta = physics.get("beta_index", 1.0)
truth = physics.get("truth_ratio", 0.5)
grav_pull = 1.0 if cosmic_state[0] != "VOID_DRIFT" else 0.0
current_vec = (beta, truth, grav_pull)
self.dwell_counter += 1
pressure = 0.0
if self.last_vector:
dist = math.dist(current_vec, self.last_vector)
similarity = max(0.0, 1.0 - (dist / 2.0))
pressure = 1.0 - similarity
if self.is_anchored:
return self._handle_anchored_state(proposed_zone, pressure)
if proposed_zone == self.current_zone:
self.dwell_counter = 0
self.last_vector = current_vec
return proposed_zone, None
if self.dwell_counter < self.min_dwell:
return self.current_zone, None
return self._attempt_migration(proposed_zone, pressure)
def _handle_anchored_state(
self, proposed_zone: str, pressure: float
) -> Tuple[str, Optional[str]]:
if proposed_zone == self.current_zone:
self.strain_gauge = max(0.0, self.strain_gauge - 0.1)
return self.current_zone, None
self.strain_gauge += pressure
limit = 2.5
if self.strain_gauge > limit:
self.is_anchored = False
self.strain_gauge = 0.0
self.current_zone = proposed_zone
return (
proposed_zone,
f"{Prisma.RED}⚡ SNAP! The narrative current was too strong. Anchor failed.{Prisma.RST}",
)
return (
self.current_zone,
f"{Prisma.OCHRE}⚓ ANCHORED: Resisting drift to '{proposed_zone}' (Strain {self.strain_gauge:.1f}/{limit}){Prisma.RST}",
)
def _attempt_migration(
self, proposed_zone: str, pressure: float
) -> Tuple[str, Optional[str]]:
prob = (1.0 - self.inertia) + pressure
if proposed_zone in ["AERIE", "THE_FORGE"]:
prob += 0.2
if random.random() < prob:
old, self.current_zone = self.current_zone, proposed_zone
self.dwell_counter = 0
return (
self.current_zone,
f"{Prisma.CYN}>>> MIGRATION: {old} -> {proposed_zone}.{Prisma.RST}",
)
return self.current_zone, None
@staticmethod
def override_cosmic_drag(cosmic_drag_penalty: float, current_zone: str) -> float:
if current_zone == "AERIE" and cosmic_drag_penalty > 0:
return cosmic_drag_penalty * 0.3
return cosmic_drag_penalty
class CosmicDynamics:
def __init__(self):
self.voltage_history: Deque[float] = deque(maxlen=20)
self.cached_wells: Dict = {}
self.cached_hubs: Dict = {}
self.last_scan_tick: int = 0
self.SCAN_INTERVAL: int = 10
self.logs = self._load_logs()
@staticmethod
def _load_logs():
base = {
"GRAVITY": "⚓ GRAVITY: The narrative is heavy. (Drag {drag:.1f})",
"VOID": "VOID: Drifting outside the filaments.",
"NEBULA": "NEBULA: Floating near '{node}' (Mass {mass}). Not enough mass for orbit.",
"LAGRANGE": "LAGRANGE: Caught between '{p}' and '{s}'",
"FLOW": "FLOW: Streaming towards '{node}'",
"ORBIT": "ORBIT: Circling '{node}' (Mass {mass})"
}
manifest = LoreManifest.get_instance().get("narrative_data") or {}
return manifest.get("COSMIC_LOGS", base)
def commit(self, voltage: float):
self.voltage_history.append(voltage)
def check_gravity(
self, current_drift: float, psi: float
) -> Tuple[float, List[str]]:
logs = []
new_drag = current_drift
drag_floor = getattr(BoneConfig.PHYSICS, "DRAG_FLOOR", 1.0)
if new_drag < drag_floor:
new_drag += 0.05
if psi > 0.5:
reduction = (psi - 0.5) * 0.2
new_drag = max(0.0, new_drag - reduction)
CRITICAL_DRIFT = getattr(BoneConfig.PHYSICS, "DRAG_CRITICAL", 8.0)
if new_drag > CRITICAL_DRIFT:
if random.random() < 0.3:
msg = self.logs.get("GRAVITY", "⚓ GRAVITY").format(drag=new_drag)
logs.append(f"{Prisma.GRY}{msg}{Prisma.RST}")
return new_drag, logs
def analyze_orbit(
self, network: Any, clean_words: List[str]
) -> Tuple[str, float, str]:
if (
not clean_words
or not network
or not hasattr(network, "graph")
or not network.graph
):
return "VOID_DRIFT", 3.0, "VOID: Deep Space. No connection."
current_time = int(time.time())
if (
not self.cached_wells
or (current_time - self.last_scan_tick) > self.SCAN_INTERVAL
):
gravity_wells, geodesic_hubs = self._scan_network_mass(network)
self.cached_wells = gravity_wells
self.cached_hubs = geodesic_hubs
self.last_scan_tick = current_time
else:
gravity_wells = self.cached_wells
geodesic_hubs = self.cached_hubs
basin_pulls, active_filaments = self._calculate_pull(
clean_words, network, gravity_wells
)
if sum(basin_pulls.values()) == 0:
return self._handle_void_state(clean_words, geodesic_hubs)
return self._resolve_orbit(
basin_pulls, active_filaments, len(clean_words), gravity_wells
)
@staticmethod
def _scan_network_mass(network) -> Tuple[Dict, Dict]:
gravity_wells = {}
geodesic_hubs = {}
well_threshold = getattr(BoneConfig, "GRAVITY_WELL_THRESHOLD", 15.0)
geo_strength = getattr(BoneConfig, "GEODESIC_STRENGTH", 10.0)
for node in network.graph:
mass = network.calculate_mass(node)
if mass >= well_threshold:
gravity_wells[node] = mass
elif mass >= geo_strength:
geodesic_hubs[node] = mass
return gravity_wells, geodesic_hubs
@staticmethod
def _calculate_pull(words, network, gravity_wells) -> Tuple[Dict, int]:
basin_pulls = {k: 0.0 for k in gravity_wells}
active_filaments = 0
word_counts = Counter(words)
for w, count in word_counts.items():
if w in gravity_wells:
basin_pulls[w] += (gravity_wells[w] * 2.0) * count
active_filaments += count
for well, well_mass in gravity_wells.items():
edges = network.graph.get(well, {}).get("edges", {})
if not edges:
continue
intersection = set(word_counts.keys()).intersection(edges.keys())
for match in intersection:
basin_pulls[well] += (well_mass * 0.5) * word_counts[match]
active_filaments += word_counts[match]
return basin_pulls, active_filaments
def _handle_void_state(self, words, geodesic_hubs) -> Tuple[str, float, str]:
for w in words:
hub_mass = geodesic_hubs.get(w)
if hub_mass is not None:
msg = self.logs.get("NEBULA", "NEBULA").format(
node=w.upper(),
mass=int(hub_mass)
)
return "PROTO_COSMOS", 1.0, msg
return "VOID_DRIFT", 3.0, self.logs.get("VOID", "VOID")
def _resolve_orbit(
self, basin_pulls, active_filaments, word_count, gravity_wells
) -> Tuple[str, float, str]:
sorted_basins = sorted(basin_pulls.items(), key=lambda x: x[1], reverse=True)
primary_node, primary_str = sorted_basins[0]
lagrange_tol = getattr(BoneConfig, "LAGRANGE_TOLERANCE", 2.0)
if len(sorted_basins) > 1:
secondary_node, secondary_str = sorted_basins[1]
if secondary_str > 0 and (primary_str - secondary_str) < lagrange_tol:
msg = self.logs.get("LAGRANGE", "LAGRANGE").format(p=primary_node.upper(), s=secondary_node.upper())
return "LAGRANGE_POINT", 0.0, msg
flow_ratio = active_filaments / max(1, word_count)
well_threshold = getattr(BoneConfig, "GRAVITY_WELL_THRESHOLD", 15.0)
if flow_ratio > 0.5 and primary_str < (well_threshold * 2):
msg = self.logs.get("FLOW", "FLOW").format(node=primary_node.upper())
return "WATERSHED_FLOW", 0.0, msg
msg = self.logs.get("ORBIT", "ORBIT").format(node=primary_node.upper(), mass=int(gravity_wells[primary_node]))
return "ORBITAL", 0.0, msg
def apply_somatic_feedback(physics_packet: PhysicsPacket, qualia: Any) -> PhysicsPacket:
feedback = physics_packet.snapshot()
tone_effects = {
"Urgent": {"velocity": 0.3, "narrative_drag": -0.5, "voltage": 0.5},
"Strained": {"narrative_drag": 1.2, "voltage": -0.3, "kappa": -0.1},
"Vibrating": {"entropy": 0.2, "voltage": 0.2, "psi": 0.1},
"Resonant": {"valence": 0.3, "beta_index": 0.1, "kappa": 0.2},
"Steady": {},
}
effects = tone_effects.get(qualia.tone, {})
for key, delta in effects.items():
if hasattr(feedback, key):
current = getattr(feedback, key)
setattr(feedback, key, current + delta)
if "Gut Tightening" in qualia.somatic_sensation:
feedback.narrative_drag += 0.7
if "Electric Vibration" in qualia.somatic_sensation:
feedback.voltage += 0.8
if "Golden Glow" in qualia.somatic_sensation:
feedback.valence += 0.5
feedback.psi += 0.2
volt_crit = getattr(BoneConfig.PHYSICS, "VOLTAGE_CRITICAL", 15.0)
drag_floor = getattr(BoneConfig.PHYSICS, "DRAG_FLOOR", 1.0)
drag_halt = getattr(BoneConfig.PHYSICS, "DRAG_HALT", 10.0)
feedback.voltage = max(0.0, min(feedback.voltage, volt_crit * 1.5))
feedback.narrative_drag = max(drag_floor, min(feedback.narrative_drag, drag_halt))
return feedback
class CycleStabilizer:
def __init__(self, events_ref, governor_ref):
self.events = events_ref
self.governor = governor_ref
self.last_tick_time = time.time()
self.pending_drag = 0.0
self.manifolds = getattr(BoneConfig.PHYSICS, "MANIFOLDS", {})
self.HARD_FUSE_VOLTAGE = 100.0
if hasattr(self.events, "subscribe"):
self.events.subscribe(
"DOMESTICATION_PENALTY", self._on_domestication_penalty
)
def _on_domestication_penalty(self, payload):
amount = payload.get("drag_penalty", 0.0)
self.pending_drag += amount
def stabilize(self, ctx: CycleContext, current_phase: str):
p = ctx.physics
if p.voltage >= self.HARD_FUSE_VOLTAGE:
ctx.log(
f"{Prisma.RED}⚡ FUSE BLOWN: Voltage > {self.HARD_FUSE_VOLTAGE}V.{Prisma.RST}"
)
p.voltage, p.narrative_drag = 10.0, 5.0
p.flow_state = "SAFE_MODE"
ctx.record_flux(
current_phase, "voltage", self.HARD_FUSE_VOLTAGE, 10.0, "FUSE_BLOWN"
)
return True
if self.pending_drag > 0:
ctx.physics.narrative_drag += self.pending_drag
ctx.log(
f"{Prisma.GRY}⚖️ DOMESTICATION: Drag +{self.pending_drag:.1f}{Prisma.RST}"
)
self.pending_drag = 0.0
now = time.time()
dt = max(0.001, min(1.0, now - self.last_tick_time))
self.last_tick_time = now
manifold = getattr(p, "manifold", "DEFAULT")
cfg = self.manifolds.get(manifold, self.manifolds["DEFAULT"])
target_v = cfg["voltage"]
if getattr(p, "flow_state", "LAMINAR") in ["SUPERCONDUCTIVE", "FLOW_BOOST"]:
target_v = p.voltage
cfg["drag"] = max(0.1, cfg["drag"] * 0.5)
self.governor.recalibrate(target_v, cfg["drag"])
v_force, d_force = self.governor.regulate(p, dt=dt)
c1 = self._apply_force(
ctx,
current_phase,
p,
"voltage",
v_force,
(PHYS_CFG["V_FLOOR"], PHYS_CFG["V_MAX"]),
)
c2 = self._apply_force(ctx, current_phase, p, "narrative_drag", d_force)
return c1 or c2
@staticmethod
def _apply_force(ctx, phase, p, field, force, limits=None):
if abs(force) <= PHYS_CFG["DEADBAND"]:
return False
old_val = getattr(p, field)
new_val = old_val + force
if limits:
new_val = max(limits[0], min(limits[1], new_val))
else:
new_val = max(0.0, new_val)
setattr(p, field, new_val)
if abs(force) > PHYS_CFG["FLUX_THRESHOLD"]:
ctx.record_flux(phase, field, old_val, new_val, "PID_CORRECTION")
return True