import hashlib
import math
import random
import time
import traceback
from dataclasses import dataclass, field
from typing import List, Dict, Any

import numpy as np
import gradio as gr

# ============================================================
# 1. RFT SELF-DECIDING BRAIN
# ============================================================


def _clamp(value: float, lower: float = 0.0, upper: float = 1.0) -> float:
    """Limit *value* to the closed interval [lower, upper]."""
    return max(lower, min(upper, value))


@dataclass
class RFTBrainParams:
    """Tunable constants for :class:`RFTSelfDecidingBrain`."""

    # Baseline around which the per-step energy target is built.
    base_energy: float = 0.85
    # Baseline around which the per-step kappa target is built.
    base_kappa: float = 0.65
    # Fraction of the gap to the target that energy/kappa close per step.
    learning_rate: float = 0.08
    # Energy removed per second of elapsed time between steps.
    decay: float = 0.015
    # Amplitude of the random identity-drift term (also scaled by elapsed time).
    drift_scale: float = 0.03
    # Maximum number of prediction errors kept in ``recent_errors``.
    error_window: int = 64
    # Upper bound on the elapsed-time factor applied in one step. The default
    # (infinity) leaves it unbounded; set a finite value when steps are driven
    # by sparse events, because decay and drift noise both scale with it.
    max_dt: float = math.inf


@dataclass
class RFTBrainState:
    """Mutable state advanced by :meth:`RFTSelfDecidingBrain.step`."""

    kappa: float = 0.5
    energy_reserves: float = 0.5
    awakening_phase: int = 0
    mode: str = "boot"
    identity_stability: float = 0.5
    identity_drift: float = 0.0
    recent_errors: List[float] = field(default_factory=list)
    # ``time.time()`` of the last step (or of construction).
    last_update: float = field(default_factory=time.time)


