| """ |
| BrainCore: the clean reasoning engine. No performance hacks. |
| |
| A growing matrix that learns from what it's taught. |
| No pretrained embeddings. No gradient descent. No FAISS. |
| |
| brain = BrainCore() |
| brain.teach("paris is the capital of france") |
| brain.teach("london is the capital of england") |
| brain.ask("capital of france")["answer"] # → "paris capital france" |
| |
| The matrix IS the understanding. Each word is a dimension. |
| Co-occurring words pull toward each other. Query = search the matrix. |
| The DB (SQLite) is the portable brain — copy it anywhere. |
| """ |
|
|
| import re |
| import hashlib |
| import struct |
| from collections import OrderedDict |
| from concurrent.futures import ThreadPoolExecutor, as_completed |
| from pathlib import Path |
| from threading import Lock |
|
|
| import numpy as np |
|
|
| from neuron import NeuronDB, Neuron |
| from convergence import ConvergenceLoop, MultiHopConvergence |
|
|
| |
# Amount added to the link between two content words each time they are
# taught in the same sentence (see BrainCore._learn_cooccurrence).
COOCCURRENCE_PULL = 0.3

# Tokens that teach() and ask() drop before looking for content words.
FUNCTION_WORDS = frozenset(
    (
        "the a an is are was were be been being "
        "have has had do does did will would could "
        "should may might shall can must of in to "
        "for with on at by from as into through "
        "during before after above below between out "
        "off over under and but or nor not so "
        "yet both either neither each every all any "
        "few more most other some such no only own "
        "same than too very just about up what which "
        "who whom this that these those am if then "
        "because while although though even also it its "
        "how when where why there here"
    ).split()
)

# Tokens kept as literal text when a template is extracted from a sentence;
# every other token becomes a slot. Also excluded from template fills.
STRUCTURAL_WORDS = frozenset(
    (
        "is are was were wrote discovered invented "
        "created built made found said called known "
        "born died has had have been became "
        "the a an of in for to on at by "
        "with from as and or but"
    ).split()
)
|
|
|
|
| class BrainCore: |
| """ |
| A self-growing reasoning system. |
| |
| The brain is a matrix + a database. |
| The matrix stores word relationships (encoder). |
| The database stores metadata (confidence, links, sentences). |
| |
| Teaching grows the matrix. Querying searches it. |
| """ |
|
|
def __init__(self, db_path: str = None):
    """
    Build an empty brain and attach it to its neuron store.

    Args:
        db_path: directory for SQLite storage, forwarded to NeuronDB as
            `path`. None = in-memory.
    """
    # Vocabulary state: ordered word list, word -> index, sparse
    # co-occurrence rows keyed by word index.
    self._db_path = db_path
    self._words = []
    self._word_idx = {}
    self._cooc = {}
    self._matrix = None

    # Neuron store plus the word -> neuron id map it has persisted.
    self.db = NeuronDB(path=db_path, dim=1)
    self._word_neurons = self.db.load_word_mappings()

    # Sentence templates collected by _extract_template().
    self._templates = []

    # Single-pass convergence search, wrapped by the multi-hop driver
    # that ask() calls.
    convergence_settings = dict(
        max_hops=10,
        k=5,
        convergence_threshold=0.99,
        min_confidence=0.1,
        min_relevance=0.3,
    )
    self._convergence = ConvergenceLoop(self.db, **convergence_settings)
    self._multi_hop = MultiHopConvergence(
        self._convergence, max_rounds=3, concept_blend_weight=0.4,
    )

    # Lazily built neuron id -> word map.
    self._nid_to_word_cache = None

    # _pool runs the per-question strategies in _generate(); _batch_pool
    # runs whole teach()/ask() calls for the *_batch methods.
    self._pool = ThreadPoolExecutor(max_workers=4)
    self._batch_pool = ThreadPoolExecutor(max_workers=8)
    self._lock = Lock()

    # Must run last: it reads _word_neurons and fills the vocabulary.
    self._load_cooc()
