File size: 54,066 Bytes
a5ae1ac | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 
525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 
1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 | """
Generator: converts converged concepts into text output.
Four strategies, tried in order:
A. Template Matching (primary) — concepts → find closest template → fill slots
B. Sentence Chain (secondary) — replay the best-matching taught sentence in taught order
C. Successor Walk (tertiary) — walk successor lists, emit tokens
D. Concept List (fallback) — return raw concepts as structured output
The generator does NOT generate from nothing. It takes the output of the
convergence loop (a set of concept neurons) and renders it as text.
Templates are stored as neurons in the DB with a special pattern field.
They're inspectable, editable, deletable — same as any other neuron.
"""
import re
from dataclasses import dataclass, field
from typing import Optional
import numpy as np
from neuron import NeuronDB, Neuron
from encoder import Encoder
from convergence import ConvergenceLoop, ConvergenceResult
from constants import (
ABSTAIN_MESSAGE, FUNCTION_WORDS, STRUCTURAL_WORDS,
GRAMMAR_CONFIDENCE_THRESHOLD, MAX_CONVERGENCE_JUMPS,
QUERY_ANCHOR_FLOOR, PARAGRAPH_RELEVANCE_FLOOR,
)
def _parse_template_structure(pattern: str) -> list:
"""
Parse a template pattern into an ordered structure.
"[PERSON] wrote [WORK]" → [("slot", "PERSON"), ("word", "wrote"), ("slot", "WORK")]
Uses simple character-level parsing, no regex.
"""
parts = []
i = 0
current_word = []
while i < len(pattern):
if pattern[i] == '[':
# Flush any accumulated word
if current_word:
word = ''.join(current_word).strip()
if word:
for w in word.split():
parts.append(("word", w.lower()))
current_word = []
# Find closing bracket
j = pattern.index(']', i)
slot_name = pattern[i + 1:j]
parts.append(("slot", slot_name))
i = j + 1
else:
current_word.append(pattern[i])
i += 1
# Flush remaining
if current_word:
word = ''.join(current_word).strip()
if word:
for w in word.split():
parts.append(("word", w.lower()))
return parts
@dataclass
class Template:
    """
    A sentence pattern with fillable slots.

    Example:
        pattern: "[PERSON] wrote [WORK] in [YEAR]"
        slots: {"PERSON": "noun", "WORK": "noun", "YEAR": "number"}
        structure: [("slot", "PERSON"), ("word", "wrote"), ("slot", "WORK"), ...]
    """
    id: int
    pattern: str
    slots: dict  # slot_name → slot_type
    vector: np.ndarray  # embedding of the template (for search)
    confidence: float = 0.5
    structure: list = field(default_factory=list)  # parsed at creation

    def __post_init__(self):
        # Derive the parsed structure from the pattern unless the caller
        # supplied a non-empty one explicitly.
        if not self.structure:
            self.structure = _parse_template_structure(self.pattern)

    @property
    def slot_names(self) -> list:
        """Slot names in the insertion order of ``slots``."""
        return list(self.slots.keys())

    @property
    def structural_words(self) -> list:
        """Words in the template that aren't slots."""
        return [name for kind, name in self.structure if kind == "word"]

    def fill(self, slot_values: dict) -> str:
        """
        Fill slots with values and return the filled text.

        All markers are substituted in a single pass over the pattern, so a
        value that itself looks like a slot marker (for example "[WORK]")
        is emitted verbatim and is never rewritten by another slot's value.
        Markers with no entry in ``slot_values`` are left in place.

        Args:
            slot_values: Mapping of slot name → value; values go through str().

        Returns:
            The pattern with every provided slot replaced.
        """
        if not slot_values:
            return self.pattern
        replacements = {}
        for name, value in slot_values.items():
            # setdefault: if two keys format to the same marker, the first wins.
            replacements.setdefault(f"[{name}]", str(value))
        # Longest marker first so the alternation never prefers a shorter
        # marker where a longer one would also match at the same position.
        ordered_markers = sorted(replacements, key=len, reverse=True)
        marker_re = re.compile("|".join(re.escape(m) for m in ordered_markers))
        return marker_re.sub(lambda match: replacements[match.group(0)], self.pattern)

    def unfilled_slots(self, slot_values: dict) -> list:
        """Return slot names that haven't been filled."""
        return [s for s in self.slots if s not in slot_values]
@dataclass
class GenerationResult:
    """Outcome of one generation attempt, including how it was produced."""
    text: str
    strategy: str  # e.g. "template", "sentence_chain", "successor", "concept_list", "abstain"
    confidence: float
    template_used: Optional[Template] = None
    slot_fills: dict = field(default_factory=dict)
    trace: list = field(default_factory=list)  # step-by-step for inspectability

    def explain(self) -> str:
        """Render a human-readable, multi-line account of this result. Invariant #2."""
        header = f"Strategy: {self.strategy} (confidence={self.confidence:.3f})"
        template_lines = []
        if self.template_used:
            # Template details only appear when a template produced the text.
            template_lines = [
                f"Template: {self.template_used.pattern}",
                f"Fills: {self.slot_fills}",
            ]
        step_lines = [f" {step}" for step in self.trace]
        return "\n".join([header, *template_lines, *step_lines])
