#!/usr/bin/env python3 # --- BEGIN MEMORY MANIFEST (auto-updated) --- # (This block is auto-written by Hive to record what datasets/files # have already been converted into memory (curves). Do not edit by hand.) MEMORY_MANIFEST = { "updated_ts": 0, "datasets_done": [], "vectors_total": 0, "notes": "Set HIVE_ALLOW_SELF_WRITE_MANIFEST=0 to stop auto-updates." } # --- END MEMORY MANIFEST --- # -*- coding: utf-8 -*- # HIVE 🐝 FULL MERGED ALL-IN-ONE **OPTIMIZED** # Offline-first + Online updates + Auto Wi-Fi + RBAC + Multilingual Voice (ASR/TTS + Phonics) # + Internal Optimization Stack (Change Manager: propose âžĄī¸ sandbox âžĄī¸ A/B test âžĄī¸ apply/rollback with Owner policy) # Upload this single file and requirements.txt to a Hugging Face Space (or run locally). # - python app.py import os, sys, re, json, time, shutil, tempfile, subprocess, platform, socket, threading, importlib, hashlib, unicodedata, urllib.request, base64 from dataclasses import dataclass from typing import Optional, List, Dict, Tuple # ----------- light bootstrap (safe) ----------- def _ensure(pkgs): for p in pkgs: mod = p.split("==")[0].split(">=")[0].split("<=")[0].split("[")[0] try: importlib.import_module(mod) except Exception: try: subprocess.check_call([sys.executable, "-m", "pip", "install", "--upgrade", p]) except Exception: pass _ensure(["numpy>=1.24.0","psutil>=5.9.0","requests>=2.31.0","gradio>=4.44.0","sentence-transformers>=3.0.0","faiss-cpu>=1.8.0", "transformers>=4.44.0","accelerate>=0.33.0","datasets>=2.21.0","soundfile>=0.12.1","faster-whisper>=1.0.0","langid>=1.1.6", "piper-tts>=1.2.0","g2p_en>=2.1.0","librosa>=0.10.1","scikit-learn>=1.1.0","feedparser>=6.0.11","duckduckgo_search>=6.2.10", "keyring>=24.3.1"]) import numpy as np, psutil, requests, feedparser, langid, librosa, gradio as gr, soundfile as sf from sentence_transformers import SentenceTransformer from duckduckgo_search import DDGS from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline from 
faster_whisper import WhisperModel from piper.voice import PiperVoice from g2p_en import G2p from sklearn.metrics.pairwise import cosine_similarity try: import torch except Exception: torch=None try: import faiss except Exception: subprocess.check_call([sys.executable,"-m","pip","install","--upgrade","faiss-cpu>=1.8.0"]) import faiss # Optional vision try: import cv2; _HAVE_CV=True except Exception: _HAVE_CV=False try: from PIL import Image import pytesseract; _HAVE_TESS=True and _HAVE_CV except Exception: _HAVE_TESS=False import queue try: import keyring except Exception: keyring=None # ----------------------- config ----------------------- def ENV(name, default=None, cast=str): v=os.getenv(name, default) if v is None: return None if cast is bool: return str(v).lower() in ("1","true","yes","on") if cast is int: try: return int(v) except (ValueError, TypeError): return int(float(v)) return v CFG={ # auto-archive memory to curves.tar.gz "HIVE_AUTO_ARCHIVE": ENV("HIVE_AUTO_ARCHIVE", "1", bool), "HIVE_AUTO_ARCHIVE_MODE": ENV("HIVE_AUTO_ARCHIVE_MODE", "per_chain", str), # per_chain | per_dataset "HIVE_ARCHIVE_PATH": ENV("HIVE_ARCHIVE_PATH", "curves.tar.gz", str), # staged ingestion chaining (auto-run multiple stages this boot) "HIVE_INGEST_CHAIN": ENV("HIVE_INGEST_CHAIN", "1", bool), "HIVE_INGEST_CHAIN_MAX": ENV("HIVE_INGEST_CHAIN_MAX", "2", int), # max stages per boot # staged ingestion controls "HIVE_INGEST_STAGED": ENV("HIVE_INGEST_STAGED", "1", bool), "HIVE_INGEST_STAGE_SIZE": ENV("HIVE_INGEST_STAGE_SIZE", "3", int), "HIVE_INGEST_MIN_FREE_GB": ENV("HIVE_INGEST_MIN_FREE_GB", "8", int), "HIVE_INGEST_NEXT": ENV("HIVE_INGEST_NEXT", "0", bool), # run one stage this boot # self-edit manifest controls "HIVE_ALLOW_SELF_WRITE_MANIFEST": ENV("HIVE_ALLOW_SELF_WRITE_MANIFEST", "1", bool), "HIVE_SELF_WRITE_FILE": ENV("HIVE_SELF_WRITE_FILE", "", str), # memory auto-restore controls (admin memory) "HIVE_CURVES_AUTO_RESTORE": ENV("HIVE_CURVES_AUTO_RESTORE", "1", bool), 
"CURVES_ARCHIVE_LOCAL": ENV("HIVE_CURVES_ARCHIVE_LOCAL", "curves.tar.gz", str), "CURVES_ARCHIVE_URL": ENV("HIVE_CURVES_ARCHIVE_URL", "", str), "CURVES_HF_DATASET": ENV("HIVE_CURVES_HF_DATASET", "", str), "CURVES_HF_SUBPATH": ENV("HIVE_CURVES_HF_SUBPATH", "", str), "HF_READ_TOKEN": ENV("HF_READ_TOKEN", "", str), # memory directory alias "MEMORY_DIR": ENV("HIVE_CURVE_DIR", "./curves"), "CURVE_DIR": ENV("HIVE_CURVE_DIR","./curves"), "STATE_DIR": ENV("HIVE_STATE_DIR","./state"), "LAUNCH_UI": ENV("HIVE_LAUNCH_UI","1",bool), "LLM_AUTOSIZE": ENV("HIVE_LLM_AUTOSIZE","1",bool), "LLM_MAX_VRAM_GB": ENV("HIVE_LLM_MAX_VRAM_GB","0", int), "MODEL_OVERRIDE": ENV("HIVE_MODEL_ID",""), "CTX_TOKENS": ENV("HIVE_CTX_TOKENS","2048",int), "OWNER_NAME": ENV("HIVE_OWNER_USER","Rose"), # Default Owner name "OWNER_PASS": ENV("HIVE_OWNER_PASS","Fehr2008"), # Default Owner password "OWNER_SECOND": ENV("HIVE_OWNER_SECOND","Paulbear01"), "AGENT_NAME": ENV("HIVE_AGENT_NAME","Hive"), "NO_PROFANITY": ENV("HIVE_NO_PROFANITY","1",bool), "ASR_SIZE": ENV("HIVE_ASR_SIZE","small"), "TTS_LANG": ENV("HIVE_TTS_LANG","en"), "BOOTSTRAP_INGEST": ENV("HIVE_BOOTSTRAP_INGEST","1",bool), "FORCE_REINGEST": ENV("HIVE_FORCE_REINGEST","0",bool), "INGEST_SOURCES": ENV("HIVE_INGEST_SOURCES",""), "ONLINE_ENABLE": ENV("HIVE_ONLINE_ENABLE","1",bool), "ONLINE_AUTO": ENV("HIVE_ONLINE_AUTO","0",bool), "ONLINE_SOURCES": ENV("HIVE_ONLINE_SOURCES","https://hnrss.org/frontpage,https://rss.nytimes.com/services/xml/rss/nyt/World.xml"), "ONLINE_TIMEOUT": ENV("HIVE_ONLINE_TIMEOUT","8",int), "ONLINE_MAX_RESULTS": ENV("HIVE_ONLINE_MAX_RESULTS","5",int), "ONLINE_TRIGGER": ENV("HIVE_ONLINE_TRIGGER","auto",str), # bounded self governance "HIVE_USE_HF_INFERENCE": ENV("HIVE_USE_HF_INFERENCE","0",bool), "HIVE_HF_ENDPOINT": ENV("HIVE_HF_ENDPOINT","",str), "ALLOW_SELF_REBOOT": ENV("HIVE_ALLOW_SELF_REBOOT","1",bool), "ALLOW_RUNTIME_HOTPATCH": ENV("HIVE_ALLOW_RUNTIME_HOTPATCH","1",bool), "AUTO_SELF_OPTIMIZE": 
ENV("HIVE_AUTO_SELF_OPTIMIZE","1",bool), # internal optimization with sandbox + A/B (Owner policy) "OPT_ENABLE": ENV("HIVE_OPT_ENABLE","1",bool), "OPT_AUTO_APPLY": ENV("HIVE_OPT_AUTO_APPLY","0",bool), # OWNER MAY SET TO 1 "OPT_PKG_ALLOWLIST": ENV("HIVE_OPT_PKG_ALLOWLIST","transformers,accelerate,datasets,sentence-transformers,faiss-cpu,duckduckgo_search,feedparser,requests,gradio").split(","), "OPT_MODEL_ALLOWLIST": ENV("HIVE_OPT_MODEL_ALLOWLIST","meta-llama/Meta-Llama-3.1-8B-Instruct,meta-llama/Meta-Llama-3.1-70B-Instruct,TinyLlama/TinyLlama-1.1B-Chat-v1.0").split(","), "OPT_THRESH_LATENCY_MS": ENV("HIVE_OPT_THRESH_LATENCY_MS","0",int), "OPT_THRESH_TOKS_PER_S": ENV("HIVE_OPT_THRESH_TOKS_PER_S","0",float), "OPT_THRESH_QUALITY": ENV("HIVE_OPT_THRESH_QUALITY","0.02",float), "OPT_SANDBOX_TIMEOUT": ENV("HIVE_OPT_SANDBOX_TIMEOUT","180",int), } os.makedirs(CFG["CURVE_DIR"], exist_ok=True) os.makedirs(CFG["STATE_DIR"], exist_ok=True) OVERLAY_DIR = os.path.join(CFG["STATE_DIR"], "runtime_overlay") RUNTIME_OVERRIDES = os.path.join(CFG["STATE_DIR"], "runtime_overrides.json") OPT_DIR = os.path.join(CFG["STATE_DIR"], "opt") OPT_PROPOSALS = os.path.join(OPT_DIR, "proposals.jsonl") OPT_RESULTS = os.path.join(OPT_DIR, "results.jsonl") for p in (OVERLAY_DIR, OPT_DIR): os.makedirs(p, exist_ok=True) # ----------------- sensing / model pick ----------------- def _has_gpu_env()->bool: accel=os.getenv("SPACE_ACCELERATOR","").lower() if accel in ("t4","a10","a100","l4","l40","h100"): return True try: return torch is not None and torch.cuda.is_available() except Exception: return False def probe_caps(): free_gb = shutil.disk_usage(".").free/(1024**3) ram_gb = psutil.virtual_memory().available/(1024**3) return {"free_gb":free_gb,"ram_gb":ram_gb,"gpu":_has_gpu_env(), "max_docs":70000 if ram_gb>16 else (50000 if ram_gb>8 else 12000), "batch":512 if ram_gb>16 else (256 if ram_gb>8 else 64)} CANDIDATES=[ ("TinyLlama/TinyLlama-1.1B-Chat-v1.0", 0), ("meta-llama/Meta-Llama-3.1-8B-Instruct",12), 
("meta-llama/Meta-Llama-3.1-70B-Instruct",100) ] def pick_model()->Tuple[str,dict]: if CFG["MODEL_OVERRIDE"]: return CFG["MODEL_OVERRIDE"], {"device":"cuda" if _has_gpu_env() else "cpu"} max_vram=CFG["LLM_MAX_VRAM_GB"] if _has_gpu_env(): for mid,need in reversed(CANDIDATES): if need and (max_vram==0 or need<=max_vram): return mid, {"device":"cuda"} else: ram=psutil.virtual_memory().total/(1024**3) for mid,need in reversed(CANDIDATES): if need==0 and ram>=6: return mid, {"device":"cpu"} return "TinyLlama/TinyLlama-1.1B-Chat-v1.0", {"device":"cpu"} # ----------------- embeddings / curves ----------------- _EMB_ID=os.getenv("HIVE_EMB_ID","sentence-transformers/all-MiniLM-L6-v2") class GEC: def __init__(self): self.model=SentenceTransformer(_EMB_ID) def encode(self, texts: List[str]): return self.model.encode(texts, normalize_embeddings=True) class CurveStore: def __init__(self, d): self.dir=d; os.makedirs(d, exist_ok=True) self.idx_path=os.path.join(d,"faiss.index") self.meta_path=os.path.join(d,"meta.jsonl") self.dim=384; self.gec=GEC() self.index=faiss.read_index(self.idx_path) if os.path.exists(self.idx_path) else faiss.IndexFlatIP(self.dim) def add_texts(self, docs:List[str], metas:List[Dict]): if not docs: return vecs=np.asarray(self.gec.encode(docs), dtype="float32") self.index.add(vecs) with open(self.meta_path,"a",encoding="utf-8") as f: for m in metas: f.write(json.dumps(m, ensure_ascii=False)+"\n") faiss.write_index(self.index, self.idx_path) def search(self, query:str, k:int=6)->List[Dict]: if self.index.ntotal==0: return [] qv=np.asarray(self.gec.encode([query]), dtype="float32") D,I=self.index.search(qv,k) lines=open(self.meta_path,"r",encoding="utf-8").read().splitlines() if os.path.exists(self.meta_path) else [] out=[] for i in I[0]: if 0<=ibool: idx=os.path.join(curve_dir,"faiss.index") if os.path.exists(OFFLINE_MARK): try: return json.load(open(OFFLINE_MARK)).get("ok",True) except Exception: return True if os.path.exists(idx): try: return 
faiss.read_index(idx).ntotal>0 except Exception: return False return False def _mark_offline_ready(): try: json.dump({"ok":True,"ts":time.time()}, open(OFFLINE_MARK,"w",encoding="utf-8")) except Exception: pass # ----------- HF Datasets bootstrap ----------- DEFAULT_SOURCES=["jhu-clsp/jflue","bea2019st/wi_locness","fce-m2109/mascorpus","rajpurkar/squad_v2", "OpenRL/daily_dialog","tetti/spelling-dataset-extended","Helsinki-NLP/opus-100","facebook/flores", "HuggingFaceH4/no_robots","bigscience/xP3","allenai/sciq","allenai/c4", "mozilla-foundation/common_voice_17_0","bene-ges/en_cmudict","openslr/librispeech_asr","conceptnet5/conceptnet5","grammarly/coedit"] def _iter_text(dataset_name:str, split="train"): from datasets import load_dataset try: ds=load_dataset(dataset_name, split=split, streaming=True) except Exception: ds=load_dataset(dataset_name, split=split, trust_remote_code=True) for ex in ds: text = ex.get("text") or ex.get("sentence") or ex.get("content") or ex.get("question") if not text: if "translation" in ex and isinstance(ex["translation"], dict): tdict=ex["translation"]; text=" | ".join([f"{k}:{v}" for k,v in tdict.items() if isinstance(v,str)]) else: text=str(ex) yield {"text": str(text)} def _plan_order(srcs: List[str])->List[str]: first=["jhu-clsp/jflue","bea2019st/wi_locness","fce-m2109/mascorpus","rajpurkar/squad_v2","OpenRL/daily_dialog","tetti/spelling-dataset-extended"] ordered=[s for s in first if s in srcs] for s in srcs: if s not in ordered: ordered.append(s) return ordered def ingest_all(curve_dir:str, sources: Optional[List[str]]=None, scope="general"): caps=probe_caps() store=CurveStore(curve_dir); lib=LibrarianCurve(store) os.makedirs(curve_dir, exist_ok=True) logf=os.path.join(curve_dir,"ingest_log.jsonl") count_total=0; sources=sources or DEFAULT_SOURCES for ds in _plan_order(sources): count=0; bt=[]; bm=[] try: for rec in _iter_text(ds): txt=(rec.get("text") or "").strip() if not txt: continue bt.append(txt); 
bm.append({"dataset":ds,"text":txt[:500]}) if len(bt)>=caps["batch"]: lib.ingest_pairs(bt,bm,scope); count+=len(bt); count_total+=len(bt); bt,bm=[],[] if count>=caps["max_docs"]: break if bt: lib.ingest_pairs(bt,bm,scope); count+=len(bt); count_total+=len(bt); bt,bm=[],[] with open(logf,"a",encoding="utf-8") as f: f.write(json.dumps({"dataset":ds,"ingested":count})+"\n") except Exception as e: with open(logf,"a",encoding="utf-8") as f: f.write(json.dumps({"dataset":ds,"error":str(e)})+"\n") return count_total # ----------- live search + RSS âžĄī¸ curves ----------- ONLINE_DB=os.path.join(CFG["STATE_DIR"],"online_seen.json") def _load_json(path, default): if os.path.exists(path): try: return json.load(open(path,"r",encoding="utf-8")) except Exception: return default return default def _save_json(path, data): json.dump(data, open(path,"w",encoding="utf-8"), indent=2) def online_available(timeout:int)->bool: try: requests.get("https://huggingface.co", timeout=timeout) return True except Exception: return False def _hash(s:str)->str: return hashlib.sha1(s.encode("utf-8","ignore")).hexdigest() def fetch_rss(urls:List[str], timeout:int=8, limit:int=50)->List[Dict]: items=[] for u in urls: try: f=feedparser.parse(u) for e in f.entries[:limit]: items.append({"title":e.get("title",""),"link":e.get("link",""),"summary":e.get("summary") or e.get("description",""),"published":e.get("published") or e.get("updated",""),"source":u}) except Exception: # consider logging this error pass return items def web_search_snippets(query:str, max_results:int=5, timeout:int=8)->list: out=[] try: with DDGS(timeout=timeout) as ddgs: for r in ddgs.text(query, max_results=max_results): if r and r.get("body"): out.append({"title":r.get("title",""),"href":r.get("href",""),"body":r.get("body","")}) except Exception: # consider logging this error pass return out # ----------- RBAC / users / lockouts ----------- USERS_DB=os.path.join(CFG["STATE_DIR"],"users.json") 
# JSON state files used by the RBAC / voice subsystems (all live under STATE_DIR).
LOCKS_DB = os.path.join(CFG["STATE_DIR"], "lockouts.json")
VOICES_DB = os.path.join(CFG["STATE_DIR"], "voices.json")
ADAPT_DB = os.path.join(CFG["STATE_DIR"], "speech_adapt.json")