|
|
| |
|
|
def teach(self, sentence: str, confidence: float = 0.5) -> list:
    """
    Teach the brain a sentence. Returns the neuron IDs of its content words.

    "paris is the capital of france" ->
      1. Learns words: paris, capital, france (function words are skipped)
      2. Records co-occurrence between every pair of content words
      3. Reuses or creates one neuron per content word and links each
         neuron to the next one as successor/predecessor
      4. Records the sentence (two or more neurons)
      5. Extracts a template (three or more tokens)
      6. Re-encodes stored vectors if the vocabulary grew

    Args:
        sentence: raw text; tokenized with _tokenize().
        confidence: confidence given to neurons created by this call.

    Returns:
        List of neuron IDs in sentence order; [] when the sentence has no
        content words.
    """
    tokens = self._tokenize(sentence)
    content = [t for t in tokens if t not in FUNCTION_WORDS]
    if not content:
        return []

    with self._lock:
        dim_before = len(self._words)
        self.db.begin_batch()
        try:
            # Grow the vocabulary and the co-occurrence rows first so the
            # vectors encoded below already see this sentence.
            for word in content:
                self._learn_word(word)
            if len(content) >= 2:
                self._learn_cooccurrence(content)

            neurons = []
            for word in content:
                if word in self._word_neurons:
                    existing = self.db.get(self._word_neurons[word])
                    if existing:
                        neurons.append(existing)
                        continue
                vec = self._encode_word(word)
                if np.any(vec != 0):
                    created = self.db.insert(vec, confidence=confidence)
                    self._word_neurons[word] = created.id
                    self.db.save_word_mapping(word, created.id)
                    # The id -> word map cached by ask() no longer covers
                    # every neuron; drop it so it is rebuilt on demand.
                    self._nid_to_word_cache = None
                    neurons.append(created)

            # Chain consecutive neurons in sentence order.
            for left, right in zip(neurons, neurons[1:]):
                self.db.update_successors(left.id, right.id, 0.8)
                self.db.update_predecessors(right.id, left.id)

            if len(neurons) >= 2:
                self.db.record_sentence([n.id for n in neurons])

            if len(tokens) >= 3:
                self._extract_template(tokens)
        finally:
            # Always leave batch mode, even when a step above raised.
            self.db.end_batch()

        if len(self._words) != dim_before:
            self._reindex()

    return [n.id for n in neurons]
|
|
def teach_batch(self, sentences: list, confidence: float = 0.5) -> list:
    """Submit one teach() call per sentence to the batch pool.

    Returns the per-sentence neuron ID lists in input order. Each teach()
    call takes the instance lock for its database work.
    """
    pending = []
    for sentence in sentences:
        pending.append(self._batch_pool.submit(self.teach, sentence, confidence))
    return [job.result() for job in pending]
|
|
def correct(self, question: str, answer: str):
    """Learn from a failed query.

    Teaches `answer` at confidence 0.6, then passes the question/answer
    pair to the store's resolve_miss_by_query().
    """
    # Teach first so the answer's words exist before the miss is resolved.
    self.teach(answer, confidence=0.6)
    store = self.db
    store.resolve_miss_by_query(question, answer)
|
|
| |
|
|
def ask_batch(self, questions: list) -> list:
    """Submit one ask() call per question to the batch pool.

    Returns the result dicts in input order. The batch pool is separate
    from the pool ask() uses internally, so queued questions cannot starve
    the per-question strategy tasks.
    """
    pending = []
    for question in questions:
        pending.append(self._batch_pool.submit(self.ask, question))
    return [job.result() for job in pending]