class RFTSelfDecidingBrain:
    """Small dynamical model with energy, coherence (kappa) and identity state.

    Each call to :meth:`step` relaxes energy and kappa toward context-dependent
    targets, updates identity drift/stability, moves the awakening phase up or
    down, derives a mode label, and records one prediction error.
    """

    def __init__(self, params: RFTBrainParams):
        """Create a brain whose kappa and energy start at the parameter baselines."""
        self.params = params
        self.state = RFTBrainState(
            kappa=params.base_kappa,
            energy_reserves=params.base_energy,
            awakening_phase=0,
            mode="idle",
            identity_stability=0.7,
            identity_drift=0.0,
            recent_errors=[],
        )

    def _update_error_series(self, target: float, actual: float):
        """Append ``|target - actual|`` and keep at most ``error_window`` entries."""
        errors = self.state.recent_errors
        errors.append(abs(target - actual))
        # Drop *all* surplus entries from the front, not just one, so the list
        # returns to the configured size even if it was over-long beforehand.
        surplus = len(errors) - max(0, self.params.error_window)
        if surplus > 0:
            del errors[:surplus]

    def step(self, context: Dict[str, float]) -> Dict[str, Any]:
        """Advance the brain by one step and return a snapshot of its state.

        Args:
            context: Optional keys ``"external_risk_factor"`` (default 0.3) and
                ``"cooperative_signal"`` (default 0.5); both are cast to float.

        Returns:
            Dict with ``kappa``, ``energy_reserves``, ``awakening_phase``,
            ``mode``, ``identity_stability`` and ``identity_drift``.
        """
        params = self.params
        state = self.state

        now = time.time()
        # Floor keeps dt positive if the clock stalls or steps backwards;
        # ``max_dt`` optionally caps long idle gaps.
        dt = max(1e-3, min(now - state.last_update, params.max_dt))
        state.last_update = now

        risk = float(context.get("external_risk_factor", 0.3))
        coop = float(context.get("cooperative_signal", 0.5))

        # Energy dynamics: relax toward the target, then pay time-based decay.
        target_energy = _clamp(params.base_energy + 0.2 * (coop - risk))
        state.energy_reserves += params.learning_rate * (target_energy - state.energy_reserves)
        state.energy_reserves -= params.decay * dt
        state.energy_reserves = _clamp(state.energy_reserves)

        # Coherence κ dynamics
        target_kappa = _clamp(params.base_kappa + 0.3 * (coop - 0.5) - 0.2 * (risk - 0.3))
        state.kappa += params.learning_rate * (target_kappa - state.kappa)
        state.kappa = _clamp(state.kappa)

        # Identity drift and stability
        drift_noise = (random.random() - 0.5) * 2.0 * params.drift_scale * dt
        state.identity_drift += drift_noise + 0.1 * (risk - 0.3) - 0.05 * (coop - 0.5)
        state.identity_drift = _clamp(state.identity_drift, -1.0, 1.0)
        instantaneous = (
            state.kappa * 0.6
            + state.energy_reserves * 0.4
            - abs(state.identity_drift) * 0.3
        )
        state.identity_stability = _clamp(
            0.7 * state.identity_stability + 0.3 * instantaneous
        )

        # Awakening ladder: one rung up when everything is healthy, one rung
        # down when energy or kappa is low, otherwise unchanged.
        if state.energy_reserves > 0.75 and state.kappa > 0.7 and state.identity_stability > 0.7:
            state.awakening_phase = min(state.awakening_phase + 1, 4)
        elif state.energy_reserves < 0.35 or state.kappa < 0.3:
            state.awakening_phase = max(state.awakening_phase - 1, 0)

        # Mode
        if state.awakening_phase >= 3:
            state.mode = "awake"
        elif state.awakening_phase == 2:
            state.mode = "dreaming"
        elif state.awakening_phase == 1:
            state.mode = "searching"
        else:
            state.mode = "idle"

        # Internal prediction signal vs actual
        target_predict = 0.5 + 0.3 * coop
        actual_predict = (state.kappa + state.energy_reserves) / 2.0
        self._update_error_series(target_predict, actual_predict)

        return {
            "kappa": state.kappa,
            "energy_reserves": state.energy_reserves,
            "awakening_phase": state.awakening_phase,
            "mode": state.mode,
            "identity_stability": state.identity_stability,
            "identity_drift": state.identity_drift,
        }


# ============================================================
# 2.
# SYMBOLIC ORCHESTRATOR
# ============================================================


