# guru / brain_core.py
# tejadabheja's picture
# Upload folder using huggingface_hub
# a5ae1ac verified
"""
BrainCore: the clean reasoning engine. No performance hacks.
A growing matrix that learns from what it's taught.
No pretrained embeddings. No gradient descent. No FAISS.
brain = BrainCore()
brain.teach("paris is the capital of france")
brain.teach("london is the capital of england")
brain.ask("capital of france") # → "paris capital france"
The matrix IS the understanding. Each word is a dimension.
Co-occurring words pull toward each other. Query = search the matrix.
The DB (SQLite) is the portable brain — copy it anywhere.
"""
import re
import hashlib
import struct
from collections import OrderedDict
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from threading import Lock
import numpy as np
from neuron import NeuronDB, Neuron
from convergence import ConvergenceLoop, MultiHopConvergence
# How much co-occurring words pull toward each other: the amount added to
# both directions of a word pair each time the pair is seen in a sentence.
COOCCURRENCE_PULL = 0.3

# Words that are grammar, not knowledge. Tokens in this set are dropped
# before learning (teach) and before building a query (ask).
FUNCTION_WORDS = frozenset({
    "the", "a", "an", "is", "are", "was", "were", "be", "been", "being",
    "have", "has", "had", "do", "does", "did", "will", "would", "could",
    "should", "may", "might", "shall", "can", "must", "of", "in", "to",
    "for", "with", "on", "at", "by", "from", "as", "into", "through",
    "during", "before", "after", "above", "below", "between", "out",
    "off", "over", "under", "and", "but", "or", "nor", "not", "so",
    "yet", "both", "either", "neither", "each", "every", "all", "any",
    "few", "more", "most", "other", "some", "such", "no", "only", "own",
    "same", "than", "too", "very", "just", "about", "up", "what", "which",
    "who", "whom", "this", "that", "these", "those", "am", "if", "then",
    "because", "while", "although", "though", "even", "also", "it", "its",
    "how", "when", "where", "why", "there", "here",
})

# Words kept verbatim when a template is extracted from a sentence; every
# other token becomes a slot. They are also excluded from slot fills.
STRUCTURAL_WORDS = frozenset({
    "is", "are", "was", "were", "wrote", "discovered", "invented",
    "created", "built", "made", "found", "said", "called", "known",
    "born", "died", "has", "had", "have", "been", "became",
    "the", "a", "an", "of", "in", "for", "to", "on", "at", "by",
    "with", "from", "as", "and", "or", "but",
})
class BrainCore:
"""
A self-growing reasoning system.
The brain is a matrix + a database.
The matrix stores word relationships (encoder).
The database stores metadata (confidence, links, sentences).
Teaching grows the matrix. Querying searches it.
"""
def __init__(self, db_path: str = None):
    """
    Build a brain, optionally backed by on-disk storage.

    Args:
        db_path: directory for SQLite storage. None = in-memory.
    """
    self._db_path = db_path

    # Vocabulary and sparse co-occurrence state.
    self._words = []        # index → word
    self._word_idx = {}     # word → index
    self._cooc = {}         # word_idx → {word_idx: weight}
    self._matrix = None     # legacy compat — derived from _cooc when needed

    # Templates for fluent output, and the lazily built nid → word lookup.
    self._templates = []
    self._nid_to_word_cache = None

    # The database: metadata + persistence.
    self.db = NeuronDB(path=db_path, dim=1)
    self._word_neurons = self.db.load_word_mappings()

    # Convergence loop for multi-hop reasoning.
    hop_loop = ConvergenceLoop(
        self.db,
        max_hops=10,
        k=5,
        convergence_threshold=0.99,
        min_confidence=0.1,
        min_relevance=0.3,
    )
    self._convergence = hop_loop
    self._multi_hop = MultiHopConvergence(
        hop_loop,
        max_rounds=3,
        concept_blend_weight=0.4,
    )

    # Thread pools: _pool serves a single ask(); _batch_pool serves the
    # *_batch entry points.
    self._pool = ThreadPoolExecutor(max_workers=4)
    self._batch_pool = ThreadPoolExecutor(max_workers=8)
    self._lock = Lock()

    # Restore co-occurrence from the DB last, once everything else exists.
    self._load_cooc()
