guru / generator.py
tejadabheja's picture
Upload folder using huggingface_hub
a5ae1ac verified
"""
Generator: converts converged concepts into text output.
Four strategies, tried in order:
A. Template Matching (primary) — concepts → find closest template → fill slots
B. Sentence Chain — find the taught sentence that best matches, emit its words in taught order
C. Successor Walk — walk successor lists, emit tokens
D. Concept List (fallback) — return raw concepts as structured output
The generator does NOT generate from nothing. It takes the output of the
convergence loop (a set of concept neurons) and renders it as text.
Templates are stored as neurons in the DB with a special pattern field.
They're inspectable, editable, deletable — same as any other neuron.
"""
import re
from dataclasses import dataclass, field
from typing import Optional
import numpy as np
from neuron import NeuronDB, Neuron
from encoder import Encoder
from convergence import ConvergenceLoop, ConvergenceResult
from constants import (
ABSTAIN_MESSAGE, FUNCTION_WORDS, STRUCTURAL_WORDS,
GRAMMAR_CONFIDENCE_THRESHOLD, MAX_CONVERGENCE_JUMPS,
QUERY_ANCHOR_FLOOR, PARAGRAPH_RELEVANCE_FLOOR,
)
def _parse_template_structure(pattern: str) -> list:
"""
Parse a template pattern into an ordered structure.
"[PERSON] wrote [WORK]" → [("slot", "PERSON"), ("word", "wrote"), ("slot", "WORK")]
Uses simple character-level parsing, no regex.
"""
parts = []
i = 0
current_word = []
while i < len(pattern):
if pattern[i] == '[':
# Flush any accumulated word
if current_word:
word = ''.join(current_word).strip()
if word:
for w in word.split():
parts.append(("word", w.lower()))
current_word = []
# Find closing bracket
j = pattern.index(']', i)
slot_name = pattern[i + 1:j]
parts.append(("slot", slot_name))
i = j + 1
else:
current_word.append(pattern[i])
i += 1
# Flush remaining
if current_word:
word = ''.join(current_word).strip()
if word:
for w in word.split():
parts.append(("word", w.lower()))
return parts
@dataclass
class Template:
    """
    A sentence pattern with fillable slots.

    Example:
        pattern: "[PERSON] wrote [WORK] in [YEAR]"
        slots: {"PERSON": "noun", "WORK": "noun", "YEAR": "number"}
        structure: [("slot", "PERSON"), ("word", "wrote"), ("slot", "WORK"), ...]

    If `structure` is not supplied (or is empty) it is parsed from `pattern`
    when the instance is created.
    """
    id: int
    pattern: str
    slots: dict  # slot_name → slot_type
    vector: np.ndarray  # embedding of the template (for search)
    confidence: float = 0.5
    structure: list = field(default_factory=list)  # parsed at creation

    def __post_init__(self):
        if not self.structure:
            self.structure = _parse_template_structure(self.pattern)

    @property
    def slot_names(self) -> list:
        """Slot names in the insertion order of `slots`."""
        return list(self.slots.keys())

    @property
    def structural_words(self) -> list:
        """Words in the template that aren't slots."""
        return [name for kind, name in self.structure if kind == "word"]

    def fill(self, slot_values: dict) -> str:
        """
        Fill slots with values. Returns the filled text.

        Every "[NAME]" marker whose NAME is a key of `slot_values` is replaced
        by str(value); markers without a value are left untouched.

        The substitution is done in a single pass over the pattern, so a
        filled-in value that itself looks like "[OTHER]" is never re-expanded
        by another slot's value.
        """
        if not slot_values:
            return self.pattern
        replacements = {f"[{name}]": str(value) for name, value in slot_values.items()}
        # Longest markers first so alternation never prefers a shorter overlap.
        markers = sorted(replacements, key=len, reverse=True)
        matcher = re.compile("|".join(re.escape(m) for m in markers))
        return matcher.sub(lambda m: replacements[m.group(0)], self.pattern)

    def unfilled_slots(self, slot_values: dict) -> list:
        """Return slot names that haven't been filled."""
        return [s for s in self.slots if s not in slot_values]
@dataclass
class GenerationResult:
    """Outcome of one text-generation attempt, with its inspectable trace."""
    text: str
    strategy: str  # "template", "successor", "concept_list"
    confidence: float
    template_used: Optional[Template] = None
    slot_fills: dict = field(default_factory=dict)
    trace: list = field(default_factory=list)  # step-by-step for inspectability

    def explain(self) -> str:
        """Render strategy, template details (if any) and trace steps as text. Invariant #2."""
        report = [f"Strategy: {self.strategy} (confidence={self.confidence:.3f})"]
        if self.template_used:
            report += [
                f"Template: {self.template_used.pattern}",
                f"Fills: {self.slot_fills}",
            ]
        # One line per trace step, each prefixed with a single space.
        report.extend(f" {step}" for step in self.trace)
        return "\n".join(report)
