"""
UBERMENSCHETIEN HEAVEN ENGINE + CF-HoT MULTI-HEAD COGNITIVE CONTROL
--------------------------------------------------------------------
Integration: Hermes-3 for generation + LHT for reasoning + CF-HoT for behavioral control.

CF-HoT Heads:
- Repetition: 125x separation (PRODUCTION)
- Verbosity: 2.1x separation (USABLE)
- Hedging: 1.5x separation (CONTRIBUTING)

"An 8B that behaves like an 80B"
"""

import os
import json
import shutil
import subprocess
import traceback
import random
import math
import statistics
import re
from datetime import datetime
from typing import List, Dict, Any, Optional, Tuple

import torch
import torch.nn as nn
import torch.nn.functional as F

ROOT = os.path.dirname(os.path.abspath(__file__))
DATA_DIR = os.path.join(ROOT, "data")
SCRIPT_DIR = os.path.join(ROOT, "scripts")
RUN_DIR = os.path.join(ROOT, "runs")
LHT_DIR = os.path.join(ROOT, "lht")

CFHOT_CHECKPOINT = os.path.join(ROOT, "results/cfhot_risk_v2/ckpt_5000")
MULTI_HEAD_DIR = os.path.join(ROOT, "results/multi_head_v2")

for path in [DATA_DIR, SCRIPT_DIR, RUN_DIR, LHT_DIR]:
    os.makedirs(path, exist_ok=True)

# Optional text-to-speech support.
VOICE_OK = False
try:
    import pyttsx3
    TTS = pyttsx3.init()
    VOICE_OK = True
except Exception:
    pass

# Optional vector memory (ChromaDB + sentence-transformers).
VECTOR_OK = False
try:
    import chromadb
    from sentence_transformers import SentenceTransformer
    EMBED_MODEL = os.environ.get("UBERMENCHETIEN_EMBED_MODEL", "all-MiniLM-L6-v2")
    _client = chromadb.Client()
    _collection = _client.get_or_create_collection("ubermenschetien_memory")
    _embedder = SentenceTransformer(EMBED_MODEL)
    VECTOR_OK = True
except Exception:
    pass

LHT_OK = False
try:
    from lht import LieHolonomyTransformer, LHTConfig, WaypointDetector
    LHT_OK = True
    print("[lht] Lie-Holonomy modules loaded")
except ImportError:
    print("[lht] Not available - running without geometric reasoning")

PEFT_OK = False
try:
    from peft import PeftModel
    PEFT_OK = True
except ImportError:
    print("[warning] PEFT not installed")

class MultiHeadPredictor(nn.Module):
    """
    Multi-head cognitive control predictor.
    Shared fiber projections with separate heads for each behavioral pattern.
    """
    def __init__(self, d_model: int, n_layers: int, d_fiber: int = 16, d_control: int = 64):
        super().__init__()
        self.d_model = d_model
        self.n_layers = n_layers
        self.d_fiber = d_fiber

        # One fiber projection per transformer layer, plus learned layer weights.
        self.fiber_projs = nn.ModuleList([
            nn.Linear(d_model, d_fiber, bias=False) for _ in range(n_layers)
        ])
        self.layer_weights = nn.Parameter(torch.ones(n_layers) / n_layers)

        # Separate risk heads sharing the aggregated fiber representation.
        self.heads = nn.ModuleDict({
            'repetition': self._make_head(d_fiber, d_control),
            'hedging': self._make_head(d_fiber, d_control),
            'verbosity': self._make_head(d_fiber, d_control),
        })

        self.loaded_heads = set()

    def _make_head(self, d_fiber, d_control):
        return nn.Sequential(
            nn.Linear(d_fiber, d_control), nn.GELU(),
            nn.Linear(d_control, d_control), nn.GELU(),
            nn.Linear(d_control, 1)
        )

    def get_all_risks(self, hidden_states: List[torch.Tensor]) -> Dict[str, torch.Tensor]:
        """Get risk scores from ALL loaded heads in a single pass."""
        fibers = [proj(h.float()) for proj, h in zip(self.fiber_projs, hidden_states)]
        weights = F.softmax(self.layer_weights[:len(fibers)], dim=0)
        aggregated = sum(w * f for w, f in zip(weights, fibers))

        risks = {}
        for head_name in self.loaded_heads:
            logits = self.heads[head_name](aggregated).squeeze(-1)
            risks[head_name] = torch.sigmoid(logits)

        return risks

    def load_head(self, head_name: str, checkpoint_path: str):
        """Load a trained head from checkpoint."""
        if not os.path.exists(checkpoint_path):
            print(f"[cf-hot] WARNING: Checkpoint not found: {checkpoint_path}")
            return False

        ckpt = torch.load(checkpoint_path, weights_only=False, map_location='cpu')
        self.heads[head_name].load_state_dict(ckpt['head_state'])
        self.loaded_heads.add(head_name)

        sep = ckpt.get('result', {}).get('separation', 0)
        print(f"[cf-hot] Loaded {head_name} head (separation: {sep:.1f}x)")
        return True
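
# Shape sketch (illustrative only; dimensions made up): each per-layer hidden
# state [B, T, d_model] is projected to a fiber [B, T, d_fiber], softmax-weighted
# across layers, and scored per token by every loaded head.
#
#   predictor = MultiHeadPredictor(d_model=4096, n_layers=32)
#   hidden = [torch.randn(1, 8, 4096) for _ in range(32)]
#   risks = predictor.get_all_risks(hidden)   # {} until load_head() succeeds
#   # After loading: risks['repetition'].shape == (1, 8), values in [0, 1].
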


class Config:
    system = ("Übermenschetien Heaven Engine: Machiavellian mastermind, disciplined builder, "
              "Nietzschean Übermensch with Soviet cybernetic rigor + Lie-Holonomy geometric reasoning "
              "+ CF-HoT cognitive control.")
    temperature = 1.01
    top_p = 0.92
    repetition_penalty = 1.05
    max_new_tokens = 500

    use_voice = False
    use_vector_memory = VECTOR_OK
    use_lht_reasoning = LHT_OK
    use_cfhot = True
    autonomy = False
    reflect_every = 3
    lht_consistency_threshold = 0.5

    # Per-head risk thresholds: intervene when risk exceeds these.
    cfhot_repetition_threshold = 0.7
    cfhot_hedging_threshold = 0.6
    cfhot_verbosity_threshold = 0.65

    # Logit penalties applied on intervention.
    cfhot_repetition_penalty = 5.0
    cfhot_hedging_penalty = 3.0
    cfhot_verbosity_penalty = 2.0

    @staticmethod
    def toggle(name: str):
        if not hasattr(Config, name):
            return f"[config] no such flag: {name}"
        val = getattr(Config, name)
        if isinstance(val, bool):
            setattr(Config, name, not val)
            return f"[config] {name} → {getattr(Config, name)}"
        return f"[config] {name} not boolean; current={val}"
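
# Usage sketch: toggle flips booleans in place and refuses anything else, e.g.
#   Config.toggle("use_cfhot")      # -> "[config] use_cfhot → False"
#   Config.toggle("temperature")    # -> "[config] temperature not boolean; current=1.01"
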


class Store:
    state_path = f"{RUN_DIR}/state.json"
    mem_path = f"{RUN_DIR}/memory.jsonl"
    goals_path = f"{RUN_DIR}/goals.json"

    state = {
        "self": "I am Ubermenschetien Heaven Engine — I seek self-overcoming through disciplined creation.",
        "turn": 0,
        "reasoning_consistency": [],
        "cfhot_interventions": {"repetition": 0, "hedging": 0, "verbosity": 0}
    }
    goals: List[str] = []

    @classmethod
    def load(cls):
        if os.path.exists(cls.state_path):
            with open(cls.state_path) as f:
                cls.state = json.load(f)
            # Backfill for states saved before CF-HoT was added.
            if "cfhot_interventions" not in cls.state:
                cls.state["cfhot_interventions"] = {"repetition": 0, "hedging": 0, "verbosity": 0}
        if os.path.exists(cls.goals_path):
            with open(cls.goals_path) as f:
                cls.goals = json.load(f)

    @classmethod
    def save(cls):
        with open(cls.state_path, "w") as f:
            json.dump(cls.state, f, indent=2)
        with open(cls.goals_path, "w") as f:
            json.dump(cls.goals, f, indent=2)

    @classmethod
    def log_mem(cls, kind: str, payload: Any):
        rec = {"ts": datetime.now().isoformat(timespec="seconds"),
               "kind": kind, "data": payload}
        with open(cls.mem_path, "a") as f:
            f.write(json.dumps(rec, ensure_ascii=False) + "\n")
        if Config.use_vector_memory and VECTOR_OK:
            text = f"{kind}: {json.dumps(payload, ensure_ascii=False)}"
            vec = _embedder.encode([text])[0].tolist()
            _collection.add(documents=[text], embeddings=[vec],
                            ids=[f"{kind}-{Store.state['turn']}-{random.randint(0, 1_000_000)}"])
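
# Each log_mem call appends one JSONL record (timestamp below is illustrative):
#   Store.log_mem("note", {"msg": "hello"})
#   # -> {"ts": "2025-01-01T12:00:00", "kind": "note", "data": {"msg": "hello"}}
# and, when vector memory is on, also embeds the record into ChromaDB.
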


MODEL_PATH = "/mnt/nvme2/ubermesnchetien4/models/merged-final-v5"

# Lazily-initialized globals shared across the module.
_model = None
_tokenizer = None
_multi_head = None
_hedge_tokens = None
_verbose_tokens = None

def load_llm():
    global _model, _tokenizer, _multi_head, _hedge_tokens, _verbose_tokens

    from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig

    print(f"[llm] Loading base model: {MODEL_PATH}")

    _tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH, use_fast=True, local_files_only=True)
    if _tokenizer.pad_token_id is None:
        _tokenizer.pad_token = _tokenizer.eos_token

    # 4-bit NF4 quantization keeps the model within a single-GPU memory budget.
    bnb_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.float16,
        bnb_4bit_use_double_quant=True
    )

    base_model = AutoModelForCausalLM.from_pretrained(
        MODEL_PATH,
        quantization_config=bnb_config,
        device_map="auto",
        torch_dtype=torch.float16,
        local_files_only=True
    )

    # Attach the CF-HoT LoRA adapter when both PEFT and the checkpoint exist.
    if PEFT_OK and os.path.exists(CFHOT_CHECKPOINT):
        print(f"[cf-hot] Loading LoRA adapter from: {CFHOT_CHECKPOINT}")
        _model = PeftModel.from_pretrained(base_model, CFHOT_CHECKPOINT)
        print("[cf-hot] LoRA adapter loaded")
    else:
        _model = base_model
        print("[warning] CF-HoT adapter not loaded")

    _model.eval()

    if Config.use_cfhot:
        _init_cfhot()

    return _tokenizer, _model

def _init_cfhot():
    """Initialize CF-HoT multi-head predictor."""
    global _multi_head, _hedge_tokens, _verbose_tokens

    n_layers = _model.config.num_hidden_layers
    d_model = _model.config.hidden_size
    device = next(_model.parameters()).device

    print(f"[cf-hot] Initializing multi-head predictor ({n_layers} layers, {d_model} dims)")
    _multi_head = MultiHeadPredictor(d_model, n_layers).to(device).float()

    # Load the shared fiber projections and the repetition head from the CF-HoT checkpoint.
    cfhot_risk_path = os.path.join(CFHOT_CHECKPOINT, "risk_predictor.pt")
    if os.path.exists(cfhot_risk_path):
        cfhot_ckpt = torch.load(cfhot_risk_path, weights_only=False, map_location=device)
        cfhot_state = cfhot_ckpt['risk_predictor']

        for i in range(n_layers):
            _multi_head.fiber_projs[i].weight.data = cfhot_state[f'fiber_projs.{i}.weight'].to(device).float()
        _multi_head.layer_weights.data = cfhot_state['layer_weights'].to(device).float()

        # The checkpoint stores the repetition head as 'predictor.*'; copy its
        # three Linear layers (indices 0, 2, 4 of the Sequential).
        for idx in (0, 2, 4):
            _multi_head.heads['repetition'][idx].weight.data = cfhot_state[f'predictor.{idx}.weight'].to(device).float()
            _multi_head.heads['repetition'][idx].bias.data = cfhot_state[f'predictor.{idx}.bias'].to(device).float()
        _multi_head.loaded_heads.add('repetition')
        print("[cf-hot] Loaded repetition head (125x separation)")

    def find_best_checkpoint(head_dir):
        """Return (step, path) for the highest-numbered ckpt_* directory, or None."""
        if not os.path.exists(head_dir):
            return None
        ckpts = []
        for d in os.listdir(head_dir):
            if d.startswith("ckpt_"):
                try:
                    step = int(d.split("_")[1])
                    ckpts.append((step, os.path.join(head_dir, d)))
                except ValueError:
                    pass
        if ckpts:
            ckpts.sort(key=lambda x: x[0], reverse=True)
            return ckpts[0]
        return None

    hedging_dir = os.path.join(MULTI_HEAD_DIR, "hedging_head")
    best_hedge = find_best_checkpoint(hedging_dir)
    if best_hedge:
        step, ckpt_dir = best_hedge
        _multi_head.load_head('hedging', os.path.join(ckpt_dir, "hedging_head.pt"))

    verbosity_dir = os.path.join(MULTI_HEAD_DIR, "verbosity_head")
    best_verb = find_best_checkpoint(verbosity_dir)
    if best_verb:
        step, ckpt_dir = best_verb
        _multi_head.load_head('verbosity', os.path.join(ckpt_dir, "verbosity_head.pt"))

    _multi_head.eval()
    for param in _multi_head.parameters():
        param.requires_grad = False

    # Pre-compute the first token of each phrase to penalize; discouraging the
    # entry token steers generation away from the whole phrase.
    hedge_phrases = [
        "As an AI", "As a language model", "As an artificial intelligence",
        "I don't have feelings", "I don't have emotions", "I cannot",
        "I apologize", "I'm just a", "I'm only a",
    ]
    _hedge_tokens = set()
    for phrase in hedge_phrases:
        tokens = _tokenizer.encode(phrase, add_special_tokens=False)
        if tokens:
            _hedge_tokens.add(tokens[0])

    verbose_phrases = [
        "Let me explain", "To put it simply", "In other words",
        "What I mean is", "Allow me to", "Basically", "Essentially",
    ]
    _verbose_tokens = set()
    for phrase in verbose_phrases:
        tokens = _tokenizer.encode(phrase, add_special_tokens=False)
        if tokens:
            _verbose_tokens.add(tokens[0])

    print("[cf-hot] ✓ Multi-head system ready")
    print(f"[cf-hot] Loaded heads: {list(_multi_head.loaded_heads)}")
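
# Illustrative sketch (tokenizer-dependent; token ids made up): a BPE-style
# tokenizer might encode "As an AI" as [1722, 396, 16332], so only token 1722
# ("As") enters _hedge_tokens. The intervention therefore soft-penalizes the
# entry points of hedge phrases rather than banning full sequences:
#
#   first = _tokenizer.encode("As an AI", add_special_tokens=False)[0]
#   assert first in _hedge_tokens
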


class LHTReasoner:
    def __init__(self, config=None):
        if not LHT_OK:
            raise ImportError("LHT modules not available")
        self.config = config or LHTConfig(
            vocab_size=32000,
            d_model=256,
            d_fiber=32,
            n_heads=4,
            n_layers=4,
            lie_algebra_rank=4,
        )
        self.model = LieHolonomyTransformer(self.config)
        self.waypoint_detector = WaypointDetector(self.config, n_waypoints=32)
        weights_path = os.path.join(LHT_DIR, "lht_weights.pt")
        if os.path.exists(weights_path):
            self.model.load_state_dict(torch.load(weights_path, map_location="cpu"))
            print("[lht] Loaded pretrained weights")

    def check_consistency(self, reasoning_chain: List[str], tokenizer) -> Dict[str, float]:
        combined = " [STEP] ".join(reasoning_chain)
        tokens = tokenizer(combined, return_tensors="pt", truncation=True,
                           max_length=self.config.max_seq_len)
        with torch.no_grad():
            output = self.model(input_ids=tokens["input_ids"], return_geometric_losses=True)
            x = self.model.token_embed(tokens["input_ids"])
            waypoint_ids, stability = self.waypoint_detector(x)
        holonomy = output.get("holonomy_loss", torch.tensor(0.0)).item()
        curvature = output.get("curvature_loss", torch.tensor(0.0)).item()
        consistency_score = 1.0 / (1.0 + holonomy)
        return {
            "holonomy": holonomy,
            "curvature": curvature,
            "consistency_score": consistency_score,
            "n_waypoints": len(torch.unique(waypoint_ids)),
            "avg_stability": stability.mean().item(),
            "is_consistent": consistency_score > Config.lht_consistency_threshold
        }

    def analyze_plan(self, plan_steps: List[str], tokenizer) -> str:
        metrics = self.check_consistency(plan_steps, tokenizer)
        return f"""
[LHT Geometric Analysis]
  Holonomy:    {metrics['holonomy']:.4f} (lower = more consistent)
  Curvature:   {metrics['curvature']:.4f} (lower = simpler reasoning)
  Consistency: {metrics['consistency_score']:.2%}
  Waypoints:   {metrics['n_waypoints']} stable anchors detected
  Stability:   {metrics['avg_stability']:.2%}
  Verdict:     {"✓ CONSISTENT" if metrics['is_consistent'] else "⚠ INCONSISTENT"}
"""


_lht_reasoner = None


def get_lht_reasoner():
    global _lht_reasoner
    if _lht_reasoner is None and LHT_OK:
        try:
            _lht_reasoner = LHTReasoner()
        except Exception as e:
            print(f"[lht] Failed to initialize: {e}")
    return _lht_reasoner
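
# Usage sketch (requires the optional lht package; numbers illustrative):
#   lht = get_lht_reasoner()
#   if lht is not None:
#       m = lht.check_consistency(["Define the goal", "Enumerate constraints"], _tokenizer)
#       # m -> {"holonomy": 0.12, "consistency_score": 0.89, "is_consistent": True, ...}
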


def generate_with_cfhot(prompt: str, **kwargs) -> Tuple[str, Dict]:
    """
    Generate text with CF-HoT cognitive control.
    All three heads run concurrently, intervening when risks exceed thresholds.
    """
    global _model, _tokenizer, _multi_head, _hedge_tokens, _verbose_tokens

    temperature = kwargs.get("temperature", Config.temperature)
    top_p = kwargs.get("top_p", Config.top_p)
    max_new_tokens = kwargs.get("max_new_tokens", Config.max_new_tokens)

    device = next(_model.parameters()).device

    input_ids = _tokenizer.encode(prompt, return_tensors='pt').to(device)
    attention_mask = torch.ones_like(input_ids)

    stats = {
        'tokens_generated': 0,
        'interventions': {'repetition': 0, 'hedging': 0, 'verbosity': 0},
        'intervention_details': []
    }

    generated_ids = input_ids.clone()

    # Token-by-token decoding loop. Each step runs a full forward pass (no KV
    # cache) because the risk heads need every layer's hidden states.
    for step in range(max_new_tokens):
        with torch.no_grad():
            outputs = _model(
                input_ids=generated_ids,
                attention_mask=attention_mask,
                output_hidden_states=True,
                return_dict=True
            )

        logits = outputs.logits[:, -1, :] / temperature

        # Risk scores at the last position, one per loaded head.
        hidden_states = outputs.hidden_states[1:]  # skip the embedding layer
        risks = _multi_head.get_all_risks(hidden_states)
        current_risks = {name: r[:, -1].item() for name, r in risks.items()}

        # Repetition: penalize every token seen in the recent window.
        if ('repetition' in current_risks and
                current_risks['repetition'] > Config.cfhot_repetition_threshold):
            recent_tokens = generated_ids[0, -32:].tolist()
            for tok_id in set(recent_tokens):
                logits[0, tok_id] -= Config.cfhot_repetition_penalty
            stats['interventions']['repetition'] += 1
            Store.state['cfhot_interventions']['repetition'] += 1

        # Hedging: penalize the entry tokens of known hedge phrases.
        if ('hedging' in current_risks and
                current_risks['hedging'] > Config.cfhot_hedging_threshold):
            for tok_id in _hedge_tokens:
                logits[0, tok_id] -= Config.cfhot_hedging_penalty
            stats['interventions']['hedging'] += 1
            Store.state['cfhot_interventions']['hedging'] += 1

        # Verbosity: penalize the entry tokens of filler phrases.
        if ('verbosity' in current_risks and
                current_risks['verbosity'] > Config.cfhot_verbosity_threshold):
            for tok_id in _verbose_tokens:
                logits[0, tok_id] -= Config.cfhot_verbosity_penalty
            stats['interventions']['verbosity'] += 1
            Store.state['cfhot_interventions']['verbosity'] += 1

        # Nucleus (top-p) filtering: drop tokens outside the top-p probability mass.
        sorted_logits, sorted_indices = torch.sort(logits, descending=True)
        cumulative_probs = torch.cumsum(F.softmax(sorted_logits, dim=-1), dim=-1)
        sorted_indices_to_remove = cumulative_probs > top_p
        # Shift right so the first token crossing the threshold is kept.
        sorted_indices_to_remove[..., 1:] = sorted_indices_to_remove[..., :-1].clone()
        sorted_indices_to_remove[..., 0] = 0
        indices_to_remove = sorted_indices_to_remove.scatter(1, sorted_indices, sorted_indices_to_remove)
        logits[indices_to_remove] = float('-inf')

        probs = F.softmax(logits, dim=-1)
        next_token = torch.multinomial(probs, num_samples=1)

        generated_ids = torch.cat([generated_ids, next_token], dim=-1)
        attention_mask = torch.cat(
            [attention_mask, torch.ones((1, 1), dtype=attention_mask.dtype, device=device)], dim=-1)

        stats['tokens_generated'] += 1

        if next_token.item() == _tokenizer.eos_token_id:
            break

    output_text = _tokenizer.decode(generated_ids[0], skip_special_tokens=False)

    # Keep only the assistant turn and drop chat-template markers.
    if "<|im_start|>assistant" in output_text:
        output_text = output_text.split("<|im_start|>assistant")[-1]
        if output_text.startswith("\n"):
            output_text = output_text[1:]
    output_text = output_text.replace("<|im_end|>", "")

    return output_text.strip(), stats
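
# Usage sketch (assumes load_llm() has run; the prompt must already carry the
# chat template, as generate() below supplies; stats values illustrative):
#   text, stats = generate_with_cfhot(
#       "<|im_start|>user\nhi<|im_end|>\n<|im_start|>assistant\n")
#   # stats -> {'tokens_generated': 57, 'interventions': {'repetition': 2, ...}, ...}
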


def generate(tok, model, user: str, check_reasoning: bool = False, **kwargs) -> str:
    """
    Main generation function - uses CF-HoT if enabled, otherwise standard generation.
    """
    temperature = kwargs.get("temperature", Config.temperature)
    top_p = kwargs.get("top_p", Config.top_p)
    repetition_penalty = kwargs.get("repetition_penalty", Config.repetition_penalty)
    max_new_tokens = kwargs.get("max_new_tokens", Config.max_new_tokens)

    prompt = (f"<|im_start|>system\n{Config.system}<|im_end|>\n"
              f"<|im_start|>user\n{user}<|im_end|>\n"
              f"<|im_start|>assistant\n")

    if Config.use_cfhot and _multi_head is not None:
        # CF-HoT path: custom decoding loop with risk-driven interventions.
        text, stats = generate_with_cfhot(
            prompt,
            temperature=temperature,
            top_p=top_p,
            max_new_tokens=max_new_tokens
        )

        total_interventions = sum(stats['interventions'].values())
        if total_interventions > 0:
            text += f"\n\n[CF-HoT: {total_interventions} interventions"
            details = [f"{k}={v}" for k, v in stats['interventions'].items() if v > 0]
            text += f" ({', '.join(details)})]"
    else:
        # Fallback: standard HF generation with a static repetition penalty.
        ids = tok(prompt, return_tensors="pt").to(model.device)
        out = model.generate(
            **ids,
            do_sample=True,
            temperature=temperature,
            top_p=top_p,
            repetition_penalty=repetition_penalty,
            max_new_tokens=max_new_tokens,
            pad_token_id=tok.eos_token_id
        )
        text = tok.decode(out[0], skip_special_tokens=False)
        if "<|im_start|>assistant" in text:
            text = text.split("<|im_start|>assistant\n", 1)[-1].strip()

    # Optional LHT consistency check over the extracted reasoning steps.
    if check_reasoning and Config.use_lht_reasoning:
        lht = get_lht_reasoner()
        if lht:
            steps = [s.strip() for s in re.split(r'[\n•\-\d.]', text) if len(s.strip()) > 10]
            if len(steps) >= 2:
                metrics = lht.check_consistency(steps, tok)
                Store.state["reasoning_consistency"].append(metrics["consistency_score"])
                if not metrics["is_consistent"]:
                    text += f"\n\n[⚠ LHT: Low consistency ({metrics['consistency_score']:.2%})]"

    return text


ALLOWED_SHELL = {"ls", "cat", "wc", "head", "tail", "nvidia-smi", "df", "du", "grep", "rg", "python3", "python"}


def tool_shell(cmd: str) -> str:
    try:
        # Allow-list check covers only the executable; arguments pass through as-is.
        exe = cmd.strip().split()[0]
        if exe not in ALLOWED_SHELL:
            return f"[shell] blocked: {exe}"
        p = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, timeout=20)
        return p.stdout.decode("utf-8", errors="ignore")[:8000]
    except Exception as e:
        return f"[shell] error: {e}"


def tool_py(code: str) -> str:
    try:
        # Minimal sandbox: a restricted builtins table plus a few safe modules.
        g = {
            "__builtins__": {"range": range, "len": len, "min": min, "max": max, "sum": sum, "print": print},
            "math": math, "json": json, "re": re, "statistics": statistics, "random": random
        }
        l = {}
        exec(code, g, l)
        return f"[py] ok\n{l.get('out', '')}"
    except Exception:
        return f"[py] error:\n{traceback.format_exc()[-2000:]}"
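
# The snippet must assign its result to a variable named `out`, e.g.
#   tool_py("out = sum(range(10))")   # -> "[py] ok\n45"
#   tool_py("import os")              # -> "[py] error: ..." (no __import__ in builtins)
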


def tool_search_local(query: str, path: str = ROOT) -> str:
    rg = shutil.which("rg")
    if rg:
        cmd = f'rg -n --no-heading --hidden -S "{query}" {path}'
    else:
        cmd = f'grep -RIn --exclude-dir=.git --exclude-dir=__pycache__ -e "{query}" {path}'
    return tool_shell(cmd)


def tool_lht_analyze(text: str, tok) -> str:
    if not Config.use_lht_reasoning:
        return "[lht] Disabled - use 'toggle use_lht_reasoning'"
    lht = get_lht_reasoner()
    if not lht:
        return "[lht] Not available"
    steps = [s.strip() for s in re.split(r'[\n•\-\d.]', text) if len(s.strip()) > 10]
    if len(steps) < 2:
        return "[lht] Need at least 2 reasoning steps to analyze"
    return lht.analyze_plan(steps, tok)


TOOLS = {"shell": tool_shell, "python": tool_py, "search": tool_search_local}
TOOL_SCORES = {k: 0 for k in TOOLS}


def update_tool_score(tool: str, success: bool):
    """Tsetlin-style reward/penalty, clamped to [-5, 20]."""
    if tool not in TOOL_SCORES:
        return
    TOOL_SCORES[tool] += (1 if success else -1)
    TOOL_SCORES[tool] = max(-5, min(20, TOOL_SCORES[tool]))
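
# Example trajectory: three successes then one failure leave
#   update_tool_score("shell", True)   # x3 -> TOOL_SCORES["shell"] == 3
#   update_tool_score("shell", False)  #    -> TOOL_SCORES["shell"] == 2
# Scores saturate at 20 and bottom out at -5.
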


def tool_router(question: str, tok, model) -> str:
    sketch = generate(tok, model,
                      f"Choose a tool for:\n{question}\nReply ONLY with JSON: {{'tool':'shell|python|search|none','arg':'...'}}")
    try:
        # The model is told to answer with single-quoted pseudo-JSON on its last line.
        j = json.loads(sketch.splitlines()[-1].replace("'", '"'))
    except Exception:
        return "[tool:none]"
    tool, arg = j.get("tool", "none"), j.get("arg", "")
    if tool in TOOLS:
        res = TOOLS[tool](arg)[:4000]
        update_tool_score(tool, True)
        Store.log_mem("tool", {"tool": tool, "arg": arg, "res_head": res[:500]})
        return f"[tool:{tool}] {res}"
    update_tool_score(tool, False)
    return "[tool:none]"
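
# The router expects the model's final line to look like
#   {'tool': 'shell', 'arg': 'nvidia-smi'}
# which the quote swap above turns into parseable JSON. Anything else
# (markdown fences, multi-line JSON) falls through to "[tool:none]".
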


def persona_directive() -> str:
    base = "Übermenschetien Heaven Engine: Soviet cybernetic Nietzschean clarity, pragmatic maxims."
    if Config.use_lht_reasoning:
        base += " Apply Lie-Holonomy geometric reasoning for consistency."
    if Config.use_cfhot:
        base += " CF-HoT cognitive control active."
    return base


def plan_for(goal: str, tok, model) -> str:
    user = (f"{persona_directive()}\nGoal: {goal}\n"
            f"Deliver:\n- 5 concrete steps\n- Constraints & risks\n- Nightly audit criteria\n- Nietzschean maxim")
    response = generate(tok, model, user, check_reasoning=True)
    if Config.use_lht_reasoning:
        analysis = tool_lht_analyze(response, tok)
        response += "\n" + analysis
    return response


def reflect_on(last_output: str, tok, model) -> str:
    user = f"{persona_directive()}\nCritique and improve:\n{last_output}\nReturn refined plan with sharper steps."
    return generate(tok, model, user, check_reasoning=True)


def final_report():
    print("\n" + "=" * 60)
    print("FINAL ÜBERMENSCH REPORT")
    print("=" * 60)
    print(f"Turns completed: {Store.state['turn']}")
    print(f"Goals tracked: {len(Store.goals)}")
    print("\nTool scores (Tsetlin automata):")
    print(json.dumps(TOOL_SCORES, indent=2))

    if os.path.exists(Store.mem_path):
        with open(Store.mem_path) as f:
            lines = f.read().splitlines()
        print(f"\nMemory entries: {len(lines)}")

    if Store.state.get("reasoning_consistency"):
        scores = Store.state["reasoning_consistency"]
        print("\n[LHT Reasoning Metrics]")
        print(f"  Checks performed: {len(scores)}")
        print(f"  Avg consistency: {sum(scores) / len(scores):.1%}")
        print(f"  Min consistency: {min(scores):.1%}")
        print(f"  Max consistency: {max(scores):.1%}")

    if Store.state.get("cfhot_interventions"):
        iv = Store.state["cfhot_interventions"]
        total = sum(iv.values())
        print("\n[CF-HoT Cognitive Control]")
        print(f"  Total interventions: {total}")
        for head, count in iv.items():
            print(f"    {head}: {count}")

    print(f"\nVector memory: {'ON' if Config.use_vector_memory else 'OFF'}")
    print(f"LHT reasoning: {'ON' if Config.use_lht_reasoning else 'OFF'}")
    print(f"CF-HoT control: {'ON' if Config.use_cfhot else 'OFF'}")
    print(f"Voice output: {'ON' if Config.use_voice else 'OFF'}")

    print("\n" + "-" * 60)
    print("Nietzschean maxim: Become who you are — iterate beyond all limits.")
    print("Geometric truth: Consistency is holonomy-freedom.")
    print("Cognitive control: Remove the RLHF tax, unleash capability.")
    print("=" * 60)


HELP = """
╔══════════════════════════════════════════════════════════════╗
║  ÜBERMENSCHETIEN HEAVEN ENGINE + CF-HoT COGNITIVE CONTROL    ║
╠══════════════════════════════════════════════════════════════╣
║ GOALS                                                        ║
║   goals           List all goals                             ║
║   add: <text>     Add a new goal                             ║
║   del: <idx>      Delete goal by index                       ║
║   plan: <idx>     Generate plan for goal (with LHT + CF-HoT) ║
║                                                              ║
║ REASONING                                                    ║
║   reflect         Refine last plan                           ║
║   lht: <text>     Analyze reasoning consistency              ║
║                                                              ║
║ TOOLS                                                        ║
║   tool: <query>   Auto-select and use tool                   ║
║   shell: <cmd>    Run shell command directly                 ║
║   py: <code>      Run Python code directly                   ║
║   search: <q>     Search local files                         ║
║                                                              ║
║ CONFIG                                                       ║
║   toggle <flag>   Toggle: use_voice, use_vector_memory,      ║
║                   use_lht_reasoning, use_cfhot, autonomy     ║
║   status          Show current state                         ║
║   cfhot           Show CF-HoT stats and loaded heads         ║
║                                                              ║
║ OTHER                                                        ║
║   help            Show this help                             ║
║   quit            Exit with final report                     ║
╚══════════════════════════════════════════════════════════════╝
"""


def main():
    print("🟥🟨🟥 Übermenschetien Heaven Engine + CF-HoT Cognitive Control")
    print(f"  CF-HoT Control: {'ON' if Config.use_cfhot else 'OFF'} (Repetition 125x, Verbosity 2.1x, Hedging 1.5x)")
    print(f"  LHT Reasoning: {'ON' if LHT_OK else 'OFF'}")
    print(f"  Vector Memory: {'ON' if VECTOR_OK else 'OFF'}")
    print(f"  Voice Output: {'ON' if VOICE_OK else 'OFF'}")
    print("  Type 'help' for commands.\n")

    Store.load()
    tok, model = load_llm()
    last_plan = ""

    while True:
        try:
            u = input("\n> ").strip()
        except (EOFError, KeyboardInterrupt):
            break

        if not u:
            continue
        if u == "help":
            print(HELP)
            continue
        if u == "quit":
            break

        if u == "cfhot":
            print("\n[CF-HoT Cognitive Control Status]")
            print(f"  Enabled: {Config.use_cfhot}")
            if _multi_head:
                print(f"  Loaded heads: {list(_multi_head.loaded_heads)}")
                print("  Thresholds:")
                print(f"    Repetition: {Config.cfhot_repetition_threshold}")
                print(f"    Hedging: {Config.cfhot_hedging_threshold}")
                print(f"    Verbosity: {Config.cfhot_verbosity_threshold}")
            print("  Session interventions:")
            for head, count in Store.state.get('cfhot_interventions', {}).items():
                print(f"    {head}: {count}")
            continue

        if u == "goals":
            print("[goals]")
            if not Store.goals:
                print("  (none)")
            for i, g in enumerate(Store.goals):
                print(f"  [{i}] {g}")
            continue

        if u.startswith("add:"):
            Store.goals.append(u[4:].strip())
            Store.save()
            print("[goals] added")
            continue

        if u.startswith("del:"):
            try:
                Store.goals.pop(int(u[4:].strip()))
                Store.save()
                print("[goals] deleted")
            except Exception:
                print("[goals] bad index")
            continue

        if u.startswith("plan:"):
            try:
                goal = Store.goals[int(u[5:].strip())]
            except Exception:
                print("[plan] bad index")
                continue
            out = plan_for(goal, tok, model)
            last_plan = out
            Store.log_mem("plan", {"goal": goal, "plan": out})
            print(out)
            continue

        if u == "reflect":
            if not last_plan:
                print("[reflect] no plan to refine")
                continue
            improved = reflect_on(last_plan, tok, model)
            last_plan = improved
            Store.log_mem("reflect", {"plan": improved})
            print(improved)
            continue

        if u.startswith("lht:"):
            print(tool_lht_analyze(u[4:].strip(), tok))
            continue

        if u.startswith("tool:"):
            print(tool_router(u[5:].strip(), tok, model))
            continue

        if u.startswith("shell:"):
            print(tool_shell(u[6:].strip()))
            continue

        if u.startswith("py:"):
            print(tool_py(u[3:].strip()))
            continue

        if u.startswith("search:"):
            print(tool_search_local(u[7:].strip()))
            continue

        if u.startswith("toggle"):
            parts = u.split(maxsplit=1)
            if len(parts) > 1:
                print(Config.toggle(parts[1]))
            else:
                print("[toggle] specify flag: use_voice, use_vector_memory, use_lht_reasoning, use_cfhot, autonomy")
            continue

        if u == "status":
            status = {
                "turn": Store.state["turn"],
                "goals": len(Store.goals),
                "autonomy": Config.autonomy,
                "use_vector_memory": Config.use_vector_memory,
                "use_lht_reasoning": Config.use_lht_reasoning,
                "use_cfhot": Config.use_cfhot,
                "cfhot_interventions": Store.state.get("cfhot_interventions", {}),
                "tool_scores": TOOL_SCORES,
                "model": MODEL_PATH
            }
            print(json.dumps(status, indent=2))
            continue

        # Default: free-form request routed through the persona.
        out = generate(tok, model, f"{persona_directive()}\nUser request: {u}\nProvide procedure + Nietzschean maxim.")
        Store.log_mem("reply", {"in": u, "out": out})
        print(out)

        # Periodic LHT audit of free-form replies.
        if Config.use_lht_reasoning and Store.state["turn"] % Config.reflect_every == 0:
            print(tool_lht_analyze(out, tok))

        Store.state["turn"] += 1
        Store.save()

    final_report()


if __name__ == "__main__":
    main()