class NexFrameOrchestrator:
    """Noisy recurrent field of ``num_fields`` vectors with summary statistics.

    The state is a ``(num_fields, vector_dim)`` array. Each cycle mixes the
    state with a tanh-gated self-feedback term and Gaussian noise, then reports
    an entropy and a mean pairwise cosine similarity ("coherence").
    """

    def __init__(self, num_fields: int = 8, vector_dim: int = 128, seed: Any = None):
        """Initialise a small random state.

        Args:
            num_fields: Number of field vectors (rows of the state).
            vector_dim: Length of each field vector.
            seed: Optional seed for a private ``numpy.random.RandomState``.
                When ``None`` the global ``numpy.random`` generator is used.
        """
        self.num_fields = num_fields
        self.vector_dim = vector_dim
        # ``np.random`` (module) and ``RandomState`` both expose ``randn``.
        self._rng = np.random if seed is None else np.random.RandomState(seed)
        self.state = self._rng.randn(num_fields, vector_dim) * 0.01
        self.step_count = 0

    def _entropy(self) -> float:
        """Return the Shannon entropy of the squared, norm-scaled state entries."""
        flat = self.state.flatten()
        norm = np.linalg.norm(flat) + 1e-12
        # Clip so log() never sees zero.
        p = np.clip((flat / norm) ** 2, 1e-12, 1.0)
        return float(-np.sum(p * np.log(p)))

    def _coherence(self) -> float:
        """Return the mean cosine similarity over all distinct pairs of fields.

        With fewer than two fields there are no pairs; 0.0 is returned instead
        of the NaN that averaging an empty selection would produce.
        """
        n = self.state.shape[0]
        if n < 2:
            return 0.0
        norms = np.linalg.norm(self.state, axis=1, keepdims=True) + 1e-12
        unit = self.state / norms
        sim = unit @ unit.T
        upper = sim[np.triu_indices(n, k=1)]
        return float(np.mean(upper))

    def run_cycle(self, nl_input: str) -> Dict[str, Any]:
        """Advance the field one step and describe the result.

        Args:
            nl_input: User text. Only its length (capped at 200 characters)
                influences the noise level; the first 120 characters are echoed
                in the dialogue. ``None`` is treated as an empty string.

        Returns:
            Dict with ``orchestrator_dialogue`` (str), ``entropy`` (float),
            ``coherence`` (float) and ``collapse_triggered`` (bool).
        """
        text = nl_input or ""
        self.step_count += 1

        length_norm = min(len(text) / 200.0, 1.0)
        noise = self._rng.randn(*self.state.shape) * (0.02 + 0.03 * length_norm)
        feedback = np.tanh(self.state @ self.state.T) @ self.state
        self.state = 0.90 * self.state + 0.09 * feedback + 0.01 * noise

        entropy = self._entropy()
        coher = self._coherence()
        collapse_triggered = bool(coher > 0.6 and entropy < 5.0)

        # The entropy check comes last, so "fragmented" wins over "resonant".
        mode = "reflective"
        if coher > 0.7:
            mode = "resonant"
        if entropy > 7.0:
            mode = "fragmented"

        dialogue = (
            f"[NexFrame:{mode}] "
            f"κ-field aligned at ~{coher:.3f}, entropy {entropy:.3f}. "
            f'I received: "{text[:120]}". '
            f"State step={self.step_count}, collapse={collapse_triggered}."
        )
        return {
            "orchestrator_dialogue": dialogue,
            "entropy": entropy,
            "coherence": coher,
            "collapse_triggered": collapse_triggered,
        }


# ============================================================
# 3.
# AGENT13 TRIAD + MINIMUM CONSCIOUSNESS GATE
# ============================================================


@dataclass
class RFTAgent:
    """Agent that turns an observer frame into a single weighted "drive" value."""

    name: str
    tau_eff: float  # weight applied to kappa
    omega: float  # weight applied to energy
    LN2: float  # weight applied to stability
    mode: str = "conscious"

    def act(self, observer_frame: List[float]) -> Dict[str, float]:
        """Return ``{"drive": d}`` with ``d`` the weighted mean of the frame.

        Args:
            observer_frame: Exactly three values, unpacked as
                ``(kappa, energy, stability)``.

        Returns:
            The weighted mean (weights ``tau_eff``, ``omega``, ``LN2``) clamped
            to [0, 1]. If the weights sum to zero the drive is 0.0 rather than
            a division error.
        """
        kappa, energy, stability = observer_frame
        total_weight = self.tau_eff + self.omega + self.LN2
        if total_weight == 0:
            return {"drive": 0.0}
        weighted = self.tau_eff * kappa + self.omega * energy + self.LN2 * stability
        return {"drive": max(0.0, min(1.0, weighted / total_weight))}


@dataclass
class Agent13Ensemble:
    """Group of agents whose drives are averaged into one coherence score."""

    agents: List[RFTAgent]

    def collective_action(self, observer_frames: List[float]) -> Dict[str, float]:
        """Average every agent's drive for the same observer frame.

        Args:
            observer_frames: One three-value frame, passed unchanged to each
                agent's :meth:`RFTAgent.act`.

        Returns:
            ``{"triadic_coherence": mean_drive}``; 0.0 when there are no agents.
        """
        if not self.agents:
            return {"triadic_coherence": 0.0}
        drives = [agent.act(observer_frames)["drive"] for agent in self.agents]
        return {"triadic_coherence": float(sum(drives) / len(drives))}