class TemplateStore:
    """
    Stores and retrieves sentence templates.

    Templates are patterns like "[PERSON] wrote [WORK]" with typed slots.
    Stored with embeddings for spatial search — find the template that
    best matches a set of concepts.

    Persistence: when a NeuronDB is provided, templates are saved to and
    loaded from it. Without a DB, templates live in memory only.
    """

    def __init__(self, encoder: Encoder, db: 'NeuronDB' = None):
        """
        Args:
            encoder: Used by ``add`` to embed a template's non-slot text.
            db: Optional backing store; when given, previously saved
                templates are loaded immediately.
        """
        self.encoder = encoder
        self.db = db
        self.templates: list[Template] = []
        self._next_id = 0
        # Load persisted templates if DB is available
        if db is not None:
            self._load_from_db()

    def _load_from_db(self):
        """Load templates from the DB and advance the id counter past them."""
        import json
        rows = self.db.load_templates()
        for tid, pattern, slots_json, conf, vector in rows:
            slots = json.loads(slots_json)
            template = Template(
                id=tid, pattern=pattern, slots=slots,
                vector=vector, confidence=conf,
            )
            self.templates.append(template)
            # Keep new ids strictly above every id already persisted.
            if tid >= self._next_id:
                self._next_id = tid + 1

    def add(self, pattern: str, slots: dict, confidence: float = 0.5) -> Template:
        """
        Register a template, embed it, and persist it when a DB is attached.

        Args:
            pattern: Template text with slots written as "[NAME]".
            slots: Mapping of slot name → slot type.
            confidence: Initial confidence stored on the template.

        Returns:
            The newly created Template.
        """
        import json
        # Embed the template by encoding its non-slot words. Any bracketed
        # marker is stripped (not only upper-case names), so slots such as
        # "[S0]" are removed too, matching how _parse_template_structure
        # delimits a slot: from '[' up to the next ']'.
        text = re.sub(r'\[[^\]]*\]', '', pattern).strip()
        vector = self.encoder.encode_sentence(text)
        template = Template(
            id=self._next_id,
            pattern=pattern,
            slots=slots,
            vector=vector,
            confidence=confidence,
        )
        self.templates.append(template)
        # Persist to the DB
        if self.db is not None:
            self.db.save_template(
                self._next_id, pattern, json.dumps(slots),
                confidence, vector,
            )
        self._next_id += 1
        return template

    def search(self, concept_vector: np.ndarray, k: int = 3,
               min_similarity: float = -1.0) -> list[Template]:
        """
        Find templates closest to a concept vector, above min similarity.

        Args:
            concept_vector: Query vector compared against each template vector.
            k: Maximum number of templates to return.
            min_similarity: Cosine-similarity floor (inclusive).

        Returns:
            Up to ``k`` templates ordered by descending cosine similarity.
        """
        if not self.templates:
            return []
        # Vectorized: batch cosine similarity
        template_matrix = np.array([t.vector for t in self.templates], dtype=np.float32)
        query_norm = np.linalg.norm(concept_vector)
        template_norms = np.linalg.norm(template_matrix, axis=1)
        # The small epsilon keeps zero-length vectors from dividing by zero.
        sims = (template_matrix @ concept_vector) / (query_norm * template_norms + 1e-10)
        # Filter and sort
        mask = sims >= min_similarity
        indices = np.where(mask)[0]
        if len(indices) == 0:
            return []
        sorted_idx = indices[np.argsort(-sims[indices])]
        return [self.templates[i] for i in sorted_idx[:k]]

    def count(self) -> int:
        """Number of templates currently held in memory."""
        return len(self.templates)

    def delete(self, template_id: int) -> bool:
        """
        Delete = gone. Invariant #3.

        Removes the template from memory and, when a DB is attached, from
        the DB as well.

        Returns:
            True if a template with that id was present in memory.
        """
        before = len(self.templates)
        self.templates = [t for t in self.templates if t.id != template_id]
        if self.db is not None:
            self.db.delete_template(template_id)
        return len(self.templates) < before
class Generator:
"""
Converts convergence results into text.
Tries strategies in order:
1. Template matching — if a matching template is found
2. Sentence chain — if a taught sentence covers the concepts
3. Successor walk — if concepts have successors
4. Concept list — always works (fallback)
"""
def __init__(self, db: NeuronDB, encoder: Encoder,
             template_store: TemplateStore):
    """
    Keep references to the collaborators the strategies use.

    Args:
        db: Neuron store queried for neurons, successors and sentences.
        encoder: Text-to-vector encoder.
        template_store: Source of sentence templates for template matching.
    """
    self.db, self.encoder, self.template_store = db, encoder, template_store
def generate(self, convergence_result: ConvergenceResult,
             max_tokens: int = 20,
             query_vector: np.ndarray = None,
             query_words: list = None,
             evaluate_all: bool = False) -> GenerationResult:
    """
    Generate text from a convergence result.

    Strategies, in order: template → sentence chain → successor walk →
    concept list.

    Default mode returns the first strategy that produces a result.
    Evaluate mode (evaluate_all=True) runs every strategy and returns the
    candidate with the highest confidence; a summary of all candidates is
    inserted at the front of the winner's trace for inspectability.

    Abstains (ABSTAIN_MESSAGE, confidence 0.0) when the convergence result
    did not converge or carries no concepts.
    """
    if not convergence_result.converged:
        return GenerationResult(
            text=ABSTAIN_MESSAGE,
            strategy="abstain",
            confidence=0.0,
            trace=["Non-convergence → honest abstention (Invariant #4)"],
        )

    concepts = convergence_result.concepts
    if not concepts:
        return GenerationResult(
            text=ABSTAIN_MESSAGE,
            strategy="abstain",
            confidence=0.0,
            trace=["No concepts found"],
        )

    # Each optional strategy is wrapped in a thunk so it only runs when
    # reached, and so both modes share one ordered definition.
    optional_strategies = (
        # A: template matching
        lambda: self._try_template(
            convergence_result, query_vector=query_vector, query_words=query_words
        ),
        # B: sentence-constrained chain — replay the taught sentence that
        # best matches the query, in taught order
        lambda: self._try_sentence_chain(
            concepts, query_vector=query_vector, query_words=query_words
        ),
        # C: free successor walk (convergence-guided)
        lambda: self._try_successor_walk(
            concepts, max_tokens, query_vector=query_vector
        ),
    )

    def concept_list_fallback():
        # D: concept list (always works)
        return self._concept_list(concepts, convergence_result.confidence)

    if not evaluate_all:
        # Hierarchical: simplest sufficient method first
        for attempt in optional_strategies:
            outcome = attempt()
            if outcome is not None:
                return outcome
        return concept_list_fallback()

    # Evaluate all strategies, pick best
    candidates = []
    for attempt in optional_strategies:
        outcome = attempt()
        if outcome is not None:
            candidates.append(outcome)
    candidates.append(concept_list_fallback())

    # max() keeps the earliest candidate on ties, i.e. strategy order
    best = max(candidates, key=lambda r: r.confidence)
    summary = ", ".join(f"{c.strategy}({c.confidence:.3f})" for c in candidates)
    best.trace.insert(0, f"Evaluated {len(candidates)} strategies: " + summary)
    return best