class TemplateStore:
    """
    Stores and retrieves sentence templates.

    Templates are patterns like "[PERSON] wrote [WORK]" with typed slots.
    Stored with embeddings for spatial search — find the template that
    best matches a set of concepts.

    Persistence: when a NeuronDB is provided, templates are saved to and
    loaded from it. Without a DB, templates live in memory only.
    """

    def __init__(self, encoder: Encoder, db: 'NeuronDB' = None):
        self.encoder = encoder
        self.db = db
        self.templates: list[Template] = []
        self._next_id = 0
        # Load persisted templates if DB is available
        if db is not None:
            self._load_from_db()

    def _load_from_db(self):
        """Load persisted templates on startup and advance the id counter past them."""
        import json
        rows = self.db.load_templates()
        for tid, pattern, slots_json, conf, vector in rows:
            slots = json.loads(slots_json)
            template = Template(
                id=tid, pattern=pattern, slots=slots,
                vector=vector, confidence=conf,
            )
            self.templates.append(template)
            # Keep new ids strictly above every loaded id.
            if tid >= self._next_id:
                self._next_id = tid + 1

    def add(self, pattern: str, slots: dict, confidence: float = 0.5) -> Template:
        """
        Register a template and (when a DB is attached) persist it.

        The template is embedded from its non-slot text only: every bracketed
        "[...]" marker is removed before encoding, whatever characters the slot
        name uses (e.g. "[PERSON]" and "[S0]" are both stripped).

        The DB write happens before the in-memory list and id counter are
        touched, so a failing save leaves the store unchanged.
        """
        import json
        # Embed the template by encoding its non-slot words
        text = re.sub(r'\[[^\]]*\]', '', pattern).strip()
        vector = self.encoder.encode_sentence(text)
        template = Template(
            id=self._next_id,
            pattern=pattern,
            slots=slots,
            vector=vector,
            confidence=confidence,
        )
        # Persist first: if this raises, no half-registered template remains.
        if self.db is not None:
            self.db.save_template(
                template.id, pattern, json.dumps(slots),
                confidence, vector,
            )
        self.templates.append(template)
        self._next_id += 1
        return template

    def search(self, concept_vector: np.ndarray, k: int = 3,
               min_similarity: float = -1.0) -> list[Template]:
        """
        Find templates closest to a concept vector, above min similarity.

        Returns at most `k` templates ordered by descending cosine similarity;
        an empty list when the store is empty or nothing passes the threshold.
        """
        if not self.templates:
            return []
        # Vectorized: batch cosine similarity
        template_matrix = np.array([t.vector for t in self.templates], dtype=np.float32)
        query_norm = np.linalg.norm(concept_vector)
        template_norms = np.linalg.norm(template_matrix, axis=1)
        # The epsilon keeps zero-length vectors from dividing by zero.
        sims = (template_matrix @ concept_vector) / (query_norm * template_norms + 1e-10)
        # Filter and sort
        mask = sims >= min_similarity
        indices = np.where(mask)[0]
        if len(indices) == 0:
            return []
        sorted_idx = indices[np.argsort(-sims[indices])]
        return [self.templates[i] for i in sorted_idx[:k]]

    def count(self) -> int:
        """Number of templates currently held in memory."""
        return len(self.templates)

    def delete(self, template_id: int) -> bool:
        """
        Delete = gone. Invariant #3.

        Removes the template from memory and from the DB (when attached).
        Returns True if an in-memory template with that id was removed.
        """
        before = len(self.templates)
        self.templates = [t for t in self.templates if t.id != template_id]
        if self.db is not None:
            self.db.delete_template(template_id)
        return len(self.templates) < before
class Generator:
"""
Converts convergence results into text.
Tries strategies in order:
1. Template matching — if a matching template is found
2. Sentence chain — if a taught sentence containing the concepts can be rendered
3. Successor walk — if concepts have successors
4. Concept list — always works (fallback)
"""
def __init__(self, db: NeuronDB, encoder: Encoder,
             template_store: TemplateStore):
    """Keep references to the neuron DB, the encoder and the template store."""
    self.template_store = template_store
    self.encoder = encoder
    self.db = db
def generate(self, convergence_result: ConvergenceResult,
             max_tokens: int = 20,
             query_vector: np.ndarray = None,
             query_words: list = None,
             evaluate_all: bool = False) -> GenerationResult:
    """
    Generate text from a convergence result.

    Abstains (ABSTAIN_MESSAGE, confidence 0.0) when the result did not
    converge or carries no concepts.

    Default mode: tries template → sentence chain → successor walk, and
    returns the first that succeeds; the concept list is the final fallback.

    Evaluate mode (evaluate_all=True): runs ALL strategies (the concept list
    included), returns the one with the highest confidence, and prepends a
    summary of every candidate to the winner's trace for inspectability.
    """
    if not convergence_result.converged:
        return GenerationResult(
            text=ABSTAIN_MESSAGE,
            strategy="abstain",
            confidence=0.0,
            trace=["Non-convergence → honest abstention (Invariant #4)"],
        )

    concepts = convergence_result.concepts
    if not concepts:
        return GenerationResult(
            text=ABSTAIN_MESSAGE,
            strategy="abstain",
            confidence=0.0,
            trace=["No concepts found"],
        )

    # Optional strategies in priority order; each call yields a result or None.
    attempts = (
        # Strategy A: template matching
        lambda: self._try_template(
            convergence_result, query_vector=query_vector, query_words=query_words
        ),
        # Strategy B: sentence-constrained chain
        lambda: self._try_sentence_chain(
            concepts, query_vector=query_vector, query_words=query_words
        ),
        # Strategy C: free successor walk
        lambda: self._try_successor_walk(
            concepts, max_tokens, query_vector=query_vector
        ),
    )

    if evaluate_all:
        produced = (attempt() for attempt in attempts)
        candidates = [outcome for outcome in produced if outcome is not None]
        # Strategy D always yields a result, so there is always a candidate.
        candidates.append(
            self._concept_list(concepts, convergence_result.confidence)
        )
        winner = max(candidates, key=lambda r: r.confidence)
        summary = ", ".join(f"{c.strategy}({c.confidence:.3f})" for c in candidates)
        winner.trace.insert(0, f"Evaluated {len(candidates)} strategies: " + summary)
        return winner

    # Hierarchical: first strategy that produces something wins.
    for attempt in attempts:
        outcome = attempt()
        if outcome is not None:
            return outcome
    # Strategy D: concept list (always works)
    return self._concept_list(concepts, convergence_result.confidence)
