# bone_lexicon.py — vsl-cryosomatic-hypervisor
import json
import random
import re
import string
import time
import unicodedata
import os
from typing import Tuple, Dict, Set, Optional, List
from bone_core import Prisma, LoreManifest
from functools import lru_cache
class LexiconStore:
    """Vocabulary store: curated category word lists plus words learned at
    runtime (the "hive"), backed by a reverse index for fast word -> category
    lookups. Learned words are persisted to HIVE_FILENAME as JSON.
    """

    HIVE_FILENAME = "cortex_hive.json"
    # Strip every punctuation char EXCEPT '_' so snake_case tokens survive.
    _PUNCTUATION = string.punctuation.replace("_", "")
    _TRANSLATOR = str.maketrans(_PUNCTUATION, " " * len(_PUNCTUATION))

    def __init__(self):
        # Closed set of category names accepted from the lore manifest.
        self.categories = {
            "heavy",
            "kinetic",
            "explosive",
            "constructive",
            "abstract",
            "photo",
            "aerobic",
            "thermal",
            "cryo",
            "suburban",
            "play",
            "sacred",
            "buffer",
            "antigen",
            "diversion",
            "meat",
            "gradient_stop",
            "liminal",
            "void",
            "bureau_buzzwords",
            "crisis_term",
            "harvest",
            "pareidolia",
            "passive_watch",
            "repair_trigger",
            "refusal_guru",
            "cursed",
            "sentiment_pos",
            "sentiment_neg",
            "sentiment_negators",
        }
        # Curated words per category; filled by load_vocabulary().
        self.VOCAB: Dict[str, Set[str]] = {k: set() for k in self.categories}
        # Runtime-learned words: category -> {word: tick-when-learned}.
        self.LEARNED_VOCAB: Dict[str, Dict[str, int]] = {}
        # Words the user flagged; filtered out of "suburban" in get_raw().
        self.USER_FLAGGED_BIAS: Set[str] = set()
        # Pattern -> replacement pairs consumed by LexiconService antigen regex.
        self.ANTIGEN_REPLACEMENTS: Dict[str, str] = {}
        self.SOLVENTS: Set[str] = set()
        # Lowercased word -> set of categories it belongs to.
        self.REVERSE_INDEX: Dict[str, Set[str]] = {}
        self.hive_loaded = False

    def load_vocabulary(self):
        """Pull the LEXICON section from the lore manifest, index it, then
        restore any previously learned hive words from disk."""
        data = LoreManifest.get_instance().get("LEXICON") or {}
        self.SOLVENTS = set(data.get("solvents", []))
        self.ANTIGEN_REPLACEMENTS = data.get("antigen_replacements", {})
        for cat, words in data.items():
            if cat not in self.categories:
                continue
            word_set = set(words)
            self.VOCAB[cat] = word_set
            # Sentiment lists are consumed as whole sets (get_raw), never via
            # the per-word reverse index, so they are not indexed here.
            if not cat.startswith("sentiment"):
                for w in word_set:
                    self._index_word(w, cat)
        self._load_hive()

    def _index_word(self, word: str, category: str):
        """Register *word* (lowercased) under *category* in the reverse index."""
        self.REVERSE_INDEX.setdefault(word.lower(), set()).add(category)

    def _load_hive(self):
        """Restore learned vocabulary from disk, if a hive file exists.

        Corrupt or unreadable files are reported and skipped (fresh start).
        """
        if not os.path.exists(self.HIVE_FILENAME):
            return
        try:
            with open(self.HIVE_FILENAME, "r", encoding="utf-8") as f:
                hive_data = json.load(f)
            count = 0
            for cat, entries in hive_data.items():
                bucket = self.LEARNED_VOCAB.setdefault(cat, {})
                for word, tick in entries.items():
                    bucket[word] = tick
                    self._index_word(word, cat)
                    count += 1
            self.hive_loaded = True
            print(
                f"{Prisma.CYN}[HIVE]: The Library is open. {count} memories restored.{Prisma.RST}"
            )
        except (IOError, json.JSONDecodeError) as e:
            print(
                f"{Prisma.RED}[HIVE]: Memory corruption detected. Starting fresh. ({e}){Prisma.RST}"
            )

    def save_hive(self):
        """Best-effort persist of the learned vocabulary to HIVE_FILENAME.

        Failures are reported (matching _load_hive's style) but never raised.
        """
        try:
            with open(self.HIVE_FILENAME, "w", encoding="utf-8") as f:
                json.dump(self.LEARNED_VOCAB, f, indent=2)
        except IOError as e:
            # Previously swallowed silently; surface it like _load_hive does.
            print(f"{Prisma.RED}[HIVE]: Could not save hive. ({e}){Prisma.RST}")

    def get_raw(self, category: str) -> Set[str]:
        """Union of curated and learned words for *category*.

        The "suburban" category additionally excludes user-flagged words.
        """
        base = self.VOCAB.get(category, set())
        learned = set(self.LEARNED_VOCAB.get(category, {}))
        combined = base | learned
        if category == "suburban":
            return combined - self.USER_FLAGGED_BIAS
        return combined

    def get_categories_for_word(self, word: str) -> Set[str]:
        """Copy of the category set for *word* (case-insensitive); empty set if unknown."""
        return self.REVERSE_INDEX.get(word.lower(), set()).copy()

    def teach(self, word: str, category: str, tick) -> bool:
        """Learn *word* under *category* at time *tick*.

        Returns False (no-op) when the word is already learned in that category.
        """
        w = word.lower()
        bucket = self.LEARNED_VOCAB.setdefault(category, {})
        if w in bucket:
            return False
        bucket[w] = tick
        self._index_word(w, category)
        return True

    def harvest(self, text: str) -> Dict[str, List[str]]:
        """Bucket every known word in *text* by category.

        Punctuation (except '_') is treated as whitespace; matching is
        case-insensitive. Duplicate occurrences are kept.
        """
        results: Dict[str, List[str]] = {}
        if not text:
            return results
        clean_text = text.translate(self._TRANSLATOR).lower()
        for w in clean_text.split():
            for cat in self.get_categories_for_word(w):
                results.setdefault(cat, []).append(w)
        return results