def _try_template(self, conv_result: ConvergenceResult,
                  query_vector: np.ndarray = None,
                  query_words: list = None) -> Optional[GenerationResult]:
    """
    Strategy A: Find closest template, fill slots with concept words.

    Templates are ranked by how many of their structural (non-slot) words
    occur in the query words, with vector similarity as a tie-breaker.
    When no query words are given, the words of the converged concepts are
    used for the overlap instead. If no template overlaps at all, the
    template store's pure vector search is used.

    Args:
        conv_result: Convergence output supplying concepts, vector and
            confidence.
        query_vector: Optional query embedding; falls back to
            ``conv_result.vector`` for similarity.
        query_words: Optional query tokens used for structural overlap.

    Returns:
        A "template" GenerationResult (fully or partially filled), or None
        when there are no templates or no slot could be filled.
    """
    if self.template_store.count() == 0:
        return None

    # Score templates by structural word overlap with the QUERY TEXT
    # (not concepts — concepts include noise like "the", "is").
    # A template's structural words should appear in what the user asked.
    query_word_set = set(query_words) if query_words else set()
    search_vec = query_vector if query_vector is not None else conv_result.vector

    # Both the word set to match against and the query norm are the same
    # for every template, so they are computed once up front.
    if query_word_set:
        match_words = query_word_set
    else:
        # No query words available — use concept-based matching
        match_words = set(self._neurons_to_words(conv_result.concepts))
    search_norm = np.linalg.norm(search_vec)

    scored_templates = []
    for t in self.template_store.templates:
        struct_words = t.structural_words
        struct_total = len(struct_words) or 1
        struct_overlap = sum(1 for w in struct_words if w in match_words)
        overlap_score = struct_overlap / struct_total
        if overlap_score <= 0:
            # Templates without any structural overlap are never ranked,
            # so their similarity does not need computing.
            continue
        # Vector similarity
        vec_sim = float(np.dot(search_vec, t.vector) /
                        (search_norm * np.linalg.norm(t.vector) + 1e-10))
        # Combined: overlap is dominant, vector similarity breaks ties
        combined = overlap_score * 0.75 + max(vec_sim, 0) * 0.25
        scored_templates.append((t, combined))

    scored_templates.sort(key=lambda x: x[1], reverse=True)
    templates = [t for t, _ in scored_templates[:3]]
    if not templates:
        # Fallback to pure vector search if no structural overlap
        templates = self.template_store.search(search_vec, k=3)
    if not templates:
        return None

    concepts = conv_result.concepts
    concept_words = self._neurons_to_words(concepts)

    # Sort concepts by TAUGHT sentence order when available.
    # Using the taught position preserves the original sentence structure,
    # so template slots get filled correctly:
    #   taught "paris is the capital of france" → positions [paris=0, capital=1, france=2]
    #   template "[S0] is the [S1] of [S2]" → S0=paris, S1=capital, S2=france
    # Without this, sorting by query similarity puts "capital" first
    # (highest sim to "capital of france") → garbled output.
    concept_ids = [c.id for c in concepts]
    sentence_order = self._get_sentence_order(concept_ids)
    if sentence_order:
        # Sort by taught position; concepts without a position go last
        concept_pairs = list(zip(concepts, concept_words))
        concept_pairs.sort(
            key=lambda p: sentence_order.get(p[0].id, 999),
        )
        concepts = [p[0] for p in concept_pairs]
        concept_words = [p[1] for p in concept_pairs]
    elif query_vector is not None:
        # Fallback: sort by relevance to query
        concept_pairs = list(zip(concepts, concept_words))
        concept_pairs.sort(
            key=lambda p: float(np.dot(p[0].vector, query_vector)),
            reverse=True,
        )
        concepts = [p[0] for p in concept_pairs]
        concept_words = [p[1] for p in concept_pairs]

    for template in templates:
        slot_fills = self._match_slots(template, concept_words, concepts)
        unfilled = template.unfilled_slots(slot_fills)
        if not unfilled:
            # All slots filled
            text = template.fill(slot_fills)
            return GenerationResult(
                text=text,
                strategy="template",
                confidence=template.confidence * conv_result.confidence,
                template_used=template,
                slot_fills=slot_fills,
                trace=[
                    f"Template matched: {template.pattern}",
                    f"Slot fills: {slot_fills}",
                    f"Concepts: {concept_words}",
                ],
            )

    # No template fully filled — partial fill of best match
    best = templates[0]
    # NOTE(review): this call omits the `concepts` argument that the loop
    # above passes to _match_slots — confirm whether that is intentional.
    slot_fills = self._match_slots(best, concept_words)
    unfilled = best.unfilled_slots(slot_fills)
    if slot_fills:  # at least some slots filled
        text = best.fill(slot_fills)
        # Replace unfilled slots with "..."
        for name in unfilled:
            text = text.replace(f"[{name}]", "...")
        return GenerationResult(
            text=text,
            strategy="template",
            # Partial fills are worth half the confidence of a full fill
            confidence=best.confidence * conv_result.confidence * 0.5,
            template_used=best,
            slot_fills=slot_fills,
            trace=[
                f"Template partial: {best.pattern}",
                f"Filled: {slot_fills}, unfilled: {unfilled}",
            ],
        )
    return None  # no slots matched at all