def _try_template(self, conv_result: ConvergenceResult,
                  query_vector: np.ndarray = None,
                  query_words: list = None) -> Optional[GenerationResult]:
    """
    Strategy A: Find closest template, fill slots with concept words.

    Templates are ranked by how many of their structural (non-slot) words
    appear in the query words — or, when no query words are given, in the
    concept words — with vector similarity as a tie-breaker. The top three
    are tried in order; the first whose slots are all filled wins. Otherwise
    the best-ranked template is returned partially filled (unfilled slots
    rendered as "...") at half confidence.

    Returns None when the store is empty, no template is found, or no slot
    of the best template could be filled.
    """
    if self.template_store.count() == 0:
        return None

    # Score templates by structural word overlap with the QUERY TEXT
    # (not concepts — concepts include noise like "the", "is").
    # A template's structural words should appear in what the user asked.
    query_word_set = set(query_words) if query_words else set()
    search_vec = query_vector if query_vector is not None else conv_result.vector

    # The word set to match against is the same for every template, so build
    # it once here; the concept-word fallback is only computed when needed.
    match_words = query_word_set or set(
        self._neurons_to_words(conv_result.concepts)
    )
    search_norm = np.linalg.norm(search_vec)

    scored_templates = []
    for t in self.template_store.templates:
        struct_words = t.structural_words
        struct_total = len(struct_words) or 1  # avoid division by zero
        struct_overlap = sum(1 for w in struct_words if w in match_words)
        overlap_score = struct_overlap / struct_total
        # Vector similarity (cosine; epsilon guards zero-length vectors)
        vec_sim = float(np.dot(search_vec, t.vector) /
                        (search_norm * np.linalg.norm(t.vector) + 1e-10))
        # Combined: overlap is dominant, vector similarity breaks ties
        combined = overlap_score * 0.75 + max(vec_sim, 0) * 0.25
        if overlap_score > 0:
            scored_templates.append((t, combined, overlap_score))

    scored_templates.sort(key=lambda x: x[1], reverse=True)
    templates = [t for t, _, _ in scored_templates[:3]]
    if not templates:
        # Fallback to pure vector search if no structural overlap
        templates = self.template_store.search(search_vec, k=3)
    if not templates:
        return None

    concepts = conv_result.concepts
    concept_words = self._neurons_to_words(concepts)

    # Sort concepts by TAUGHT sentence order when available, so slots are
    # filled in the order the words were originally taught:
    #   taught "paris is the capital of france" → [paris=0, capital=1, france=2]
    #   template "[S0] is the [S1] of [S2]" → S0=paris, S1=capital, S2=france
    # Without a taught order, fall back to relevance to the query.
    sentence_order = self._get_sentence_order([c.id for c in concepts])
    sort_key = None
    descending = False
    if sentence_order:
        # Concepts missing from the sentence sort last (position 999).
        sort_key = lambda p: sentence_order.get(p[0].id, 999)
    elif query_vector is not None:
        sort_key = lambda p: float(np.dot(p[0].vector, query_vector))
        descending = True
    if sort_key is not None:
        pairs = sorted(zip(concepts, concept_words), key=sort_key, reverse=descending)
        concepts = [p[0] for p in pairs]
        concept_words = [p[1] for p in pairs]

    for template in templates:
        slot_fills = self._match_slots(template, concept_words, concepts)
        unfilled = template.unfilled_slots(slot_fills)
        if not unfilled:
            # All slots filled
            text = template.fill(slot_fills)
            return GenerationResult(
                text=text,
                strategy="template",
                confidence=template.confidence * conv_result.confidence,
                template_used=template,
                slot_fills=slot_fills,
                trace=[
                    f"Template matched: {template.pattern}",
                    f"Slot fills: {slot_fills}",
                    f"Concepts: {concept_words}",
                ],
            )

    # No template fully filled — partial fill of best match
    best = templates[0]
    # NOTE(review): this call omits the `concepts` argument that the loop
    # above passes to _match_slots — confirm that is intentional.
    slot_fills = self._match_slots(best, concept_words)
    unfilled = best.unfilled_slots(slot_fills)
    if slot_fills:  # at least some slots filled
        text = best.fill(slot_fills)
        # Replace unfilled slots with "..."
        for name in unfilled:
            text = text.replace(f"[{name}]", "...")
        return GenerationResult(
            text=text,
            strategy="template",
            confidence=best.confidence * conv_result.confidence * 0.5,
            template_used=best,
            slot_fills=slot_fills,
            trace=[
                f"Template partial: {best.pattern}",
                f"Filled: {slot_fills}, unfilled: {unfilled}",
            ],
        )
    return None  # no slots matched at all
def _try_sentence_chain(self, concepts: list, query_vector: np.ndarray = None,
                        query_words: list = None) -> Optional[GenerationResult]:
    """
    Strategy B: Sentence-constrained chain retrieval.

    Instead of walking the successor graph freely, look up which taught
    sentences contain the given concepts, score each by how many concepts it
    matched (plus, when a query vector is given, the similarity of the
    sentence's neuron centroid to the query), and output the words of the
    best-scoring sentence in their taught position order.

    Only the three best-scoring sentences are tried; the first that yields
    at least two words wins. Words starting with "__" are skipped.

    `query_words` is accepted for signature parity with the other strategies
    and is not used here.

    Returns None when there are no concepts, no sentences, or no candidate
    sentence renders to two or more words.
    """
    if not concepts:
        return None

    # Find sentences containing these concepts, scored by coverage
    concept_ids = [c.id for c in concepts]
    sentences = self.db.get_sentences_for_neurons(concept_ids)
    if not sentences:
        return None

    # Each neuron is needed for scoring, word lookup and confidence;
    # fetch it from the DB once and reuse it.
    neuron_cache = {}

    def fetch(nid):
        if nid not in neuron_cache:
            neuron_cache[nid] = self.db.get(nid)
        return neuron_cache[nid]

    scored = []
    for sid, matched_neurons in sentences.items():
        sent_neurons = self.db.get_sentence_neurons(sid)
        if not sent_neurons:
            continue
        coverage = len(matched_neurons)
        relevance = 0.0
        if query_vector is not None:
            # Centroid of sentence neurons vs query
            resolved = (fetch(nid) for nid, _ in sent_neurons)
            vecs = [n.vector for n in resolved if n is not None]
            if vecs:
                centroid = np.mean(vecs, axis=0).astype(np.float32)
                norm = np.linalg.norm(centroid)
                if norm > 0:
                    centroid = centroid / norm
                relevance = float(np.dot(centroid, query_vector))
        score = coverage * 0.4 + max(relevance, 0) * 0.6
        scored.append((sid, score, sent_neurons))
    if not scored:
        return None
    scored.sort(key=lambda x: x[1], reverse=True)

    # Get word mappings for neuron → word conversion
    word_map = self.db.load_word_mappings()
    neuron_to_word = {nid: w for w, nid in word_map.items()}

    # Try the top-scoring sentences
    for sid, score, sent_neurons in scored[:3]:
        # Output neurons in taught position order
        ordered = sorted(sent_neurons, key=lambda x: x[1])
        words = []
        for nid, _ in ordered:
            w = neuron_to_word.get(nid)
            if not w:
                neuron = fetch(nid)
                w = self._neuron_to_word(neuron) if neuron else None
            if w and not w.startswith("__"):
                words.append(w)
        if len(words) >= 2:
            text = " ".join(words)
            resolved = (fetch(nid) for nid, _ in ordered)
            confidences = [n.confidence for n in resolved if n is not None]
            # Words can come from the mapping alone, so no neuron may resolve;
            # fall back to 0.5 rather than taking the mean of nothing (NaN).
            avg_conf = float(np.mean(confidences)) if confidences else 0.5
            return GenerationResult(
                text=text,
                strategy="sentence_chain",
                confidence=avg_conf * 0.8,
                trace=[
                    f"Sentence chain: sid={sid}, score={score:.3f}",
                    f"Coverage: {len(sentences[sid])} query concepts matched",
                    f"Words: {words}",
                ],
            )
    return None
