FerrellSyntheticIntelligence
Add deep cognition layer, curriculum runner, evaluation probe, updated loop and benchmark
653b8c1 | """ | |
| AbstractReasoner — Vitalis FSI | |
| Reasons about RELATIONSHIPS between concepts. | |
| Not pattern matching. Not retrieval. | |
| Genuine relational reasoning: | |
| - Analogy: A is to B as C is to ? | |
| - Composition: concept_A + concept_B = novel_concept | |
| - Inversion: what is the opposite of this concept? | |
| - Transitivity: if A relates to B and B relates to C, how does A relate to C? | |
| Built entirely on HDC operations. No external models. | |
| """ | |
| import numpy as np | |
| import os | |
| import json | |
| import time | |
| from vitalis_ide.math_core.kernel import VitalisKernel | |
| from src.cognition.abstraction import AbstractionEngine | |
| from src.hippocampus import Hippocampus | |
class AbstractReasoner:
    """Relational reasoning over hypervectors produced by ``VitalisKernel``.

    Concepts are encoded with ``VitalisKernel.vectorize_tokens`` and
    combined with three primitives: binding (element-wise multiply),
    bundling (sum then sign) and cyclic shift.  Every public operation
    queries ``AbstractionEngine.query_abstractions`` for the nearest
    stored abstractions and appends a JSON-serialisable summary of the
    outcome to a log persisted at ``self.path``.

    Class attributes:
        ANALOGY_THRESHOLD: not referenced by any method of this class.
        COMPOSITION_DECAY: probability that a component is left unchanged
            by the random sign mask applied in ``compose``.
        INVERSION_SHIFT: number of positions ``_invert`` rotates by.
        MAX_LOG_ENTRIES: how many of the most recent log entries are
            written to disk on every save.
    """

    ANALOGY_THRESHOLD = 0.25
    COMPOSITION_DECAY = 0.85
    INVERSION_SHIFT = 5000
    MAX_LOG_ENTRIES = 500

    def __init__(self):
        """Create the kernel, abstraction engine and hippocampus, then load the log."""
        self.kernel = VitalisKernel()
        self.abstraction = AbstractionEngine()
        self.hippocampus = Hippocampus()
        self.path = os.path.expanduser(
            "~/.vitalis_workspace/reasoning_log.json"
        )
        self._log = self._load_log()

    # ------------------------------------------------------------------
    # Log persistence
    # ------------------------------------------------------------------
    def _load_log(self) -> list:
        """Return the persisted log, or ``[]`` when none can be read.

        A missing, unreadable, malformed or non-list file yields an empty
        log instead of an exception, so a damaged log file cannot prevent
        the reasoner from being constructed.
        """
        try:
            with open(self.path, encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError):
            # OSError: file missing or unreadable.  ValueError covers both
            # json.JSONDecodeError and UnicodeDecodeError.
            return []
        # Callers append to the log, so anything but a list is unusable.
        return data if isinstance(data, list) else []

    def _save_log(self) -> None:
        """Write the newest ``MAX_LOG_ENTRIES`` entries to ``self.path``.

        The data is written to a sibling temporary file and then moved
        over the target with ``os.replace``, so a failure while
        serialising never leaves a half-written log behind.

        Raises:
            TypeError: if an entry contains a value ``json`` cannot encode.
            OSError: if the directory or file cannot be written.
        """
        directory = os.path.dirname(self.path)
        if directory:  # os.makedirs("") raises, so skip bare filenames
            os.makedirs(directory, exist_ok=True)
        tmp_path = f"{self.path}.tmp"
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(self._log[-self.MAX_LOG_ENTRIES:], f, indent=2)
            os.replace(tmp_path, self.path)
        finally:
            # Only still present when dumping or replacing failed.
            if os.path.exists(tmp_path):
                os.remove(tmp_path)

    def _record(self, result: dict, exclude: tuple = ()) -> None:
        """Append a copy of *result* (minus the *exclude* keys) to the log and save.

        A copy is stored so that later mutation of the dict handed back to
        the caller does not alter the log; *exclude* is used to drop raw
        vectors, which are not JSON-serialisable.
        """
        entry = {key: value for key, value in result.items() if key not in exclude}
        self._log.append(entry)
        self._save_log()

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------
    def _encode(self, concept: str) -> np.ndarray:
        """Vectorize the whitespace-separated tokens of *concept*, non-positionally."""
        return self.kernel.vectorize_tokens(concept.split(), positional=False)

    @staticmethod
    def _summarize(candidates) -> list:
        """Turn ``(score, name, extra)`` hits into ``(name, rounded_score)`` pairs."""
        return [(name, round(float(score), 4)) for score, name, _ in candidates]

    @staticmethod
    def _top_score(candidates) -> float:
        """Return the first hit's score as a built-in float, or 0.0 without hits."""
        return float(candidates[0][0]) if candidates else 0.0

    # ------------------------------------------------------------------
    # Core HDC reasoning operations
    # ------------------------------------------------------------------
    def _bind(self, a: np.ndarray, b: np.ndarray) -> np.ndarray:
        """Bipolar binding: element-wise multiply, returned as int8."""
        # Multiply in int32 so the product cannot wrap before the cast back.
        return (a.astype(np.int32) * b.astype(np.int32)).astype(np.int8)

    def _bundle(self, vecs: list) -> np.ndarray:
        """Bipolar bundling: element-wise sum followed by sign, as int8.

        Components whose sum is zero are set to +1.
        """
        stacked = np.stack(vecs).astype(np.int32).sum(axis=0)
        result = np.sign(stacked).astype(np.int8)
        result[result == 0] = 1
        return result

    def _invert(self, vec: np.ndarray) -> np.ndarray:
        """Return *vec* cyclically shifted by ``INVERSION_SHIFT`` positions.

        The shift equals half the vector length only for vectors of
        length ``2 * INVERSION_SHIFT``.
        NOTE(review): a cyclic shift rearranges components rather than
        negating them, so the result is presumably near-orthogonal to the
        input rather than its exact opposite -- confirm this is intended.
        """
        return np.roll(vec, self.INVERSION_SHIFT)

    # ------------------------------------------------------------------
    # Analogy: A is to B as C is to ?
    # ------------------------------------------------------------------
    def analogy(
        self,
        concept_a: str,
        concept_b: str,
        concept_c: str,
    ) -> dict:
        """Solve ``A : B :: C : ?`` with ``? = bind(bind(A, B), C)``.

        Returns:
            A dict with keys ``type``, ``query``, ``answer_vec`` (the raw
            result vector), ``best_match`` (name of the highest-scoring
            abstraction, or ``None``), ``confidence`` (its score, ``-1.0``
            when nothing scored higher), ``candidates`` and ``timestamp``.
            Everything except ``answer_vec`` is also written to the log.
        """
        vec_a = self._encode(concept_a)
        vec_b = self._encode(concept_b)
        vec_c = self._encode(concept_c)

        # ? = B * A^-1 * C.  Assuming +/-1 components, A is its own inverse
        # under binding, so this reduces to A * B * C.
        relation = self._bind(vec_a, vec_b)
        answer_vec = self._bind(relation, vec_c)

        candidates = self.abstraction.query_abstractions(answer_vec, top_k=3)
        # NOTE(review): the hippocampus result is not used anywhere; the
        # call is kept only in case similarity_search has side effects --
        # confirm and either merge its hits or drop the call.
        self.hippocampus.similarity_search(answer_vec, top_k=3)

        best_match = None
        best_score = -1.0
        for score, name, _ in candidates:
            if score > best_score:
                best_score = score
                best_match = name

        result = {
            "type": "analogy",
            "query": f"{concept_a}:{concept_b}::{concept_c}:?",
            "answer_vec": answer_vec,
            "best_match": best_match,
            "confidence": round(float(best_score), 4),
            "candidates": self._summarize(candidates),
            "timestamp": time.time(),
        }
        self._record(result, exclude=("answer_vec",))
        return result

    # ------------------------------------------------------------------
    # Composition: merge two concepts into a novel one
    # ------------------------------------------------------------------
    def compose(self, concept_a: str, concept_b: str) -> dict:
        """Bundle two concepts and perturb the result with a random sign mask.

        Each component of the bundle keeps its sign with probability
        ``COMPOSITION_DECAY`` and is flipped otherwise, so repeated calls
        with the same inputs return different vectors.

        Returns:
            A dict with keys ``type``, ``inputs``, ``novel_vec`` (the raw
            composed vector), ``nearest``, ``novelty`` (one minus the
            first hit's score, ``1.0`` without hits) and ``timestamp``.
            Everything except ``novel_vec`` is also written to the log.
        """
        vec_a = self._encode(concept_a)
        vec_b = self._encode(concept_b)

        # Plain (unweighted) bundle of the two parents.
        composed = self._bundle([vec_a, vec_b])

        # Multiplying by -1 flips a component; +1 leaves it untouched.
        noise_mask = np.random.choice(
            [-1, 1],
            size=self.kernel.dim,
            p=[1 - self.COMPOSITION_DECAY, self.COMPOSITION_DECAY],
        ).astype(np.int8)
        composed = self._bind(composed, noise_mask)

        candidates = self.abstraction.query_abstractions(composed, top_k=3)

        result = {
            "type": "composition",
            "inputs": [concept_a, concept_b],
            "novel_vec": composed,
            "nearest": self._summarize(candidates),
            # Cast through float first: a NumPy float32 score would
            # otherwise reach json.dump and raise TypeError.
            "novelty": round(1.0 - self._top_score(candidates), 4),
            "timestamp": time.time(),
        }
        self._record(result, exclude=("novel_vec",))
        return result

    # ------------------------------------------------------------------
    # Inversion: what is the conceptual opposite?
    # ------------------------------------------------------------------
    def invert(self, concept: str) -> dict:
        """Shift the concept vector with ``_invert`` and look up its neighbours.

        Returns:
            A dict with keys ``type``, ``concept``, ``opposites``,
            ``confidence`` (first hit's score, ``0.0`` without hits) and
            ``timestamp``; a copy is written to the log.
        """
        inverted = self._invert(self._encode(concept))
        candidates = self.abstraction.query_abstractions(inverted, top_k=3)
        # NOTE(review): result unused; call kept only in case
        # similarity_search has side effects -- confirm.
        self.hippocampus.similarity_search(inverted, top_k=3)

        result = {
            "type": "inversion",
            "concept": concept,
            "opposites": self._summarize(candidates),
            "confidence": round(self._top_score(candidates), 4),
            "timestamp": time.time(),
        }
        self._record(result)
        return result

    # ------------------------------------------------------------------
    # Transitivity: if A→B and B→C, what is A→C?
    # ------------------------------------------------------------------
    def transitive_chain(self, concepts: list) -> dict:
        """Fold a chain of concepts ``[A, B, C, ...]`` into a single vector.

        Starting from A, each following concept is bound into the
        accumulator, which is then rotated by ``100 * position`` so the
        result depends on the order of the chain.

        Returns:
            ``{"error": "Need at least 2 concepts"}`` for shorter input
            (nothing is logged in that case); otherwise a dict with keys
            ``type``, ``chain``, ``conclusion``, ``confidence`` and
            ``timestamp``, a copy of which is written to the log.
        """
        if len(concepts) < 2:
            return {"error": "Need at least 2 concepts"}

        vecs = [self._encode(concept) for concept in concepts]

        accumulated = vecs[0].copy()
        for step, vec in enumerate(vecs[1:], start=1):
            accumulated = self._bind(accumulated, vec)
            # Position-dependent permutation after every binding step.
            accumulated = np.roll(accumulated, step * 100)

        candidates = self.abstraction.query_abstractions(accumulated, top_k=3)

        result = {
            "type": "transitivity",
            "chain": concepts,
            "conclusion": self._summarize(candidates),
            "confidence": round(self._top_score(candidates), 4),
            "timestamp": time.time(),
        }
        self._record(result)
        return result

    def report(self) -> dict:
        """Summarise the in-memory log: total count, counts per type, last three entries."""
        if not self._log:
            return {"status": "No reasoning performed yet"}
        type_counts = {}
        for entry in self._log:
            kind = entry.get("type", "unknown")
            type_counts[kind] = type_counts.get(kind, 0) + 1
        return {
            "total_reasoning_ops": len(self._log),
            "by_type": type_counts,
            "recent": self._log[-3:],
        }