|
|
def ask(self, question: str) -> dict:
    """
    Ask the brain a question. Returns dict with answer + trace.

    Flow:
      1. Tokenize and encode the question. Abstain (and log a miss) when
         the vector is all zeros, the vocabulary is empty, or none of the
         content words is known.
      2. Multi-hop convergence over the neuron DB with the query vector.
      3. Sparse co-occurrence search as a complementary signal; hits are
         boosted (up to 1.5x) when the positions they were taught at are
         close to the positions of the query words.
      4. Disambiguate via the sentence table: add the neurons of the
         best-matching taught sentence(s) and, for queries with two or
         more known neurons, drop concepts outside those sentences and
         the query itself.
      5. Build the answer with _generate().

    Returns:
        The dict from _generate() extended with "convergence_trace",
        "converged" and "convergence_rounds". On abstention: answer
        "I don't know.", confidence 0.0, strategy "abstain".
    """
    tokens = self._tokenize(question)
    content = [t for t in tokens if t not in FUNCTION_WORDS]
    query_vec = self._encode_sentence(question)

    if np.all(query_vec == 0) or len(self._words) == 0:
        self.db.log_miss(question, query_vec)
        return {"answer": "I don't know.", "confidence": 0.0,
                "strategy": "abstain", "trace": "No knowledge"}

    content_indices = [self._word_idx[t] for t in content
                       if t in self._word_idx]
    content_nids = [self._word_neurons[t] for t in content
                    if t in self._word_neurons]

    if not content_indices:
        self.db.log_miss(question, query_vec)
        return {"answer": "I don't know.", "confidence": 0.0,
                "strategy": "abstain", "trace": "No known words in query"}

    multi_hop_result = self._multi_hop.reason(query_vec)
    convergence_trace = multi_hop_result.trace()

    # Rebuild the neuron id -> word map on every call: words taught since
    # the previous question would be missing from a map built only once.
    nid_to_word = {nid: w for w, nid in self._word_neurons.items()
                   if not w.startswith("__")}
    self._nid_to_word_cache = nid_to_word
    word_to_nid = {w: nid for nid, w in nid_to_word.items()}

    # Position of each known query word within the content words.
    query_positions = {}
    for i, t in enumerate(content):
        nid = self._word_neurons.get(t)
        if nid is not None:
            query_positions[nid] = i

    # One sentence lookup serves both the position bias and the
    # best-sentence selection further down.
    sentences = (self.db.get_sentences_for_neurons(content_nids)
                 if content_nids else {})

    # Positions at which each neuron appeared in those sentences.
    nid_taught_positions = {}
    if sentences:
        for nid_pos_list in sentences.values():
            for nid, pos in nid_pos_list:
                nid_taught_positions.setdefault(nid, set()).add(pos)

    def _position_bias(nid: int) -> float:
        """Multiplier in [1.0, 1.5] for a neuron.

        Averages 1 / (1 + distance) over the query words, where distance
        is the gap between the query word's position and the nearest
        position this neuron was taught at.
        """
        taught_pos = nid_taught_positions.get(nid)
        if not taught_pos or not query_positions:
            return 1.0
        total_sim = 0.0
        for q_pos in query_positions.values():
            min_dist = min(abs(q_pos - tp) for tp in taught_pos)
            total_sim += 1.0 / (1.0 + min_dist)
        return 1.0 + 0.5 * (total_sim / len(query_positions))

    # Concepts found by convergence.
    concepts = []
    seen = set()
    for n in multi_hop_result.concepts:
        word = nid_to_word.get(n.id)
        if word and n.id not in seen and word not in FUNCTION_WORDS:
            concepts.append((n, word))
            seen.add(n.id)

    # Sparse co-occurrence search; earlier query words weigh more.
    weights = [1.0 / (1.0 + 0.1 * i) for i in range(len(content_indices))]
    query_cooc = self._sparse_blend(content_indices, weights)
    search_results = self._sparse_search(query_cooc, k=10)

    if nid_taught_positions:
        biased_results = []
        for word_idx, sim in search_results:
            if word_idx < len(self._words):
                nid = word_to_nid.get(self._words[word_idx])
                if nid is not None:
                    sim = sim * _position_bias(nid)
            biased_results.append((word_idx, sim))
        biased_results.sort(key=lambda x: x[1], reverse=True)
        search_results = biased_results

    for word_idx, sim in search_results:
        # Co-occurrence rows may reference indices beyond the vocabulary.
        if word_idx >= len(self._words):
            continue
        word = self._words[word_idx]
        nid = word_to_nid.get(word)
        if nid is not None and nid not in seen and word not in FUNCTION_WORDS:
            n = self.db.get(nid)
            if n:
                concepts.append((n, word))
                seen.add(nid)

    # Neurons of every sentence tied for the largest lookup entry.
    best_sentence_nids = None
    if sentences:
        best_score = max(len(matched) for matched in sentences.values())
        best_sentence_nids = set()
        for sid, matched in sentences.items():
            if len(matched) == best_score:
                for nid, pos in self.db.get_sentence_neurons(sid):
                    best_sentence_nids.add(nid)

    if best_sentence_nids:
        for nid in best_sentence_nids:
            if nid not in seen:
                n = self.db.get(nid)
                word = nid_to_word.get(nid)
                if n and word:
                    concepts.append((n, word))
                    seen.add(nid)

    if not concepts:
        self.db.log_miss(question, query_vec)
        return {"answer": "I don't know.", "confidence": 0.0,
                "strategy": "abstain", "trace": "No relevant concepts"}

    # With a multi-word query, keep only concepts from the best
    # sentence(s) or from the query itself.
    if best_sentence_nids and len(content_nids) >= 2:
        keep = best_sentence_nids | set(content_nids)
        concepts = [(n, w) for n, w in concepts if n.id in keep]

    result = self._generate(concepts, query_vec, tokens, question)
    result["convergence_trace"] = convergence_trace
    result["converged"] = multi_hop_result.converged
    result["convergence_rounds"] = len(multi_hop_result.rounds)
    return result