def _init_users():
    """Create the default user database, write it to USERS_DB and return it.

    The fresh database contains one owner record (name, password and second
    secret taken from CFG) plus empty ``admins_super``, ``admins_general``
    and ``users`` pools.
    """
    d = {
        "owner": {
            "id": "owner:1",
            "name": CFG["OWNER_NAME"],
            "role": "owner",
            "pass": CFG["OWNER_PASS"],
            "second": CFG["OWNER_SECOND"],
            "prefs": {"activation_names": [CFG["AGENT_NAME"]], "language": "en"},
        },
        "admins_super": [],
        "admins_general": [],
        "users": [],
    }
    _save_json(USERS_DB, d)
    return d


def _load_users():
    """Return the user database from USERS_DB, creating the default one if it is missing or empty."""
    d = _load_json(USERS_DB, None)
    return d if d else _init_users()


def _find_user(d, name_or_id):
    """Look up a profile by ``id`` or ``name`` across every pool of the user database.

    Pools are searched in the order owner, admins_super, admins_general, users.

    Returns:
        ``(record, pool_role)`` for the first match, where ``pool_role`` is one
        of ``"owner"``, ``"admin_super"``, ``"admin_general"``, ``"user"``;
        ``(None, None)`` when nothing matches.
    """
    # .get() instead of indexing: a partially written users.json that lacks a
    # pool must behave like an empty pool rather than raise KeyError.
    pools = [
        ("owner", [d.get("owner")]),
        ("admin_super", d.get("admins_super")),
        ("admin_general", d.get("admins_general")),
        ("user", d.get("users")),
    ]
    for role, pool in pools:
        for u in pool or []:
            if u and (u.get("id") == name_or_id or u.get("name") == name_or_id):
                return u, role
    return None, None


# Capability table keyed by role name.
PERMS = {
    "owner": {
        "can_add": ["admin_super", "admin_general", "user"],
        "can_remove": ["admin_super", "admin_general", "user"],
        "can_edit_role_of": ["admin_super", "admin_general", "user"],
        "can_edit_profile_of": ["owner", "admin_super", "admin_general", "user"],
        "can_view_scopes": "all",
        "maintenance": "full",
        "code_edit": "approve_and_edit",
    },
    "admin_super": {
        "can_add": ["admin_general", "user"],
        "can_remove": ["admin_general", "user"],
        "can_edit_role_of": ["admin_general", "user"],
        "can_edit_profile_of": ["admin_general", "user"],
        "can_view_scopes": "self_only",
        "maintenance": "advanced",
        "code_edit": "suggest_only",
    },
    "admin_general": {
        "can_add": ["user"],
        "can_remove": ["user"],
        "can_edit_role_of": ["user"],
        "can_edit_profile_of": ["user"],
        "can_view_scopes": "self_only",
        "maintenance": "basic",
        "code_edit": "suggest_only",
    },
    "user": {
        "can_add": [],
        "can_remove": [],
        "can_edit_role_of": [],
        "can_edit_profile_of": ["user"],
        "can_view_scopes": "self_only",
        "maintenance": "none",
        "code_edit": "none",
    },
    "guest": {
        "can_add": [],
        "can_remove": [],
        "can_edit_role_of": [],
        "can_edit_profile_of": [],
        "can_view_scopes": "self_only",
        "maintenance": "none",
        "code_edit": "none",
    },
}


def _secret_matches(given, stored) -> bool:
    """Compare two secrets in constant time; anything that is not a ``str`` never matches."""
    import hmac  # local import: keeps this block self-contained

    if not isinstance(given, str) or not isinstance(stored, str):
        return False
    # compare_digest rejects non-ASCII str, so compare the UTF-8 bytes instead.
    return hmac.compare_digest(given.encode("utf-8"), stored.encode("utf-8"))


def attempt_login(name_or_id: str, password: str = "", second: Optional[str] = None) -> Tuple[bool, str]:
    """Authenticate a profile and maintain its lockout state in LOCKS_DB.

    Args:
        name_or_id: profile ``id`` or ``name`` to look up.
        password: password; checked for owner and admin roles.
        second: second secret; checked for the owner when the owner record
            stores a non-empty ``"second"`` value.

    Returns:
        ``(ok, message)``. On a credential failure the profile's failure
        counter is incremented; from the third failure on, the profile is
        locked for 180 seconds. A successful login resets the counter.

    NOTE(review): roles other than owner/admin are admitted without any
    password check - confirm this is the intended policy.
    """
    d = _load_users()
    locks = _load_json(LOCKS_DB, {})

    def lock_fail(lid, msg):
        # Record one more failure; lock for 180 s once three have accumulated.
        st = locks.get(lid, {"fails": 0, "until": 0})
        st["fails"] = st.get("fails", 0) + 1
        dur = 180 if st["fails"] >= 3 else 0
        st["until"] = time.time() + dur if dur else 0
        locks[lid] = st
        _save_json(LOCKS_DB, locks)
        return False, msg

    u, pool_role = _find_user(d, name_or_id)
    if not u:
        return False, "Profile not found."

    # Fall back to the pool the record was found in: a record stored in an
    # admin pool without an explicit "role" must not be downgraded to "user",
    # which would skip the password check below.
    role = u.get("role") or pool_role or "user"
    lid = u.get("id", u.get("name"))
    now = time.time()

    st = locks.get(lid, {"fails": 0, "until": 0})
    if now < st.get("until", 0):
        return False, f"Locked; try again in ~{int(st['until']-now)}s."

    if role == "owner":
        stored_second = u.get("second")
        password_ok = _secret_matches(password, u.get("pass"))
        # The second secret is enforced only when one is stored.
        second_ok = (not stored_second) or _secret_matches(second, stored_second)
        if not (password_ok and second_ok):
            return lock_fail(lid, "Owner credentials incorrect.")
    elif role in ("admin_general", "admin_super"):
        if not _secret_matches(password, u.get("pass")):
            return lock_fail(lid, "Admin password incorrect.")

    locks[lid] = {"fails": 0, "until": 0}
    _save_json(LOCKS_DB, locks)
    return True, f"Welcome, {u.get('name')} ({role})."