def _try_sentence_chain(self, concepts: list, query_vector: np.ndarray = None,
                        query_words: list = None) -> Optional[GenerationResult]:
    """
    Strategy B: Sentence-constrained chain retrieval.

    Instead of walking the successor graph freely (which picks wrong chains
    at ambiguous nodes like "the"), look up which taught sentences contain
    the given concepts, pick the best-scoring one, and output that
    sentence's words in their taught order.

    Scoring per sentence: 0.4 × (number of matched concepts) +
    0.6 × max(relevance, 0), where relevance is the dot product of the
    normalized centroid of the sentence's neuron vectors with the query
    vector (0.0 when no query vector is given).

    Args:
        concepts: Converged concept neurons.
        query_vector: Optional query embedding used for the relevance term.
        query_words: Accepted for signature parity with the other
            strategies; not used here.

    Returns:
        A "sentence_chain" GenerationResult for the first of the top three
        sentences that renders at least two words, or None.
    """
    if not concepts:
        return None

    # Find sentences containing these concepts
    concept_ids = [c.id for c in concepts]
    sentences = self.db.get_sentences_for_neurons(concept_ids)
    if not sentences:
        return None

    # Each neuron is looked up at most once per call; scoring and
    # rendering below share the results.
    neuron_cache = {}

    def fetch(nid):
        if nid not in neuron_cache:
            neuron_cache[nid] = self.db.get(nid)
        return neuron_cache[nid]

    # Score: how many of the query-relevant concepts does each sentence contain?
    scored = []
    for sid, matched_neurons in sentences.items():
        coverage = len(matched_neurons)
        sent_neurons = self.db.get_sentence_neurons(sid)
        if not sent_neurons:
            continue
        relevance = 0.0
        if query_vector is not None:
            # Centroid of sentence neurons vs query
            vecs = []
            for nid, _ in sent_neurons:
                neuron = fetch(nid)
                if neuron is not None:
                    vecs.append(neuron.vector)
            if vecs:
                centroid = np.mean(vecs, axis=0).astype(np.float32)
                norm = np.linalg.norm(centroid)
                if norm > 0:
                    centroid = centroid / norm
                relevance = float(np.dot(centroid, query_vector))
        score = coverage * 0.4 + max(relevance, 0) * 0.6
        scored.append((sid, score, sent_neurons))

    if not scored:
        return None
    scored.sort(key=lambda x: x[1], reverse=True)

    # Get word mappings for neuron → word conversion
    word_map = self.db.load_word_mappings()
    neuron_to_word = {nid: w for w, nid in word_map.items()}

    # Try the top-scoring sentences
    for sid, score, sent_neurons in scored[:3]:
        # Output neurons in taught position order
        ordered = sorted(sent_neurons, key=lambda x: x[1])
        words = []
        for nid, _ in ordered:
            w = neuron_to_word.get(nid)
            if not w:
                # Not in the word map — fall back to the per-neuron lookup
                neuron = fetch(nid)
                w = self._neuron_to_word(neuron) if neuron is not None else None
            # Words starting with "__" are skipped
            if w and not w.startswith("__"):
                words.append(w)
        if len(words) >= 2:
            text = " ".join(words)
            confidences = [
                neuron.confidence
                for neuron in (fetch(nid) for nid, _ in ordered)
                if neuron is not None
            ]
            # Averaging an empty list would yield NaN, so fall back to 0.5
            # when none of the sentence's neurons could be loaded.
            avg_conf = float(np.mean(confidences)) if confidences else 0.5
            return GenerationResult(
                text=text,
                strategy="sentence_chain",
                confidence=avg_conf * 0.8,
                trace=[
                    f"Sentence chain: sid={sid}, score={score:.3f}",
                    f"Coverage: {len(sentences[sid])} query concepts matched",
                    f"Words: {words}",
                ],
            )
    return None
def _try_successor_walk(self, concepts: list[Neuron],
                        max_tokens: int,
                        query_vector: np.ndarray = None) -> Optional[GenerationResult]:
    """
    Strategy C: Convergence-guided sentence generation.

    Each token position is a decision point:
    1. FAST PATH: the best unvisited successor has confidence at or above
       GRAMMAR_CONFIDENCE_THRESHOLD → emit it
       (grammar tokens: "is", "the", "of" — predictable from word order)
    2. SLOW PATH: score the successor candidates against a blend of the
       query (what was asked) and the context (tokens emitted so far) and
       emit the best one, falling back to the best successor.
    3. JUMP: when there are no unvisited successors, use convergence to
       jump to a new relevant neuron (at most MAX_CONVERGENCE_JUMPS times).

    Stop when: no successors and no jump possible, a sentence boundary is
    crossed after a jump, relevance drifts, or max tokens is reached.

    Args:
        concepts: Converged concept neurons; the walk starts from the one
            most similar to the query (or the most confident one when no
            query vector is given).
        max_tokens: Upper bound on emitted tokens, including the start.
        query_vector: Optional query embedding used as the topic anchor.

    Returns:
        A "successor" GenerationResult, or None when fewer than two tokens
        were produced.
    """
    if not concepts:
        return None

    # Pick starting concept: most relevant to the query, not just highest confidence
    if query_vector is not None:
        start_concept = max(
            concepts,
            key=lambda n: float(np.dot(n.vector, query_vector))
        )
    else:
        start_concept = max(concepts, key=lambda n: n.confidence)
    start = self.db.get(start_concept.id)
    if start is None:
        return None

    tokens = []
    trace = []
    emitted_neurons = []  # neurons of emitted tokens, kept aligned with `tokens`
    current_id = start.id
    visited = {current_id}

    # Add start word
    start_word = self._neuron_to_word(start)
    if start_word:
        tokens.append(start_word)
        emitted_neurons.append(start)
        trace.append(f"Start: {start_word} (n{start.id}, conf={start.confidence:.2f})")

    consecutive_low_relevance = 0  # track drift from query
    had_jump = False  # whether we've crossed a sentence boundary
    jump_count = 0  # number of convergence jumps taken
    max_jumps = MAX_CONVERGENCE_JUMPS
    current_sentence_ids = None  # sentences the current neuron belongs to

    for step in range(max_tokens - 1):
        current = self.db.get(current_id)
        if current is None:
            break

        # Get unvisited successors
        candidates = [(sid, sc) for sid, sc in current.successors
                      if sid not in visited]

        if not candidates:
            # No successors — try convergence to find a continuation
            if (query_vector is not None
                    and len(tokens) < max_tokens - 1
                    and jump_count < max_jumps):
                next_neuron = self._converge_next_token(
                    query_vector, emitted_neurons, visited
                )
                if next_neuron is not None:
                    word = self._neuron_to_word(next_neuron)
                    if word:
                        tokens.append(word)
                        emitted_neurons.append(next_neuron)
                        trace.append(
                            f"Step {step + 1}: {word} (n{next_neuron.id}, "
                            f"converge-jump {jump_count + 1}/{max_jumps}, "
                            f"conf={next_neuron.confidence:.2f})"
                        )
                        visited.add(next_neuron.id)
                        current_id = next_neuron.id
                        had_jump = True
                        jump_count += 1
                        consecutive_low_relevance = 0
                        # Track which sentence(s) the jump target belongs to
                        rows = self.db.get_cooccurring_neurons(next_neuron.id)
                        current_sentence_ids = {r[2] for r in rows} if rows else None
                        continue
            break

        # FAST PATH: best successor confidence above threshold → grammar token
        best_id, best_conf = max(candidates, key=lambda s: s[1])
        if best_conf >= GRAMMAR_CONFIDENCE_THRESHOLD:
            succ = self.db.get(best_id)
            if succ is None:
                break
            word = self._neuron_to_word(succ)
            if not word:
                break

            # Sentence-aware filtering: after a jump, check if this
            # successor belongs to the same sentence as the jump target.
            # If it's from a different sentence, we've crossed a boundary
            # into unrelated content — stop.
            if had_jump and current_sentence_ids is not None:
                succ_rows = self.db.get_cooccurring_neurons(best_id)
                succ_sentences = {r[2] for r in succ_rows} if succ_rows else set()
                if succ_sentences and not (succ_sentences & current_sentence_ids):
                    # Different sentence — stop here
                    trace.append(
                        f"Stop: sentence boundary at step {step + 1} "
                        f"({word} belongs to different sentence)"
                    )
                    break

            # Relevance drift check (backup for neurons without sentence data)
            if had_jump and current_sentence_ids is None and query_vector is not None:
                token_rel = float(np.dot(succ.vector, query_vector) /
                                  (np.linalg.norm(succ.vector) *
                                   np.linalg.norm(query_vector) + 1e-10))
                if token_rel < 0.25:
                    consecutive_low_relevance += 1
                else:
                    consecutive_low_relevance = 0
                if consecutive_low_relevance >= 2:
                    # NOTE(review): the current word has not been appended
                    # yet, so this removes the last `trim` tokens that were
                    # already emitted — confirm that is the intended trim.
                    trim = min(consecutive_low_relevance, len(tokens))
                    tokens = tokens[:-trim]
                    emitted_neurons = emitted_neurons[:-trim]
                    trace.append(f"Stop: post-jump drift after {len(tokens)} tokens")
                    break

            tokens.append(word)
            emitted_neurons.append(succ)
            trace.append(
                f"Step {step + 1}: {word} (n{best_id}, "
                f"fast, succ_conf={best_conf:.2f})"
            )
            visited.add(best_id)
            current_id = best_id
            continue

        # SLOW PATH: convergence-guided selection among successors
        if query_vector is not None:
            chosen = self._convergence_pick(
                query_vector, emitted_neurons, candidates
            )
        else:
            chosen = None
        if chosen is None:
            # Fallback: just take the best successor
            chosen_id, chosen_conf = best_id, best_conf
        else:
            chosen_id, chosen_conf = chosen

        succ = self.db.get(chosen_id)
        if succ is None:
            break
        word = self._neuron_to_word(succ)
        if not word:
            break
        speed = "converge" if chosen is not None else "fallback"

        # Token and neuron are appended together so that the drift trim
        # below removes the same entries from both lists.
        tokens.append(word)
        emitted_neurons.append(succ)

        # Relevance stopping: if content tokens drift away from query, stop.
        # Grammar tokens (fast path) don't count — "the", "of" are always low-relevance.
        if query_vector is not None:
            token_relevance = float(np.dot(succ.vector, query_vector) /
                                    (np.linalg.norm(succ.vector) *
                                     np.linalg.norm(query_vector) + 1e-10))
            if token_relevance < 0.2:
                consecutive_low_relevance += 1
            else:
                consecutive_low_relevance = 0
            # Two consecutive low-relevance content tokens = we've drifted off topic
            if consecutive_low_relevance >= 2:
                # Remove the drifted tokens
                tokens = tokens[:-consecutive_low_relevance]
                emitted_neurons = emitted_neurons[:-consecutive_low_relevance]
                trace.append(f"Stop: relevance drift after {len(tokens)} tokens")
                break

        trace.append(
            f"Step {step + 1}: {word} (n{chosen_id}, "
            f"{speed}, conf={chosen_conf:.2f})"
        )
        visited.add(chosen_id)
        current_id = chosen_id

    if len(tokens) < 2:
        return None

    text = " ".join(tokens)
    avg_conf = np.mean([n.confidence for n in emitted_neurons])
    return GenerationResult(
        text=text,
        strategy="successor",
        confidence=float(avg_conf) * 0.6,
        trace=trace,
    )