|
|
| |
|
|
def generate(self, query: str, max_tokens: int = 30, temperature: float = 0.7,
             min_score: float = 0.01) -> dict:
    """
    Generate text by walking the co-occurrence graph, steered by a query.

    Algorithm:
      1. Run ask() and take the highest-confidence answer word that is a
         known content word and not part of the query as the start word;
         fall back to a sparse search around the query words.
      2. At each step score every content word by
         context_similarity x (1 + successor_confidence of the last word).
      3. Keep the 50 best, attach softmax probabilities (temperature only
         affects the reported probability) and pick the best-scoring
         candidate that is not among the last 6 generated words and
         scores at least min_score.
      4. Blend the chosen word's co-occurrence row into the running
         context profile; stop when no candidate qualifies.

    Args:
        query: text used to steer the walk.
        max_tokens: upper bound on generated words, start word included.
        temperature: softmax temperature; <= 0 considers only the top
            candidate.
        min_score: minimum raw score a candidate needs to be chosen.

    Returns dict with 'text', 'trace' list, 'tokens_generated'.
    """
    import math

    if not self._words:
        return {"text": "", "trace": ["No vocabulary"], "tokens_generated": 0}

    ask_result = self.ask(query)
    query_tokens = self._tokenize(query)
    query_tokens_set = set(query_tokens)

    nid_to_word = {nid: w for w, nid in self._word_neurons.items()
                   if not w.startswith("__")}

    # Preferred start: most confident usable word of the ask() answer.
    start_word = None
    start_score = -1.0
    for w in self._tokenize(ask_result.get("answer", "")):
        if w in self._word_idx and w not in FUNCTION_WORDS and w not in query_tokens_set:
            conf = 0.5
            nid = self._word_neurons.get(w)
            if nid is not None:
                n = self.db.get(nid)
                if n:
                    conf = n.confidence
            if conf > start_score:
                start_score = conf
                start_word = w

    # Fallback start: nearest neighbour of the query in co-occurrence space.
    if start_word is None:
        q_indices = [self._word_idx[t] for t in query_tokens if t in self._word_idx]
        if q_indices:
            q_cooc = self._sparse_blend(q_indices)
            for widx, sim in self._sparse_search(q_cooc, k=20):
                # Same guard as ask(): co-occurrence rows may reference
                # indices beyond the vocabulary.
                if widx >= len(self._words):
                    continue
                w = self._words[widx]
                if w not in FUNCTION_WORDS and w not in query_tokens_set:
                    start_word = w
                    start_score = sim
                    break

    if start_word is None:
        return {"text": "", "trace": ["No starting word found"], "tokens_generated": 0}

    generated = [start_word]
    trace = [{"token": start_word, "score": start_score, "reason": "start (best concept)"}]
    recent_window = 6

    # Running context starts as a copy of the start word's row.
    context_profile = dict(self._cooc.get(self._word_idx[start_word], {}))

    for _ in range(1, max_tokens):
        prev_word = generated[-1]

        # Successor confidences recorded on the previous word's neuron.
        prev_successors = {}
        if prev_word in self._word_neurons:
            prev_neuron = self.db.get(self._word_neurons[prev_word])
            if prev_neuron and prev_neuron.successors:
                for succ_nid, succ_conf in prev_neuron.successors:
                    succ_word = nid_to_word.get(succ_nid)
                    if succ_word:
                        prev_successors[succ_word] = succ_conf

        scores = []
        for widx, word in enumerate(self._words):
            if word in FUNCTION_WORDS:
                continue
            ctx_sim = self._sparse_cosine(self._cooc.get(widx, {}), context_profile)
            raw_score = ctx_sim * (1.0 + prev_successors.get(word, 0.0))
            if raw_score > 0:
                scores.append((widx, word, raw_score))

        if not scores:
            trace.append({"token": "[STOP]", "score": 0, "reason": "no candidates"})
            break

        scores.sort(key=lambda x: x[2], reverse=True)
        top_scores = scores[:50]

        if temperature > 0:
            # Subtract the maximum before exponentiating for stability.
            max_raw = top_scores[0][2]
            exp_scores = [(widx, word, raw, math.exp((raw - max_raw) / temperature))
                          for widx, word, raw in top_scores]
            total_exp = sum(e for _, _, _, e in exp_scores)
            if total_exp > 0:
                softmax_scores = [(widx, word, raw, e / total_exp)
                                  for widx, word, raw, e in exp_scores]
            else:
                softmax_scores = [(widx, word, raw, 0.0)
                                  for widx, word, raw, _ in exp_scores]
        else:
            # Greedy: only the top candidate is considered.
            softmax_scores = [top_scores[0] + (1.0,)]

        # Slicing already copes with fewer than recent_window words.
        recent_set = set(generated[-recent_window:])

        chosen = None
        for widx, word, raw, prob in softmax_scores:
            if word in recent_set:
                continue
            if raw < min_score:
                continue
            chosen = (widx, word, raw, prob)
            break

        if chosen is None:
            trace.append({"token": "[STOP]", "score": 0,
                          "reason": "all candidates below threshold or in loop"})
            break

        widx, word, raw, prob = chosen
        generated.append(word)
        trace.append({"token": word, "score": round(raw, 4),
                      "prob": round(prob, 4),
                      "reason": f"ctx_sim×(1+succ), prob={prob:.3f}"})

        # Blend the new word into the context; entries it does not touch
        # decay by the same factor.
        new_cooc = self._cooc.get(widx, {})
        blend_weight = 0.3
        for k, v in new_cooc.items():
            context_profile[k] = context_profile.get(k, 0) * (1 - blend_weight) + v * blend_weight
        for k in list(context_profile.keys()):
            if k not in new_cooc:
                context_profile[k] *= (1 - blend_weight)

    return {
        "text": " ".join(generated),
        "trace": trace,
        "tokens_generated": len(generated),
    }