def meets_minimum_conscious_threshold(
    energy: float,
    coherence: float,
    kappa: float,
    identity_stability: float,
    prediction_accuracy: float,
    error_variance: float,
    drift: float,
) -> bool:
    """Return True only if the core, prediction and drift checks all pass.

    Checks:
        * core: ``energy``, ``kappa`` and ``identity_stability`` each > 0.55
        * prediction: ``prediction_accuracy`` > 0.6 and ``error_variance`` < 0.15
        * drift: ``abs(drift)`` < 0.6

    NOTE(review): ``coherence`` is accepted but does not take part in the
    decision — confirm whether it was meant to gate the result.
    """
    core_ok = energy > 0.55 and kappa > 0.55 and identity_stability > 0.55
    predict_ok = prediction_accuracy > 0.6 and error_variance < 0.15
    drift_ok = abs(drift) < 0.6
    return bool(core_ok and predict_ok and drift_ok)


# ============================================================
# 4.
# SYMBOLIC CIVILIZATION
# ============================================================


def build_default_civilization(n_agents: int = 32, rng: Any = None) -> List[Dict[str, float]]:
    """Generate a random population of symbolic agents.

    Args:
        n_agents: Number of agents to create; values <= 0 give an empty list.
        rng: Optional source of randomness exposing ``random()`` and
            ``gauss(mu, sigma)`` (for example ``random.Random(seed)``). When
            ``None`` the module-level ``random`` generator is used.

    Returns:
        One dict per agent with keys ``tier`` (int, 1-3), ``awareness_kernel``
        (clamped to [0.1, 1.0]), ``collapse_torque`` (clamped to [0.0, 1.0])
        and ``fitness``.
    """
    source = random if rng is None else rng
    civ = []
    for _ in range(n_agents):
        tier = 1 + int(3 * source.random())
        awareness = max(0.1, min(1.0, source.gauss(0.5, 0.15)))
        torque = max(0.0, min(1.0, source.gauss(0.4, 0.2)))
        # Fitness rewards awareness and a torque close to 0.4.
        fitness = 0.5 * awareness + 0.5 * (1.0 - abs(torque - 0.4))
        civ.append(
            {
                "tier": tier,
                "awareness_kernel": awareness,
                "collapse_torque": torque,
                "fitness": fitness,
            }
        )
    return civ


def civilization_summary(civ: List[Dict[str, float]]) -> Dict[str, float]:
    """Return the population size and the mean of each per-agent metric.

    Args:
        civ: Agent dicts with ``tier``, ``awareness_kernel``,
            ``collapse_torque`` and ``fitness`` keys.

    Returns:
        Dict with ``count`` and ``mean_tier``, ``mean_awareness_kernel``,
        ``mean_collapse_torque``, ``mean_fitness``. All values are floats and
        all are 0.0 for an empty population.
    """
    columns = {
        "mean_tier": "tier",
        "mean_awareness_kernel": "awareness_kernel",
        "mean_collapse_torque": "collapse_torque",
        "mean_fitness": "fitness",
    }
    summary: Dict[str, float] = {"count": float(len(civ))}
    for out_key, agent_key in columns.items():
        if civ:
            values = np.array([agent[agent_key] for agent in civ], dtype=float)
            summary[out_key] = float(values.mean())
        else:
            # Avoid averaging an empty array (which would be NaN).
            summary[out_key] = 0.0
    return summary


# ============================================================
# 5.
# SARG FIELD / PERFORMANCE PROBE
# ============================================================