def _try_successor_walk(self, concepts: list[Neuron],
                        max_tokens: int,
                        query_vector: np.ndarray = None) -> Optional[GenerationResult]:
    """
    Strategy C: Convergence-guided successor walk.

    Starts from the concept most similar to the query (or the most confident
    one when no query vector is given) and emits one token per step:

    1. FAST PATH: the best unvisited successor's confidence is at least
       GRAMMAR_CONFIDENCE_THRESHOLD → emit it directly.
    2. SLOW PATH: otherwise score the successor candidates against a blend
       of the query and the tokens emitted so far (_convergence_pick) and
       emit the best; with no query vector, or no pick, the best successor
       is taken as a fallback.
    3. JUMP: when there are no unvisited successors, use
       _converge_next_token to jump to a new neuron, at most
       MAX_CONVERGENCE_JUMPS times.

    Stops when: no continuation is found, a neuron or word cannot be
    resolved, a post-jump successor belongs to a different sentence, two
    low-relevance tokens occur in a row (these are trimmed), or max_tokens
    is reached.

    Returns None when there are no concepts, the start neuron is missing
    from the DB, or fewer than two tokens were produced.
    """
    if not concepts:
        return None

    # Pick starting concept: most relevant to the query, not just highest confidence
    if query_vector is not None:
        start_concept = max(
            concepts,
            key=lambda n: float(np.dot(n.vector, query_vector))
        )
    else:
        start_concept = max(concepts, key=lambda n: n.confidence)
    start = self.db.get(start_concept.id)
    if start is None:
        return None

    # `tokens` and `emitted_neurons` are kept index-aligned throughout:
    # every emitted word has its neuron at the same position.
    tokens = []
    trace = []
    emitted_neurons = []  # neurons of emitted tokens, for context
    current_id = start.id
    visited = {current_id}

    # Add start word
    start_word = self._neuron_to_word(start)
    if start_word:
        tokens.append(start_word)
        emitted_neurons.append(start)
        trace.append(f"Start: {start_word} (n{start.id}, conf={start.confidence:.2f})")

    consecutive_low_relevance = 0  # track drift from query
    had_jump = False  # whether we've crossed a sentence boundary
    jump_count = 0  # number of convergence jumps taken
    max_jumps = MAX_CONVERGENCE_JUMPS
    current_sentence_ids = None  # sentences the current neuron belongs to

    for step in range(max_tokens - 1):
        current = self.db.get(current_id)
        if current is None:
            break

        # Get unvisited successors
        candidates = [(sid, sc) for sid, sc in current.successors
                      if sid not in visited]

        if not candidates:
            # No successors — try convergence to find a continuation
            if (query_vector is not None
                    and len(tokens) < max_tokens - 1
                    and jump_count < max_jumps):
                next_neuron = self._converge_next_token(
                    query_vector, emitted_neurons, visited
                )
                if next_neuron is not None:
                    word = self._neuron_to_word(next_neuron)
                    if word:
                        tokens.append(word)
                        emitted_neurons.append(next_neuron)
                        trace.append(
                            f"Step {step + 1}: {word} (n{next_neuron.id}, "
                            f"converge-jump {jump_count + 1}/{max_jumps}, "
                            f"conf={next_neuron.confidence:.2f})"
                        )
                        visited.add(next_neuron.id)
                        current_id = next_neuron.id
                        had_jump = True
                        jump_count += 1
                        consecutive_low_relevance = 0
                        # Track which sentence(s) the jump target belongs to
                        rows = self.db.get_cooccurring_neurons(next_neuron.id)
                        current_sentence_ids = {r[2] for r in rows} if rows else None
                        continue
            break

        # FAST PATH: best successor confidence above threshold → grammar token
        best_id, best_conf = max(candidates, key=lambda s: s[1])
        if best_conf >= GRAMMAR_CONFIDENCE_THRESHOLD:
            succ = self.db.get(best_id)
            if succ is None:
                break
            word = self._neuron_to_word(succ)
            if not word:
                break

            # Sentence-aware filtering: after a jump, check if this
            # successor belongs to the same sentence as the jump target.
            # If it's from a different sentence, we've crossed a boundary
            # into unrelated content — stop.
            if had_jump and current_sentence_ids is not None:
                succ_rows = self.db.get_cooccurring_neurons(best_id)
                succ_sentences = {r[2] for r in succ_rows} if succ_rows else set()
                if succ_sentences and not (succ_sentences & current_sentence_ids):
                    # Different sentence — stop here
                    trace.append(
                        f"Stop: sentence boundary at step {step + 1} "
                        f"({word} belongs to different sentence)"
                    )
                    break

            # Relevance drift check (backup for neurons without sentence data)
            if had_jump and current_sentence_ids is None and query_vector is not None:
                token_rel = float(np.dot(succ.vector, query_vector) /
                                  (np.linalg.norm(succ.vector) *
                                   np.linalg.norm(query_vector) + 1e-10))
                if token_rel < 0.25:
                    consecutive_low_relevance += 1
                else:
                    consecutive_low_relevance = 0
                if consecutive_low_relevance >= 2:
                    # NOTE(review): the current word has not been appended
                    # yet, so this trims only previously emitted tokens —
                    # confirm the trim count is the intended one.
                    trim = min(consecutive_low_relevance, len(tokens))
                    tokens = tokens[:-trim]
                    emitted_neurons = emitted_neurons[:-trim]
                    trace.append(f"Stop: post-jump drift after {len(tokens)} tokens")
                    break

            tokens.append(word)
            emitted_neurons.append(succ)
            trace.append(
                f"Step {step + 1}: {word} (n{best_id}, "
                f"fast, succ_conf={best_conf:.2f})"
            )
            visited.add(best_id)
            current_id = best_id
            continue

        # SLOW PATH: convergence-guided selection among successors
        if query_vector is not None:
            chosen = self._convergence_pick(
                query_vector, emitted_neurons, candidates
            )
        else:
            chosen = None
        if chosen is None:
            # Fallback: just take the best successor
            chosen_id, chosen_conf = best_id, best_conf
        else:
            chosen_id, chosen_conf = chosen

        succ = self.db.get(chosen_id)
        if succ is None:
            break
        word = self._neuron_to_word(succ)
        if not word:
            break
        speed = "converge" if chosen is not None else "fallback"

        # Append the word and its neuron together so the drift trim below
        # removes the same trailing entries from both lists.
        tokens.append(word)
        emitted_neurons.append(succ)

        # Relevance stopping: if content tokens drift away from query, stop.
        # Grammar tokens (fast path) don't count — "the", "of" are always low-relevance.
        if query_vector is not None:
            token_relevance = float(np.dot(succ.vector, query_vector) /
                                    (np.linalg.norm(succ.vector) *
                                     np.linalg.norm(query_vector) + 1e-10))
            if token_relevance < 0.2:
                consecutive_low_relevance += 1
            else:
                consecutive_low_relevance = 0
            # Two consecutive low-relevance content tokens = we've drifted off topic
            if consecutive_low_relevance >= 2:
                # Remove the drifted tokens
                tokens = tokens[:-consecutive_low_relevance]
                emitted_neurons = emitted_neurons[:-consecutive_low_relevance]
                trace.append(f"Stop: relevance drift after {len(tokens)} tokens")
                break

        trace.append(
            f"Step {step + 1}: {word} (n{chosen_id}, "
            f"{speed}, conf={chosen_conf:.2f})"
        )
        visited.add(chosen_id)
        current_id = chosen_id

    if len(tokens) < 2:
        return None

    text = " ".join(tokens)
    avg_conf = np.mean([n.confidence for n in emitted_neurons])
    return GenerationResult(
        text=text,
        strategy="successor",
        confidence=float(avg_conf) * 0.6,
        trace=trace,
    )