# --- Learning ---
def teach(self, sentence: str, confidence: float = 0.5) -> list:
"""
Teach the brain a sentence. Returns list of neuron IDs created.
"paris is the capital of france" →
1. Learns words: paris, capital, france (skips function words)
2. Records co-occurrence: paris↔capital, paris↔france, capital↔france
3. Creates neurons in DB with successor links
4. Records sentence association
5. Extracts template: "[S0] is the [S1] of [S2]"
6. Reindexes all vectors to current dimensions
"""
tokens = self._tokenize(sentence)
content = [t for t in tokens if t not in FUNCTION_WORDS]
if not content:
return []
with self._lock:
# Batch all DB writes for this sentence
self.db.begin_batch()
# Grow the matrix
dim_before = len(self._words)
for word in content:
self._learn_word(word)
if len(content) >= 2:
self._learn_cooccurrence(content)
# Create neurons in DB
neurons = []
for word in content:
if word in self._word_neurons:
n = self.db.get(self._word_neurons[word])
if n:
neurons.append(n)
continue
vec = self._encode_word(word)
if np.any(vec != 0):
n = self.db.insert(vec, confidence=confidence)
self._word_neurons[word] = n.id
self.db.save_word_mapping(word, n.id)
neurons.append(n)
# Wire successors
for i in range(len(neurons) - 1):
self.db.update_successors(neurons[i].id, neurons[i + 1].id, 0.8)
self.db.update_predecessors(neurons[i + 1].id, neurons[i].id)
# Record sentence
if len(neurons) >= 2:
self.db.record_sentence([n.id for n in neurons])
# Extract template
if len(tokens) >= 3:
self._extract_template(tokens)
# Flush all writes at once
self.db.end_batch()
# Reindex if dimensions changed
if len(self._words) != dim_before:
self._reindex()
return [n.id for n in neurons]
def teach_batch(self, sentences: list, confidence: float = 0.5) -> list:
"""Teach multiple sentences in parallel. Returns list of neuron ID lists."""
futures = [self._batch_pool.submit(self.teach, s, confidence) for s in sentences]
return [f.result() for f in futures]
def correct(self, question: str, answer: str):
    """Learn from a failure: teach the answer at confidence 0.6, then mark the logged miss for this question as resolved."""
    corrected_confidence = 0.6
    self.teach(answer, confidence=corrected_confidence)
    self.db.resolve_miss_by_query(question, answer)
