# guru / brain.py
# Uploaded by tejadabheja via huggingface_hub ("Upload folder using huggingface_hub"),
# commit a5ae1ac (verified).
"""
Brain: two-layer performance wrapper over brain_core.

Layer 1 (fast): co-occurrence dict + word list, held in memory (microseconds).
Layer 2 (background/deferred): neurons, successors, sentences, persisted to SQLite.

The brain KNOWS things at Layer 1 speed and SAVES them at Layer 2 speed --
like human memory: learn instantly, consolidate during sleep.

Usage:
    from brain import Brain
    brain = Brain(db_path="./my_brain")
    brain.teach("paris is the capital of france")  # instant (dict only)
    brain.ask("capital of france")   # works immediately (persists pending teaches first)
    brain.flush()                    # persist to SQLite
"""
import os
import numpy as np
from collections import deque
from threading import Lock, Thread
from concurrent.futures import ThreadPoolExecutor
from brain_core import BrainCore, FUNCTION_WORDS, STRUCTURAL_WORDS, COOCCURRENCE_PULL
# Public API: the Brain class plus the brain_core constants, re-exported so
# callers can import everything they need from this module.
__all__ = ['Brain', 'FUNCTION_WORDS', 'STRUCTURAL_WORDS', 'COOCCURRENCE_PULL']
class Brain(BrainCore):
    """Two-layer brain. Same API as BrainCore, much faster teach.

    Layer 1: _learn_word + _learn_cooccurrence (in-memory dict ops).
    Layer 2: neuron insert + successors + sentences (SQLite, deferred).

    teach() does only the Layer 1 work synchronously and appends the Layer 2
    work to ``_persist_queue``. The queue is drained by flush(), ask(),
    end_bulk() and close(). It is also drained synchronously by teach() once it
    reaches MAX_PERSIST_QUEUE entries (backpressure), or by a background thread
    started with _async_flush().
    """

    # Queue length at which teach() drains Layer 2 synchronously, bounding the
    # memory held by pending teaches. Override on a subclass/instance to tune.
    MAX_PERSIST_QUEUE = 5000

    # Weight passed to db.update_successors for each adjacent neuron pair.
    _SEQUENCE_SUCCESSOR_WEIGHT = 0.8

    def __init__(self, db_path=None):
        """Create the brain.

        Args:
            db_path: Forwarded unchanged to ``BrainCore.__init__``.
        """
        # Set before super().__init__() so these exist even if BrainCore's
        # setup calls overridden methods (NOTE(review): confirm in brain_core).
        self._bulk_mode = False
        self._search_dirty = False
        self._nid_to_word_cache = None
        # Layer 2: persist queue + optional background flush thread.
        self._persist_queue = deque()
        self._persist_count = 0
        self._flush_thread = None
        self._flushing = False
        super().__init__(db_path=db_path)

    # --- Two-layer teach ---

    def teach(self, sentence, confidence=0.5):
        """Learn ``sentence``: Layer 1 immediately, Layer 2 queued.

        Args:
            sentence: Raw text, tokenized with ``self._tokenize``.
            confidence: Stored with the queued entry. Passed to ``db.insert``
                for any neuron created when the entry is persisted.

        Returns:
            ``[]`` if the sentence has no content words. Otherwise placeholder
            IDs ``[0, 1, ..., len(content) - 1]``, because real neuron IDs do
            not exist until Layer 2 runs.
        """
        tokens = self._tokenize(sentence)
        content = [t for t in tokens if t not in FUNCTION_WORDS]
        if not content:
            return []
        with self._lock:
            vocab_size_before = len(self._words)
            # Layer 1: word list + co-occurrence (in-memory only).
            for word in content:
                self._learn_word(word)
            if len(content) >= 2:
                self._learn_cooccurrence(content)
            if len(self._words) != vocab_size_before:
                # New words invalidate the search matrix and the id->word cache.
                self._search_dirty = True
                self._nid_to_word_cache = None
            # Layer 2 is only queued here.
            self._persist_queue.append((sentence, content, tokens, confidence))
            self._persist_count += 1
            # Backpressure: drain synchronously once the queue is full, keeping
            # memory bounded. This runs while holding self._lock, so concurrent
            # teach() calls wait until the drain finishes.
            # NOTE(review): this does not wait for a background flush started
            # by _async_flush(); both may drain the queue at the same time.
            if len(self._persist_queue) >= self.MAX_PERSIST_QUEUE:
                self._flush_persist()
        return list(range(len(content)))  # placeholder IDs

    def _async_flush(self):
        """Start a background Layer 2 flush unless one is already running."""
        # NOTE(review): this check-and-set is not atomic; two threads calling
        # it together could both start a flush thread.
        if self._flushing:
            return
        self._flushing = True
        self._flush_thread = Thread(target=self._bg_flush, daemon=True)
        try:
            self._flush_thread.start()
        except RuntimeError:
            # The thread never ran, so _bg_flush cannot reset the flag.
            self._flushing = False
            raise

    def _bg_flush(self):
        """Background-thread entry point: persist queued teaches to SQLite."""
        try:
            self._flush_persist()
        finally:
            self._flushing = False

    def _join_bg_flush(self):
        """Block until the background flush thread (if any) has finished."""
        thread = self._flush_thread
        if thread is not None and thread.is_alive():
            thread.join()

    def _rebuild_search_if_dirty(self):
        """Rebuild the search matrix if teach() added new words since the last build."""
        if getattr(self, '_search_dirty', False):
            self._rebuild_search_matrix()
            self._search_dirty = False

    def _flush_persist(self):
        """Layer 2: drain the persist queue into SQLite inside one DB batch.

        For each queued teach, this resolves or creates one neuron per content
        word, links consecutive neurons as successor/predecessor pairs, records
        the sentence (when at least two neurons exist) and extracts a template
        (when the sentence has at least three tokens).

        Returns:
            The number of queued teaches processed (0 if the queue was empty).
        """
        if not self._persist_queue:
            return 0
        flushed = 0
        self.db.begin_batch()
        try:
            while self._persist_queue:
                _sentence, content, tokens, confidence = self._persist_queue.popleft()
                neurons = []
                for word in content:
                    if word in self._word_neurons:
                        existing = self.db.get(self._word_neurons[word])
                        if existing:
                            neurons.append(existing)
                            continue
                    # No mapping, or the mapped id no longer resolves: create a neuron.
                    vec = self._encode_word(word)
                    if np.any(vec != 0):  # words that encode to all zeros are skipped
                        neuron = self.db.insert(vec, confidence=confidence)
                        self._word_neurons[word] = neuron.id
                        self._nid_to_word_cache = None
                        self.db.save_word_mapping(word, neuron.id)
                        neurons.append(neuron)
                for prev, nxt in zip(neurons, neurons[1:]):
                    self.db.update_successors(prev.id, nxt.id, self._SEQUENCE_SUCCESSOR_WEIGHT)
                    self.db.update_predecessors(nxt.id, prev.id)
                if len(neurons) >= 2:
                    self.db.record_sentence([n.id for n in neurons])
                if len(tokens) >= 3:
                    self._extract_template(tokens)
                flushed += 1
        finally:
            # Always close the batch, even if persisting an entry raised, so the
            # DB is never left in batch mode.
            self.db.end_batch()
        return flushed

    def flush(self):
        """Force-persist all pending teaches and co-occurrence data to SQLite."""
        # Wait for an in-flight background flush first (as ask() and close()
        # do), so that two threads never drain the queue concurrently.
        self._join_bg_flush()
        self._flush_persist()
        self._save_cooc()
        self._rebuild_search_if_dirty()

    def ask(self, question):
        """Persist any pending teaches, refresh search if needed, then query.

        Args:
            question: Forwarded unchanged to ``BrainCore.ask``.

        Returns:
            Whatever ``BrainCore.ask`` returns.
        """
        self._join_bg_flush()
        self._flush_persist()
        self._rebuild_search_if_dirty()
        return super().ask(question)

    def teach_batch(self, sentences, confidence=0.5):
        """Teach each sentence in order via teach().

        Only Layer 1 runs per sentence. Layer 2 work stays queued (apart from
        teach()'s backpressure drain) until flush(), ask(), end_bulk() or
        close() persists it.

        Returns:
            A list with one teach() result per sentence.
        """
        return [self.teach(s, confidence) for s in sentences]

    # --- Bulk mode ---

    def begin_bulk(self):
        """Enter bulk mode. Layer 2 is deferred until end_bulk()."""
        # NOTE(review): teach() in this class does not read _bulk_mode, so the
        # backpressure drain still happens in bulk mode. Check whether BrainCore
        # uses the flag.
        self._bulk_mode = True

    def end_bulk(self):
        """Exit bulk mode, persisting everything and rebuilding the search matrix."""
        self._bulk_mode = False
        self._join_bg_flush()
        self._flush_persist()
        self._save_cooc()
        self._rebuild_search_matrix()
        # The matrix is now fresh, so the next ask() need not rebuild it again.
        self._search_dirty = False

    # --- Close ---

    def close(self):
        """Shut down worker pools, persist pending state and close the DB."""
        self._pool.shutdown(wait=False)
        self._batch_pool.shutdown(wait=False)
        try:
            self._join_bg_flush()
            self._flush_persist()
            self._save_cooc()
        finally:
            # Release the DB handle even if the final flush fails.
            self.db.close()

    # --- Health with death risk ---

    def health(self):
        """Return BrainCore's health dict extended with resource stats and a risk score.

        Adds ``persist_queue`` (pending teaches), ``system_avail_mb`` /
        ``swap_total_mb`` / ``swap_free_mb`` (when /proc/meminfo is readable),
        ``swap_used_mb`` and ``death_risk`` (0-100).
        """
        h = super().health()
        h["persist_queue"] = len(self._persist_queue)
        h.update(self._health_read_meminfo())
        h["swap_used_mb"] = round(h.get("swap_total_mb", 0) - h.get("swap_free_mb", 0), 1)
        h["death_risk"] = self._health_death_risk(h)
        return h

    @staticmethod
    def _health_read_meminfo(path='/proc/meminfo'):
        """Parse MemAvailable / SwapTotal / SwapFree from ``path``, converted from kB to MB.

        Best effort: returns the fields parsed before any error, and an empty
        dict when the file does not exist (e.g. on non-Linux systems).
        """
        wanted = {
            'MemAvailable:': 'system_avail_mb',
            'SwapTotal:': 'swap_total_mb',
            'SwapFree:': 'swap_free_mb',
        }
        stats = {}
        try:
            with open(path) as f:
                for line in f:
                    fields = line.split()
                    if fields and fields[0] in wanted:
                        stats[wanted[fields[0]]] = round(int(fields[1]) / 1024, 1)
        except (OSError, ValueError, IndexError):
            pass  # health reporting is best effort
        return stats

    @staticmethod
    def _health_death_risk(h):
        """Return a 0-100 heuristic resource-pressure score from health stats ``h``.

        Contributions, for non-negative inputs:
        - low available memory (< 2048 MB): up to 40
        - swap in use (> 100 MB): up to 25
        - low free disk (< 5 GB): up to 20
        - 1-minute load above 90% of the CPU count: 10
        - RSS above 512 MB: up to 5
        Stats missing from ``h`` contribute nothing.
        """
        risk = 0
        avail = h.get("system_avail_mb", 9999)
        if avail < 2048:
            risk += int((2048 - avail) / 2048 * 40)
        swap_used = h.get("swap_used_mb", 0)
        if swap_used > 100:
            risk += min(25, int(swap_used / 500 * 25))
        disk_free = h.get("disk_free_gb")
        if disk_free is not None and disk_free < 5:
            risk += int((5 - disk_free) / 5 * 20)
        try:
            load1 = os.getloadavg()[0]
        except (OSError, AttributeError):
            pass  # load average unavailable on this platform
        else:
            if load1 > (os.cpu_count() or 1) * 0.9:
                risk += 10
        rss = h.get("rss_mb")
        if rss is not None and rss > 512:
            risk += min(5, int((rss - 512) / 1024 * 5))
        return min(100, risk)