# ----------- voice: ASR/TTS/phonics ----------- G2P = G2p() ASR_MODELS={"tiny":"tiny","base":"base","small":"small","medium":"medium","large-v3":"large-v3"} def _asr_model_name(): return ASR_MODELS.get(CFG["ASR_SIZE"],"small") _ASR=None def get_asr(): global _ASR if _ASR is not None: return _ASR size=_asr_model_name(); device="cuda" if (_has_gpu_env()) else "cpu" compute_type="float16" if device=="cuda" else "int8" _ASR=WhisperModel(size, device=device, compute_type=compute_type); return _ASR PIPER_MODELS={ "en": ("https://github.com/rhasspy/piper/releases/download/v0.0.2/en_US-amy-low.onnx", "https://github.com/rhasspy/piper/releases/download/v0.0.2/en_US-amy-low.onnx.json"), "es": ("https://github.com/rhasspy/piper/releases/download/v0.0.2/es_ES-davefx-medium.onnx", "https://github.com/rhasspy/piper/releases/download/v0.0.2/es_ES-davefx-medium.onnx.json"), "fr": ("https://github.com/rhasspy/piper/releases/download/v0.0.2/fr_FR-gilles-medium.onnx", "https://github.com/rhasspy/piper/releases/download/v0.0.2/fr_FR-gilles-medium.onnx.json"), "de": ("https://github.com/rhasspy/piper/releases/download/v0.0.2/de_DE-thorsten-low.onnx", "https://github.com/rhasspy/piper/releases/download/v0.0.2/de_DE-thorsten-low.onnx.json"), "zh": ("https://github.com/rhasspy/piper/releases/download/v0.0.2/zh_CN-huayan-low.onnx", "https://github.com/rhasspy/piper/releases/download/v0.0.2/zh_CN-huayan-low.onnx.json"), } def _download(url,dst, timeout=30): if os.path.exists(dst): return dst os.makedirs(os.path.dirname(dst),exist_ok=True); urllib.request.urlretrieve(url,dst); return dst # TODO: add timeout _TTS_CACHE={} def get_tts(lang="en") -> PiperVoice: lang=lang if lang in PIPER_MODELS else "en" if lang in _TTS_CACHE: return _TTS_CACHE[lang] mu,cu=PIPER_MODELS[lang]; m=_download(mu,f"./models/piper/{os.path.basename(mu)}"); c=_download(cu,f"./models/piper/{os.path.basename(cu)}") v=PiperVoice.load(m,c); _TTS_CACHE[lang]=v; return v def _embed_mfcc(path)->np.ndarray: y, sr = 
librosa.load(path, sr=16000) mf=librosa.feature.mfcc(y=y, sr=sr, n_mfcc=20) return mf.mean(axis=1) def enroll_voice(uid:str, path:str) -> bool: db=_load_json(VOICES_DB, {}); db[uid]=_embed_mfcc(path).astype(float).tolist(); _save_json(VOICES_DB, db); return True def identify_voice(path:str, threshold:float=0.70) -> Optional[str]: db=_load_json(VOICES_DB, {}); if not db: return None emb=_embed_mfcc(path).reshape(1,-1) keys=list(db.keys()); mats=np.array([db[k] for k in keys]) sims=cosine_similarity(emb, mats)[0]; i=int(np.argmax(sims)); return keys[i] if sims[i]>=threshold else None _BASIC={'a':'a as in apple /ÃĻ/','e':'e as in elephant /ɛ/','i':'i as in igloo /ÉĒ/','o':'o as in octopus /ɒ/','u':'u as in umbrella /ƌ/', 'c':'c as in cat /k/ (before e/i/y often /s/)','g':'g as in goat /g/ (before e/i/y often soft /dʒ/)','y':'y as in yellow /j/ or happy /i/'} def phonics(word:str)->str: toks=G2P(word); phones=[t for t in toks if re.match(r"[A-Z]+[0-2]?$", t)] hints=[]; for ch in word.lower(): if ch in _BASIC and _BASIC[ch] not in hints: hints.append(_BASIC[ch]) return f"Phonemes: {' '.join(phones)} | Hints: {('; '.join(hints)) if hints else '🐝'}" def lid_chunk(text:str, min_len:int=12)->List[Tuple[str,str]]: parts=re.split(r"([.!?;\u2026\u2028\u2029])+\s{2,}|", text) chunks=[]; buf="" for p in parts: if not p: continue buf+=p if len(buf)>=min_len or re.match(r"[.!?;\u2026\u2028\u2029]", p): lang,_=langid.classify(buf.strip()); chunks.append((buf.strip(), lang)); buf="" if buf.strip(): lang,_=langid.classify(buf.strip()); chunks.append((buf.strip(), lang)) return chunks def asr_transcribe(path:str, uid: Optional[str], forced_lang: Optional[str]=None)->str: model=get_asr() prior=_load_json(ADAPT_DB,{}).get(uid or "guest",{}).get("lang_prior") language=forced_lang or prior or None segs, info = model.transcribe(path, language=language, beam_size=5, vad_filter=True) text=" ".join([s.text for s in segs]) if segs else "" if not forced_lang and text.strip(): 
lid,_=langid.classify(text); prof=_load_json(ADAPT_DB,{}); p=prof.get(uid or "guest",{}); p["lang_prior"]=lid; prof[uid or "guest"]=p; _save_json(ADAPT_DB,prof) return text def synthesize_multilang(text:str, fallback="en")->str: chunks=lid_chunk(text) sr=None; mix=None for ch, lg in chunks or [(text, fallback)]: lg2=lg if lg in PIPER_MODELS else fallback v=get_tts(lg2); aud, _ = v.synthesize(ch) if sr is None: sr=v.sample_rate mix = aud if mix is None else np.concatenate([mix,aud]) outp=os.path.join(tempfile.gettempdir(), f"hive_tts_{int(time.time())}.wav") sf.write(outp, mix if mix is not None else np.zeros(1), sr or 22050, subtype="PCM_16") return outp # ----------- compiler / engine ----------- class OCRagRanker: def execute(self, query, candidates): words=set(re.findall(r"\w+", query.lower())) def score(x): t=(x.get("text","") or "").lower() overlap=sum(1 for w in words if w in t) return overlap*2 + min(len(t),300)/300.0 return sorted(candidates, key=score, reverse=True) class OCPromptMinimizer: def execute(self, snippets, budget): out=[]; total=0 for s in snippets: t=(s.get("text","") or "")[:300] if total+len(t)<=budget: out.append(t); total+=len(t) return out OC_REG={"rag_ranker": OCRagRanker(), "prompt_minimizer": OCPromptMinimizer()} class PromptCompiler: def __init__(self): self.override_head=None self.override_budget=None def compile(self, user_msg, snippets, token_budget=600): if self.override_budget: token_budget=self.override_budget ranked=OC_REG["rag_ranker"].execute(user_msg, snippets) chosen=OC_REG["prompt_minimizer"].execute(ranked, budget=max(200, token_budget//3)) head=self.override_head if isinstance(self.override_head,str) else "Use the brief, relevant facts below.\n" body="\n".join([f"- {t}" for t in chosen]) return f"{head}{body}\n\nUser: {user_msg}\nAssistant:" class EngineCurve: def __init__(self): self.stats={"runs":0,"ok":0,"latency_ms":[]} self.router_rules=[] def choose_route(self, msg:str)->str: for pat in self.router_rules or []: if 
isinstance(pat, re.Pattern) and pat.search(msg): s=pat.pattern.lower() if "translation" in s: return "translation" if "vision" in s: return "vision" return "general" def run(self, message:str, snippets:List[Dict])->Dict: t0=time.time(); _route=self.choose_route(message); t1=time.time() self.stats["runs"]+=1; self.stats["ok"]+=1; self.stats["latency_ms"].append(int((t1-t0)*1000)) return {"ok":True,"route":_route} # ----------- wifi auto-connect (non-blocking) ----------- NET_STATE_DB=os.path.join(CFG["STATE_DIR"],"wifi_known.json") def _os_name(): return platform.system().lower() def _fast_probe(host="8.8.8.8", port=53, timeout=1.5)->bool: try: socket.setdefaulttimeout(timeout) s=socket.socket(socket.AF_INET, socket.SOCK_STREAM); s.connect((host,port)); s.close() return True except Exception: return False def _http_probe(url="https://huggingface.co", timeout=2.5)->float: try: t0=time.time(); r=requests.head(url, timeout=timeout) if r.status_code<500: return (time.time()-t0)*1000.0 except Exception: pass return -1.0 def _load_known()->List[dict]: data=_load_json(NET_STATE_DB, []); out=[] for d in data: if isinstance(d,dict) and "ssid" in d: out.append({"ssid":d["ssid"],"priority":int(d.get("priority",0))}) out.sort(key=lambda x: x.get("priority",0), reverse=True); return out def _get_saved_password(ssid:str)->Optional[str]: if keyring: try: return keyring.get_password("hive_wifi", ssid) or "" except Exception: return None return None def _connect_linux(ssid, password, timeout=12)->Tuple[bool,str]: try: cmd=["nmcli","device","wifi","connect",ssid]+(["password",password] if password else []) p=subprocess.run(cmd, capture_output=True, text=True, timeout=timeout) return (p.returncode==0), (p.stdout or p.stderr or "").strip() except Exception as e: return False, f"nmcli error: {e}" def _connect_windows(ssid, password)->Tuple[bool,str]: try: p=subprocess.run(["netsh","wlan","connect","name="+ssid,"ssid="+ssid], capture_output=True, text=True) if p.returncode==0 and 
"success" in (p.stdout+p.stderr).lower(): return True,"Connected." if not password: return False,"No saved password." xml=f''' {ssid}{ssid} ESSauto WPA2PSK AESfalse passPhrasefalse {password}''' tmp=os.path.join(os.getenv("TEMP","/tmp"), f"wifi_{int(time.time())}.xml"); open(tmp,"w",encoding="utf-8").write(xml) a=subprocess.run(["netsh","wlan","add","profile","filename="+tmp,"user=all"], capture_output=True, text=True) if a.returncode!=0: return False, a.stderr or a.stdout or "add profile failed" c=subprocess.run(["netsh","wlan","connect","name="+ssid,"ssid="+ssid], capture_output=True, text=True) return (c.returncode==0), (c.stderr or c.stdout or "").strip() except Exception as e: return False, f"netsh error: {e}" def _connect_macos(ssid, password)->Tuple[bool,str]: try: out=subprocess.check_output(["networksetup","-listallhardwaresports"], stderr=subprocess.DEVNULL).decode("utf-8","ignore") dev=None for block in out.split("\n\n"): if "Wi-Fi" in block or "AirPort" in block: for l in block.splitlines(): if l.strip().startswith("Device:"): dev=l.split(":",1)[1].strip(); break if dev: break if not dev: return False,"Wi-Fi device not found" cmd=["networksetup","-setairportnetwork",dev, ssid]+([password] if password else []) p=subprocess.run(cmd, capture_output=True, text=True) return (p.returncode==0), (p.stderr or p.stdout or "").strip() except Exception as e: return False, f"networksetup error: {e}" def _connect_os(ssid,password,timeout=12)->Tuple[bool,str]: osn=_os_name() if osn=="linux": return _connect_linux(ssid,password,timeout) if osn=="windows": return _connect_windows(ssid,password) if osn=="darwin": return _connect_macos(ssid,password) return False, f"Unsupported OS: {osn}" class AutoConnector: def __init__(self): self.last_attempt=0.0; self.cooldown_s=30.0; self.per_ssid_timeout=10.0; self.total_budget_s=18.0; self.thread=None; self._lock=threading.Lock() def online_quick(self)->bool: return _fast_probe(timeout=1.2) def quality_ms(self)->float: return 
_http_probe(timeout=2.0) def _run_once(self): if self.online_quick(): return known=_load_known(); if not known: return t_start=time.time() for item in known: if time.time()-t_start>self.total_budget_s: return ssid=item["ssid"]; pw=_get_saved_password(ssid) ok,_msg=_connect_os(ssid,pw,timeout=int(self.per_ssid_timeout)) if ok and self.online_quick(): return def kick_async(self): with self._lock: now=time.time() if now-self.last_attempt float: if not snippets or not scores: return 0.0 s = sorted(scores, reverse=True)[:3] base = sum(s)/len(s) if s else 0.0 bonus = min(0.15, 0.03 * len(snippets)) return float(max(0.0, min(1.0, base + bonus))) # ----------- overlay / hotpatch ----------- ALLOWED_PATCH_KEYS={"prompt_head","retrieval_k","token_budget","temperature","router_rules","web_threshold"} def _load_overrides(): if os.path.exists(RUNTIME_OVERRIDES): try: return json.load(open(RUNTIME_OVERRIDES,"r",encoding="utf-8")) except Exception: return {} return {} def _save_overrides(ovr:dict): json.dump(ovr, open(RUNTIME_OVERRIDES,"w",encoding="utf-8"), indent=2) class RuntimeOverlay: def __init__(self): self.ovr=_load_overrides() def apply_to(self, hive: "Hive"): o=self.ovr or {} if isinstance(o.get("prompt_head"),str): hive.compiler.override_head=o["prompt_head"] if isinstance(o.get("token_budget"),int): hive.compiler.override_budget=max(256, min(8192, o["token_budget"])) hive.retrieval_k=int(o.get("retrieval_k",6)); hive.retrieval_k=max(3,min(24,hive.retrieval_k)) hive.decoding_temperature=float(o.get("temperature",0.7)); hive.decoding_temperature=max(0.0,min(1.5,hive.decoding_temperature)) rr=o.get("router_rules") or [] if isinstance(rr,list): try: hive.engine.router_rules=[re.compile(pat,re.I) for pat in rr if isinstance(pat,str) and pat] except re.error: hive.engine.router_rules=[] t=o.get("web_threshold",None); hive.web_threshold=float(t) if isinstance(t,(int,float)) else 0.40 def patch(self, patch:dict, actor_role:str="hive")->Tuple[bool,str]: if not 
CFG["ALLOW_RUNTIME_HOTPATCH"]: return False,"Runtime hotpatch disabled." if actor_role not in ("hive","admin_general","admin_super","owner"): return False,"Unauthorized actor." for k in list(patch.keys()): if k not in ALLOWED_PATCH_KEYS: patch.pop(k,None) if not patch: return False,"No allowed keys." self.ovr.update(patch); _save_overrides(self.ovr); return True,"Patched." # ----------- safe reboot ----------- def _persist_before_reboot(): try: json.dump({"ts":time.time(),"note":"self-reboot"}, open(os.path.join(CFG["STATE_DIR"],"last_reboot.json"),"w",encoding="utf-8")) except Exception: pass def safe_reboot(reason:str="optimization"): if not CFG["ALLOW_SELF_REBOOT"]: return False,"Self-reboot disabled." _persist_before_reboot() try: os.execv(sys.executable, [sys.executable, os.path.abspath(__file__)] + sys.argv[1:]) except Exception: os._exit(3) return True, f"Rebooting: {reason}" # ----------- self optimizer (bounded) ----------- class SelfOptimizer(threading.Thread): def __init__(self, hive: "Hive"): super().__init__(daemon=True); self.hive=hive; self.stop=False; self.tick=45.0 def run(self): while not self.stop: time.sleep(self.tick) if not CFG["AUTO_SELF_OPTIMIZE"]: continue vm=psutil.virtual_memory(); ovr={} if vm.percent>88: ovr["token_budget"]=max(512,int(0.75*(self.hive.compiler.override_budget or CFG["CTX_TOKENS"]))) # type: ignore ovr["temperature"]=max(0.2,self.hive.decoding_temperature-0.1) lat=(sum(self.hive.engine.stats["latency_ms"][-10:])/max(1,len(self.hive.engine.stats["latency_ms"][-10:]))) if self.hive.engine.stats["latency_ms"] else 0 if lat>1200: ovr["retrieval_k"]=max(3,self.hive.retrieval_k-1) if ovr: ok,_=self.hive.overlay.patch(ovr, actor_role="hive") if ok: self.hive.overlay.apply_to(self.hive) if CFG["ALLOW_SELF_REBOOT"] and vm.percent>94: safe_reboot("refresh memory") # ----------- internal optimization stack ----------- def _append_jsonl(path, rec): with open(path, "a", encoding="utf-8") as f: f.write(json.dumps(rec, 
ensure_ascii=False) + "\n") @dataclass class ChangeProposal: kind: str # "model" | "package" | "code" name: str # model id / package name / file target version: str = "" patch_text: str = ""# for "code": full replacement or diff reason: str = "" created_ts: float = time.time() proposer: str = "hive" id: str = "" class Sandbox: def __init__(self): self.root=os.path.join(OPT_DIR, f"sandbox_{int(time.time())}") os.makedirs(self.root, exist_ok=True) self.venv=os.path.join(self.root,"venv") def _run(self, args, timeout): p=subprocess.run(args, capture_output=True, text=True, timeout=timeout) return p.returncode, (p.stdout or "") + (p.stderr or "") def create(self): rc,out=self._run([sys.executable,"-m","venv",self.venv], timeout=120) if rc!=0: raise RuntimeError("venv create failed: "+out) def pip(self, pkg_spec): py=os.path.join(self.venv,"bin","python") if os.name!="nt" else os.path.join(self.venv,"Scripts","python.exe") rc,out=self._run([py,"-m","pip","install","--upgrade",pkg_spec], timeout=CFG["OPT_SANDBOX_TIMEOUT"]) if rc!=0: raise RuntimeError("pip install failed: "+out) def run_snippet(self, code:str): py=os.path.join(self.venv,"bin","python") if os.name!="nt" else os.path.join(self.venv,"Scripts","python.exe") tmp=os.path.join(self.root,"snippet.py"); open(tmp,"w",encoding="utf-8").write(code) rc,out=self._run([py,tmp], timeout=CFG["OPT_SANDBOX_TIMEOUT"]); return rc,out def _synthetic_eval(hive_factory, prompts: List[str]) -> Dict: lat_ms=[]; toks_s=[]; quality=0.0 for p in prompts: t0=time.time() h=hive_factory() out=h.pipe(h.compiler.compile(p, []), max_new_tokens=64, do_sample=False, temperature=0.2) # type: ignore t1=time.time() text=out[0]["generated_text"] lat_ms.append((t1-t0)*1000) toks=max(1,len(text.split())); toks_s.append(toks/max(0.001,(t1-t0))) q=sum(1 for w in set(re.findall(r"\w+", p.lower())) if w in text.lower())/max(1,len(set(re.findall(r"\w+", p.lower())))) quality+=q n=max(1,len(prompts)) return {"lat_ms":sum(lat_ms)/n, 
"toks_s":sum(toks_s)/n, "quality":quality/n} class ChangeManager: def __init__(self, hive_cls): self.hive_cls=hive_cls def _allowed_pkg(self, name): return any(name.strip().startswith(allow.strip()) for allow in CFG["OPT_PKG_ALLOWLIST"]) def _allowed_model(self, mid): return mid in CFG["OPT_MODEL_ALLOWLIST"] def propose(self, cp: ChangeProposal)->str: cp.id=f"chg_{int(time.time())}_{abs(hash(cp.name))%100000}"; _append_jsonl(OPT_PROPOSALS, cp.__dict__); return cp.id def test_and_compare(self, cp_id:str, proposal: ChangeProposal)->Dict: def base_hive(): return self.hive_cls(model_id=None) prompts=["Summarize the water cycle.","Translate to French: the quick brown fox jumps over the lazy dog.","Two-sentence difference between TCP and UDP."] base=_synthetic_eval(base_hive, prompts) sand=Sandbox(); sand.create() model_override=None try: if proposal.kind=="package": if not self._allowed_pkg(proposal.name): return {"ok":False,"reason":"package not allowlisted"} spec=proposal.name + (("=="+proposal.version) if proposal.version else "") sand.pip(spec) elif proposal.kind=="model": if not self._allowed_model(proposal.name): return {"ok":False,"reason":"model not allowlisted"} model_override=proposal.name elif proposal.kind=="code": target=os.path.basename(__file__); patched=os.path.join(sand.root,target) with open(patched,"w",encoding="utf-8") as f: f.write(proposal.patch_text or "") code=f"import importlib.util, json; p=r'{patched}'; spec=importlib.util.spec_from_file_location('hmod',p); m=importlib.util.module_from_spec(spec); spec.loader.exec_module(m); h=m.Hive(); print(json.dumps({{'ok':True}}))" rc,out=sand.run_snippet(code) if rc!=0 or '"ok": true' not in out.lower(): return {"ok":False,"reason":"patch smoke test failed","out":out} except Exception as e: return {"ok":False,"reason":f"sandbox failed: {e}"} def cand_hive(): return self.hive_cls(model_id=model_override) if model_override else self.hive_cls(model_id=None) cand=_synthetic_eval(cand_hive, prompts) 
delta={"lat_ms": base["lat_ms"]-cand["lat_ms"], "toks_s": cand["toks_s"]-base["toks_s"], "quality": cand["quality"]-base["quality"]} passed=True if CFG["OPT_THRESH_LATENCY_MS"]>0 and delta["lat_ms"]0 and delta["toks_s"]Tuple[bool,str]: prop=result.get("proposal",{}); kind=prop.get("kind"); name=prop.get("name","") if not result.get("passed"): return False,"did not meet thresholds" if kind=="package": if not self._allowed_pkg(name): return False,"package not allowlisted" try: subprocess.check_call([sys.executable,"-m","pip","install","--upgrade", name + (("=="+prop.get("version","")) if prop.get("version") else "")]) return True,"package installed" except Exception as e: return False,f"pip failed: {e}" if kind=="model": if not self._allowed_model(name): return False,"model not allowlisted" pref=os.path.join(OPT_DIR,"preferred_model.json"); json.dump({"model_id":name,"ts":time.time()}, open(pref,"w",encoding="utf-8")) return True,"model preference recorded (takes effect after restart)" # type: ignore if kind=="code": if not CFG["OPT_AUTO_APPLY"]: return False,"awaiting Owner approval for code changes" try: target=os.path.abspath(__file__); backup=target+f".bak_{int(time.time())}"; shutil.copyfile(target,backup) open(target,"w",encoding="utf-8").write(prop.get("patch_text","")); return True,"code updated (backup created); restart recommended" except Exception as e: return False,f"code write failed: {e}" return False,"unknown change type" # ----------- Hive core ----------- # --- Memory & Manifest Helpers (auto-inserted) --- import tempfile, urllib.request, tarfile, zipfile from pathlib import Path as _Path def _human_ts(ts: int) -> str: import datetime try: # type: ignore return datetime.datetime.utcfromtimestamp(ts).strftime("%Y-%m-%d %H:%M:%S UTC") except Exception: return str(ts) INGEST_PROGRESS = os.path.join(CFG.get("STATE_DIR","./state"), "ingest_progress.json") def _load_progress(): try: if os.path.exists(INGEST_PROGRESS): return json.load(open(INGEST_PROGRESS, 
def _save_progress(p):
    """Best-effort: persist the ingest progress record to INGEST_PROGRESS.

    The record is written to a sibling ``.tmp`` file and moved into place with
    os.replace(), so an interrupted or failed write cannot leave a truncated
    progress file behind (_load_progress() falls back to "nothing done" when
    the file does not parse). The file handle is closed deterministically.

    Args:
        p: JSON-serializable progress record.

    Returns:
        None. Errors are swallowed on purpose; progress tracking must not
        abort ingestion.
    """
    try:
        staging = INGEST_PROGRESS + ".tmp"
        with open(staging, "w", encoding="utf-8") as fh:
            json.dump(p, fh, indent=2)
        os.replace(staging, INGEST_PROGRESS)
    except Exception:
        pass