# --- Querying ---
def ask_batch(self, questions: list) -> list:
"""Ask multiple questions in parallel. Returns list of result dicts.
Uses separate batch pool to avoid thread starvation (each ask()
uses the internal pool for its own parallelism)."""
futures = [self._batch_pool.submit(self.ask, q) for q in questions]
return [f.result() for f in futures]
def ask(self, question: str) -> dict:
    """
    Ask the brain a question. Returns dict with answer + trace.

    Flow:
    1. Encode question → vector
    2. Multi-hop convergence: iteratively search the neuron DB,
       blending discovered concepts back into the query each round.
    3. Sparse co-occurrence search as complementary signal
    4. Disambiguate via sentence table
    5. Output answer (template, sentence chain or concept list)

    Returns:
        A dict with "answer", "confidence", "strategy" and "trace".
        Non-abstaining answers also carry "convergence_trace",
        "converged" and "convergence_rounds". When nothing relevant is
        known the miss is logged and an "abstain" result is returned.
    """
    tokens = self._tokenize(question)
    content = [t for t in tokens if t not in FUNCTION_WORDS]
    query_vec = self._encode_sentence(question)

    def _abstain(reason: str) -> dict:
        """Log the miss and build the standard "I don't know" result."""
        self.db.log_miss(question, query_vec)
        return {"answer": "I don't know.", "confidence": 0.0,
                "strategy": "abstain", "trace": reason}

    if np.all(query_vec == 0) or len(self._words) == 0:
        return _abstain("No knowledge")

    content_indices = [self._word_idx[t] for t in content
                       if t in self._word_idx]
    content_nids = [self._word_neurons[t] for t in content
                    if t in self._word_neurons]
    if not content_indices:
        return _abstain("No known words in query")

    # --- Multi-hop convergence ---
    # Each round discovers concepts, blends them into the query,
    # and searches again — allowing reasoning across concept gaps.
    multi_hop_result = self._multi_hop.reason(query_vec)
    convergence_trace = multi_hop_result.trace()

    # Build nid→word lookup (cached across calls)
    if self._nid_to_word_cache is None:
        self._nid_to_word_cache = {nid: w for w, nid in self._word_neurons.items()
                                   if not w.startswith("__")}
    nid_to_word = self._nid_to_word_cache
    word_to_nid = {w: nid for nid, w in nid_to_word.items()}

    # --- Positional encoding for reasoning ---
    # Position of each known query word among the content words.
    query_positions = {}
    for i, t in enumerate(content):
        nid = self._word_neurons.get(t)
        if nid is not None:
            query_positions[nid] = i

    # Sentences touching the query words. Fetched once and reused for
    # both the position bias and the sentence disambiguation below.
    query_sentences = {}
    if content_nids:
        query_sentences = self.db.get_sentences_for_neurons(content_nids) or {}

    # nid → set of positions at which it was taught
    nid_taught_positions = {}
    for nid_pos_list in query_sentences.values():
        for nid, pos in nid_pos_list:
            nid_taught_positions.setdefault(nid, set()).add(pos)

    def _position_bias(nid: int) -> float:
        """Return a multiplier in [1.0, 1.5] for a neuron.

        For each query word position, take the closest taught position
        of this neuron and score it as 1 / (1 + distance); the average
        of those scores is scaled to a boost of at most 50%.
        """
        taught_pos = nid_taught_positions.get(nid)
        if not taught_pos or not query_positions:
            return 1.0
        sims = [1.0 / (1.0 + min(abs(q_pos - tp) for tp in taught_pos))
                for q_pos in query_positions.values()]
        return 1.0 + 0.5 * (sum(sims) / len(sims))

    # Collect concepts from multi-hop convergence
    concepts = []
    seen = set()
    for n in multi_hop_result.concepts:
        word = nid_to_word.get(n.id)
        if word and n.id not in seen and word not in FUNCTION_WORDS:
            concepts.append((n, word))
            seen.add(n.id)

    # --- Sparse co-occurrence search (complementary) ---
    # Earlier query words weigh slightly more than later ones.
    weights = [1.0 / (1.0 + 0.1 * i) for i in range(len(content_indices))]
    query_cooc = self._sparse_blend(content_indices, weights)
    search_results = self._sparse_search(query_cooc, k=10)

    # Re-rank sparse results by position bias
    if nid_taught_positions:
        biased_results = []
        for word_idx, sim in search_results:
            nid = None
            if word_idx < len(self._words):
                nid = word_to_nid.get(self._words[word_idx])
            # `is not None`, not truthiness: 0 would be a valid ID.
            if nid is not None:
                sim = sim * _position_bias(nid)
            biased_results.append((word_idx, sim))
        biased_results.sort(key=lambda x: x[1], reverse=True)
        search_results = biased_results

    for word_idx, sim in search_results:
        if word_idx >= len(self._words):
            continue
        word = self._words[word_idx]
        nid = word_to_nid.get(word)
        if nid is not None and nid not in seen and word not in FUNCTION_WORDS:
            n = self.db.get(nid)
            if n:
                concepts.append((n, word))
                seen.add(nid)

    # --- Sentence disambiguation ---
    # Keep the neurons of the sentence(s) sharing the most query words.
    best_sentence_nids = None
    if query_sentences:
        scored = [(sid, len(nids)) for sid, nids in query_sentences.items()]
        scored.sort(key=lambda x: x[1], reverse=True)
        best_score = scored[0][1]
        best_sentence_nids = set()
        for sid, score in scored:
            if score < best_score:
                break
            for nid, pos in self.db.get_sentence_neurons(sid):
                best_sentence_nids.add(nid)

    # Add concepts from sentence disambiguation
    if best_sentence_nids:
        for nid in best_sentence_nids:
            if nid not in seen:
                n = self.db.get(nid)
                word = nid_to_word.get(nid)
                if n and word:
                    concepts.append((n, word))
                    seen.add(nid)

    if not concepts:
        return _abstain("No relevant concepts")

    # Filter: keep only concepts from best sentence (if disambiguated)
    if best_sentence_nids and len(content_nids) >= 2:
        keep = best_sentence_nids | set(content_nids)
        concepts = [(n, w) for n, w in concepts if n.id in keep]

    # Generate answer (pass convergence trace for inspectability)
    result = self._generate(concepts, query_vec, tokens, question)
    result["convergence_trace"] = convergence_trace
    result["converged"] = multi_hop_result.converged
    result["convergence_rounds"] = len(multi_hop_result.rounds)
    return result