def _build_context_vector(self, query_vector: np.ndarray,
                          emitted: list) -> np.ndarray:
    """
    Blend the query vector with the context of already-emitted neurons.

    With nothing emitted the query vector is returned unchanged. Otherwise
    the context is the normalized mean of the emitted neurons' vectors, and
    the query's weight shrinks by 0.1 per emitted neuron but never drops
    below QUERY_ANCHOR_FLOOR. The blended vector is normalized before return.
    """
    if not emitted:
        return query_vector

    def to_unit(vec):
        # Scale to unit length; a zero vector is passed through untouched.
        length = np.linalg.norm(vec)
        return vec / length if length > 0 else vec

    mean_vec = np.mean([n.vector for n in emitted], axis=0).astype(np.float32)
    context = to_unit(mean_vec)

    # Early tokens lean on the query; later ones lean more on context.
    query_weight = max(QUERY_ANCHOR_FLOOR, 1.0 - len(emitted) * 0.1)
    mix = query_weight * query_vector + (1 - query_weight) * context
    return to_unit(mix)
def _convergence_pick(self, query_vector: np.ndarray,
emitted: list,
candidates: list) -> Optional[tuple]:
"""
Pick the best successor using convergence-guided scoring.
Scores each candidate by how well it fits the query+context blend.
Returns (neuron_id, score) or None if no candidate is relevant.
"""
search_vec = self._build_context_vector(query_vector, emitted)
# Fetch all candidate neurons
valid = []
for cand_id, succ_conf in candidates:
cand = self.db.get(cand_id)
if cand is not None:
valid.append((cand_id, succ_conf, cand))
if not valid:
return None
# Vectorized scoring
vectors = np.array([c.vector for _, _, c in valid], dtype=np.float32)
succ_confs = np.array([sc for _, sc, _ in valid], dtype=np.float32)
norms = np.linalg.norm(vectors, axis=1)
search_norm = np.linalg.norm(search_vec)
sims = (vectors @ search_vec) / (norms * search_norm + 1e-10)
scores = sims * 0.6 + succ_confs * 0.4
best_idx = int(np.argmax(scores))
if scores[best_idx] > 0.0:
return (valid[best_idx][0], float(scores[best_idx]))
return None
def _converge_next_token(self, query_vector: np.ndarray,
                         emitted: list,
                         visited: set) -> Optional[Neuron]:
    """
    Jump to a new neuron when the successor chain has run out.

    Searches the DB for the 10 nearest neurons to the query+context blend,
    discards those already visited, those whose mapped word is a function
    word, and those with confidence of 0.1 or less. A survivor must have
    cosine similarity above 0.3 to the query and above 0.25 to the blend;
    among those, the highest 0.6 × query-sim + 0.4 × blend-sim wins.

    Returns the chosen neuron, or None when nothing qualifies.
    """
    blend = self._build_context_vector(query_vector, emitted)
    nearby = self.db.search(blend, k=10)

    # Invert the word → neuron mapping once so each lookup is a dict hit.
    id_to_word = {nid: w for w, nid in self.db.load_word_mappings().items()}

    def eligible(neuron):
        if neuron.id in visited:
            return False
        label = id_to_word.get(neuron.id)
        if label and self._is_function_word(label):
            return False
        return neuron.confidence > 0.1

    pool = [n for n in nearby if eligible(n)]
    if not pool:
        return None

    # Batch cosine similarities against the raw query and the blend.
    matrix = np.array([n.vector for n in pool], dtype=np.float32)
    lengths = np.linalg.norm(matrix, axis=1)
    to_query = (matrix @ query_vector) / (lengths * np.linalg.norm(query_vector) + 1e-10)
    to_blend = (matrix @ blend) / (lengths * np.linalg.norm(blend) + 1e-10)

    passes = (to_query > 0.3) & (to_blend > 0.25)
    if not np.any(passes):
        return None

    # Candidates failing a threshold are pushed to -inf so argmax skips them.
    ranked = np.where(passes, to_query * 0.6 + to_blend * 0.4, -np.inf)
    winner = int(np.argmax(ranked))
    return pool[winner] if ranked[winner] > 0 else None