class LinguisticAnalyzer:
    """Phonetic / morphological heuristics over the shared LexiconStore.

    Scores words by sound texture (plosive density, vowel flow), matches
    morphological roots, and projects text onto a small semantic vector space.
    """

    # Category -> vector dimension. Hoisted to a class constant: vectorize()
    # previously rebuilt this dict on every call.
    _DIMENSION_MAP = {
        "kinetic": "VEL",
        "explosive": "CHI",
        "heavy": "STR",
        "constructive": "STR",
        "antigen": "CHI",
        "toxin": "CHI",
        "thermal": "PHI",
        "photo": "PHI",
        "abstract": "PSI",
        "sacred": "PSI",
        "suburban": "BET",
        "buffer": "BET",
        "play": "DEL",
        "aerobic": "DEL",
        "harvest": "STR",
        "meat": "CHI",
        "void": "PSI",
        "liminal": "LAMBDA",
        "pareidolia": "PSI",
        "crisis_term": "CHI",
        "cursed": "CHI",
    }

    def __init__(self, store_ref):
        # Duck-typed store supplying SOLVENTS, USER_FLAGGED_BIAS, _TRANSLATOR
        # and get_categories_for_word()/get_raw() (see getattr fallbacks below).
        self.store = store_ref
        self._TRANSLATOR = getattr(self.store, "_TRANSLATOR", None)
        self.PHONETICS = {
            "PLOSIVE": set("bdgkpt"),
            # NOTE(review): set("fthszsh") yields {'f','t','h','s','z'} — the
            # "sh" digraph collapses to single chars, and 't'/'h' overlap other
            # classes. Preserved as-is; confirm intent before changing.
            "FRICATIVE": set("fthszsh"),
            "LIQUID": set("lr"),
            "NASAL": set("mn"),
            "VOWELS": set("aeiouy"),
        }
        # char -> sound class, built ONCE instead of per classify_word() call.
        # Later PHONETICS entries win on overlap (so 't' maps to FRICATIVE),
        # exactly matching the original per-call comprehension's override order.
        self._CHAR_TO_SOUND = {
            char: sound_type
            for sound_type, chars in self.PHONETICS.items()
            for char in chars
        }
        # Morphological root substrings; a hit is the strongest classification signal.
        self.ROOTS = {
            "HEAVY": (
                "lith",
                "ferr",
                "petr",
                "dens",
                "grav",
                "struct",
                "base",
                "fund",
                "mound",
            ),
            "KINETIC": ("mot", "mov", "ject", "tract", "pel", "crat", "dynam", "flux"),
            "ABSTRACT": (
                "tion",
                "ism",
                "ence",
                "ance",
                "ity",
                "ology",
                "ness",
                "ment",
                "idea",
            ),
            "SUBURBAN": ("norm", "comm", "stand", "pol", "reg", "mod"),
            "VITAL": (
                "viv",
                "vita",
                "spir",
                "anim",
                "bio",
                "luc",
                "lum",
                "phot",
                "phon",
                "surg",
                "bloom",
            ),
        }
        # Cut-offs for the phonetic classifier; scaled by self.biases at use time.
        self.thresholds = {
            "heavy_density": 0.55,
            "play_vitality": 0.6,
            "kinetic_flow": 0.6,
        }
        # Threshold multipliers adjusted by tune_sensitivity().
        self.biases = {"heavy": 1.0, "play": 1.0, "kinetic": 1.0}

    def measure_viscosity(self, word: str) -> float:
        """Score in [0, 1]: how "thick" a word feels (length + sound substance).

        Solvent words (store.SOLVENTS) get a fixed low viscosity of 0.1.
        """
        if not word:
            return 0.0
        w = word.lower()
        if w in self.store.SOLVENTS:
            return 0.1
        length_score = min(1.0, len(w) / 12.0)
        stops, flow = 0, 0
        for c in w:
            if c in self.PHONETICS["PLOSIVE"]:
                stops += 1
            elif c in self.PHONETICS["LIQUID"] or c in self.PHONETICS["VOWELS"]:
                flow += 1
        stop_score = min(1.0, stops / 3.0)
        flow_score = min(1.0, flow / 4.0)
        # "Substance" is whichever texture dominates, stops or flow.
        substance_score = max(stop_score, flow_score)
        return (length_score * 0.5) + (substance_score * 0.5)

    @staticmethod
    def get_turbulence(words: List[str]) -> float:
        """Variance of word lengths, squashed to [0, 1] and rounded to 2 dp."""
        if len(words) < 2:
            return 0.0
        lengths = [len(w) for w in words]
        avg_len = sum(lengths) / len(lengths)
        variance = sum((l - avg_len) ** 2 for l in lengths) / len(lengths)
        turbulence = min(1.0, variance / 10.0)
        return round(turbulence, 2)

    def vectorize(self, text: str) -> Dict[str, float]:
        """Project *text* onto the 9-dimension semantic space.

        Each known-word/category hit adds mass to its dimension; the result is
        normalized by total mass. ENT mirrors CHI by design. Returns {} for
        text that sanitizes to nothing.
        """
        words = self.sanitize(text)
        if not words:
            return {}
        dims = {
            "VEL": 0.0,
            "STR": 0.0,
            "CHI": 0.0,
            "PHI": 0.0,
            "PSI": 0.0,
            "BET": 0.0,
            "DEL": 0.0,
            "LAMBDA": 0.0,
            "ENT": 0.0,
        }
        for w in words:
            for cat in self.store.get_categories_for_word(w):
                target_dim = self._DIMENSION_MAP.get(cat)
                if target_dim:
                    dims[target_dim] += 1.0
        total = max(1.0, sum(dims.values()))
        result = {k: round(v / total, 3) for k, v in dims.items()}
        # Entropy tracks chaos (CHI) by design.
        result["ENT"] = result["CHI"]
        return result

    @staticmethod
    def calculate_flux(vec_a: Dict[str, float], vec_b: Dict[str, float]) -> float:
        """Euclidean distance between two sparse vectors (missing keys = 0)."""
        if not vec_a or not vec_b:
            return 0.0
        keys = set(vec_a.keys()) | set(vec_b.keys())
        diff_sq = sum((vec_a.get(k, 0.0) - vec_b.get(k, 0.0)) ** 2 for k in keys)
        return round(diff_sq**0.5, 3)

    def contextualize(self, word: str, field_vector: Dict[str, float]) -> Optional[str]:
        """Classify *word*, letting a strongly dominant field reinterpret it.

        Currently the only override: a heavy word in a >0.8 PSI field reads as
        abstract. May return None when classify_word finds nothing.
        """
        base_cat, _score = self.classify_word(word)
        if not field_vector or not base_cat:
            return base_cat
        dominant_field = max(field_vector, key=field_vector.get)
        if field_vector.get(dominant_field, 0.0) > 0.8:
            if dominant_field == "PSI" and base_cat == "heavy":
                return "abstract"
        return base_cat

    def sanitize(self, text: str) -> List[str]:
        """Lowercase, fold accents to ASCII, strip punctuation, and drop
        user-flagged words. Returns the surviving tokens."""
        if not text:
            return []
        try:
            # NFKD + ASCII-ignore drops any char with no ASCII decomposition.
            normalized = (
                unicodedata.normalize("NFKD", text)
                .encode("ASCII", "ignore")
                .decode("utf-8")
            )
        except (TypeError, AttributeError):
            normalized = text
        # Fall back to an identity table when the store supplies no translator.
        xlate = self._TRANSLATOR if self._TRANSLATOR else str.maketrans("", "")
        cleaned_text = normalized.translate(xlate).lower()
        words = cleaned_text.split()
        bias_set = getattr(self.store, "USER_FLAGGED_BIAS", set())
        return [w for w in words if w.strip() and w not in bias_set]

    def classify_word(self, word: str) -> Tuple[Optional[str], float]:
        """Classify a single word: by morphological root first, else by
        phonetic texture against the (bias-scaled) thresholds.

        Returns (category, confidence); (None, 0.0) when nothing fires.
        """
        w = word.lower()
        if len(w) < 3:
            return None, 0.0
        for category, roots in self.ROOTS.items():
            for root in roots:
                if root in w:
                    return category.lower(), 0.8
        counts = {k: 0 for k in self.PHONETICS}
        for char in w:
            if sound_type := self._CHAR_TO_SOUND.get(char):
                counts[sound_type] += 1
        density_score = (counts["PLOSIVE"] * 1.5) + (counts["NASAL"] * 0.8)
        flow_score = counts["LIQUID"] + counts["FRICATIVE"]
        vitality_score = (counts["VOWELS"] * 1.2) + (flow_score * 0.8)
        # Short words get a boost so they can still clear the thresholds.
        length_mod = 1.0 if len(w) > 5 else 1.5
        final_density = (density_score / len(w)) * length_mod
        final_vitality = (vitality_score / len(w)) * length_mod
        heavy_thresh = self.thresholds["heavy_density"] * self.biases["heavy"]
        play_thresh = self.thresholds["play_vitality"] * self.biases["play"]
        kinetic_thresh = self.thresholds["kinetic_flow"] * self.biases["kinetic"]
        if final_density > heavy_thresh:
            return "heavy", round(final_density, 2)
        if final_vitality > play_thresh:
            return "play", round(final_vitality, 2)
        if (flow_score / len(w)) > kinetic_thresh:
            return "kinetic", 0.5
        return None, 0.0

    def measure_valence(self, words: List[str]) -> float:
        """Net sentiment of *words* in [-1, 1].

        A word immediately preceded by a negator flips polarity at half
        strength ("not good" reads mildly negative).
        """
        if not words:
            return 0.0
        pos_set = self.store.get_raw("sentiment_pos")
        neg_set = self.store.get_raw("sentiment_neg")
        negators = self.store.get_raw("sentiment_negators")
        score = 0.0
        for i, word in enumerate(words):
            is_negated = i > 0 and words[i - 1] in negators
            val = 0.0
            if word in pos_set:
                val = 1.0
            elif word in neg_set:
                val = -1.0
            if is_negated:
                val *= -0.5
            score += val
        normalized = score / max(1.0, len(words) * 0.5)
        return max(-1.0, min(1.0, normalized))

    def tune_sensitivity(self, voltage: float, drag: float):
        """Adjust classifier biases from system state.

        High voltage lowers the kinetic bar (more kinetic hits); low voltage
        raises it. High drag lowers the heavy bar.
        """
        if voltage > 15.0:
            self.biases["kinetic"] = 0.8
        elif voltage < 5.0:
            self.biases["kinetic"] = 1.2
        else:
            self.biases["kinetic"] = 1.0
        self.biases["heavy"] = 0.8 if drag > 5.0 else 1.0