Do not edit by hand.)\n" block += "MEMORY_MANIFEST = " + json.dumps(payload, indent=4, ensure_ascii=False) + "\n" block += end_tag new_src = head + block + tail tmp = target + ".tmp" try: with open(tmp, "w", encoding="utf-8") as f: f.write(new_src) os.replace(tmp, target) except Exception as e: return False, f"write error: {e}" return True, f"manifest updated ({_human_ts(payload['updated_ts'])})" def _curves_present(curve_dir: str) -> bool: idx = os.path.join(curve_dir, "faiss.index") meta = os.path.join(curve_dir, "meta.jsonl") return os.path.exists(idx) and os.path.getsize(idx) > 0 and os.path.exists(meta) def _extract_archive(archive_path: str, dest_dir: str) -> bool: os.makedirs(dest_dir, exist_ok=True) try: if archive_path.endswith(".tar.gz") or archive_path.endswith(".tgz"): with tarfile.open(archive_path, "r:gz") as tf: tf.extractall(dest_dir) return True if archive_path.endswith(".zip"): with zipfile.ZipFile(archive_path, "r") as z: z.extractall(dest_dir) return True except Exception as e: with open(os.path.join(CFG.get("STATE_DIR","./state"), "restore_error.log"), "a", encoding="utf-8") as f: f.write(f"extract: {e}\n") return False def _restore_from_local_archive(curve_dir: str): arc = CFG.get("CURVES_ARCHIVE_LOCAL") or "curves.tar.gz" if not arc or not os.path.exists(arc): return False, "no local archive" ok = _extract_archive(arc, curve_dir) return (ok, "restored from local archive" if ok else "local extract failed") def _restore_from_url(curve_dir: str): url = (CFG.get("CURVES_ARCHIVE_URL") or "").strip() if not url: return False, "no URL provided" try: tmp = os.path.join(tempfile.gettempdir(), f"curves_{int(time.time())}.pkg") urllib.request.urlretrieve(url, tmp) ok = _extract_archive(tmp, curve_dir) try: os.remove(tmp) except: pass return (ok, "restored from URL" if ok else "URL extract failed") except Exception as e: # type: ignore open(os.path.join(CFG.get("STATE_DIR","./state"), "restore_error.log"), "a", encoding="utf-8").write(f"url: {e}\n") 
return False, "URL download error" def _restore_from_hf_dataset(curve_dir: str): repo_id = (CFG.get("CURVES_HF_DATASET") or "").strip() sub = (CFG.get("CURVES_HF_SUBPATH") or "").strip() if not repo_id: return False, "no dataset repo" try: from huggingface_hub import snapshot_download, hf_hub_download cache = os.path.join("/tmp", "hf_curves_cache") token = CFG.get("HF_READ_TOKEN") or None for fname in ["curves.tar.gz", "curves.zip"]: try: fp = hf_hub_download(repo_id=repo_id, filename=(sub + "/" + fname) if sub else fname, token=token, local_dir=cache, local_dir_use_symlinks=False) if _extract_archive(fp, curve_dir): return True, f"restored from HF dataset file {fname}" except Exception: pass local_dir = snapshot_download(repo_id=repo_id, token=token, local_dir=cache, local_dir_use_symlinks=False) # auto-archive after each dataset if configured if CFG.get("HIVE_AUTO_ARCHIVE", True) and str(CFG.get("HIVE_AUTO_ARCHIVE_MODE","per_chain")).lower() == "per_dataset": try: _ok_arc, _ap = _archive_memory(curve_dir) # type: ignore open(os.path.join(CFG["STATE_DIR"], "archive_status.log"), "a", encoding="utf-8").write( json.dumps({"ts": time.time(), "mode": "per_dataset", "ok": _ok_arc, "path": _ap}) + "\n" ) except Exception as _e_arc: open(os.path.join(CFG["STATE_DIR"], "archive_error.log"), "a", encoding="utf-8").write( "per_dataset: " + str(_e_arc) + "\n" ) # type: ignore src = os.path.join(local_dir, sub) if sub else local_dir if os.path.isdir(src): for root, dirs, files in os.walk(src): rel = os.path.relpath(root, src) dest_root = os.path.join(curve_dir, rel) if rel != "." 
else curve_dir os.makedirs(dest_root, exist_ok=True) for fn in files: shutil.copy2(os.path.join(root, fn), os.path.join(dest_root, fn)) return True, "restored from HF dataset snapshot" return False, "HF snapshot missing subpath" except Exception as e: # type: ignore open(os.path.join(CFG.get("STATE_DIR","./state"), "restore_error.log"), "a", encoding="utf-8").write(f"hf: {e}\n") return False, "HF restore error" def restore_curves_if_missing(curve_dir: str): if not CFG.get("HIVE_CURVES_AUTO_RESTORE", True): return False, "auto-restore disabled" # type: ignore if _curves_present(curve_dir): return True, "memory present" ok, msg = _restore_from_local_archive(curve_dir) if ok and _curves_present(curve_dir): return True, msg ok, msg = _restore_from_url(curve_dir) if ok and _curves_present(curve_dir): return True, msg ok, msg = _restore_from_hf_dataset(curve_dir) if ok and _curves_present(curve_dir): return True, msg return False, "no restore source succeeded" def _archive_memory(curve_dir: str, archive_path: str=None) -> tuple: """Tar+gzip the memory directory to archive_path (default curves.tar.gz).""" try: import tarfile, tempfile as _tf ap = archive_path or CFG.get("HIVE_ARCHIVE_PATH","curves.tar.gz") or "curves.tar.gz" # write to temp then move for atomicity tmp = os.path.join(_tf.gettempdir(), f"curves_{int(time.time())}.tar.gz") with tarfile.open(tmp, "w:gz") as tar: tar.add(curve_dir, arcname="curves") os.replace(tmp, ap) return True, ap except Exception as e: try: open(os.path.join(CFG["STATE_DIR"], "archive_error.log"), "a", encoding="utf-8").write(str(e)+"\n") except Exception: pass return False, str(e) if not CFG.get("CURVES_AUTO_RESTORE", True): return False, "auto-restore disabled" if _curves_present(curve_dir): return True, "curves already present" ok, msg = _restore_from_local_archive(curve_dir) if ok and _curves_present(curve_dir): return True, msg ok, msg = _restore_from_url(curve_dir) if ok and _curves_present(curve_dir): return True, msg ok, msg = 
_restore_from_hf_dataset(curve_dir) if ok and _curves_present(curve_dir): return True, msg return False, "no restore source succeeded" # --- End Memory & Manifest Helpers --- # --- Staged Ingestion Orchestrator (auto) --- def _plan_sources(): srcs = [s.strip() for s in (CFG.get("INGEST_SOURCES") or "").split(",") if s.strip()] return srcs or (DEFAULT_SOURCES if "DEFAULT_SOURCES" in globals() else []) def _next_batch(done: list, all_sources: list, k: int): todo = [s for s in all_sources if s not in set(done)] return todo[:max(k,0)] def staged_ingest_once(curve_dir: str) -> dict: """Ingest a single stage (up to HIVE_INGEST_STAGE_SIZE datasets), respecting disk floor. Updates progress + manifest.""" try: import shutil, time as _t floor = int(CFG.get("HIVE_INGEST_MIN_FREE_GB", 8)) free_gb = shutil.disk_usage(".").free / (1024**3) if free_gb < floor: return {"ok": False, "reason": f"free disk {free_gb:.1f} GB < floor {floor} GB"} all_sources = _plan_sources() prog = _load_progress() batch = _next_batch(prog.get("done", []), all_sources, int(CFG.get("HIVE_INGEST_STAGE_SIZE",3))) if not batch: return {"ok": True, "reason": "all sources already ingested", "done": prog.get("done", [])} total_added = 0 actually_ingested = [] for ds in batch: added = ingest_all(curve_dir, [ds], scope="general") total_added += added actually_ingested.append(ds) prog["done"].append(ds) # check disk after each dataset free_gb = shutil.disk_usage(".").free / (1024**3) if free_gb < floor: break prog["stage"] = int(prog.get("stage", 0)) + 1 prog["ts"] = int(_t.time()) _save_progress(prog) # manifest update try: vecs = 0 try: # type: ignore vecs = CurveStore(curve_dir).index.ntotal except Exception: pass update_self_manifest(prog.get("done", []), int(vecs)) except Exception: pass return {"ok": True, "ingested": actually_ingested, "added_vectors_est": total_added, "stage": prog["stage"]} except Exception as _e: try: open(os.path.join(CFG.get("STATE_DIR","./state"), "ingest_error.log"), "a", 
def staged_ingest_chain_if_enabled(curve_dir: str) -> dict:
    """Run 0..N ingestion stages this boot, then optionally archive memory.

    Stage budget:
      * HIVE_INGEST_STAGED off -> nothing runs.
      * HIVE_INGEST_CHAIN on   -> up to HIVE_INGEST_CHAIN_MAX stages.
      * HIVE_INGEST_CHAIN off  -> one stage if HIVE_INGEST_NEXT is set, else none.

    The chain stops early when a stage fails, reports that all sources are
    already ingested, or ingests nothing.

    Args:
        curve_dir: Memory directory passed to staged_ingest_once() and
            _archive_memory().

    Returns:
        ``{"ok": True, "reason": "staged disabled"}`` when staging is off,
        otherwise ``{"ok": True, "chain_results": [...]}`` with one entry per
        stage that ran.

    Log files are now written through context managers so the handles are
    closed deterministically.
    """
    if not CFG.get("HIVE_INGEST_STAGED", True):
        return {"ok": True, "reason": "staged disabled"}

    if CFG.get("HIVE_INGEST_CHAIN", True):
        max_stages = max(0, int(CFG.get("HIVE_INGEST_CHAIN_MAX", 2)))
    else:
        max_stages = 1 if CFG.get("HIVE_INGEST_NEXT") else 0

    results = []
    for _ in range(max_stages):
        outcome = staged_ingest_once(curve_dir)
        results.append(outcome)
        if not outcome.get("ok", False):
            break
        if outcome.get("reason") == "all sources already ingested":
            break
        # stop if no items were ingested (e.g., disk floor hit immediately)
        if not outcome.get("ingested"):
            break

    # auto-archive after chain if configured
    # NOTE(review): the accepted spellings do not include "per_dataset", the
    # form the config comment documents -- confirm whether that is intended.
    archive_mode = str(CFG.get("HIVE_AUTO_ARCHIVE_MODE", "per_chain")).lower()
    if CFG.get("HIVE_AUTO_ARCHIVE", True) and archive_mode in ("per_chain", "perdataset", "per-dataset"):
        try:
            _ok_arc, _ap = _archive_memory(curve_dir)
            status = json.dumps({"ts": time.time(), "mode": "per_chain", "ok": _ok_arc, "path": _ap})
            with open(os.path.join(CFG["STATE_DIR"], "archive_status.log"), "a", encoding="utf-8") as log:
                log.write(status + "\n")
        except Exception as _e_arc:
            with open(os.path.join(CFG["STATE_DIR"], "archive_error.log"), "a", encoding="utf-8") as log:
                log.write("per_chain: " + str(_e_arc) + "\n")
    return {"ok": True, "chain_results": results}