|
|
| |
|
|
| def _generate(self, concepts, query_vec, query_tokens, question) -> dict: |
| """Generate an answer from concepts. Template and chain race in parallel.""" |
| concept_neurons = [n for n, w in concepts] |
| concept_words = [w for n, w in concepts] |
| concept_ids = [n.id for n, w in concepts] |
|
|
| |
| template_future = self._pool.submit( |
| self._try_template, |
| concept_neurons, concept_words, concept_ids, query_vec, query_tokens |
| ) |
| chain_future = self._pool.submit( |
| self._try_sentence_chain, concept_ids, query_vec |
| ) |
|
|
| |
| template_result = template_future.result() |
| chain_result = chain_future.result() |
|
|
| if template_result: |
| return template_result |
| if chain_result: |
| return chain_result |
|
|
| |
| avg_conf = float(np.mean([n.confidence for n in concept_neurons])) |
| return { |
| "answer": " ".join(concept_words), |
| "confidence": avg_conf, |
| "strategy": "concept_list", |
| "trace": f"Concepts: {concept_words}", |
| } |
|
|
| def _try_template(self, neurons, words, nids, query_vec, query_tokens) -> dict: |
| """Try to fill a template with concepts.""" |
| if not self._templates: |
| return None |
|
|
| query_word_set = set(query_tokens) |
|
|
| |
| best_template = None |
| best_score = 0 |
| for pattern, slots, tvec in self._templates: |
| struct_words = [w for w in re.findall(r'[a-z]+', pattern.lower()) |
| if w not in [s.lower() for s in slots]] |
| overlap = sum(1 for w in struct_words if w in query_word_set) |
| if overlap > best_score: |
| best_score = overlap |
| best_template = (pattern, slots) |
|
|
| if not best_template or best_score == 0: |
| return None |
|
|
| pattern, slots = best_template |
|
|
| |
| sentence_order = self._get_sentence_order(nids) |
| if sentence_order: |
| ordered = sorted(zip(neurons, words), |
| key=lambda p: sentence_order.get(p[0].id, 999)) |
| else: |
| ordered = list(zip(neurons, words)) |
|
|
| |
| content_words = [w for n, w in ordered |
| if w.lower() not in STRUCTURAL_WORDS] |
| fills = {} |
| available = list(content_words) |
| for slot_name in slots: |
| if available: |
| fills[slot_name] = available.pop(0) |
|
|
| if not fills: |
| return None |
|
|
| text = pattern |
| for name, value in fills.items(): |
| text = text.replace(f"[{name}]", value) |
| |
| for name in slots: |
| if name not in fills: |
| text = text.replace(f"[{name}]", "...") |
|
|
| |
| answer_vec = self._encode_sentence(text) |
| q_norm = np.linalg.norm(query_vec) |
| a_norm = np.linalg.norm(answer_vec) |
| if q_norm > 0 and a_norm > 0: |
| convergence = float(np.dot(query_vec, answer_vec) / (q_norm * a_norm)) |
| else: |
| convergence = 0.0 |
| if convergence <= 0: |
| return None |
|
|
| return { |
| "answer": text, |
| "confidence": convergence, |
| "strategy": "template", |
| "trace": f"Template: {pattern}, fills: {fills}, convergence={convergence:.3f}", |
| } |
|
|
| def _try_sentence_chain(self, concept_ids, query_vec) -> dict: |
| """Find the best matching taught sentence, output in word order.""" |
| sentences = self.db.get_sentences_for_neurons(concept_ids) |
| if not sentences: |
| return None |
|
|
| nid_to_word = {nid: w for w, nid in self._word_neurons.items() |
| if not w.startswith("__")} |
|
|
| scored = [] |
| for sid, matched in sentences.items(): |
| sent_neurons = self.db.get_sentence_neurons(sid) |
| if not sent_neurons: |
| continue |
| score = len(matched) |
| scored.append((sid, score, sent_neurons)) |
|
|
| if not scored: |
| return None |
|
|
| scored.sort(key=lambda x: x[1], reverse=True) |
| sid, score, sent_neurons = scored[0] |
|
|
| |
| ordered = sorted(sent_neurons, key=lambda x: x[1]) |
| words = [nid_to_word.get(nid, "") for nid, pos in ordered] |
| words = [w for w in words if w] |
|
|
| if len(words) < 2: |
| return None |
|
|
| |
| answer_indices = [self._word_idx[w] for w in words if w in self._word_idx] |
| if not answer_indices: |
| return None |
| answer_cooc = self._sparse_blend(answer_indices) |
| |
| nid_to_word = self._nid_to_word_cache or {nid: w for w, nid in self._word_neurons.items() |
| if not w.startswith("__")} |
| query_words = [nid_to_word.get(nid) for nid in concept_ids] |
| query_indices = [self._word_idx[w] for w in query_words |
| if w and w in self._word_idx] |
| if not query_indices: |
| query_indices = answer_indices |
| query_cooc = self._sparse_blend(query_indices) |
| convergence = self._sparse_cosine(query_cooc, answer_cooc) |
|
|
| if convergence <= 0: |
| return None |
|
|
| return { |
| "answer": " ".join(words), |
| "confidence": convergence, |
| "strategy": "sentence_chain", |
| "trace": f"Sentence {sid} (convergence={convergence:.3f}): {words}", |
| } |
|
|
| def _get_sentence_order(self, concept_ids) -> dict: |
| """Get taught position order for concepts.""" |
| if len(concept_ids) < 2: |
| return {} |
| sentences = self.db.get_sentences_for_neurons(concept_ids) |
| if not sentences: |
| return {} |
| best_sid = max(sentences, key=lambda sid: len(sentences[sid])) |
| if len(sentences[best_sid]) < 2: |
| return {} |
| return {nid: pos for nid, pos in self.db.get_sentence_neurons(best_sid)} |
|
|
| |
|
|
| def _learn_word(self, word: str) -> int: |
| """Add a word. Returns its index.""" |
| word = word.lower().strip() |
| if word in self._word_idx: |
| return self._word_idx[word] |
|
|
| idx = len(self._words) |
| self._words.append(word) |
| self._word_idx[word] = idx |
| self._cooc[idx] = {idx: 1.0} |
| return idx |
|
|
| def _load_cooc(self): |
| """Load co-occurrence from the cooccurrence table, or rebuild from neuron vectors.""" |
| |
| words_in_db = sorted( |
| [(w, nid) for w, nid in self._word_neurons.items() |
| if not w.startswith("__")], |
| key=lambda x: x[1] |
| ) |
| for word, nid in words_in_db: |
| if word not in self._word_idx: |
| idx = len(self._words) |
| self._words.append(word) |
| self._word_idx[word] = idx |
| self._cooc[idx] = {idx: 1.0} |
|
|
| if not words_in_db: |
| return |
|
|
| |
| try: |
| rows = self.db.db.execute( |
| "SELECT word_a, word_b, weight FROM cooccurrence" |
| ).fetchall() |
| if rows: |
| for a, b, w in rows: |
| if a not in self._cooc: |
| self._cooc[a] = {a: 1.0} |
| if b not in self._cooc: |
| self._cooc[b] = {b: 1.0} |
| self._cooc[a][b] = w |
| self._cooc[b][a] = w |
| self._rebuild_search_matrix() |
| return |
| except Exception: |
| pass |
|
|
| |
| self.db.db.execute(""" |
| CREATE TABLE IF NOT EXISTS cooccurrence ( |
| word_a INTEGER NOT NULL, |
| word_b INTEGER NOT NULL, |
| weight REAL NOT NULL DEFAULT 0.0, |
| PRIMARY KEY (word_a, word_b) |
| ) |
| """) |
| self.db.db.execute( |
| "CREATE INDEX IF NOT EXISTS idx_cooc_a ON cooccurrence(word_a)") |
| self.db.db.commit() |
|
|
| |
| n = len(self._words) |
| nid_to_idx = {nid: self._word_idx[w] |
| for w, nid in words_in_db if w in self._word_idx} |
|
|
| for nid, vec_bytes in self.db.db.execute("SELECT id, vector FROM neurons"): |
| idx = nid_to_idx.get(nid) |
| if idx is None: |
| continue |
| vec = np.frombuffer(vec_bytes, dtype=np.float32) |
| for j, val in enumerate(vec): |
| if j < n and val > 0 and j != idx: |
| if idx not in self._cooc: |
| self._cooc[idx] = {idx: 1.0} |
| self._cooc[idx][j] = float(val) |
|
|
| |
| self._save_cooc() |
| self._rebuild_search_matrix() |
|
|
| def _save_cooc(self): |
| """Persist co-occurrence dict to SQLite.""" |
| self.db.db.execute(""" |
| CREATE TABLE IF NOT EXISTS cooccurrence ( |
| word_a INTEGER NOT NULL, |
| word_b INTEGER NOT NULL, |
| weight REAL NOT NULL DEFAULT 0.0, |
| PRIMARY KEY (word_a, word_b) |
| ) |
| """) |
| pairs = [] |
| for a, neighbors in self._cooc.items(): |
| for b, w in neighbors.items(): |
| if a != b and w > 0: |
| pairs.append((a, b, w)) |
| self.db.db.execute("DELETE FROM cooccurrence") |
| self.db.db.executemany( |
| "INSERT INTO cooccurrence (word_a, word_b, weight) VALUES (?, ?, ?)", |
| pairs) |
| self.db.db.commit() |
|
|
| def _save_matrix(self): |
| """Persist co-occurrence to DB.""" |
| self._save_cooc() |
|
|
| def _learn_cooccurrence(self, words: list): |
| """Strengthen connections between co-occurring words.""" |
| indices = [self._word_idx[w.lower().strip()] for w in words |
| if w.lower().strip() in self._word_idx] |
| for i in range(len(indices)): |
| for j in range(i + 1, len(indices)): |
| a, b = indices[i], indices[j] |
| if a not in self._cooc: |
| self._cooc[a] = {a: 1.0} |
| if b not in self._cooc: |
| self._cooc[b] = {b: 1.0} |
| self._cooc[a][b] = self._cooc[a].get(b, 0) + COOCCURRENCE_PULL |
| self._cooc[b][a] = self._cooc[b].get(a, 0) + COOCCURRENCE_PULL |
|
|
| |
|
|
| def _get_cooc(self, word: str) -> dict: |
| """Get a word's co-occurrence dict. Sparse. O(K) where K = connections.""" |
| word = word.lower().strip() |
| idx = self._word_idx.get(word) |
| if idx is None: |
| return {} |
| return self._cooc.get(idx, {}) |
|
|
| def _sparse_norm(self, d: dict) -> float: |
| """L2 norm of a sparse dict.""" |
| return sum(v * v for v in d.values()) ** 0.5 |
|
|
| def _sparse_cosine(self, a: dict, b: dict) -> float: |
| """Cosine similarity between two sparse dicts. O(min(|a|, |b|)).""" |
| if not a or not b: |
| return 0.0 |
| |
| if len(a) > len(b): |
| a, b = b, a |
| dot = sum(v * b.get(k, 0) for k, v in a.items()) |
| if dot == 0: |
| return 0.0 |
| na = self._sparse_norm(a) |
| nb = self._sparse_norm(b) |
| if na == 0 or nb == 0: |
| return 0.0 |
| return dot / (na * nb) |
|
|
| def _sparse_blend(self, word_indices: list, weights: list = None) -> dict: |
| """Blend multiple words' co-occurrence dicts. Weighted average.""" |
| result = {} |
| if weights is None: |
| weights = [1.0] * len(word_indices) |
| total_w = sum(weights) |
| if total_w == 0: |
| return result |
| for idx, w in zip(word_indices, weights): |
| for k, v in self._cooc.get(idx, {}).items(): |
| result[k] = result.get(k, 0) + w * v |
| |
| for k in result: |
| result[k] /= total_w |
| return result |
|
|
| def _sparse_search(self, query_cooc: dict, k: int = 5) -> list: |
| """Search all words by sparse cosine with query. O(N × K).""" |
| if not query_cooc: |
| return [] |
| scores = [] |
| q_norm = self._sparse_norm(query_cooc) |
| if q_norm == 0: |
| return [] |
| for word_idx, word_cooc in self._cooc.items(): |
| if not word_cooc: |
| continue |
| dot = sum(query_cooc.get(j, 0) * v for j, v in word_cooc.items()) |
| if dot > 0: |
| w_norm = self._sparse_norm(word_cooc) |
| if w_norm > 0: |
| sim = dot / (q_norm * w_norm) |
| scores.append((word_idx, sim)) |
| scores.sort(key=lambda x: x[1], reverse=True) |
| return scores[:k] |
|
|
| def _encode_word(self, word: str) -> np.ndarray: |
| """Get a word's vector. Dense form for backward compat.""" |
| word = word.lower().strip() |
| n = len(self._words) |
| idx = self._word_idx.get(word) |
| if idx is None or n == 0: |
| return np.zeros(n or 1, dtype=np.float32) |
| vec = np.zeros(n, dtype=np.float32) |
| for j, w in self._cooc.get(idx, {}).items(): |
| if j < n: |
| vec[j] = w |
| norm = np.linalg.norm(vec) |
| if norm > 0: |
| vec = vec / norm |
| return vec |
|
|
| def _encode_sentence(self, text: str) -> np.ndarray: |
| """Encode a sentence. Dense form for backward compat.""" |
| tokens = self._tokenize(text) |
| if not tokens: |
| d = len(self._words) or 1 |
| return np.zeros(d, dtype=np.float32) |
|
|
| vectors = [] |
| weights = [] |
| for i, token in enumerate(tokens): |
| vec = self._encode_word(token) |
| if np.any(vec != 0): |
| vectors.append(vec) |
| weights.append(1.0 / (1.0 + 0.1 * i)) |
|
|
| if not vectors: |
| d = len(self._words) or 1 |
| return np.zeros(d, dtype=np.float32) |
|
|
| vectors = np.array(vectors) |
| weights = np.array(weights, dtype=np.float32) |
| weights = weights / weights.sum() |
| result = np.average(vectors, axis=0, weights=weights).astype(np.float32) |
| norm = np.linalg.norm(result) |
| if norm > 0: |
| result = result / norm |
| return result |
|
|
| def _reindex(self): |
| """Re-encode all neurons to current dimensions.""" |
| for word, nid in self._word_neurons.items(): |
| if word.startswith("__"): |
| continue |
| vec = self._encode_word(word) |
| if np.any(vec != 0): |
| |
| row = self.db._id_to_row.get(nid) |
| if row is not None and self.db._vectors is not None: |
| if vec.shape[0] != self.db._vectors.shape[1]: |
| |
| self._rebuild_search_matrix() |
| return |
| self.db._vectors[row] = vec |
| self.db.db.execute( |
| "UPDATE neurons SET vector = ? WHERE id = ?", |
| (vec.tobytes(), nid) |
| ) |
| self.db.db.commit() |
|
|
| |
| for i, (pattern, slots, old_vec) in enumerate(self._templates): |
| text = re.sub(r'\[[A-Z_0-9]+\]', '', pattern).strip() |
| vec = self._encode_sentence(text) |
| self._templates[i] = (pattern, slots, vec) |
|
|
| def _rebuild_search_matrix(self): |
| """No-op. Sparse search uses _cooc dict directly. No matrix needed.""" |
| pass |
|
|
| |
|
|
| def _extract_template(self, tokens: list): |
| """Auto-extract a template from a sentence.""" |
| structural = set() |
| content = [] |
| for i, token in enumerate(tokens): |
| if token in STRUCTURAL_WORDS: |
| structural.add(i) |
| else: |
| content.append(i) |
|
|
| if not content or not structural: |
| return |
|
|
| pattern_parts = [] |
| slots = {} |
| slot_idx = 0 |
| for i, token in enumerate(tokens): |
| if i in structural: |
| pattern_parts.append(token) |
| else: |
| name = f"S{slot_idx}" |
| pattern_parts.append(f"[{name}]") |
| slots[name] = "noun" |
| slot_idx += 1 |
|
|
| pattern = " ".join(pattern_parts) |
|
|
| |
| for p, s, v in self._templates: |
| if p == pattern: |
| return |
|
|
| text = " ".join(t for t in tokens if t in STRUCTURAL_WORDS) |
| vec = self._encode_sentence(text) if text.strip() else np.zeros( |
| len(self._words) or 1, dtype=np.float32 |
| ) |
| self._templates.append((pattern, slots, vec)) |
|
|
| |
|
|
| @staticmethod |
| def _tokenize(text: str) -> list: |
| return re.findall(r'[a-z0-9]+', text.lower()) |
|
|
def inspect(self, word: str) -> dict:
    """Describe what is stored for a word.

    Returns {"word", "known": False} for an unknown word. Otherwise the
    word's index, its ten strongest co-occurrence links as
    (word, weight) pairs, and its neuron's confidence and successor
    count (None / 0 when no neuron is found).
    """
    key = word.lower().strip()
    idx = self._word_idx.get(key)
    if idx is None:
        return {"word": key, "known": False}

    vocab_size = len(self._words)
    links = sorted(
        ((self._words[other], float(weight))
         for other, weight in self._cooc.get(idx, {}).items()
         if other != idx and other < vocab_size),
        key=lambda link: link[1],
        reverse=True,
    )

    nid = self._word_neurons.get(key)
    neuron = self.db.get(nid) if nid else None
    if neuron:
        confidence, successor_count = neuron.confidence, len(neuron.successors)
    else:
        confidence, successor_count = None, 0

    return {
        "word": key,
        "known": True,
        "dimension": idx,
        "connections": links[:10],
        "confidence": confidence,
        "successors": successor_count,
    }