@staticmethod
def _is_function_word(word: str) -> bool:
    """Return True when the lower-cased word is listed in FUNCTION_WORDS."""
    lowered = word.lower()
    return lowered in FUNCTION_WORDS
# --- Paragraph Generation ---
def generate_paragraph(self, query_vector: np.ndarray,
                       convergence_loop: ConvergenceLoop,
                       max_sentences: int = 5,
                       max_tokens_per_sentence: int = 15,
                       query_words: list = None,
                       sentence_separator: str = ". ") -> GenerationResult:
    """
    Generate a multi-sentence paragraph by retrieving taught sentences.

    Phase 1 — CONVERGE: run the convergence loop on the query; abstain if it
    fails. The concept set is then enriched with DB neighbours of each query
    word (dot product above 0.3).
    Phase 2 — RETRIEVE SENTENCES: find taught sentences containing those
    concepts and score each as 0.4 × coverage + 0.6 × centroid relevance.
    If no sentence is known, fall back to single-sentence generate().
    Phase 3 — RENDER: for up to `max_sentences` of the best sentences (those
    scoring at least PARAGRAPH_RELEVANCE_FLOOR × the best score), output the
    sentence's neurons in taught position order, mapped back to words.

    Rendered sentences are joined with `sentence_separator`. Abstains when
    nothing could be rendered.
    """
    # Phase 1: converge to find relevant concepts
    converged = convergence_loop.converge(query_vector)
    if not converged.converged or not converged.concepts:
        return GenerationResult(
            text=ABSTAIN_MESSAGE,
            strategy="abstain",
            confidence=0.0,
            trace=["Paragraph planning: convergence failed"],
        )

    # Enrich with per-word matches
    pool = list(converged.concepts)
    known_ids = {c.id for c in pool}
    for token in query_words or ():
        word_vec = self.encoder.encode_word(token)
        if np.all(word_vec == 0):
            continue  # all-zero encoding — nothing to search with
        for hit in self.db.search(word_vec, k=3):
            if hit.id in known_ids:
                continue
            if float(np.dot(hit.vector, word_vec)) > 0.3:
                pool.append(hit)
                known_ids.add(hit.id)

    # Phase 2: find sentences containing these concepts
    pool_ids = [c.id for c in pool]
    hits_by_sentence = self.db.get_sentences_for_neurons(pool_ids)
    if not hits_by_sentence:
        # No sentence structure — fall back to single-sentence generation
        single = ConvergenceResult(
            converged=True, vector=converged.vector,
            concepts=pool, confidence=converged.confidence,
        )
        return self.generate(single, max_tokens=max_tokens_per_sentence,
                             query_vector=query_vector, query_words=query_words)

    concept_total = max(len(pool_ids), 1)
    ranked = []
    for sid, matched in hits_by_sentence.items():
        # ALL neurons of the sentence, not just the ones that matched
        placements = self.db.get_sentence_neurons(sid)
        if not placements:
            continue
        # Fraction of the concept pool that this sentence contains
        coverage = len(matched) / concept_total

        members = []
        for nid, pos in placements:
            neuron = self.db.get(nid)
            if neuron is not None:
                members.append((neuron, pos))
        if not members:
            continue

        # Relevance: normalized centroid of the sentence's neurons vs query
        centroid = np.mean([n.vector for n, _ in members], axis=0).astype(np.float32)
        length = np.linalg.norm(centroid)
        if length > 0:
            centroid = centroid / length
        relevance = float(np.dot(centroid, query_vector))

        ranked.append((sid, coverage * 0.4 + max(relevance, 0) * 0.6, members))
    ranked.sort(key=lambda item: item[1], reverse=True)

    # Phase 3: render top sentences in taught word order
    rendered = []
    trace = [f"Plan: {len(ranked)} candidate sentences from {len(pool)} concepts"]
    handled_sids = set()
    # Sentences scoring below this fraction of the best score are noise.
    top_score = ranked[0][1] if ranked else 0
    relevance_floor = top_score * PARAGRAPH_RELEVANCE_FLOOR

    for sid, score, members in ranked[:max_sentences]:
        if sid in handled_sids:
            continue
        if score < relevance_floor:
            break  # sorted by score, so all remaining are worse
        handled_sids.add(sid)

        # Taught order = ascending position
        members.sort(key=lambda pair: pair[1])
        mapped = (self._neuron_to_word(n) for n, _ in members)
        words = [w for w in mapped if w]
        if not words:
            continue

        sentence_text = " ".join(words)
        if sentence_text in rendered:
            continue  # skip duplicates
        rendered.append(sentence_text)
        trace.append(f"S{len(rendered)} (score={score:.3f}): {sentence_text}")

    if not rendered:
        return GenerationResult(
            text=ABSTAIN_MESSAGE,
            strategy="abstain",
            confidence=0.0,
            trace=trace + ["No sentences rendered"],
        )

    # The separator is a caller-supplied parameter, not fixed behaviour.
    paragraph = sentence_separator.join(rendered)
    mean_conf = float(np.mean([c.confidence for c in pool]))
    return GenerationResult(
        text=paragraph,
        strategy="paragraph",
        confidence=mean_conf * 0.7,
        trace=trace,
    )