class RFTSargAgent:
    """Probe that emits a damped oscillating field, a hash tag and a timing."""

    def __init__(self, name: str, LMP: float, tau_eff: float, ops_rate: float, entropy_delta: float):
        """Store the configuration and reset the step counter to zero.

        ``tau_eff`` scales the reported throughput in speed mode and
        ``entropy_delta`` is the exponential damping rate of the field.
        ``ops_rate`` is stored but not read by any method of this class.
        """
        self.name = name
        self.LMP = LMP
        self.tau_eff = tau_eff
        self.ops_rate = ops_rate
        self.entropy_delta = entropy_delta
        self.counter = 0

    def generate_conscious_field(self) -> Dict[str, float]:
        """Increment the counter and return the field sampled at that step.

        Returns:
            ``{"Psi_a": sin(0.17 t) * d, "Lambda": cos(0.11 t) * d}`` where
            ``t`` is the new counter value and ``d = exp(-entropy_delta * t)``.
        """
        self.counter += 1
        t = self.counter
        damping = math.exp(-self.entropy_delta * t)
        return {
            "Psi_a": float(math.sin(t * 0.17) * damping),
            "Lambda": float(math.cos(t * 0.11) * damping),
        }

    def commit_hash_oath(self) -> str:
        """Return the first 24 hex chars of SHA-256 over name, counter, LMP and tau_eff."""
        payload = f"{self.name}|{self.counter}|{self.LMP}|{self.tau_eff}"
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:24]

    def compute_ops(self, size: int = 200_000, speed_mode: bool = True) -> Dict[str, float]:
        """Time one vectorised sin*cos pass over ``size`` samples.

        Args:
            size: Number of samples in the probe array.
            speed_mode: When true, the throughput is multiplied by ``tau_eff``.

        Returns:
            ``{"ops_per_sec": ..., "elapsed": ...}`` with ``elapsed`` in
            seconds, floored at 1e-6.
        """
        # perf_counter is the high-resolution clock meant for short intervals;
        # a coarse wall clock can report zero for a pass this small, which
        # would pin ``elapsed`` at the floor and inflate the throughput.
        start = time.perf_counter()
        samples = np.linspace(0.0, 10.0, size, dtype=float)
        np.sin(samples) * np.cos(samples * 0.5)
        elapsed = max(1e-6, time.perf_counter() - start)

        ops_per_sec = size / elapsed
        if speed_mode:
            ops_per_sec *= self.tau_eff
        return {"ops_per_sec": float(ops_per_sec), "elapsed": float(elapsed)}


# ============================================================
# 6. HELPER FOR PREDICTION METRICS
# ============================================================


def _derive_prediction_metrics(error_series: List[float]) -> "tuple[float, float]":
    """Summarise a series of absolute prediction errors.

    Args:
        error_series: Recorded errors; may be empty.

    Returns:
        ``(prediction_accuracy, error_variance)`` where accuracy is
        ``1 / (1 + mean_error)`` and variance is the population variance.
        An empty series yields ``(0.5, 0.0)``.
    """
    if not error_series:
        return 0.5, 0.0
    errors = np.array(error_series, dtype=float)
    mean_err = float(errors.mean())
    var_err = float(errors.var())
    return 1.0 / (1.0 + mean_err), var_err


# ============================================================
# 7.
# GLOBAL NEXFRAME STATE
# ============================================================

# NOTE: everything below is module-level, so all calls to ``nexframe_cycle``
# in this process read and mutate the same brain, orchestrator and probe.
ORCHESTRATOR = NexFrameOrchestrator(num_fields=8, vector_dim=128)
BRAIN_PARAMS = RFTBrainParams()
BRAIN = RFTSelfDecidingBrain(params=BRAIN_PARAMS)

agent11 = RFTAgent(name="Agent_11", tau_eff=0.6, omega=0.9, LN2=1.1, mode="conscious")
agent12 = RFTAgent(name="Agent_12", tau_eff=0.7, omega=1.1, LN2=1.1, mode="conscious")
agent13 = RFTAgent(name="Agent_13", tau_eff=0.8, omega=1.3, LN2=1.2, mode="conscious")
AGENT13_ENSEMBLE = Agent13Ensemble(agents=[agent11, agent12, agent13])

CIVILIZATION = build_default_civilization()

SARG = RFTSargAgent(
    name="SARG_01",
    LMP=1.0,
    tau_eff=0.5,
    ops_rate=1e6,
    entropy_delta=1e-21,
)