def _build_context_vector(self, query_vector: np.ndarray,
emitted: list) -> np.ndarray:
"""
Build a search vector that blends query intent with generation context.
Early in generation: mostly query (stay on topic).
Later: more context (maintain coherence with what's been said).
But query never drops below 40% — it's the anchor.
"""
if not emitted:
return query_vector
# Context = average of emitted neuron vectors
context = np.mean([n.vector for n in emitted], axis=0).astype(np.float32)
norm = np.linalg.norm(context)
if norm > 0:
context = context / norm
# Query weight decreases but floors at 0.4
query_weight = max(QUERY_ANCHOR_FLOOR, 1.0 - len(emitted) * 0.1)
blended = query_weight * query_vector + (1 - query_weight) * context
norm = np.linalg.norm(blended)
if norm > 0:
blended = blended / norm
return blended
def _convergence_pick(self, query_vector: np.ndarray,
emitted: list,
candidates: list) -> Optional[tuple]:
"""
Pick the best successor using convergence-guided scoring.
Scores each candidate by how well it fits the query+context blend.
Returns (neuron_id, score) or None if no candidate is relevant.
"""
search_vec = self._build_context_vector(query_vector, emitted)
# Fetch all candidate neurons
valid = []
for cand_id, succ_conf in candidates:
cand = self.db.get(cand_id)
if cand is not None:
valid.append((cand_id, succ_conf, cand))
if not valid:
return None
# Vectorized scoring
vectors = np.array([c.vector for _, _, c in valid], dtype=np.float32)
succ_confs = np.array([sc for _, sc, _ in valid], dtype=np.float32)
norms = np.linalg.norm(vectors, axis=1)
search_norm = np.linalg.norm(search_vec)
sims = (vectors @ search_vec) / (norms * search_norm + 1e-10)
scores = sims * 0.6 + succ_confs * 0.4
best_idx = int(np.argmax(scores))
if scores[best_idx] > 0.0:
return (valid[best_idx][0], float(scores[best_idx]))
return None
def _converge_next_token(self, query_vector: np.ndarray,
                         emitted: list,
                         visited: set) -> Optional[Neuron]:
    """
    When the successor chain runs out, jump to a new neuron that is
    relevant to the query+context blend.

    This enables cross-sentence chaining: concepts that were not connected
    by successor edges can still follow one another.

    Tighter than a general search — a candidate must clear both a query
    similarity threshold (> 0.3) and a blended-context similarity threshold
    (> 0.25) to prevent drifting into unrelated regions. Already visited
    neurons, function words (the, is, of) and neurons with confidence of
    0.1 or less are excluded.

    Args:
        query_vector: Query embedding.
        emitted: Neurons emitted so far (context for the blend).
        visited: Ids of neurons that must not be chosen again.

    Returns:
        The best-scoring eligible neuron, or None when none qualifies.
    """
    search_vec = self._build_context_vector(query_vector, emitted)

    # Search the DB for neurons near the blended vector
    neighbors = self.db.search(search_vec, k=10)

    # Pre-load word mappings so function words can be filtered by lookup
    word_map = self.db.load_word_mappings()
    neuron_to_word = {nid: w for w, nid in word_map.items()}

    # Filter candidates: not visited, not function words, confident enough
    valid = []
    for n in neighbors:
        if n.id in visited:
            continue
        word = neuron_to_word.get(n.id)
        if word and self._is_function_word(word):
            continue
        if n.confidence > 0.1:
            valid.append(n)
    if not valid:
        return None

    # Vectorized: batch cosine similarities against query and context
    vectors = np.array([n.vector for n in valid], dtype=np.float32)
    norms = np.linalg.norm(vectors, axis=1)
    query_sims = (vectors @ query_vector) / (norms * np.linalg.norm(query_vector) + 1e-10)
    ctx_sims = (vectors @ search_vec) / (norms * np.linalg.norm(search_vec) + 1e-10)

    # Apply thresholds and score
    mask = (query_sims > 0.3) & (ctx_sims > 0.25)
    if not np.any(mask):
        return None
    scores = query_sims * 0.6 + ctx_sims * 0.4
    scores = np.where(mask, scores, -np.inf)
    # At least one candidate passed the mask, and every passing score is
    # positive (both similarities exceed positive thresholds), so the
    # argmax is always an eligible neuron.
    return valid[int(np.argmax(scores))]