"a", encoding="utf-8").write("restore: "+str(e)+"\n") # --- staged ingestion chaining (run next stages automatically if enabled) --- try: _ing_chain = staged_ingest_chain_if_enabled(CFG["CURVE_DIR"] if "CURVE_DIR" in CFG else CFG.get("MEMORY_DIR","./curves")) open(os.path.join(CFG["STATE_DIR"], "ingest_chain_status.log"), "a", encoding="utf-8").write(json.dumps({"ts":time.time(),"chain":_ing_chain})+"\n") except Exception as e: open(os.path.join(CFG["STATE_DIR"], "ingest_error.log"), "a", encoding="utf-8").write("chain: "+str(e)+"\n") if ok_restored: try: if CurveStore(CFG["CURVE_DIR"] if "CURVE_DIR" in CFG else CFG.get("MEMORY_DIR","./curves")).index.ntotal > 0: _mark_offline_ready() except Exception: pass except Exception as e: open(os.path.join(CFG["STATE_DIR"], "restore_error.log"), "a", encoding="utf-8").write(f"restore: {e}\n") need_ingest=False if CFG["FORCE_REINGEST"]: need_ingest=True else: if not _curves_ready(CFG["CURVE_DIR"]) and CFG["BOOTSTRAP_INGEST"]: need_ingest=True if need_ingest: try: srcs=[s.strip() for s in (CFG["INGEST_SOURCES"] or "").split(",") if s.strip()] or DEFAULT_SOURCES ingest_all(CFG["CURVE_DIR"], srcs, scope="general") if CurveStore(CFG["CURVE_DIR"]).index.ntotal>0: _mark_offline_ready() except Exception as e: open(os.path.join(CFG["CURVE_DIR"],"ingest_error.log"),"a",encoding="utf-8").write(str(e)+"\n") self.store=CurveStore(CFG["CURVE_DIR"]) self.bus = MessageBus(print) self.curves = {} self.create_initial_curves(num_general=2, num_librarians=1) self.compiler=PromptCompiler(); self.engine=EngineCurve() if not model_id: model_id, info = pick_model() if CFG["LLM_AUTOSIZE"] else (CANDIDATES[0][0], {"device":"cpu"}) device = info.get("device","cpu") self.model_id=model_id or CFG["MODEL_OVERRIDE"] or CANDIDATES[0][0] trust=True; kwargs={} if torch and torch.cuda.is_available() and device=="cuda": kwargs.update(dict(torch_dtype=torch.float16, device_map="auto")) # --- Model / Tokenizer initialization (supports local transformers or 
Hugging Face Inference API) --- use_remote = CFG["HIVE_USE_HF_INFERENCE"] if use_remote: # Remote path using huggingface_hub.InferenceClient try: from huggingface_hub import InferenceClient except Exception as e: raise RuntimeError(f"HIVE_USE_HF_INFERENCE=1 but huggingface_hub is missing: {e}") endpoint = os.getenv("HIVE_HF_ENDPOINT","").strip() or None token = os.getenv("HF_TOKEN") or os.getenv("HUGGING_FACE_HUB_TOKEN") or CFG["HF_READ_TOKEN"] self.client = InferenceClient(model=self.model_id if endpoint is None else None, token=token, timeout= int(os.getenv("HIVE_HF_TIMEOUT","60") or "60"), base_url=endpoint) # define a thin wrapper so downstream code can call self.pipe(prompt, **gen_kwargs) def _remote_pipe(prompt, max_new_tokens=256, do_sample=True, temperature=0.7, **kw): # Some endpoints require a stop sequence; fall back to "Assistant:" which our compiler uses. stop = kw.get("stop_sequences") or ["", "Assistant:"] resp = self.client.text_generation( prompt, max_new_tokens=int(max_new_tokens), temperature=float(temperature), do_sample=bool(do_sample), stop_sequences=stop, stream=False, ) return [{"generated_text": resp}] self.pipe = _remote_pipe self.tok = None self.model = None else: # Local path using transformers self.tok = AutoTokenizer.from_pretrained(self.model_id, trust_remote_code=trust) # Prefer half precision on GPU self.model = AutoModelForCausalLM.from_pretrained(self.model_id, trust_remote_code=trust, **kwargs) self.pipe = pipeline( "text-generation", model=self.model, tokenizer=self.tok, device=0 if (torch and torch.cuda.is_available() and device=="cuda") else -1 ) self.overlay=RuntimeOverlay() self.retrieval_k=6; self.decoding_temperature=0.7; self.web_threshold=0.40 self.overlay.apply_to(self) self.state_path=os.path.join(CFG["STATE_DIR"],"last_state.json") if not os.path.exists(self.state_path): _save_json(self.state_path, {"ok":True,"ts":time.time()}) # Preferred model (record exists) try: 
def add_curve(self, text: str, meta: Dict, scope: str = "general"):
    """Store one text/metadata pair in memory through a librarian curve.

    The original body used ``self.librarian``, an attribute the visible
    ``__init__`` never assigns (librarians are created by create_curve() and
    kept in ``self.curves``). An explicitly assigned ``self.librarian`` is
    still preferred; otherwise the first registered curve that exposes
    ``ingest_pairs`` is used.

    Args:
        text: Content to ingest.
        meta: Metadata stored alongside the text.
        scope: Library scope, e.g. "general".

    Raises:
        AttributeError: if no librarian is available (same exception type the
            missing attribute raised before).
    """
    librarian = getattr(self, "librarian", None)
    if librarian is None:
        registered = getattr(self, "curves", {}) or {}
        librarian = next(
            (curve for curve in registered.values() if hasattr(curve, "ingest_pairs")),
            None,
        )
    if librarian is None:
        raise AttributeError("Hive has no librarian curve to ingest into")
    librarian.ingest_pairs([text], [meta], scope)