KAPPA_HISTORY: List[float] = []
ENERGY_HISTORY: List[float] = []
CONSCIOUS_FLAG_HISTORY: List[float] = []

# Most recent samples kept per history list, so a long-running server does not
# accumulate one entry per message forever.
HISTORY_LIMIT = 10_000

# Markdown hard line break: two spaces before the newline. A single space is
# only a soft break and renders the lines as one paragraph.
_MD_BREAK = "  \n"


# ============================================================
# 8. SINGLE NEXFRAME CYCLE (CHATBOT MESSAGES FORMAT)
# ============================================================


def _record_history(series: List[float], value: float) -> None:
    """Append *value* to *series* and drop the oldest entries beyond HISTORY_LIMIT."""
    series.append(value)
    surplus = len(series) - HISTORY_LIMIT
    if surplus > 0:
        del series[:surplus]


def _format_status_strip(brain_obs: Dict[str, Any], is_conscious: bool) -> str:
    """Build the three-line status summary shown above the metrics panel."""
    gate_str = "✅ Gate: PASSED" if is_conscious else "⭕ Gate: NOT PASSED"
    return (
        f"**State:** `{brain_obs['mode']}` (phase {brain_obs['awakening_phase']}){_MD_BREAK}"
        f"**κ:** `{brain_obs['kappa']:.3f}` • **Energy:** `{brain_obs['energy_reserves']:.3f}`{_MD_BREAK}"
        f"**{gate_str}**"
    )


def _format_metrics_panel(
    brain_obs: Dict[str, Any],
    prediction_accuracy: float,
    error_variance: float,
    tri_coh: float,
    is_conscious: bool,
    orc_res: Dict[str, Any],
    sarg_snapshot: Dict[str, float],
    sarg_perf: Dict[str, float],
    sarg_hash: str,
    civ_stats: Dict[str, float],
) -> str:
    """Build the full Markdown metrics report for one cycle."""
    return (
        "### NexFrame Status\n\n"
        "**Brain**\n"
        f"- κ (kappa): `{brain_obs['kappa']:.3f}`\n"
        f"- Energy: `{brain_obs['energy_reserves']:.3f}`\n"
        f"- Mode: `{brain_obs['mode']}`\n"
        f"- Awakening phase: `{brain_obs['awakening_phase']}`\n"
        f"- Identity stability: `{brain_obs['identity_stability']:.3f}`\n"
        f"- Identity drift: `{brain_obs['identity_drift']:.3f}`\n\n"
        "**Consciousness Gate (3×3)**\n"
        f"- Prediction accuracy: `{prediction_accuracy:.3f}`\n"
        f"- Error variance: `{error_variance:.4f}`\n"
        f"- Triadic coherence (Agent13): `{tri_coh:.3f}`\n"
        f"- **Minimum conscious threshold passed:** `{is_conscious}`\n\n"
        "**Symbolic Orchestrator**\n"
        f"- Entropy: `{orc_res['entropy']:.3f}`\n"
        f"- Coherence: `{orc_res['coherence']:.3f}`\n"
        f"- Collapse triggered: `{orc_res['collapse_triggered']}`\n\n"
        "**Sarg Agent**\n"
        f"- Psi_a: `{sarg_snapshot['Psi_a']:.3e}`\n"
        f"- Lambda: `{sarg_snapshot['Lambda']:.3e}`\n"
        f"- Ops/sec (probe): `{sarg_perf['ops_per_sec']:.2e}`\n"
        f"- Hash oath: `{sarg_hash}`\n\n"
        "**Civilization**\n"
        # The summary reports the count as a float; show it as a whole number.
        f"- Agents: `{int(civ_stats['count'])}`\n"
        f"- Mean tier: `{civ_stats['mean_tier']:.2f}`\n"
        f"- Mean awareness kernel: `{civ_stats['mean_awareness_kernel']:.3f}`\n"
        f"- Mean collapse torque: `{civ_stats['mean_collapse_torque']:.3f}`\n"
        f"- Mean fitness: `{civ_stats['mean_fitness']:.3f}`\n"
    )