@staticmethod
def _is_function_word(word: str) -> bool:
    """Return True when *word*, lower-cased, is listed in FUNCTION_WORDS.

    Used to keep function/grammar words from becoming jump targets.
    """
    normalized = word.lower()
    return normalized in FUNCTION_WORDS
# --- Paragraph Generation ---
def generate_paragraph(self, query_vector: np.ndarray,
                       convergence_loop: ConvergenceLoop,
                       max_sentences: int = 5,
                       max_tokens_per_sentence: int = 15,
                       query_words: list = None,
                       sentence_separator: str = ". ") -> GenerationResult:
    """
    Generate a multi-sentence paragraph via convergence-driven retrieval.

    Convergence decides WHAT to say; the taught word order stored per
    sentence decides HOW to say it. Whole sentences are retrieved and
    replayed in taught order instead of being generated token by token.

    Phase 1 - CONVERGE: find concepts relevant to the query, then add
        close DB matches for each individual query word.
    Phase 2 - RETRIEVE SENTENCES: find taught sentences containing those
        concepts and score each one by concept coverage (weight 0.4) and
        by the similarity of its centroid to the query (weight 0.6).
    Phase 3 - RENDER: emit the best sentences with their neurons sorted
        by taught position and mapped back to words.

    Args:
        query_vector: Vector of the query.
        convergence_loop: Object whose ``converge(query_vector)`` yields
            the initial concepts.
        max_sentences: Upper bound on the number of sentences in the
            returned paragraph. Sentences that render to no words or to
            text already emitted do not count against this bound.
        max_tokens_per_sentence: Token budget handed to ``generate`` when
            falling back to single-sentence generation.
        query_words: Optional query tokens used to enrich the concept set.
        sentence_separator: String placed between rendered sentences.

    Returns:
        A GenerationResult with strategy "paragraph"; an "abstain" result
        when convergence fails or nothing can be rendered; or whatever
        ``generate`` returns when no taught sentence contains any concept.
    """
    # Phase 1: Converge to find relevant concepts
    result = convergence_loop.converge(query_vector)
    if not result.converged or not result.concepts:
        return GenerationResult(
            text=ABSTAIN_MESSAGE,
            strategy="abstain",
            confidence=0.0,
            trace=["Paragraph planning: convergence failed"],
        )

    # Enrich with per-word matches
    all_concepts = list(result.concepts)
    seen_ids = {c.id for c in all_concepts}
    for token in query_words or ():
        wv = self.encoder.encode_word(token)
        if np.all(wv == 0):
            continue  # all-zero vector: nothing to search with
        for n in self.db.search(wv, k=3):
            if n.id in seen_ids:
                continue
            if float(np.dot(n.vector, wv)) > 0.3:
                all_concepts.append(n)
                seen_ids.add(n.id)

    # Phase 2: Find sentences containing these concepts
    concept_ids = [c.id for c in all_concepts]
    sentences_map = self.db.get_sentences_for_neurons(concept_ids)
    if not sentences_map:
        # No sentence structure — fall back to single-sentence generation
        conv_result = ConvergenceResult(
            converged=True, vector=result.vector,
            concepts=all_concepts, confidence=result.confidence,
        )
        return self.generate(conv_result, max_tokens=max_tokens_per_sentence,
                             query_vector=query_vector, query_words=query_words)

    concept_total = max(len(concept_ids), 1)
    scored_sentences = []
    for sid, matched_neurons in sentences_map.items():
        # Get ALL neurons in this sentence (not just the ones that matched)
        full_sentence = self.db.get_sentence_neurons(sid)
        if not full_sentence:
            continue
        sent_neurons = []
        for nid, pos in full_sentence:
            n = self.db.get(nid)
            if n is not None:
                sent_neurons.append((n, pos))
        if not sent_neurons:
            continue

        # Coverage: fraction of the query concepts this sentence contains
        coverage = len(matched_neurons) / concept_total

        # Relevance: normalized centroid of the sentence's neurons vs query
        centroid = np.mean([n.vector for n, _ in sent_neurons], axis=0).astype(np.float32)
        norm = np.linalg.norm(centroid)
        if norm > 0:
            centroid = centroid / norm
        relevance = float(np.dot(centroid, query_vector))

        # Negative relevance is clamped so it cannot cancel out coverage
        score = coverage * 0.4 + max(relevance, 0) * 0.6
        scored_sentences.append((sid, score, sent_neurons))
    scored_sentences.sort(key=lambda x: x[1], reverse=True)

    # Phase 3: Render top sentences in taught word order
    rendered = []
    rendered_texts = set()
    trace = [f"Plan: {len(scored_sentences)} candidate sentences from {len(all_concepts)} concepts"]

    # Relevance floor: only include sentences scoring at least
    # PARAGRAPH_RELEVANCE_FLOOR times the best sentence's score.
    best_score = scored_sentences[0][1] if scored_sentences else 0
    relevance_floor = best_score * PARAGRAPH_RELEVANCE_FLOOR

    for sid, score, sent_neurons in scored_sentences:
        # The cap counts rendered sentences, so a duplicate or empty
        # candidate does not use up one of the max_sentences slots.
        if len(rendered) >= max_sentences:
            break
        if score < relevance_floor:
            break  # sorted by score, so all remaining are worse

        # Sort by position (the taught order), then map neurons to words
        ordered = sorted(sent_neurons, key=lambda x: x[1])
        words = []
        for n, _ in ordered:
            word = self._neuron_to_word(n)
            if word:
                words.append(word)
        if not words:
            continue

        sentence_text = " ".join(words)
        if sentence_text in rendered_texts:
            continue  # skip duplicates
        rendered_texts.add(sentence_text)
        rendered.append(sentence_text)
        trace.append(f"S{len(rendered)} (score={score:.3f}): {sentence_text}")

    if not rendered:
        return GenerationResult(
            text=ABSTAIN_MESSAGE,
            strategy="abstain",
            confidence=0.0,
            trace=trace + ["No sentences rendered"],
        )

    # Join sentences using the provided separator.
    # Default is ". " — but this is configurable, not hardcoded behavior.
    # Future: teach punctuation as neurons so boundaries emerge from data.
    paragraph = sentence_separator.join(rendered)
    avg_conf = float(np.mean([c.confidence for c in all_concepts]))
    return GenerationResult(
        text=paragraph,
        strategy="paragraph",
        confidence=avg_conf * 0.7,
        trace=trace,
    )
