| """ |
| Brain: two-layer performance wrapper over brain_core. |
| |
| Layer 1 (fast): co-occurrence dict + word list. In-memory. Microseconds. |
| Layer 2 (background): neurons, successors, sentences → SQLite. Deferred. |
| |
| The brain KNOWS things at Layer 1 speed. It SAVES them at Layer 2 speed. |
| Like human memory: learn instantly, consolidate during sleep. |
| |
| from brain import Brain |
| brain = Brain(db_path="./my_brain") |
| brain.teach("paris is the capital of france") # instant (dict only) |
| brain.ask("capital of france") # works immediately |
| brain.flush() # persist to SQLite |
| """ |
|
|
| import os |
| import numpy as np |
| from collections import deque |
| from threading import Lock, Thread |
| from concurrent.futures import ThreadPoolExecutor |
|
|
| from brain_core import BrainCore, FUNCTION_WORDS, STRUCTURAL_WORDS, COOCCURRENCE_PULL |
|
|
| __all__ = ['Brain', 'FUNCTION_WORDS', 'STRUCTURAL_WORDS', 'COOCCURRENCE_PULL'] |
|
|
|
|
class Brain(BrainCore):
    """Two-layer brain. Same API as BrainCore, much faster teach.

    Layer 1: _learn_word + _learn_cooccurrence (dict ops, ~microseconds)
    Layer 2: neuron insert + successors + sentences (SQLite, deferred)
    """

    # Auto-flush Layer 2 once this many teaches are queued (memory cap).
    MAX_QUEUE = 5000

    def __init__(self, db_path=None):
        """Set up Layer-1/Layer-2 state, then delegate storage setup to BrainCore.

        db_path: forwarded to BrainCore (None = its default location).
        All attributes are created BEFORE super().__init__ because BrainCore's
        constructor may already exercise paths that read them.
        """
        self._bulk_mode = False          # True between begin_bulk()/end_bulk()
        self._search_dirty = False       # vocabulary grew; search matrix is stale
        self._nid_to_word_cache = None   # invalidated whenever vocab/neurons change

        # Layer 2: teaches wait here until persisted to SQLite.
        self._persist_queue = deque()    # (sentence, content, tokens, confidence)
        self._persist_count = 0          # total teaches ever queued
        self._flush_thread = None        # background flusher, if any
        self._flushing = False           # True while a background flush runs
        # Serializes _flush_persist so a foreground flush() and the background
        # thread can never interleave db begin_batch/end_batch.
        self._flush_lock = Lock()
        # Guards the _flushing check-and-set (was a non-atomic race before).
        self._flush_state_lock = Lock()

        super().__init__(db_path=db_path)

    def teach(self, sentence, confidence=0.5):
        """Layer 1: instant. Dict operations only.
        Layer 2: queued for background persist.

        Returns one index per content word learned ([] when the sentence
        contains only function words).
        """
        tokens = self._tokenize(sentence)
        content = [t for t in tokens if t not in FUNCTION_WORDS]
        if not content:
            return []

        with self._lock:
            dim_before = len(self._words)

            for word in content:
                self._learn_word(word)
            if len(content) >= 2:
                self._learn_cooccurrence(content)

            # New vocabulary invalidates the search matrix and the id->word cache.
            if len(self._words) != dim_before:
                self._search_dirty = True
                self._nid_to_word_cache = None

            self._persist_queue.append((sentence, content, tokens, confidence))
            self._persist_count += 1

        # BUG FIX: this previously called _flush_persist() synchronously
        # (while holding self._lock), blocking teach() and leaving
        # _async_flush() as dead code. Cap the queue by handing the work
        # to the background flusher instead, outside the lock.
        if len(self._persist_queue) >= self.MAX_QUEUE:
            self._async_flush()

        return list(range(len(content)))

    def _async_flush(self):
        """Kick off a background flush if not already running."""
        # Atomic check-and-set: without the lock, two callers could both
        # observe _flushing == False and spawn duplicate flush threads.
        with self._flush_state_lock:
            if self._flushing:
                return
            self._flushing = True
        self._flush_thread = Thread(target=self._bg_flush, daemon=True)
        self._flush_thread.start()

    def _bg_flush(self):
        """Background thread body: persist queued teaches to SQLite."""
        try:
            self._flush_persist()
        finally:
            with self._flush_state_lock:
                self._flushing = False

    def _flush_persist(self):
        """Layer 2: flush queued teaches to SQLite.

        Serialized by _flush_lock. Returns the number of teaches persisted
        (0 when the queue was empty).
        """
        with self._flush_lock:
            if not self._persist_queue:
                return 0

            self.db.begin_batch()
            flushed = 0
            try:
                while self._persist_queue:
                    sentence, content, tokens, confidence = self._persist_queue.popleft()

                    neurons = self._persist_words(content, confidence)

                    # Chain consecutive word-neurons in sentence order.
                    for prev, nxt in zip(neurons, neurons[1:]):
                        self.db.update_successors(prev.id, nxt.id, 0.8)
                        self.db.update_predecessors(nxt.id, prev.id)

                    if len(neurons) >= 2:
                        self.db.record_sentence([n.id for n in neurons])

                    if len(tokens) >= 3:
                        self._extract_template(tokens)

                    flushed += 1
            finally:
                # BUG FIX: end_batch previously never ran if a db call
                # raised, leaving the batch open.
                self.db.end_batch()
            return flushed

    def _persist_words(self, content, confidence):
        """Ensure each content word has a persisted neuron; return them in order.

        Words whose stored neuron cannot be fetched, or whose encoding is the
        zero vector, are silently skipped (preserves original behavior).
        """
        neurons = []
        for word in content:
            nid = self._word_neurons.get(word)
            if nid is not None:
                n = self.db.get(nid)
                if n:
                    neurons.append(n)
                continue
            vec = self._encode_word(word)
            if np.any(vec != 0):
                n = self.db.insert(vec, confidence=confidence)
                self._word_neurons[word] = n.id
                self._nid_to_word_cache = None
                self.db.save_word_mapping(word, n.id)
                neurons.append(n)
        return neurons

    def flush(self):
        """Force flush all pending teaches to SQLite (blocking)."""
        self._flush_persist()
        self._save_cooc()
        if self._search_dirty:
            self._rebuild_search_matrix()
            self._search_dirty = False

    def ask(self, question):
        """Wait for any in-flight background flush, persist leftovers,
        refresh the search matrix if needed, then query via BrainCore."""
        thread = self._flush_thread
        if thread and thread.is_alive():
            thread.join()
        if self._persist_queue:
            self._flush_persist()
        if self._search_dirty:
            self._rebuild_search_matrix()
            self._search_dirty = False
        return super().ask(question)

    def teach_batch(self, sentences, confidence=0.5):
        """Teach many sentences. Layer 1 is instant for each; Layer 2 stays
        queued — call flush() (or ask()) to persist.

        (Docstring fixed: it previously claimed "one Layer 2 flush at end"
        but no flush was ever issued here.)
        """
        return [self.teach(s, confidence) for s in sentences]

    def begin_bulk(self):
        """Enter bulk mode. Layer 2 deferred until end_bulk."""
        self._bulk_mode = True

    def end_bulk(self):
        """Exit bulk mode. Flush everything and rebuild search state."""
        self._bulk_mode = False
        self._flush_persist()
        self._save_cooc()
        self._rebuild_search_matrix()
        # BUG FIX: the matrix was just rebuilt, so clear the dirty flag —
        # otherwise the next ask()/flush() rebuilds it again for nothing.
        self._search_dirty = False

    def close(self):
        """Shut down worker pools, drain the persist queue, close the db."""
        # _pool/_batch_pool are presumably created by BrainCore — TODO confirm.
        self._pool.shutdown(wait=False)
        self._batch_pool.shutdown(wait=False)
        thread = self._flush_thread
        if thread and thread.is_alive():
            thread.join()
        self._flush_persist()
        self._save_cooc()
        self.db.close()

    def health(self):
        """BrainCore health plus queue depth, Linux memory/swap stats, and a
        0-100 heuristic "death_risk" score (higher = closer to OOM/disk-full)."""
        h = super().health()
        h["persist_queue"] = len(self._persist_queue)

        # Best-effort: /proc/meminfo only exists on Linux; silently skip elsewhere.
        try:
            with open('/proc/meminfo') as f:
                for line in f:
                    if line.startswith('MemAvailable:'):
                        h["system_avail_mb"] = round(int(line.split()[1]) / 1024, 1)
                    elif line.startswith('SwapTotal:'):
                        h["swap_total_mb"] = round(int(line.split()[1]) / 1024, 1)
                    elif line.startswith('SwapFree:'):
                        h["swap_free_mb"] = round(int(line.split()[1]) / 1024, 1)
        except Exception:
            pass

        h["swap_used_mb"] = round(h.get("swap_total_mb", 0) - h.get("swap_free_mb", 0), 1)

        # Heuristic risk score: low available RAM (up to 40), swap pressure
        # (up to 25), low disk (up to 20), CPU load (10), own RSS (5).
        risk = 0
        avail = h.get("system_avail_mb", 9999)
        if avail < 2048:
            risk += int((2048 - avail) / 2048 * 40)
        if h.get("swap_used_mb", 0) > 100:
            risk += min(25, int(h["swap_used_mb"] / 500 * 25))
        # disk_free_gb / rss_mb assumed supplied by BrainCore.health() — TODO confirm.
        if h["disk_free_gb"] < 5:
            risk += int((5 - h["disk_free_gb"]) / 5 * 20)
        try:
            load1 = os.getloadavg()[0]
            ncpu = os.cpu_count() or 1
            if load1 > ncpu * 0.9:
                risk += 10
        except Exception:
            pass
        if h["rss_mb"] > 512:
            risk += min(5, int((h["rss_mb"] - 512) / 1024 * 5))
        h["death_risk"] = min(100, risk)

        return h
|
|