def _cluster_by_sentence(self, concepts: list) -> list:
"""
Group concepts by sentence co-occurrence.
Neurons taught together in the same sentence belong to the same
cluster. This gives natural topic boundaries — each taught sentence
is one "idea unit."
Returns list of clusters, each cluster = list of neurons.
"""
# Map neuron_id → set of sentence_ids
neuron_sentences = {}
for c in concepts:
rows = self.db.get_cooccurring_neurons(c.id)
sentence_ids = {r[2] for r in rows} # r = (neuron_id, position, sentence_id)
if sentence_ids:
neuron_sentences[c.id] = sentence_ids
if not neuron_sentences:
return []
# Group: neurons sharing any sentence_id go together
# Build adjacency from shared sentences
concept_map = {c.id: c for c in concepts}
assigned = set()
clusters = []
for c in concepts:
if c.id in assigned:
continue
if c.id not in neuron_sentences:
continue
# BFS to find all neurons connected by shared sentences
cluster = []
queue = [c.id]
while queue:
nid = queue.pop(0)
if nid in assigned:
continue
assigned.add(nid)
if nid in concept_map:
cluster.append(concept_map[nid])
# Find co-occurring neurons
for other_id, sids in neuron_sentences.items():
if other_id not in assigned:
if sids & neuron_sentences.get(nid, set()):
queue.append(other_id)
if cluster:
clusters.append(cluster)
# Add unclustered concepts as individual clusters
for c in concepts:
if c.id not in assigned:
clusters.append([c])
return clusters
def _concept_list(self, concepts: list[Neuron],
                  confidence: float) -> GenerationResult:
    """
    Strategy C: emit the raw concepts as a comma-separated list.

    No template and no successor walk are involved, so this always
    produces something, at the cost of fluency. The reported confidence
    is the incoming ``confidence`` scaled by 0.9.
    """
    labels = self._neurons_to_words(concepts)

    # One trace entry per concept that resolved to a non-empty label.
    details = []
    for neuron, label in zip(concepts, labels):
        if not label:
            continue
        details.append(f"n{neuron.id}{label} (conf={neuron.confidence:.2f})")

    return GenerationResult(
        text=", ".join(filter(None, labels)),
        strategy="concept_list",
        confidence=confidence * 0.9,
        trace=["Concept list fallback", *details],
    )
def _get_sentence_order(self, concept_ids: list) -> dict:
    """
    Return ``{neuron_id: taught_position}`` for the taught sentence that
    covers the most of the given concepts.

    The position map lets callers keep the original word order of a
    taught sentence. An empty dict means "no usable ordering": fewer
    than two concepts were given, the DB returned no sentences, or the
    best sentence matched fewer than two of the concepts.
    """
    # A single concept cannot establish an ordering.
    if len(concept_ids) < 2:
        return {}

    # presumably a mapping sentence_id → matched neuron ids; only
    # ``.items()`` and ``len()`` of the values are relied on here.
    candidates = self.db.get_sentences_for_neurons(concept_ids)
    if not candidates:
        return {}

    # Highest coverage wins; max() keeps the first of equally covered
    # sentences, i.e. ties go to the earliest entry.
    winner_sid, matched = max(candidates.items(), key=lambda item: len(item[1]))

    # Need at least 2 concepts matched to trust the ordering
    if len(matched) < 2:
        return {}

    return {
        neuron_id: position
        for neuron_id, position in self.db.get_sentence_neurons(winner_sid)
    }
def _neurons_to_words(self, neurons: list[Neuron]) -> list[str]:
    """
    Resolve each neuron to a word via ``_neuron_to_word``.

    The result has one entry per input neuron, in the same order. A
    neuron that resolves to nothing gets the placeholder ``<n{id}>``.
    """
    return [self._neuron_to_word(n) or f"<n{n.id}>" for n in neurons]
def _neuron_to_word(self, neuron: Neuron) -> Optional[str]:
    """
    Resolve a neuron to a word, or ``None`` when nothing fits.

    Lookup order:
    1. The word→neuron mapping stored in the DB (works for any vector
       dimension). Labels starting with ``__`` are not returned.
    2. The encoder's nearest word to the neuron's vector.
    """
    # NOTE(review): the mappings are loaded from the DB on every call;
    # callers that resolve many neurons pay for that each time.
    label = None
    for text, mapped_id in self.db.load_word_mappings().items():
        if mapped_id == neuron.id:
            # Keep scanning: when several words map to this neuron the
            # last one wins.
            label = text
    if label and not label.startswith("__"):
        return label

    # Fallback: encoder nearest word (only works if dimensions match)
    try:
        candidates = self.encoder.nearest_words(neuron.vector, k=1)
        if candidates:
            return candidates[0][0]
    except (ValueError, RuntimeError):
        # dimension mismatch (e.g., CLIP 512 vs GloVe 300)
        return None
    return None