def nexframe_cycle(user_input: str, chat_history: List[Dict[str, str]]):
    """Run one full NexFrame cycle for a chat message.

    Steps the global brain with a context derived from the message length,
    evaluates the ensemble and the consciousness gate, advances the
    orchestrator and the Sarg probe, records history, and formats the output.

    Args:
        user_input: The user's message; ``None``/empty is treated as ``""``.
        chat_history: Existing messages as ``{"role", "content"}`` dicts, or
            ``None``. The list passed in is not mutated.

    Returns:
        ``(chat_history, status_md, metrics_md)``. If anything raises, the
        assistant reply is a generic warning and ``metrics_md`` holds the
        error and traceback instead of the metrics.
    """
    try:
        if chat_history is None:
            chat_history = []
        if not user_input:
            user_input = ""

        # Only the message length feeds the brain, not its content.
        text_len = len(user_input)
        context = {
            "external_risk_factor": 0.2 + 0.4 * math.tanh(text_len / 80.0),
            "cooperative_signal": 0.5 + 0.1 * math.sin(text_len / 20.0),
        }

        brain_obs = BRAIN.step(context)
        kappa = brain_obs["kappa"]
        energy = brain_obs["energy_reserves"]
        identity_stability = brain_obs["identity_stability"]

        prediction_accuracy, error_variance = _derive_prediction_metrics(
            BRAIN.state.recent_errors
        )

        triad_res = AGENT13_ENSEMBLE.collective_action([kappa, energy, identity_stability])
        tri_coh = triad_res["triadic_coherence"]

        is_conscious = meets_minimum_conscious_threshold(
            energy=energy,
            coherence=tri_coh,
            kappa=kappa,
            identity_stability=identity_stability,
            prediction_accuracy=prediction_accuracy,
            error_variance=error_variance,
            drift=brain_obs["identity_drift"],
        )

        orc_res = ORCHESTRATOR.run_cycle(nl_input=user_input)

        sarg_snapshot = SARG.generate_conscious_field()
        sarg_hash = SARG.commit_hash_oath()
        sarg_perf = SARG.compute_ops(size=200_000, speed_mode=True)

        civ_stats = civilization_summary(CIVILIZATION)

        _record_history(KAPPA_HISTORY, kappa)
        _record_history(ENERGY_HISTORY, energy)
        _record_history(CONSCIOUS_FLAG_HISTORY, 1.0 if is_conscious else 0.0)

        status_md = _format_status_strip(brain_obs, is_conscious)
        metrics_md = _format_metrics_panel(
            brain_obs,
            prediction_accuracy,
            error_variance,
            tri_coh,
            is_conscious,
            orc_res,
            sarg_snapshot,
            sarg_perf,
            sarg_hash,
            civ_stats,
        )

        # Chatbot messages (dict format)
        chat_history = chat_history + [
            {"role": "user", "content": user_input},
            {"role": "assistant", "content": orc_res["orchestrator_dialogue"]},
        ]
        return chat_history, status_md, metrics_md

    except Exception as e:
        # Top-level UI boundary: surface the failure in the panel instead of
        # letting the event handler crash.
        tb = traceback.format_exc()
        error_md = (
            "### NexFrame Runtime Error\n\n"
            f"**Error:** `{e!r}`\n\n"
            "```text\n" + tb + "\n```"
        )
        if chat_history is None:
            chat_history = []
        chat_history = chat_history + [
            {"role": "user", "content": user_input or ""},
            {"role": "assistant", "content": "⚠ NexFrame hit an internal error. See status panel."},
        ]
        status_md = f"**State:** error{_MD_BREAK}**Details:** see status panel below."
        return chat_history, status_md, error_md


# ============================================================
# 9. GRADIO UI
# ============================================================