def _cluster_by_sentence(self, concepts: list) -> list:
"""
Group concepts by sentence co-occurrence.
Neurons taught together in the same sentence belong to the same
cluster. This gives natural topic boundaries — each taught sentence
is one "idea unit."
Returns list of clusters, each cluster = list of neurons.
"""
# Map neuron_id → set of sentence_ids
neuron_sentences = {}
for c in concepts:
rows = self.db.get_cooccurring_neurons(c.id)
sentence_ids = {r[2] for r in rows} # r = (neuron_id, position, sentence_id)
if sentence_ids:
neuron_sentences[c.id] = sentence_ids
if not neuron_sentences:
return []
# Group: neurons sharing any sentence_id go together
# Build adjacency from shared sentences
concept_map = {c.id: c for c in concepts}
assigned = set()
clusters = []
for c in concepts:
if c.id in assigned:
continue
if c.id not in neuron_sentences:
continue
# BFS to find all neurons connected by shared sentences
cluster = []
queue = [c.id]
while queue:
nid = queue.pop(0)
if nid in assigned:
continue
assigned.add(nid)
if nid in concept_map:
cluster.append(concept_map[nid])
# Find co-occurring neurons
for other_id, sids in neuron_sentences.items():
if other_id not in assigned:
if sids & neuron_sentences.get(nid, set()):
queue.append(other_id)
if cluster:
clusters.append(cluster)
# Add unclustered concepts as individual clusters
for c in concepts:
if c.id not in assigned:
clusters.append([c])
return clusters
def _concept_list(self, concepts: list[Neuron],
                  confidence: float) -> GenerationResult:
    """
    Strategy C: fall back to listing the raw concepts.

    The concepts are mapped to words with ``_neurons_to_words`` and joined
    with commas; no attempt is made at fluency. The reported confidence is
    the incoming confidence scaled by 0.9, and the trace holds one line
    per concept after a header line.
    """
    labels = self._neurons_to_words(concepts)

    details = []
    for neuron, label in zip(concepts, labels):
        if label:
            details.append(f"n{neuron.id} → {label} (conf={neuron.confidence:.2f})")

    listing = ", ".join(filter(None, labels))
    return GenerationResult(
        text=listing,
        strategy="concept_list",
        confidence=confidence * 0.9,
        trace=["Concept list fallback", *details],
    )
def _get_sentence_order(self, concept_ids: list) -> dict:
"""
Find the taught sentence that best covers these concepts and
return a position map: {neuron_id: taught_position}.
When concepts come from a taught sentence, this preserves the
original word order for template slot filling. If no sentence
covers enough concepts, returns empty dict (fall back to other
ordering methods).
"""
if len(concept_ids) < 2:
return {}
sentences = self.db.get_sentences_for_neurons(concept_ids)
if not sentences:
return {}
# Score by coverage: how many of our concepts are in this sentence?
best_sid = None
best_coverage = 0
for sid, matched_nids in sentences.items():
coverage = len(matched_nids)
if coverage > best_coverage:
best_coverage = coverage
best_sid = sid
# Need at least 2 concepts matched to trust the ordering
if best_coverage < 2:
return {}
# Get positions from the best sentence
sent_neurons = self.db.get_sentence_neurons(best_sid)
return {nid: pos for nid, pos in sent_neurons}
def _neurons_to_words(self, neurons: list[Neuron]) -> list[str]:
    """Map each neuron to a word, in order.

    A neuron for which ``_neuron_to_word`` yields nothing is represented
    by the placeholder ``<n{id}>`` so the result always has one entry per
    input neuron.
    """
    return [self._neuron_to_word(neuron) or f"<n{neuron.id}>"
            for neuron in neurons]
def _neuron_to_word(self, neuron: Neuron) -> Optional[str]:
    """Resolve a neuron to a word, or None when no word can be found.

    The DB's word→neuron mapping is consulted first, because it works
    regardless of vector dimension. Labels starting with ``__`` are not
    returned from this path. Otherwise the encoder's single nearest word
    to the neuron's vector is used; that only works when the encoder and
    the neuron share a dimension.
    """
    # Fast path: invert the DB mapping and look the neuron up directly
    by_neuron = {}
    for text, nid in self.db.load_word_mappings().items():
        by_neuron[nid] = text
    label = by_neuron.get(neuron.id)
    if label and not label.startswith("__"):
        return label

    # Fallback: encoder nearest word (only works if dimensions match)
    try:
        matches = self.encoder.nearest_words(neuron.vector, k=1)
        if matches:
            return matches[0][0]
    except (ValueError, RuntimeError):
        # dimension mismatch (e.g., CLIP 512 vs GloVe 300)
        return None
    return None
