#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
# HIVE 🐝 FULL MERGED ALL-IN-ONE **OPTIMIZED**
# Offline-first + Online updates + Auto Wi-Fi + RBAC + Multilingual Voice (ASR/TTS + Phonics)
# + Internal Optimization Stack (Change Manager: propose ➡️ sandbox ➡️ A/B test ➡️ apply/rollback with Owner policy)
# Upload this single file and requirements.txt to a Hugging Face Space (or run locally).
# - python app.py
# --- BEGIN MEMORY MANIFEST (auto-updated) ---
# (manifest placeholder) -- runtime prompt logic is applied inside `Hive.chat`
# Set HuggingFace cache to use local project directory for faster loading
from pathlib import Path as _Path
import os
import sys
import subprocess
import re
import json
import time
import platform
import threading
import unicodedata
import hashlib
import urllib.request
import urllib.parse
try:
from importlib import metadata as importlib_metadata
except Exception:
importlib_metadata = None
import tempfile
import tarfile
import asyncio
import socket
import shutil
import importlib.util
import multiprocessing
from dataclasses import dataclass, field
from typing import List, Optional, Dict, Tuple, Iterable, Any
from collections import OrderedDict
from collections.abc import Iterable as ABCIterable
_project_root = _Path(__file__).parent.absolute()
os.environ.setdefault("HF_HOME", str(_project_root / ".cache" / "huggingface"))
# ----------- environment detection & bootstrap -----------
# Replit-specific runtime detection removed — support deprecated. Use `requirements.txt` or container images
# for reproducible environments. This code now targets HF Spaces and local/container deployments.
def _ensure(pkgs: List[str]):
"""Install packages if missing. Intended for HF Spaces/local deployments.
Runtime-managed environments should prefer a pinned `requirements.txt` or prebuilt image.
"""
# Always attempt to ensure packages are installed at runtime when possible.
for p in pkgs:
try:
            subprocess.check_call([sys.executable, "-m", "pip", "install", "--upgrade", "--quiet", p],
                                  stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
except Exception as e:
print(f"[BOOTSTRAP] Could not install {p}: {e}")
# Import core dependencies with graceful fallbacks
import collections, logging
import random
# Track missing runtime dependencies detected during imports
MISSING_DEPS = []
try:
import numpy as np
except ImportError:
MISSING_DEPS.append("numpy>=1.24.0")
np = None
try:
import gradio as gr
except ImportError:
MISSING_DEPS.append("gradio>=4.44.0")
gr = None
# --- Setup Logging ---
# This is done early to capture logs from the entire bootstrap process.
HIVE_HOME_LOG_DIR = os.path.join(os.getenv("HIVE_HOME", "/home/hive/hive_data" if os.path.exists("/home/hive") else "./hive_data"), "system", "logs")
os.makedirs(HIVE_HOME_LOG_DIR, exist_ok=True)
LOG_FILE_PATH = os.path.join(HIVE_HOME_LOG_DIR, "hive.log")
logging.basicConfig(
level=logging.INFO,
format='[%(asctime)s] [%(levelname)s] [%(threadName)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
handlers=[
logging.FileHandler(LOG_FILE_PATH),
logging.StreamHandler(sys.stdout) # Keep console logging
],
force=True
)
import struct, queue
# Critical imports - if these fail, show installation instructions
CRITICAL_DEPS = ["numpy", "gradio", "psutil", "transformers", "torch", "sounddevice", "peft"]
# MISSING_DEPS continues accumulating from the numpy/gradio import checks above
try:
from peft import LoraConfig, get_peft_model, TaskType, PeftModel
except ImportError:
MISSING_DEPS.append("peft>=0.12.0")
# Check if critical imports succeeded
CRITICAL_IMPORTS_OK = len(MISSING_DEPS) == 0
# Required minimum versions for key packages
REQUIRED_VERSIONS = {
"transformers": ">=4.30.0",
"torch": ">=2.0.0",
"sentencepiece": ">=0.1.98",
"peft": ">=0.12.0",
"gradio": ">=4.44.0",
}
def _parse_version_spec(spec: str):
# simplistic parser for specs like '>=1.2.3'
if spec.startswith(">="):
return (">=", spec[2:])
if spec.startswith("=="):
return ("==", spec[2:])
return (">=", spec)
def _version_satisfies(installed: str, spec: str) -> bool:
if not installed:
return False
op, ver = _parse_version_spec(spec)
try:
from packaging.version import Version, InvalidVersion
except Exception:
return True
try:
if op == ">=":
return Version(installed) >= Version(ver)
if op == "==":
return Version(installed) == Version(ver)
except Exception:
return False
return True
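# Examples (illustrative) of the version check above:
#   _version_satisfies("4.44.1", ">=4.44.0")  -> True
#   _version_satisfies(None, ">=1.0.0")       -> False
#   If `packaging` is unavailable, the comparison is skipped and True is returned.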
def get_installed_versions():
out = {}
for pkg in REQUIRED_VERSIONS.keys():
ver = None
try:
if importlib_metadata is not None:
ver = importlib_metadata.version(pkg)
except Exception:
try:
m = __import__(pkg)
ver = getattr(m, "__version__", None)
except Exception:
ver = None
out[pkg] = ver
# Add huggingface hub if available
try:
import huggingface_hub as _hf
out['huggingface_hub'] = getattr(_hf, '__version__', None)
except Exception:
out['huggingface_hub'] = None
return out
def check_and_maybe_install(required_versions: dict, auto_install: bool = False):
"""Checks installed package versions and optionally installs missing/old ones.
Returns a dict of statuses and a list of missing entries.
"""
statuses = {}
missing = []
installed = get_installed_versions()
for pkg, spec in required_versions.items():
inst = installed.get(pkg)
ok = _version_satisfies(inst, spec)
statuses[pkg] = {"installed": inst, "required": spec, "ok": ok}
if not ok:
missing.append(f"{pkg}{spec}")
if auto_install:
try:
print(f"[BOOTSTRAP] Auto-installing {pkg}{spec}...")
subprocess.check_call([sys.executable, "-m", "pip", "install", "--upgrade", f"{pkg}{spec}"])
except Exception as e:
print(f"[BOOTSTRAP] Auto-install of {pkg} failed: {e}")
return statuses, missing
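# Example (illustrative): report dependency status at startup without installing.
#   statuses, missing = check_and_maybe_install(REQUIRED_VERSIONS, auto_install=False)
#   for pkg, st in statuses.items():
#       print(f"{pkg}: installed={st['installed']} required={st['required']} ok={st['ok']}")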
# Lightweight fallback LLM for degraded environments (no transformers)
class FallbackLLM:
"""Improved lightweight fallback model for degraded environments.
Provides deterministic, template-based replies for common user intents
(definitions, translations, grammar corrections, short summaries) and
includes a clear limitation notice on each response.
This is intentionally conservative and intended only as a graceful
fallback when real LLMs are unavailable.
"""
def __init__(self, name: str = "fallback-v1"):
self.name = name
def _limit_note(self) -> str:
return (
"\n\n---\nNOTICE: You are interacting with a lightweight fallback mode. "
"Responses may be incomplete or approximate. Install 'transformers' and 'torch' "
"or configure a Hugging Face inference endpoint to enable full capabilities."
)
def _short_snip(self, text: str, limit: int = 200) -> str:
t = text.replace("\n", " ").strip()
return t[:limit].rsplit(" ", 1)[0] if len(t) > limit else t
def generate_text(self, prompt: str, max_new_tokens: int = 128, temperature: float = 0.7) -> str:
q = prompt.strip()
ql = q.lower()
# Local tiny knowledge base override: prefer exact-match short facts
if 'FALLBACK_KB' in globals():
for k, v in FALLBACK_KB.items():
if k.lower() in ql:
return v + self._limit_note()
# Definitions / 'what is' intents
if "definition of" in ql or ql.startswith("define ") or ql.startswith("what is "):
# Extract short subject
subj = re.split(r"definition of|define|what is", ql, maxsplit=1)[-1].strip().split("?")[0].strip()
subj = subj.split()[:6]
subj = " ".join(subj) if subj else "the term"
resp = f"Definition of {subj}: A concise, plain-language explanation. Examples and nuance may be limited in fallback mode."
return resp + self._limit_note()
# Grammar correction / editing
if re.search(r"correct\b|fix grammar|grammar check|proofread", ql):
# Heuristic: find last quoted sentence or use entire prompt
            m = re.search(r"\"([^\"]+)\"|‘([^’]+)’|'([^']+)'", q)
            # Take whichever quote style matched; otherwise fall back to the last line
            target = next((g for g in m.groups() if g), q.splitlines()[-1]) if m else q.splitlines()[-1]
corrected = self._simple_grammar_fix(target)
resp = f"Corrected: {corrected}"
return resp + self._limit_note()
# Translation (very simple)
if ql.startswith("translate ") or "translate to" in ql:
# Try to extract target language and phrase
langs = re.findall(r"translate (?:to )?([a-zA-Z]+)", ql)
lang = langs[-1] if langs else "the target language"
phrase = q.split("\n")[-1][:200]
resp = f"Translation ({lang}): {self._short_snip(phrase, 120)} (literal, approximate)."
return resp + self._limit_note()
# Summarize
if ql.startswith("summarize") or "condense" in ql or "summary" in ql:
body = q.split("\n", 1)[-1] if "\n" in q else q
snippet = self._short_snip(body, 300)
resp = f"Summary: {snippet[:200]}..."
return resp + self._limit_note()
# Short Q&A fallback: provide a concise, generic answer based on keywords
keywords = re.findall(r"\b([a-zA-Z]{4,})\b", ql)
if keywords:
top = keywords[0]
resp = f"(Fallback) Quick answer about '{top}': I may not have full info offline, but here's a concise hint related to your query."
return resp + self._limit_note()
# Default echo-style response
snippet = self._short_snip(q, 200)
return f"(Fallback) {snippet}" + self._limit_note()
def _simple_grammar_fix(self, text: str) -> str:
# Minimal heuristics: capitalize sentences and fix double spaces
s = re.sub(r"\s+", " ", text.strip())
if s and s[0].islower():
s = s[0].upper() + s[1:]
if not s.endswith(('.', '?', '!')):
s = s + '.'
return s
def stream(self, prompt: str, max_new_tokens: int = 128, temperature: float = 0.7):
text = self.generate_text(prompt, max_new_tokens=max_new_tokens, temperature=temperature)
# Stream in modest chunks so UI can present progressive output
chunk_size = 80
for i in range(0, len(text), chunk_size):
yield text[i: i + chunk_size]
# Tiny local knowledge overrides for the fallback LLM. Keep concise facts.
FALLBACK_KB = {
"python": "Python is a high-level, interpreted programming language known for readability and broad library support.",
"html": "HTML stands for HyperText Markup Language and is used to structure content for the web.",
"tinyllama": "TinyLlama is a small, efficient language model ideal for low-resource devices.",
}
def run_fallback_dryrun():
"""Simple dry-run that exercises the fallback LLM against sample prompts and prints outputs."""
fb = FallbackLLM()
tests = [
"Define recursion.",
"Please correct: this sentence need fix",
"Translate to Spanish: Hello, how are you?",
"Summarize:\nPython is a popular language used for data science, web development, and scripting.",
"Tell me about TinyLlama",
]
print("Running fallback dry-run tests:")
for t in tests:
print('\n---\nPROMPT:', t)
out = fb.generate_text(t)
print('OUTPUT:', out)
# ============================================================================
# LIGHTWEIGHT SERVER (embedded for --lite mode)
# ============================================================================
# Provides a deterministic fallback LLM, simple retrieval, session history,
# and a browser UI using only Python standard library.
# ============================================================================
from http.server import HTTPServer, BaseHTTPRequestHandler
import math
import tempfile
import wave
# In-memory storage for lite mode
_LITE_SESSIONS = {} # sid -> list of messages {role, text, ts}
_LITE_KB = [] # list of text documents
def _lite_retrieve(query: str, k: int = 5) -> List[str]:
"""Simple substring-based retrieval for lite mode."""
q = query.lower()
scored = []
for doc in _LITE_KB:
score = 1.0 if q in doc.lower() else 0.0
scored.append((score, doc))
scored.sort(reverse=True)
return [d for s, d in scored[:k] if s > 0]
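# Example (illustrative): matching is a binary substring test, so scores are
# 1.0 (hit) or 0.0 (miss) and only hits are returned.
#   _LITE_KB.append("Hive runs offline-first.")
#   _lite_retrieve("offline")  -> ["Hive runs offline-first."]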
def _lite_synthesize_sine(text: str) -> str:
"""Generate a simple sine-wave WAV file for TTS stub."""
out = os.path.join(tempfile.gettempdir(), f"hive_tts_{int(time.time()*1000)}.wav")
duration = max(0.3, min(3.0, len(text)/40.0))
sr = 22050
nframes = int(sr * duration)
freq = 440.0
amplitude = 8000
with wave.open(out, 'w') as wf:
wf.setnchannels(1)
wf.setsampwidth(2)
wf.setframerate(sr)
for i in range(nframes):
v = int(amplitude * math.sin(2*math.pi*freq*(i/sr)))
wf.writeframes(v.to_bytes(2, 'little', signed=True))
return out
class _LiteHTTPHandler(BaseHTTPRequestHandler):
"""HTTP request handler for lightweight server."""
def log_message(self, format, *args):
"""Suppress default logging."""
pass
def _set_json(self, data: Dict, status: int = 200):
"""Send JSON response."""
b = json.dumps(data, ensure_ascii=False).encode('utf-8')
self.send_response(status)
self.send_header('Content-Type', 'application/json; charset=utf-8')
self.send_header('Content-Length', str(len(b)))
self.end_headers()
self.wfile.write(b)
def do_GET(self):
"""Handle GET requests (serve static HTML UI)."""
path = urllib.parse.urlparse(self.path).path
        if path == '/' or path.startswith('/static'):
            p = 'static/index.html' if path in ('/', '/static') else path.lstrip('/')
            # Reject path traversal attempts ('..' segments) before touching the filesystem
            if '..' in p.split('/'):
                self.send_response(403)
                self.end_headers()
                return
            if not os.path.exists(p):
                self.send_response(404)
                self.end_headers()
                return
ct = 'text/html'
if p.endswith('.js'):
ct = 'text/javascript'
if p.endswith('.css'):
ct = 'text/css'
try:
with open(p, 'rb') as f:
data = f.read()
self.send_response(200)
self.send_header('Content-Type', ct)
self.send_header('Content-Length', str(len(data)))
self.end_headers()
self.wfile.write(data)
except Exception:
self.send_response(500)
self.end_headers()
return
self.send_response(404)
self.end_headers()
def do_POST(self):
"""Handle POST requests (chat, KB, TTS, history)."""
path = urllib.parse.urlparse(self.path).path
length = int(self.headers.get('Content-Length', '0'))
body = self.rfile.read(length) if length else b''
try:
payload = json.loads(body.decode('utf-8') or '{}')
except Exception:
payload = {}
if path == '/chat':
sid = payload.get('sid') or str(int(time.time()*1000))
msg = payload.get('message', '')
hist = _LITE_SESSIONS.setdefault(sid, [])
hist.append({'role': 'user', 'text': msg, 'ts': time.time()})
snippets = _lite_retrieve(msg)
            prompt = (msg + '\n\nRetrieved:\n' + '\n'.join(snippets)) if snippets else msg
llm = FallbackLLM()
reply = llm.generate_text(prompt)
hist.append({'role': 'assistant', 'text': reply, 'ts': time.time()})
return self._set_json({'ok': True, 'sid': sid, 'reply': reply, 'history': hist})
if path == '/history':
sid = payload.get('sid')
return self._set_json({'history': _LITE_SESSIONS.get(sid, [])})
if path == '/add_kb':
text = payload.get('text', '')
if text:
_LITE_KB.append(text)
return self._set_json({'ok': True})
return self._set_json({'ok': False, 'error': 'no text provided'}, status=400)
if path == '/tts':
text = payload.get('text', '')
if not text:
return self._set_json({'ok': False, 'error': 'no text'}, status=400)
wav = _lite_synthesize_sine(text)
return self._set_json({'ok': True, 'wav': os.path.basename(wav)})
self._set_json({'error': 'unknown endpoint'}, status=404)
def run_lite_server(port: int = 7860):
"""Run the lightweight Hive server (HTTP + deterministic LLM, stdlib only)."""
print(f"\n[LITE] Starting lightweight Hive server on http://0.0.0.0:{port}")
print(f"[LITE] Open http://localhost:{port} in your browser")
print(f"[LITE] Press Ctrl+C to stop\n")
srv = HTTPServer(('0.0.0.0', port), _LiteHTTPHandler)
try:
srv.serve_forever()
except KeyboardInterrupt:
print('\n[LITE] Shutting down...')
srv.server_close()
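# Example (illustrative): exercising the lite JSON API from a shell once the
# server is running (endpoints as implemented in do_POST above):
#   curl -X POST http://localhost:7860/add_kb -d '{"text": "Hive is an offline-first assistant."}'
#   curl -X POST http://localhost:7860/chat   -d '{"sid": "demo", "message": "Tell me about Hive"}'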
# Optional imports - gracefully degrade if not available
try:
import psutil
except ImportError:
psutil = None
try:
import requests
except ImportError:
requests = None
try:
import feedparser
except ImportError:
feedparser = None
try:
import langid
except ImportError:
langid = None
try:
import librosa
except ImportError:
librosa = None
try:
import soundfile as sf
except ImportError:
sf = None
try:
from sentence_transformers import SentenceTransformer
except ImportError:
SentenceTransformer = None
try:
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline, StoppingCriteria, StoppingCriteriaList, TextIteratorStreamer
except ImportError:
AutoTokenizer = AutoModelForCausalLM = pipeline = None
StoppingCriteria = StoppingCriteriaList = TextIteratorStreamer = None
try:
from faster_whisper import WhisperModel
except ImportError:
WhisperModel = None
try:
from piper.voice import PiperVoice
except ImportError:
PiperVoice = None
try:
from duckduckgo_search import DDGS
except ImportError:
DDGS = None
try:
from g2p_en import G2p
except ImportError:
G2p = None
try:
from sklearn.metrics.pairwise import cosine_similarity
except ImportError:
cosine_similarity = None
try:
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
import uvicorn
_HAVE_FASTAPI = True
except ImportError:
FastAPI = HTTPException = JSONResponse = uvicorn = None
_HAVE_FASTAPI = False
from concurrent.futures import ThreadPoolExecutor
# Optional dependencies with graceful fallback
try:
import pvporcupine
_HAVE_PVP=True
except ImportError:
_HAVE_PVP=False
try:
import webrtcvad
_HAVE_VAD=True
except ImportError:
_HAVE_VAD=False
try:
import torch
except ImportError:
torch=None
# Import FAISS early to avoid segfaults (needs to load before some other libs)
try:
import faiss
except ImportError:
faiss=None
# Define StoppingCriteria only if transformers is available
if StoppingCriteria is not None:
class StopOnTokens(StoppingCriteria):
def __init__(self, stop_token_ids: List[int]):
self.stop_token_ids = stop_token_ids
def __call__(self, input_ids, scores, **kwargs) -> bool:
for stop_id in self.stop_token_ids:
if input_ids[0][-1] == stop_id:
return True
return False
else:
StopOnTokens = None
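# Example (illustrative) of wiring StopOnTokens into generation, assuming a
# loaded `tokenizer`, `model`, and tokenized `inputs` (all hypothetical here):
#   stops = StoppingCriteriaList([StopOnTokens([tokenizer.eos_token_id])])
#   model.generate(**inputs, stopping_criteria=stops)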
# Optional vision
try:
import cv2; _HAVE_CV=True
except Exception:
_HAVE_CV=False
try:
from PIL import Image
    import pytesseract
    _HAVE_TESS = _HAVE_CV  # OCR path also depends on OpenCV being available
except Exception:
_HAVE_TESS=False
try:
import keyring
except ImportError:
keyring=None
try:
import sounddevice as sd
_HAVE_SD = True
except Exception as e:
# sounddevice can raise OSError when PortAudio is not installed on the system
sd = None
_HAVE_SD = False
try:
# Provide a helpful diagnostic when PortAudio is missing
emsg = str(e)
        if isinstance(e, OSError) and "portaudio" in emsg.lower():
print("[Warning] sounddevice import failed: PortAudio library not found.")
print("[Hint] On Debian/Ubuntu: sudo apt-get install -y libportaudio2 portaudio19-dev")
print("[Hint] On Raspbian: sudo apt-get install -y libportaudio2 portaudio19-dev")
print("[Hint] After installing system packages: pip install --force-reinstall sounddevice")
else:
print(f"[Warning] sounddevice import failed: {e}")
except Exception:
# Swallow secondary errors from diagnostics so import doesn't fail
pass
# Perform a strict full-mode dependency check (do not run in degraded mode)
# If you want the app to auto-degrade when packages are missing, set env HIVE_FORCE_DEGRADED=1
import importlib
def _check_full_mode_requirements():
# Identify missing core Python packages required for full mode and optionally auto-install them.
missing = []
pkg_map = {
'transformers': 'transformers',
'torch': 'torch',
'sentence-transformers': 'sentence-transformers',
'peft': 'peft>=0.12.0',
'faiss': 'faiss-cpu',
'gradio': 'gradio',
'sentencepiece': 'sentencepiece'
}
if AutoTokenizer is None:
missing.append(pkg_map['transformers'])
if torch is None:
missing.append(pkg_map['torch'])
if SentenceTransformer is None:
missing.append(pkg_map['sentence-transformers'])
try:
import peft as _peft # type: ignore
except Exception:
missing.append(pkg_map['peft'])
if faiss is None:
missing.append(pkg_map['faiss'])
if gr is None:
missing.append(pkg_map['gradio'])
if importlib.util.find_spec('sentencepiece') is None:
missing.append(pkg_map['sentencepiece'])
# Deduplicate
missing = list(dict.fromkeys(missing))
if not missing:
return
# If user explicitly allowed auto-installation, attempt to pip install missing packages
if os.getenv('HIVE_AUTOINSTALL', '').lower() in ('1', 'true', 'yes', 'on') and not os.getenv('HIVE_AUTOINSTALL_DONE'):
print('[Hive] Missing packages detected, attempting to pip install:', missing)
try:
for spec in missing:
print(f"[Hive] Installing {spec}...")
subprocess.check_call([sys.executable, '-m', 'pip', 'install', '--upgrade', spec])
# Mark done to avoid loops and restart the process to load new packages
os.environ['HIVE_AUTOINSTALL_DONE'] = '1'
print('[Hive] Packages installed; restarting process to load new modules.')
os.execv(sys.executable, [sys.executable] + sys.argv)
except Exception as e:
print(f"[Hive] Auto-install failed: {e}")
print('Please install the missing packages manually (see README.md).')
sys.exit(3)
# If we reach here, auto-install not enabled or already attempted; print instructions and exit.
print('\n============================================================')
print('HIVE - MISSING REQUIRED DEPENDENCIES FOR FULL MODE')
print('============================================================')
print('The following Python packages are required to run Hive in full mode (LLMs, UI, and audio):')
for p in missing:
print(f' - {p}')
print('\nRequired system packages (Debian/Ubuntu):')
print(' sudo apt-get update && sudo apt-get install -y build-essential ffmpeg libsndfile1 libportaudio2 portaudio19-dev')
print('\nAfter installing system packages, install Python packages:')
print(' pip install -r requirements.txt')
print('\nTo enable automatic pip installation, set environment variable: HIVE_AUTOINSTALL=1')
print('\nExiting to allow you to prepare the environment.')
sys.exit(2)
_check_full_mode_requirements()
# ----------------------- config -----------------------
def ENV(name, default=None, cast=str):
    v=os.getenv(name, default)
    if v is None: return None
    if cast is bool: return str(v).lower() in ("1","true","yes","on")
    if cast is int:
        try: return int(v) # type: ignore
        except (ValueError, TypeError): return int(float(v))
    if cast is float:
        try: return float(v) # type: ignore
        except (ValueError, TypeError): return 0.0
    return v
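# Examples (illustrative) of the ENV casting rules above:
#   ENV("HIVE_CTX_TOKENS", "2048", int)            -> 2048
#   ENV("HIVE_LAUNCH_UI", "1", bool)               -> True ("1"/"true"/"yes"/"on")
#   ENV("HIVE_OPT_THRESH_QUALITY", "0.02", float)  -> 0.02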
CFG={
# auto-archive memory to curves.tar.gz
"HIVE_AUTO_ARCHIVE": ENV("HIVE_AUTO_ARCHIVE", "1", bool),
"HIVE_AUTO_ARCHIVE_MODE": ENV("HIVE_AUTO_ARCHIVE_MODE", "per_chain", str), # per_chain | per_dataset
"HIVE_ARCHIVE_PATH": ENV("HIVE_ARCHIVE_PATH", "curves.tar.gz", str),
# staged ingestion chaining (auto-run multiple stages this boot)
"HIVE_INGEST_CHAIN": ENV("HIVE_INGEST_CHAIN", "1", bool),
"HIVE_INGEST_CHAIN_MAX": ENV("HIVE_INGEST_CHAIN_MAX", "2", int), # max stages per boot
# staged ingestion controls
"HIVE_INGEST_STAGED": ENV("HIVE_INGEST_STAGED", "1", bool),
"HIVE_INGEST_STAGE_SIZE": ENV("HIVE_INGEST_STAGE_SIZE", "3", int),
"HIVE_INGEST_MIN_FREE_GB": ENV("HIVE_INGEST_MIN_FREE_GB", "8", int),
"HIVE_INGEST_NEXT": ENV("HIVE_INGEST_NEXT", "0", bool),
# self-edit manifest controls
"HIVE_ALLOW_SELF_WRITE_MANIFEST": ENV("HIVE_ALLOW_SELF_WRITE_MANIFEST", "1", bool),
"HIVE_SELF_WRITE_FILE": ENV("HIVE_SELF_WRITE_FILE", "", str),
# memory auto-restore controls (admin memory)
"CURVES_AUTO_RESTORE": ENV("HIVE_CURVES_AUTO_RESTORE", "1", bool),
"CURVES_ARCHIVE_LOCAL": ENV("HIVE_CURVES_ARCHIVE_LOCAL", "curves.tar.gz", str),
"CURVES_ARCHIVE_URL": ENV("HIVE_CURVES_ARCHIVE_URL", "", str),
"CURVES_HF_DATASET": ENV("HIVE_CURVES_HF_DATASET", "", str),
"CURVES_HF_SUBPATH": ENV("HIVE_CURVES_HF_SUBPATH", "", str),
"HF_READ_TOKEN": ENV("HF_READ_TOKEN", "", str),
# memory directory alias
"HIVE_HOME": ENV("HIVE_HOME", "/home/hive/hive_data" if os.path.exists("/home/hive") else "./hive_data"), # type: ignore
"CURVE_DIR": os.path.join(ENV("HIVE_HOME", "/home/hive/hive_data" if os.path.exists("/home/hive") else "./hive_data"), "curves"), # type: ignore
"STATE_DIR": os.path.join(ENV("HIVE_HOME", "/home/hive/hive_data" if os.path.exists("/home/hive") else "./hive_data"), "system"), # type: ignore
"LAUNCH_UI": ENV("HIVE_LAUNCH_UI","1",bool),
"LLM_AUTOSIZE": ENV("HIVE_LLM_AUTOSIZE", "1", bool), # type: ignore
"LLM_MAX_VRAM_GB": ENV("HIVE_LLM_MAX_VRAM_GB","0", int),
"MODEL_OVERRIDE": ENV("HIVE_MODEL_ID",""),
"CTX_TOKENS": ENV("HIVE_CTX_TOKENS","2048",int),
"OWNER_NAME": ENV("HIVE_OWNER_USER",""),
"OWNER_PASS": ENV("HIVE_OWNER_PASS",""),
"OWNER_SECOND": ENV("HIVE_OWNER_SECOND",""),
"AGENT_NAME": ENV("HIVE_AGENT_NAME","Hive"),
"NO_PROFANITY": ENV("HIVE_NO_PROFANITY","1",bool),
"ASR_SIZE": ENV("HIVE_ASR_SIZE","small"),
"TTS_LANG": ENV("HIVE_TTS_LANG","en"),
"BOOTSTRAP_INGEST": ENV("HIVE_BOOTSTRAP_INGEST", "1", bool),
"FORCE_REINGEST": ENV("HIVE_FORCE_REINGEST","0",bool),
"INGEST_SOURCES": ENV("HIVE_INGEST_SOURCES",""),
"ONLINE_ENABLE": ENV("HIVE_ONLINE_ENABLE","1",bool),
"ONLINE_AUTO": ENV("HIVE_ONLINE_AUTO","0",bool),
"ONLINE_SOURCES": ENV("HIVE_ONLINE_SOURCES","https://hnrss.org/frontpage,https://rss.nytimes.com/services/xml/rss/nyt/World.xml"),
"ONLINE_TIMEOUT": ENV("HIVE_ONLINE_TIMEOUT","8",int),
"ONLINE_MAX_RESULTS": ENV("HIVE_ONLINE_MAX_RESULTS","5",int),
"ONLINE_TRIGGER": ENV("HIVE_ONLINE_TRIGGER","auto",str),
# bounded self governance
"HIVE_USE_HF_INFERENCE": ENV("HIVE_USE_HF_INFERENCE","0",bool),
"HIVE_HF_ENDPOINT": ENV("HIVE_HF_ENDPOINT","",str),
"ALLOW_SELF_REBOOT": ENV("HIVE_ALLOW_SELF_REBOOT","1",bool),
"ALLOW_RUNTIME_HOTPATCH": ENV("HIVE_ALLOW_RUNTIME_HOTPATCH", "1", bool),
"AUTO_SELF_OPTIMIZE": ENV("HIVE_AUTO_SELF_OPTIMIZE","1",bool),
"PVPORCUPINE_ACCESS_KEY": ENV("HIVE_PVPORCUPINE_ACCESS_KEY", "", str),
"HIVE_WAKE_WORDS": ENV("HIVE_WAKE_WORDS", "bumblebee", str), # Default wake word
"VIDEO_ENABLED": ENV("HIVE_VIDEO_ENABLED", "0", bool), # Add this line
# internal optimization with sandbox + A/B (Owner policy)
"OPT_ENABLE": ENV("HIVE_OPT_ENABLE","1",bool),
"OPT_AUTO_APPLY": ENV("HIVE_OPT_AUTO_APPLY","0",bool), # OWNER MAY SET TO 1
"OPT_PKG_ALLOWLIST": ENV("HIVE_OPT_PKG_ALLOWLIST","transformers,accelerate,datasets,sentence-transformers,faiss-cpu,duckduckgo_search,feedparser,requests,gradio").split(","),
"OPT_MODEL_ALLOWLIST": ENV("HIVE_OPT_MODEL_ALLOWLIST","meta-llama/Meta-Llama-3.1-8B-Instruct,meta-llama/Meta-Llama-3.1-70B-Instruct,TinyLlama/TinyLlama-1.1B-Chat-v1.0").split(","),
"OPT_THRESH_LATENCY_MS": ENV("HIVE_OPT_THRESH_LATENCY_MS","0",int),
"OPT_THRESH_TOKS_PER_S": ENV("HIVE_OPT_THRESH_TOKS_PER_S","0",float),
"OPT_THRESH_QUALITY": ENV("HIVE_OPT_THRESH_QUALITY","0.02",float),
"OPT_SANDBOX_TIMEOUT": ENV("HIVE_OPT_SANDBOX_TIMEOUT","180",int),
}
CFG["VOICE_ASR_MODEL"] = CFG["ASR_SIZE"] # Alias for backward compatibility
# Ensure a sensible default model preference so typed and voice chat use TinyLlama
if not CFG.get("MODEL_OVERRIDE"):
CFG["MODEL_OVERRIDE"] = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"
HIVE_INSTANCE = None
CFG['VAD_ENERGY_THRESHOLD'] = 300       # Energy threshold for speech detection
CFG['VAD_SILENCE_DURATION'] = 0.7       # How long to wait in silence before considering speech ended
CFG['VAD_MIN_SPEECH_DURATION'] = 0.18   # Minimum duration of speech to be considered a valid segment
CFG['VOICE_VAD_AGGRESSIVENESS'] = 2     # Default VAD aggressiveness
ONLINE_DB = os.path.join(CFG["STATE_DIR"], "online_cache.json")
# Create all necessary directories based on the new specification
HIVE_HOME = CFG["HIVE_HOME"] # type: ignore
DIRS_TO_CREATE = [
os.path.join(HIVE_HOME, "curves"),
os.path.join(HIVE_HOME, "knowledge", "chunks"),
os.path.join(HIVE_HOME, "knowledge", "embeddings"),
os.path.join(HIVE_HOME, "users", "conversations"),
os.path.join(HIVE_HOME, "users", "sessions"),
os.path.join(HIVE_HOME, "system", "logs"),
os.path.join(HIVE_HOME, "system", "backups"),
os.path.join(HIVE_HOME, "voice", "asr_models"),
os.path.join(HIVE_HOME, "voice", "tts_models"),
os.path.join(HIVE_HOME, "voice", "voiceprints"),
os.path.join(HIVE_HOME, "voice", "samples"),
os.path.join(HIVE_HOME, "admin", "logs"),
os.path.join(HIVE_HOME, "packages"),
]
for d in DIRS_TO_CREATE: os.makedirs(d, exist_ok=True)
# Feedback storage directory
FEEDBACK_DIR = os.path.join(HIVE_HOME, "feedback")
os.makedirs(FEEDBACK_DIR, exist_ok=True)
FEEDBACK_DB = os.path.join(FEEDBACK_DIR, "feedback.jsonl")
# Correction storage directory
CORRECTIONS_DB = os.path.join(FEEDBACK_DIR, "corrections.jsonl")
OVERLAY_DIR = os.path.join(HIVE_HOME, "system", "overlay")
OPT_DIR = os.path.join(HIVE_HOME, "system", "opt")
OPT_PROPOSALS = os.path.join(OPT_DIR, "proposals.jsonl")
OPT_RESULTS = os.path.join(OPT_DIR, "results.jsonl")
for p in (OVERLAY_DIR, OPT_DIR):
os.makedirs(p, exist_ok=True)
# Global singleton EventBus used for BEL/Hive IPC
# NOTE: These will be initialized after EventBus and BEL classes are defined (see below)
GLOBAL_EVENT_BUS = None
GLOBAL_BEL = None
# ----------------- sensing / model pick -----------------
class EnvDetector:
"""Implements the Environment Detector and Capability Profiler from Part 1, Section 4.
Detects: CPU, RAM, disk, GPU, audio, camera, display, network, and returns adaptive
config parameters (embedding batch size, cache budgets, retrieval_k) scaled to device.
"""
def _has_gpu_env(self) -> bool:
accel = os.getenv("SPACE_ACCELERATOR", "").lower()
if accel in ("t4", "a10", "a100", "l4", "l40", "h100"): return True
try:
return torch is not None and torch.cuda.is_available()
except Exception:
return False
def _detect_display(self) -> bool:
if _os_name() == 'linux':
return bool(os.environ.get('DISPLAY')) or os.path.exists('/dev/fb0')
return False
def _detect_camera(self) -> bool:
if _os_name() == 'linux':
return any(os.path.exists(f'/dev/video{i}') for i in range(4))
return False
def _detect_audio_input(self) -> bool:
# Check for ALSA or sounddevice availability
try:
if _HAVE_SD:
devices = sd.query_devices() if _HAVE_SD else []
return len(devices) > 0
except Exception:
pass
        # Fallback heuristic: assume audio is available on Pi-like boards
        # (platform.machine() reports e.g. 'aarch64', so check the device tree instead)
        return os.path.exists('/proc/device-tree/model')
def _detect_audio_output(self) -> bool:
# Similar to input: check for playback devices
try:
if _HAVE_SD:
devices = sd.query_devices() if _HAVE_SD else []
return any(d['max_output_channels'] > 0 for d in devices if isinstance(d, dict))
except Exception:
pass
        return os.path.exists('/proc/device-tree/model')
def _get_disk_info(self, path: str = ".") -> Dict[str, float]:
"""Return disk usage info in GB."""
try:
stat = os.statvfs(path)
total = (stat.f_blocks * stat.f_frsize) / (1024**3)
free = (stat.f_bavail * stat.f_frsize) / (1024**3)
used = total - free
return {"total_gb": total, "free_gb": free, "used_gb": used}
except Exception:
return {"total_gb": 0, "free_gb": 0, "used_gb": 0}
def _get_cpu_count(self) -> int:
"""Return CPU core count."""
try:
return os.cpu_count() or 1
except Exception:
return 1
    def probe(self) -> Dict[str, Any]:
"""Comprehensive environment profile with adaptive config."""
try:
total_ram_gb = psutil.virtual_memory().total / (1024**3)
free_ram_gb = psutil.virtual_memory().available / (1024**3)
is_pi = 'raspberrypi' in platform.machine().lower() or os.path.exists('/proc/device-tree/model')
disk = self._get_disk_info(CFG.get("HIVE_HOME", "."))
cpu_count = self._get_cpu_count()
# Adaptive parameters based on RAM
if free_ram_gb < 1.5:
embedding_batch_size = 4
cache_budget_gb = 0.5
retrieval_k = 3
model_precision = "int8"
load_full_embedder = False
elif free_ram_gb < 2.5:
embedding_batch_size = 8
cache_budget_gb = 0.8
retrieval_k = 4
model_precision = "int8"
load_full_embedder = False
elif free_ram_gb < 4:
embedding_batch_size = 16
cache_budget_gb = 1.5
retrieval_k = 5
model_precision = "float16"
load_full_embedder = True
else:
embedding_batch_size = 32 if free_ram_gb < 6 else 64
cache_budget_gb = min(4.0, free_ram_gb * 0.3)
retrieval_k = 6
model_precision = "float32"
load_full_embedder = True
profile = {
# Device identification
"device_type": "raspberry_pi" if is_pi else "generic_linux",
"arch": platform.machine(),
"is_pi": is_pi,
"is_headless": not self._detect_display(),
# Memory
"total_ram_gb": round(total_ram_gb, 1),
"free_ram_gb": round(free_ram_gb, 1),
"is_low_memory": total_ram_gb < 6,
# Disk
"disk_total_gb": round(disk["total_gb"], 1),
"disk_free_gb": round(disk["free_gb"], 1),
"disk_used_gb": round(disk["used_gb"], 1),
# Hardware features
"has_gpu": self._has_gpu_env(),
"has_display": self._detect_display(),
"has_camera": self._detect_camera(),
"has_microphone": self._detect_audio_input(),
"has_speaker": self._detect_audio_output(),
"cpu_count": cpu_count,
# Network
"network_up": NET.online_quick(),
# Adaptive parameters
"embedding_batch_size": embedding_batch_size,
"cache_budget_gb": cache_budget_gb,
"retrieval_k": retrieval_k,
"model_precision": model_precision,
"load_full_embedder": load_full_embedder,
# Legacy fields
"max_docs": 70000 if total_ram_gb > 16 else (50000 if total_ram_gb > 8 else (30000 if total_ram_gb > 4 else 10000)),
"batch": 512 if total_ram_gb > 16 else (256 if total_ram_gb > 8 else (128 if total_ram_gb > 4 else 64))
}
return profile
except Exception as e:
logging.error(f"EnvDetector.probe error: {e}")
# Return safe defaults on error
return {
"device_type": "unknown",
"arch": platform.machine(),
"is_pi": False,
"is_headless": True,
"total_ram_gb": 4.0,
"free_ram_gb": 2.0,
"is_low_memory": True,
"disk_total_gb": 0,
"disk_free_gb": 0,
"disk_used_gb": 0,
"has_gpu": False,
"has_display": False,
"has_camera": False,
"has_microphone": False,
"has_speaker": False,
"cpu_count": 1,
"network_up": False,
"embedding_batch_size": 8,
"cache_budget_gb": 1.0,
"retrieval_k": 4,
"model_precision": "int8",
"load_full_embedder": False,
"max_docs": 10000,
"batch": 64
}
def probe_caps():
return EnvDetector().probe()
CANDIDATES=[
("TinyLlama/TinyLlama-1.1B-Chat-v1.0", 0),
("openlm-research/open_llama_3b_v2", 6), # Add OpenLLaMA 3B as a mid-tier option
("meta-llama/Meta-Llama-3.1-8B-Instruct", 12),
("meta-llama/Meta-Llama-3.1-70B-Instruct", 100)
]
def pick_model(caps: Dict[str, Any]) -> Tuple[str, dict]:
"""Selects the best model based on available RAM."""
free_ram_gb = caps.get("free_ram_gb", 0)
# If more than 6GB of RAM is free, prefer the stronger 3B model
if free_ram_gb > 6:
model_id = "openlm-research/open_llama_3b_v2"
else:
model_id = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"
device = "cuda" if _has_gpu_env() else "cpu"
return model_id, {"device": device}
class Bootstrap:
"""Startup helper for Hive. Detects environment, initializes directories, and spawns Hive core.
Implements headless operation for Pi: skips UI, initializes HTTP API server instead.
"""
def __init__(self, config_path: Optional[str] = None, force_ui: bool = False):
self.config_path = config_path or os.path.join(CFG["HIVE_HOME"], "system", "config.json")
self.config = CFG.copy()
self.env: Optional[Dict] = None
self.hive_instance: Optional["Hive"] = None
self.hive_lite_instance: Optional["Hive"] = None
self.lite_core_ready = threading.Event()
# Indicates whether the lite core finished initialization successfully
self.lite_core_success = False
# Event that signals when voice-related models/services are ready
self.voice_ready = threading.Event()
# Capability/profile information populated after probing environment
        self.caps: Dict[str, Any] = {}
self.hive_ready = threading.Event()
self.bootstrap_instance = self # For self-reference
self.force_ui = force_ui # Whether UI will be launched (passed from main)
def _ensure_directories(self):
"""Create HIVE_HOME directory structure."""
dirs = [
os.path.join(CFG["HIVE_HOME"], "curves"),
os.path.join(CFG["HIVE_HOME"], "knowledge", "chunks"),
os.path.join(CFG["HIVE_HOME"], "knowledge", "embeddings"),
os.path.join(CFG["HIVE_HOME"], "users", "conversations"),
os.path.join(CFG["HIVE_HOME"], "users", "sessions"),
os.path.join(CFG["HIVE_HOME"], "system", "logs"),
os.path.join(CFG["HIVE_HOME"], "system", "backups"),
os.path.join(CFG["HIVE_HOME"], "voice", "asr_models"),
os.path.join(CFG["HIVE_HOME"], "voice", "tts_models"),
os.path.join(CFG["HIVE_HOME"], "voice", "voiceprints"),
os.path.join(CFG["HIVE_HOME"], "voice", "samples"),
os.path.join(CFG["HIVE_HOME"], "admin", "logs"),
os.path.join(CFG["HIVE_HOME"], "packages"),
]
for d in dirs:
os.makedirs(d, exist_ok=True)
def _load_or_create_config(self):
"""Load config from disk or create defaults."""
if os.path.exists(self.config_path):
try:
with open(self.config_path, 'r', encoding='utf-8') as f:
disk_config = json.load(f)
self.config.update(disk_config)
logging.info(f"Loaded config from {self.config_path}")
except Exception as e:
logging.warning(f"Failed to load config: {e}. Using defaults.")
def run(self):
"""Bootstrap sequence: probe env, init dirs, spawn Hive.
For Pi headless: skips UI, returns HTTP API ready.
"""
logging.info("[Bootstrap] Starting Hive initialization...")
# 1. Ensure directories exist
self._ensure_directories()
logging.info("[Bootstrap] Directory structure initialized.")
# 2. Load/create config
self._load_or_create_config()
# 3. Probe environment
self.env = EnvDetector().probe()
logging.info(f"[Bootstrap] Environment detected: {self.env['device_type']}, "
f"RAM={self.env['free_ram_gb']}GB free, "
f"headless={self.env['is_headless']}, "
f"network={'online' if self.env['network_up'] else 'offline'}")
# 4. Apply adaptive parameters based on environment
if self.env["is_low_memory"]:
logging.info("[Bootstrap] Low-memory device detected. Enabling conservative mode.")
CFG["CTX_TOKENS"] = 1024
CFG["OPT_SANDBOX_TIMEOUT"] = 60
else:
CFG["CTX_TOKENS"] = 2048
# 5. Create lite Hive instance (always available)
logging.info("[Bootstrap] Starting Lite Hive core...")
self.hive_lite_instance = Hive(model_id=None, lite=True)
self.lite_core_ready.set()
# Mark that the lite core initialized successfully
self.lite_core_success = True
logging.info("[Bootstrap] Lite core ready.")
# 5.a Start background asset downloader to prefetch datasets and small models
try:
dl_thread = threading.Thread(target=self._download_initial_assets, name="asset_downloader", daemon=True)
dl_thread.start()
except Exception:
logging.warning("Failed to start asset downloader thread.")
# 5.b Start any lightweight modules for the lite Hive (e.g., persistence)
try:
if self.hive_lite_instance and hasattr(self.hive_lite_instance, 'module_manager'):
logging.info("[Bootstrap] Starting Lite Hive modules (persistence, overlay)...")
self.hive_lite_instance.module_manager.start_all()
except Exception as e:
logging.warning(f"[Bootstrap] Failed to start lite hive modules: {e}")
# 6. Spawn full Hive in background (non-blocking for headless)
# CRITICAL: If UI will be launched (force_ui=True or SPACE_ID set), initialize full core
# SYNCHRONOUSLY so that voice/ASR/TTS features are ready before UI launches.
# Only use async init if explicitly configured via env var and UI is NOT launching.
will_launch_ui = self.force_ui or bool(os.getenv("SPACE_ID"))
force_async_full_explicit = ENV("HIVE_ASYNC_FULL_INIT", "0", bool)
run_full_async = (self.env["is_headless"] and not will_launch_ui) or force_async_full_explicit
if run_full_async:
logging.info("[Bootstrap] Async full-core init enabled. Starting Full Hive in background thread.")
full_hive_thread = threading.Thread(
target=self._init_full_hive_background,
name="full_hive_init",
daemon=False
)
full_hive_thread.start()
else:
# On device with display or when UI launching on Spaces, load full Hive synchronously before UI
logging.info("[Bootstrap] UI launching or display detected; initializing Full Hive synchronously...")
try:
self.hive_instance = Hive(model_id=None, lite=False)
self.hive_ready.set()
# Signal voice ready if voice services are available
if self.hive_instance and hasattr(self.hive_instance, 'asr_service') and self.hive_instance.asr_service and hasattr(self.hive_instance, 'tts_service') and self.hive_instance.tts_service:
self.voice_ready.set()
logging.info("[Bootstrap] Voice services are ready.")
logging.info("[Bootstrap] Full Hive core initialization complete.")
except Exception as e:
logging.warning(f"[Bootstrap] Full Hive initialization failed: {e}. Chat will use lite mode only.")
self.hive_instance = None
logging.info("[Bootstrap] Initialization complete. HTTP API ready.")
return self.hive_lite_instance
def _init_full_hive_background(self):
"""Background thread to initialize full Hive without blocking headless startup."""
try:
logging.info("[Bootstrap] Full Hive initialization starting in background...")
self.hive_instance = Hive(model_id=None, lite=False)
self.hive_ready.set()
# Signal voice ready if voice services are available
if self.hive_instance and hasattr(self.hive_instance, 'asr_service') and self.hive_instance.asr_service and hasattr(self.hive_instance, 'tts_service') and self.hive_instance.tts_service:
self.voice_ready.set()
logging.info("[Bootstrap] Voice services are ready.")
logging.info("[Bootstrap] Full Hive initialization complete.")
except Exception as e:
logging.error(f"[Bootstrap] Full Hive initialization failed: {e}", exc_info=True)
import traceback
logging.error(traceback.format_exc())
self.hive_instance = None
def _download_initial_assets(self):
"""Background task to prefetch datasets and small assets listed in DEFAULT_SOURCES.
Tries multiple backends (`datasets`, `huggingface_hub`) with graceful fallback.
This helps reduce first-interaction latency when Full Hive initializes.
"""
try:
logging.info("[Bootstrap] Asset downloader started.")
# Try to import `datasets` for streaming and processing
try:
from datasets import load_dataset as _load_dataset
except Exception:
_load_dataset = None
# Fallback: huggingface_hub to fetch dataset card files or small assets
try:
from huggingface_hub import hf_hub_download as _hf_hub_download
except Exception:
_hf_hub_download = None
# Get a reference to the full hive instance to access the knowledge store
# It might not be ready immediately, so we'll wait for it.
self.hive_ready.wait(timeout=300) # Wait up to 5 mins for full hive
hive_instance = getattr(self, 'hive_instance', None)
if not hive_instance or not hasattr(hive_instance, 'kstore'):
logging.warning("[Bootstrap] Asset downloader: Full Hive or KnowledgeStore not available. Skipping ingestion.")
return
for src in DEFAULT_SOURCES:
try:
logging.info(f"[Bootstrap] Processing and ingesting dataset: {src}")
if _load_dataset is not None:
try:
# Stream the dataset to avoid downloading it all at once
dataset = _load_dataset(src, split="train", streaming=True)
logging.info(f"[Bootstrap] Streaming dataset {src} for ingestion.")
# Ingest text from a sample of the dataset
for example in dataset.take(100): # Process first 100 examples
text_content = example.get("text") or example.get("content")
if text_content and isinstance(text_content, str):
hive_instance.kstore.ingest_text(text_content, tag=f"ingest:{src}", scope="general")
logging.info(f"[Bootstrap] Ingested sample from {src}.")
# Clean up the cache for this specific dataset to save space
dataset.cleanup_cache_files()
logging.info(f"[Bootstrap] Cleaned up cache for {src}.")
continue
except Exception as e:
logging.warning(f"[Bootstrap] datasets.load_dataset failed for {src}: {e}")
if _hf_hub_download is not None:
try:
# Try to download the dataset README as a lightweight indicator
_hf_hub_download(repo_id=src, filename="README.md", repo_type="dataset")
logging.info(f"[Bootstrap] Dataset {src} README fetched via huggingface_hub.")
continue
except Exception as e:
logging.debug(f"[Bootstrap] hf_hub_download README failed for {src}: {e}")
# As a last resort, attempt an HTTP HEAD request to the HF dataset URL to warm caches
try:
url = f"https://huggingface.co/datasets/{src.replace('/', '%2F')}"
if requests is not None:
requests.head(url, timeout=8)
logging.info(f"[Bootstrap] Performed HTTP probe for {src} to warm CDN.")
except Exception:
pass
except Exception as e:
logging.warning(f"[Bootstrap] Error prefetching {src}: {e}")
# Optionally prefetch small model assets (sentence-transformers embedder)
try:
emb_id = _EMB_ID
if emb_id and _hf_hub_download is not None:
try:
_hf_hub_download(repo_id=emb_id, filename="README.md", repo_type="model")
logging.info(f"[Bootstrap] Prefetched embedder README for {emb_id}.")
except Exception:
pass
except Exception:
pass
logging.info("[Bootstrap] Asset downloader finished.")
except Exception as e:
logging.error(f"[Bootstrap] Asset downloader encountered exception: {e}")
# ----------------- embeddings / curves -----------------
_EMB_ID=os.getenv("HIVE_EMB_ID","sentence-transformers/all-MiniLM-L6-v2")
class GEC:
"""Global Embedding Curve: Wrapper for sentence-transformers model with lazy loading."""
def __init__(self):
if SentenceTransformer is None:
raise RuntimeError("sentence-transformers not installed. Install it with: pip install sentence-transformers")
# LAZY INITIALIZATION: Don't load model until first encode() call
# This prevents blocking Full Mode initialization while downloading 90MB+ model
self.model = None
self._device = "cuda" if _has_gpu_env() else "cpu"
def _ensure_model_loaded(self):
"""Lazy-load the sentence-transformers model on first use."""
if self.model is None:
logging.info(f"[GEC] Loading sentence-transformers model ({_EMB_ID}) on {self._device}...")
start = time.time()
self.model = SentenceTransformer(_EMB_ID).to(self._device)
logging.info(f"[GEC] Model loaded in {time.time()-start:.1f}s")
def encode(self, texts: List[str]):
self._ensure_model_loaded()
return self.model.encode(texts, normalize_embeddings=True)
class SimpleEmbedder:
"""Deterministic fallback embedder when sentence-transformers is unavailable.
Produces fixed-size float32 vectors from SHA256 of text.
"""
def __init__(self, dim: int = 128):
self.dim = dim
def _text_to_vector(self, text: str):
h = hashlib.sha256(text.encode('utf-8')).digest()
# Expand hash to required dimension by repeating and interpreting bytes
vec = []
i = 0
while len(vec) < self.dim:
b = h[i % len(h)]
vec.append((b / 255.0) * 2.0 - 1.0)
i += 1
arr = np.array(vec, dtype='float32')
# normalize
norm = np.linalg.norm(arr)
if norm > 0:
arr = arr / norm
return arr
def encode(self, texts: List[str]):
return np.vstack([self._text_to_vector(t) for t in texts])
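# Example (illustrative): the fallback embedder is deterministic, so identical
# texts always map to the same unit-length vector.
#   emb = SimpleEmbedder(dim=128)
#   v = emb.encode(["hello", "hello"])
#   assert np.allclose(v[0], v[1]) and abs(np.linalg.norm(v[0]) - 1.0) < 1e-6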
class CurveStore:
"""
Manages the FAISS vector index and metadata for Retrieval-Augmented Generation.
This class is designed to be resilient, with fallbacks for environments where
FAISS or sentence-transformers are not available or where memory is constrained.
"""
def __init__(self, d):
self.dir=d; os.makedirs(d, exist_ok=True)
self.idx_path=os.path.join(d,"faiss.index")
self.meta_path=os.path.join(d,"meta.jsonl")
self.dim=384
self.enabled = False
self.index = None
self.metadata = []
# 1. Check for dependencies
if faiss is None:
logging.warning("[CurveStore] FAISS not installed. Vector retrieval will be disabled.")
self.gec = SimpleEmbedder(dim=128) # Use fallback embedder for consistency
return
# 2. Check for sufficient memory
try:
free_gb = psutil.virtual_memory().available / (1024.0 ** 3) if psutil else 2.0
if free_gb < 1.0:
logging.warning(f"[CurveStore] Low memory ({free_gb:.2f}GB free). FAISS will be disabled to prevent OOM errors.")
self.gec = SimpleEmbedder(dim=128)
return
except Exception:
pass # Proceed if psutil fails
# 3. Initialize GEC (sentence-transformers)
        try:
            self.gec = GEC()
        except RuntimeError:
            logging.warning("[CurveStore] sentence-transformers not installed. Using fallback embedder.")
            self.gec = SimpleEmbedder(dim=128)
            self.dim = 128  # keep the FAISS index dimension in sync with the fallback embedder
# 4. Load or create the FAISS index
self._load_index()
self.enabled = True
def _load_index(self):
"""Loads the FAISS index and metadata from disk."""
if os.path.exists(self.idx_path) and os.path.exists(self.meta_path):
try:
self.index = faiss.read_index(self.idx_path)
with open(self.meta_path, 'r', encoding='utf-8') as f:
lines = f.read().splitlines()
self.metadata = [json.loads(line) for line in lines if line]
logging.info(f"[CurveStore] Loaded FAISS index with {self.index.ntotal} vectors and {len(self.metadata)} metadata entries.")
except Exception as e:
logging.error(f"[CurveStore] Failed to load index/metadata: {e}. Re-initializing.")
self.index = faiss.IndexFlatIP(self.dim)
self.metadata = []
else:
logging.info("[CurveStore] No existing index found. Initializing a new one.")
self.index = faiss.IndexFlatIP(self.dim)
self.metadata = []
def add_texts(self, docs:List[str], metas:List[Dict]):
"""Encodes texts and adds them to the FAISS index and metadata store."""
if not self.enabled or not docs:
return
try:
vecs = np.asarray(self.gec.encode(docs), dtype="float32")
self.index.add(vecs)
self.metadata.extend(metas)
self._save()
except Exception as e:
logging.error(f"[CurveStore] Failed to add texts to index: {e}")
def _save(self):
"""Atomically saves the FAISS index and metadata to disk."""
if not self.enabled:
return
try:
# Save index
tmp_idx = self.idx_path + ".tmp"
faiss.write_index(self.index, tmp_idx)
os.replace(tmp_idx, self.idx_path)
# Save metadata
tmp_meta = self.meta_path + ".tmp"
with open(tmp_meta, 'w', encoding='utf-8') as f:
for m in self.metadata:
f.write(json.dumps(m, ensure_ascii=False) + "\n")
os.replace(tmp_meta, self.meta_path)
except Exception as e:
logging.error(f"[CurveStore] Failed to save index/metadata: {e}")
def search(self, query:str, k:int=6)->List[Dict]:
metas, _ = self.search_with_scores(query, k)
return metas
def search_with_scores(self, query:str, k:int=6):
if not self.enabled or self.index is None or self.index.ntotal == 0:
return [], []
try:
qv = np.asarray(self.gec.encode([query]), dtype="float32")
distances, indices = self.index.search(qv, k)
metas, scores = [], []
query_len = len(query.split())
for idx, dist in zip(indices[0], distances[0]):
if 0 <= idx < len(self.metadata):
meta = self.metadata[idx]
# Simple penalty for long snippets on short queries
text_len = len(meta.get("text", "").split())
penalty = 0.0
if query_len < 4 and text_len > 100:
penalty = 0.15 * (min(text_len, 400) / 400)
                    # IndexFlatIP on normalized embeddings returns cosine similarity
                    # directly, so clamp it to [0, 1] rather than rescaling by ntotal.
                    score = float(max(0.0, min(1.0, dist)))
metas.append(meta)
scores.append(float(max(0.0, min(1.0, score - penalty))))
return metas, scores
except Exception as e:
logging.error(f"[CurveStore] Search failed: {e}")
return [], []
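# Example (illustrative) round-trip through the store; the path and texts are
# placeholders (in Hive the store lives under CFG["CURVE_DIR"]):
#   cs = CurveStore("/tmp/hive_curves")
#   cs.add_texts(["Python is a language."], [{"text": "Python is a language.", "tag": "demo"}])
#   hits = cs.search("python", k=1)  # -> [{"text": ..., "tag": "demo"}] when FAISS is enabled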
OFFLINE_MARK = os.path.join(CFG["CURVE_DIR"], ".offline_ready")
def _curves_ready(curve_dir:str)->bool:
idx=os.path.join(curve_dir,"faiss.index")
if os.path.exists(OFFLINE_MARK):
try: return json.load(open(OFFLINE_MARK)).get("ok",True)
except Exception: return True
if os.path.exists(idx):
try: return faiss.read_index(idx).ntotal>0
except Exception: return False
return False
def _mark_offline_ready():
try: json.dump({"ok":True,"ts":time.time()}, open(OFFLINE_MARK,"w",encoding="utf-8"))
except Exception: pass
# ----------- HF Datasets bootstrap -----------
DEFAULT_SOURCES = [
"HuggingFaceFW/finepdfs",
"HuggingFaceM4/FineVision",
"Helsinki-NLP/opus-100",
"facebook/flores",
"HuggingFaceH4/Multilingual-Thinking",
"tatsu-lab/alpaca",
"bigscience/xP3",
"allenai/sciq",
"allenai/c4",
"mozilla-foundation/common_voice_17_0",
"jhu-clsp/jfleg",
"bea2019st/wi_locness",
"fce-m72109/mascorpus",
"bene-ges/en_cmudict",
"tobacco/word2vec",
"allenai/peer_read",
"openslr/librispeech_asr",
"EmpathaticEmbodiedAI/EmpathyRobot",
"conceptnet5/conceptnet5",
"antokun/glove.6B.50d",
"OpenRL/daily_dialog",
"tetti/spelling-dataset-extended",
"rajpurkar/squad_v2",
"lmms-lab/vocalsound",
"grammarly/coedit",
]
def _atomic_write_json(path, data):
tmp = str(path) + f".tmp_{int(time.time())}"
with open(tmp, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
os.replace(tmp, path)
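# Why the temp-file dance above matters: os.replace renames atomically, so a
# reader never observes a half-written JSON file even if the process dies
# mid-dump. Example (illustrative): _atomic_write_json("/tmp/state.json", {"ok": True})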
# Smart token joiner to avoid spaces before punctuation or contractions.
def _smart_join(existing: str, piece: str) -> str:
"""Join streamed token `piece` onto `existing` with correct spacing.
Rules:
- If existing is empty, return piece as-is.
- If piece starts with whitespace, append directly.
- If piece starts with an apostrophe or a closing punctuation, append directly (e.g., "I" + "'ve" -> "I've").
- If both existing's last char and piece's first char are alphanumeric, insert a space.
- Otherwise append without extra space.
"""
if not existing:
return piece
if not piece:
return existing
# preserve explicit leading whitespace, but normalize excessive newlines/spaces
    if piece[0].isspace():
        # If piece starts with newline(s), collapse them to a single newline and ensure separation
        if piece.startswith('\n') or '\n' in piece[:2]:
            # ensure existing ends with exactly one newline, then strip piece's leading newlines
            existing = existing.rstrip('\n') + '\n'
            return existing + piece.lstrip('\r\n')
        # For ordinary leading spaces/tabs, ensure a single separating space
        return existing.rstrip(' ') + ' ' + piece.lstrip()
first = piece[0]
last = existing[-1]
# Characters that should not be separated from the previous token
no_space_prefix = {"'", "’"}
# punctuation that normally attaches to previous token
attach_to_prev = {".", ",", "!", "?", ";", ":", "%", ")"}
# If piece begins with an apostrophe (contraction) or common closing punctuation,
# attach directly to the previous token (e.g., "I" + "'ve" -> "I've").
if first in no_space_prefix or first in attach_to_prev:
# remove an accidental space before punctuation
if existing.endswith(' '):
existing = existing[:-1]
return existing + piece
# If the previous token is an opening bracket or slash, attach directly
if last in {"(", "[", "/"}:
return existing + piece
# If both sides are alphanumeric, we need a space between words
if last.isalnum() and first.isalnum():
return existing + " " + piece
# Default: attach without extra space (covers punctuation like "'s" already handled)
# Trim duplicate spaces if any
if existing.endswith(' ') and piece.startswith(' '):
return existing.rstrip(' ') + piece.lstrip(' ')
return existing + piece
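# Examples (illustrative), following the joining rules above:
#   _smart_join("I", "'ve")       -> "I've"        (apostrophe attaches to previous token)
#   _smart_join("Hello", "world") -> "Hello world" (alnum/alnum boundary gets a space)
#   _smart_join("Hello", ", hi")  -> "Hello, hi"   (punctuation attaches to previous token)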
# Extract the assistant's reply from a possibly-verbose model output
def _extract_assistant_reply(text: str) -> str:
"""Return the assistant reply nearest to the last user turn.
Strategy:
1. Normalize role markers (case-insensitive variants of 'User' and 'Assistant').
2. Find the last 'User' marker and take the slice after it.
3. Inside that slice, if there's an 'Assistant' marker, take text after the first 'Assistant' marker up to the next role marker.
4. If no 'User' marker found, fall back to taking the last 'Assistant' block.
5. If nothing matches, return the first line or the whole text trimmed.
"""
if not text:
return ""
t = text
# Normalize to uniform markers to simplify searching
norm = re.sub(r"\r\n", "\n", t)
# Find all role marker matches with positions
markers = []
for m in re.finditer(r"(?i)\b(User|Assistant)\b\s*:\s*", norm):
markers.append((m.group(1).lower(), m.start(), m.end()))
# Helper to slice out a block between positions
def slice_between(start_idx, end_idx=None):
return norm[start_idx:end_idx].strip()
# If we have markers, try to locate the last user marker
if markers:
last_user_idx = None
for role, s, e in markers:
if role == 'user':
last_user_idx = e
if last_user_idx is not None:
# Take text after the last user marker
tail = norm[last_user_idx:]
# If an assistant marker appears in tail, take text after it
m = re.search(r"(?i)\bAssistant\b\s*:\s*", tail)
if m:
# start after this assistant marker
start = last_user_idx + m.end()
# find next role marker after start
nxt = re.search(r"(?i)\b(User|Assistant)\b\s*:\s*", norm[start:])
end = start + nxt.start() if nxt else None
return (norm[start:end].strip())
else:
# No assistant marker; return initial portion of tail up to next role marker
nxt = re.search(r"(?i)\b(User|Assistant)\b\s*:\s*", tail)
if nxt:
return tail[:nxt.start()].strip()
return tail.strip()
# No user found; fallback to last assistant block
last_assistant_idx = None
for role, s, e in markers:
if role == 'assistant':
last_assistant_idx = e
if last_assistant_idx is not None:
return slice_between(last_assistant_idx)
# As a final fallback, return the first non-empty line
for line in norm.splitlines():
if line.strip():
return line.strip()
return norm.strip()
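# Example (illustrative): a verbose transcript is reduced to the reply that
# follows the last user turn.
#   _extract_assistant_reply("User: hi\nAssistant: Hello!\nUser: bye\nAssistant: Goodbye!")
#   -> "Goodbye!"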
# Normalize incoming `history` structures and return only prior user messages.
def _normalize_history_to_user_messages(history, max_user_turns: int = 8) -> list:
"""Accept many history formats and return a list of the last N user message strings.
Supported input formats:
- List[dict] with keys 'role' and 'content'
- List[ (user_str, assistant_str), ... ] (gr.Chatbot style)
- Flat list of strings (assumed to be alternating user/assistant or only user)
- None -> []
The function will never return assistant/model outputs as context.
"""
if not history:
return []
user_msgs = []
try:
# If list of dicts with 'role' and 'content'
if isinstance(history, list) and history and isinstance(history[0], dict):
for item in history:
try:
role = item.get('role', '').lower()
content = item.get('content', '')
except Exception:
continue
if role == 'user' and content:
user_msgs.append(str(content))
# If list of pairs (user, assistant) as produced by some Chatbot components
elif isinstance(history, list) and history and isinstance(history[0], (list, tuple)):
for pair in history:
if not pair: continue
# first element assumed to be user text
u = pair[0]
if u and isinstance(u, str):
user_msgs.append(u)
# Flat list of strings
elif isinstance(history, list) and all(isinstance(x, str) for x in history):
            # Heuristic: assume alternating user/assistant turns and take the even indices as user messages
for i, s in enumerate(history):
if i % 2 == 0:
user_msgs.append(s)
else:
# Unknown format: try to stringify any 'content' attributes we can find
for item in history:
if isinstance(item, str):
user_msgs.append(item)
elif isinstance(item, dict) and item.get('role','').lower() == 'user':
user_msgs.append(item.get('content',''))
except Exception:
pass
# Keep only last `max_user_turns` and strip whitespace
user_msgs = [u.strip() for u in user_msgs if u and u.strip()]
return user_msgs[-max_user_turns:]
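# Example (illustrative): every supported history shape normalizes to user turns only:
#   _normalize_history_to_user_messages([{"role": "user", "content": "hi"},
#                                        {"role": "assistant", "content": "hello"}])   # -> ["hi"]
#   _normalize_history_to_user_messages([("q1", "a1"), ("q2", "a2")])                  # -> ["q1", "q2"]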
def _final_sanitize_reply(text: str) -> str:
"""Final defensive sanitization to ensure we return only the first assistant reply.
Steps:
- Normalize newlines.
- Use _extract_assistant_reply to get the assistant block nearest the last user turn.
- Remove any remaining role markers occurring inside the reply.
- Keep only the first paragraph (up to the first blank line) to avoid multi-turn continuation.
- Trim leading/trailing whitespace.
"""
if not text:
return ""
try:
norm = re.sub(r"\r\n", "\n", text)
# First, try to extract the assistant block
ans = _extract_assistant_reply(norm)
if not ans:
ans = norm
# Remove any inline role markers like 'User:' or 'Assistant:'
ans = re.sub(r"(?i)\b(User|Assistant)\b\s*:\s*", "", ans)
# Trim to first paragraph (split by two or more newlines)
parts = re.split(r"\n\s*\n+", ans)
if parts:
ans = parts[0]
# Also, if still contains role markers (sign of transcript), aggressively keep only the first coherent sentence/line
if re.search(r"(?i)(User|Assistant)\b", ans):
# Prefer first sentence; if none, take first non-empty line
m = re.search(r"([\S\s]*?[\.\!\?])(?:\s|$)", ans)
if m:
ans = m.group(1)
else:
# fallback to first non-empty line
for line in ans.splitlines():
if line.strip():
ans = line.strip(); break
return ans.strip()
except Exception:
# Fallback minimal cleanup
s = re.sub(r"(?i)\b(User|Assistant)\b\s*:\s*", "", text)
s = re.split(r"\n\s*\n+", s)[0]
return s.strip()
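# Example (illustrative): a leaked multi-turn continuation is cut back to the single
# reply nearest the last user turn:
#   _final_sanitize_reply("Assistant: Sure!\nUser: thanks\nAssistant: You're welcome.")
#   # -> "You're welcome."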
def _load_json(path, default):
if os.path.exists(path):
try:
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
except (json.JSONDecodeError, IOError):
return default
return default
def _save_json(path, data):
    # Thin wrapper for readability; delegates to the atomic writer.
    _atomic_write_json(path, data)
class KnowledgeStore:
def __init__(self, storage_path: str):
self.base = _Path(storage_path)
self.knowledge_dir = self.base / "knowledge"
self.chunks_dir = self.knowledge_dir / "chunks"
self.curves_dir = self.base / "curves"
for d in [self.knowledge_dir, self.chunks_dir, self.curves_dir]:
d.mkdir(parents=True, exist_ok=True)
self.manifest_path = self.knowledge_dir / "knowledge_manifest.json"
self.embedding_queue_path = self.knowledge_dir / "embedding_queue.jsonl"
self._lock = threading.RLock()
self._load_manifest()
def _load_manifest(self):
with self._lock:
if self.manifest_path.exists():
try:
with open(self.manifest_path, 'r', encoding='utf-8') as f:
self.manifest = json.load(f)
except json.JSONDecodeError:
self.manifest = self._default_manifest()
else:
self.manifest = self._default_manifest()
self._save_manifest()
def _default_manifest(self):
return {
"total_chunks": 0, "total_texts": 0, "chunks_by_tag": {},
"chunks_by_scope": {}, "chunk_index": {}, "last_vector_build": 0,
"vector_count": 0
}
def _save_manifest(self):
with self._lock:
_atomic_write_json(self.manifest_path, self.manifest)
def _normalize_text(self, text: str) -> str:
return unicodedata.normalize("NFC", text).strip()
def _chunk_text(self, text: str, target_size: int = 1000) -> List[str]:
# Simple sentence-based chunking for now.
sentences = re.split(r'(?<=[.!?])\s+', text)
chunks, current_chunk = [], ""
for sentence in sentences:
if len(current_chunk) + len(sentence) + 1 > target_size:
if current_chunk: chunks.append(current_chunk)
current_chunk = sentence
else:
current_chunk += (" " + sentence) if current_chunk else sentence
if current_chunk: chunks.append(current_chunk)
return chunks
def ingest_text(self, text: str, tag: str="ingest", scope: str="general", metadata: Optional[Dict]=None) -> Optional[str]:
with self._lock:
normalized = self._normalize_text(text)
if not normalized: return None
texts = self._chunk_text(normalized)
if not texts: return None
chunk_id = f"chunk_{int(time.time())}_{hashlib.sha1(texts[0].encode('utf-8')).hexdigest()[:8]}"
chunk_data = {
"chunk_id": chunk_id, "timestamp": time.time(), "tag": tag, "scope": scope,
"text_count": len(texts), "texts": texts, "metadata": metadata or {},
"quality_score": 0.7, "importance_score": 0.5, # Defaults
"embeddings_generated": False
}
chunk_file = self.chunks_dir / f"{chunk_id}.json"
_atomic_write_json(chunk_file, chunk_data)
# Update manifest
self.manifest["total_chunks"] += 1
self.manifest["total_texts"] += len(texts)
self.manifest.setdefault("chunks_by_tag", {}).setdefault(tag, []).append(chunk_id)
self.manifest.setdefault("chunks_by_scope", {}).setdefault(scope, []).append(chunk_id)
self.manifest.setdefault("chunk_index", {})[chunk_id] = {
"timestamp": chunk_data["timestamp"], "tag": tag, "scope": scope,
"text_count": len(texts), "quality_score": chunk_data["quality_score"]
}
self._save_manifest()
# Enqueue for embedding
# Prepare per-text metadata entries and persist them to the knowledge curves folder
metas = []
for idx, t in enumerate(texts):
metas.append({
'id': f"{chunk_id}_{idx}",
'chunk_id': chunk_id,
'text': t,
'timestamp': chunk_data['timestamp'],
})
try:
meta_file = self.curves_dir / "meta.jsonl"
with open(meta_file, 'a', encoding='utf-8') as f:
for m in metas:
f.write(json.dumps(m, ensure_ascii=False) + "\n")
except Exception:
# If we can't write meta.jsonl, continue and still queue embeddings
pass
try:
with open(self.embedding_queue_path, "a", encoding="utf-8") as f:
f.write(json.dumps({"chunk_id": chunk_id, "status": "queued"}) + "\n")
except Exception:
pass
return chunk_id
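# Example (illustrative, assumes a writable data directory):
#   ks = KnowledgeStore("./hive_data")
#   cid = ks.ingest_text("Bees share food sources via the waggle dance.", tag="biology")
#   # -> a chunk id like "chunk_1700000000_ab12cd34"; the chunk is persisted under
#   #    knowledge/chunks/ and queued in embedding_queue.jsonl for later embedding.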
# ----------- voice: ASR/TTS/phonics -----------
G2P = G2p() if G2p is not None else None
class ASRService:
"""Handles ASR, including transcription and language detection."""
def __init__(self):
        # Resolve the shared, lazily loaded Whisper model.
        self.model = get_asr()
def transcribe(self, audio_path: str, uid: Optional[str], forced_lang: Optional[str] = None) -> dict:
prior = _load_json(ADAPT_DB, {}).get(uid or "guest", {}).get("lang_prior")
language = forced_lang or prior or None
# Assuming get_asr() returns a valid model object
segs, info = self.model.transcribe(audio_path, language=language, beam_size=5, vad_filter=True)
text = " ".join([s.text for s in segs]).strip()
detected_lang = info.language
if not forced_lang and text:
prof = _load_json(ADAPT_DB, {})
p = prof.get(uid or "guest", {})
p["lang_prior"] = detected_lang
prof[uid or "guest"] = p
_save_json(ADAPT_DB, prof)
return {"text": text, "language": detected_lang, "confidence": info.language_probability, "segments": [{"start": s.start, "end": s.end, "text": s.text} for s in segs]}
ASR_MODELS={"tiny":"tiny","base":"base","small":"small","medium":"medium","large":"large-v3"}
def _asr_model_name(): return ASR_MODELS.get(CFG["VOICE_ASR_MODEL"],"small")
_ASR=None
def get_asr():
global _ASR
if _ASR is not None: return _ASR
if WhisperModel is None:
raise RuntimeError("faster-whisper not installed. Install it with: pip install faster-whisper")
size=_asr_model_name(); device="cuda" if (_has_gpu_env()) else "cpu"
compute_type="float16" if device=="cuda" else "int8"
_ASR=WhisperModel(size, device=device, compute_type=compute_type); return _ASR
PIPER_MODELS={
"en": ("https://huggingface.co/rhasspy/piper-voices/resolve/main/en/en_US/amy/low/en_US-amy-low.onnx",
"https://huggingface.co/rhasspy/piper-voices/resolve/main/en/en_US/amy/low/en_US-amy-low.onnx.json"),
"es": ("https://huggingface.co/rhasspy/piper-voices/resolve/main/es/es_ES/davefx/medium/es_ES-davefx-medium.onnx",
"https://huggingface.co/rhasspy/piper-voices/resolve/main/es/es_ES/davefx/medium/es_ES-davefx-medium.onnx.json"),
"fr": ("https://huggingface.co/rhasspy/piper-voices/resolve/main/fr/fr_FR/gilles/medium/fr_FR-gilles-medium.onnx",
"https://huggingface.co/rhasspy/piper-voices/resolve/main/fr/fr_FR/gilles/medium/fr_FR-gilles-medium.onnx.json"),
"de": ("https://huggingface.co/rhasspy/piper-voices/resolve/main/de/de_DE/thorsten-deepbinner/low/de_DE-thorsten-deepbinner-low.onnx",
"https://huggingface.co/rhasspy/piper-voices/resolve/main/de/de_DE/thorsten-deepbinner/low/de_DE-thorsten-deepbinner-low.onnx.json"),
"zh": ("https://huggingface.co/rhasspy/piper-voices/resolve/main/zh/zh_CN/huayan/low/zh_CN-huayan-low.onnx",
"https://huggingface.co/rhasspy/piper-voices/resolve/main/zh/zh_CN/huayan/low/zh_CN-huayan-low.onnx.json"),
"ar": ("https://huggingface.co/rhasspy/piper-voices/resolve/main/ar/ar_JO/farah/medium/ar_JO-farah-medium.onnx",
"https://huggingface.co/rhasspy/piper-voices/resolve/main/ar/ar_JO/farah/medium/ar_JO-farah-medium.onnx.json"),
"pt": ("https://huggingface.co/rhasspy/piper-voices/resolve/main/pt/pt_BR/edresson/low/pt_BR-edresson-low.onnx",
"https://huggingface.co/rhasspy/piper-voices/resolve/main/pt/pt_BR/edresson/low/pt_BR-edresson-low.onnx.json"),
"ko": ("https://huggingface.co/rhasspy/piper-voices/resolve/main/ko/ko_KR/minji/low/ko_KR-minji-low.onnx",
"https://huggingface.co/rhasspy/piper-voices/resolve/main/ko/ko_KR/minji/low/ko_KR-minji-low.onnx.json"),
"ru": ("https://huggingface.co/rhasspy/piper-voices/resolve/main/ru/ru_RU/irina/medium/ru_RU-irina-medium.onnx",
"https://huggingface.co/rhasspy/piper-voices/resolve/main/ru/ru_RU/irina/medium/ru_RU-irina-medium.onnx.json"),
"hi": ("https://huggingface.co/rhasspy/piper-voices/resolve/main/hi/hi_IN/hindu/medium/hi_IN-hindu-medium.onnx",
"https://huggingface.co/rhasspy/piper-voices/resolve/main/hi/hi_IN/hindu/medium/hi_IN-hindu-medium.onnx.json"),
"bn": ("https://huggingface.co/rhasspy/piper-voices/resolve/main/bn/bn_BD/nisingha/medium/bn_BD-nisingha-medium.onnx",
"https://huggingface.co/rhasspy/piper-voices/resolve/main/bn/bn_BD/nisingha/medium/bn_BD-nisingha-medium.onnx.json"),
"ja": ("https://huggingface.co/rhasspy/piper-voices/resolve/main/ja/ja_JP/takumi/medium/ja_JP-takumi-medium.onnx",
"https://huggingface.co/rhasspy/piper-voices/resolve/main/ja/ja_JP/takumi/medium/ja_JP-takumi-medium.onnx.json"),
"sw": ("https://huggingface.co/rhasspy/piper-voices/resolve/main/sw/sw_KE/pwh/medium/sw_KE-pwh-medium.onnx",
"https://huggingface.co/rhasspy/piper-voices/resolve/main/sw/sw_KE/pwh/medium/sw_KE-pwh-medium.onnx.json"),
}
def _download(url, dst, timeout=30):
    if os.path.exists(dst):
        return dst
    os.makedirs(os.path.dirname(dst), exist_ok=True)
    # Stream the download with an explicit socket timeout (urlretrieve offers none).
    with urllib.request.urlopen(url, timeout=timeout) as resp, open(dst, "wb") as f:
        shutil.copyfileobj(resp, f)
    return dst
_TTS_CACHE={}
def get_tts(lang: str = "en") -> PiperVoice: # type: ignore
if PiperVoice is None:
raise RuntimeError("piper-tts not installed. Install it with: pip install piper-tts")
lang=lang if lang in PIPER_MODELS else "en"
if lang in _TTS_CACHE: return _TTS_CACHE[lang]
mu,cu=PIPER_MODELS[lang]; m=_download(mu,f"./models/piper/{os.path.basename(mu)}"); c=_download(cu,f"./models/piper/{os.path.basename(cu)}")
v=PiperVoice.load(m,c); _TTS_CACHE[lang]=v; return v
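# Example (illustrative, requires piper-tts; the first call per language downloads the
# voice from Hugging Face into ./models/piper/):
#   voice = get_tts("es")   # unsupported language codes fall back to "en"
#   # Repeat calls for the same language are served from _TTS_CACHE.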
def _embed_mfcc(path):
y, sr = librosa.load(path, sr=16000)
mf=librosa.feature.mfcc(y=y, sr=sr, n_mfcc=20)
return mf.mean(axis=1)
def enroll_voice(uid:str, path:str) -> bool:
db=_load_json(VOICES_DB, {}); db[uid]=_embed_mfcc(path).astype(float).tolist(); _save_json(VOICES_DB, db); return True
def identify_voice(path:str, threshold:float=0.70) -> Optional[str]:
db=_load_json(VOICES_DB, {});
if not db: return None
emb=_embed_mfcc(path).reshape(1,-1)
keys=list(db.keys()); mats=np.array([db[k] for k in keys])
sims=cosine_similarity(emb, mats)[0]; i=int(np.argmax(sims)); return keys[i] if sims[i]>=threshold else None
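# Example (illustrative): enroll once, then match unknown audio by MFCC cosine similarity:
#   enroll_voice("owner:1", "/tmp/owner_sample.wav")
#   identify_voice("/tmp/unknown.wav")   # -> "owner:1" if similarity >= 0.70, else None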
_BASIC={'a':'a as in apple /æ/','e':'e as in elephant /ɛ/','i':'i as in igloo /ɪ/','o':'o as in octopus /ɒ/','u':'u as in umbrella /ʌ/',
'c':'c as in cat /k/ (before e/i/y often /s/)','g':'g as in goat /g/ (before e/i/y often soft /dʒ/)','y':'y as in yellow /j/ or happy /i/'}
def phonics(word:str)->str:
    if G2P is None:
        return "Phonics unavailable (g2p_en not installed)."
    toks=G2P(word); phones=[t for t in toks if re.match(r"[A-Z]+[0-2]?$", t)]
    hints=[]
    for ch in word.lower():
        if ch in _BASIC and _BASIC[ch] not in hints: hints.append(_BASIC[ch])
    return f"Phonemes: {' '.join(phones)} | Hints: {('; '.join(hints)) if hints else '🐝'}"
def lid_chunk(text:str, min_len:int=12)->List[Tuple[str,str]]:
    # Split after sentence-final punctuation. (The previous pattern ended in '|',
    # an empty alternative that made re.split break the text on every character.)
    parts=re.split(r"(?<=[.!?;\u2026\u2028\u2029])\s+", text)
    chunks=[]; buf=""
    for p in parts:
        if not p: continue
        buf += (" " + p) if buf else p
        if len(buf)>=min_len or re.search(r"[.!?;\u2026\u2028\u2029]$", p):
            lang,_=langid.classify(buf.strip()); chunks.append((buf.strip(), lang)); buf=""
    if buf.strip():
        lang,_=langid.classify(buf.strip()); chunks.append((buf.strip(), lang))
    return chunks
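# Example (illustrative, requires langid; classification is probabilistic):
#   lid_chunk("Hello my friend. Bonjour mon ami, comment vas-tu?")
#   # -> typically [("Hello my friend.", "en"), ("Bonjour mon ami, comment vas-tu?", "fr")]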
async def web_search_snippets(query: str, max_results: int = 5, timeout: int = 8) -> List[Dict]:
"""Performs a web search and returns snippets."""
if DDGS is None:
print("[WebSearch] duckduckgo_search is not installed. Skipping web search.")
return []
results = []
try:
        # The `duckduckgo-search` library was renamed to `ddgs` and its API changed:
        # the async `atext` method became a synchronous `text` method, so this
        # coroutine blocks while the search runs.
from ddgs import DDGS as DDGS_SYNC
with DDGS_SYNC(timeout=timeout) as ddgs:
for r in ddgs.text(query, max_results=max_results):
results.append(r)
except Exception as e:
print(f"[WebSearch] Error during web search: {e}")
return results
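# Example (illustrative, requires the `ddgs` package and network access):
#   snippets = asyncio.run(web_search_snippets("python dataclasses", max_results=3))
#   for s in snippets:
#       print(s.get("title"), "-", s.get("href"))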
class TTSService:
"""Manages TTS models and synthesizes speech, handling different languages and environments."""
def __init__(self):
self.voices = {}
self.default_lang = CFG.get("TTS_LANG", "en")
self.has_piper = PiperVoice is not None
def _get_voice(self, lang: str):
"""Safely load a Piper voice model, returning None if unavailable."""
if not self.has_piper:
return None
lang = lang if lang in PIPER_MODELS else self.default_lang
if lang not in self.voices:
try:
self.voices[lang] = get_tts(lang)
except Exception as e:
print(f"[TTSService] Error loading voice for '{lang}': {e}")
return None
return self.voices.get(lang)
def _generate_sine_wave_audio(self, text: str) -> Optional[str]:
"""Generate a fallback sine-wave audio file as an audible cue.
This ensures audio feedback even when Piper TTS is unavailable.
"""
try:
import wave, struct, math
out_path = os.path.join(tempfile.gettempdir(), f"hive_tts_{int(time.time())}.wav")
sr = 22050
duration = max(0.6, min(3.0, len(text) / 40.0))
freq = 440.0
nframes = int(sr * duration)
amplitude = 16000
with wave.open(out_path, 'w') as wf:
wf.setnchannels(1)
wf.setsampwidth(2)
wf.setframerate(sr)
for i in range(nframes):
value = int(amplitude * math.sin(2.0 * math.pi * freq * (i / sr)))
data = struct.pack('<h', value)
wf.writeframesraw(data)
return out_path
except Exception as e:
print(f"[TTSService] Fallback sine-wave generation failed: {e}")
return None
def synthesize(self, text: str, uid: Optional[str]) -> Optional[str]:
"""Synthesize text to speech with graceful fallback for all environments.
Returns path to audio file, or None if synthesis impossible.
"""
if not text or not text.strip():
return None
lang = _load_json(ADAPT_DB, {}).get(uid or "guest", {}).get("lang_prior", self.default_lang)
voice = self._get_voice(lang)
out_path = os.path.join(tempfile.gettempdir(), f"hive_tts_{int(time.time())}.wav")
# Try Piper TTS if available
        if voice:
            # Attempt 1: synthesize() returning an (audio, sample_rate) tuple.
            # (Previously a failure here skipped the file-object fallback entirely.)
            try:
                data = voice.synthesize(text)
                if isinstance(data, tuple) and len(data) == 2 and sf is not None:
                    audio, sr = data
                    sf.write(out_path, audio, sr)
                    return out_path
            except Exception as e:
                print(f"[TTSService] Piper synthesis failed: {e}")
            # Attempt 2: synthesize() writing into a file-like object
            try:
                with open(out_path, 'wb') as f:
                    voice.synthesize(text, f)
                return out_path
            except Exception as e:
                print(f"[TTSService] Piper file write failed: {e}")
# Fallback: generate a sine-wave audio file as an audible cue
# This ensures ALL environments get audio feedback
return self._generate_sine_wave_audio(text)
def synthesize_multilang(text:str, fallback="en")->str:
"""Synthesizes text with multiple languages by chunking and using appropriate voices."""
chunks = lid_chunk(text)
if not chunks:
chunks = [(text, fallback)]
audio_segments = []
for chunk_text, lang in chunks:
voice = get_tts(lang)
audio, _ = voice.synthesize(chunk_text)
audio_segments.append(audio)
mix = np.concatenate(audio_segments) if audio_segments else np.zeros(1)
sr = get_tts(fallback).sample_rate # Use fallback sample rate for consistency
outp=os.path.join(tempfile.gettempdir(), f"hive_tts_{int(time.time())}.wav")
sf.write(outp, mix if mix is not None else np.zeros(1), sr or 22050, subtype="PCM_16"); return outp
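# Example (illustrative, requires piper-tts and soundfile): mixed-language text is
# chunked by language id and each chunk is voiced with the matching Piper model:
#   path = synthesize_multilang("Hello my friend. Bonjour mon ami, comment vas-tu?")
#   # -> a WAV path in the temp dir containing English speech followed by French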
# ----------- compiler / engine -----------
class EngineCurve:
def __init__(self):
self.stats={"runs":0,"ok":0,"latency_ms":[]}
self.router_rules=[]
def choose_route(self, msg:str)->str:
# This is a simplified version. The full logic is now in IntentRouter.
return "tutor"
def run(self, message:str, snippets:List[Dict])->Dict: return {"ok":True,"route":"tutor"}
# ----------- wifi auto-connect (non-blocking) -----------
NET_STATE_DB=os.path.join(CFG["STATE_DIR"],"wifi_known.json")
def _os_name(): return platform.system().lower()
def _fast_probe(host="8.8.8.8", port=53, timeout=1.5) -> bool:
try:
socket.setdefaulttimeout(timeout)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM); s.connect((host, port)); s.close()
return True
except Exception:
return False
def _http_probe(url="https://huggingface.co", timeout=2.5)->float:
try:
t0=time.time(); r=requests.head(url, timeout=timeout)
if r.status_code<500: return (time.time()-t0)*1000.0
except Exception: pass
return -1.0
def _load_known()->List[dict]:
data=_load_json(NET_STATE_DB, []); out=[]
for d in data:
if isinstance(d,dict) and "ssid" in d:
out.append({"ssid":d["ssid"],"priority":int(d.get("priority",0))})
out.sort(key=lambda x: x.get("priority",0), reverse=True); return out
def _get_saved_password(ssid:str)->Optional[str]:
if keyring:
try: return keyring.get_password("hive_wifi", ssid) or "" # type: ignore
except Exception: return None
return None
def _connect_linux(ssid, password, timeout=12)->Tuple[bool,str]:
try:
cmd=["nmcli","device","wifi","connect",ssid]+(["password",password] if password else [])
p=subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
return (p.returncode==0), (p.stdout or p.stderr or "").strip()
except Exception as e: return False, f"nmcli error: {e}"
def _connect_windows(ssid, password)->Tuple[bool,str]:
try:
p=subprocess.run(["netsh","wlan","connect","name="+ssid,"ssid="+ssid], capture_output=True, text=True)
if p.returncode==0 and "success" in (p.stdout+p.stderr).lower(): return True,"Connected."
if not password: return False,"No saved password."
xml=f'''<?xml version="1.0"?>
<WLANProfile xmlns="http://www.microsoft.com/networking/WLAN/profile/v1">
<name>{ssid}</name><SSIDConfig><SSID><name>{ssid}</name></SSIDConfig>
<connectionType>ESS</connectionType><connectionMode>auto</connectionMode>
<MSM><security><authEncryption><authentication>WPA2PSK</authentication>
<encryption>AES</encryption><useOneX>false</useOneX></authEncryption>
<sharedKey><keyType>passPhrase</keyType><protected>false</protected>
<keyMaterial>{password}</keyMaterial></sharedKey></security></MSM></WLANProfile>'''
tmp=os.path.join(os.getenv("TEMP","/tmp"), f"wifi_{int(time.time())}.xml"); open(tmp,"w",encoding="utf-8").write(xml)
a=subprocess.run(["netsh","wlan","add","profile","filename="+tmp,"user=all"], capture_output=True, text=True)
if a.returncode!=0: return False, a.stderr or a.stdout or "add profile failed"
c=subprocess.run(["netsh","wlan","connect","name="+ssid,"ssid="+ssid], capture_output=True, text=True)
return (c.returncode==0), (c.stderr or c.stdout or "").strip()
except Exception as e: return False, f"netsh error: {e}"
def _connect_macos(ssid, password)->Tuple[bool,str]:
try:
out=subprocess.check_output(["networksetup","-listallhardwaresports"], stderr=subprocess.DEVNULL).decode("utf-8","ignore")
dev=None
for block in out.split("\n\n"):
if "Wi-Fi" in block or "AirPort" in block:
for l in block.splitlines():
if l.strip().startswith("Device:"): dev=l.split(":",1)[1].strip(); break
if dev: break
if not dev: return False,"Wi-Fi device not found"
cmd=["networksetup","-setairportnetwork",dev, ssid]+([password] if password else [])
p=subprocess.run(cmd, capture_output=True, text=True)
return (p.returncode==0), (p.stderr or p.stdout or "").strip()
except Exception as e: return False, f"networksetup error: {e}"
def _connect_os(ssid,password,timeout=12)->Tuple[bool,str]:
osn=_os_name()
if osn=="linux": return _connect_linux(ssid,password,timeout)
if osn=="windows": return _connect_windows(ssid,password)
if osn=="darwin": return _connect_macos(ssid,password)
return False, f"Unsupported OS: {osn}"
class AutoConnector:
def __init__(self):
self.last_attempt=0.0; self.cooldown_s=30.0; self.per_ssid_timeout=10.0; self.total_budget_s=18.0; self.thread=None; self._lock=threading.Lock()
def online_quick(self)->bool: return _fast_probe(timeout=1.2)
def quality_ms(self)->float: return _http_probe(timeout=2.0)
def _run_once(self):
if self.online_quick(): return
known=_load_known();
if not known: return
t_start=time.time()
for item in known:
if time.time()-t_start>self.total_budget_s: return
ssid=item["ssid"]; pw=_get_saved_password(ssid)
ok,_msg=_connect_os(ssid,pw,timeout=int(self.per_ssid_timeout))
if ok and self.online_quick(): return
def kick_async(self):
with self._lock:
now=time.time()
if now - self.last_attempt < self.cooldown_s: return
self.last_attempt=now
if self.thread and self.thread.is_alive(): return
self.thread = threading.Thread(target=self._run_once, daemon=True); self.thread.start()
NET = AutoConnector()
def _has_gpu_env() -> bool:
"""Global helper to check for GPU environment."""
return EnvDetector()._has_gpu_env()
# ----------- coverage heuristic -----------
def coverage_score_from_snippets(snippets: list, scores: list) -> float:
if not snippets or not scores: return 0.0
s = sorted(scores, reverse=True)[:3]
base = sum(s) / len(s) if s else 0.0 # type: ignore
bonus = min(0.15, 0.03 * len(snippets))
return float(max(0.0, min(1.0, base + bonus)))
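# Worked example: scores [0.9, 0.7, 0.5, 0.2] over four snippets ->
# base = (0.9 + 0.7 + 0.5) / 3 = 0.7, bonus = min(0.15, 0.03 * 4) = 0.12,
# coverage = 0.82.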
# ----------- RBAC / users / lockouts (Restored) -----------
USERS_DB=os.path.join(CFG["STATE_DIR"],"users.json")
LOCKS_DB=os.path.join(CFG["STATE_DIR"],"lockouts.json")
VOICES_DB=os.path.join(CFG["STATE_DIR"],"voices.json")
ADAPT_DB=os.path.join(CFG["STATE_DIR"],"speech_adapt.json")
def _init_users():
d={"owner":{"id":"owner:1","name":CFG["OWNER_NAME"],"role":"owner","pass":CFG["OWNER_PASS"],"second":CFG["OWNER_SECOND"],"prefs":{"activation_names":[CFG["AGENT_NAME"]],"language":"en"}},
"admins_super":[],"admins_general":[],"users":[]}
_save_json(USERS_DB,d); return d
def _load_users():
d=_load_json(USERS_DB, None); return d if d else _init_users()
def _find_user(d, name_or_id):
pools=[("owner",[d.get("owner")]),("admin_super",d.get("admins_super", [])),("admin_general",d.get("admins_general", [])),("user",d.get("users", []))]
for role,pool in pools:
for u in pool or []:
if u and (u.get("id")==name_or_id or u.get("name")==name_or_id): return u, role
return None, None
PERMS={
"owner":{"can_add":["admin_super","admin_general","user"],"can_remove":["admin_super","admin_general","user"],
"can_edit_role_of":["admin_super","admin_general","user"],"can_edit_profile_of":["owner","admin_super","admin_general","user"],
"can_view_scopes":"all","maintenance":"full","code_edit":"approve_and_edit"},
"admin_super":{"can_add":["admin_general","user"],"can_remove":["admin_general","user"],
"can_edit_role_of":["admin_general","user"],"can_edit_profile_of":["admin_general","user"],
"can_view_scopes":"self_only","maintenance":"advanced","code_edit":"suggest_only"},
"admin_general":{"can_add":["user"],"can_remove":["user"],"can_edit_role_of":["user"],"can_edit_profile_of":["user"],
"can_view_scopes":"self_only","maintenance":"basic","code_edit":"suggest_only"},
"user":{"can_add":[],"can_remove":[],"can_edit_role_of":[],"can_edit_profile_of":["user"],
"can_view_scopes":"self_only","maintenance":"none","code_edit":"none"},
"guest":{"can_add":[],"can_remove":[],"can_edit_role_of":[],"can_edit_profile_of":[],
"can_view_scopes":"self_only","maintenance":"none","code_edit":"none"},
}
def attempt_login(name_or_id:str, password:str="", second:Optional[str]=None):
d=_load_users(); locks=_load_json(LOCKS_DB,{ })
def lock_fail(lid, msg):
st=locks.get(lid, {"fails":0,"until":0}); st["fails"]=st.get("fails",0)+1; dur=180 if st["fails"]>=3 else 0; st["until"]=time.time()+dur if dur else 0
locks[lid]=st; _save_json(LOCKS_DB,locks); return False, msg
u,_=_find_user(d, name_or_id)
if not u: return False, "Profile not found."
role=u.get("role","user"); lid=str(u.get("id", u.get("name"))); now=time.time(); st=locks.get(lid, {"fails":0,"until":0})
if now < st.get("until",0): return False, f"Locked; try again in ~{int(st['until']-now)}s."
if role in ("admin_general","admin_super","owner") and (password!=u.get("pass") or (role=="owner" and u.get("second") and second!=u.get("second"))): return lock_fail(lid, "Credentials incorrect.")
locks[lid]={"fails":0,"until":0}; _save_json(LOCKS_DB,locks); return True, f"Welcome, {u.get('name')} ({role})."
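# Example (illustrative): privileged roles must present a password (the owner also a
# second factor); three consecutive failures lock the profile for ~180s:
#   ok, msg = attempt_login(CFG["OWNER_NAME"], password=CFG["OWNER_PASS"], second=CFG["OWNER_SECOND"])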
# ----------- overlay / hotpatch -----------
RUNTIME_OVERRIDES = os.path.join(HIVE_HOME, "system", "runtime_overrides.json")
ALLOWED_PATCH_KEYS={"prompt_head","retrieval_k","token_budget","temperature","router_rules","web_threshold"}
def _load_overrides():
if os.path.exists(RUNTIME_OVERRIDES):
try: return json.load(open(RUNTIME_OVERRIDES,"r",encoding="utf-8"))
except Exception: return {}
return {}
def _save_overrides(ovr:dict):
_atomic_write_json(RUNTIME_OVERRIDES, ovr)
class RuntimeOverlay:
def __init__(self): self.ovr=_load_overrides()
def apply_to(self, hive: "Hive"):
o=self.ovr or {}
if isinstance(o.get("prompt_head"),str) and hasattr(hive, 'compiler'): hive.compiler.override_head=o["prompt_head"]
if isinstance(o.get("token_budget"),int) and hasattr(hive, 'compiler'): hive.compiler.override_budget=max(256, min(8192, o["token_budget"]))
hive.retrieval_k=int(o.get("retrieval_k",6)); hive.retrieval_k=max(3,min(24,hive.retrieval_k))
hive.decoding_temperature=float(o.get("temperature",0.7)); hive.decoding_temperature=max(0.0,min(1.5,hive.decoding_temperature))
rr=o.get("router_rules") or []
if isinstance(rr,list) and hasattr(hive, 'engine') and hive.engine is not None:
try: hive.engine.router_rules=[re.compile(pat,re.I) for pat in rr if isinstance(pat,str) and pat]
except re.error: hive.engine.router_rules=[]
t=o.get("web_threshold",None); hive.web_threshold=float(t) if isinstance(t,(int,float)) else 0.40
def patch(self, patch:dict, actor_role:str="hive")->Tuple[bool,str]:
if not CFG["ALLOW_RUNTIME_HOTPATCH"]: return False,"Runtime hotpatch disabled."
if actor_role not in ("hive","admin_general","admin_super","owner"): return False,"Unauthorized actor."
for k in list(patch.keys()):
if k not in ALLOWED_PATCH_KEYS: patch.pop(k,None)
if not patch: return False,"No allowed keys."
self.ovr.update(patch); _save_overrides(self.ovr); return True,"Patched."
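# Example (illustrative): unknown keys are stripped; allowed keys persist to
# runtime_overrides.json and take effect via apply_to():
#   overlay = RuntimeOverlay()
#   ok, msg = overlay.patch({"temperature": 0.5, "os_command": "rm -rf /"}, actor_role="owner")
#   # -> only "temperature" survives the ALLOWED_PATCH_KEYS filter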
# ----------- safe reboot -----------
def _persist_before_reboot():
try: _atomic_write_json(os.path.join(HIVE_HOME, "system", "last_reboot.json"), {"ts":time.time(),"note":"self-reboot"})
except Exception: pass
def safe_reboot(reason:str="optimization"):
if not CFG["ALLOW_SELF_REBOOT"]: return False,"Self-reboot disabled."
_persist_before_reboot()
try:
os.execv(sys.executable, [sys.executable, os.path.abspath(__file__)] + sys.argv[1:])
except Exception:
os._exit(3)
return True, f"Rebooting: {reason}"
# ----------- self optimizer (bounded) -----------
class SelfOptimizer(threading.Thread): # type: ignore
def __init__(self, hive: "Hive"):
super().__init__(daemon=True); self.hive=hive; self.stop=False; self.tick=45.0
self.last_pkg_check = 0
self.last_code_review = 0
self.last_maintenance_run = 0
self.code_review_interval = 3600 * 24 # Check for self-improvement once a day
self.pkg_check_interval = 3600 * 6 # Check for package updates every 6 hours
def _check_for_package_updates(self):
"""Checks for updates to packages in the allowlist and proposes changes."""
if time.time() - self.last_pkg_check < self.pkg_check_interval:
return
self.last_pkg_check = time.time()
print("[SelfOptimizer] Checking for package updates...")
try:
# Use pip to check for outdated packages
outdated_raw = subprocess.check_output([sys.executable, "-m", "pip", "list", "--outdated"], text=True)
for line in outdated_raw.splitlines()[2:]: # Skip header
parts = line.split()
if len(parts) < 3: continue
pkg_name, current_ver, latest_ver = parts[0], parts[1], parts[2]
# If the outdated package is in our allowlist, prepare a proposal
if pkg_name in CFG["OPT_PKG_ALLOWLIST"]:
print(f"[SelfOptimizer] Found update for {pkg_name}: {current_ver} -> {latest_ver}")
proposal = ChangeProposal(
kind="package",
name=pkg_name,
version=latest_ver,
reason=f"Autonomous proposal to update from {current_ver} to {latest_ver}",
proposer="hive_optimizer"
)
# Only propose if changes and compiler modules are available
if hasattr(self.hive, 'changes') and self.hive.changes is not None and hasattr(self.hive, 'compiler') and self.hive.compiler is not None:
try:
proposal_id = self.hive.changes.propose(proposal)
# Automatically test the new proposal
test_result = self.hive.changes.test_and_compare(proposal_id, proposal)
print(f"[SelfOptimizer] Test result for {pkg_name} update: {test_result.get('passed')}, Delta: {test_result.get('delta')}")
except Exception as e:
print(f"[SelfOptimizer] Error proposing package update for {pkg_name}: {e}")
else:
print("[SelfOptimizer] changes or compiler module not available; skipping package proposal")
except Exception as e:
print(f"[SelfOptimizer] Error checking for package updates: {e}")
def _propose_self_improvement(self):
"""Asks the LLM to review a part of its own code and proposes a change if valid."""
if time.time() - self.last_code_review < self.code_review_interval:
return
self.last_code_review = time.time()
print("[SelfOptimizer] Performing autonomous code review...")
# Verify that chat and changes modules are available
if not hasattr(self.hive, 'chat') or not callable(getattr(self.hive, 'chat')) or not hasattr(self.hive, 'changes') or self.hive.changes is None:
print("[SelfOptimizer] chat or changes module not available; skipping self-improvement")
return
try:
# Read its own source code
with open(__file__, 'r', encoding='utf-8') as f:
own_code = f.read()
# Select a function to review (e.g., coverage_score_from_snippets)
target_func_name = "coverage_score_from_snippets"
match = re.search(rf"def {target_func_name}\(.*?^$", own_code, re.S | re.M)
if not match:
print(f"[SelfOptimizer] Could not find function {target_func_name} to review.")
return
func_code = match.group(0)
prompt = f"""
Review the following Python function for correctness, efficiency, and adherence to best practices.
If you find an improvement, provide ONLY the complete, new, improved function code. Do not add any explanation.
If no improvement is needed, return the original code exactly as it is.
Original function:
```python
{func_code}
```
"""
# Use the Hive's own chat method to get the LLM's suggestion
chat_result = self.hive.chat(prompt, "owner", "hive_optimizer")
response_stream, _ = chat_result
suggested_code = "".join(chunk for chunk in response_stream if chunk)
# If the suggestion is different and seems valid, propose it as a code change
if suggested_code.strip() != func_code.strip() and "def" in suggested_code:
new_source = own_code.replace(func_code, suggested_code)
proposal = ChangeProposal(kind="code", name=__file__, patch_text=new_source, reason=f"Autonomous self-improvement of {target_func_name}", proposer="hive_optimizer")
if hasattr(self.hive, 'changes') and self.hive.changes is not None:
try:
proposal_id = self.hive.changes.propose(proposal)
print(f"[SelfOptimizer] Proposing self-improvement change {proposal_id}.")
test_result = self.hive.changes.test_and_compare(proposal_id, proposal)
print(f"[SelfOptimizer] Test result for self-improvement: {test_result.get('passed')}, Delta: {test_result.get('delta')}")
except Exception as e:
print(f"[SelfOptimizer] Error proposing self-improvement: {e}")
else:
print("[SelfOptimizer] changes module not available; skipping code proposal")
except Exception as e:
print(f"[SelfOptimizer] Error during self-improvement proposal: {e}")
def _run_periodic_maintenance(self):
"""Performs periodic maintenance tasks like summarizing large conversation logs."""
interval_h = int(CFG.get("HIVE_MAINTENANCE_INTERVAL_H", 6))
if time.time() - self.last_maintenance_run < interval_h * 3600:
return
logging.info("[SelfOptimizer] Starting periodic maintenance run...")
self.last_maintenance_run = time.time()
try:
conv_dir = os.path.join(self.hive.config["HIVE_HOME"], "users", "conversations")
if not os.path.isdir(conv_dir):
return
log_summary_threshold_lines = 200 # Summarize logs longer than this
for filename in os.listdir(conv_dir):
if not filename.endswith(".jsonl"):
continue
log_path = os.path.join(conv_dir, filename)
with open(log_path, 'r', encoding='utf-8') as f:
lines = f.readlines()
if len(lines) > log_summary_threshold_lines:
logging.info(f"[SelfOptimizer] Summarizing long conversation log: {filename} ({len(lines)} lines)")
# Keep the most recent 20% of the conversation
keep_recent_count = len(lines) // 5
to_summarize_lines = lines[:-keep_recent_count]
recent_lines = lines[-keep_recent_count:]
# Format the old part for the summarizer
                    # Parse each JSONL record once and join user text + reply
                    records = [json.loads(line) for line in to_summarize_lines]
                    text_to_summarize = "\n".join(rec.get("text", "") + "\n" + rec.get("reply", "") for rec in records)
summary_text = self.hive.summarize_for_memory(text_to_summarize)
summary_line = json.dumps({"ts": time.time(), "user": "system", "text": "[CONVERSATION SUMMARY]", "reply": summary_text}) + "\n"
                    # Atomically rewrite the log file, preserving the JSONL (one record per line) format
                    tmp_path = log_path + ".tmp"
                    with open(tmp_path, 'w', encoding='utf-8') as f:
                        f.write(summary_line)
                        f.writelines(recent_lines)
                    os.replace(tmp_path, log_path)
logging.info(f"[SelfOptimizer] Finished summarizing {filename}.")
except Exception as e:
logging.error(f"[SelfOptimizer] Error during periodic maintenance: {e}")
def run(self):
while not self.stop:
# Wait for the full hive instance to be ready before starting checks
if not self.hive.llm_ready.is_set():
logging.debug("[SelfOptimizer] Waiting for full Hive core to be ready...")
self.hive.llm_ready.wait(timeout=self.tick)
continue
time.sleep(self.tick)
if not CFG["AUTO_SELF_OPTIMIZE"]: continue
# --- Autonomous Proposal Generation & Maintenance ---
self._check_for_package_updates()
self._propose_self_improvement()
self._run_periodic_maintenance()
# --- Real-time Overlay Adjustments ---
vm=psutil.virtual_memory(); ovr={}
if vm.percent>88: # type: ignore
compiler = getattr(self.hive, 'compiler', None)
current_budget = compiler.override_budget if compiler and hasattr(compiler, 'override_budget') else CFG["CTX_TOKENS"]
ovr["token_budget"]=max(512,int(0.75*(current_budget or CFG["CTX_TOKENS"]))) # type: ignore
ovr["temperature"]=max(0.2,self.hive.decoding_temperature-0.1)
# Verify that required modules exist before accessing
if not hasattr(self.hive, 'compiler') or self.hive.compiler is None:
continue
if not hasattr(self.hive, 'engine') or self.hive.engine is None:
continue
if not hasattr(self.hive, 'overlay') or self.hive.overlay is None:
continue
            # (ovr from the memory check above carries into the latency check below)
lat=(sum(self.hive.engine.stats["latency_ms"][-10:])/max(1,len(self.hive.engine.stats["latency_ms"][-10:]))) if self.hive.engine.stats["latency_ms"] else 0
if lat>1200: ovr["retrieval_k"]=max(3,getattr(self.hive, 'retrieval_k', 6)-1)
if ovr:
ok,_=self.hive.overlay.patch(ovr, actor_role="hive")
if ok: self.hive.overlay.apply_to(self.hive)
if CFG["ALLOW_SELF_REBOOT"] and vm.percent>94:
safe_reboot("refresh memory")
from abc import ABC, abstractmethod # type: ignore
class IModule(ABC): # type: ignore
"""Interface for a Hive module."""
def __init__(self, hive_instance: "Hive"):
self.hive = hive_instance
@abstractmethod
def start(self):
"""Start the module."""
pass
@abstractmethod
def stop(self):
"""Stop the module."""
pass
def get_status(self) -> dict:
return {"status": "unknown"}
class ModuleManager:
"""Manages the lifecycle of Hive modules."""
def __init__(self):
self.modules: "OrderedDict[str, IModule]" = collections.OrderedDict()
def register(self, name: str, module: IModule):
self.modules[name] = module
def start_all(self):
print("[ModuleManager] Starting all modules...")
for name, module in self.modules.items():
print(f"[ModuleManager] Starting {name}...")
module.start()
print("[ModuleManager] All modules started.")
def stop_all(self):
print("[ModuleManager] Stopping all modules...")
for name, module in reversed(self.modules.items()):
module.stop()
print("[ModuleManager] All modules stopped.")
class EventBus:
"""Simple thread-safe event bus for in-process messaging.
Used by BEL and HiveCore to exchange 'bel_in:' / 'bel_out:' messages.
Implements a blocking `get` with optional timeout and a non-blocking `post`.
"""
def __init__(self):
self._store: Dict[str, object] = {}
self._cond = threading.Condition()
def post(self, key: str, payload: object) -> None:
with self._cond:
self._store[key] = payload
self._cond.notify_all()
def get(self, key: str, timeout: Optional[float] = None):
end = time.time() + timeout if timeout is not None else None
with self._cond:
while key not in self._store:
remaining = None if end is None else end - time.time()
if remaining is not None and remaining <= 0:
return None
self._cond.wait(timeout=remaining)
return self._store.pop(key)
def subscribe_to_prefix(self, prefix: str, callback):
"""Background thread that calls callback for keys matching prefix."""
def listener():
while True:
with self._cond:
for key in list(self._store.keys()):
if key.startswith(prefix):
payload = self._store.pop(key)
callback(key, payload)
time.sleep(0.1)
threading.Thread(target=listener, daemon=True).start()
def short_id(n: int = 8) -> str:
"""Generate a short random hex ID of length n."""
import secrets
return secrets.token_hex(n // 2)[:n]
class BEL:
"""Bridge / permissioned IO abstraction used by external callers to post messages to Hive.
BEL wraps EventBus with a minimal permission model for file/console access.
"""
def __init__(self, event_bus: EventBus, config: Optional[Dict] = None):
self.event_bus = event_bus
self.config = config or {}
self.permissions = {
"file_read": True,
"file_write": True,
"list_dir": True,
"console": True,
}
def send_to_curve(self, payload: Dict) -> str:
key = f"bel_in:{short_id(8)}"
self.event_bus.post(key, payload)
return key
def receive_from_curve(self, in_key: str, timeout: float = 10.0):
out_key = f"bel_out:{in_key.split(':',1)[1]}"
return self.event_bus.get(out_key, timeout=timeout)
# Initialize global EventBus and BEL after their class definitions
GLOBAL_EVENT_BUS = EventBus()
GLOBAL_BEL = BEL(GLOBAL_EVENT_BUS)
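# Example (illustrative): a round-trip through BEL. receive_from_curve() blocks until
# a consumer posts to the matching 'bel_out:' key:
#   in_key = GLOBAL_BEL.send_to_curve({"text": "hello"})
#   GLOBAL_EVENT_BUS.post("bel_out:" + in_key.split(":", 1)[1], {"reply": "hi"})
#   GLOBAL_BEL.receive_from_curve(in_key, timeout=1.0)   # -> {"reply": "hi"}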
# ----------- internal optimization stack -----------
def _append_jsonl(path, rec):
with open(path, "a", encoding="utf-8") as f:
f.write(json.dumps(rec, ensure_ascii=False) + "\n")
def record_feedback(message_text: str, uid: Optional[str], role: str, feedback_type: str, request: Optional[object] = None):
"""Records feedback (up/down) for a given message. Returns a short ack string."""
try:
session = getattr(request, "session_hash", None) if request is not None else None
except Exception:
session = None
rec = {
"ts": time.time(),
"ts_readable": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
"user_id": uid,
"role": role,
"feedback": feedback_type,
"message": (message_text or "<none>")[:2000],
"session": session,
}
try:
_append_jsonl(FEEDBACK_DB, rec)
# Route feedback to appropriate admins and create notifications
try:
recipients = _determine_feedback_recipients(uid, role)
for r in recipients:
_add_notification_for_recipient(r, sender=uid or "<anon>", subject=f"User feedback: {feedback_type}", body=rec.get('message',''))
except Exception as e:
print(f"[Feedback] Failed to route feedback notifications: {e}")
return "✅ Thanks — your feedback has been recorded."
except Exception as e:
print(f"[Feedback] Failed to record feedback: {e}")
return "⚠️ Failed to record feedback."
def view_feedback_as_markdown(role: str, limit: int = 200):
"""Returns the latest feedback entries as markdown. Only intended for owner/admin viewing."""
if role not in ("owner", "admin", "admin_general", "admin_super"):
return "🔒 You do not have permission to view feedback."
if not os.path.exists(FEEDBACK_DB):
return "No feedback recorded yet."
try:
with open(FEEDBACK_DB, "r", encoding="utf-8") as f:
lines = f.readlines()[-limit:]
items = [json.loads(l) for l in lines]
out_lines = ["### Recent Feedback\n"]
for it in reversed(items):
out_lines.append(f"- **{it.get('ts_readable','?')}** | **{it.get('feedback')}** | user:`{it.get('user_id')}` | role:`{it.get('role')}`\n - {it.get('message')[:300]}\n")
return "\n".join(out_lines)
except Exception as e:
print(f"[Feedback] Failed to read feedback: {e}")
return "Error reading feedback."
def record_correction(original_user_input: str, original_assistant_reply: str, corrected_assistant_reply: str, uid: Optional[str], role: str, request: Optional[object] = None):
"""Records a user's explicit correction of an assistant's reply."""
if not corrected_assistant_reply or corrected_assistant_reply.strip() == original_assistant_reply.strip():
return "No correction provided."
try:
session = getattr(request, "session_hash", None) if request is not None else None
except Exception:
session = None
rec = {
"ts": time.time(),
"ts_readable": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
"user_id": uid,
"role": role,
"original_input": (original_user_input or "<none>")[:2000],
"original_reply": (original_assistant_reply or "<none>")[:2000],
"corrected_reply": (corrected_assistant_reply or "<none>")[:2000],
"session": session,
}
try:
_append_jsonl(CORRECTIONS_DB, rec)
return "✅ Correction recorded. Thank you for helping improve the system!"
except Exception as e:
print(f"[Correction] Failed to record correction: {e}")
return "⚠️ Failed to record correction."
# Notifications for owners/admins
NOTIFICATIONS_DB = os.path.join(HIVE_HOME, "system", "notifications.jsonl")
def notify_owner(subject: str, body: str) -> bool:
"""Record a notification and attempt to email the owner if SMTP is configured.
Returns True if recorded (and email sent when configured), False on failure.
"""
rec = {
"ts": time.time(),
"ts_readable": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
"subject": subject,
"body": body
}
try:
_append_jsonl(NOTIFICATIONS_DB, rec)
except Exception as e:
print(f"[Notify] Failed to write notification: {e}")
return False
# Optional SMTP notify if env is configured
try:
smtp_host = os.getenv('SMTP_HOST')
smtp_port = int(os.getenv('SMTP_PORT') or 0)
smtp_user = os.getenv('SMTP_USER')
smtp_pass = os.getenv('SMTP_PASS')
owner_email = os.getenv('OWNER_EMAIL') or os.getenv('HIVE_OWNER_EMAIL')
if smtp_host and smtp_port and owner_email:
import smtplib
from email.message import EmailMessage
msg = EmailMessage()
msg['Subject'] = subject
msg['From'] = smtp_user or f'hive@{socket.gethostname()}'
msg['To'] = owner_email
msg.set_content(body)
server = smtplib.SMTP(smtp_host, smtp_port, timeout=10)
server.starttls()
if smtp_user and smtp_pass:
server.login(smtp_user, smtp_pass)
server.send_message(msg)
server.quit()
except Exception as e:
print(f"[Notify] SMTP notify failed or not configured: {e}")
# Not fatal — notification recorded
return True
def _add_notification_for_recipient(recipient: str, sender: str, subject: str, body: str, meta: Optional[dict] = None):
"""Append a notification targeted to `recipient` (user id or role name).
A notification has an `id`, `recipient`, `sender`, `subject`, `body`, `ts`, and `viewed_by` list.
"""
nid = f"notif_{int(time.time()*1000)}_{random.randint(0,9999)}"
rec = {
"id": nid,
"ts": time.time(),
"ts_readable": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
"recipient": recipient,
"sender": sender,
"subject": subject,
"body": body,
"viewed_by": [],
}
if meta: rec['meta'] = meta
try:
_append_jsonl(NOTIFICATIONS_DB, rec)
except Exception as e:
print(f"[Notify] Failed to append notification: {e}")
def _load_notifications() -> List[Dict]:
if not os.path.exists(NOTIFICATIONS_DB):
return []
try:
with open(NOTIFICATIONS_DB, 'r', encoding='utf-8') as f:
return [json.loads(l) for l in f.readlines() if l.strip()]
except Exception as e:
print(f"[Notify] Failed to load notifications: {e}")
return []
def _save_notifications(all_notifs: List[Dict]):
try:
with open(NOTIFICATIONS_DB, 'w', encoding='utf-8') as f:
for n in all_notifs:
f.write(json.dumps(n, ensure_ascii=False) + "\n")
except Exception as e:
print(f"[Notify] Failed to save notifications: {e}")
def _get_notifications_for_user(uid: Optional[str], role: str) -> List[Dict]:
nlist = _load_notifications()
out = []
for n in nlist:
recp = n.get('recipient')
# recipient may be a specific id, a role string (e.g., 'admin_general'), or 'admins' meaning all admins
if recp == uid or recp == role or (recp == 'admins' and role.startswith('admin')) or (recp == 'owner' and role == 'owner'):
out.append(n)
return out
def _mark_notifications_viewed(uid: Optional[str], role: str):
alln = _load_notifications()
changed = False
for n in alln:
recp = n.get('recipient')
if recp == uid or recp == role or (recp == 'admins' and role.startswith('admin')) or (recp == 'owner' and role == 'owner'):
if uid and uid not in n.get('viewed_by', []):
n.setdefault('viewed_by', []).append(uid)
changed = True
if changed:
_save_notifications(alln)
def _determine_feedback_recipients(user_id: Optional[str], user_role: str) -> List[str]:
"""Determine which admin recipients should receive feedback notifications.
Returns a list of recipient identifiers (specific user ids or role strings).
"""
d = _load_users()
recipients = []
# If the feedback comes from an admin, escalate up
if user_role in ('admin_general', 'admin_super'):
if user_role == 'admin_general':
# notify super admins and owner
recipients.extend([u.get('id') for u in d.get('admins_super', []) if u])
recipients.append(d.get('owner', {}).get('id'))
elif user_role == 'admin_super':
recipients.append(d.get('owner', {}).get('id'))
return [r for r in recipients if r]
# For regular users: try to find assigned managers in their profile
user_obj, _ = _find_user(d, user_id or '')
if user_obj:
mgrs = user_obj.get('managers') or user_obj.get('assigned_admins') or []
# If managers exist, notify them and any supers/owner above them
if mgrs:
for mid in mgrs:
recipients.append(mid)
# find manager role and add owner if manager is super
mu, mrole = _find_user(d, mid)
if mrole == 'admin_general':
recipients.extend([u.get('id') for u in d.get('admins_super', []) if u])
if d.get('owner'):
recipients.append(d.get('owner', {}).get('id'))
return list(dict.fromkeys([r for r in recipients if r]))
# Fallback: broadcast to all admins (general + super) and owner
recipients.extend([u.get('id') for u in d.get('admins_general', []) if u])
recipients.extend([u.get('id') for u in d.get('admins_super', []) if u])
if d.get('owner'): recipients.append(d.get('owner', {}).get('id'))
return list(dict.fromkeys([r for r in recipients if r]))
# ---------- Self-Optimization / Internal Auto-Improve ----------
def _load_proposals() -> List[Dict]:
if not os.path.exists(OPT_PROPOSALS):
return []
with open(OPT_PROPOSALS, 'r', encoding='utf-8') as f:
return [json.loads(l) for l in f.readlines() if l.strip()]
def _safe_to_apply_proposal(cp: "ChangeProposal") -> bool:
"""Ensure proposals do not modify admin controls or privileged files.
Only allow prompt/fallback_kb/model_config updates automatically.
"""
disallowed = ["OWNER_PASS", "OWNER_SECOND", "HIVE_OWNER_USER", "OPT_PKG_ALLOWLIST", "OPT_MODEL_ALLOWLIST", "HIVE_WAKE_WORDS"]
# If proposal name touches any disallowed keyword, reject automatic apply
for k in disallowed:
if k.lower() in (cp.name or "").lower() or k.lower() in (cp.patch_text or "").lower():
return False
    # Allow only these safe kinds for auto-apply; 'code' is intentionally excluded
allowed = ("prompt", "fallback_kb", "model_config", "package", "kb_snippet")
if cp.kind not in allowed:
return False
# Disallow code edits that touch admin/auth surfaces
if cp.kind == "code" and ("admin" in (cp.name or "").lower() or "auth" in (cp.name or "").lower()):
return False
# Optional quality threshold: require cp.reason to contain a quality marker or cp.patch_text to include inline score
try:
q = None
if isinstance(cp.patch_text, str) and 'quality_score=' in cp.patch_text:
for part in cp.patch_text.split():
if part.startswith('quality_score='):
q = float(part.split('=',1)[1])
if q is not None:
if q < float(CFG.get('OPT_THRESH_QUALITY', 0.02)):
return False
except Exception:
return False
return True
def _apply_proposal(cp: "ChangeProposal") -> Dict:
"""Apply a ChangeProposal in a conservative, sandboxed manner for allowed types.
Returns a result dict appended to OPT_RESULTS.
"""
result = {"update_for": cp.id, "applied": False, "reason": "", "backup": None}
try:
if not _safe_to_apply_proposal(cp):
result["reason"] = "Not allowed to auto-apply this proposal (privileged target)."
_append_jsonl(OPT_RESULTS, result)
return result
# create backup folder for this proposal
backup_root = os.path.join(CFG.get('HIVE_HOME', '.'), 'opt_backups')
os.makedirs(backup_root, exist_ok=True)
bid = f"{cp.id}_{int(time.time())}"
this_backup = os.path.join(backup_root, bid)
os.makedirs(this_backup, exist_ok=True)
backup_meta = {'proposal_id': cp.id, 'ts': int(time.time()), 'files': []}
def _save_backup(filepath):
try:
if not os.path.exists(filepath):
return None
rel = os.path.relpath(filepath, CFG.get('HIVE_HOME', '.'))
safe_name = rel.replace(os.sep, '__')
dest = os.path.join(this_backup, safe_name)
os.makedirs(os.path.dirname(dest), exist_ok=True)
shutil.copy2(filepath, dest)
backup_meta['files'].append({'orig': filepath, 'backup': dest})
return dest
except Exception:
return None
# Example: apply fallback_kb changes by writing to a known JSON file
if cp.kind == "fallback_kb":
kb_path = os.path.join(CFG["HIVE_HOME"], "knowledge", "fallback_kb.json")
os.makedirs(os.path.dirname(kb_path), exist_ok=True)
_save_backup(kb_path)
try:
kb = {}
if os.path.exists(kb_path):
with open(kb_path, 'r', encoding='utf-8') as f:
kb = json.load(f)
try:
patch = json.loads(cp.patch_text)
kb.update(patch)
except Exception:
for ln in (cp.patch_text or "").splitlines():
if ':' in ln:
k, v = ln.split(':', 1)
kb[k.strip()] = v.strip()
with open(kb_path, 'w', encoding='utf-8') as f:
json.dump(kb, f, ensure_ascii=False, indent=2)
try:
globals()['FALLBACK_KB'].update(kb)
except Exception:
pass
result['applied'] = True
result['reason'] = 'Applied fallback_kb update.'
except Exception as e:
result['reason'] = f'Failed to apply fallback_kb: {e}'
elif cp.kind == 'prompt' or cp.kind == 'kb_snippet' or cp.kind == 'response_tweak':
overlay_path = os.path.join(OVERLAY_DIR, f"prompt_patch_{cp.id}.txt")
os.makedirs(os.path.dirname(overlay_path), exist_ok=True)
_save_backup(overlay_path)
with open(overlay_path, 'w', encoding='utf-8') as f:
f.write(cp.patch_text or '')
result['applied'] = True
result['reason'] = 'Wrote prompt patch to overlay.'
else:
result['reason'] = 'Proposal kind not implemented for auto-apply.'
# persist backup metadata so revert is possible
try:
with open(os.path.join(this_backup, 'meta.json'), 'w', encoding='utf-8') as mf:
json.dump(backup_meta, mf)
result['backup'] = this_backup
except Exception:
pass
except Exception as e:
result['reason'] = f'Exception applying proposal: {e}'
_append_jsonl(OPT_RESULTS, result)
return result
def auto_self_optimize_once(hive_factory_callable) -> Dict:
"""Run one iteration of the self-optimization routine.
It aggregates feedback, proposes a small change, runs synthetic eval, and
applies the change only if internal tests pass. Returns a summary dict.
"""
try:
# Aggregate feedback to find negative signals
if not os.path.exists(FEEDBACK_DB):
return {"ok": False, "reason": "no feedback"}
with open(FEEDBACK_DB, 'r', encoding='utf-8') as f:
items = [json.loads(l) for l in f.readlines() if l.strip()]
if not items:
return {"ok": False, "reason": "no feedback entries"}
# Simple aggregation: count down votes per message snippet
counts = {}
for it in items:
key = it.get('message','')[:200]
counts.setdefault(key, {"up":0, "down":0})
if it.get('feedback') == 'up': counts[key]['up'] += 1
else: counts[key]['down'] += 1
# Find top-downvoted messages
candidates = sorted([(k,v) for k,v in counts.items()], key=lambda x: x[1]['down'], reverse=True)
if not candidates or candidates[0][1]['down'] < 2:
return {"ok": False, "reason": "no strong negative signals"}
target_msg = candidates[0][0]
# Create a conservative ChangeProposal to improve prompts or fallback KB
cp = ChangeProposal(kind='fallback_kb', name='auto_from_feedback', patch_text=json.dumps({target_msg[:40]: 'Improved guidance based on feedback.'}), proposer='auto_opt', id=f"auto_{int(time.time())}")
_append_jsonl(OPT_PROPOSALS, cp.__dict__)
# Run synthetic eval comparing before/after using sandboxed hive instances
def hive_factory():
return hive_factory_callable()
# Evaluate baseline
baseline = _synthetic_eval(hive_factory, [target_msg])
# Apply proposal in sandbox (we simulate apply_proposal, then run eval again)
apply_result = _apply_proposal(cp)
if not apply_result.get('applied'):
return {"ok": False, "reason": "apply failed", "apply_result": apply_result}
# After apply, re-evaluate
after = _synthetic_eval(hive_factory, [target_msg])
# If quality improved by threshold, accept; otherwise revert (log)
q_gain = after.get('quality',0) - baseline.get('quality',0)
if q_gain >= float(CFG.get('OPT_THRESH_QUALITY', 0.02)):
res = {"ok": True, "change_id": cp.id, "quality_gain": q_gain}
else:
# Revert using backup metadata created during apply
try:
revert_res = _revert_proposal(cp.id)
res = {"ok": False, "reason": "no quality improvement, reverted", "quality_gain": q_gain, "revert": revert_res}
except Exception as e:
res = {"ok": False, "reason": f"no quality improvement and revert failed: {e}", "quality_gain": q_gain}
_append_jsonl(OPT_RESULTS, {"update_for": cp.id, "applied": res.get('ok', False), "detail": res})
return res
except Exception as e:
return {"ok": False, "reason": f"exception: {e}"}
def auto_self_optimize_background(hive_factory_callable, interval: int = 300):
"""Background thread to periodically run auto self-optimization if enabled."""
def _run_loop():
while True:
try:
if CFG.get('AUTO_SELF_OPTIMIZE'):
res = auto_self_optimize_once(hive_factory_callable)
# Log result and notify owner/admin if a change was applied or reverted
if res.get('ok') and res.get('change_id'):
print(f"[AutoOpt] Applied proposal: {res['change_id']} (quality gain: {res.get('quality_gain')})")
elif 'revert' in res:
print(f"[AutoOpt] Reverted proposal: {res.get('change_id')} due to no quality gain.")
except Exception as e:
print(f"[AutoOpt] Error during optimization loop: {e}")
time.sleep(interval)
t = threading.Thread(target=_run_loop, daemon=True)
t.start()
@dataclass
class ChangeProposal: # type: ignore
    kind: str   # "model" | "package" | "code" | "prompt" | "fallback_kb" | "kb_snippet" | "model_config" | "response_tweak"
name: str # model id / package name / file target
version: str = "" # type: ignore
patch_text: str = "" # for "code": full replacement or diff
reason: str = "" # type: ignore
created_ts: float = field(default_factory=time.time)
proposer: str = "hive" # type: ignore
id: str = "" # type: ignore
class Sandbox:
def __init__(self):
self.root=os.path.join(OPT_DIR, f"sandbox_{int(time.time())}")
os.makedirs(self.root, exist_ok=True)
self.venv=os.path.join(self.root,"venv")
def _run(self, args, timeout):
p=subprocess.run(args, capture_output=True, text=True, timeout=timeout)
return p.returncode, (p.stdout or "") + (p.stderr or "")
def create(self):
rc,out=self._run([sys.executable,"-m","venv",self.venv], timeout=120)
if rc!=0: raise RuntimeError("venv create failed: "+out)
def pip(self, pkg_spec):
py=os.path.join(self.venv,"bin","python") if os.name!="nt" else os.path.join(self.venv,"Scripts","python.exe")
rc,out=self._run([py,"-m","pip","install","--upgrade",pkg_spec], timeout=CFG["OPT_SANDBOX_TIMEOUT"])
if rc!=0: raise RuntimeError("pip install failed: "+out)
def run_snippet(self, code:str):
py=os.path.join(self.venv,"bin","python") if os.name!="nt" else os.path.join(self.venv,"Scripts","python.exe")
tmp=os.path.join(self.root,"snippet.py"); open(tmp,"w",encoding="utf-8").write(code)
rc,out=self._run([py,tmp], timeout=CFG["OPT_SANDBOX_TIMEOUT"]); return rc,out
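# Example (illustrative): exercising a candidate package in an isolated venv:
#   sb = Sandbox(); sb.create()
#   sb.pip("requests>=2.31.0")
#   rc, out = sb.run_snippet("import requests; print(requests.__version__)")
#   # rc == 0 on success; `out` holds combined stdout/stderr from the sandboxed run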
def _find_backup_for_proposal(pid: str) -> Optional[str]:
backup_root = os.path.join(CFG.get('HIVE_HOME', '.'), 'opt_backups')
if not os.path.exists(backup_root):
return None
for name in os.listdir(backup_root):
if name.startswith(pid + '_') or name.startswith(pid):
candidate = os.path.join(backup_root, name)
meta = os.path.join(candidate, 'meta.json')
if os.path.exists(meta):
return candidate
return None
def _revert_proposal(proposal_id: str) -> dict:
"""Attempt to revert an applied proposal using the backup created at apply time.
Returns result dict with ok status and message.
"""
try:
bdir = _find_backup_for_proposal(proposal_id)
if not bdir:
return {'ok': False, 'error': 'no backup found'}
meta_file = os.path.join(bdir, 'meta.json')
try:
with open(meta_file, 'r', encoding='utf-8') as mf:
meta = json.load(mf)
except Exception:
meta = None
restored = []
for entry in (meta or {}).get('files', []):
orig = entry.get('orig')
backup = entry.get('backup')
if orig and backup and os.path.exists(backup):
try:
with open(backup, 'rb') as src:
data = src.read()
os.makedirs(os.path.dirname(orig), exist_ok=True)
with open(orig, 'wb') as dst:
dst.write(data)
restored.append(orig)
except Exception:
pass
res = {'ok': True, 'restored': restored, 'proposal_id': proposal_id}
_append_jsonl(OPT_RESULTS, {'update_for': proposal_id, 'reverted': True, 'restored': restored})
return res
except Exception as e:
return {'ok': False, 'error': str(e)}
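# Backup layout assumed by the revert path above (the "files"/"orig"/"backup" field
# names come from the reader code; the directory naming and paths are illustrative):
#
#   <HIVE_HOME>/opt_backups/<proposal_id>_<ts>/meta.json
#     {"files": [{"orig": "/abs/path/app.py", "backup": ".../app.py.bak"}]}
#
# On revert, each `backup` file is copied back over its `orig` path.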
def _synthetic_eval(hive_factory, prompts: List[str]) -> Dict:
"""Cheap synthetic benchmark: mean latency (ms), tokens/sec, and a keyword-overlap
quality proxy (fraction of the prompt's unique words that appear in the reply)."""
lat_ms=[]; toks_s=[]; quality=0.0
for p in prompts:
t0=time.time()
h=hive_factory()
# Lite instances may not register a compiler module; fall back to the raw prompt.
prompt = h.compiler.compile(p, []) if getattr(h, "compiler", None) else p
out=h.pipe(prompt, max_new_tokens=64, do_sample=False)
t1=time.time()
text=out[0]["generated_text"]
lat_ms.append((t1-t0)*1000)
toks=max(1,len(text.split())); toks_s.append(toks/max(0.001,(t1-t0)))
p_words=set(re.findall(r"\w+", p.lower()))
q=sum(1 for w in p_words if w in text.lower())/max(1,len(p_words))
quality+=q
n=max(1,len(prompts))
return {"lat_ms":sum(lat_ms)/n, "toks_s":sum(toks_s)/n, "quality":quality/n}
class ChangeManager:
def __init__(self, hive_cls):
self.hive_cls=hive_cls
def _allowed_pkg(self, name):
return any(name.strip().startswith(allow.strip()) for allow in CFG["OPT_PKG_ALLOWLIST"])
def _allowed_model(self, mid):
return mid in CFG["OPT_MODEL_ALLOWLIST"]
def propose(self, cp: ChangeProposal)->str:
cp.id=f"chg_{int(time.time())}_{abs(hash(cp.name))%100000}"; _append_jsonl(OPT_PROPOSALS, cp.__dict__); return cp.id
def test_and_compare(self, cp_id:str, proposal: ChangeProposal)->Dict:
"""
Tests a proposal in a sandbox, compares it against the baseline,
and automatically applies it if it passes and auto-apply is enabled.
"""
def base_hive(): return self.hive_cls(model_id=None, lite=True)
prompts=["Summarize the water cycle.","Translate to French: the quick brown fox jumps over the lazy dog."]
base=_synthetic_eval(base_hive, prompts)
sand=Sandbox(); sand.create()
model_override=None
try:
# Install requirements in sandbox venv
reqs = ["numpy>=1.24.0","psutil>=5.9.0","requests>=2.31.0","gradio>=4.44.0","sentence-transformers>=3.0.0","faiss-cpu>=1.8.0",
"transformers>=4.44.0","accelerate>=0.33.0","datasets>=2.21.0","soundfile>=0.12.1","faster-whisper>=1.0.0","langid>=1.1.6",
"piper-tts>=1.2.0","g2p_en>=2.1.0","librosa>=0.10.1","scikit-learn>=1.1.0","feedparser>=6.0.11","duckduckgo_search>=6.2.10",
"keyring>=24.3.1"]
for req in reqs:
sand.pip(req)
if proposal.kind=="package":
if not self._allowed_pkg(proposal.name): return {"ok":False,"reason":"package not allowlisted"}
spec=proposal.name + (("=="+proposal.version) if proposal.version else "")
sand.pip(spec)
elif proposal.kind=="model":
if not self._allowed_model(proposal.name): return {"ok":False,"reason":"model not allowlisted"}
model_override=proposal.name
elif proposal.kind=="code":
target=os.path.basename(__file__); patched=os.path.join(sand.root,target)
with open(patched,"w",encoding="utf-8") as f: f.write(proposal.patch_text or "")
code=f"import importlib.util, json; p=r'{patched}'; spec=importlib.util.spec_from_file_location('hmod',p); m=importlib.util.module_from_spec(spec); spec.loader.exec_module(m); h=m.Hive(); print(json.dumps({{'ok':True}}))"
rc,out=sand.run_snippet(code)
if rc!=0 or '"ok": true' not in out.lower(): return {"ok":False,"reason":"patch smoke test failed","out":out}
except Exception as e:
return {"ok":False,"reason":f"sandbox setup failed: {e}"} # type: ignore
# Candidate evaluation only needs a lite instance: the synthetic eval falls back
# to the raw prompt when no compiler module is registered, and lite mode avoids
# reloading the main model. Use the proposed model override when one exists.
def cand_hive(): return self.hive_cls(model_id=model_override, lite=True) if model_override else self.hive_cls(model_id=None, lite=True)
cand=_synthetic_eval(cand_hive, prompts)
delta={"lat_ms": base["lat_ms"]-cand["lat_ms"], "toks_s": cand["toks_s"]-base["toks_s"], "quality": cand["quality"]-base["quality"]}
passed=True
if CFG["OPT_THRESH_LATENCY_MS"]>0 and delta["lat_ms"]<CFG["OPT_THRESH_LATENCY_MS"]: passed=False
if CFG["OPT_THRESH_TOKS_PER_S"]>0 and delta["toks_s"]<CFG["OPT_THRESH_TOKS_PER_S"]: passed=False
if delta["quality"]<CFG["OPT_THRESH_QUALITY"]: passed=False
result={"ok":True,"proposal":proposal.__dict__,"base":base,"cand":cand,"delta":delta,"passed":passed, "ts": time.time()}
_append_jsonl(OPT_RESULTS, result)
# Automatically apply if tests passed and auto-apply is on
if passed and CFG.get("OPT_AUTO_APPLY"):
apply_ok, apply_msg = self.apply(result)
result["applied"] = {"ok": apply_ok, "message": apply_msg, "ts": time.time()}
_append_jsonl(OPT_RESULTS, {"update_for": cp_id, "applied": result["applied"]})
return result
def apply(self, result:Dict)->Tuple[bool,str]:
prop=result.get("proposal",{}); kind=prop.get("kind"); name=prop.get("name","")
if not result.get("passed"): return False,"did not meet thresholds"
if kind=="package":
if not self._allowed_pkg(name): return False,"package not allowlisted"
try:
subprocess.check_call([sys.executable,"-m","pip","install","--upgrade", name + (("=="+prop.get("version","")) if prop.get("version") else "")])
return True,"package installed"
except Exception as e: return False,f"pip failed: {e}"
if kind=="model":
if not self._allowed_model(name): return False,"model not allowlisted"
pref=os.path.join(OPT_DIR,"preferred_model.json"); _atomic_write_json(pref, {"model_id":name,"ts":time.time()})
return True,"model preference recorded (takes effect after restart)"
if kind=="code":
is_pi = 'raspberrypi' in platform.machine().lower()
if is_pi and hasattr(self.hive_cls, 'bootstrap_instance') and self.hive_cls.bootstrap_instance:
print("[ChangeManager] Raspberry Pi detected, attempting hot-reload.")
try:
target=os.path.abspath(__file__)
with open(target, "w", encoding="utf-8") as f: f.write(prop.get("patch_text","") or "")
self.hive_cls.bootstrap_instance.soft_restart()
return True, "Code hot-reloaded without a full reboot."
except Exception as e:
return False, f"Hot-reload failed: {e}. A manual restart is required."
try:
target = os.path.abspath(__file__)
backup = target + f".bak_{int(time.time())}"
shutil.copyfile(target, backup)
with open(target, "w", encoding="utf-8") as f:
f.write(prop.get("patch_text", "") or "")
return True, "code updated (backup created); restart recommended"
except Exception as e:
return False, f"code write failed: {e}"
return False,"unknown change type"
class ChangeManagerModule(ChangeManager, IModule): # type: ignore
def __init__(self, hive_instance: "Hive"):
IModule.__init__(self, hive_instance)
ChangeManager.__init__(self, hive_instance.__class__)
def start(self): pass
def stop(self): pass
class SelfOptimizerModule(SelfOptimizer, IModule):
def __init__(self, hive_instance: "Hive"):
IModule.__init__(self, hive_instance)
SelfOptimizer.__init__(self, hive_instance)
def start(self):
super().start()
def stop(self):
# Assigning to `self.stop` would shadow this method with a bool (and a bound
# method is truthy, so a `while not self.stop` loop would never even start).
# Set a dedicated flag instead; the optimizer loop is assumed to check it.
self.stop_requested = True
class LibrarianCurve:
"""Implements the Librarian from Part 2, Section 7."""
def __init__(self, curve_store: CurveStore, k_store: KnowledgeStore):
self.store = curve_store
self.k_store = k_store
def retrieve_scoped_with_scores(self, query: str, role: str, user_id: Optional[str], k: int = 6):
# This is a simplified retrieval. A full implementation would use the role and user_id for scoping.
return self.store.search_with_scores(query, k=k)
class LibrarianModule(LibrarianCurve, IModule):
"""Module wrapper for LibrarianCurve."""
def __init__(self, hive_instance: "Hive", curve_store: "CurveStore", k_store: "KnowledgeStore"):
IModule.__init__(self, hive_instance)
LibrarianCurve.__init__(self, curve_store, k_store)
def start(self): pass
def stop(self): pass
class VADService:
"""
A Voice Activity Detector service that processes an audio stream to find speech segments.
It uses webrtcvad for the core VAD logic and manages buffering and state.
"""
def __init__(self, aggressiveness: int = 2, sample_rate: int = 16000):
if not _HAVE_VAD:
raise ImportError("webrtcvad library is not installed.")
self.vad = webrtcvad.Vad(aggressiveness)
self.sample_rate = sample_rate
self.frame_duration_ms = 30 # webrtcvad supports 10, 20, 30 ms frames
self.frame_size = int(self.sample_rate * self.frame_duration_ms / 1000)
self.speech_buffer = bytearray()
self.silence_frames = 0
self.is_speaking = False
self.silence_threshold_frames = int(CFG['VAD_SILENCE_DURATION'] * 1000 / self.frame_duration_ms)
self.min_speech_frames = int(CFG['VAD_MIN_SPEECH_DURATION'] * 1000 / self.frame_duration_ms)
def process_stream(self, audio_chunk):
"""Processes a chunk of audio, yielding speech segments when detected."""
audio_int16 = (audio_chunk * 32767).astype(np.int16)
audio_bytes = audio_int16.tobytes()
for i in range(0, len(audio_bytes), self.frame_size * 2): # 2 bytes per sample
frame = audio_bytes[i:i + self.frame_size * 2]
if len(frame) < self.frame_size * 2: continue
is_speech = self.vad.is_speech(frame, self.sample_rate)
if is_speech:
self.speech_buffer.extend(frame)
self.silence_frames = 0
self.is_speaking = True
elif self.is_speaking:
# Only end the segment after a sustained run of silence frames, using the
# threshold precomputed from VAD_SILENCE_DURATION (ending on the first silent
# frame would split speech at every brief pause).
self.silence_frames += 1
if self.silence_frames >= self.silence_threshold_frames:
self.is_speaking = False
if len(self.speech_buffer) / (self.sample_rate * 2) >= CFG['VAD_MIN_SPEECH_DURATION']:
yield np.frombuffer(self.speech_buffer, dtype=np.int16).astype(np.float32) / 32767.0
self.speech_buffer.clear()
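# Feeding synthetic audio through the VAD (a sketch; requires webrtcvad and a mono
# 16 kHz float32 buffer in [-1, 1]; a pure tone may or may not register as speech):
def _example_vad_segments():
    if not _HAVE_VAD or np is None:
        return []
    vad = VADService(aggressiveness=2, sample_rate=16000)
    tone = 0.3 * np.sin(2 * np.pi * 220 * np.arange(16000) / 16000).astype(np.float32)
    silence = np.zeros(16000, dtype=np.float32)  # trailing silence lets buffered speech flush
    return list(vad.process_stream(np.concatenate([tone, silence])))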
class VoiceServicesModule(IModule):
def __init__(self, hive_instance: "Hive"):
super().__init__(hive_instance)
def start(self):
# Capability gating: only initialize services if the environment supports them
caps = getattr(self.hive, 'caps', {}) or {}
has_mic = caps.get('has_microphone', False) or caps.get('has_audio_input', False)
has_speaker = caps.get('has_speaker', False) or caps.get('has_audio_output', False)
has_camera = caps.get('has_camera', False)
# VAD depends on system libraries as well as microphone availability
if has_mic and _HAVE_VAD:
try:
self.hive.vad_service = VADService(aggressiveness=CFG["VOICE_VAD_AGGRESSIVENESS"])
except Exception as e:
logging.warning(f"VAD initialization failed: {e}")
self.hive.vad_service = None
# ASR: only if microphone is present and faster-whisper available
if has_mic:
try:
self.hive.asr_service = ASRService()
except Exception as e:
logging.warning(f"ASR initialization skipped/failed: {e}")
self.hive.asr_service = None
else:
self.hive.asr_service = None
# TTS: only if speaker/output available and Piper present
if has_speaker:
try:
self.hive.tts_service = TTSService()
except Exception as e:
logging.warning(f"TTS initialization skipped/failed: {e}")
self.hive.tts_service = None
else:
self.hive.tts_service = None
# Video: only if camera is present and OpenCV is available
if has_camera:
try:
self.hive.video_service = VideoService(self.hive)
if self.hive.video_service:
self.hive.video_service.start()
except Exception as e:
logging.warning(f"VideoService initialization skipped/failed: {e}")
self.hive.video_service = None
else:
self.hive.video_service = None
def stop(self):
if self.hive.video_service: self.hive.video_service.stop_event.set()
class VideoService(IModule):
"""Handles video capture from a webcam."""
def __init__(self, hive_instance: "Hive"):
super().__init__(hive_instance)
self.cap = None
self.stop_event = threading.Event()
# CRITICAL FIX: Don't probe camera hardware during init - adds 15-20s on CPU hosts
# Check system capabilities to see if camera exists before attempting VideoCapture
self._has_camera = hive_instance.caps.get("has_camera", False) if hasattr(hive_instance, 'caps') and hive_instance.caps else False
def get_frame(self):
# Lazy initialization: only open camera when actually requested
if not _HAVE_CV or not self._has_camera:
return None
if self.cap is None:
self.cap = cv2.VideoCapture(0)
ret, frame = self.cap.read()
return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) if ret else None
def start(self):
"""Required by IModule - video capture starts automatically in __init__"""
pass
def stop(self):
"""Required by IModule - cleanup video resources"""
self.stop_event.set()
if self.cap:
self.cap.release()
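# Usage sketch for the lazy camera path (illustrative; assumes a Hive instance with
# probed capabilities):
#
#     svc = VideoService(hive)
#     frame = svc.get_frame()   # None if no camera/OpenCV; an RGB ndarray otherwise
#     svc.stop()                # releases the device if it was ever opened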
class PersistenceEngine(IModule):
"""Persistence engine with a lightweight embedding worker that consumes the KnowledgeStore's embedding queue.
This keeps the per-turn saves durable while offloading embedding work to a background thread.
Batch size adapts to device capabilities (Pi=4-8, larger systems=16-64).
"""
def __init__(self, hive_instance: "Hive", embedding_batch_size: Optional[int] = None):
super().__init__(hive_instance)
# Reference to hive instance and config paths
self.hive = hive_instance
self.hive_home = hive_instance.config.get("HIVE_HOME", HIVE_HOME)
self.users_dir = os.path.join(self.hive_home, "users", "conversations")
os.makedirs(self.users_dir, exist_ok=True)
# Embedding worker control
self._worker_thread: Optional[threading.Thread] = None
self._worker_stop = threading.Event()
self._worker_lock = threading.Lock()
self._batch_size = embedding_batch_size or hive_instance.embedding_batch_size or 8
self._sleep_interval = 2.0 if hive_instance.caps.get("is_low_memory") else 1.0
def start(self):
"""Start background workers for persistence/embedding processing."""
# Start embedding worker thread (daemon)
with self._worker_lock:
try:
if self._worker_thread and self._worker_thread.is_alive():
logging.info("PersistenceEngine: embedding worker already running.")
return True
# Respect low-memory devices by lowering batch sizes and adding a delay
if getattr(self.hive, 'caps', {}).get('is_low_memory'):
old = self._batch_size
self._batch_size = max(1, min(self._batch_size, 4))
self._sleep_interval = max(self._sleep_interval, 2.0)
logging.info(f"PersistenceEngine: low-memory detected, adjusting batch_size {old} -> {self._batch_size}, sleep={self._sleep_interval}s")
self._worker_stop.clear()
t = threading.Thread(target=self._embedding_worker_loop, name="embedding_worker", daemon=True)
t.start()
self._worker_thread = t
logging.info("PersistenceEngine: embedding worker started.")
return True
except Exception as e:
logging.error(f"PersistenceEngine.start failed: {e}")
return False
def stop(self):
"""Stop background workers gracefully."""
try:
self._worker_stop.set()
if self._worker_thread and self._worker_thread.is_alive():
self._worker_thread.join(timeout=5.0)
logging.info("PersistenceEngine: embedding worker stopped.")
return True
except Exception as e:
logging.warning(f"PersistenceEngine.stop error: {e}")
return False
def _atomic_append_jsonl(self, path: str, rec: dict) -> bool:
# Append a JSON line and fsync to ensure durability
try:
with open(path, "a", encoding="utf-8") as f:
f.write(json.dumps(rec, ensure_ascii=False) + "\n")
f.flush()
try:
os.fsync(f.fileno())
except Exception:
pass
return True
except Exception as e:
logging.error(f"PersistenceEngine failed to append to {path}: {e}")
return False
def save_turn(self, record: dict) -> bool:
"""Save a single conversation turn to the per-user conversation file.
Record expected keys: 'user', 'text', 'reply', 'ts' (timestamp)
"""
try:
uid = record.get('user') or 'guest'
safe_uid = re.sub(r"[^a-zA-Z0-9_\-]", "_", str(uid))[:64]
fname = os.path.join(self.users_dir, f"{safe_uid}.jsonl")
rec = {
"ts": record.get('ts', time.time()),
"user": uid,
"text": record.get('text'),
"reply": record.get('reply'),
}
ok = self._atomic_append_jsonl(fname, rec)
# Also append to a global conversation log for auditing
_append_jsonl(os.path.join(self.hive_home, "system", "logs", "conversations.jsonl"), {**rec, "user_file": fname})
return ok
except Exception as e:
logging.error(f"PersistenceEngine.save_turn error: {e}")
return False
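# Example of one JSONL line written by save_turn (values are illustrative):
#   {"ts": 1719900000.0, "user": "alice", "text": "How do I greet someone?",
#    "reply": "You can say: hello!"}
# The same record, plus a "user_file" pointer, is mirrored to the global audit log.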
def _embedding_worker_loop(self):
"""Simple persistent embedding worker that consumes `embedding_queue.jsonl` from the KnowledgeStore.
Behavior:
- Read the embedding queue file atomically and collect up to `_batch_size` queued entries.
- For each chunk_id, load chunk JSON from KnowledgeStore and compute embeddings using
the CurveStore (module `store`) via `add_texts`.
- After successful embedding, mark the chunk file `embeddings_generated` true and remove the queue entry.
This implementation is intentionally conservative (small batches, frequent sleeps) to suit low-RAM devices.
"""
try:
kstore: Optional[KnowledgeStore] = None
curve: Optional[CurveStore] = None
# Resolve modules if available
try:
if hasattr(self.hive, 'module_manager') and 'kstore' in getattr(self.hive, 'module_manager').modules:
kstore = self.hive.module_manager.modules['kstore']
if hasattr(self.hive, 'module_manager') and 'store' in getattr(self.hive, 'module_manager').modules:
curve = self.hive.module_manager.modules['store']
except Exception:
kstore = None; curve = None
# If kstore not available, attempt to construct a lightweight KnowledgeStore pointing to HIVE_HOME
if kstore is None:
try:
kstore = KnowledgeStore(self.hive_home)
except Exception:
kstore = None
logging.info(f"Embedding worker loop starting (batch_size={self._batch_size}, sleep={self._sleep_interval}) qpath_hint={os.path.join(self.hive_home,'knowledge','embedding_queue.jsonl')}")
while not self._worker_stop.is_set():
try:
if kstore is None or curve is None:
# Try to refresh references from module_manager
try:
if hasattr(self.hive, 'module_manager'):
kstore = self.hive.module_manager.modules.get('kstore', kstore)
curve = self.hive.module_manager.modules.get('store', curve)
except Exception:
pass
qpath = getattr(kstore, 'embedding_queue_path', os.path.join(self.hive_home, 'knowledge', 'embedding_queue.jsonl')) if kstore else os.path.join(self.hive_home, 'knowledge', 'embedding_queue.jsonl')
if not os.path.exists(qpath):
# No queue file yet - sleep a bit and continue
logging.debug(f"Embedding worker: queue file {qpath} not found; sleeping {self._sleep_interval}s")
time.sleep(self._sleep_interval)
continue
# Read queue atomically
with open(qpath, 'r', encoding='utf-8') as f:
lines = [l.strip() for l in f.readlines() if l.strip()]
queued = []
rest = []
for ln in lines:
try:
item = json.loads(ln)
except Exception:
continue
if item.get('status') == 'queued' and len(queued) < self._batch_size:
queued.append(item)
else:
rest.append(item)
if not queued:
logging.debug("Embedding worker: no queued items found; sleeping")
time.sleep(self._sleep_interval)
continue
# Process queued items
for item in queued:
chunk_id = item.get('chunk_id')
if not chunk_id: continue
try:
chunk_file = os.path.join(self.hive_home, 'knowledge', 'chunks', f"{chunk_id}.json")
if not os.path.exists(chunk_file):
continue
with open(chunk_file, 'r', encoding='utf-8') as cf:
chunk = json.load(cf)
texts = chunk.get('texts', [])
metas = []
for idx, t in enumerate(texts):
metas.append({'id': f"{chunk_id}_{idx}", 'chunk_id': chunk_id, 'text': t, 'timestamp': chunk.get('timestamp', time.time())})
# Use CurveStore.add_texts for embedding; if not available, skip but keep manifest updated
try:
if curve is not None and hasattr(curve, 'add_texts'):
try:
curve.add_texts(texts, metas)
except Exception as e:
logging.warning(f"Embedding add_texts failed for chunk {chunk_id}: {e}")
# Requeue item for retry
rest.append(item)
continue
else:
logging.info(f"Embedding worker: no curve store available for chunk {chunk_id}; marking as processed without embeddings")
# Mark chunk as embedded (or at least processed) to avoid infinite loop
chunk['embeddings_generated'] = True
_atomic_write_json(chunk_file, chunk)
except Exception as e:
logging.warning(f"Embedding worker failed for chunk {chunk_id}: {e}")
# Requeue this item by appending back to rest so it can be retried
rest.append(item)
except Exception as e:
logging.error(f"Exception processing embedding queue item {item}: {e}")
rest.append(item)
# Rewrite the queue file with remaining (unprocessed) items
try:
tmp_q = qpath + f".tmp_{int(time.time())}"
with open(tmp_q, 'w', encoding='utf-8') as f:
for r in rest:
f.write(json.dumps(r, ensure_ascii=False) + "\n")
os.replace(tmp_q, qpath)
except Exception as e:
logging.warning(f"Embedding worker failed to rewrite queue file: {e}")
# Small sleep to avoid tight loop
time.sleep(self._sleep_interval)
except Exception as e:
logging.error(f"Embedding worker main loop error: {e}")
time.sleep(1.0)
except Exception as e:
logging.error(f"PersistenceEngine embedding worker failed to start: {e}")
return
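# Data shapes consumed by the embedding worker above (sketched from the reader code;
# exact producer-side fields may vary):
#
#   knowledge/embedding_queue.jsonl, one JSON object per line:
#       {"chunk_id": "c_1719900000", "status": "queued"}
#   knowledge/chunks/<chunk_id>.json:
#       {"texts": ["..."], "timestamp": 1719900000.0, "embeddings_generated": false}
#
# Successfully embedded entries are dropped from the queue on rewrite; failed ones
# are appended back for retry on a later pass.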
class DialogueManager(IModule):
"""Manages conversation context, user preferences, and dialogue flow."""
def __init__(self, hive_instance: "Hive"):
super().__init__(hive_instance)
self.user_prefs: Dict[str, Dict] = {}
def get_user_prefs(self, caller_id: Optional[str]) -> Dict:
"""Returns user preferences (language, phonics settings, etc.)"""
if caller_id and caller_id in self.user_prefs:
return self.user_prefs[caller_id]
return {"language": "en", "phonics_on": False}
def process_turn(self, history: list, user_id: Optional[str], role: str, session_id: str):
"""Process a conversation turn - stub implementation that yields nothing."""
return
yield # unreachable, but makes this function a generator so callers can iterate safely
def start(self): pass # type: ignore
def stop(self): pass # type: ignore
# ----------- Hive core -----------
class PromptCompiler:
def __init__(self):
self.override_head=None
self.override_budget=None
self.personas = { # type: ignore
"default": "You are a helpful assistant. Use the conversation history and relevant examples to answer the user's question.",
"en": "You are an English tutor AI. Use the conversation history and relevant examples to respond in a helpful, concise, and grammatically correct manner.",
"essay_review": "You are a writing critic. Use the conversation history and relevant examples to provide a detailed review of the following essay, focusing on structure, clarity, and vocabulary.",
"pronounce": "You are a pronunciation coach. Use the conversation history and relevant examples to explain how to say the word, using the provided phonetic hints.",
} # type: ignore
def compile(self, final_instruction: str, snippets: List[Dict], token_budget: int = 600, intent: str = "default", user_prefs: Optional[Dict] = None, role: str = "guest") -> str:
if self.override_budget:
token_budget = self.override_budget # runtime overlay override; the budget is currently advisory only
prefs = user_prefs or {}
user_lang = prefs.get("language", "en")
learning_level = prefs.get("learning_level", "intermediate") # e.g., beginner, intermediate, advanced
# Simple ranker: prioritize snippets with more overlapping words.
query_words = set(re.findall(r"\w+", final_instruction.lower()))
def rank_score(snippet): # type: ignore
text = (snippet.get("text", "") or "").lower()
return len(query_words.intersection(re.findall(r"\w+", text)))
ranked = sorted(snippets, key=rank_score, reverse=True)
# Consolidate the top snippets into a context block.
context_block = ""
if ranked:
for snippet in ranked[:5]: # Use top 5 snippets
text = (snippet.get("text", "") or "").strip()
context_block += f"- {text}\n"
context_section = f"Relevant Examples:\n{context_block.strip()}" if context_block else "Relevant Examples:\nNo relevant examples found."
# Select persona based on intent and user profile
head = self.override_head or self.personas.get(intent, self.personas.get(user_lang, self.personas["default"]))
# Add personalization based on user profile
if learning_level == "beginner":
head += " Keep your language very simple and be extra encouraging."
if role in ("owner", "admin_super", "admin_general"):
head += f" You are speaking to an administrator ({role}). You may provide more technical details or system status if relevant."
# Build conversation history section from snippets that are part of the conversation
history_snippets = [s for s in snippets if s.get("dataset") == "conversation"]
history_text = ""
if history_snippets:
for turn in history_snippets:
# Assuming conversation turns are stored with 'user' and 'tutor' keys in metadata
history_text += f"User: {turn.get('user', '')}\nTutor: {turn.get('tutor', '')}\n"
history_section = f"Conversation History:\n{history_text.strip()}" if history_text else "Conversation History:\nNo recent history."
# Assemble the final prompt
return f"{head}\n\n{history_section}\n\n{context_section}\n\nNew User Input:\n{final_instruction}\n\nTutor Response:"
class KnowledgeStoreModule(KnowledgeStore, IModule): # type: ignore
def __init__(self, hive_instance: "Hive"): IModule.__init__(self, hive_instance); KnowledgeStore.__init__(self, hive_instance.config["HIVE_HOME"])
def start(self): pass
def stop(self): pass
class CurveStoreModule(CurveStore, IModule): # type: ignore
def __init__(self, hive_instance: "Hive"):
IModule.__init__(self, hive_instance)
CurveStore.__init__(self, hive_instance.config["CURVE_DIR"])
def start(self): pass
def stop(self): pass
class EngineModule(EngineCurve, IModule):
def __init__(self, hive_instance: "Hive"):
IModule.__init__(self, hive_instance)
EngineCurve.__init__(self)
def start(self): pass
def stop(self): pass
class OverlayModule(RuntimeOverlay, IModule):
def __init__(self, hive_instance: "Hive"):
IModule.__init__(self, hive_instance)
RuntimeOverlay.__init__(self)
def start(self): self.apply_to(self.hive)
def stop(self): pass
class CompilerModule(PromptCompiler, IModule):
def __init__(self, hive_instance: "Hive"): IModule.__init__(self, hive_instance); PromptCompiler.__init__(self); hive_instance.decoding_temperature=0.7
def start(self): self.hive.decoding_temperature = 0.4 if not self.hive.lite_mode else 0.7
def stop(self): pass
class Hive:
def __init__(self, model_id: Optional[str]=None, device: Optional[str]=None, caps: Optional[Dict]=None, lite: bool = False):
self.config = CFG
self.caps = caps or probe_caps()
self.lite_mode = lite
self.actual_mode = "unknown" # 'lite' | 'full-local' | 'full-remote' | 'full-fallback'
self.module_manager = ModuleManager() # type: ignore
self.llm_ready = threading.Event()
self.pipe = None
self.tok = None
self.web_threshold = 0.4 # Default value, can be overridden by overlay
self.model = None
self.client = None # For remote inference
# Module attributes will be provided via ModuleManager and property
# accessors below. Do not pre-create instance attributes that would
# shadow those @property descriptors (they are non-data descriptors
# and would be bypassed by instance attributes).
self.lora_adaptation_counter = 0 # For periodic LoRA adapter saving
self.optimizer = None # For LoRA adaptation
self.kstore = None # Will be set in full mode
# Note: other modules (store, compiler, engine, overlay, changes,
# persistence, selfopt, librarian) are available via module_manager
# and accessed through their property getters defined later.
# Apply adaptive parameters from environment
self.retrieval_k = self.caps.get("retrieval_k", 6) # type: ignore
self.embedding_batch_size = self.caps.get("embedding_batch_size", 8) # type: ignore
self.cache_budget_gb = self.caps.get("cache_budget_gb", 1.5) # type: ignore
self.model_precision = self.caps.get("model_precision", "float32") # type: ignore
self.load_full_embedder = self.caps.get("load_full_embedder", True) # type: ignore
self.decoding_temperature = 0.7
if not model_id:
model_id, info = pick_model(self.caps)
device = info.get("device", "cpu")
self.model_id = model_id or CFG["MODEL_OVERRIDE"] or CANDIDATES[0][0]
self.device = device or ("cuda" if _has_gpu_env() else "cpu")
if self.lite_mode:
self._init_lite_mode()
else:
try:
self._init_full_mode()
except Exception as e:
print(f"[Hive] Full mode initialization failed: {e}. Falling back to lite mode.")
import traceback
traceback.print_exc()
print("[Hive] Switching to lite mode...")
self.lite_mode = True
self.actual_mode = "lite"
self._init_lite_mode()
print("[Hive] Successfully initialized in fallback lite mode.")
# Start background listener for BEL/EventBus messages if available
try:
self._bel_thread = threading.Thread(target=self._bel_listener, daemon=True, name="bel_listener")
self._bel_thread.start()
except Exception:
pass
def _init_lite_mode(self): # type: ignore
"""Initializes the Hive in lite mode."""
print("[Hive] Initializing in Lite Mode.")
try: self.actual_mode = 'lite'
except Exception: pass
# Minimal module set for lite mode: persistence so saves work, overlay for runtime patches
try:
persistence_module = PersistenceEngine(self, embedding_batch_size=1)
overlay_module = OverlayModule(self)
# Register minimal modules
for name, module in [("persistence", persistence_module), ("overlay", overlay_module)]:
self.module_manager.register(name, module)
except Exception as e:
print(f"[Hive] Lite mode module registration error: {e}")
# Setup the lightweight LLM pipeline (may be a remote client or fallback)
self._setup_llm_pipeline()
def _init_full_mode(self):
"""Initializes the Hive in full-featured mode with adaptive parameters."""
print(f"[Hive] Initializing in Full Mode (retrieval_k={self.retrieval_k}, "
f"embedding_batch_size={self.embedding_batch_size}, "
f"cache_budget_gb={self.cache_budget_gb}).")
# Instantiate modules first to resolve dependencies, then register them.
try:
kstore_module = KnowledgeStoreModule(self)
except Exception as e:
print(f"[Hive] Warning: KnowledgeStoreModule failed: {e}")
raise
try:
store_module = CurveStoreModule(self)
except Exception as e:
print(f"[Hive] Warning: CurveStoreModule failed: {e}")
raise
try:
librarian_module = LibrarianModule(self, store_module, kstore_module)
except Exception as e:
print(f"[Hive] Warning: LibrarianModule failed: {e}")
raise
try:
compiler_module = CompilerModule(self)
except Exception as e:
print(f"[Hive] Warning: CompilerModule failed: {e}")
raise
try:
engine_module = EngineModule(self)
except Exception as e:
print(f"[Hive] Warning: EngineModule failed: {e}")
raise
try:
overlay_module = OverlayModule(self)
except Exception as e:
print(f"[Hive] Warning: OverlayModule failed: {e}")
raise
try:
changes_module = ChangeManagerModule(self)
except Exception as e:
print(f"[Hive] Warning: ChangeManagerModule failed: {e}")
raise
# Capability-driven gating: only enable voice/video modules when device supports them
caps = getattr(self, 'caps', {}) or {}
has_audio_in = caps.get('has_microphone', False) or caps.get('has_audio_input', False)
has_audio_out = caps.get('has_speaker', False) or caps.get('has_audio_output', False)
has_camera = caps.get('has_camera', False)
voice_video_module = None
if has_audio_in or has_audio_out or has_camera:
try:
voice_video_module = VoiceServicesModule(self)
except Exception as e:
print(f"[Hive] Warning: could not create VoiceServicesModule: {e}")
# Persistence, SelfOpt, and Dialogue are important but not critical
persistence_module = None
try:
persistence_module = PersistenceEngine(self, embedding_batch_size=self.embedding_batch_size)
except Exception as e:
print(f"[Hive] Warning: PersistenceEngine failed: {e}")
selfopt_module = None
try:
selfopt_module = SelfOptimizerModule(self)
except Exception as e:
print(f"[Hive] Warning: SelfOptimizerModule failed: {e}")
dialogue_module = None
try:
dialogue_module = DialogueManager(self)
except Exception as e:
print(f"[Hive] Warning: DialogueManager failed: {e}")
# Register all instantiated modules in the correct order.
reg_list = [("kstore", kstore_module), ("store", store_module), ("librarian", librarian_module),
("compiler", compiler_module), ("engine", engine_module), ("overlay", overlay_module),
("changes", changes_module)]
if persistence_module:
reg_list.append(("persistence", persistence_module))
if selfopt_module:
reg_list.append(("selfopt", selfopt_module))
if dialogue_module:
reg_list.append(("dialogue", dialogue_module))
if voice_video_module is not None:
reg_list.insert(7, ("voice_video", voice_video_module))
for name, module in reg_list:
self.module_manager.register(name, module)
# Key modules are registered with the ModuleManager. Avoid assigning
# them to instance attributes here because doing so would shadow the
# @property accessors defined later (which fetch modules from the
# ModuleManager). We keep `kstore` as a direct attribute for callers
# that expect it.
self.kstore = kstore_module
# Mark full mode ready
try:
self.actual_mode = 'full-local'
except Exception:
pass
# CRITICAL FIX: Only re-initialize the LLM if NEITHER the local model NOR the remote client is already set up.
if not (hasattr(self, 'model') and self.model) and not (hasattr(self, 'client') and self.client):
print("[Hive] Loading LLM pipeline in Full Mode...")
if not self._setup_llm_pipeline():
# Propagate failure if LLM setup fails completely
raise RuntimeError("Failed to set up any LLM pipeline.")
else:
print("[Hive] LLM pipeline already loaded, skipping reload.")
# Start all modules
try:
self.module_manager.start_all()
except Exception as e:
print(f"[Hive] Warning: Some modules failed to start: {e}")
def _detect_display(self) -> bool:
"""Detect if a display is available on the system."""
if _os_name() == 'linux':
return bool(os.environ.get('DISPLAY')) or os.path.exists('/dev/fb0')
return False
def _load_local_model(self, trust: bool, **kwargs):
"""Loads the tokenizer and model for local inference."""
print(f"[Hive] Loading local model: {self.model_id} on device: {self.device}") # type: ignore
# TinyLlama (and similar Llama forks) require sentencepiece/tokenizer support.
if isinstance(self.model_id, str) and "TinyLlama" in self.model_id:
try:
import sentencepiece as _sp
except Exception:
raise ImportError("TinyLlama tokenizers require 'sentencepiece'. Please install it (pip install sentencepiece)")
self.tok = AutoTokenizer.from_pretrained(self.model_id, trust_remote_code=True, use_fast=False)
else:
self.tok = AutoTokenizer.from_pretrained(self.model_id, trust_remote_code=trust, chat_template=None, use_fast=False)
if self.tok.pad_token is None:
self.tok.pad_token = self.tok.eos_token
try:
self.model = AutoModelForCausalLM.from_pretrained(self.model_id, trust_remote_code=trust, **kwargs)
except Exception as e:
raise RuntimeError(f"Failed to load model {self.model_id}: {e}")
self.model.eval()
# If LoRA adaptation is enabled, wrap the base model with a PeftModel
if not self.lite_mode and CFG.get("HIVE_ENABLE_LORA_ADAPT", False):
print("[Hive] LoRA adaptation enabled. Wrapping model with PEFT.")
lora_config = LoraConfig(
task_type=TaskType.CAUSAL_LM,
r=8,
lora_alpha=16,
target_modules=["q_proj", "v_proj"], # Common for Llama-like models
lora_dropout=0.05,
bias="none",
inference_mode=False # Set to False to enable training
)
self.model = get_peft_model(self.model, lora_config)
self.model.print_trainable_parameters()
# Try to load existing adapter weights
lora_dir = CFG.get("HIVE_LORA_DIR")
if lora_dir and os.path.exists(lora_dir):
try:
self.model = PeftModel.from_pretrained(self.model, lora_dir)
logging.info(f"Successfully loaded existing LoRA adapter from {lora_dir}")
except Exception as e:
logging.warning(f"Found LoRA directory but failed to load adapter: {e}. Starting with a fresh adapter.")
# Define stop tokens for generation. Filter out tokens unknown to this tokenizer:
# convert_tokens_to_ids maps them to unk_token_id (not None), which would otherwise
# cause spurious early stops whenever the unknown-token id is generated.
stop_token_names = ["<|endoftext|>", "<|file_separator|>", "<|user|>", "<|assistant|>", "<|im_start|>", "<|im_end|>", "</s>"]
candidate_ids = self.tok.convert_tokens_to_ids(stop_token_names)
self.stop_tokens = [tid for tid in candidate_ids if tid is not None and tid != self.tok.unk_token_id]
if self.tok.eos_token_id is not None:
self.stop_tokens.append(self.tok.eos_token_id)
self.stopping_criteria = StoppingCriteriaList([StopOnTokens(self.stop_tokens)])
def _setup_llm_pipeline(self) -> bool:
"""Sets up the language model, tokenizer, and pipeline."""
trust = True
kwargs = {}
if torch and torch.cuda.is_available() and self.device == "cuda":
kwargs.update(dict(torch_dtype=torch.float16, device_map="auto"))
# Determine if remote inference is explicitly configured
use_remote_explicitly = CFG.get("HIVE_USE_HF_INFERENCE", False)
remote_endpoint_url = CFG.get("HIVE_HF_ENDPOINT")
if use_remote_explicitly or remote_endpoint_url:
logging.info("[Hive] Remote inference is explicitly configured. Attempting to connect.")
if self._try_setup_remote_inference(self.model_id, remote_endpoint_url):
self.llm_ready.set()
return True
else:
logging.error("[Hive] Explicit remote inference failed. No further fallbacks will be attempted.")
self._setup_fallback_llm() # Setup fallback but still signal failure to the caller
self.llm_ready.set()
return False
# --- Local-First, Remote-Fallback Strategy ---
# 1. Attempt to load the local model.
logging.info("[Hive] Attempting to load model locally first...")
try:
self._load_local_model(trust, **kwargs)
self.pipe = pipeline("text-generation", model=self.model, tokenizer=self.tok, device=self.device)
self.actual_mode = 'full-local'
logging.info(f"[Hive] Successfully loaded local model: {self.model_id}")
self.llm_ready.set()
return True
except Exception as e:
logging.warning(f"[Hive] Local model loading failed: {e}. Will attempt remote fallback.")
# 2. If local loading fails, attempt to fall back to remote inference.
logging.info("[Hive] Attempting to fall back to remote Hugging Face Inference API...")
if self._try_setup_remote_inference(self.model_id):
self.llm_ready.set()
return True
# 3. If both local and remote fail, set up the deterministic fallback LLM.
logging.error("[Hive] Both local and remote model setups failed. Activating deterministic fallback mode.")
self._setup_fallback_llm()
self.llm_ready.set()
return False # Signal that we are in a degraded state
def _try_setup_remote_inference(self, model_id_for_remote: str, endpoint_url: Optional[str] = None) -> bool:
"""Attempts to set up the Hugging Face InferenceClient. Returns True on success."""
try:
from huggingface_hub import InferenceClient
token = CFG.get("HF_READ_TOKEN") or os.getenv("HF_TOKEN") or os.getenv("HUGGING_FACE_HUB_TOKEN")
logging.info(f"[Hive] Setting up remote client for model: {model_id_for_remote} at endpoint: {endpoint_url or 'default'}")
self.client = InferenceClient(model=model_id_for_remote if not endpoint_url else None, token=token, timeout=60, base_url=endpoint_url)
def _remote_pipe(prompt, **kwargs):
messages = [{"role": "user", "content": prompt}]
resp = self.client.chat_completion(messages, max_tokens=kwargs.get('max_new_tokens', 256), temperature=kwargs.get('temperature', 0.7), stream=False)
return [{"generated_text": resp.choices[0].message.content}]
self.pipe = _remote_pipe
self.tok = AutoTokenizer.from_pretrained(model_id_for_remote, trust_remote_code=True, token=token)
self.model = None # type: ignore
self.actual_mode = 'full-remote'
logging.info("[Hive] Remote inference client successfully configured.")
return True
except Exception as e:
logging.warning(f"[Hive] Failed to set up remote inference client: {e}")
self.client = None
return False
def _setup_fallback_llm(self):
"""Configures the system to use the deterministic FallbackLLM."""
logging.warning("[Hive] Activating deterministic FallbackLLM due to model loading failures.")
self.model = FallbackLLM() # type: ignore
self.tok = None
def _fallback_pipe(prompt, **kwargs):
text = self.model.generate_text(prompt, **kwargs)
return [{"generated_text": text}]
self.pipe = _fallback_pipe
self.actual_mode = 'full-fallback' if not self.lite_mode else 'lite'
def summarize_for_memory(self, text:str, max_new_tokens:int=160)->str:
# Guard: Summarization requires a real local model.
if torch is None or not self.model or not self.tok or not isinstance(self.model, torch.nn.Module):
return f"(Summary unavailable in {self.actual_mode} mode)"
prompt = (
"You are an English tutor AI. Summarize the following conversation concisely "
"while preserving important context, corrections, and grammar tips. Keep it brief.\n\n"
"Conversation:\n"
f"{text[:4000]}\n\n"
"Summary:"
)
inputs = self.tok(prompt, return_tensors="pt").to(self.device)
with torch.no_grad():
output_ids = self.model.generate(
inputs.input_ids,
attention_mask=inputs.attention_mask,
max_new_tokens=max_new_tokens,
do_sample=False, # greedy decoding; temperature is ignored when sampling is disabled
pad_token_id=self.tok.eos_token_id,
stopping_criteria=self.stopping_criteria,
)
# Decode only the newly generated tokens
generated_tokens = output_ids[0, inputs.input_ids.shape[1]:]
generated_text = self.tok.decode(generated_tokens, skip_special_tokens=True)
return generated_text.strip()
def summarize_active_memory(self, history: List[Dict]) -> List[Dict]:
"""
If conversation history exceeds a threshold, summarize the oldest part
to keep the context manageable while retaining information.
"""
threshold = int(CFG.get("HIVE_SUMMARY_THRESHOLD", 30))
if len(history) <= threshold:
return history
logging.info(f"Conversation history length ({len(history)}) exceeds threshold ({threshold}). Summarizing...")
# Keep the most recent 1/3 of the conversation as-is
keep_recent_count = threshold // 3
to_summarize = history[:-keep_recent_count]
recent_turns = history[-keep_recent_count:]
# Format the old part of the conversation for the summarizer
text_to_summarize = "\n".join([f"User: {turn.get('content', '')}" if turn.get('role') == 'user' else f"Assistant: {turn.get('content', '')}" for turn in to_summarize])
# Use the LLM to create a summary
summary_text = self.summarize_for_memory(text_to_summarize)
summary_turn = {"role": "system", "content": f"[Previous conversation summary: {summary_text}]"}
return [summary_turn] + recent_turns
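# Illustrative split: with the default threshold of 30 and a 36-turn history,
# keep_recent_count = 30 // 3 = 10, so the oldest 26 turns collapse into one
# system summary turn and the newest 10 turns are kept verbatim.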
def add_curve(self, text:str, meta:Dict, scope:str="general"): # type: ignore
if self.lite_mode: return
self.librarian.ingest_text(text, meta, scope)
class SandboxRunner:
"""Minimal sandbox runner using multiprocessing and resource limits.
Executes provided Python code in a child process with optional CPU/memory limits.
"""
def __init__(self, mem_limit_mb: int = 256, cpu_seconds: int = 5):
self.mem_limit_mb = mem_limit_mb
self.cpu_seconds = cpu_seconds
def _child_exec(self, code: str, q):
# Child process: apply resource limits and execute code in restricted globals
try:
# Apply limits if possible
try:
import resource
# Address space limit (in bytes)
as_bytes = self.mem_limit_mb * 1024 * 1024
resource.setrlimit(resource.RLIMIT_AS, (as_bytes, as_bytes))
resource.setrlimit(resource.RLIMIT_CPU, (self.cpu_seconds, self.cpu_seconds))
except Exception:
pass # resource module unavailable (e.g., Windows); run without hard limits
# Restrict builtins to a small allowlist. Go through the builtins module directly:
# __builtins__ may be either a module or a dict depending on import context.
import builtins as _builtins
_allowed = ("abs", "min", "max", "len", "range", "print", "str", "int", "float", "bool", "list", "dict", "set", "tuple")
safe_builtins = {k: getattr(_builtins, k) for k in _allowed if hasattr(_builtins, k)}
# Override import to prevent network/file imports
def _blocked_import(*args, **kwargs):
raise ImportError("Imports are disabled in the sandboxed runner")
safe_builtins['__import__'] = _blocked_import
globals_dict = {"__builtins__": safe_builtins}
locals_dict = {}
# Capture stdout/stderr
import io, sys
old_out, old_err = sys.stdout, sys.stderr
sys.stdout = io.StringIO()
sys.stderr = io.StringIO()
try:
exec(code, globals_dict, locals_dict)
out = sys.stdout.getvalue()
err = sys.stderr.getvalue()
q.put({"ok": True, "stdout": out, "stderr": err, "exitcode": 0})
finally:
sys.stdout, sys.stderr = old_out, old_err
except Exception as e:
try:
q.put({"ok": False, "stdout": "", "stderr": str(e), "exitcode": 1})
except Exception:
pass
def _bel_listener(self):
"""Background thread that listens for `bel_in:` messages on the global EventBus
and dispatches them to an internal handler. Posts results to `bel_out:<id>`.
"""
logging.info("BEL listener thread started.")
eb = globals().get('GLOBAL_EVENT_BUS', None)
if eb is None:
logging.warning("No GLOBAL_EVENT_BUS found; BEL listener exiting.")
return
while True:
try:
# Blocking wait for any key that starts with bel_in:
# Simple polling approach: iterate keys in the store under lock to find any bel_in
with eb._cond:
keys = [k for k in list(eb._store.keys()) if k.startswith('bel_in:')]
if not keys:
eb._cond.wait(timeout=1.0)
continue
key = keys[0]
payload = eb._store.pop(key)
# Process payload synchronously and post output
try:
out_payload = self._handle_bel_in(key, payload)
except Exception as e:
out_payload = {"ok": False, "error": str(e)}
out_key = f"bel_out:{key.split(':',1)[1]}"
eb.post(out_key, out_payload)
except Exception as e:
logging.error(f"Exception in BEL listener: {e}")
time.sleep(1.0)
def _handle_bel_in(self, key: str, payload: dict):
"""Minimal implementation of the BEL handling pipeline: validate, run chat, persist.
Returns a dict suitable for posting to bel_out.<id>
"""
# Validate payload shape
uid = payload.get('user_id')
text = payload.get('text') or payload.get('query') or ''
if not text:
return {"ok": False, "error": "No text provided"}
# Use the current hive instance to generate a reply (sync streaming not supported here)
try:
stream, postproc = self.chat(text, effective_role=payload.get('role','user'), caller_id=uid, history=payload.get('history', []))
# Collect full reply
reply = ''.join([chunk for chunk in stream])
# Optionally post-process
try:
processed = postproc(reply) if postproc else reply
except Exception:
processed = reply
# Persist minimal conversation record if persistence available
try:
if hasattr(self, 'persistence') and self.persistence:
self.persistence.save_turn({'user': uid, 'text': text, 'reply': processed, 'ts': time.time()})
except Exception:
pass
return {"ok": True, "reply": processed}
except Exception as e:
return {"ok": False, "error": str(e)}
def _stream_chat_completion_response(self, stream):
"""Backward-compatible pass-through; stream parsing now lives in _iterate_remote_stream."""
yield from stream
def online_update(self, query_hint: Optional[str]=None)->Dict:
if self.lite_mode: return {"ok": False, "reason": "Online features are disabled in Lite Mode."}
if not CFG["ONLINE_ENABLE"]: return {"ok":False,"reason":"online disabled"}
if not online_available(int(CFG["ONLINE_TIMEOUT"])): return {"ok":False,"reason":"offline"}
seen=_load_json(ONLINE_DB, {}) # type: ignore
urls=[u.strip() for u in (CFG["ONLINE_SOURCES"] or "").split(",") if u.strip()]
items=fetch_rss(urls, timeout=int(CFG["ONLINE_TIMEOUT"]), limit=30)
added=0
for it in items: # type: ignore
key=hashlib.sha1(((it.get("link") or "")+(it.get("title") or "")).encode("utf-8","ignore")).hexdigest()
if key in seen: continue
base=(it.get("title","")+"\n\n"+it.get("summary","")).strip()
summ=self.summarize_for_memory(base)
self.add_curve(summ, {"dataset":"online_rss","url":it.get("link"),"title":it.get("title"),"published":it.get("published")}, scope="general")
seen[key]=int(time.time()); added+=1 # type: ignore
_save_json(ONLINE_DB, seen); return {"ok":True,"added":added}
def web_update_and_store(self, query:str, max_docs:int, timeout:int)->int:
if self.lite_mode: return 0 # type: ignore
if not (CFG["ONLINE_ENABLE"] and online_available(timeout)): return 0
hits=asyncio.run(web_search_snippets(query, max_results=max_docs, timeout=timeout)); added=0
for h in hits:
body=(h.get("title","")+"\n\n"+(h.get("body","") or "")).strip()
if not body: continue
summ=self.summarize_for_memory(body)
meta={"dataset":"web_update","source":h.get("href",""),"title":h.get("title",""),"ts":time.time()}
self.add_curve(summ, meta, scope="general"); added+=1
return added
def _iterate_remote_stream(self, stream):
"""
Iterates over the remote stream and yields content.
This is in a separate function to isolate the for loop from the main try/except block.
It also handles StopIteration gracefully, which can occur with stream iterators.
Using a while/next loop is more robust for some stream iterators than a for loop.
"""
iterator = iter(stream)
while True:
try:
chunk = next(iterator)
if chunk.choices and chunk.choices[0].delta and chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content
except StopIteration:
# The stream has ended, which is expected. Break the loop.
break
@property
def store(self) -> 'CurveStore': return self.module_manager.modules.get("store") # type: ignore
@property
def librarian(self) -> 'LibrarianCurve': return self.module_manager.modules.get("librarian") # type: ignore
@property
def engine(self) -> 'EngineCurve': return self.module_manager.modules.get("engine") # type: ignore
@property
def overlay(self) -> 'RuntimeOverlay': return self.module_manager.modules.get("overlay") # type: ignore
@property
def changes(self) -> 'ChangeManager': return self.module_manager.modules.get("changes") # type: ignore
@property
def compiler(self) -> 'PromptCompiler': return self.module_manager.modules.get("compiler") # type: ignore
@property
def selfopt(self) -> 'SelfOptimizer': return self.module_manager.modules.get("selfopt") # type: ignore
@property
def persistence(self) -> 'PersistenceEngine': return self.module_manager.modules.get("persistence") # type: ignore
@property
def dialogue_manager(self) -> 'DialogueManager': return self.module_manager.modules.get("dialogue") # type: ignore
def _prepare_chat_input(self, message: str, user_lang: str, phonics_on: bool, prompt_override: str | None) -> tuple[str, str]: # type: ignore
"""Determines intent and prepares the final message for the LLM."""
# In lite mode or if engine is not available, skip intent routing
intent = "chat"
if not self.lite_mode and self.engine is not None:
try:
intent = self.engine.choose_route(message)
except Exception:
intent = "chat"
final_message = message
if intent == "pronounce" or (phonics_on and user_lang == 'en' and not self.lite_mode):
match = re.search(r"(pronounce|say|spell|spelling of)\s+['\"]?([a-zA-Z\-']+)['\"]?", message, re.I)
word_to_process = match.group(2) if match else (message.split()[-1] if len(message.split()) < 4 else None)
if word_to_process:
phonics_hint = phonics(word_to_process)
final_message = f"Explain how to pronounce the word '{word_to_process}'. Use this phonics hint in your explanation: {phonics_hint}"
elif prompt_override:
final_message = f"{prompt_override}\n\nHere is the text to work on:\n{message}"
if "review" in prompt_override.lower() or "essay" in prompt_override.lower():
intent = "essay_review"
return final_message, intent
def _get_retrieval_context(self, message: str, effective_role: str, caller_id: str | None, k: int) -> list[dict]: # type: ignore
"""Performs RAG, with web search fallback if necessary."""
if self.lite_mode or self.librarian is None:
return []
try:
logging.info(f"Performing RAG for message: '{message[:50]}...'")
online_now = NET.online_quick()
if not online_now:
NET.kick_async()
snippets, scores = self.librarian.retrieve_scoped_with_scores(message, effective_role, caller_id, k=k)
cov = coverage_score_from_snippets(snippets, scores) # type: ignore
logging.info(f"Retrieved {len(snippets)} snippets from local curves. Coverage score: {cov:.2f} (Threshold: {self.web_threshold})")
if cov < self.web_threshold and CFG["ONLINE_ENABLE"] and online_now:
logging.info("Coverage below threshold. Attempting web search to augment knowledge.")
self.web_update_and_store(message, max_docs=int(CFG["ONLINE_MAX_RESULTS"] or 5), timeout=int(CFG["ONLINE_TIMEOUT"] or 8))
logging.info("Web search complete. Re-retrieving from updated curves.")
snippets, _ = self.librarian.retrieve_scoped_with_scores(message, effective_role, caller_id, k=k)
return snippets
except Exception as e:
logging.warning(f"RAG retrieval failed: {e}")
return []
def _postprocess_and_log(self, full_output: str, message: str, effective_role: str, caller_id: str | None, intent: str, snippets: list[dict]):
"""Cleans the LLM output and logs the interaction."""
try:
reply = _extract_assistant_reply(full_output)
# Final defensive sanitization to ensure we keep only the first assistant reply
reply = _final_sanitize_reply(reply)
except Exception:
# Fallback to previous simple heuristic
reply = full_output.rsplit("Assistant:", 1)[-1].strip()
reply = _final_sanitize_reply(reply)
if CFG["NO_PROFANITY"]:
reply = re.sub(r"\b(fuck|shit|bitch|asshole|cunt|dick|pussy|nigger|motherfucker)\b", "[censored]", reply, flags=re.I)
if caller_id and not self.lite_mode:
log_path = os.path.join(CFG["HIVE_HOME"], "users", "conversations", f"{caller_id}.jsonl")
log_entry = {"ts": time.time(), "message": message, "effective_role": effective_role, "intent": intent, "snippets_used": [s.get("text", "")[:100] for s in snippets[:3]], "reply": reply}
_append_jsonl(log_path, log_entry)
return reply
def adapt_lora(self, user_input: str, tutor_output: str):
"""
Fine-tunes the LoRA adapter online with a single interaction pair.
This is a lightweight, on-the-fly adaptation.
"""
if self.lite_mode or not CFG.get("HIVE_ENABLE_LORA_ADAPT", False) or not self.model or not self.optimizer or not hasattr(self.model, 'train'):
return
try:
self.model.train() # Set model to training mode
# Prepare input and labels for training
prompt = f"User: {user_input}\nTutor:"
full_text = f"{prompt} {tutor_output}"
inputs = self.tok(prompt, return_tensors="pt").to(self.device)
labels = self.tok(full_text, return_tensors="pt").input_ids.to(self.device)
# Forward pass
outputs = self.model(**inputs, labels=labels)
loss = outputs.loss
# Backward pass and optimization
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
logging.info(f"LoRA online adaptation complete. Loss: {loss.item():.4f}")
# Periodically save the adapter to disk
self.lora_adaptation_counter += 1
if self.lora_adaptation_counter % 10 == 0:
lora_dir = CFG.get("HIVE_LORA_DIR")
if lora_dir:
self.model.save_pretrained(lora_dir)
logging.info(f"Saved updated LoRA adapter to {lora_dir}")
except Exception as e:
logging.error(f"Error during LoRA adaptation: {e}")
finally:
if self.model:
self.model.eval() # Set model back to evaluation mode
def chat_stream(self, prompt: str, max_new_tokens: int, temperature: float):
"""Generator that yields tokens as they are generated. This is the unified streaming entry point."""
try:
# Wait for LLM to finish loading (background thread may still be working)
if hasattr(self, 'llm_ready'):
print("[chat_stream] Waiting for LLM to finish loading...", flush=True)
self.llm_ready.wait(timeout=120) # Wait up to 2 minutes for model to load
print("[chat_stream] LLM is ready, proceeding with generation.", flush=True)
except Exception as e:
print(f"[chat_stream] Error waiting for LLM: {e}")
yield f"[Error: LLM initialization failed: {e}]"
return
# Demo mode path (lightweight fallback for HF Spaces or explicit demo)
if hasattr(self, 'pipe') and callable(self.pipe) and not (hasattr(self, 'model') and self.model) and not (hasattr(self, 'client') and self.client):
# In demo mode: use the demo pipe to get a response
try:
response = self.pipe(prompt, max_new_tokens=max_new_tokens, temperature=temperature)
if isinstance(response, list) and response and isinstance(response[0], dict):
text = response[0].get("generated_text", "")
# Yield the text word-by-word to simulate streaming
for word in text.split():
yield word + " "
else:
yield str(response)
except Exception as e:
yield f"[Demo mode error: {e}]"
return
# Fallback lightweight path
if isinstance(getattr(self, 'model', None), FallbackLLM):
# Mark actual mode as degraded so the UI can display an appropriate badge
try: self.actual_mode = 'full-fallback' if not getattr(self,'lite_mode',False) else 'lite'
except Exception: pass
# Emit a clear, one-time warning to the user about limitations of degraded mode
if not getattr(self, '_degraded_alerted', False):
warning = (
"[DEGRADED MODE] The system is running a lightweight fallback model with limited knowledge and capabilities.\n"
"Responses may be inaccurate, incomplete, or overly generic. Install 'transformers' and 'torch', or provide a Hugging Face token/endpoint, to enable the full model.\n"
"If possible, retry later once dependencies are installed.\n"
)
yield warning
try: self._degraded_alerted = True
except Exception: pass
# Stream the fallback model's output after warning
yield from self.model.stream(prompt, max_new_tokens=max_new_tokens, temperature=temperature)
return
if hasattr(self, 'client') and self.client: # Remote Inference
stop_sequences = ["</s>", "Assistant:"] + [self.tok.decode(st) for st in self.stop_tokens]
try:
messages = [{"role": "user", "content": prompt}] # Or build from history if needed
stream = self.client.chat_completion(
messages,
max_tokens=int(max_new_tokens),
temperature=float(temperature),
stop=stop_sequences,
stream=True
)
# The remote stream iterator can surface StopIteration in a way the generic
# `except Exception` below would misreport as an error; catch it explicitly
# and treat it as the normal end-of-stream signal.
try:
for chunk in stream:
if chunk.choices and chunk.choices[0].delta and chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content
except StopIteration:
pass # expected: the stream finished successfully
except Exception as e: # Catch all exceptions
error_message = f"[Error: Remote model request failed: {type(e).__name__}]"
# Try to get more details from HTTPError
if hasattr(e, 'response') and e.response is not None:
try:
details = e.response.json()
error_message += f" - Details: {details.get('error', 'N/A')}"
except Exception:  # .json() may raise JSONDecodeError or other parse errors depending on the client
    error_message += f" - Status: {e.response.status_code}, Body: {e.response.text[:100]}"
print(f"[ModelBridge] {error_message} | Full exception: {e}")
yield error_message
return # IMPORTANT: Exit here if we are using remote inference
# At this point the remote-client branch has returned, so only the local model remains.
if not getattr(self, 'model', None):
    yield "[Error: Model is not available or not loaded properly.]"
    return
try:
streamer = TextIteratorStreamer(self.tok, skip_prompt=True, skip_special_tokens=True)
inputs = self.tok(prompt, return_tensors="pt").to(self.device)
# Start generation in a separate thread to allow streaming
generation_kwargs = dict(
inputs=inputs.input_ids,
attention_mask=inputs.attention_mask,
streamer=streamer,
max_new_tokens=max_new_tokens,
do_sample=True,
temperature=temperature,
pad_token_id=self.tok.eos_token_id,
stopping_criteria=self.stopping_criteria,
)
# Run generation in a separate thread
thread = threading.Thread(target=self.model.generate, kwargs=generation_kwargs)
thread.start()
# Yield tokens from the streamer
for new_text in streamer:
yield new_text
thread.join() # Ensure the generation thread finishes
except Exception as e:
error_message = f"[Error: Local model generation failed: {e}]"
print(f"[ModelBridge] {error_message}")
yield error_message
return
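# Usage sketch for chat_stream (illustrative only; `hive` is an assumed constructed
# instance and this snippet is not executed anywhere in the app):
#
#   reply = "".join(tok for tok in hive.chat_stream("Hello!", max_new_tokens=64, temperature=0.7))
#
# The generator yields plain-text fragments regardless of which backend
# (demo pipe, fallback model, remote client, or local model) produced them.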
def chat(self, message:str, effective_role:str, caller_id: Optional[str],
k:int=None, max_new_tokens:int=1024, temperature:float=None, prompt_override: Optional[str] = None, history: Optional[list] = None): # type: ignore
"""
Handles a chat message by retrieving context, compiling a prompt, and streaming the response.
This method now includes in-session memory summarization for long conversations.
This method is designed to be responsive by offloading slow I/O (RAG) to a background thread.
ALWAYS returns a tuple: (response_stream, post_process_func).
"""
try:
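# Caller contract sketch (illustrative; mirrors how talk() consumes this below):
#   stream, post = hive.chat(user_text, "user", caller_id, history=history)
#   raw = "".join(tok for tok in stream)
#   final_reply = post(raw)   # post-process extracts/sanitizes the assistant reply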
temp = temperature if temperature is not None else (self.decoding_temperature if not self.lite_mode else 0.7)
user_prefs = {}
# Summarize active memory if it's too long
if history:
history = self.summarize_active_memory(history)
if not self.lite_mode and "dialogue" in self.module_manager.modules:
user_prefs = self.module_manager.modules["dialogue"].get_user_prefs(caller_id)
final_message, intent = self._prepare_chat_input(message, user_prefs.get("language", "en"), user_prefs.get("phonics_on", False), prompt_override)
def _build_lite_prompt() -> str:
    # Shared prompt builder for lite mode and the no-compiler fallback.
    # Uses plain-role markers (User/Assistant) for clarity across different chat templates.
    head = (
        "You are an encouraging and concise English tutor. "
        "Answer the user's question directly and only as the assistant. "
        "Do NOT simulate, repeat, or quote any previous conversation, assistant replies, or role markers. "
        "Respond ONLY to the user's latest question."
    )
    prompt_parts = [head]
    # Normalize history defensively so no assistant/model outputs are used as context
    for content in _normalize_history_to_user_messages(history, max_user_turns=8):
        prompt_parts.append(f"User: {content}")
    prompt_parts.append(f"User: {final_message}")
    prompt_parts.append("Assistant:")
    # Resulting shape: <head>\nUser: ...\nUser: <latest>\nAssistant:
    return "\n".join(prompt_parts)
if self.lite_mode:
    # Lite mode: simple, model-agnostic prompt plus a post-process hook
    response_stream = self.chat_stream(_build_lite_prompt(), max_new_tokens=int(max_new_tokens), temperature=float(temp))
    def _lite_postprocess(full_output: str) -> str:
        # Extract the assistant reply from the raw model output.
        try:
            return _final_sanitize_reply(_extract_assistant_reply(full_output))
        except Exception:
            # Best-effort fallback: strip role markers manually
            out = full_output
            if 'Assistant:' in out:
                out = out.split('Assistant:', 1)[1]
            if 'User:' in out:
                out = out.split('User:', 1)[0]
            return _final_sanitize_reply(out.strip())
    return response_stream, _lite_postprocess
# --- Full Mode: Asynchronous RAG + Streaming ---
# If the compiler is not available, fall back to lite-mode prompting
if self.compiler is None:
    print("[Hive.chat] Compiler not available, falling back to lite mode prompt")
    response_stream = self.chat_stream(_build_lite_prompt(), max_new_tokens=int(max_new_tokens), temperature=float(temp))
    return response_stream, lambda x: x
snippet_queue = queue.Queue()
kk = k if k is not None else (self.retrieval_k if hasattr(self, 'retrieval_k') else 6)
threading.Thread(target=lambda: snippet_queue.put(self._get_retrieval_context(message, effective_role, caller_id, kk)), daemon=True).start()
# Wait for snippets from the background thread
snippets = snippet_queue.get()
prompt = self.compiler.compile(final_message, snippets, token_budget=int(CFG["CTX_TOKENS"]), intent=intent)
stream = self.chat_stream(prompt, max_new_tokens=int(max_new_tokens), temperature=float(temp))
def post_process_func(full_output):
reply = self._postprocess_and_log(full_output, message, effective_role, caller_id, intent, snippets)
# Hook for potential LoRA adaptation
self.adapt_lora(message, reply)
return reply
return stream, post_process_func
except Exception as e:
print(f"[Hive.chat] Exception in chat method: {e}")
import traceback
traceback.print_exc()
# Return a fallback generator and identity post-process
def _error_gen():
yield f"[Error: {str(e)[:100]}]"
return _error_gen(), lambda x: x
def online_available(timeout: int = 2) -> bool:
    """Checks if an online connection is available. (`timeout` is accepted for API compatibility but unused by the current check.)"""
    return NET.quality_ms() > 0
# --- Global UI Helper ---
# This function needs to be defined before launch_ui so it can be used by event handlers.
def get_hive_instance(bootstrap_instance: "Bootstrap" = None):
"""
Global function to safely get the current Hive instance.
It prioritizes the full instance if ready, otherwise falls back to the lite one.
"""
try:
if bootstrap_instance is None:
bootstrap_instance = globals().get('bootstrap')
if not bootstrap_instance:
logging.error("[get_hive_instance] CRITICAL: No bootstrap instance available.")
# Return the last known global instance as a final fallback
return HIVE_INSTANCE
full = getattr(bootstrap_instance, 'hive_instance', None) if bootstrap_instance else None
lite = getattr(bootstrap_instance, 'hive_lite_instance', None) if bootstrap_instance else None
# 1. Prioritize the full instance if it's ready and not in lite mode.
if full and not getattr(full, 'lite_mode', True) and callable(getattr(full, 'chat', None)):
logging.debug(f"[get_hive_instance] Using full instance (mode: {getattr(full, 'actual_mode', 'unknown')}).")
return full
# 2. Otherwise, use the lite instance if it's available.
elif lite and callable(getattr(lite, 'chat', None)):
logging.debug("[get_hive_instance] Using lite instance while full core initializes.")
return lite
# 3. If neither is ready, return the last known instance or None.
else:
logging.warning("[get_hive_instance] Neither full nor lite instance is ready.")
return lite or full or HIVE_INSTANCE # Prefer lite, then full, then global fallback
except Exception as e:
logging.error(f"[get_hive_instance] Exception occurred: {e}")
return HIVE_INSTANCE # Return last known global on any error.
# --------------- UI ---------------
HELP=f"""
**Admin/User mode**: Admins (general/super) and Owner log in with password (Owner also needs second factor). After login choose Admin or User mode.
**Owner-only code edits** are enforced via Change Manager policy. Hive can sandbox, test, and propose; code writes are applied automatically only when the Owner has set `OPT_AUTO_APPLY=1`, otherwise the Owner must apply them manually.
**Offline/Online**: Works fully offline from curves. If online and enabled, fetches RSS/web snippets ➡️ summarizes locally ➡️ saves to curves (persists offline).
**Voice**: Faster-Whisper ASR (auto language), Piper TTS mixed-language, phonics hints (English).
**Privacy**: Sensitive/first-person inputs route to user-private library; neutral info to general.
"""
# --- Placeholder functions to resolve NameErrors ---
def staged_ingest_chain_if_enabled(curve_dir: str) -> None:
"""
If enabled via config, runs a staged data ingestion process.
This function respects chaining flags to run multiple stages across reboots,
and resource limits to avoid filling the disk.
"""
if not CFG.get("HIVE_INGEST_STAGED"):
logging.info("[Ingest] Staged ingestion is disabled by config.")
return
state_file = os.path.join(CFG["STATE_DIR"], "ingest_state.json")
state = _load_json(state_file, {"last_stage_completed": -1, "runs_this_boot": 0})
if not CFG.get("HIVE_INGEST_CHAIN") and not CFG.get("HIVE_INGEST_NEXT"):
logging.info("[Ingest] Ingestion chaining is disabled and HIVE_INGEST_NEXT is not set. Skipping.")
return
if state.get("runs_this_boot", 0) >= int(CFG.get("HIVE_INGEST_CHAIN_MAX", 2)):
logging.info(f"[Ingest] Reached max ingestion runs for this boot ({CFG['HIVE_INGEST_CHAIN_MAX']}).")
return
try:
free_gb = psutil.disk_usage(curve_dir).free / (1024**3)
if free_gb < int(CFG.get("HIVE_INGEST_MIN_FREE_GB", 8)):
logging.warning(f"[Ingest] Insufficient disk space ({free_gb:.1f}GB free). Need at least {CFG['HIVE_INGEST_MIN_FREE_GB']}GB. Skipping.")
return
except Exception as e:
logging.error(f"[Ingest] Could not check disk space: {e}")
return
sources = (CFG.get("HIVE_INGEST_SOURCES") or CFG.get("INGEST_SOURCES") or "").split(',')  # accept both key spellings
if not any(s.strip() for s in sources):
logging.info("[Ingest] No ingestion sources configured in HIVE_INGEST_SOURCES. Skipping.")
return
stage_size = int(CFG.get("HIVE_INGEST_STAGE_SIZE", 3))
start_index = (state.get("last_stage_completed", -1) + 1) * stage_size
if start_index >= len(sources):
logging.info("[Ingest] All ingestion stages are complete.")
state["last_stage_completed"] = (len(sources) // stage_size)
_save_json(state_file, state)
return
current_stage_sources = sources[start_index : start_index + stage_size]
logging.info(f"[Ingest] Starting ingestion stage {state.get('last_stage_completed', -1) + 1} with sources: {current_stage_sources}")
# Process each source in the current stage with robust error handling
for source_name in current_stage_sources:
try:
# Here you would call the actual data loading and processing logic for a single source.
# For this implementation, we'll simulate it with a log message.
logging.info(f"[Ingest] Processing source: {source_name}...")
# e.g., data = load_dataset(source_name); hive.kstore.ingest_text(...)
except Exception as e:
logging.error(f"[Ingest] Failed to process source '{source_name}': {e}. Skipping to next source.")
continue # Move to the next source in the stage
time.sleep(5) # Simulate work
state["last_stage_completed"] += 1
state["runs_this_boot"] = state.get("runs_this_boot", 0) + 1
_save_json(state_file, state)
logging.info(f"[Ingest] Stage {state['last_stage_completed']} complete. Total runs this boot: {state['runs_this_boot']}.")
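# Illustrative state-file contents (STATE_DIR/ingest_state.json):
#   {"last_stage_completed": 1, "runs_this_boot": 1}
# With HIVE_INGEST_STAGE_SIZE=3, the next run would process sources[6:9]
# (start_index = (1 + 1) * 3 = 6), then bump both counters.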
def _archive_memory(curve_dir: str) -> Tuple[bool, str]:
"""
Compresses the memory (curves) directory into a .tar.gz archive.
The archive path is determined by the HIVE_ARCHIVE_PATH config variable.
"""
if not os.path.isdir(curve_dir):
msg = f"Memory archive failed: Source directory '{curve_dir}' not found."
logging.error(f"[{__name__}] {msg}")
return False, msg
archive_path = os.path.join(CFG["HIVE_HOME"], CFG["HIVE_ARCHIVE_PATH"])
logging.info(f"[{__name__}] Starting memory archive: '{curve_dir}' -> '{archive_path}'")
try:
with tarfile.open(archive_path, "w:gz") as tar:
tar.add(curve_dir, arcname=os.path.basename(curve_dir))
msg = f"Memory archived successfully to '{archive_path}'."
logging.info(f"[{__name__}] {msg}")
return True, msg
except Exception as e:
msg = f"Memory archive failed: {e}"
logging.error(f"[{__name__}] {msg}")
return False, msg
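# The resulting archive can be inspected or restored outside Hive, e.g.:
#   tar -tzf <HIVE_HOME>/<HIVE_ARCHIVE_PATH> | head      # list contents
#   tar -xzf <HIVE_HOME>/<HIVE_ARCHIVE_PATH> -C <dest>   # manual restore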
def restore_curves_if_missing(curve_dir: str) -> Tuple[bool, str]:
"""
If memory (curves) are missing, restores them from a pre-configured source.
Priority: Hugging Face Dataset > Remote URL > Local Archive.
"""
if not CFG.get("CURVES_AUTO_RESTORE"):
return False, "Auto-restore is disabled by config."
if _curves_ready(curve_dir):
return False, "Curves already exist, skipping restore."
os.makedirs(curve_dir, exist_ok=True)
archive_path = None
source_type = None
# 1. Try Hugging Face Hub dataset
if CFG.get("CURVES_HF_DATASET"):
try:
from huggingface_hub import hf_hub_download
logging.info(f"[Restore] Attempting to download from HF Dataset: {CFG['CURVES_HF_DATASET']}")
archive_path = hf_hub_download(
repo_id=str(CFG["CURVES_HF_DATASET"]),
filename=str(CFG.get("CURVES_HF_SUBPATH") or "curves.tar.gz"),
repo_type="dataset",
token=CFG.get("HF_READ_TOKEN") or None,
)
source_type = "Hugging Face Dataset"
except Exception as e:
logging.warning(f"[Restore] Failed to download from HF Dataset: {e}")
# 2. Try remote URL if HF download failed or wasn't configured
if not archive_path and CFG.get("CURVES_ARCHIVE_URL"):
url = str(CFG["CURVES_ARCHIVE_URL"])
logging.info(f"[Restore] Attempting to download from URL: {url}")
try:
tmp_path = os.path.join(tempfile.gettempdir(), "curves_download.tar.gz")
_download(url, tmp_path)
archive_path = tmp_path
source_type = "Remote URL"
except Exception as e:
logging.warning(f"[Restore] Failed to download from URL: {e}")
# 3. Try local archive if other methods failed
if not archive_path and CFG.get("CURVES_ARCHIVE_LOCAL"):
local_path = os.path.join(CFG["HIVE_HOME"], str(CFG["CURVES_ARCHIVE_LOCAL"]))
if os.path.exists(local_path):
logging.info(f"[Restore] Found local archive: {local_path}")
archive_path = local_path
source_type = "Local Archive"
if not archive_path:
return False, "No valid restore source found or configured."
logging.info(f"[Restore] Extracting memory from '{archive_path}' (Source: {source_type}) to '{curve_dir}'...")
try:
with tarfile.open(archive_path, "r:gz") as tar:
    # Extract into the parent of curve_dir; use the "data" filter (Python 3.12+,
    # backported to recent 3.8-3.11 patch releases) to block path-traversal entries.
    try:
        tar.extractall(path=os.path.dirname(curve_dir), filter="data")
    except TypeError:
        tar.extractall(path=os.path.dirname(curve_dir))  # Older Python: no extraction filter support
msg = f"Memory restored successfully from {source_type}."
logging.info(f"[Restore] {msg}")
return True, msg
except Exception as e:
msg = f"Failed to extract memory archive: {e}"
logging.error(f"[Restore] {msg}")
return False, msg
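# Example configuration for auto-restore (illustrative values; keys as read above):
#   CURVES_AUTO_RESTORE=1
#   CURVES_HF_DATASET=someuser/hive-curves                 # tried first (needs HF_READ_TOKEN if private)
#   CURVES_HF_SUBPATH=curves.tar.gz
#   CURVES_ARCHIVE_URL=https://example.com/curves.tar.gz   # second fallback
#   CURVES_ARCHIVE_LOCAL=backups/curves.tar.gz             # last resort, relative to HIVE_HOME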
def fetch_rss(urls: List[str], timeout: int = 10, limit: int = 50) -> List[Dict]:
"""Fetches and parses multiple RSS feeds, returning a list of entries."""
if not feedparser:
logging.warning("[RSS] feedparser is not installed. Skipping RSS fetch.")
return []
all_entries = []
socket.setdefaulttimeout(timeout)  # Note: this sets the process-wide default socket timeout
for url in urls:
try:
feed = feedparser.parse(url)
for entry in feed.entries:
all_entries.append(entry)
except Exception as e:
logging.error(f"[RSS] Failed to fetch or parse feed '{url}': {e}")
return all_entries[:limit]
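# Usage sketch (assumes feedparser is installed):
#   entries = fetch_rss(["https://example.com/feed.xml"], timeout=5, limit=10)
#   titles = [e.get("title", "") for e in entries]
# Each entry is a feedparser FeedParserDict exposing fields like title/link/summary.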
def launch_ui(bootstrap_instance: "Bootstrap"):
# Make bootstrap_instance available to all nested event handlers
def replay_assistant(chatbot_history, selected_text=None):
    """Replay the last assistant message (or selected text) as audio.
    Gracefully falls back to sine-wave audio if TTS unavailable.
    Works in all environments (HF Spaces, local or containerized hosts).
    """
    hive = get_hive_instance(bootstrap_instance)
    if not hive:
        return None
    # Ensure TTS service exists
    if not hasattr(hive, 'tts_service') or hive.tts_service is None:
        try:
            hive.tts_service = TTSService()
        except Exception as e:
            print(f"[UI] Could not create TTS service: {e}")
            return None
    if not chatbot_history:
        return None
    def _synthesize(text):
        # Shared TTS helper for both the selected-text and last-message paths
        try:
            path = hive.tts_service.synthesize(text, None)
            if path:
                print(f"[UI] replay_assistant generated audio: {path}")
                return path
        except Exception as e:
            print(f"[UI] replay_assistant TTS failed: {e}")
            import traceback
            traceback.print_exc()
        return None
    # If a specific message was selected on the client, use it
    if selected_text:
        return _synthesize(selected_text)
    # Otherwise, synthesize the last assistant message
    for m in reversed(chatbot_history):
        if m.get('role') == 'assistant' and m.get('content'):
            return _synthesize(m.get('content'))
    return None
def edit_last_input(chatbot_history, selected_text=None):
    # Defined here, before it is wired to small_edit_btn below: the click binding
    # captures the function object at wiring time, so a later definition would not be used.
    # If a message was selected, put it into the input box for editing
    if selected_text:
        return gr.MultimodalTextbox(value={"text": selected_text, "files": []})
    if not chatbot_history:
        return gr.MultimodalTextbox(value={"text": "", "files": []})
    for m in reversed(chatbot_history):
        if m.get('role') == 'user' and m.get('content'):
            return gr.MultimodalTextbox(value={"text": m.get('content'), "files": []})
    return gr.MultimodalTextbox(value={"text": "", "files": []})
_bootstrap = bootstrap_instance
with gr.Blocks(title="Hive 🐝") as demo:
# [All the UI code between here and the final return]
with gr.Row():
with gr.Column(scale=3):
gr.Markdown(f"## {CFG['AGENT_NAME']} 🐝")
# Small live badge showing the actual_mode (updated on load)
agent_mode_badge = gr.Markdown("**Mode:** unknown", elem_id="agent_mode_badge")
core_status = gr.Markdown("⏳ **Initializing Full Hive Core...** (Est. 1-5 mins). You can chat with the Lite model now. Advanced features will be enabled shortly.") # type: ignore
init_progress = gr.Markdown("🔄 **Initialization Progress:** Starting...", visible=True) # type: ignore
chatbot = gr.Chatbot(height=600, type="messages", label="Chat", placeholder="Initializing...", show_copy_button=True)
chatbot.elem_id = "hive_chatbot"
with gr.Row():
# Add a stop button, initially hidden
stop_btn = gr.Button("Stop", variant="stop", visible=False, scale=1)
with gr.Row(scale=2):
clear_btn = gr.Button("🗑️ Clear Chat", min_width=50)
correction_btn = gr.Button("✍️ Correct Last Reply", min_width=50)
vad_indicator = gr.HTML("", elem_id="vad-indicator-container")
msg = gr.MultimodalTextbox(placeholder="Please wait for the model to load...", interactive=False, show_label=False, container=False, scale=7)
with gr.Row(visible=False):
replay_svg = '<svg xmlns="http://www.w3.org/2000/svg" width="16" height="16" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"><polygon points="11 5 6 9 2 9 2 15 6 15 11 19 11 5"></polygon><path d="M19.07 4.93a10 10 0 0 1 0 14.14M15.54 8.46a5 5 0 0 1 0 7.07"></path></svg>'
edit_svg = '<svg xmlns="http://www.w3.org/2000/svg" width="16" height="16" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"><path d="M11 4H4a2 2 0 0 0-2 2v14a2 2 0 0 0 2 2h14a2 2 0 0 0 2-2v-7"></path><path d="M18.5 2.5a2.121 2.121 0 0 1 3 3L12 15l-4 1 1-4 9.5-9.5z"></path></svg>'
up_svg = '<svg xmlns="http://www.w3.org/2000/svg" width="16" height="16" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"><path d="M7 10v12"></path><path d="M15 5.88 14 10h5.83a2 2 0 0 1 1.92 2.56l-2.33 8A2 2 0 0 1 18.5 22H4a2 2 0 0 1-2-2v-8a2 2 0 0 1 2-2h3Z"></path></svg>'
down_svg = '<svg xmlns="http://www.w3.org/2000/svg" width="16" height="16" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"><path d="M7 14v-12"></path><path d="M15 18.12 14 14H8.17a2 2 0 0 1-1.92-2.56l2.33-8A2 2 0 0 1 10.5 2H20a2 2 0 0 1 2 2v8a2 2 0 0 1-2 2h-3Z"></path></svg>'
small_replay_btn = gr.Button(value=replay_svg, elem_id="small_replay_template")
small_edit_btn = gr.Button(value=edit_svg, elem_id="small_edit_template")
small_up_btn = gr.Button(value=up_svg, elem_id="small_up_template")
small_down_btn = gr.Button(value=down_svg, elem_id="small_down_template")
feedback_ack = gr.Markdown("", visible=True)
# --- Custom JavaScript and CSS for per-message buttons ---
gr.HTML("""
<script>
function setupHiveChatbotButtons() {
const chatbox = document.getElementById('hive_chatbot');
if (!chatbox) { return; }
// Function to add buttons to a message
const addButtonsToMessage = (messageElement) => {
if (messageElement.querySelector('.hive-action-buttons')) { return; }
const isUserMessage = messageElement.classList.contains('user');
const messageContentElement = messageElement.querySelector('.message-text-content');
if (!messageContentElement) { return; }
const messageText = messageContentElement.innerText;
const buttonContainer = document.createElement('div');
buttonContainer.className = 'hive-action-buttons';
// --- 1. Handle custom buttons (replay, edit, feedback) ---
const templateMap = {
'small_replay_template': 'replay',
'small_edit_template': 'edit',
'small_up_template': 'upvote',
'small_down_template': 'downvote'
};
for (const templateId in templateMap) {
// Edit button only for user messages
if (templateId === 'small_edit_template' && !isUserMessage) { continue; }
// Replay button only for assistant messages
if (templateId === 'small_replay_template' && isUserMessage) { continue; }
const templateButton = document.getElementById(templateId);
if (templateButton) {
const newButton = templateButton.cloneNode(true);
newButton.id = '';
newButton.style.display = 'inline-block';
newButton.onclick = () => {
const selectedMessageInput = document.getElementById('selected_message').querySelector('textarea');
if (selectedMessageInput) {
selectedMessageInput.value = messageText;
const event = new Event('input', { bubbles: true });
selectedMessageInput.dispatchEvent(event);
}
templateButton.click();
};
buttonContainer.appendChild(newButton);
}
}
// --- 2. Handle Gradio's built-in copy button ---
const copyButton = messageElement.querySelector('.copy-button');
if (copyButton) {
// Clone it to move it into our container
const newCopyButton = copyButton.cloneNode(true);
newCopyButton.style.display = 'inline-block';
newCopyButton.addEventListener('click', (e) => {
e.stopPropagation();
navigator.clipboard.writeText(messageText).then(() => {
// Optional: show a "Copied!" tooltip
});
});
buttonContainer.appendChild(newCopyButton);
// Hide the original copy button
copyButton.style.display = 'none';
}
// Append the buttons container to the message
messageElement.appendChild(buttonContainer);
};
// Use MutationObserver to detect when new messages are added
const observer = new MutationObserver((mutationsList) => {
for (const mutation of mutationsList) {
if (mutation.type === 'childList') {
mutation.addedNodes.forEach(node => {
if (node.nodeType === 1 && node.classList.contains('message-row')) {
addButtonsToMessage(node);
}
});
}
}
});
// Initial scan for existing messages
chatbox.querySelectorAll('.message-row').forEach(addButtonsToMessage);
// Start observing the chatbox for changes
observer.observe(chatbox, { childList: true, subtree: true });
}
// Run the setup function when the Gradio app is ready
document.addEventListener('DOMContentLoaded', () => {
const interval = setInterval(() => {
if (document.getElementById('hive_chatbot')) {
clearInterval(interval);
setupHiveChatbotButtons();
}
}, 100);
});
</script>
<style>
.hive-action-buttons {
display: flex;
gap: 4px;
margin-top: 4px;
opacity: 0; /* Hidden by default */
justify-content: flex-end; /* Align buttons to the right */
transition: opacity 0.3s ease-in-out; /* Smooth transition for the fade effect */
}
.message-row:hover .hive-action-buttons, .message-row:focus-within .hive-action-buttons {
opacity: 1;
}
.hive-action-buttons button {
background: none;
border: none;
border-radius: 50%; /* Make buttons circular */
padding: 4px;
width: 28px;
height: 28px;
display: flex;
align-items: center;
justify-content: center;
cursor: pointer;
color: #6b7280; /* Gray color for the icon */
transition: background-color 0.2s ease;
}
.hive-action-buttons button:hover {
background-color: #f3f4f6; /* Light gray background on hover */
color: #111827; /* Darker icon on hover */
}
</style>
""")
# Hook feedback buttons to record feedback for the selected message
def _feedback_handler(selected_msg, uid, role, typ, request: gr.Request):
    # Annotating `request` as gr.Request lets Gradio inject it automatically,
    # so the click bindings below only need to pass four inputs.
    return record_feedback(selected_msg, uid, role, typ, request)
# UI for correction
with gr.Blocks() as correction_modal:
with gr.Column(visible=False, elem_id="correction_ui") as correction_ui:
gr.Markdown("### Correct Assistant's Last Reply", elem_id="correction_header")
original_user_input_state = gr.Textbox(label="Original User Prompt", interactive=False)
original_assistant_reply_state = gr.Textbox(label="Original Assistant Reply", interactive=False)
corrected_reply_box = gr.Textbox(lines=5, label="Corrected Response")
with gr.Row():
submit_correction_btn = gr.Button("Submit Correction")
cancel_correction_btn = gr.Button("Cancel")
def show_correction_ui(chatbot_history):
if not chatbot_history or len(chatbot_history) < 2:
return gr.update(visible=False), "", "", ""
last_user = chatbot_history[-2].get('content', '') if chatbot_history[-2].get('role') == 'user' else ''
last_assistant = chatbot_history[-1].get('content', '') if chatbot_history[-1].get('role') == 'assistant' else ''
return gr.update(visible=True), last_user, last_assistant, last_assistant
with gr.Column(scale=1, min_width=300):
with gr.Sidebar():
uid_state = gr.State(None)
role_state = gr.State("guest")
mode_state = gr.State("user")
phonics_state = gr.State(False)
# Hidden textbox to store the currently selected message text (set by client-side JS)
selected_message = gr.Textbox(visible=False, elem_id="selected_message")
feedback_up_state = gr.State("up")
feedback_down_state = gr.State("down")
with gr.Accordion("Login & Profile", open=True):
login_name=gr.Textbox(label="Name or ID")
login_pass=gr.Textbox(label="Password (if required)", type="password")
login_second=gr.Textbox(label="Second (owner only)", type="password")
login_btn=gr.Button("Login")
login_status=gr.Markdown(elem_id="login_status") # type: ignore
profile_status = gr.Markdown("Login to see your profile.")
profile_save_btn = gr.Button("Save Profile")
with gr.Accordion("🌐 Language Preference", open=False):
profile_lang = gr.Dropdown(choices=["en", "zh", "fr", "es", "de", "ar", "pt", "ko", "ru", "hi", "bn", "ja", "sw"],
label="Preferred Language", value="en")
with gr.Accordion("🗣️ Phonics Assist", open=False):
gr.Markdown("Enable to get phonetic hints for English words when using the 'pronounce' command.")
profile_phonics = gr.Checkbox(label="Enable Phonics Assist (for English)")
with gr.Accordion("🧠 Memory & Vocabulary", open=False):
summary_output = gr.Markdown("Initializing... (Full core required, est. 1-2 min)")
summary_btn = gr.Button("Show Memory Summary", interactive=False)
vocab_output = gr.Markdown("---")
vocab_btn = gr.Button("Get New Word", interactive=False)
progress_output = gr.Markdown("---")
with gr.Accordion("🗣️ Voice & Hands-Free", open=False, visible=True) as voice_accordion:
voice_status_md = gr.Markdown("Initializing voice models... (Est. 15-90 sec)")
vocal_chat_state = gr.State({"active": False, "last_interaction_time": time.time(), "status_text": "Status: Inactive", "always_listening": False})
vocal_chat_btn = gr.Button("Start Hands-Free Conversation", interactive=False)
stop_voice_btn = gr.Button("Stop Voice", interactive=True, visible=True)
vocal_chat_status = gr.Markdown("Status: Inactive", visible=False) # Hide old status markdown
# Users can opt-in to always-listening mode; in UI context it just keeps
# the mic active without a timeout.
profile_always_listen = gr.Checkbox(label="Always listen (stay active)", value=False)
conversation_timeout_slider = gr.Slider(minimum=5, maximum=61, value=15, step=1, label="Conversation Timeout (seconds)", info="Slide to max (61) to disable timeout.")
unified_mic = gr.Audio(sources=["microphone"], streaming=True, visible=False, autoplay=False, elem_id="unified_mic")
ptt_reply_audio = gr.Audio(type="filepath", label="Assistant's Voice Reply", autoplay=True, visible=False) # Keep for audio output
with gr.Accordion("Voice Login", open=False):
gr.Markdown("Enroll your voice to enable password-free login for user accounts.")
enroll_audio = gr.Audio(sources=["microphone"], type="filepath", label="Record 5-10s for voiceprint", interactive=False)
with gr.Row():
enroll_btn = gr.Button("Enroll Voice for Current User", interactive=False)
enroll_status = gr.Markdown()
gr.Markdown("---")
gr.Markdown("After enrolling, you can log in by recording your voice here.")
with gr.Row():
who_btn = gr.Button("Login by Voice", interactive=False)
who_status = gr.Markdown()
# After the selected_message and uid/role states exist, wire feedback buttons
small_up_btn.click(_feedback_handler, [selected_message, uid_state, role_state, feedback_up_state], [feedback_ack])
small_down_btn.click(_feedback_handler, [selected_message, uid_state, role_state, feedback_down_state], [feedback_ack])
# Wire up replay and edit buttons
small_replay_btn.click(replay_assistant, [chatbot, selected_message], [ptt_reply_audio])
small_edit_btn.click(edit_last_input, [chatbot, selected_message], [msg])
# Wire up correction UI
correction_btn.click(show_correction_ui, [chatbot], [correction_ui, original_user_input_state, original_assistant_reply_state, corrected_reply_box])
submit_correction_btn.click(record_correction, [original_user_input_state, original_assistant_reply_state, corrected_reply_box, uid_state, role_state], [feedback_ack]).then(lambda: gr.update(visible=False), None, [correction_ui])
cancel_correction_btn.click(lambda: gr.update(visible=False), None, [correction_ui])
with gr.Accordion("📸 Camera", open=False, visible=True) as camera_accordion:
camera_status_md = gr.Markdown("Camera feature disabled or initializing...")
video_out = gr.Image(label="Camera Feed", type="pil", interactive=False, visible=False)
with gr.Accordion("🌐 Network", open=False, visible=True) as network_accordion:
network_status_md = gr.Markdown("Initializing network features...")
wifi_status=gr.Markdown("Wi-Fi: checking...")
connect_now=gr.Button("Try auto-connect now (non-blocking)")
online_now=gr.Button("Fetch updates now", interactive=False)
online_status=gr.Markdown()
with gr.Accordion("⚙️ Admin Console", open=False, visible=False) as admin_accordion:
admin_info=gr.Markdown("Login as an admin and switch to Admin mode to use these tools.")
mode_picker=gr.Radio(choices=["user","admin"], value="user", label="Mode (admins only)")
# Notifications panel
notif_count = gr.Markdown("**Notifications:** 0", elem_id="notif_count")
notif_btn = gr.Button("Load Notifications")
notif_md = gr.Markdown("")
# Feedback panel
feedback_view_btn = gr.Button("View Feedback", interactive=True)
feedback_md = gr.Markdown("")
# Applied changes panel
applied_btn = gr.Button("List Applied Changes", interactive=True)
applied_md = gr.Markdown("")
revert_id_box = gr.Textbox(label="Proposal ID to Revert", interactive=True)
revert_btn = gr.Button("Revert Proposal", interactive=True)
revert_md = gr.Markdown()
# Proposals filter UI
proposals_filter = gr.Textbox(label="Filter proposals (substring)", placeholder="Enter text to filter proposals")
proposals_filter_btn = gr.Button("Filter Proposals")
proposals_md = gr.Markdown("")
# Admin help
admin_help_btn = gr.Button("Admin Help")
admin_help_md = gr.Markdown()
with gr.Tabs() as admin_tabs:
with gr.TabItem("User Management"):
target=gr.Textbox(label="Target name or id")
with gr.Row():
remove_btn = gr.Button("Remove User")
rename_btn=gr.Button("Rename")
new_name=gr.Textbox(label="New name")
new_pass=gr.Textbox(label="New password")
pass_btn=gr.Button("Change password")
new_role=gr.Dropdown(choices=["owner","admin_super","admin_general","user"], value="user", label="New role")
role_btn=gr.Button("Change role", elem_id="role_btn")
out=gr.Markdown()
with gr.TabItem("Add User"):
add_name=gr.Textbox(label="Add: name")
add_role=gr.Dropdown(choices=["admin_super","admin_general","user"], value="user", label="Add role")
add_pass=gr.Textbox(label="Add password (admins only)")
add_btn=gr.Button("Add user/admin")
out_add=gr.Markdown()
with gr.TabItem("System"):
ingest_status = gr.Markdown()
ingest_now_btn = gr.Button("Start Background Ingestion", interactive=False)
mem_compress_btn=gr.Button("Compress Memory (archive)", interactive=False)
compress_status=gr.Markdown()
hotpatch_patch=gr.Code(label="Paste hotpatch JSON (advanced)")
hotpatch_status=gr.Markdown()
hotpatch_apply=gr.Button("Apply Hotpatch", elem_id="hotpatch_apply", interactive=False)
with gr.TabItem("Optimization"):
gr.Markdown("### Internal Optimization (Change Manager)")
prop_kind=gr.Dropdown(choices=["model","package","code"], value="model", label="Proposal type")
prop_name=gr.Textbox(label="Model ID / Package Name")
prop_ver=gr.Textbox(label="Package version (optional)")
prop_reason=gr.Textbox(label="Why this change?")
prop_patch=gr.Code(label="Code patch (for 'code' proposals): paste full replacement or diff")
propose_btn=gr.Button("Propose", interactive=False)
test_btn=gr.Button("Test in sandbox", interactive=False)
apply_btn=gr.Button("Apply (policy-checked)", elem_id="apply_btn", interactive=False)
opt_out=gr.JSON()
# --- Wake Word Listener (Optimized) ---
class WakeWordListener(threading.Thread):
def __init__(self, porcupine_instance, on_wake_word_callback):
super().__init__(daemon=True)
self._porcupine = porcupine_instance
self._on_wake_word = on_wake_word_callback
self.stop_event = threading.Event()
self.wake_word_detected = threading.Event()
def run(self):
if not _HAVE_SD or not self._porcupine:
print("[WakeWordListener] sounddevice or porcupine not available. Listener will not start.")
return
print("[WakeWordListener] Starting dedicated listener thread.")
try:
with sd.InputStream(
samplerate=self._porcupine.sample_rate,
channels=1,
dtype='int16',
blocksize=self._porcupine.frame_length,
callback=self._audio_callback
):
self.stop_event.wait()
except Exception as e:
print(f"[WakeWordListener] Error starting audio stream: {e}")
print("[WakeWordListener] Listener thread stopped.")
def _audio_callback(self, indata, frames, time_info, status):  # `time_info` avoids shadowing the `time` module
if status:
print(f"[WakeWordListener] Audio callback status: {status}")
# Porcupine expects a 1D array
audio_frame = indata.flatten()
keyword_index = self._porcupine.process(audio_frame)
if keyword_index >= 0:
print("[WakeWordListener] Wake word detected by dedicated listener!")
self.wake_word_detected.set()
# Trigger the main UI thread callback
if self._on_wake_word:
self._on_wake_word()
def stop(self):
self.stop_event.set()
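# Usage sketch (illustrative; mirrors start_wake_word_listener_global below):
#   listener = WakeWordListener(porcupine_instance, handle_wake_word_detection)
#   listener.start()   # daemon thread; the blocking wait happens inside run()
#   ...
#   listener.stop()    # sets stop_event, which closes the input stream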
# Global instance of the listener
wake_word_listener_instance = None
# --- Wake Word Detection Logic ---
porcupine_instance = None
if _HAVE_PVP and CFG.get("PVPORCUPINE_ACCESS_KEY"): # type: ignore
keyword_paths: List[str] = []
keywords = [k.strip() for k in CFG["HIVE_WAKE_WORDS"].split(',') if k.strip()] # type: ignore
for keyword in keywords:
custom_path = os.path.join(CFG["HIVE_HOME"], "keywords", f"{keyword}_{_os_name()}.ppn")
if os.path.exists(custom_path):
keyword_paths.append(custom_path)
elif keyword in pvporcupine.BUILTIN_KEYWORDS: # type: ignore
keyword_paths.append(keyword)
if not keyword_paths: keyword_paths = ['bumblebee']
try:
porcupine_instance = pvporcupine.create( # type: ignore
access_key=CFG["PVPORCUPINE_ACCESS_KEY"], # type: ignore
keyword_paths=keyword_paths
)
print(f"[WakeWord] Listening for: {keywords}")
except Exception as e:
print(f"[WakeWord] Error initializing Porcupine: {e}. Wake word will be disabled.")
porcupine_instance = None
def handle_wake_word_detection():
# This function will be called from the listener thread.
# It needs to update the Gradio state. We can use gr.State for this.
# Record the event to disk so UI/admins can inspect wake events.
try:
log_dir = os.path.join(CFG.get('HIVE_HOME', '.'), 'system')
os.makedirs(log_dir, exist_ok=True)
entry = {
'ts': time.time(),
'ts_readable': time.ctime(),
'event': 'wake_word_detected',
'words': CFG.get('HIVE_WAKE_WORDS')
}
with open(os.path.join(log_dir, 'wake_events.jsonl'), 'a', encoding='utf-8') as wf:
wf.write(json.dumps(entry) + '\n')
except Exception as e:
print(f"[WakeWord] Failed to log wake event: {e}")
# Also print to console for immediate visibility
print('[WakeWord] Detected wake word (event logged).')
# --- End of Wake Word Listener ---
def start_wake_word_listener_global(porcupine_instance):
global wake_word_listener_instance
if porcupine_instance and not (wake_word_listener_instance and wake_word_listener_instance.is_alive()):
print("[UI] Starting global wake word listener...")
wake_word_listener_instance = WakeWordListener(porcupine_instance, handle_wake_word_detection)
wake_word_listener_instance.start()
return "Wake word listener started."
def stop_wake_word_listener_global():
global wake_word_listener_instance
if wake_word_listener_instance and wake_word_listener_instance.is_alive():
wake_word_listener_instance.stop()
return "Wake word listener stopped."
def do_who(audio_path):
if not audio_path: return "Please record your voice first.", None, None
hive = get_hive_instance(_bootstrap)
if not hive or hive.lite_mode: return "Voice features not ready.", None, None
uid = identify_voice(audio_path)
if not uid: return "Voice not recognized.", None, None
ok, msg = attempt_login(uid, "")
if ok: return f"Voice login successful for {uid}. {msg}", uid, "user"
return f"Voice recognized, but login failed: {msg}", None, None # type: ignore
# Hidden button to trigger video stream
video_stream_trigger = gr.Button("Start Video", visible=False)
# NOTE: edit_last_input is implemented above, before it is wired to small_edit_btn.
def _handle_multimodal_message(files, uid, role, mode, chatbot_history, request: gr.Request):
"""Placeholder for handling file uploads (e.g., images)."""
chatbot_history.append({"role": "user", "content": f"(Attached {len(files)} file(s))"}) # type: ignore
chatbot_history.append({"role": "assistant", "content": "File processing is not yet implemented in this version."}) # type: ignore
yield chatbot_history, gr.MultimodalTextbox(interactive=True)
def _handle_file_message(files, history):
"""Handles file uploads from the user."""
history.append({"role": "user", "content": f"(Attached {len(files)} file(s))"})
history.append({"role": "assistant", "content": "File processing is not yet implemented in this version."})
return history
def _stream_and_update_chat(response_stream, history):
"""Streams the response to the UI and returns the full reply."""
history.append({"role": "assistant", "content": ""})
full_reply = ""
token_count = 0
try:
for token in response_stream:
if not token:
continue
token_count += 1
full_reply = _smart_join(full_reply, str(token))
history[-1]["content"] = full_reply
yield history, gr.MultimodalTextbox(interactive=False)
print(f"[talk] Streamed {token_count} tokens")
except Exception as e:
import traceback
print(f"[ERROR] Error while streaming tokens: {e}\n{traceback.format_exc()}")
history[-1]["content"] = f"[Error during streaming: {str(e)[:100]}]"
yield history, gr.MultimodalTextbox(interactive=True)
return full_reply
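# Note: a generator's `return` value is only reachable via StopIteration.value.
# talk() below sidesteps this by reading the final reply back from the shared
# history list. Illustrative alternative (not used here):
#   gen = _stream_and_update_chat(stream, history)
#   try:
#       while True:
#           next(gen)
#   except StopIteration as stop:
#       full_reply = stop.value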
def talk(m, uid, role, mode, chatbot_history, request: gr.Request):
"""
Main dispatcher for user input. It handles text and file inputs,
manages the streaming response, and updates the UI components correctly.
"""
try:
if request is None:
print("[ERROR] Gradio request object is None. Cannot process message.")
return
user_text = m.get("text", "").strip() if isinstance(m, dict) else (m or "").strip()
user_files = m.get("files", []) if isinstance(m, dict) else []
current_history = list(chatbot_history or [])
if user_files:
current_history = _handle_file_message(user_files, current_history)
yield current_history, gr.MultimodalTextbox(interactive=True)
return
if not user_text:
return
hive_instance = get_hive_instance(_bootstrap)
if not hive_instance:
print(f"[ERROR] No Hive instance available. lite_core_ready={_bootstrap.lite_core_ready.is_set() if _bootstrap else 'N/A'}")
current_history.append({"role": "assistant", "content": "⏳ System is still initializing. Please wait..."})
yield current_history, gr.MultimodalTextbox(interactive=True)
return
effective_role = role or "user"
current_user_id = uid or (request.session_hash if request else None)
print(f"[talk] Received TYPED message: '{user_text[:50]}...' from {current_user_id} (Role: {effective_role})")
current_history.append({"role": "user", "content": user_text})
# Get the streaming response from the Hive core by calling .chat() directly
try:
chat_result = hive_instance.chat(user_text, effective_role, current_user_id, history=current_history)
response_stream, post_process_func = chat_result if isinstance(chat_result, tuple) else (chat_result, None)
except Exception as e:
logging.error(f"hive.chat call failed: {e}", exc_info=True)
def error_gen():
yield f"❌ Chat error: {str(e)[:200]}"
response_stream, post_process_func = error_gen(), None
# Stream the response to the UI
stream_generator = _stream_and_update_chat(response_stream, current_history)
full_reply = ""
for updated_history, msg_update in stream_generator:
    yield current_history, msg_update
# The generator's `return` value is only reachable via StopIteration.value,
# so simply read the final reply back from the shared history list instead.
full_reply = current_history[-1].get("content", "")
# Post-process the final reply
if post_process_func:
try:
processed_reply = post_process_func(full_reply)
if isinstance(processed_reply, str) and processed_reply.strip():
current_history[-1]["content"] = processed_reply
except Exception as e:
print(f"[WARN] post_process function failed: {e}")
if not current_history[-1]["content"]:
current_history[-1]["content"] = "No response generated."
print(f"[talk] Final reply length: {len(current_history[-1]['content'])}")
# Final yield to update the chat and re-enable the input
yield current_history, gr.MultimodalTextbox(interactive=True)
except Exception as e:
import traceback
print(f"[FATAL] Unexpected error in talk(): {e}")
traceback.print_exc()
current_history = list(chatbot_history or [])
current_history.append({"role": "assistant", "content": f"Fatal error: {str(e)[:100]}"})
yield current_history, gr.MultimodalTextbox(interactive=True)
# --- Event Handling with Stop Button ---
def do_memory_summary(uid: Optional[str], request: gr.Request):
    hive_instance = get_hive_instance(_bootstrap)
    if hive_instance is None or hive_instance.lite_mode:
        return "Memory features are disabled in Lite Mode."
    current_user_id = uid or request.session_hash
    log_path = os.path.join(CFG["HIVE_HOME"], "users", "conversations", f"{current_user_id}.jsonl")
    if not os.path.exists(log_path):
        return "No conversation history found."
    try:
        with open(log_path, "r", encoding="utf-8") as f:
            lines = f.readlines()[-10:]
        if not lines:
            return "Not enough conversation history to summarize."
        records = [json.loads(line) for line in lines]  # parse each line once
        text_to_summarize = "\n".join(r.get("message", "") + "\n" + r.get("reply", "") for r in records)
        summary = hive_instance.summarize_for_memory(text_to_summarize)
        return summary if summary.strip() else "Could not generate a summary from recent conversations."
    except Exception as e:
        return f"Error generating summary: {e}"
summary_btn.click(do_memory_summary, [uid_state], [summary_output])
def do_get_vocab_word(uid: Optional[str], request: gr.Request):
    hive_instance = get_hive_instance(_bootstrap)
    if hive_instance is None or hive_instance.lite_mode:
        return "Vocabulary features are disabled in Lite Mode."
current_user_id = uid or request.session_hash
log_path = os.path.join(CFG["HIVE_HOME"], "users", "conversations", f"{current_user_id}.jsonl")
if not os.path.exists(log_path): return "No conversation history to find words from."
try:
with open(log_path, "r", encoding="utf-8") as f:
content = f.read()
words = [w for w in re.findall(r'\b\w{7,}\b', content.lower()) if w not in ["assistant", "message"]]
if not words: return "No challenging words found yet. Keep chatting!" # type: ignore
word = random.choice(words)
# chat() is documented to always return (stream, post_process), but we
# defensively handle a bare generator in case a code path returns only the stream.
chat_result = hive_instance.chat(f"What is the definition of the word '{word}'? Provide a simple, clear definition and one example sentence.", "user", current_user_id, history=[])
if isinstance(chat_result, tuple):
response_stream, _ = chat_result
else: # Lite mode returns just the generator
response_stream = chat_result
definition = "".join([chunk for chunk in response_stream if chunk])
if not definition or not definition.strip():
return f"Could not get a definition for '{word}' at this time."
return f"**{word.capitalize()}**: {definition.strip()}"
except Exception as e: return f"Error getting vocabulary word: {e}" # type: ignore
def wait_for_memory_features():
    """Waits for at least the lite core, then enables memory-related UI features according to the actual mode."""
# Wait until at least the lite core is ready (so UI can be interactive)
bootstrap_instance.lite_core_ready.wait()
hive_instance = get_hive_instance(bootstrap_instance)
# Compute badge text helper
def _badge_for_mode(amode):
    if amode in ('full-local', 'full-remote'):
        return f"**Mode:** 🟢 {amode}"
    if amode == 'full-fallback':
        return "**Mode:** 🟠 full (DEGRADED)"
    if amode == 'lite':
        return "**Mode:** 🔵 lite"
    return "**Mode:** ⚪ unknown"
# If there's no hive instance at all, show failure
if hive_instance is None:
badge = _badge_for_mode(None)
# Return tuple for: [agent_mode_badge, core_status, init_progress, chatbot, summary_output, msg, summary_btn, vocab_output, vocab_btn, progress_output, online_now, ingest_now_btn, mem_compress_btn, hotpatch_apply, propose_btn, test_btn, apply_btn, network_status_md]
return (
badge,
"⚠️ **Initialization Failed.** No Hive instance available. Check logs.",
gr.update(visible=False),
gr.update(),
"Memory features unavailable.",
gr.update(placeholder="Initialization failed. Please check logs.", interactive=False),
gr.update(interactive=False),
"Memory features unavailable.",
gr.update(interactive=False),
"Install dependencies to enable features.",
gr.update(interactive=False),
gr.update(interactive=False),
gr.update(interactive=False),
gr.update(interactive=False),
gr.update(interactive=False),
gr.update(interactive=False),
gr.update(interactive=False),
gr.update(value="⚠️ Dependencies missing."),
)
# If the hive instance exists, inspect the actual_mode to present accurate UI state
amode = getattr(hive_instance, 'actual_mode', None)
missing = []
if not bootstrap_instance.voice_ready.is_set():
missing.append('Voice')
# Compute badge to show at top
badge = _badge_for_mode(amode)
# Conservative UI: treat full-fallback as Lite for enabling features, but still show DEGRADED badge
if amode == 'lite' or (getattr(hive_instance, 'lite_mode', False) and amode in (None, 'unknown')) or amode == 'full-fallback':
features_msg = f" Some features (e.g., {', '.join(missing)}) are unavailable." if missing else ''
return (
badge,
f"⚡ **Lite Mode Active.** Basic chat is available while advanced features load. {features_msg}",
"🔄 **Progress:** Full Mode is loading...",
gr.update(),
"Memory features will appear when full core is ready.",
gr.update(placeholder=f"Talk to {CFG['AGENT_NAME']} (Lite Mode)", interactive=True),
gr.update(interactive=False),
"Vocabulary features loading...",
gr.update(interactive=False),
"Advanced features loading...",
gr.update(interactive=False),
gr.update(interactive=False),
gr.update(interactive=False),
gr.update(interactive=False),
gr.update(interactive=False),
gr.update(interactive=False),
gr.update(interactive=False),
"🌐 Network features loading...",
)
# Otherwise if actual mode indicates full (local or remote), show full ready
if amode in ('full-local', 'full-remote'):
return (
gr.update(value=badge),
gr.update(value="✅ **Full Hive Core is Ready.** Advanced features are now online."),
gr.update(visible=False), # init_progress
gr.update(),
"Click 'Show Memory Summary' to see a summary of recent conversations.",
gr.update(placeholder=f"Talk to {CFG['AGENT_NAME']}", interactive=True),
gr.update(interactive=True),
"Click to get a new vocabulary word from your conversations.",
gr.update(interactive=True),
"Your progress will be shown here. Click the button to update.",
gr.update(interactive=True),
gr.update(interactive=True),
gr.update(interactive=True),
gr.update(interactive=True),
gr.update(interactive=True),
gr.update(interactive=True),
gr.update(interactive=True),
"🌐 Network connectivity checked at startup.",
)
# Fallback conservative UI if we cannot determine actual_mode
features_msg = f" Some features (e.g., {', '.join(missing)}) may be unavailable." if missing else ''
return (
badge,
f"⚡ **Lite Mode Active.** Basic chat is available while advanced features load. {features_msg}",
"🔄 **Progress:** Full Mode is loading...",
gr.update(),
"Memory features will appear when full core is ready.",
gr.update(placeholder=f"Talk to {CFG['AGENT_NAME']} (Lite Mode)", interactive=True),
gr.update(interactive=False),
"Vocabulary features loading...",
gr.update(interactive=False),
"Advanced features loading...",
gr.update(interactive=False),
gr.update(interactive=False),
gr.update(interactive=False),
gr.update(interactive=False),
gr.update(interactive=False),
gr.update(interactive=False),
gr.update(interactive=False),
"🌐 Network features loading...",
)
demo.load(wait_for_memory_features, None, [agent_mode_badge, core_status, init_progress, chatbot, summary_output, msg, summary_btn, vocab_output, vocab_btn, progress_output, online_now, ingest_now_btn, mem_compress_btn, hotpatch_apply, propose_btn, test_btn, apply_btn, network_status_md])
# Wire admin feedback view, proposals filter, and help
try:
def do_login_and_update(name, password, second, desired_mode):
"""Attempt login and update admin UI visibility and per-button visibility.
This function strictly validates inputs and always returns a 15-item tuple
matching the components bound to the `login_btn`.
"""
# Helpers for consistent fallback values
def _disabled():
return gr.update(interactive=False)
def _hidden():
return gr.update(visible=False)
EXPECTED_LEN = 15
try:
name = (name or "").strip()
desired_mode = (desired_mode or "user").strip()
if desired_mode not in ("user", "admin"):
desired_mode = "user"
ok, msg = attempt_login(name, password or "", second or "")
if not ok or not name:
# Hide admin UI and disable all admin buttons
out = [msg, None, "guest"] + [_hidden()] + [_disabled()] * (EXPECTED_LEN - 4)
return tuple(out)
# Find the user and role
d = _load_users()
u, urole = _find_user(d, name)
if not u or not urole:
out = [f"Login succeeded but user record not found: {name}", None, "guest", _hidden()]
out += [_disabled()] * (EXPECTED_LEN - len(out))
return tuple(out)
# Sanitize role
if urole not in ("owner", "admin_super", "admin_general", "user"):
urole = "user"
# Compute role tiers and desired mode
is_admin_role = urole in ("admin_general", "admin_super", "owner")
is_super = urole in ("admin_super", "owner")
want_admin_mode = (desired_mode == "admin")
show_admin_accordion = bool(is_admin_role and want_admin_mode)
# Per-button visibility / interactivity decisions
propose_interactive = gr.update(interactive=True) if (is_admin_role and want_admin_mode) else _disabled()
test_interactive = gr.update(interactive=True) if (is_admin_role and want_admin_mode) else _disabled()
super_interactive = gr.update(interactive=True) if (is_super and want_admin_mode) else _disabled()
remove_interactive = gr.update(interactive=True) if (is_admin_role and want_admin_mode) else _disabled()
add_interactive = gr.update(interactive=True) if (is_admin_role and want_admin_mode) else _disabled()
pass_interactive = gr.update(interactive=True) if (is_admin_role and want_admin_mode) else _disabled()
rolebtn_interactive = gr.update(interactive=True) if (is_super and want_admin_mode) else _disabled()
out = [
msg, # login_status
u.get('id'), # uid_state
urole, # role_state
gr.update(visible=show_admin_accordion),
# hotpatch_apply, apply_btn, propose_btn, test_btn,
super_interactive, super_interactive, propose_interactive, test_interactive,
# ingest_now_btn, mem_compress_btn, revert_btn,
super_interactive, super_interactive, super_interactive,
# remove_btn, role_btn, add_btn, pass_btn
remove_interactive, rolebtn_interactive, add_interactive, pass_interactive,
]
# Ensure exact output length expected by the UI binding
if len(out) < EXPECTED_LEN:
out += [_disabled()] * (EXPECTED_LEN - len(out))
elif len(out) > EXPECTED_LEN:
out = out[:EXPECTED_LEN]
return tuple(out)
except Exception as e:
hide = _hidden()
disabled = _disabled()
out = [f"Login error: {e}", None, "guest", hide]
out += [disabled] * (EXPECTED_LEN - len(out))
return tuple(out)
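# The 15 outputs above map 1:1 onto the component list bound to login_btn.click below:
#   [login_status, uid_state, role_state, admin_accordion,
#    hotpatch_apply, apply_btn, propose_btn, test_btn,
#    ingest_now_btn, mem_compress_btn, revert_btn,
#    remove_btn, role_btn, add_btn, pass_btn]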
feedback_view_btn.click(lambda role: view_feedback_as_markdown(role), [role_state], [feedback_md])
def _list_applied_changes():
# Read OPT_RESULTS and show applied proposals
try:
if not os.path.exists(OPT_RESULTS):
return "No applied changes found."
with open(OPT_RESULTS, 'r', encoding='utf-8') as f:
    items = [json.loads(line) for line in f if line.strip()]
applied = [it for it in items if it.get('applied')]
if not applied:
return "No applied changes found."
lines = []
for it in reversed(applied[-50:]):
pid = it.get('update_for')
reason = it.get('reason','')
ts = it.get('backup','')
lines.append(f"- **ID:** `{pid}` | {reason} | Backup: `{ts}`")
return '\n'.join(lines)
except Exception as e:
return f"Error loading applied changes: {e}"
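# Example OPT_RESULTS line consumed above (illustrative values):
#   {"applied": true, "update_for": "prop-1a2b", "reason": "bump model rev",
#    "backup": "backups/2024-01-01T00-00-00"}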
def _revert_proposal_ui(pid):
try:
if not pid:
return "Please enter a proposal ID.", ""
res = _revert_proposal(pid)
if res.get('ok'):
msg = f"Reverted proposal `{pid}`. Restored files: {res.get('restored')}"
else:
msg = f"Failed to revert proposal `{pid}`: {res.get('error')}"
return msg, ""
except Exception as e:
return f"Error: {e}", ""
applied_btn.click(_list_applied_changes, [], [applied_md])
revert_btn.click(_revert_proposal_ui, [revert_id_box], [revert_md, applied_md])
def _load_notifs(uid, role):
try:
notifs = _get_notifications_for_user(uid, role)
if not notifs:
return "No notifications.", "**Notifications:** 0"
lines = []
unread = 0
for n in reversed(notifs[-50:]):
seen = uid in n.get('viewed_by', []) if uid else False
if not seen: unread += 1
lines.append(f"- **{n.get('ts_readable')}** | {n.get('subject')} | from: `{n.get('sender')}`{'' if seen else ' **(unread)**'}\n - {(n.get('body') or '')[:300]}")
md = "\n".join(lines)
return md, f"**Notifications:** {unread}"
except Exception as e:
print(f"[NotifyUI] Failed to load notifs: {e}")
return "Error loading notifications.", "**Notifications:** 0"
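# Example notification record as rendered above (illustrative values):
#   {"ts": 1700000000.0, "ts_readable": "Tue Nov 14 22:13:20 2023",
#    "subject": "Proposal applied", "sender": "owner",
#    "body": "...", "viewed_by": ["user-123"]}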
def _mark_viewed_and_load(uid, role):
try:
_mark_notifications_viewed(uid, role)
except Exception as e:
print(f"[NotifyUI] Failed to mark viewed: {e}")
return _load_notifs(uid, role)
notif_btn.click(_mark_viewed_and_load, [uid_state, role_state], [notif_md, notif_count])
def _filter_proposals(filter_text):
props = _load_proposals()
if not props:
return "No proposals found."
if not filter_text:
items = props[-50:]
else:
items = [p for p in props if filter_text.lower() in ((p.get('name') or '') + (p.get('patch_text') or '') + (p.get('kind') or '')).lower()]
if not items:
return "No matching proposals."
lines = [f"- {p.get('id')} | {p.get('kind')} | {p.get('name')} | proposer:{p.get('proposer')} | needs_owner:{p.get('needs_owner_approval',False)}" for p in items[-50:]]
return "Recent proposals:\n" + "\n".join(lines)
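# Example proposal record matched above (illustrative values):
#   {"id": "prop-1a2b", "kind": "package", "name": "numpy",
#    "proposer": "admin_general", "needs_owner_approval": true, "patch_text": ""}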
proposals_filter_btn.click(_filter_proposals, [proposals_filter], [proposals_md])
def _admin_help(role):
if role not in ("owner","admin","admin_general","admin_super"):
return "🔒 You do not have admin privileges."
return (
"**Admin Chat Commands**:\n"
"- `/view_feedback` : View recent user feedback.\n"
"- `/list_proposals` : List recent proposals recorded by the system.\n"
"- `/apply_proposal <id>` : Owner may apply a proposal immediately; lower admins will queue for owner approval.\n"
"- `/optimize_now` : Owner triggers a one-shot self optimization run; lower admins queue a request.\n"
"\n**UI Tools**: Use the 'Filter Proposals' box to search proposals. Owner notifications are recorded in system notifications and optionally emailed when SMTP is configured."
)
admin_help_btn.click(_admin_help, [role_state], [admin_help_md])
try:
# Bind the main login button to attempt login and update admin UI visibility and per-button states
login_btn.click(
do_login_and_update,
[login_name, login_pass, login_second, mode_state],
[login_status, uid_state, role_state, admin_accordion,
hotpatch_apply, apply_btn, propose_btn, test_btn,
ingest_now_btn, mem_compress_btn, revert_btn,
remove_btn, role_btn, add_btn, pass_btn]
)
except Exception:
# If binding fails (due to duplicate components or different Gradio version), fall back to simpler binding
try:
login_btn.click(do_login_and_update, [login_name, login_pass, login_second, mode_state], [login_status, uid_state, role_state, admin_accordion])
except Exception:
pass
except Exception:
pass
def wait_for_lite_core():
"""Waits for the lite Hive core and enables basic chat."""
bootstrap_instance.lite_core_ready.wait()
# If the full core is already ready, prefer the Full-mode placeholder and don't overwrite it.
# gr.update() is used rather than a component constructor so the update stays valid
# whatever component type `msg` is (it receives MultimodalTextbox updates elsewhere).
if getattr(bootstrap_instance, 'hive_ready', None) and bootstrap_instance.hive_ready.is_set():
return gr.update(placeholder=f"Talk to {CFG['AGENT_NAME']}", interactive=True)
# Otherwise, show the Lite Mode placeholder only if lite core is the active instance
if bootstrap_instance.lite_core_success and bootstrap_instance.hive_lite_instance is not None:
return gr.update(placeholder=f"Talk to {CFG['AGENT_NAME']} (Lite Mode)", interactive=True)
else:
return gr.update(placeholder="Dependencies not installed - deploy to HF Spaces", interactive=False)
demo.load(wait_for_lite_core, None, [msg])
vocab_btn.click(do_get_vocab_word, [uid_state], [vocab_output])
def wait_for_voice_features(request):
"""Waits for ASR/TTS models and enables voice-related UI elements."""
bootstrap_instance.voice_ready.wait() # type: ignore
is_pi = 'raspberrypi' in platform.machine().lower()
has_display = bootstrap_instance.caps.get("has_display", True) if bootstrap_instance.caps else True
bootstrap_instance.hive_ready.wait() # Also wait for full core for voice features # type: ignore
hive_instance = get_hive_instance(bootstrap_instance)
voice_ready = bool(hive_instance and not getattr(hive_instance, 'lite_mode', False) and getattr(hive_instance, 'asr_service', None) and getattr(hive_instance, 'tts_service', None))
video_ready = bool(hive_instance and not getattr(hive_instance, 'lite_mode', False) and getattr(hive_instance, 'video_service', None) and CFG.get("VIDEO_ENABLED", False)) # type: ignore
# Show appropriate message based on voice readiness state
if voice_ready:
voice_msg = "✅ **Voice Ready!** Click a record button below. **Your browser will ask for microphone permission.** You must click **Allow**."
interactive_buttons = True
else:
voice_msg = "⚠️ **Voice Features Unavailable** - Check build logs for missing dependencies (e.g., faster-whisper, piper-tts)."
interactive_buttons = False
# Determine button visibility based on display and platform
# - On headless / no-display: offer a single compact hands-free button (no separate no-wake option)
# - On devices with display: show both wake-word and no-wake hands-free options (unless running on Pi where wake-word is preferred)
if voice_ready:
if not has_display:
vocal_btn = gr.Button(interactive=True, value=("Hands-Free is ON" if is_pi else "Start Hands-Free"))
else:
vocal_btn = gr.Button(interactive=True, value=("Stop Hands-Free Conversation" if is_pi else "Start Hands-Free Conversation"))
else:
vocal_btn = gr.Button(interactive=False, visible=False)
unified_visible = bool(is_pi or has_display)
return (
gr.Slider(visible=is_pi), # conversation_timeout_slider
gr.Markdown(voice_msg, visible=True),
vocal_btn, # vocal_chat_btn
(gr.Audio(interactive=True, label="Record 5-10s for voiceprint") if voice_ready else gr.Audio(interactive=False)), # enroll_audio
gr.update(interactive=interactive_buttons), # enroll_btn
gr.update(interactive=interactive_buttons), # who_btn
gr.Markdown(("✅ **Camera Ready!** Click the video feed below. **Your browser will ask for camera permission.** You must click **Allow**.") if video_ready else ("⚠️ **Camera Unavailable** - This feature requires a secure (HTTPS) connection and browser permissions."), visible=True),
gr.Image(interactive=video_ready, visible=video_ready), # video_out
gr.Audio(streaming=True, visible=unified_visible) # unified_mic, auto-start on Pi or when display present
) # type: ignore
demo.load(wait_for_voice_features, None, [conversation_timeout_slider, voice_status_md, vocal_chat_btn, enroll_audio, enroll_btn, who_btn, camera_status_md, video_out, unified_mic], show_progress="hidden")
def stream_video():
"""Streams video frames from the VideoService to the UI."""
hive_instance = get_hive_instance(bootstrap_instance) # type: ignore
if not (
hive_instance and not hive_instance.lite_mode and
hasattr(hive_instance, 'video_service') and hive_instance.video_service and
CFG["VIDEO_ENABLED"]
):
yield None
return
video_service = hive_instance.video_service
while not video_service.stop_event.is_set():
frame = video_service.get_frame()
if frame is not None:
yield cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
time.sleep(0.05) # ~20 fps
# Use a looping click event on a hidden button to drive the video stream
video_stream_trigger.click(stream_video, None, video_out)
demo.load(lambda: None, None, None, js="(async () => { document.querySelector('button[style=\"display: none;\"]')?.click(); })")
# Attach client-side listeners to allow selecting messages and exposing a selected message
# Safe client-side initializer: attach click handlers to message elements
demo.load(lambda: None, None, None, js='''(function(){
try{
const root = document.getElementById('hive_chatbot');
const setSelected = (text)=>{
const el = document.getElementById('selected_message');
if(el) el.value = text;
};
const attach = (node)=>{
if(!node || node.__hive_attached) return;
node.__hive_attached = true;
node.style.cursor = 'pointer';
node.addEventListener('click', ()=>{
const txt = node.innerText || node.textContent || '';
setSelected(txt.trim());
});
};
const scan = ()=>{
if(!root) return;
root.querySelectorAll('div').forEach(n=>{ if(n && n.innerText && n.innerText.length>0) attach(n); });
};
const mo = new MutationObserver(()=>{ scan(); });
mo.observe(document.body, {childList:true, subtree:true});
setTimeout(scan, 500);
}catch(e){console.warn('Hive selector init failed', e);}
})()''')
def do_online_update():
hive_instance = get_hive_instance(bootstrap_instance) # type: ignore
if hive_instance.lite_mode: return "Online features are disabled in Lite Mode." # type: ignore
return "Added %s new summaries to curves." % (hive_instance.online_update().get("added",0))
connect_now.click(lambda: (NET.kick_async() or "Auto-connect started in background."), [], [wifi_status]) # type: ignore
online_now.click(do_online_update, [], [online_status])
def on_login_or_mode_change(role, pick): # type: ignore
is_adm = is_admin(pick, role)
return gr.Tab(visible=is_adm)
def toggle_vocal_chat(state, porcupine_instance_ref):
global wake_word_listener_instance
# Determine platform
is_pi = 'raspberrypi' in platform.machine().lower()
# If porcupine_instance_ref is falsy, user requested hands-free without wake word
if not porcupine_instance_ref:
state["active"] = not state["active"]
btn_text = "Stop Hands-Free Conversation" if state["active"] else "Start Hands-Free Conversation"
if state["active"]:
if state.get("always_listening"):
state["status_text"] = "Status: Always listening"
else:
state["status_text"] = "Status: Active, listening..."
else:
state["status_text"] = "Status: Inactive"
mic_visibility = state["active"] if not is_pi else True
return state, gr.update(value=btn_text), gr.Audio(visible=mic_visibility)
state["active"] = not state["active"]
status_text = "Status: Active, listening..." if state["active"] else "Status: Inactive"
btn_text = "Stop Hands-Free Conversation" if state["active"] else "Start Hands-Free Conversation"
state["status_text"] = status_text
# Toggle visibility of the streaming mic.
# On Pi, the mic is always on for wake word, so we don't change visibility
# (is_pi was already computed above).
mic_visibility = state["active"] if not is_pi else True
# For porcupine (wake-word) mode, toggle and start/stop the listener
if state["active"]:
start_wake_word_listener_global(porcupine_instance_ref)
else:
# Only stop the listener if it's not a Pi (where it's always on)
if not is_pi:
stop_wake_word_listener_global()
return state, gr.update(value=btn_text), gr.Audio(visible=mic_visibility)
vocal_chat_btn.click(toggle_vocal_chat, [vocal_chat_state, gr.State(None)], [vocal_chat_state, vocal_chat_btn, unified_mic])
# Wire the 'Always listen' checkbox to update the vocal_chat_state
def _set_always_listen(val, vc_state):
try:
vc_state.setdefault("active", False)
vc_state["always_listening"] = bool(val)
if vc_state["always_listening"]:
vc_state["last_interaction_time"] = time.time()
vc_state["status_text"] = "Status: Always listening"
elif not vc_state["active"]:
vc_state["status_text"] = "Status: Inactive"
except Exception:
vc_state = {"active": False, "last_interaction_time": time.time(), "status_text": "Status: Inactive", "always_listening": bool(val)}
return vc_state
try:
profile_always_listen.change(_set_always_listen, [profile_always_listen, vocal_chat_state], [vocal_chat_state])
except Exception:
pass
def stop_voice_now(vc_state):
"""Handler to forcibly stop hands-free conversation and wake-word listener."""
global wake_word_listener_instance
try:
vc_state["active"] = False
except Exception:
vc_state = {"active": False, "last_interaction_time": time.time(), "status_text": "Status: Inactive"}
# Stop wake word listener if running
try:
if wake_word_listener_instance and wake_word_listener_instance.is_alive():
wake_word_listener_instance.stop()
except Exception as e:
print(f"[StopVoice] Error stopping wake-word listener: {e}") # type: ignore
# Return updated state, reset the vocal button label, and hide the mic stream.
# Note: gr.HTML takes no `js` kwarg, so any waveform teardown must be handled by client-side JS.
return vc_state, gr.update(value="Start Hands-Free Conversation"), gr.update(visible=False), gr.update()
stop_voice_btn.click(stop_voice_now, [vocal_chat_state], [vocal_chat_state, vocal_chat_btn, unified_mic, vad_indicator])
# Auto-start wake word listener on Pi
is_pi = 'raspberrypi' in platform.machine().lower()
if is_pi and porcupine_instance:
print("[WakeWord] Raspberry Pi detected. Wake word listener is always on.")
start_wake_word_listener_global(porcupine_instance)
def _handle_wake_word(vc_state, uid, hive_instance):
"""Checks for a wake word and activates the conversation."""
if not vc_state["active"] and wake_word_listener_instance and wake_word_listener_instance.wake_word_detected.is_set():
wake_word_listener_instance.wake_word_detected.clear() # Reset the event
vc_state["active"] = True
vc_state["last_interaction_time"] = time.time()
vc_state["status_text"] = "Status: Wake word detected! Listening..."
reply_audio_path = None
if hive_instance and getattr(hive_instance, 'tts_service', None):
reply_audio_path = hive_instance.tts_service.synthesize("Yes?", uid)
return True, vc_state, gr.update(value=reply_audio_path, autoplay=True) if reply_audio_path else None
return False, vc_state, None
def _handle_conversation_timeout(vc_state, timeout_seconds):
"""Checks for and handles conversation timeout."""
is_pi = 'raspberrypi' in platform.machine().lower()
if is_pi or vc_state.get("always_listening"):
return False, vc_state # No timeout on Pi or when always listening is enabled
# Slider values above 60s mean "no timeout": use an effectively infinite window
effective_timeout = timeout_seconds if timeout_seconds <= 60 else 999999.0
if vc_state["active"] and (time.time() - vc_state["last_interaction_time"] > effective_timeout):
vc_state["active"] = False
vc_state["status_text"] = "Status: Inactive (timeout)"
return True, vc_state
return False, vc_state
def _handle_active_conversation(stream, vc_state, uid, role, mode, chatbot_history, hive_instance):
"""Generator to process audio for an active conversation."""
try:
sampling_rate, audio_chunk = stream
except Exception as e:
print(f"[Voice] Error unpacking stream: {e}")
yield vc_state, chatbot_history, None
return
try:
for speech_segment in hive_instance.vad_service.process_stream(audio_chunk):
vc_state["last_interaction_time"] = time.time()
vc_state["status_text"] = "Status: Transcribing..."
yield vc_state, chatbot_history, None
# Transcribe audio in a background thread to keep UI responsive
asr_result = {}
def transcribe_task():
nonlocal asr_result
try:
asr_result = hive_instance.asr_service.transcribe(speech_segment, uid)
except Exception as te:
print(f"[Voice] ASR transcription failed: {te}")
asr_result = {"text": "[Transcription failed]"}
transcribe_thread = threading.Thread(target=transcribe_task)
transcribe_thread.start()
transcribe_thread.join(timeout=30) # Wait up to 30s for transcription
user_text = asr_result.get("text", "").strip()
if not user_text or len(user_text) < 2:
continue
# 1. Add user's message to chat UI with transcription
print(f"[Voice] Transcribed: '{user_text}'")
chatbot_history = list(chatbot_history or []) + [{"role": "user", "content": user_text}]
yield vc_state, chatbot_history, None
vc_state["status_text"] = "Status: Thinking..."
# 2. Get assistant's streaming response
eff_role = role if mode == "admin" else "user"
try:
chat_result = hive_instance.chat(user_text, eff_role, uid, history=chatbot_history)
response_stream, post_process_func = chat_result if isinstance(chat_result, tuple) else (chat_result, None)
except Exception as ce:
print(f"[Voice] Chat call failed: {ce}")
response_stream, post_process_func = None, None
# 3. Stream text response to UI
if response_stream:
chatbot_history.append({"role": "assistant", "content": ""})
full_reply = ""
try:
for token in response_stream:
if token:
piece = str(token)
full_reply = _smart_join(full_reply, piece)
chatbot_history[-1]["content"] = full_reply
yield vc_state, chatbot_history, None
except Exception as se:
print(f"[Voice] Stream iteration failed: {se}")
chatbot_history[-1]["content"] = f"[Error during streaming: {str(se)[:50]}]"
yield vc_state, chatbot_history, None
if post_process_func:
try:
processed = post_process_func(full_reply)
if isinstance(processed, str) and processed.strip():
full_reply = processed
except Exception as pe:
print(f"[Voice] Post-process failed: {pe}")
# 4. Synthesize and yield audio for playback
if full_reply.strip():
reply_audio_path = None
try:
if hasattr(hive_instance, 'tts_service') and hive_instance.tts_service:
reply_audio_path = hive_instance.tts_service.synthesize(full_reply, uid)
except Exception as te:
print(f"[Voice] TTS synth failed: {te}")
reply_audio_path = None
if reply_audio_path:
try:
yield vc_state, chatbot_history, gr.update(value=reply_audio_path, autoplay=True)
except Exception as ue:
print(f"[Voice] Audio update failed: {ue}")
yield vc_state, chatbot_history, None
else:
# Chat failed, add error message
chatbot_history.append({"role": "assistant", "content": "[Chat system unavailable]"})
yield vc_state, chatbot_history, None
vc_state["last_interaction_time"] = time.time()
# Keep conversation active on Pi, otherwise deactivate after completing a turn
is_pi = 'raspberrypi' in platform.machine().lower()
vc_state["active"] = is_pi
vc_state["status_text"] = "Status: Active, listening..." if is_pi else "Status: Inactive (turn complete)"
except Exception as e:
print(f"[Voice] Error in active conversation handler: {e}")
import traceback
traceback.print_exc()
yield vc_state, chatbot_history, None
return
def process_unified_stream(stream, vc_state, timeout_seconds, uid, role, mode, chatbot_history, request):
"""
Main dispatcher for voice stream processing. It handles wake word detection,
conversation state, and active conversation logic by calling helper functions.
"""
hive_instance = get_hive_instance(bootstrap_instance)
# If not active and no audio is coming in, do nothing. This prevents interference with typed chat.
if not vc_state.get("active") and stream is None:
return
# Check that voice services are available
if not hive_instance or hive_instance.lite_mode:
yield vc_state, chatbot_history, None, gr.update(), gr.update()
return
has_voice_services = (hasattr(hive_instance, 'vad_service') and hive_instance.vad_service and
hasattr(hive_instance, 'asr_service') and hive_instance.asr_service)
if not has_voice_services:
print("[Voice] ASR/VAD services not available. Voice chat disabled.")
yield vc_state, chatbot_history, None, gr.update(), gr.update()
return
# Handle wake word detection if conversation is not active
was_awakened, vc_state, audio_reply = _handle_wake_word(vc_state, uid, hive_instance)
if was_awakened:
yield vc_state, chatbot_history, audio_reply, gr.update(), gr.update()
return
# If there's no audio stream, check for timeout and then exit
if stream is None:
timed_out, vc_state = _handle_conversation_timeout(vc_state, timeout_seconds)
if timed_out:
yield vc_state, chatbot_history, None, gr.update(), gr.update()
return
# If conversation is active, process the audio stream
if vc_state.get("active"):
try:
sampling_rate, audio_chunk = stream
except Exception as e:
print(f"[Voice] Error unpacking stream: {e}")
return
try:
for speech_segment in hive_instance.vad_service.process_stream(audio_chunk):
vc_state["last_interaction_time"] = time.time()
vc_state["status_text"] = "Status: Transcribing..."
yield vc_state, chatbot_history, None, gr.update(), gr.update()
# Transcribe audio in a background thread to keep UI responsive
asr_result = {}
def transcribe_task():
nonlocal asr_result
try:
# Create a temporary file for the speech segment
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_audio_file:
sf.write(tmp_audio_file.name, speech_segment, sampling_rate)
asr_result = hive_instance.asr_service.transcribe(tmp_audio_file.name, uid)
os.unlink(tmp_audio_file.name) # Clean up the temp file
except Exception as te:
print(f"[Voice] ASR transcription failed: {te}")
asr_result = {"text": "[Transcription failed]"}
transcribe_thread = threading.Thread(target=transcribe_task)
transcribe_thread.start()
transcribe_thread.join(timeout=30)
user_text = asr_result.get("text", "").strip()
if not user_text or len(user_text) < 2:
continue
# Deactivate voice conversation after successful transcription
vc_state["active"] = False
vc_state["status_text"] = "Status: Inactive (transcribed)"
# Yield the transcribed text into the multimodal textbox; this unifies voice and
# text input through the `talk` function. (The second msg output gets a no-op
# update: yielding a Button there would not match the textbox component.)
yield vc_state, chatbot_history, None, gr.MultimodalTextbox(value={'text': user_text, 'files': []}), gr.update()
return # Stop processing after one successful transcription
except Exception as e:
print(f"[Voice] Error in active conversation handler: {e}")
import traceback
traceback.print_exc()
yield vc_state, chatbot_history, None, gr.update(), gr.update()
return
unified_mic.stream(
process_unified_stream,
[unified_mic, vocal_chat_state, conversation_timeout_slider, uid_state, role_state, mode_state, chatbot],
[vocal_chat_state, chatbot, ptt_reply_audio, msg, msg], # Outputs: state, chatbot, audio, and TWO updates for the multimodal textbox
show_progress="hidden",
api_name=False
)
def is_admin(mode, role): return (mode == "admin") and (role in ("admin_general", "admin_super", "owner"))
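# Users DB layout assumed by the account-management handlers below (inferred from
# how `d` is read/written in do_add/do_role/do_remove; not an authoritative schema):
#   {"owner": {...}, "admins_super": [...], "admins_general": [...], "users": [...]}
# where each entry looks like {"id": "...", "name": "...", "role": "...", "pass": "...", "prefs": {...}}.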
def do_add(mode, role, caller, nm, rl, pw): # type: ignore
if not is_admin(mode, role): return "Switch to Admin mode to use this."
d=_load_users(); cu,_=_find_user(d, caller or "")
if not cu: return "Login first as admin."
if rl not in PERMS.get(cu.get("role", "guest"),{}).get("can_add",[]): return f"{cu.get('role', 'guest')} cannot add {rl}."
uid=f"{rl}:{int(time.time())}"
entry={"id":uid,"name":nm,"role":rl,"pass":pw if rl!='user' else "", "prefs":{"activation_names":[CFG["AGENT_NAME"]],"language":"en"}} # type: ignore
if rl=="owner" and cu.get("role") == "owner":
for group in ["admins_super", "admins_general", "users"]:
d[group] = [u for u in d.get(group, []) if u.get("id") != d.get("owner", {}).get("id")]
d["owner"] = entry
elif rl=="admin_super": d["admins_super"].append(entry)
elif rl=="admin_general": d["admins_general"].append(entry)
else: d["users"].append(entry)
_save_json(USERS_DB,d); return f"Added {rl}: {nm}"
add_btn.click(do_add, [mode_state, role_state, uid_state, add_name, add_role, add_pass], [out_add])
def do_rename(mode, role, caller, tgt, nm): # type: ignore
if not is_admin(mode, role): return "Switch to Admin mode to use this."
d=_load_users(); u,_=_find_user(d, tgt or "")
if not u: return "Target not found."
cu,_=_find_user(d, caller or "")
if not cu: return "Login first."
if u.get("role") in PERMS.get(cu.get("role", "guest"),{}).get("can_edit_profile_of",[]):
u["name"]=nm; _save_json(USERS_DB,d); return "Renamed."
return "Not allowed."
rename_btn.click(do_rename,[mode_state, role_state, uid_state, target, new_name],[out])
def do_pass(mode, role, caller, tgt, pw): # type: ignore
if not is_admin(mode, role): return "Switch to Admin mode to use this."
d=_load_users(); u,_=_find_user(d, tgt or "")
if not u: return "Target not found."
cu,_=_find_user(d, caller or "")
if not cu: return "Login first."
if u.get("role") in PERMS.get(cu.get("role", "guest"),{}).get("can_edit_profile_of",[]):
u["pass"]=pw; _save_json(USERS_DB,d); return "Password changed."
return "Not allowed."
pass_btn.click(do_pass,[mode_state, role_state, uid_state, target, new_pass],[out])
def do_role(mode, role, caller, tgt, rl): # type: ignore
if not is_admin(mode, role): return "Switch to Admin mode to use this."
d=_load_users(); u,_=_find_user(d, tgt or "")
if not u: return "Target not found."
cu,_=_find_user(d, caller or "");
if not cu: return "Login first."
allowed_new = {"owner":["owner","admin_super","admin_general","user"],
"admin_super":["admin_super","admin_general","user"],
"admin_general":["admin_general","user"]}.get(cu.get("role", "guest"), [])
if u.get("role") not in PERMS.get(cu.get("role"),{}).get("can_edit_role_of",[]) or rl not in allowed_new:
return f"Not allowed to set {rl}."
for grp in ["admins_super","admins_general","users"]:
if d and grp in d:
d[grp] = [user for user in d[grp] if user.get("id") != u.get("id")]
if rl=="owner": d["owner"]=u; u["role"]="owner"
elif rl=="admin_super": d["admins_super"].append(u); u["role"]="admin_super"
elif rl=="admin_general": d["admins_general"].append(u); u["role"]="admin_general"
else: d["users"].append(u); u["role"]="user"
_save_json(USERS_DB,d); return f"Role set to {rl}."
role_btn.click(do_role,[mode_state, role_state, uid_state, target, new_role],[out])
def do_remove(mode, role, caller, tgt):
if not is_admin(mode, role): return "Switch to Admin mode to use this."
d=_load_users(); u,urole=_find_user(d, tgt or "")
if not u: return "Target not found."
cu,_=_find_user(d, caller or "")
if not cu: return "Login first."
if urole in PERMS.get(cu.get("role", "guest"),{}).get("can_remove",[]):
# Role names map to storage groups: "admin_super" is stored under "admins_super", not "admin_supers"
grp = {"user": "users", "admin_super": "admins_super", "admin_general": "admins_general"}.get(urole, f"{urole}s")
d[grp] = [user for user in d.get(grp, []) if user.get("id") != u.get("id")]
_save_json(USERS_DB,d); return f"Removed {u.get('name')}."
return "Not allowed."
remove_btn.click(do_remove, [mode_state, role_state, uid_state, target], [out])
def run_ingest_background(hive_instance): # type: ignore
"""
Triggers the background ingestion process.
"""
if not hive_instance or hive_instance.lite_mode: return "Ingestion is disabled in Lite Mode."
def ingest_task(): # type: ignore
staged_ingest_chain_if_enabled(str(hive_instance.config["CURVE_DIR"]))
threading.Thread(target=ingest_task, daemon=True).start()
return "Background ingestion process started. See logs for details."
ingest_now_btn.click(lambda: run_ingest_background(get_hive_instance(bootstrap_instance)), [], [ingest_status])
# This function has a potential issue if get_hive_instance() returns a lite instance.
# It is now guarded with a check.
def compress_memory(h): # type: ignore
if not h or h.lite_mode or not hasattr(h, 'store'):
return "Memory compression is not available until the Full Hive Core is ready."
_, msg = _archive_memory(str(h.store.dir))
return msg
mem_compress_btn.click(lambda: compress_memory(get_hive_instance(bootstrap_instance)), [], [compress_status])
def do_hotpatch(mode, role, patch_json): # type: ignore
"""
Applies a runtime hotpatch from the admin console.
"""
if not is_admin(mode, role):
return "Hotpatching is an admin-only feature."
try: patch=json.loads(patch_json)
except Exception as e: return f"Invalid JSON: {e}"
hive_instance = get_hive_instance(bootstrap_instance)
if not hive_instance or hive_instance.lite_mode or not hasattr(hive_instance, 'overlay'):
return "Hotpatching is not available in Lite Mode."
ok, msg = hive_instance.overlay.patch(patch, actor_role=role)
return f"Hotpatch result: {msg}"
hotpatch_apply.click(do_hotpatch,[mode_state, role_state, hotpatch_patch],[hotpatch_status])
# This state will hold the session hash for guest users.
session_id_state = gr.State(None)
_last: Dict[str, object] = {"id": None, "obj": None}
# This function is safe because it's only called by the user on the full UI.
# It is now guarded with a check.
def do_apply(role, mode): # type: ignore
try:
hive_instance = get_hive_instance(bootstrap_instance)
if hive_instance.lite_mode or not hasattr(hive_instance, 'changes'): return {"status": "Error", "reason": "Change management is disabled in Lite Mode."}
if role not in ("admin_super","owner") or mode!="admin": return {"status": "Error", "reason": "Only admin_super or owner may apply."}
if not _last["obj"]: return {"status": "Error", "reason": "No proposal loaded."}
# Re-run test to ensure context is fresh before applying
res = hive_instance.changes.test_and_compare(str(_last["id"]), _last["obj"])
if not res.get("ok"): return {"status": "Error", "reason": f"Pre-apply test failed: {res.get('reason','unknown')}"}
if not res.get("passed"): return {"status": "Error", "reason": "Proposal did not pass quality thresholds."}
if _last["obj"].kind=="code" and role!="owner" and not CFG["OPT_AUTO_APPLY"]: return {"status": "Pending", "reason": "Awaiting Owner approval for code changes."}
ok,msg = hive_instance.changes.apply(res)
return {"status": "Applied" if ok else "Failed", "message": msg}
except Exception as e:
return {"status": "Error", "reason": f"Exception during apply: {e}"}
def do_propose(kind,name,ver,reason,patch): # type: ignore
hive_instance = get_hive_instance(bootstrap_instance)
if not hive_instance or hive_instance.lite_mode or not hasattr(hive_instance, 'changes'): return {"status": "Error", "reason": "Proposals disabled in Lite Mode."}
cp=ChangeProposal(kind=kind,name=name or "",version=ver or "",reason=reason or "",patch_text=patch or "")
pid=hive_instance.changes.propose(cp); _last["id"]=pid; _last["obj"]=cp
return {"status": "Proposed", "kind": kind, "name": name or '(code patch)', "id": pid}
def do_test(): # type: ignore
if not _last["obj"]: return {"status": "Error", "reason": "No proposal in memory. Submit one first."}
hive_instance = get_hive_instance(bootstrap_instance)
if not hive_instance or hive_instance.lite_mode or not hasattr(hive_instance, 'changes'): return {"status": "Error", "reason": "Testing disabled in Lite Mode."}
res=hive_instance.changes.test_and_compare(str(_last["id"]), _last["obj"]); return res
propose_btn.click(do_propose, [prop_kind,prop_name,prop_ver,prop_reason,prop_patch],[opt_out]) # type: ignore
test_btn.click(do_test, [], [opt_out])
apply_btn.click(do_apply, [role_state, mode_state], [opt_out])
# Smart port selection: prefer explicit PORT or GRADIO_SERVER_PORT, fallback to HF Spaces default 7860
server_port = int(os.environ.get("PORT") or os.environ.get("GRADIO_SERVER_PORT") or 7860)
# Enable Gradio queue so Spaces and streaming handlers work reliably.
try:
demo.queue(max_size=32)
except TypeError:
# Older Gradio versions don't support max_size
try:
demo.queue()
except Exception:
pass
demo.launch(
server_name="0.0.0.0",
server_port=server_port,
share=os.getenv("GRADIO_SHARE", "false").lower() == "true",
)
return demo
def setup_headless_http_server(hive_lite, port: int = 8000):
if not _HAVE_FASTAPI:
logging.error("FastAPI or Uvicorn not installed. Cannot start headless server.")
logging.error("Install with: pip install fastapi uvicorn")
return None
app = FastAPI(
title="Hive Headless API",
description="Hive AI Assistant headless HTTP API",
version="1.0.0"
)
startup_time = time.time()
@app.get("/health")
def health_check():
"""Health check endpoint."""
try:
mem_percent = 0.0
if psutil:
mem_percent = psutil.virtual_memory().percent
else:
# Fallback for environments without psutil, though less accurate
mem_pct_cmd = 'free | grep Mem | awk \'{print ($3/$2) * 100.0}\''
mem_pct_str = os.popen(mem_pct_cmd).read().strip()
if mem_pct_str: mem_percent = float(mem_pct_str)
return JSONResponse({
"status": "healthy",
"timestamp": time.time(),
"uptime_seconds": int(time.time() - startup_time),
"memory_percent": mem_percent,
"headless": True,
"api_version": "1.0.0"
})
except Exception as e:
logging.error(f"Health check error: {e}")
return JSONResponse({"status": "unhealthy", "error": str(e)}, status_code=500)
@app.post("/chat")
async def chat_endpoint(request: dict):
"""Chat endpoint: POST with {prompt, user_id, session_id, ...}"""
try:
prompt = request.get("prompt", "")
user_id = request.get("user_id", "anonymous")
session_id = request.get("session_id", "default")
if not prompt:
raise HTTPException(status_code=400, detail="Missing 'prompt' field")
logging.debug(f"Chat request from {user_id}: {prompt[:50]}...")
# Call Hive.chat with the correct signature. It may return (stream, postproc) or a bare stream.
chat_result = hive_lite.chat(prompt, "user", user_id, history=request.get("history"))
stream, postproc = chat_result if isinstance(chat_result, tuple) else (chat_result, None)
# Consume the stream to build the full output
collected = []
try:
for chunk in stream:
# stream items might be bytes or str
collected.append(chunk.decode() if isinstance(chunk, (bytes, bytearray)) else str(chunk))
except Exception:
# If the stream raises mid-iteration, fall back to whatever was collected so far
pass
full_output = "".join(collected)
# Apply post-processing if provided
try:
response_text = postproc(full_output) if postproc else full_output
except Exception:
response_text = _final_sanitize_reply(full_output)
return JSONResponse({
"status": "success",
"response": response_text,
"user_id": user_id,
"session_id": session_id,
"timestamp": time.time()
})
except HTTPException:
raise
except Exception as e:
logging.error(f"Chat error: {e}", exc_info=True)
return JSONResponse({"status": "error", "error": str(e)}, status_code=500)
@app.get("/status")
def status_endpoint():
"""Detailed status endpoint."""
try:
caps = hive_lite.caps if hive_lite else {}
versions = get_installed_versions()
model_info = {
"model_id": getattr(hive_lite, 'model_id', CFG.get('MODEL_OVERRIDE')),
"mode": getattr(hive_lite, 'actual_mode', 'unknown')
}
return JSONResponse({
"status": "ok",
"timestamp": time.time(),
"capabilities": {
"device_type": caps.get("device_type", "unknown"),
"is_pi": caps.get("is_pi", False),
"is_headless": caps.get("is_headless", True),
"total_ram_gb": caps.get("total_ram_gb", 0),
"free_ram_gb": caps.get("free_ram_gb", 0),
"has_display": caps.get("has_display", False),
"has_camera": caps.get("has_camera", False),
"has_microphone": caps.get("has_microphone", False),
"network_up": caps.get("network_up", False)
},
"adaptive_config": {
"embedding_batch_size": caps.get("embedding_batch_size", 8),
"cache_budget_gb": caps.get("cache_budget_gb", 1.0),
"retrieval_k": caps.get("retrieval_k", 6),
"model_precision": caps.get("model_precision", "float32")
},
"versions": versions,
"model": model_info
})
except Exception as e:
logging.error(f"Status check error: {e}")
return JSONResponse({"status": "error", "error": str(e)}, status_code=500)
# --- Basic Admin endpoints ---
@app.get("/admin/notifications")
def admin_notifications(uid: Optional[str] = None, role: str = "guest"):
"""Return notifications visible to the given uid/role."""
try:
notifs = _get_notifications_for_user(uid, role)
return JSONResponse({"status": "ok", "notifications": notifs})
except Exception as e:
logging.error(f"Admin notifications error: {e}")
return JSONResponse({"status": "error", "error": str(e)}, status_code=500)
@app.get("/admin/users")
def admin_users():
"""Return the user DB (admin-only in production)."""
try:
users = _load_users()
return JSONResponse({"status": "ok", "users": users})
except Exception as e:
logging.error(f"Admin users error: {e}")
return JSONResponse({"status": "error", "error": str(e)}, status_code=500)
@app.post("/admin/hotpatch")
def admin_hotpatch(payload: dict):
"""Apply a runtime hotpatch (body: {patch: {...}, role: 'owner'|'admin_general'|...})."""
try:
patch = payload.get("patch") if isinstance(payload, dict) else None
role = (payload.get("role") if isinstance(payload, dict) else None) or "owner"
if not patch:
return JSONResponse({"status": "error", "error": "Missing 'patch' in body"}, status_code=400)
# If the lite hive has an overlay, apply; otherwise queue failure
hive_instance = hive_lite
if hasattr(hive_instance, 'overlay') and hive_instance.overlay:
ok, msg = hive_instance.overlay.patch(patch, actor_role=role)
return JSONResponse({"status": "ok" if ok else "failed", "message": msg})
return JSONResponse({"status": "error", "error": "Overlay not available yet; try again later."}, status_code=503)
except Exception as e:
logging.error(f"Admin hotpatch error: {e}")
return JSONResponse({"status": "error", "error": str(e)}, status_code=500)
return app, uvicorn.Config(app, host="0.0.0.0", port=port, log_level="warning")
def run_headless_http_server(app_config_tuple):
"""Run HTTP server in blocking mode."""
if not app_config_tuple:
logging.warning("No HTTP server app available.")
return
app, config = app_config_tuple
try:
server = uvicorn.Server(config)
async def serve():
try:
await server.serve()
except Exception as e:
logging.error(f"HTTP server error: {e}", exc_info=True)
asyncio.run(serve())
except Exception as e:
logging.error(f"Failed to start HTTP server: {e}", exc_info=True)
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(
description="HIVE 🐝 - Hybrid Intelligent Voice Engine",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
python3 app.py # Full mode (or lite if deps missing)
python3 app.py --lite # Force lightweight mode
python3 app.py --headless # Headless (HTTP API only)
python3 app.py --port 8000 # Specify port for headless
"""
)
parser.add_argument("--lite", action="store_true",
help="Force lightweight mode (no heavy ML deps)")
parser.add_argument("--headless", action="store_true",
help="Force headless mode (HTTP API only)")
parser.add_argument("--port", type=int, default=8000,
help="Port for headless HTTP API (default: 8000)")
parser.add_argument("--ui", action="store_true",
help="Force UI launch (Gradio)")
args, unknown = parser.parse_known_args()
# If --lite is set, use embedded lightweight server
if args.lite:
port = args.port or int(os.environ.get("HIVE_PORT", 7860))
run_lite_server(port)
sys.exit(0)
# Dependency preflight check + version validation
auto_install = os.environ.get("HIVE_AUTOINSTALL", "0") == "1"
version_statuses, missing_versions = check_and_maybe_install(REQUIRED_VERSIONS, auto_install=auto_install)
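# Set HIVE_AUTOINSTALL=1 to have the preflight attempt installation of missing or
# outdated pins (per `auto_install` above) instead of only reporting them.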
# Build readable missing list
MISSING_DEPS = missing_versions
CRITICAL_IMPORTS_OK = len(MISSING_DEPS) == 0
if not CRITICAL_IMPORTS_OK:
print("\n" + "="*60)
print("🐝 HIVE - DEPENDENCY / VERSION CHECK WARNING")
print("="*60)
print("\n⚠️ Missing or out-of-date packages:")
for pkg, st in version_statuses.items():
if not st.get('ok'):
print(f" - {pkg}: installed={st.get('installed')} required={st.get('required')}")
print("\n💡 Options:")
print(" 1. Run with --lite: python3 app.py --lite")
print(" 2. Install deps: pip install -r requirements.txt")
print(" 3. Use Docker: docker build -t hive . && docker run hive")
print("="*60 + "\n")
# If UI was requested but deps missing, switch to --lite
if args.ui:
print("[WARN] UI requested but critical deps missing. Switching to --lite mode.\n")
port = args.port or int(os.environ.get("HIVE_PORT", 7860))
run_lite_server(port)
sys.exit(0)
# Logging was already configured at import time (file + console handlers, force=True),
# so no re-initialization is needed here.
# Initialize Bootstrap (headless-aware)
logging.info("="*70)
logging.info("HIVE Bootstrap Starting")
logging.info("="*70)
# Determine if the UI should be launched
launch_ui_flag = args.ui or bool(os.getenv("SPACE_ID") or os.getenv("HIVE_FORCE_UI"))
force_headless_flag = args.headless or bool(os.getenv("HIVE_HEADLESS"))
bootstrap = Bootstrap(force_ui=launch_ui_flag)
hive_lite = bootstrap.run()
# Detect if headless
is_headless = force_headless_flag or (bootstrap.env and bootstrap.env.get("is_headless", True))
if launch_ui_flag:
logging.info("UI flag or Hugging Face Space detected; launching UI (non-headless).")
is_headless = False
if is_headless:
logging.info("Headless mode detected. Starting HTTP API server...")
port = args.port or int(os.getenv("HIVE_PORT", "8000"))
http_server = setup_headless_http_server(hive_lite, port)
if http_server:
logging.info(f"HTTP API server configured on 0.0.0.0:{port}")
logging.info("Available endpoints:")
logging.info(" GET /health - Health check")
logging.info(" GET /status - Device status & capabilities")
logging.info(" POST /chat - Chat interface")
logging.info("="*70)
logging.info("Hive is running. Press Ctrl+C to shutdown.")
logging.info("="*70)
try:
run_headless_http_server(http_server)
except KeyboardInterrupt:
logging.info("Shutdown requested.")
if bootstrap.hive_instance and hasattr(bootstrap.hive_instance, "module_manager"):
bootstrap.hive_instance.module_manager.stop_all()
sys.exit(0)
else:
logging.warning("HTTP server setup failed. Running in idle mode...")
try:
while True:
time.sleep(60)
except KeyboardInterrupt:
logging.info("Shutdown requested.")
sys.exit(0)
else:
# Non-headless: use UI
logging.info("UI mode enabled. Launching Gradio interface...")
try:
if gr is not None:
launch_ui(bootstrap)
else:
logging.warning("Gradio not installed; falling back to idle loop.")
while True:
time.sleep(60)
except Exception as e:
logging.error(f"Failed to launch Gradio UI: {e}")
try:
while True:
time.sleep(60)
except KeyboardInterrupt:
logging.info("Shutdown requested.")
sys.exit(0)
except KeyboardInterrupt:
logging.info("Shutdown requested.")
if bootstrap.hive_instance and hasattr(bootstrap.hive_instance, "module_manager"):
bootstrap.hive_instance.module_manager.stop_all()
sys.exit(0)