def create_curve(self, curve_type, params=None):
    """Create a curve, register it in ``self.curves`` and return it.

    Args:
        curve_type: "general" or "librarian".
        params: Optional parameter dict forwarded to the curve.

    Returns:
        The new curve, or None for an unknown *curve_type*.

    Curve ids keep the ``<type>:<millisecond timestamp>`` format. Two curves
    created within the same millisecond used to get the same id, so the
    second silently replaced the first in ``self.curves``; the timestamp part
    is now bumped until the id is unused.

    NOTE(review): the "general" branch reads self.pipe and self.compiler, but
    __init__ calls create_initial_curves() before assigning either -- confirm
    the intended initialization order.
    """
    if curve_type not in ("general", "librarian"):
        return None

    stamp = int(time.time() * 1000)
    curve_id = f"{curve_type}:{stamp}"
    while curve_id in self.curves:
        stamp += 1
        curve_id = f"{curve_type}:{stamp}"

    if curve_type == "general":
        curve = GeneralCurve(curve_id, self.bus, print, self.pipe, self.compiler, params)
    else:
        curve = LibrarianCurve(self.store, params)
        curve.set_bus(self.bus, curve_id, print)

    self.curves[curve_id] = curve
    return curve
num_sub_tasks = len(sub_task_queries) results = {} results_lock = threading.Lock() all_results_event = threading.Event() def on_task_result(topic, msg): if msg.get("task_id") == task_id: with results_lock: results[msg["sub_task_id"]] = msg["result"] if len(results) >= num_sub_tasks: all_results_event.set() self.bus.subscribe("task:result", on_task_result) for query in sub_task_queries: sub_task_id = f"subtask:{query}:{int(time.time())}" fitness_scores = {} fitness_event = threading.Event() def on_fitness_response(topic, msg): fitness_scores[msg["curve_id"]] = msg["fitness"] if len(fitness_scores) >= len([c for c in self.curves.values() if c.type == 'general' and c.is_active]): fitness_event.set() fitness_reply_topic = f"fitness-reply:{sub_task_id}" self.bus.subscribe(fitness_reply_topic, on_fitness_response) self.bus.post("task:general", {"type": "get_fitness", "text_context": query, "reply_to": fitness_reply_topic}) fitness_event.wait(timeout=0.5) self.bus.unsubscribe(fitness_reply_topic, on_fitness_response) if not fitness_scores: continue best_curve_id = max(fitness_scores, key=fitness_scores.get) self.bus.post("task:general", {"type": "execute_sub_task", "task_id": task_id, "sub_task_id": sub_task_id, "assignee": best_curve_id, "payload": {"text": query}, "user_id": caller_id}) all_results_event.wait(timeout=4.0) self.bus.unsubscribe("task:result", on_task_result) if not results: return "The hive is thinking, but the workers did not respond in time." 
final_answer = "From what I could gather:\n" + "\n".join(f"- For '{sub_task_id.split(':')[1]}': {result.get('snippet', 'no information.')}" for sub_task_id, result in sorted(results.items())) return final_answer def _run_benchmark(self, hive_instance): """Runs a set of benchmark tasks against a given hive instance.""" benchmark_tasks = ["what is machine learning", "explain quantum physics", "define philosophy"] total_time = 0 success_count = 0 for task in benchmark_tasks: start_time = time.monotonic() try: # This is a conceptual stand-in for a full, isolated simulation environment. # type: ignore result = hive_instance.chat(task, effective_role="user", caller_id="benchmark", timeout=2.0) if "no information" not in result and "did not respond" not in result: success_count += 1 except Exception: pass # Task failed total_time += (time.monotonic() - start_time) avg_time = total_time / len(benchmark_tasks) if benchmark_tasks else float('inf') success_rate = success_count / len(benchmark_tasks) if benchmark_tasks else 0 # Fitness score: lower is better (lower time, higher success) fitness = avg_time / (success_rate + 0.01) return fitness def _run_simulation(self, proposed_change): """Runs a simulation of a proposed change in a sandboxed copy of the hive.""" def simulation_worker(): print(f"Simulating change: {proposed_change['action']} on {proposed_change.get('curve_id')}") baseline_fitness = self._run_benchmark(self) print(f"Baseline hive fitness: {baseline_fitness:.2f}") # This is a conceptual deep copy. A true deep copy of a threaded system is complex. 
sim_hive = self.__class__(model_id=self.model_id) # Re-create a similar hive sim_hive.curves = {k: v for k, v in self.curves.items()} # Shallow copy of curves try: action = proposed_change["action"] curve_id = proposed_change.get("curve_id") if action == "prune" and curve_id in sim_hive.curves: sim_hive.curves.pop(curve_id) elif action == "add": sim_hive.create_curve(proposed_change["curve_type"], proposed_change.get("params")) elif action == "edit" and curve_id in sim_hive.curves: # In a real sim, you'd need to properly edit the curve object pass sim_fitness = self._run_benchmark(sim_hive) print(f"Simulated hive fitness: {sim_fitness:.2f}") is_beneficial = sim_fitness < baseline_fitness if is_beneficial: print(f"Simulation shows improvement for action '{action}'. Committing change.") self.bus.post("hive:commit_change", proposed_change) else: print(f"Simulation shows no improvement for action '{action}'. Aborting change.") finally: sim_hive.selfopt.stop = True # type: ignore # Stop background threads in the copy sim_thread = threading.Thread(target=simulation_worker, daemon=True) sim_thread.start() def _get_performance_metrics(self): """Collects performance metrics from the hive.""" curves_data = [c.to_dict() for c in self.curves.values()] tasks_completed = sum(c['perf_metrics']['tasks_completed'] for c in curves_data) total_time = sum(c['perf_metrics']['total_processing_time'] for c in curves_data) errors = sum(c['perf_metrics']['errors'] for c in curves_data) error_rate = errors / max(1, tasks_completed) avg_task_time = total_time / max(1, tasks_completed) return {"error_rate": error_rate, "avg_task_time": avg_task_time} def _commit_change(self, topic, change): """Applies a validated change to the live hive and records it.""" action = change["action"] curve_id = change.get("curve_id") before_metrics = self._get_performance_metrics() if action == "prune" and curve_id in self.curves: curve = self.curves.pop(curve_id) curve.shutdown() elif action == "add": 
self.create_curve(change["curve_type"], change.get("params")) elif action == "edit" and curve_id in self.curves: self.bus.post(f"curve:{curve_id}", {"type": "admin:edit_curve", "new_params": change["new_params"]}) after_metrics = self._get_performance_metrics() change["impact"] = {"before": before_metrics, "after": after_metrics} self.change_history.append(change) print(f"Change committed: {action} on {curve_id}. Impact: {change['impact']}") def _handle_optimization_trigger(self, topic, message): """Event-driven entry point for continuous self-optimization.""" trigger = message.get("trigger") curve_id = message.get("curve_id") print(f"Optimization triggered by '{trigger}' from curve {curve_id}.") # For this prototype, we'll just simulate a prune action as an example change = {"action": "prune", "curve_id": curve_id} self._run_simulation(change) # ----------- OCR helper ----------- def ocr_text_from_image_bgr(image_bgr)->str: if not (_HAVE_CV and _HAVE_TESS): return "" gray=cv2.cvtColor(image_bgr, cv2.COLOR_BGR2GRAY) # type: ignore return pytesseract.image_to_string(gray) or "" # --------------- UI --------------- HELP=f""" **Admin/User mode**: Admins (general/super) and Owner log in with password (Owner also needs second factor). After login choose Admin or User mode. **Owner-only code edits** are enforced via Change Manager policy. Hive can sandbox, test, and propose; code writes require Owner approval (`OPT_AUTO_APPLY=1`) unless Owner applies manually. **Offline/Online**: Works fully offline from curves. If online and enabled, fetches RSS/web snippets âžĄī¸ summarizes locally âžĄī¸ saves to curves (persists offline). **Voice**: Faster-Whisper ASR (auto language), Piper TTS mixed-language, phonics hints (English). **Privacy**: Sensitive/first-person inputs route to user-private library; neutral info to general. 
class MessageBus:
    """A thread-safe, centralized event bus for curve communication.

    Messages are queued by post() and delivered by one daemon worker thread,
    which invokes each subscriber callback as ``callback(topic, message)``.
    Subscribers of the special topic ``'*'`` receive every message.
    Callbacks run sequentially on the worker thread, so a callback that blocks
    delays every message queued behind it.
    """

    def __init__(self, hive_logger):
        """Start the bus; *hive_logger* is called with a string on callback errors."""
        self.topics = defaultdict(list)
        self.message_queue = queue.Queue()
        self._lock = threading.Lock()
        self.running = True
        self.logger = hive_logger
        self.worker_thread = threading.Thread(target=self._process_messages, daemon=True)
        self.worker_thread.start()

    def subscribe(self, topic, callback):
        """Register *callback* for *topic*."""
        with self._lock:
            self.topics[topic].append(callback)

    def unsubscribe(self, topic, callback):
        """Remove *callback* from *topic*; unknown topics are ignored.

        A topic whose last subscriber is removed is dropped from ``topics``.
        Callers use one-off, uniquely named reply topics, and keeping an empty
        list for each of them made the table grow without bound.
        """
        with self._lock:
            if topic not in self.topics:
                return
            remaining = [cb for cb in self.topics[topic] if cb != callback]
            if remaining:
                self.topics[topic] = remaining
            else:
                del self.topics[topic]

    def post(self, topic, message):
        """Queue *message* for asynchronous delivery to *topic*."""
        self.message_queue.put((topic, message))

    def _process_messages(self):
        """Worker loop: deliver queued messages until ``running`` is cleared."""
        while self.running:
            try:
                topic, message = self.message_queue.get(timeout=1)
            except queue.Empty:
                # Wake up periodically so a cleared ``running`` flag is noticed.
                continue
            # Snapshot under the lock, call outside it, so callbacks may
            # subscribe/unsubscribe without deadlocking.
            with self._lock:
                subscribers = self.topics.get(topic, []) + self.topics.get('*', [])
            for callback in subscribers:
                try:
                    callback(topic, message)
                except Exception as e:
                    self.logger(f"Error in callback for topic {topic}: {e}")