|
|
def stats(self) -> dict:
    """Size summary: vocabulary, neuron count, templates, nominal matrix shape."""
    vocab = len(self._words)
    summary = {"words": vocab, "dimensions": vocab}
    summary["neurons"] = self.db.count()
    summary["templates"] = len(self._templates)
    summary["matrix_size"] = f"{vocab}x{vocab}"
    return summary
|
|
def health(self) -> dict:
    """
    Resource report for the brain.

    Starts from the store's own health() dict (which must provide
    "rss_mb" and "disk_free_gb") and adds the co-occurrence footprint,
    vocabulary and template counts, and three pressure flags.
    """
    report = self.db.health()

    # Footprint estimate: 50 bytes per stored co-occurrence entry.
    entry_count = sum(len(row) for row in self._cooc.values())
    vocab = len(self._words)
    report["brain_matrix_mb"] = round(entry_count * 50 / (1024 * 1024), 2)
    report["cooc_entries"] = entry_count
    report["words"] = vocab
    report["templates"] = len(self._templates)

    # Flags: over 512 MB resident, under 1 GB disk free, over 10000 words.
    report["memory_pressure"] = report["rss_mb"] > 512
    report["disk_pressure"] = report["disk_free_gb"] < 1.0
    report["matrix_pressure"] = vocab > 10000

    return report