def _match_slots(self, template: Template,
                 concept_words: list[str],
                 concept_neurons: list = None) -> dict:
    """
    Assign concept words to the template's slots.

    Two strategies are combined. When concept neurons are supplied and the
    template has structural words, ``_match_slots_by_graph`` goes first:
    it uses the predecessor/successor links of the structural words'
    neurons. For example, with "shakespeare wrote hamlet" taught and the
    template "[PERSON] wrote [WORK]", the predecessor of "wrote" fills
    PERSON and its successor fills WORK.

    Any slot still unfilled is then taken from
    ``_match_slots_by_position``, skipping words that are already in use
    for another slot.

    Returns:
        Dict mapping slot name to the chosen word. Slots for which no
        word was found are absent.
    """
    # Use pre-parsed template structure
    layout = template.structure
    anchors = template.structural_words

    # Graph-based assignment needs both neurons and structural words
    if concept_neurons and anchors:
        fills = self._match_slots_by_graph(template, layout, concept_neurons)
    else:
        fills = {}

    remaining = template.unfilled_slots(fills)
    if not remaining:
        return fills

    # Fill remaining unfilled slots by position (hybrid approach)
    positional = self._match_slots_by_position(template, concept_words, layout)
    for slot_name in remaining:
        if slot_name not in positional:
            continue
        candidate = positional[slot_name]
        # Don't reuse a word that already occupies some slot
        if candidate in fills.values():
            continue
        fills[slot_name] = candidate
    return fills
def _match_slots_by_graph(self, template: Template,
                          slot_order: list,
                          concept_neurons: list) -> dict:
    """
    Use successor/predecessor graph to assign concepts to slots.
    Find structural words among the concepts. For each slot adjacent
    to a structural word, look at the predecessor (if slot is before)
    or successor (if slot is after) of that structural word's neuron.

    Args:
        template: Template whose ``slots`` keys are the slot names used
            in the returned dict.
        slot_order: Sequence of ``(kind, name)`` pairs where kind is
            "slot" or "word".
        concept_neurons: Neurons to draw fill words from.

    Returns:
        Dict mapping template slot name to a word. Empty when none of
        the template's structural words is among the concepts. A word is
        never assigned to more than one slot.
    """
    fills = {}
    # Only neurons that resolve to a word can fill a slot.
    concept_words_map = {} # neuron_id → word
    for n in concept_neurons:
        w = self._neuron_to_word(n)
        if w:
            concept_words_map[n.id] = w
    # Find structural word neurons in the concept set
    struct_neurons = {}
    for n in concept_neurons:
        word = concept_words_map.get(n.id, "")
        for _, struct_word in [p for p in slot_order if p[0] == "word"]:
            if word == struct_word:
                struct_neurons[struct_word] = n
                break
    if not struct_neurons:
        return {}
    # Walk the slot_order and fill based on graph relationships
    for i, (kind, name) in enumerate(slot_order):
        if kind != "slot":
            continue
        # Map template slot name to actual template slot name (case)
        # The template's key is upper-cased before comparing with the
        # name from slot_order.
        actual_slot = None
        for sn in template.slots:
            if sn.upper() == name:
                actual_slot = sn
                break
        if not actual_slot:
            continue
        # Find adjacent structural word
        # Look right for a structural word after this slot
        for j in range(i + 1, len(slot_order)):
            if slot_order[j][0] == "word":
                struct_word = slot_order[j][1]
                struct_n = struct_neurons.get(struct_word)
                if struct_n:
                    # This slot is BEFORE the structural word
                    # → fill with predecessor
                    # Re-read the neuron from the DB before using its links.
                    struct_fresh = self.db.get(struct_n.id)
                    if struct_fresh and struct_fresh.predecessors:
                        for pred_id in struct_fresh.predecessors:
                            word = concept_words_map.get(pred_id)
                            if word and word not in fills.values():
                                fills[actual_slot] = word
                                break
                # NOTE(review): indentation was reconstructed; this break is
                # assumed to stop at the nearest structural word whether or
                # not it is among the concepts — confirm against the original.
                break
        if actual_slot in fills:
            continue
        # Look left for a structural word before this slot
        for j in range(i - 1, -1, -1):
            if slot_order[j][0] == "word":
                struct_word = slot_order[j][1]
                struct_n = struct_neurons.get(struct_word)
                if struct_n:
                    # This slot is AFTER the structural word
                    # → fill with successor
                    struct_fresh = self.db.get(struct_n.id)
                    if struct_fresh and struct_fresh.successors:
                        # Successors are iterated as pairs; only the first
                        # element (the neuron id) is used here.
                        for succ_id, _ in struct_fresh.successors:
                            word = concept_words_map.get(succ_id)
                            if word and word not in fills.values():
                                fills[actual_slot] = word
                                break
                # NOTE(review): same assumption as the rightward scan above.
                break
    return fills
def _match_slots_by_position(self, template: Template,
                             concept_words: list,
                             slot_order: list,
                             query_vector: np.ndarray = None) -> dict:
    """
    Fallback: hand content words to the template's slots in order.

    Candidate words are the concept words that are neither function
    words nor structural words of the template. If that leaves nothing,
    only function words are excluded; if that still leaves nothing, all
    concept words are used. Duplicates are dropped, keeping the first
    occurrence.

    Slots are then served in the order of ``template.slots``. A slot of
    type "number" takes the first numeric-looking candidate if there is
    one; every other slot takes the first candidate still available.
    Each word is used at most once, and filling stops when the candidates
    run out.

    ``slot_order`` and ``query_vector`` are accepted but not used by the
    current implementation; in particular no relevance sorting happens.

    Returns:
        Dict mapping slot name to the word assigned to it.
    """
    structural = set(template.structural_words)

    candidates = [w for w in concept_words
                  if not (w.lower() in FUNCTION_WORDS or w.lower() in structural)]
    if not candidates:
        candidates = [w for w in concept_words if w.lower() not in FUNCTION_WORDS]
    if not candidates:
        candidates = list(concept_words)

    # Deduplicate while preserving order
    pool = list(dict.fromkeys(candidates))

    assignments = {}
    for slot_name, slot_type in template.slots.items():
        if not pool:
            break
        choice = None
        if slot_type == "number":
            choice = next(
                (w for w in pool if w.isdigit() or _is_number(w)),
                None,
            )
        if choice is None:
            choice = pool[0]
        assignments[slot_name] = choice
        pool.remove(choice)
    return assignments
def _is_number(s: str) -> bool:
try:
float(s)
return True
except ValueError:
return False
|