def _match_slots(self, template: Template,
                 concept_words: list[str],
                 concept_neurons: list = None) -> dict:
    """
    Assign concept words to the template's slots.

    Two passes (hybrid approach):
    1. Graph pass — only when concept neurons are supplied and the
       template has structural words (e.g., "wrote" in
       "[PERSON] wrote [WORK]"): ``_match_slots_by_graph`` uses the
       predecessor/successor lists of the structural words' neurons.
    2. Position pass — slots the template still reports as unfilled
       take the word ``_match_slots_by_position`` proposes for them,
       unless that word is already used by another slot.

    Returns a dict slot_name → word; slots that could not be filled
    are absent.
    """
    structure = template.structure
    structural = template.structural_words

    assigned = {}
    if concept_neurons and structural:
        assigned = self._match_slots_by_graph(
            template, structure, concept_neurons
        )

    pending = template.unfilled_slots(assigned)
    if not pending:
        return assigned

    proposals = self._match_slots_by_position(
        template, concept_words, structure
    )
    for slot in pending:
        if slot not in proposals:
            continue
        # Don't reuse words already assigned to another slot
        if proposals[slot] in assigned.values():
            continue
        assigned[slot] = proposals[slot]
    return assigned
def _match_slots_by_graph(self, template: Template,
                          slot_order: list,
                          concept_neurons: list) -> dict:
    """
    Use the successor/predecessor graph to assign concepts to slots.

    Concept neurons whose word equals one of the template's structural
    words act as anchors. For every slot in ``slot_order``:
    - the first structural word to its right, if it is an anchor,
      offers that anchor's predecessors (the slot sits BEFORE the word);
    - otherwise the first structural word to its left, if it is an
      anchor, offers that anchor's successors (the slot sits AFTER it).
    The first offered neuron that is one of the concepts and whose word
    is not already used fills the slot.

    Returns a dict slot_name → word keyed by the names in
    ``template.slots``; empty when no concept is a structural word.
    """
    fills = {}

    # neuron_id → word, for concepts that resolve to a word
    word_of = {}
    for neuron in concept_neurons:
        text = self._neuron_to_word(neuron)
        if text:
            word_of[neuron.id] = text

    # Structural words of the template, and the concept neuron for each
    template_words = [entry[1] for entry in slot_order if entry[0] == "word"]
    anchors = {}
    for neuron in concept_neurons:
        text = word_of.get(neuron.id, "")
        if text in template_words:
            anchors[text] = neuron
    if not anchors:
        return {}

    def nearest_word(indices):
        # First structural word met while walking ``indices``, else None.
        for j in indices:
            if slot_order[j][0] == "word":
                return slot_order[j][1]
        return None

    def first_unused(neighbor_ids):
        # First neighbour that is a known concept and not yet used.
        for nid in neighbor_ids:
            text = word_of.get(nid)
            if text and text not in fills.values():
                return text
        return None

    for idx, (kind, name) in enumerate(slot_order):
        if kind != "slot":
            continue

        # Resolve to the key used in template.slots (case may differ)
        actual_slot = next(
            (sn for sn in template.slots if sn.upper() == name), None
        )
        if not actual_slot:
            continue

        # Structural word to the right → slot is BEFORE it → predecessor
        anchor = anchors.get(nearest_word(range(idx + 1, len(slot_order))))
        if anchor:
            fresh = self.db.get(anchor.id)
            if fresh and fresh.predecessors:
                chosen = first_unused(fresh.predecessors)
                if chosen:
                    fills[actual_slot] = chosen
        if actual_slot in fills:
            continue

        # Structural word to the left → slot is AFTER it → successor
        anchor = anchors.get(nearest_word(range(idx - 1, -1, -1)))
        if anchor:
            fresh = self.db.get(anchor.id)
            if fresh and fresh.successors:
                # successors are iterated as (neuron_id, _) pairs
                chosen = first_unused(
                    succ_id for succ_id, _ in fresh.successors
                )
                if chosen:
                    fills[actual_slot] = chosen
    return fills
def _match_slots_by_position(self, template: Template,
                             concept_words: list,
                             slot_order: list,
                             query_vector: np.ndarray = None) -> dict:
    """
    Fallback: hand content words to slots in order of appearance.

    Candidate words are ``concept_words`` minus function words and minus
    the template's structural words. If that leaves nothing, only
    function words are removed; if that still leaves nothing, every
    concept word is a candidate. Duplicates are dropped, keeping the
    first occurrence.

    Slots are visited in ``template.slots`` order. A slot of type
    "number" takes the first numeric-looking candidate when there is
    one; every other slot (and a number slot with no numeric candidate)
    takes the first remaining candidate. Each word is used at most once.

    ``slot_order`` and ``query_vector`` are accepted but not read by
    this implementation.
    """
    structural = set(template.structural_words)

    candidates = [
        w for w in concept_words
        if w.lower() not in FUNCTION_WORDS and w.lower() not in structural
    ]
    if not candidates:
        candidates = [w for w in concept_words if w.lower() not in FUNCTION_WORDS]
    if not candidates:
        candidates = list(concept_words)

    # Deduplicate while preserving order
    pool = list(dict.fromkeys(candidates))

    fills = {}
    for slot_name, slot_type in template.slots.items():
        if not pool:
            break
        choice = None
        if slot_type == "number":
            choice = next(
                (w for w in pool if w.isdigit() or _is_number(w)), None
            )
        if choice is None:
            choice = pool[0]
        fills[slot_name] = choice
        pool.remove(choice)
    return fills
def _is_number(s: str) -> bool:
    """
    Return True when ``s`` is a numeric literal such as "42", "-3.5"
    or "1e3".

    ``float()`` also parses the words "nan", "inf" and "infinity"; those
    are ordinary tokens here, not numbers, so string input must contain
    at least one digit. Input that ``float()`` cannot take at all (for
    example ``None``) yields False instead of raising.
    """
    try:
        float(s)
    except (TypeError, ValueError):
        return False
    if isinstance(s, str):
        # Rejects "nan" / "inf" / "infinity", which float() accepts.
        return any(ch.isdigit() for ch in s)
    return True