|
|
def close(self):
    """Stop the worker pools, persist the co-occurrence rows, close the DB.

    The database is closed even when saving fails, so a failed save
    cannot leak the connection; the save error still propagates.
    """
    # wait=False: do not block on tasks that are still running.
    self._pool.shutdown(wait=False)
    self._batch_pool.shutdown(wait=False)
    try:
        self._save_matrix()
    finally:
        self.db.close()
|
|
|
|
| |
|
|
if __name__ == "__main__":
    # Minimal interactive shell around an in-memory brain.
    brain = BrainCore()

    print("Brain — self-growing reasoning system")
    print("Commands: teach <sentence>, ask <question>, generate <query>, inspect <word>, stats, quit")
    print()

    try:
        while True:
            try:
                line = input("> ").strip()
            except (EOFError, KeyboardInterrupt):
                break

            if not line:
                continue

            # First word is the command, the rest is its argument.
            parts = line.split(None, 1)
            cmd = parts[0].lower()
            arg = parts[1] if len(parts) > 1 else ""

            if cmd in ("quit", "exit"):
                break
            elif cmd == "teach":
                ids = brain.teach(arg)
                print(f"Learned {len(ids)} concepts. Dimensions: {len(brain._words)}")
            elif cmd == "ask":
                result = brain.ask(arg)
                print(f"A: {result['answer']}")
                print(f" [{result['strategy']}, conf={result['confidence']:.3f}]")
            elif cmd == "generate":
                result = brain.generate(arg)
                print(f"Generated: {result['text']}")
                print(f" [{result['tokens_generated']} tokens]")
                for step in result['trace']:
                    print(f" {step['token']}: score={step.get('score', '?')}"
                          f" ({step.get('reason', '')})")
            elif cmd == "inspect":
                info = brain.inspect(arg)
                if info["known"]:
                    print(f" dim={info['dimension']}, conf={info['confidence']}")
                    for w, v in info["connections"]:
                        print(f" → {w}: {v:.3f}")
                else:
                    print(f" Unknown word: {arg}")
            elif cmd == "stats":
                for k, v in brain.stats().items():
                    print(f" {k}: {v}")
            else:
                print(f"Unknown: {cmd}")
    finally:
        # Save and release the brain even if a command raised.
        brain.close()

    print("Done.")
|
|