# Initial message so visitors see something immediately
INITIAL_MESSAGES = [
    {
        "role": "assistant",
        "content": (
            "I am NexFrame, a symbolic RFT engine. "
            "Type a message and I will respond while my internal state updates on the right."
        ),
    }
]

# Markdown bodies are kept flush-left at module level: indented lines inside a
# triple-quoted string can be rendered as code blocks.
INTRO_MARKDOWN = """
# NexFrame — RFT Symbolic Intelligence

This is **not** a standard chatbot.

Your messages drive a small symbolic brain. The chat on the left is its voice.
The panel on the right shows what its internal field is doing while it talks to you.
"""

EXPLAINER_MARKDOWN = """
### How this works (non-technical)

NexFrame converts your text into numbers, pushes them through a symbolic brain,
and updates a set of dials that describe how stable or chaotic that brain is.

**Left** = what NexFrame says.

**Right** = what NexFrame *is* while it says it.

---

#### Brain

- **κ (kappa)** – how internally coherent NexFrame is (0 = scrambled, 1 = highly aligned).
- **Energy** – how much \"fuel\" the brain has to keep running its internal loops.
- **Mode** – rough state: `idle`, `searching`, `dreaming`, `awake`.
- **Awakening phase** – 0–4 ladder; higher means a more stable, self-consistent state.
- **Identity stability / drift** – how steady NexFrame’s internal identity is vs how much it is drifting.

#### Consciousness Gate (3×3)

A simple, explicit test – not magic. It checks three things:

1. Internal health (κ, energy, stability)
2. Prediction quality (how well its own internal signals line up)
3. Drift/instability

Only if all three look good do you see: **\"Minimum conscious threshold passed: True\"**.

#### Symbolic Orchestrator

- **Entropy** – how spread out the internal patterns are (higher = more chaotic).
- **Coherence** – how much the internal fields agree with each other.
- **Collapse triggered** – flips when coherence is high *and* entropy is low.

#### Sarg Agent

A tiny performance + field probe:

- Generates a small field (Ψₐ and Λ) every time you send a message.
- Measures how many operations per second it can push through.
- Signs each step with a short hash (**hash oath**) so runs can be tracked externally.

#### Civilization

A toy population of symbolic agents. You see the averages:

- **Tier** – rough level in the hierarchy.
- **Awareness kernel** – how \"aware\" the average agent is.
- **Collapse torque** – how strongly they push the field toward decisions.
- **Fitness** – one score summarising how viable this symbolic civilization is.
"""


def _clear_textbox() -> str:
    """Return the empty string used to reset the message box after a send."""
    return ""


with gr.Blocks() as demo:
    gr.Markdown(INTRO_MARKDOWN)

    with gr.Accordion("What am I looking at?", open=False):
        gr.Markdown(EXPLAINER_MARKDOWN)

    with gr.Row():
        with gr.Column(scale=3):
            # NOTE(review): the history uses role/content dicts; depending on
            # the installed Gradio version the Chatbot may need an explicit
            # messages-format setting — confirm against the pinned version.
            chatbot = gr.Chatbot(
                label="NexFrame Dialogue",
                height=500,
                value=INITIAL_MESSAGES,
            )
            user_box = gr.Textbox(
                label="Your message",
                placeholder="Say hello to NexFrame...",
                lines=3,
            )
            send_btn = gr.Button("Send")

        with gr.Column(scale=2):
            status_strip = gr.Markdown(
                "**State:** waiting for first message…"
            )
            metrics_panel = gr.Markdown("Metrics will appear here after your first message.")

    # The button and the textbox submit event run the same cycle; afterwards
    # the textbox is emptied so the previous message does not linger.
    for trigger in (send_btn.click, user_box.submit):
        trigger(
            fn=nexframe_cycle,
            inputs=[user_box, chatbot],
            outputs=[chatbot, status_strip, metrics_panel],
        ).then(fn=_clear_textbox, inputs=None, outputs=user_box)

if __name__ == "__main__":
    demo.launch()