class SemanticField:
    """Running semantic vector over recent text, with a momentum reading.

    Each update blends the new text's vector into the current field (60/40
    decay) and smooths the inter-update flux into a momentum value.
    """

    def __init__(self, analyzer_ref):
        # Collaborator supplying vectorize() and calculate_flux().
        self.analyzer = analyzer_ref
        self.current_vector = {}
        self.momentum = 0.0
        self.history = []

    def update(self, text: str) -> Dict[str, float]:
        """Fold *text* into the field; returns the (possibly unchanged) vector."""
        incoming = self.analyzer.vectorize(text)
        if not incoming:
            # Nothing measurable in the text — field stays as-is.
            return self.current_vector
        delta = self.analyzer.calculate_flux(self.current_vector, incoming)
        # Momentum is an exponentially smoothed view of recent flux.
        self.momentum = (self.momentum * 0.7) + (delta * 0.3)
        mixed = {key: round(old * 0.6, 3) for key, old in self.current_vector.items()}
        for key, fresh in incoming.items():
            mixed[key] = round(mixed.get(key, 0.0) + (fresh * 0.4), 3)
        self.current_vector = mixed
        self.history.append((time.time(), delta))
        # Keep only the ten most recent flux samples.
        while len(self.history) > 10:
            del self.history[0]
        return self.current_vector

    def get_atmosphere(self) -> str:
        """Human-readable field summary: dominant dimension plus volatility."""
        if not self.current_vector:
            return "VOID"
        dominant = max(self.current_vector, key=self.current_vector.get)
        label = dominant.upper()
        if self.momentum > 0.5:
            return f"Volatile {label} Storm"
        return f"Stable {label} Atmosphere"