class GeneralCurve(Curve):
    """Worker curve: answers fitness queries and executes assigned sub-tasks.

    Listens on the bus topic "task:general" for two message types:
      * "get_fitness"      -> replies with a fitness score on message["reply_to"].
      * "execute_sub_task" -> if this curve is the assignee, runs the sub-task
                              and posts the answer on "task:result".

    Changes from the previous version:
      * Attributes are assigned before subscribing, so a message cannot reach
        a half-initialized curve.
      * Sub-task outcomes are recorded in perf_metrics (tasks_completed,
        errors, total_processing_time). The start time was captured but never
        used, so the cost term of the fitness score stayed at 0.
      * shutdown() unsubscribes from the bus and inactive curves ignore
        messages, so a pruned curve no longer bids for or executes tasks.
      * "hive:evaluate_performance" is posted only by the curve that actually
        executed the sub-task.
    """

    def __init__(self, curve_id, bus, logger, llm_pipe, compiler, params=None):
        super().__init__(curve_id, "general", bus, logger, params)
        self.specializations = self.params.get("specializations", [])
        self.llm_pipe = llm_pipe
        self.compiler = compiler
        self.local_cache = {}
        # Subscribe last: the callback may fire as soon as it is registered.
        self.bus.subscribe("task:general", self.handle_message)

    def shutdown(self):
        """Deactivate the curve and stop receiving "task:general" messages."""
        super().shutdown()
        self.bus.unsubscribe("task:general", self.handle_message)

    def handle_message(self, topic, message):
        """Bus callback for "task:general"."""
        if not self.is_active:
            return
        super().handle_message(topic, message)
        msg_type = message.get("type")
        if msg_type == "get_fitness":
            self._get_fitness(message)
        elif msg_type == "execute_sub_task" and message.get("assignee") == self.id:
            self._execute_sub_task(message)
            # Trigger a performance evaluation after completing a task.
            self.bus.post("hive:evaluate_performance", {"curve_id": self.id, "trigger": "task_complete"})

    def _record_outcome(self, started_at, succeeded):
        """Fold one sub-task result into perf_metrics."""
        elapsed = time.monotonic() - started_at
        with self._lock:
            self.perf_metrics["total_processing_time"] += elapsed
            if succeeded:
                self.perf_metrics["tasks_completed"] += 1
            else:
                self.perf_metrics["errors"] += 1

    def _ask_librarian(self, sub_task_id, query_text):
        """Request one snippet over the bus; return its payload or None on timeout.

        NOTE(review): handle_message runs as a bus callback. With the
        MessageBus in this file (one worker thread delivering callbacks
        sequentially) the librarian's reply cannot be delivered while this
        method is waiting, so the wait would always run into the 1.5 s
        timeout. Confirm and consider moving execution off the bus thread.
        """
        query_id = f"query:{sub_task_id}"
        reply_topic = f"response:{self.id}:{query_id}"
        answered = threading.Event()
        holder = {}

        def on_query_response(topic, msg):
            if msg.get("query_id") == query_id:
                holder["payload"] = msg.get("payload")
                answered.set()

        self.bus.subscribe(reply_topic, on_query_response)
        try:
            self.bus.post("query:librarian", {"type": "find_data", "query_id": query_id, "text": query_text, "reply_to_topic": reply_topic})
            answered.wait(timeout=1.5)
        finally:
            # Always drop the one-off reply subscription, even on error.
            self.bus.unsubscribe(reply_topic, on_query_response)
        return holder.get("payload")

    def _execute_sub_task(self, message):
        """Answer one sub-task with the LLM and post the result on "task:result".

        Exceptions are counted in perf_metrics["errors"] and re-raised so the
        caller's error reporting is unchanged.
        """
        start_time = time.monotonic()
        sub_task_id = message.get("sub_task_id")
        query_text = message.get("payload", {}).get("text")
        self.logger(f"{self.id} starting sub-task: {sub_task_id} for query '{query_text}'")
        try:
            # For this prototype, we'll use the librarian to get one relevant snippet.
            # A more advanced version would retrieve multiple snippets.
            query_result = self._ask_librarian(sub_task_id, query_text)
            snippets = [query_result] if query_result else []
            prompt = self.compiler.compile(query_text, snippets)
            out = self.llm_pipe(prompt, max_new_tokens=64, do_sample=True, temperature=0.7)
            reply = out[0]["generated_text"].split("Assistant:", 1)[-1].strip()
            final_answer = {"snippet": reply, "source": query_result.get('source') if query_result else 'internal'}
        except Exception:
            self._record_outcome(start_time, succeeded=False)
            raise
        self._record_outcome(start_time, succeeded=True)
        self.bus.post("task:result", {"task_id": message.get("task_id"), "sub_task_id": sub_task_id, "status": "completed", "result": final_answer})

    def _get_fitness(self, message):
        """Reply on message["reply_to"] with this curve's fitness for the query.

        fitness = 10 * (query keywords matching specializations)
                  - (tasks completed + processing minutes) - 2 * errors
        """
        with self._lock:
            cost = self.perf_metrics["tasks_completed"] + (self.perf_metrics["total_processing_time"] / 60.0)
            errors = self.perf_metrics.get("errors", 0)
        error_penalty = errors * 2
        task_keywords = set(re.findall(r'\b\w{3,}\b', message.get("text_context", "").lower()))
        if task_keywords and self.specializations:
            specialization_score = len(task_keywords.intersection(self.specializations))
        else:
            specialization_score = 0
        fitness = (specialization_score * 10) - cost - error_penalty
        self.bus.post(message["reply_to"], {"curve_id": self.id, "fitness": fitness})