# --- Text Generation (fluent output from the graph) ---
def generate(self, query: str, max_tokens: int = 30, temperature: float = 0.7,
min_score: float = 0.01) -> dict:
"""
Generate fluent text from the graph, steered by a query.
Algorithm:
1. Run ask() to find starting concepts via convergence
2. Pick best starting word (highest confidence, not a query word)
3. Walk the graph: score words by context_similarity × (1 + successor_confidence)
4. Apply softmax with temperature, pick top word
5. Update running context profile, loop detection, min-score stopping
Returns dict with 'text', 'trace' list, 'tokens_generated'.
"""
import math
# Need vocabulary to generate from
if not self._words:
return {"text": "", "trace": ["No vocabulary"], "tokens_generated": 0}
# Step 1: find starting concepts via ask()
ask_result = self.ask(query)
query_tokens_set = set(self._tokenize(query))
# Build nid→word lookup
nid_to_word = {nid: w for w, nid in self._word_neurons.items()
if not w.startswith("__")}
# Step 2: pick starting word — highest confidence concept not in query
start_word = None
start_score = -1.0
# Try concepts from ask result
answer_words = self._tokenize(ask_result.get("answer", ""))
for w in answer_words:
if w in self._word_idx and w not in FUNCTION_WORDS and w not in query_tokens_set:
nid = self._word_neurons.get(w)
conf = 0.5
if nid:
n = self.db.get(nid)
if n:
conf = n.confidence
if conf > start_score:
start_score = conf
start_word = w
# Fallback: best connected word to query
if start_word is None:
query_content = [t for t in self._tokenize(query) if t in self._word_idx]
if query_content:
q_indices = [self._word_idx[t] for t in query_content]
q_cooc = self._sparse_blend(q_indices)
search = self._sparse_search(q_cooc, k=20)
for widx, sim in search:
w = self._words[widx]
if w not in FUNCTION_WORDS and w not in query_tokens_set:
start_word = w
start_score = sim
break
if start_word is None:
return {"text": "", "trace": ["No starting word found"], "tokens_generated": 0}
# Step 3: walk the graph
generated = [start_word]
trace = [{"token": start_word, "score": start_score, "reason": "start (best concept)"}]
recent_window = 6 # loop detection window
# Initialize running context profile from start word
start_idx = self._word_idx[start_word]
context_profile = dict(self._cooc.get(start_idx, {}))
for pos in range(1, max_tokens):
prev_word = generated[-1]
prev_idx = self._word_idx.get(prev_word)
# Get predecessor's successor list for bonus scoring
prev_successors = {}
if prev_word in self._word_neurons:
prev_nid = self._word_neurons[prev_word]
prev_neuron = self.db.get(prev_nid)
if prev_neuron and prev_neuron.successors:
for succ_nid, succ_conf in prev_neuron.successors:
succ_word = nid_to_word.get(succ_nid)
if succ_word:
prev_successors[succ_word] = succ_conf
# Score every word in vocabulary
best_word = None
best_raw_score = -1.0
scores = []
for widx, word in enumerate(self._words):
if word in FUNCTION_WORDS:
continue
# Context similarity: sparse cosine between word's cooc and running context
word_cooc = self._cooc.get(widx, {})
ctx_sim = self._sparse_cosine(word_cooc, context_profile)
# Successor confidence bonus
succ_conf = prev_successors.get(word, 0.0)
raw_score = ctx_sim * (1.0 + succ_conf)
if raw_score > 0:
scores.append((widx, word, raw_score))
if not scores:
trace.append({"token": "[STOP]", "score": 0, "reason": "no candidates"})
break
# Sort by score descending
scores.sort(key=lambda x: x[2], reverse=True)
# Apply softmax with temperature over top candidates (cap at 50 for speed)
top_n = min(50, len(scores))
top_scores = scores[:top_n]
if temperature > 0:
max_raw = top_scores[0][2]
exp_scores = []
for widx, word, raw in top_scores:
exp_scores.append((widx, word, raw, math.exp((raw - max_raw) / temperature)))
total_exp = sum(e for _, _, _, e in exp_scores)
if total_exp > 0:
softmax_scores = [(widx, word, raw, e / total_exp) for widx, word, raw, e in exp_scores]
else:
softmax_scores = [(widx, word, raw, 0.0) for widx, word, raw, _ in exp_scores]
else:
# Greedy: just pick top
softmax_scores = [(top_scores[0][0], top_scores[0][1], top_scores[0][2], 1.0)]
# Pick top word (deterministic top-1 from softmax ranking)
# Filter out recent words (loop detection) and below min_score
chosen = None
recent_set = set(generated[-recent_window:]) if len(generated) >= recent_window else set(generated)
for widx, word, raw, prob in softmax_scores:
if word in recent_set:
continue
if raw < min_score:
continue
chosen = (widx, word, raw, prob)
break
if chosen is None:
trace.append({"token": "[STOP]", "score": 0,
"reason": "all candidates below threshold or in loop"})
break
widx, word, raw, prob = chosen
generated.append(word)
trace.append({"token": word, "score": round(raw, 4),
"prob": round(prob, 4),
"reason": f"ctx_sim×(1+succ), prob={prob:.3f}"})
# Update running context profile by blending in new word's cooc
new_cooc = self._cooc.get(widx, {})
# Blend: 70% existing context + 30% new word
blend_weight = 0.3
for k, v in new_cooc.items():
context_profile[k] = context_profile.get(k, 0) * (1 - blend_weight) + v * blend_weight
# Decay old entries slightly
for k in list(context_profile.keys()):
if k not in new_cooc:
context_profile[k] *= (1 - blend_weight)
return {
"text": " ".join(generated),
"trace": trace,
"tokens_generated": len(generated),
}
# --- Answer Generation (internal, from concepts) ---
def _generate(self, concepts, query_vec, query_tokens, question) -> dict:
"""Generate an answer from concepts. Template and chain race in parallel."""
concept_neurons = [n for n, w in concepts]
concept_words = [w for n, w in concepts]
concept_ids = [n.id for n, w in concepts]
# Race: template vs sentence chain — first valid result wins
template_future = self._pool.submit(
self._try_template,
concept_neurons, concept_words, concept_ids, query_vec, query_tokens
)
chain_future = self._pool.submit(
self._try_sentence_chain, concept_ids, query_vec
)
# Wait for both (fast enough that ordering doesn't matter much)
template_result = template_future.result()
chain_result = chain_future.result()
if template_result:
return template_result
if chain_result:
return chain_result
# Fallback: concept list
avg_conf = float(np.mean([n.confidence for n in concept_neurons]))
return {
"answer": " ".join(concept_words),
"confidence": avg_conf,
"strategy": "concept_list",
"trace": f"Concepts: {concept_words}",
}
def _try_template(self, neurons, words, nids, query_vec, query_tokens) -> dict:
"""Try to fill a template with concepts."""
if not self._templates:
return None
query_word_set = set(query_tokens)
# Score templates by structural word overlap with query
best_template = None
best_score = 0
for pattern, slots, tvec in self._templates:
struct_words = [w for w in re.findall(r'[a-z]+', pattern.lower())
if w not in [s.lower() for s in slots]]
overlap = sum(1 for w in struct_words if w in query_word_set)
if overlap > best_score:
best_score = overlap
best_template = (pattern, slots)
if not best_template or best_score == 0:
return None
pattern, slots = best_template
# Order concepts by taught sentence position
sentence_order = self._get_sentence_order(nids)
if sentence_order:
ordered = sorted(zip(neurons, words),
key=lambda p: sentence_order.get(p[0].id, 999))
else:
ordered = list(zip(neurons, words))
# Fill slots
content_words = [w for n, w in ordered
if w.lower() not in STRUCTURAL_WORDS]
fills = {}
available = list(content_words)
for slot_name in slots:
if available:
fills[slot_name] = available.pop(0)
if not fills:
return None
text = pattern
for name, value in fills.items():
text = text.replace(f"[{name}]", value)
# Replace unfilled slots
for name in slots:
if name not in fills:
text = text.replace(f"[{name}]", "...")
# Convergence: does the filled template live near the query?
answer_vec = self._encode_sentence(text)
q_norm = np.linalg.norm(query_vec)
a_norm = np.linalg.norm(answer_vec)
if q_norm > 0 and a_norm > 0:
convergence = float(np.dot(query_vec, answer_vec) / (q_norm * a_norm))
else:
convergence = 0.0
if convergence <= 0:
return None
return {
"answer": text,
"confidence": convergence,
"strategy": "template",
"trace": f"Template: {pattern}, fills: {fills}, convergence={convergence:.3f}",
}
def _try_sentence_chain(self, concept_ids, query_vec) -> dict:
"""Find the best matching taught sentence, output in word order."""
sentences = self.db.get_sentences_for_neurons(concept_ids)
if not sentences:
return None
nid_to_word = {nid: w for w, nid in self._word_neurons.items()
if not w.startswith("__")}
scored = []
for sid, matched in sentences.items():
sent_neurons = self.db.get_sentence_neurons(sid)
if not sent_neurons:
continue
score = len(matched)
scored.append((sid, score, sent_neurons))
if not scored:
return None
scored.sort(key=lambda x: x[1], reverse=True)
sid, score, sent_neurons = scored[0]
# Output in taught order
ordered = sorted(sent_neurons, key=lambda x: x[1])
words = [nid_to_word.get(nid, "") for nid, pos in ordered]
words = [w for w in words if w]
if len(words) < 2:
return None
# Sparse convergence check: do answer words' co-occurrence overlap with query?
answer_indices = [self._word_idx[w] for w in words if w in self._word_idx]
if not answer_indices:
return None
answer_cooc = self._sparse_blend(answer_indices)
# concept_ids are neuron IDs — convert to word indices
nid_to_word = self._nid_to_word_cache or {nid: w for w, nid in self._word_neurons.items()
if not w.startswith("__")}
query_words = [nid_to_word.get(nid) for nid in concept_ids]
query_indices = [self._word_idx[w] for w in query_words
if w and w in self._word_idx]
if not query_indices:
query_indices = answer_indices
query_cooc = self._sparse_blend(query_indices)
convergence = self._sparse_cosine(query_cooc, answer_cooc)
if convergence <= 0:
return None
return {
"answer": " ".join(words),
"confidence": convergence,
"strategy": "sentence_chain",
"trace": f"Sentence {sid} (convergence={convergence:.3f}): {words}",
}
def _get_sentence_order(self, concept_ids) -> dict:
"""Get taught position order for concepts."""
if len(concept_ids) < 2:
return {}
sentences = self.db.get_sentences_for_neurons(concept_ids)
if not sentences:
return {}
best_sid = max(sentences, key=lambda sid: len(sentences[sid]))
if len(sentences[best_sid]) < 2:
return {}
return {nid: pos for nid, pos in self.db.get_sentence_neurons(best_sid)}
# --- Co-occurrence operations (dict-based, no N×N matrix) ---
def _learn_word(self, word: str) -> int:
"""Add a word. Returns its index."""
word = word.lower().strip()
if word in self._word_idx:
return self._word_idx[word]
idx = len(self._words)
self._words.append(word)
self._word_idx[word] = idx
self._cooc[idx] = {idx: 1.0} # self-connection
return idx
def _load_cooc(self):
"""Load co-occurrence from the cooccurrence table, or rebuild from neuron vectors."""
# Build word list from DB
words_in_db = sorted(
[(w, nid) for w, nid in self._word_neurons.items()
if not w.startswith("__")],
key=lambda x: x[1]
)
for word, nid in words_in_db:
if word not in self._word_idx:
idx = len(self._words)
self._words.append(word)
self._word_idx[word] = idx
self._cooc[idx] = {idx: 1.0}
if not words_in_db:
return
# Try loading from cooccurrence table first
try:
rows = self.db.db.execute(
"SELECT word_a, word_b, weight FROM cooccurrence"
).fetchall()
if rows:
for a, b, w in rows:
if a not in self._cooc:
self._cooc[a] = {a: 1.0}
if b not in self._cooc:
self._cooc[b] = {b: 1.0}
self._cooc[a][b] = w
self._cooc[b][a] = w
self._rebuild_search_matrix()
return
except Exception:
pass # table doesn't exist yet
# Create cooccurrence table
self.db.db.execute("""
CREATE TABLE IF NOT EXISTS cooccurrence (
word_a INTEGER NOT NULL,
word_b INTEGER NOT NULL,
weight REAL NOT NULL DEFAULT 0.0,
PRIMARY KEY (word_a, word_b)
)
""")
self.db.db.execute(
"CREATE INDEX IF NOT EXISTS idx_cooc_a ON cooccurrence(word_a)")
self.db.db.commit()
# Rebuild from neuron vectors (migration from old N×N format)
n = len(self._words)
nid_to_idx = {nid: self._word_idx[w]
for w, nid in words_in_db if w in self._word_idx}
for nid, vec_bytes in self.db.db.execute("SELECT id, vector FROM neurons"):
idx = nid_to_idx.get(nid)
if idx is None:
continue
vec = np.frombuffer(vec_bytes, dtype=np.float32)
for j, val in enumerate(vec):
if j < n and val > 0 and j != idx:
if idx not in self._cooc:
self._cooc[idx] = {idx: 1.0}
self._cooc[idx][j] = float(val)
# Persist to cooccurrence table
self._save_cooc()
self._rebuild_search_matrix()
def _save_cooc(self):
"""Persist co-occurrence dict to SQLite."""
self.db.db.execute("""
CREATE TABLE IF NOT EXISTS cooccurrence (
word_a INTEGER NOT NULL,
word_b INTEGER NOT NULL,
weight REAL NOT NULL DEFAULT 0.0,
PRIMARY KEY (word_a, word_b)
)
""")
pairs = []
for a, neighbors in self._cooc.items():
for b, w in neighbors.items():
if a != b and w > 0:
pairs.append((a, b, w))
self.db.db.execute("DELETE FROM cooccurrence")
self.db.db.executemany(
"INSERT INTO cooccurrence (word_a, word_b, weight) VALUES (?, ?, ?)",
pairs)
self.db.db.commit()
def _save_matrix(self):
"""Persist co-occurrence to DB."""
self._save_cooc()
def _learn_cooccurrence(self, words: list):
"""Strengthen connections between co-occurring words."""
indices = [self._word_idx[w.lower().strip()] for w in words
if w.lower().strip() in self._word_idx]
for i in range(len(indices)):
for j in range(i + 1, len(indices)):
a, b = indices[i], indices[j]
if a not in self._cooc:
self._cooc[a] = {a: 1.0}
if b not in self._cooc:
self._cooc[b] = {b: 1.0}
self._cooc[a][b] = self._cooc[a].get(b, 0) + COOCCURRENCE_PULL
self._cooc[b][a] = self._cooc[b].get(a, 0) + COOCCURRENCE_PULL
# --- Sparse operations (no N-dimensional vectors) ---
def _get_cooc(self, word: str) -> dict:
"""Get a word's co-occurrence dict. Sparse. O(K) where K = connections."""
word = word.lower().strip()
idx = self._word_idx.get(word)
if idx is None:
return {}
return self._cooc.get(idx, {})
def _sparse_norm(self, d: dict) -> float:
"""L2 norm of a sparse dict."""
return sum(v * v for v in d.values()) ** 0.5
def _sparse_cosine(self, a: dict, b: dict) -> float:
"""Cosine similarity between two sparse dicts. O(min(|a|, |b|))."""
if not a or not b:
return 0.0
# Iterate over the smaller dict
if len(a) > len(b):
a, b = b, a
dot = sum(v * b.get(k, 0) for k, v in a.items())
if dot == 0:
return 0.0
na = self._sparse_norm(a)
nb = self._sparse_norm(b)
if na == 0 or nb == 0:
return 0.0
return dot / (na * nb)
def _sparse_blend(self, word_indices: list, weights: list = None) -> dict:
"""Blend multiple words' co-occurrence dicts. Weighted average."""
result = {}
if weights is None:
weights = [1.0] * len(word_indices)
total_w = sum(weights)
if total_w == 0:
return result
for idx, w in zip(word_indices, weights):
for k, v in self._cooc.get(idx, {}).items():
result[k] = result.get(k, 0) + w * v
# Normalize
for k in result:
result[k] /= total_w
return result
def _sparse_search(self, query_cooc: dict, k: int = 5) -> list:
"""Search all words by sparse cosine with query. O(N × K)."""
if not query_cooc:
return []
scores = []
q_norm = self._sparse_norm(query_cooc)
if q_norm == 0:
return []
for word_idx, word_cooc in self._cooc.items():
if not word_cooc:
continue
dot = sum(query_cooc.get(j, 0) * v for j, v in word_cooc.items())
if dot > 0:
w_norm = self._sparse_norm(word_cooc)
if w_norm > 0:
sim = dot / (q_norm * w_norm)
scores.append((word_idx, sim))
scores.sort(key=lambda x: x[1], reverse=True)
return scores[:k]
def _encode_word(self, word: str) -> np.ndarray:
"""Get a word's vector. Dense form for backward compat."""
word = word.lower().strip()
n = len(self._words)
idx = self._word_idx.get(word)
if idx is None or n == 0:
return np.zeros(n or 1, dtype=np.float32)
vec = np.zeros(n, dtype=np.float32)
for j, w in self._cooc.get(idx, {}).items():
if j < n:
vec[j] = w
norm = np.linalg.norm(vec)
if norm > 0:
vec = vec / norm
return vec
def _encode_sentence(self, text: str) -> np.ndarray:
"""Encode a sentence. Dense form for backward compat."""
tokens = self._tokenize(text)
if not tokens:
d = len(self._words) or 1
return np.zeros(d, dtype=np.float32)
vectors = []
weights = []
for i, token in enumerate(tokens):
vec = self._encode_word(token)
if np.any(vec != 0):
vectors.append(vec)
weights.append(1.0 / (1.0 + 0.1 * i))
if not vectors:
d = len(self._words) or 1
return np.zeros(d, dtype=np.float32)
vectors = np.array(vectors)
weights = np.array(weights, dtype=np.float32)
weights = weights / weights.sum()
result = np.average(vectors, axis=0, weights=weights).astype(np.float32)
norm = np.linalg.norm(result)
if norm > 0:
result = result / norm
return result
def _reindex(self):
"""Re-encode all neurons to current dimensions."""
for word, nid in self._word_neurons.items():
if word.startswith("__"):
continue
vec = self._encode_word(word)
if np.any(vec != 0):
# Update search matrix
row = self.db._id_to_row.get(nid)
if row is not None and self.db._vectors is not None:
if vec.shape[0] != self.db._vectors.shape[1]:
# Dimension changed — full rebuild
self._rebuild_search_matrix()
return
self.db._vectors[row] = vec
self.db.db.execute(
"UPDATE neurons SET vector = ? WHERE id = ?",
(vec.tobytes(), nid)
)
self.db.db.commit()
# Re-encode templates
for i, (pattern, slots, old_vec) in enumerate(self._templates):
text = re.sub(r'\[[A-Z_0-9]+\]', '', pattern).strip()
vec = self._encode_sentence(text)
self._templates[i] = (pattern, slots, vec)
def _rebuild_search_matrix(self):
"""No-op. Sparse search uses _cooc dict directly. No matrix needed."""
pass
# --- Template extraction ---
def _extract_template(self, tokens: list):
"""Auto-extract a template from a sentence."""
structural = set()
content = []
for i, token in enumerate(tokens):
if token in STRUCTURAL_WORDS:
structural.add(i)
else:
content.append(i)
if not content or not structural:
return
pattern_parts = []
slots = {}
slot_idx = 0
for i, token in enumerate(tokens):
if i in structural:
pattern_parts.append(token)
else:
name = f"S{slot_idx}"
pattern_parts.append(f"[{name}]")
slots[name] = "noun"
slot_idx += 1
pattern = " ".join(pattern_parts)
# Check for duplicates
for p, s, v in self._templates:
if p == pattern:
return
text = " ".join(t for t in tokens if t in STRUCTURAL_WORDS)
vec = self._encode_sentence(text) if text.strip() else np.zeros(
len(self._words) or 1, dtype=np.float32
)
self._templates.append((pattern, slots, vec))
# --- Utilities ---
@staticmethod
def _tokenize(text: str) -> list:
return re.findall(r'[a-z0-9]+', text.lower())
def inspect(self, word: str) -> dict:
"""Show what the brain knows about a word."""
word = word.lower().strip()
if word not in self._word_idx:
return {"word": word, "known": False}
idx = self._word_idx[word]
# Find strongest relationships from co-occurrence dict
connections = []
for j, val in self._cooc.get(idx, {}).items():
if j != idx and j < len(self._words):
connections.append((self._words[j], float(val)))
connections.sort(key=lambda x: x[1], reverse=True)
nid = self._word_neurons.get(word)
neuron = self.db.get(nid) if nid else None
return {
"word": word,
"known": True,
"dimension": idx,
"connections": connections[:10],
"confidence": neuron.confidence if neuron else None,
"successors": len(neuron.successors) if neuron else 0,
}
def stats(self) -> dict:
return {
"words": len(self._words),
"dimensions": len(self._words),
"neurons": self.db.count(),
"templates": len(self._templates),
"matrix_size": f"{len(self._words)}x{len(self._words)}",
}
def health(self) -> dict:
"""
Self-awareness: how much resource am I using?
Returns CPU, memory, DB size, matrix size, disk free.
The brain should know its own cost and not exploit the machine.
"""
h = self.db.health()
# Add brain-level matrix stats
# Estimate co-occurrence memory: entries × ~50 bytes
cooc_entries = sum(len(v) for v in self._cooc.values())
h["brain_matrix_mb"] = round(cooc_entries * 50 / (1024 * 1024), 2)
h["cooc_entries"] = cooc_entries
h["words"] = len(self._words)
h["templates"] = len(self._templates)
# Pressure signals — is the brain getting too big?
h["memory_pressure"] = h["rss_mb"] > 512 # over 512MB = pressure
h["disk_pressure"] = h["disk_free_gb"] < 1.0 # under 1GB = pressure
h["matrix_pressure"] = len(self._words) > 10000 # 10K dims = O(100M) matrix
return h
def close(self):
self._pool.shutdown(wait=False)
self._batch_pool.shutdown(wait=False)
self._save_matrix()
self.db.close()
# --- CLI ---
if __name__ == "__main__":
    # Interactive shell around an in-memory brain. Each input line is
    # "<command> [argument]"; EOF, Ctrl-C, "quit" or "exit" ends it.
    brain = BrainCore()
    print("Brain — self-growing reasoning system")
    print("Commands: teach <sentence>, ask <question>, generate <query>, inspect <word>, stats, quit")
    print()
    try:
        while True:
            try:
                line = input("> ").strip()
            except (EOFError, KeyboardInterrupt):
                break
            if not line:
                continue
            parts = line.split(None, 1)
            cmd = parts[0].lower()
            arg = parts[1] if len(parts) > 1 else ""
            if cmd in ("quit", "exit"):
                break
            elif cmd == "teach":
                ids = brain.teach(arg)
                print(f"Learned {len(ids)} concepts. Dimensions: {len(brain._words)}")
            elif cmd == "ask":
                result = brain.ask(arg)
                print(f"A: {result['answer']}")
                print(f" [{result['strategy']}, conf={result['confidence']:.3f}]")
            elif cmd == "generate":
                result = brain.generate(arg)
                print(f"Generated: {result['text']}")
                print(f" [{result['tokens_generated']} tokens]")
                for step in result['trace']:
                    # generate() reports its early exits ("No vocabulary",
                    # "No starting word found") as plain strings, not
                    # step dicts; print those as-is instead of indexing.
                    if isinstance(step, dict):
                        print(f" {step['token']}: score={step.get('score', '?')}"
                              f" ({step.get('reason', '')})")
                    else:
                        print(f" {step}")
            elif cmd == "inspect":
                info = brain.inspect(arg)
                if info["known"]:
                    print(f" dim={info['dimension']}, conf={info['confidence']}")
                    for w, v in info["connections"]:
                        print(f" → {w}: {v:.3f}")
                else:
                    print(f" Unknown word: {arg}")
            elif cmd == "stats":
                for k, v in brain.stats().items():
                    print(f" {k}: {v}")
            else:
                print(f"Unknown: {cmd}")
    finally:
        # Persist and release the DB even if a command raised.
        brain.close()
    print("Done.")