| """ | |
| AbstractReasoner — Vitalis FSI | |
| Reasons about RELATIONSHIPS between concepts. | |
| Not pattern matching. Not retrieval. | |
| Genuine relational reasoning: | |
| - Analogy: A is to B as C is to ? | |
| - Composition: concept_A + concept_B = novel_concept | |
| - Inversion: what is the opposite of this concept? | |
- Transitivity: if A relates to B and B relates to C, how does A relate to C?
| Built entirely on HDC operations. No external models. | |
| """ | |
| import numpy as np | |
| import os | |
| import json | |
| import time | |
| from vitalis_ide.math_core.kernel import VitalisKernel | |
| from src.cognition.abstraction import AbstractionEngine | |
| from src.hippocampus import Hippocampus | |
class AbstractReasoner:
    """Relational reasoning over hypervectors produced by ``VitalisKernel``.

    Concepts are encoded with ``VitalisKernel.vectorize_tokens`` and
    combined with three primitives: binding (element-wise multiply),
    bundling (sum then sign) and cyclic shift.  Every public operation
    queries ``AbstractionEngine.query_abstractions`` for the nearest
    stored abstractions and appends a JSON-serialisable summary of the
    outcome to a log persisted at ``self.path``.

    Class attributes:
        ANALOGY_THRESHOLD: not referenced by any method of this class.
        COMPOSITION_DECAY: probability that a component is left unchanged
            by the random sign mask applied in ``compose``.
        INVERSION_SHIFT: number of positions ``_invert`` rotates by.
        MAX_LOG_ENTRIES: how many of the most recent log entries are
            written to disk on every save.
    """

    ANALOGY_THRESHOLD = 0.25
    COMPOSITION_DECAY = 0.85
    INVERSION_SHIFT = 5000
    MAX_LOG_ENTRIES = 500

    def __init__(self):
        """Create the kernel, abstraction engine and hippocampus, then load the log."""
        self.kernel = VitalisKernel()
        self.abstraction = AbstractionEngine()
        self.hippocampus = Hippocampus()
        self.path = os.path.expanduser(
            "~/.vitalis_workspace/reasoning_log.json"
        )
        self._log = self._load_log()

    # ------------------------------------------------------------------
    # Log persistence
    # ------------------------------------------------------------------
    def _load_log(self) -> list:
        """Return the persisted log, or ``[]`` when none can be read.

        A missing, unreadable, malformed or non-list file yields an empty
        log instead of an exception, so a damaged log file cannot prevent
        the reasoner from being constructed.
        """
        try:
            with open(self.path, encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError):
            # OSError: file missing or unreadable.  ValueError covers both
            # json.JSONDecodeError and UnicodeDecodeError.
            return []
        # Callers append to the log, so anything but a list is unusable.
        return data if isinstance(data, list) else []

    def _save_log(self) -> None:
        """Write the newest ``MAX_LOG_ENTRIES`` entries to ``self.path``.

        The data is written to a sibling temporary file and then moved
        over the target with ``os.replace``, so a failure while
        serialising never leaves a half-written log behind.

        Raises:
            TypeError: if an entry contains a value ``json`` cannot encode.
            OSError: if the directory or file cannot be written.
        """
        directory = os.path.dirname(self.path)
        if directory:  # os.makedirs("") raises, so skip bare filenames
            os.makedirs(directory, exist_ok=True)
        tmp_path = f"{self.path}.tmp"
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(self._log[-self.MAX_LOG_ENTRIES:], f, indent=2)
            os.replace(tmp_path, self.path)
        finally:
            # Only still present when dumping or replacing failed.
            if os.path.exists(tmp_path):
                os.remove(tmp_path)

    def _record(self, result: dict, exclude: tuple = ()) -> None:
        """Append a copy of *result* (minus the *exclude* keys) to the log and save.

        A copy is stored so that later mutation of the dict handed back to
        the caller does not alter the log; *exclude* is used to drop raw
        vectors, which are not JSON-serialisable.
        """
        entry = {key: value for key, value in result.items() if key not in exclude}
        self._log.append(entry)
        self._save_log()

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------
    def _encode(self, concept: str) -> np.ndarray:
        """Vectorize the whitespace-separated tokens of *concept*, non-positionally."""
        return self.kernel.vectorize_tokens(concept.split(), positional=False)

    @staticmethod
    def _summarize(candidates) -> list:
        """Turn ``(score, name, extra)`` hits into ``(name, rounded_score)`` pairs."""
        return [(name, round(float(score), 4)) for score, name, _ in candidates]

    @staticmethod
    def _top_score(candidates) -> float:
        """Return the first hit's score as a built-in float, or 0.0 without hits."""
        return float(candidates[0][0]) if candidates else 0.0

    # ------------------------------------------------------------------
    # Core HDC reasoning operations
    # ------------------------------------------------------------------
    def _bind(self, a: np.ndarray, b: np.ndarray) -> np.ndarray:
        """Bipolar binding: element-wise multiply, returned as int8."""
        # Multiply in int32 so the product cannot wrap before the cast back.
        return (a.astype(np.int32) * b.astype(np.int32)).astype(np.int8)

    def _bundle(self, vecs: list) -> np.ndarray:
        """Bipolar bundling: element-wise sum followed by sign, as int8.

        Components whose sum is zero are set to +1.
        """
        stacked = np.stack(vecs).astype(np.int32).sum(axis=0)
        result = np.sign(stacked).astype(np.int8)
        result[result == 0] = 1
        return result

    def _invert(self, vec: np.ndarray) -> np.ndarray:
        """Return *vec* cyclically shifted by ``INVERSION_SHIFT`` positions.

        The shift equals half the vector length only for vectors of
        length ``2 * INVERSION_SHIFT``.
        NOTE(review): a cyclic shift rearranges components rather than
        negating them, so the result is presumably near-orthogonal to the
        input rather than its exact opposite -- confirm this is intended.
        """
        return np.roll(vec, self.INVERSION_SHIFT)

    # ------------------------------------------------------------------
    # Analogy: A is to B as C is to ?
    # ------------------------------------------------------------------
    def analogy(
        self,
        concept_a: str,
        concept_b: str,
        concept_c: str,
    ) -> dict:
        """Solve ``A : B :: C : ?`` with ``? = bind(bind(A, B), C)``.

        Returns:
            A dict with keys ``type``, ``query``, ``answer_vec`` (the raw
            result vector), ``best_match`` (name of the highest-scoring
            abstraction, or ``None``), ``confidence`` (its score, ``-1.0``
            when nothing scored higher), ``candidates`` and ``timestamp``.
            Everything except ``answer_vec`` is also written to the log.
        """
        vec_a = self._encode(concept_a)
        vec_b = self._encode(concept_b)
        vec_c = self._encode(concept_c)

        # ? = B * A^-1 * C.  Assuming +/-1 components, A is its own inverse
        # under binding, so this reduces to A * B * C.
        relation = self._bind(vec_a, vec_b)
        answer_vec = self._bind(relation, vec_c)

        candidates = self.abstraction.query_abstractions(answer_vec, top_k=3)
        # NOTE(review): the hippocampus result is not used anywhere; the
        # call is kept only in case similarity_search has side effects --
        # confirm and either merge its hits or drop the call.
        self.hippocampus.similarity_search(answer_vec, top_k=3)

        best_match = None
        best_score = -1.0
        for score, name, _ in candidates:
            if score > best_score:
                best_score = score
                best_match = name

        result = {
            "type": "analogy",
            "query": f"{concept_a}:{concept_b}::{concept_c}:?",
            "answer_vec": answer_vec,
            "best_match": best_match,
            "confidence": round(float(best_score), 4),
            "candidates": self._summarize(candidates),
            "timestamp": time.time(),
        }
        self._record(result, exclude=("answer_vec",))
        return result

    # ------------------------------------------------------------------
    # Composition: merge two concepts into a novel one
    # ------------------------------------------------------------------
    def compose(self, concept_a: str, concept_b: str) -> dict:
        """Bundle two concepts and perturb the result with a random sign mask.

        Each component of the bundle keeps its sign with probability
        ``COMPOSITION_DECAY`` and is flipped otherwise, so repeated calls
        with the same inputs return different vectors.

        Returns:
            A dict with keys ``type``, ``inputs``, ``novel_vec`` (the raw
            composed vector), ``nearest``, ``novelty`` (one minus the
            first hit's score, ``1.0`` without hits) and ``timestamp``.
            Everything except ``novel_vec`` is also written to the log.
        """
        vec_a = self._encode(concept_a)
        vec_b = self._encode(concept_b)

        # Plain (unweighted) bundle of the two parents.
        composed = self._bundle([vec_a, vec_b])

        # Multiplying by -1 flips a component; +1 leaves it untouched.
        noise_mask = np.random.choice(
            [-1, 1],
            size=self.kernel.dim,
            p=[1 - self.COMPOSITION_DECAY, self.COMPOSITION_DECAY],
        ).astype(np.int8)
        composed = self._bind(composed, noise_mask)

        candidates = self.abstraction.query_abstractions(composed, top_k=3)

        result = {
            "type": "composition",
            "inputs": [concept_a, concept_b],
            "novel_vec": composed,
            "nearest": self._summarize(candidates),
            # Cast through float first: a NumPy float32 score would
            # otherwise reach json.dump and raise TypeError.
            "novelty": round(1.0 - self._top_score(candidates), 4),
            "timestamp": time.time(),
        }
        self._record(result, exclude=("novel_vec",))
        return result

    # ------------------------------------------------------------------
    # Inversion: what is the conceptual opposite?
    # ------------------------------------------------------------------
    def invert(self, concept: str) -> dict:
        """Shift the concept vector with ``_invert`` and look up its neighbours.

        Returns:
            A dict with keys ``type``, ``concept``, ``opposites``,
            ``confidence`` (first hit's score, ``0.0`` without hits) and
            ``timestamp``; a copy is written to the log.
        """
        inverted = self._invert(self._encode(concept))
        candidates = self.abstraction.query_abstractions(inverted, top_k=3)
        # NOTE(review): result unused; call kept only in case
        # similarity_search has side effects -- confirm.
        self.hippocampus.similarity_search(inverted, top_k=3)

        result = {
            "type": "inversion",
            "concept": concept,
            "opposites": self._summarize(candidates),
            "confidence": round(self._top_score(candidates), 4),
            "timestamp": time.time(),
        }
        self._record(result)
        return result

    # ------------------------------------------------------------------
    # Transitivity: if A→B and B→C, what is A→C?
    # ------------------------------------------------------------------
    def transitive_chain(self, concepts: list) -> dict:
        """Fold a chain of concepts ``[A, B, C, ...]`` into a single vector.

        Starting from A, each following concept is bound into the
        accumulator, which is then rotated by ``100 * position`` so the
        result depends on the order of the chain.

        Returns:
            ``{"error": "Need at least 2 concepts"}`` for shorter input
            (nothing is logged in that case); otherwise a dict with keys
            ``type``, ``chain``, ``conclusion``, ``confidence`` and
            ``timestamp``, a copy of which is written to the log.
        """
        if len(concepts) < 2:
            return {"error": "Need at least 2 concepts"}

        vecs = [self._encode(concept) for concept in concepts]

        accumulated = vecs[0].copy()
        for step, vec in enumerate(vecs[1:], start=1):
            accumulated = self._bind(accumulated, vec)
            # Position-dependent permutation after every binding step.
            accumulated = np.roll(accumulated, step * 100)

        candidates = self.abstraction.query_abstractions(accumulated, top_k=3)

        result = {
            "type": "transitivity",
            "chain": concepts,
            "conclusion": self._summarize(candidates),
            "confidence": round(self._top_score(candidates), 4),
            "timestamp": time.time(),
        }
        self._record(result)
        return result

    def report(self) -> dict:
        """Summarise the in-memory log: total count, counts per type, last three entries."""
        if not self._log:
            return {"status": "No reasoning performed yet"}
        type_counts = {}
        for entry in self._log:
            kind = entry.get("type", "unknown")
            type_counts[kind] = type_counts.get(kind, 0) + 1
        return {
            "total_reasoning_ops": len(self._log),
            "by_type": type_counts,
            "recent": self._log[-3:],
        }