class LexiconService:
    """Class-level facade over a lazily-initialized LexiconStore +
    LinguisticAnalyzer singleton pair.

    Every accessor now funnels through _ensure_ready(); previously only some
    methods guarded initialization, so calling e.g. classify() before
    initialize() raised AttributeError on the None store.
    """

    _INITIALIZED = False
    _STORE = None
    _ANALYZER = None
    # Compiled alternation of antigen patterns, or None when there are none.
    ANTIGEN_REGEX = None
    SOLVENTS = set()
    # Tie-break order when a word belongs to several categories; hoisted to a
    # class constant (was rebuilt on every classify() call).
    _PRIORITY_ORDER = [
        "heavy",
        "kinetic",
        "explosive",
        "thermal",
        "cryo",
        "sacred",
        "antigen",
        "meat",
        "void",
        "liminal",
        "pareidolia",
        "play",
        "suburban",
        "abstract",
    ]

    @classmethod
    def _ensure_ready(cls):
        """Initialize on first use. [SCHUR] Safety net, applied uniformly."""
        if not cls._INITIALIZED:
            cls.initialize()

    @classmethod
    def get_store(cls):
        cls._ensure_ready()
        return cls._STORE

    @classmethod
    def initialize(cls):
        """Build the singletons; idempotent. Re-raises on failure after
        resetting the initialized flag so a retry is possible."""
        if cls._INITIALIZED:
            return
        cls._INITIALIZED = True
        try:
            cls._STORE = LexiconStore()
            cls._STORE.load_vocabulary()
            cls._ANALYZER = LinguisticAnalyzer(cls._STORE)
            cls.compile_antigens()
            cls.SOLVENTS = cls._STORE.SOLVENTS
            total_words = sum(len(s) for s in cls._STORE.VOCAB.values())
            print(
                f"{Prisma.GRN}[LEXICON]: Systems Nominal. {total_words} words loaded.{Prisma.RST}"
            )
        except Exception as e:
            cls._INITIALIZED = False
            print(f"{Prisma.RED}[LEXICON]: Initialization Failed: {e}{Prisma.RST}")
            raise  # was `raise e`, which needlessly rewrites the traceback

    @classmethod
    def get_valence(cls, words: List[str]) -> float:
        cls._ensure_ready()
        return cls._ANALYZER.measure_valence(words)

    @classmethod
    def get_categories_for_word(cls, word: str) -> Set[str]:
        cls._ensure_ready()
        return cls._STORE.get_categories_for_word(word)

    @classmethod
    def get_current_category(cls, word: str) -> Optional[str]:
        """An arbitrary known category for *word*, or None if unknown."""
        cls._ensure_ready()
        categories = cls._STORE.get_categories_for_word(word)
        return next(iter(categories)) if categories else None

    @classmethod
    def measure_viscosity(cls, word: str) -> float:
        cls._ensure_ready()
        return cls._ANALYZER.measure_viscosity(word)

    @classmethod
    def get_turbulence(cls, words: List[str]) -> float:
        cls._ensure_ready()
        return cls._ANALYZER.get_turbulence(words)

    @classmethod
    def vectorize(cls, text: str) -> Dict[str, float]:
        cls._ensure_ready()
        return cls._ANALYZER.vectorize(text)

    @classmethod
    def compile_antigens(cls):
        """(Re)build ANTIGEN_REGEX from the store's replacement table."""
        cls._ensure_ready()
        replacements = cls._STORE.ANTIGEN_REPLACEMENTS
        if not replacements:
            cls.ANTIGEN_REGEX = None
            return
        # Longest patterns first so e.g. "badger" wins over "bad".
        patterns = sorted(replacements.keys(), key=len, reverse=True)
        escaped = [re.escape(str(p)) for p in patterns]
        cls.ANTIGEN_REGEX = re.compile("|".join(escaped), re.IGNORECASE)

    @classmethod
    def sanitize(cls, text):
        cls._ensure_ready()
        return cls._ANALYZER.sanitize(text)

    @classmethod
    def classify(cls, word):
        """Known-word category by priority order, else heuristic classification."""
        cls._ensure_ready()
        known_cats = cls._STORE.get_categories_for_word(word)
        if known_cats:
            for p_cat in cls._PRIORITY_ORDER:
                if p_cat in known_cats:
                    return p_cat, 1.0
            # Known word, but in no prioritized category: pick arbitrarily.
            return next(iter(known_cats)), 1.0
        return cls._ANALYZER.classify_word(word)

    @classmethod
    def clean(cls, text):
        """Alias for sanitize()."""
        return cls.sanitize(text)

    @classmethod
    def taste(cls, word):
        """Alias for classify()."""
        return cls.classify(word)

    @classmethod
    def create_field(cls):
        cls._ensure_ready()
        return SemanticField(cls._ANALYZER)

    @classmethod
    def get(cls, category: str) -> Set[str]:
        cls._ensure_ready()
        return cls._STORE.get_raw(category)

    @classmethod
    def get_random(cls, category: str) -> str:
        """Random word from *category*; 'void' when the category is empty."""
        words = list(cls.get(category))
        return random.choice(words) if words else "void"

    @classmethod
    def teach(cls, word: str, category: str, tick: int = 0):
        cls._ensure_ready()
        cls._STORE.teach(word, category, tick)

    @classmethod
    def save(cls):
        """Persist learned vocabulary — only if the service ever came up
        (deliberately does NOT trigger initialization)."""
        if cls._INITIALIZED and cls._STORE:
            cls._STORE.save_hive()
            print(f"{Prisma.GRN}[LEXICON]: Hive saved to disk.{Prisma.RST}")

    @classmethod
    def harvest(cls, text: str) -> Dict[str, List[str]]:
        cls._ensure_ready()
        return cls._STORE.harvest(text)

    @classmethod
    def learn_antigen(cls, word: str, replacement: str = ""):
        """Register a new antigen pattern and recompile the regex."""
        cls._ensure_ready()
        cls._STORE.ANTIGEN_REPLACEMENTS[word] = replacement
        cls.compile_antigens()

    @classmethod
    def tune_perception(cls, voltage: float, narrative_drag: float):
        # Deliberately soft: tuning before initialization is a no-op.
        if cls._ANALYZER:
            cls._ANALYZER.tune_sensitivity(voltage, narrative_drag)