only)", type="password") login_second=gr.Textbox(label="Second (owner only)", type="password") login_btn=gr.Button("Login") login_status=gr.Markdown() uid_state=gr.State(None); role_state=gr.State("guest"); mode_state=gr.State("user"); phonics_state=gr.State(False) def do_login(nm,pw,sec): ok, info=attempt_login(nm or "", pw or "", sec or None) d=_load_users(); u,_=_find_user(d, nm or "") role=u["role"] if u else "guest" prof=_load_json(ADAPT_DB,{}).get(u["id"] if u else "guest",{}); phon_on=bool(prof.get("phonics_on",False)) return info,(u["id"] if u else None),role,"user",phon_on login_btn.click(do_login,[login_name,login_pass,login_second],[login_status, uid_state, role_state, mode_state, phonics_state]) mode_picker=gr.Radio(choices=["user","admin"], value="user", label="Mode (admins/owner only)") def set_mode(role, pick): if role not in ("admin_general","admin_super","owner"): return "user" return pick mode_picker.change(set_mode, [role_state, mode_picker], [mode_state]) with gr.Tab("Chat"): chat=gr.Chatbot(height=420) msg=gr.Textbox(placeholder=f"Talk to {CFG['AGENT_NAME']}") def talk(m, uid, role, mode, hist): eff = role if mode=="admin" else "user" reply=hive.chat(m or "", eff, caller_id=uid) # privacy routing personal = False if re.search(r"\b(my|mine|me|I|our|we)\b", (m or ""), re.I) and re.search(r"\b(password|address|email|phone|ssn|school|kid|medical|bank|card|passport)\b", (m or ""), re.I): personal = True scope = f"user:{uid}" if (uid and personal) else "general" lib.ingest_pairs([m],[{"dataset":"chat"}], scope=scope) return hist+[[m, reply]], "" msg.submit(talk,[msg,uid_state,role_state,mode_state,chat],[chat,msg]) with gr.Tab("Voice"): gr.Markdown("### Voice login / ASR / Mixed-language TTS / Phonics") mic=gr.Audio(sources=["microphone"], type="filepath", label="Speak (5–10s)") asr_lang=gr.Dropdown(choices=["auto","en","es","fr","de","zh"], value="auto", label="ASR language (force or auto)") phonics_toggle=gr.Checkbox(value=False, label="Enable 
Phonics assist (English pronunciation help)") transcribe_btn=gr.Button("Transcribe") transcript=gr.Textbox(label="Transcript") who_btn=gr.Button("Login by Voice (users only)") who_status=gr.Markdown() reply_btn=gr.Button("Reply + Speak") reply_text=gr.Textbox(label="Assistant Reply") reply_audio=gr.Audio(type="filepath", label="Assistant Voice") def do_transcribe(path, asr_lg, uid): if not path: return "" text=asr_transcribe(path, uid, None if asr_lg=="auto" else asr_lg) prof=_load_json(ADAPT_DB,{}); p=prof.get(uid or "guest",{}) dur=librosa.get_duration(filename=path) or 0.001 syl=len(re.findall(r"[aeiouyAEIOUY]+", text)); rate=(syl/dur) if dur > 0 else 0 p["rate"]=0.8*p.get("rate", rate)+0.2*rate chunks=lid_chunk(text); en_len=sum(len(c) for c,l in chunks if l.startswith("en")); all_len=sum(len(c) for c,l in chunks) if all_len>0: ratio=en_len/all_len; p["codeswitch_en"]=0.8*p.get("codeswitch_en",ratio)+0.2*ratio prof[uid or "guest"]=p; _save_json(ADAPT_DB,prof) scope="user:"+uid if uid and ("my " in text.lower() or "I " in text) else "general" lib.ingest_pairs([text],[{"dataset":"voice_asr"}], scope=scope) return text transcribe_btn.click(do_transcribe,[mic,asr_lang,uid_state],[transcript]) def do_login_voice(path): if not path: return "No audio.", None, "guest", "user" uidv=identify_voice(path) if not uidv: return "Voice not recognized. 
You can enroll as a new user.", None, "guest", "user" d=_load_users() for grp in ["users","admins_general","admins_super"]: for u in d.get(grp,[]): if u["id"]==uidv: if u["role"] in ("admin_general","admin_super"): return "Admin roles require password login.", None, "guest", "user" return f"Welcome back, {u['name']} (user).", uidv, "user", "user" if d["owner"]["id"]==uidv: return "Owner must login with password + second factor.", None, "guest", "user" return "Matched unknown id; please login manually.", None, "guest", "user" who_btn.click(do_login_voice,[mic],[who_status, uid_state, role_state, mode_state]) def do_reply(uid, role, mode, text, phon_toggle): if not text: return "", None eff = role if mode=="admin" else "user" prompt=text if phon_toggle: notes=[] for ch, lg in lid_chunk(text): if lg.startswith("en"): for w in re.findall(r"\b[A-Za-z][A-Za-z-']+\b", ch): if len(w)>=6 or w[0].isupper(): notes.append(f"{w}: {phonics(w)}") if notes: prompt += "\n\n(Phonics)\n" + "\n".join(f"- {n}" for n in notes[:10]) prof=_load_json(ADAPT_DB,{}); p=prof.get(uid or "guest",{}); p["phonics_on"]=True; prof[uid or "guest"]=p; _save_json(ADAPT_DB,prof) ans=hive.chat(prompt, eff, caller_id=uid) wav=synthesize_multilang(ans, CFG["TTS_LANG"]); return ans, wav reply_btn.click(do_reply,[uid_state, role_state, mode_state, transcript, phonics_toggle],[reply_text, reply_audio]) with gr.Accordion("Voice enrollment (add your voiceprint)", open=False): enroll_audio=gr.Audio(sources=["microphone"], type="filepath", label="Record 5–10s") enroll_btn=gr.Button("Enroll voice for current user"); enroll_status=gr.Markdown() def do_enroll(uid, path): if not uid: return "Login or specify user first." if not path: return "No audio." enroll_voice(uid, path); return "Voice enrolled." 
enroll_btn.click(do_enroll,[uid_state, enroll_audio],[enroll_status]) with gr.Accordion("New user by voice (no password)", open=False): nu_audio=gr.Audio(sources=["microphone"], type="filepath", label="Record 5–10s") nu_name=gr.Textbox(label="Your name") nu_lang=gr.Dropdown(choices=["en","es","fr","de","zh"], value="en", label="Preferred language") nu_btn=gr.Button("Create user from my voice"); nu_status=gr.Markdown() def do_new_user(path, name, lang): if not path or not name: return "Provide audio and a name." d=_load_users(); uid=f"user:{int(time.time())}" entry={"id":uid,"name":name,"role":"user","pass":"","prefs":{"activation_names":[CFG['AGENT_NAME']],"language":lang}} d["users"].append(entry); _save_json(USERS_DB,d); enroll_voice(uid, path) return f"Created user {name} ({uid}) with enrolled voice." nu_btn.click(do_new_user,[nu_audio, nu_name, nu_lang],[nu_status]) with gr.Tab("Online & Wi-Fi"): gr.Markdown("### Auto-connect to known Wi-Fi (non-blocking) and fetch online updates") wifi_status=gr.Markdown("Wi-Fi: checking...") connect_now=gr.Button("Try auto-connect now (non-blocking)") online_now=gr.Button("Fetch updates now"); online_status=gr.Markdown() connect_now.click(lambda: (NET.kick_async() or True) and "Auto-connect started in background.", [], [wifi_status]) online_now.click(lambda: ("Added %s new summaries to curves." 
% (Hive().online_update().get("added",0))), [], [online_status]) with gr.Tab("Help"): gr.Markdown(HELP) # ------ Admin Controls (no separate tab; visible in Admin mode) ------ with gr.Accordion("Admin Controls (switch to Admin mode to enable)", open=False, visible=True) as admin_controls: admin_info=gr.Markdown("Switch to **Admin mode** above to use these tools.") target=gr.Textbox(label="Target name or id") new_name=gr.Textbox(label="New name") new_pass=gr.Textbox(label="New password") new_role=gr.Dropdown(choices=["owner","admin_super","admin_general","user"], value="user", label="New role") add_name=gr.Textbox(label="Add: name") add_role=gr.Dropdown(choices=["admin_super","admin_general","user"], value="user", label="Add role") add_pass=gr.Textbox(label="Add password (admins only)") add_btn=gr.Button("Add user/admin") rename_btn=gr.Button("Rename") pass_btn=gr.Button("Change password") role_btn=gr.Button("Change role") out=gr.Markdown() def is_admin(mode, role): return (mode=="admin") and (role in ("admin_general","admin_super","owner")) def do_add(mode, role, caller, nm, rl, pw): if not is_admin(mode, role): return "Switch to Admin mode to use this." d=_load_users(); cu,_=_find_user(d, caller or "") if not cu: return "Login first as admin." if rl not in PERMS.get(cu["role"],{}).get("can_add",[]): return f"{cu['role']} cannot add {rl}." uid=f"{rl}:{int(time.time())}" entry={"id":uid,"name":nm,"role":rl,"pass":pw if rl!='user' else "", "prefs":{"activation_names":[CFG["AGENT_NAME"]],"language":"en"}} if rl=="owner": d["owner"]=entry elif rl=="admin_super": d["admins_super"].append(entry) elif rl=="admin_general": d["admins_general"].append(entry) else: d["users"].append(entry) _save_json(USERS_DB,d); return f"Added {rl}: {nm}" add_btn.click(do_add, [mode_state, role_state, uid_state, add_name, add_role, add_pass], [out]) def do_rename(mode, role, caller, tgt, nm): if not is_admin(mode, role): return "Switch to Admin mode to use this." 
d=_load_users(); u,_=_find_user(d, tgt or "") if not u: return "Target not found." cu,_=_find_user(d, caller or "") if not cu: return "Login first." if u["role"] in PERMS.get(cu["role"],{}).get("can_edit_profile_of",[]): u["name"]=nm; _save_json(USERS_DB,d); return "Renamed." return "Not allowed." rename_btn.click(do_rename,[mode_state, role_state, uid_state, target, new_name],[out]) def do_pass(mode, role, caller, tgt, pw): if not is_admin(mode, role): return "Switch to Admin mode to use this." d=_load_users(); u,_=_find_user(d, tgt or "") if not u: return "Target not found." cu,_=_find_user(d, caller or "") if not cu: return "Login first." if u["role"] in PERMS.get(cu["role"],{}).get("can_edit_profile_of",[]): u["pass"]=pw; _save_json(USERS_DB,d); return "Password changed." return "Not allowed." pass_btn.click(do_pass,[mode_state, role_state, uid_state, target, new_pass],[out]) def do_role(mode, role, caller, tgt, rl): if not is_admin(mode, role): return "Switch to Admin mode to use this." d=_load_users(); u,_=_find_user(d, tgt or "") if not u: return "Target not found." cu,_=_find_user(d, caller or ""); if not cu: return "Login first." allowed_new = {"owner":["owner","admin_super","admin_general","user"], "admin_super":["admin_general","user"], "admin_general":["admin_general","user"]}.get(cu["role"], []) if u["role"] not in PERMS.get(cu["role"],{}).get("can_edit_role_of",[]) or rl not in allowed_new: return f"Not allowed to set {rl}." for grp in ["admins_super","admins_general","users"]: d[grp]=[x for x in d[grp] if x["id"]!=u["id"]] if rl=="owner": d["owner"]=u; u["role"]="owner" elif rl=="admin_super": d["admins_super"].append(u); u["role"]="admin_super" elif rl=="admin_general": d["admins_general"].append(u); u["role"]="admin_general" else: d["users"].append(u); u["role"]="user" _save_json(USERS_DB,d); return f"Role set to {rl}." 
role_btn.click(do_role,[mode_state, role_state, uid_state, target, new_role],[out]) # ------ Internal Optimization controls (Owner-gated) ------ gr.Markdown("### Internal Optimization (Change Manager)") prop_kind=gr.Dropdown(choices=["model","package","code"], value="model", label="Proposal type") prop_name=gr.Textbox(label="Model ID / Package Name") prop_ver=gr.Textbox(label="Package version (optional)") prop_reason=gr.Textbox(label="Why this change?") prop_patch=gr.Code(label="Code patch (for 'code' proposals): paste full replacement or diff") propose_btn=gr.Button("Propose"); test_btn=gr.Button("Test in sandbox"); apply_btn=gr.Button("Apply (policy-checked)") opt_out=gr.Markdown() _last = {"id": None, "obj": None} def do_propose(kind,name,ver,reason,patch): cp=ChangeProposal(kind=kind,name=name or "",version=ver or "",reason=reason or "",patch_text=patch or "") pid=hive.changes.propose(cp); _last["id"]=pid; _last["obj"]=cp return f"Proposed {kind}: {name or '(code patch)'} (id:{pid})" def do_test(): if not _last["obj"]: return "No proposal in memory. Submit one first." res=hive.changes.test_and_compare(_last["id"], _last["obj"]); return json.dumps(res, indent=2) def do_apply(role, mode): if role not in ("admin_super","owner") or mode!="admin": return "Only admin_super or owner may apply." if not _last["obj"]: return "No proposal loaded." res=hive.changes.test_and_compare(_last["id"], _last["obj"]) if not res.get("ok"): return f"Test failed: {res.get('reason','unknown')}" if _last["obj"].kind=="code" and role!="owner" and not CFG["OPT_AUTO_APPLY"]: return "Awaiting Owner approval for code changes." 
# type: ignore ok,msg=hive.changes.apply(res); return msg if ok else f"Apply failed: {msg}" propose_btn.click(do_propose, [prop_kind,prop_name,prop_ver,prop_reason,prop_patch],[opt_out]) test_btn.click(lambda: do_test(), [], [opt_out]) apply_btn.click(do_apply, [role_state, mode_state], [opt_out]) demo.launch(server_name="0.0.0.0", server_port=int(os.environ.get("PORT", "7860"))) # ----------- entry ----------- if __name__=="__main__": if ENV("HIVE_LAUNCH_UI","1",bool): launch_ui() else: h=Hive(); print("CLI mode. Type and press Enter (Ctrl+C to exit).") try: while True: s=input("> ").strip() if not s: continue print(h.chat(s, effective_role="user", caller_id=None)) except KeyboardInterrupt: pass