"""
Sparse Convergence: multi-hop reasoning over sparse co-occurrence graphs.
The dense convergence loop (convergence.py) operates on numpy vectors via
NeuronDB.search(). This module does the same math — softmax weighting,
per-hop specialization, mutual attention, query anchoring — but natively
on sparse dicts (word_idx → weight). No dense vectors. No NeuronDB.
Transformer correspondence (same as convergence.py):
- _softmax_blend() → scaled dot-product attention
- _mutual_attention() → token-to-token self-attention
- query anchor → residual connection
- per-hop k/threshold → layer specialization
- convergence check → stopping criterion
Input: word indices (from tokenized query)
Output: list of (word_idx, confidence) pairs + inspectable trace
"""
import math
from dataclasses import dataclass, field
import numpy as np
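# scipy is optional: the flag lets callers check availability before
# constructing a VectorizedConvergenceLoop (which needs a csr_matrix).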
try:
from scipy.sparse import csr_matrix as _scipy_csr
HAS_SCIPY = True
except ImportError:
HAS_SCIPY = False
@dataclass
class SparseHop:
"""One step in the sparse convergence trace."""
hop_number: int
neighbors: list # [(word_idx, similarity)]
query_profile: dict # sparse profile at this hop (before anchor)
current: dict # sparse profile after anchor blend
movement: float # 1 - cosine(current, previous)
@dataclass
class SparseConvergenceResult:
"""Result of a sparse convergence loop."""
converged: bool
concepts: list # [(word_idx, confidence)] — final concept set
hops: list = field(default_factory=list)
confidence: float = 0.0
def trace(self) -> str:
lines = []
for hop in self.hops:
nb_str = ", ".join(
f"w{idx}({sim:.3f})" for idx, sim in hop.neighbors[:5]
)
lines.append(
f" Hop {hop.hop_number}: [{nb_str}] movement={hop.movement:.4f}"
)
status = "CONVERGED" if self.converged else "DID NOT CONVERGE"
lines.insert(0, f"SparseConvergence: {status} (confidence={self.confidence:.3f})")
return "\n".join(lines)
@dataclass
class SparseMultiHopResult:
"""Result of multi-hop sparse reasoning across rounds."""
converged: bool
concepts: list
rounds: list = field(default_factory=list)
confidence: float = 0.0
def trace(self) -> str:
lines = []
for i, r in enumerate(self.rounds):
lines.append(f"=== Round {i + 1} ===")
lines.append(r.trace())
status = "CONVERGED" if self.converged else "DID NOT CONVERGE"
lines.insert(0,
f"SparseMultiHop: {status} in {len(self.rounds)} round(s), "
f"{len(self.concepts)} concepts (confidence={self.confidence:.3f})"
)
return "\n".join(lines)
# --- Sparse math utilities ---
def sparse_norm(d: dict) -> float:
"""L2 norm of a sparse vector."""
if not d:
return 0.0
return math.sqrt(sum(v * v for v in d.values()))
def sparse_cosine(a: dict, b: dict) -> float:
"""Cosine similarity between two sparse dicts. O(min(|a|, |b|))."""
if not a or not b:
return 0.0
if len(a) > len(b):
a, b = b, a
dot = sum(v * b.get(k, 0) for k, v in a.items())
if dot == 0:
return 0.0
na = sparse_norm(a)
nb = sparse_norm(b)
if na == 0 or nb == 0:
return 0.0
return dot / (na * nb)
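# Worked example (illustrative values):
#   sparse_cosine({0: 1.0, 1: 2.0}, {1: 2.0, 2: 1.0})
#   = 4.0 / (sqrt(5) * sqrt(5)) = 0.8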
def sparse_blend(profiles: list, weights: list = None) -> dict:
"""Weighted average of multiple sparse profiles."""
if not profiles:
return {}
if weights is None:
weights = [1.0] * len(profiles)
total_w = sum(weights)
if total_w == 0:
return {}
result = {}
for profile, w in zip(profiles, weights):
for k, v in profile.items():
result[k] = result.get(k, 0) + w * v
for k in result:
result[k] /= total_w
return result
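# Worked example (illustrative values):
#   sparse_blend([{0: 1.0}, {1: 1.0}], weights=[3.0, 1.0])
#   = {0: 0.75, 1: 0.25}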
def sparse_add(a: dict, b: dict, alpha: float, beta: float) -> dict:
"""Compute alpha*a + beta*b as sparse dicts."""
result = {}
for k, v in a.items():
result[k] = alpha * v
for k, v in b.items():
result[k] = result.get(k, 0) + beta * v
return result
def sparse_normalize(d: dict) -> dict:
"""L2-normalize a sparse dict."""
n = sparse_norm(d)
if n == 0:
return dict(d)
return {k: v / n for k, v in d.items()}
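# Worked examples (illustrative values):
#   sparse_add({0: 1.0}, {0: 1.0, 1: 2.0}, 0.5, 0.5) = {0: 1.0, 1: 1.0}
#   sparse_normalize({0: 3.0, 1: 4.0}) = {0: 0.6, 1: 0.8}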
class SparseConvergenceLoop:
"""Multi-hop convergence over sparse co-occurrence graphs.
Each hop:
1. Build sparse query profile from current concepts
2. Search for neighbors with highest cosine to query profile
3. Filter by per-hop confidence threshold (specializing)
4. NxN mutual attention among neighbors
5. Softmax-weighted blend into activation profile
6. Anchor back to original query (residual connection)
    7. Check convergence (cosine(current, previous) >= threshold)
    The cooc dict is accessed by reference and supports lazy loading
    via the ensure_cooc callback.
"""
def __init__(self, cooc=None, word_idx=None, words=None, word_neurons=None,
ensure_cooc=None,
get_profile=None,
max_hops=10, k=5,
convergence_threshold=0.99,
min_confidence=0.1,
min_relevance=0.3,
temperature=1.0):
"""
Args:
cooc: dict of dicts — word_idx → {neighbor_idx: weight} (legacy)
word_idx: dict — word → index
words: list — index → word
word_neurons: dict — word → neuron_id
ensure_cooc: callable(word_idx) — lazy-loads cooc for a word (legacy)
get_profile: callable(word_idx) → dict — replaces cooc+ensure_cooc.
If provided, cooc and ensure_cooc are ignored.
max_hops: maximum reasoning steps
k: neighbors per hop
convergence_threshold: cosine threshold for "stable"
min_confidence: minimum neighbor similarity to participate
min_relevance: minimum cosine between query and best neighbor
temperature: softmax temperature for blend weighting
"""
self.word_idx = word_idx
self.words = words
self.word_neurons = word_neurons
self.max_hops = max_hops
self.k = k
self.convergence_threshold = convergence_threshold
self.min_confidence = min_confidence
self.min_relevance = min_relevance
self.temperature = temperature
# Profile access: prefer get_profile callback, fall back to cooc dict
if get_profile is not None:
self._get_profile_fn = get_profile
else:
self.cooc = cooc or {}
self.ensure_cooc = ensure_cooc or (lambda idx: None)
self._get_profile_fn = None
def _get_word_profile(self, idx: int) -> dict:
"""Get a word's co-occurrence profile.
Uses get_profile callback if provided (CSR+WAL path),
else falls back to cooc dict + ensure_cooc (legacy path).
"""
if self._get_profile_fn is not None:
return self._get_profile_fn(idx)
self.ensure_cooc(idx)
return self.cooc.get(idx, {})
def _search(self, query_profile: dict, k: int) -> list:
"""Find top-k words by co-occurrence overlap with query profile.
Strategy: score each candidate by how much weight the query profile
assigns to it. This is O(|query_profile|) — no need to load cooc
for candidates. Each key in query_profile is a word_idx with a
weight; that weight IS the relevance score for that word.
        Then, for the top 2*k candidates, load their cooc rows and refine
        the score with a full cosine, so the refinement work stays bounded.
"""
if not query_profile:
return []
# Phase 1: fast scoring — each word_idx in query_profile gets
# its weight as a direct relevance score. O(|query_profile|).
fast_scores = []
for widx, weight in query_profile.items():
if weight > 0:
fast_scores.append((widx, weight))
fast_scores.sort(key=lambda x: x[1], reverse=True)
# Phase 2: for top candidates, refine with full cosine.
# Load cooc only for top 2*k candidates — bounded work.
refine_count = min(2 * k, len(fast_scores))
top_candidates = fast_scores[:refine_count]
if not top_candidates:
return []
q_norm = sparse_norm(query_profile)
if q_norm == 0:
return []
refined = []
for widx, _ in top_candidates:
word_profile = self._get_word_profile(widx)
if not word_profile:
continue
dot = sum(query_profile.get(j, 0) * v for j, v in word_profile.items())
if dot > 0:
w_norm = sparse_norm(word_profile)
if w_norm > 0:
sim = dot / (q_norm * w_norm)
refined.append((widx, sim))
refined.sort(key=lambda x: x[1], reverse=True)
return refined[:k]
def _mutual_attention(self, neighbors: list) -> list:
"""NxN mutual attention among neighbors.
Each neighbor's similarity score is boosted by how much it
relates to the other neighbors (compositional reasoning).
neighbors: [(word_idx, similarity)]
Returns: [(word_idx, boosted_similarity)]
"""
if len(neighbors) <= 1:
return neighbors
# Get profiles
profiles = []
for widx, sim in neighbors:
profiles.append(self._get_word_profile(widx))
n = len(neighbors)
# Compute pairwise cosine
mutual_scores = []
for i in range(n):
total_sim = 0.0
for j in range(n):
if i != j:
total_sim += sparse_cosine(profiles[i], profiles[j])
mutual_scores.append(total_sim / max(n - 1, 1))
# Boost: original_sim * (1 + mutual_score)
boosted = []
for i, (widx, sim) in enumerate(neighbors):
boosted.append((widx, sim * (1.0 + mutual_scores[i])))
return boosted
def _softmax_blend(self, neighbors: list) -> dict:
"""Blend neighbor profiles using softmax over their similarity scores.
neighbors: [(word_idx, similarity)]
Returns: blended sparse profile
"""
if not neighbors:
return {}
sims = [sim for _, sim in neighbors]
max_sim = max(sims) if sims else 0
if self.temperature == float('inf'):
# Uniform weighting
weights = [1.0 / len(neighbors)] * len(neighbors)
elif self.temperature <= 0:
# Winner-take-all
weights = [0.0] * len(neighbors)
weights[sims.index(max_sim)] = 1.0
else:
# Softmax with temperature
scaled = [(s - max_sim) / self.temperature for s in sims]
exp_scaled = [math.exp(s) for s in scaled]
total = sum(exp_scaled)
if total > 0:
weights = [e / total for e in exp_scaled]
else:
weights = [1.0 / len(neighbors)] * len(neighbors)
# Blend profiles
profiles = []
for widx, _ in neighbors:
profiles.append(self._get_word_profile(widx))
return sparse_blend(profiles, weights)
def converge(self, query_word_indices: list,
query_weights: list = None) -> SparseConvergenceResult:
"""Run convergence on word indices.
Args:
query_word_indices: list of word indices from tokenized query
query_weights: optional per-word weights (default: position decay)
Returns:
SparseConvergenceResult with concepts and trace
"""
if not query_word_indices:
return SparseConvergenceResult(
converged=False, concepts=[], confidence=0.0
)
# Build initial query profile from word co-occurrence
if query_weights is None:
query_weights = [1.0 / (1.0 + 0.1 * i)
for i in range(len(query_word_indices))]
profiles = [self._get_word_profile(idx) for idx in query_word_indices]
query_profile = sparse_blend(profiles, query_weights)
query_profile = sparse_normalize(query_profile)
if not query_profile:
return SparseConvergenceResult(
converged=False, concepts=[], confidence=0.0
)
current = dict(query_profile)
hops = []
last_neighbors = []
for hop_num in range(self.max_hops):
previous = dict(current)
# Per-hop specialization
progress = hop_num / max(self.max_hops - 1, 1)
# Early: explore broadly (more neighbors)
# Later: focus narrowly (fewer neighbors)
hop_k = max(2, int(self.k * (1.5 - 0.7 * progress)))
# Early: accept weaker matches
# Later: require stronger matches
hop_min_sim = self.min_confidence * (1.0 + 0.5 * progress)
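            # e.g. with k=5, max_hops=10: hop_k shrinks 7 -> 4 across hops,
            # and hop_min_sim (at min_confidence=0.1) rises 0.10 -> 0.15.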
# 1. Search for neighbors
neighbors = self._search(current, k=hop_k)
# 2. Filter by minimum similarity
neighbors = [(widx, sim) for widx, sim in neighbors
if sim >= hop_min_sim]
if not neighbors:
return SparseConvergenceResult(
converged=False, concepts=last_neighbors,
hops=hops, confidence=0.0,
)
# 3. Mutual attention (NxN among neighbors)
neighbors = self._mutual_attention(neighbors)
# Re-sort after boosting
neighbors.sort(key=lambda x: x[1], reverse=True)
# 4. Softmax-weighted blend into activation
activation = self._softmax_blend(neighbors)
activation = sparse_normalize(activation)
# 5. Anchor to query (residual connection)
# Early hops: more activation (explore)
# Later hops: more query anchor (contract)
alpha = hop_num / self.max_hops # 0 → 1
current = sparse_add(activation, query_profile,
1.0 - alpha, alpha)
current = sparse_normalize(current)
# 6. Compute movement
movement = 1.0 - sparse_cosine(current, previous)
hops.append(SparseHop(
hop_number=hop_num,
neighbors=[(widx, sim) for widx, sim in neighbors],
query_profile=dict(activation),
current=dict(current),
movement=movement,
))
last_neighbors = neighbors
# 7. Check convergence
sim = sparse_cosine(current, previous)
if sim >= self.convergence_threshold:
# Check relevance: are the neighbors actually related to query?
best_relevance = max(
sparse_cosine(self._get_word_profile(widx), query_profile)
for widx, _ in neighbors
)
if best_relevance < self.min_relevance:
return SparseConvergenceResult(
converged=False,
concepts=[(widx, sim) for widx, sim in last_neighbors],
hops=hops, confidence=0.0,
)
avg_sim = sum(s for _, s in neighbors) / len(neighbors)
return SparseConvergenceResult(
converged=True,
concepts=[(widx, sim) for widx, sim in last_neighbors],
hops=hops,
confidence=avg_sim,
)
# Did not converge
avg_sim = (sum(s for _, s in last_neighbors) / len(last_neighbors)
if last_neighbors else 0.0)
return SparseConvergenceResult(
converged=False,
concepts=[(widx, sim) for widx, sim in last_neighbors],
hops=hops,
confidence=avg_sim * 0.5, # penalize non-convergence
)
class VectorizedConvergenceLoop:
"""Multi-hop convergence using scipy spmv — uses all CPUs via BLAS.
Same algorithm as SparseConvergenceLoop but operates on numpy/scipy
instead of Python dicts. 5-10x faster on 4+ CPUs.
Requires a scipy.sparse.csr_matrix (from MMapCSR.scipy_matrix or
CSRWriteAheadLog.effective_scipy_matrix).
"""
def __init__(self, scipy_mat, words=None, word_idx=None, word_neurons=None,
max_hops=10, k=5,
convergence_threshold=0.99,
min_confidence=0.1,
min_relevance=0.3,
temperature=1.0):
self._mat = scipy_mat # scipy.sparse.csr_matrix (V x V)
self._V = scipy_mat.shape[0]
self.words = words
self.word_idx = word_idx
self.word_neurons = word_neurons
self.max_hops = max_hops
self.k = k
self.convergence_threshold = convergence_threshold
self.min_confidence = min_confidence
self.min_relevance = min_relevance
self.temperature = temperature
# Precompute row norms for cosine
self._row_norms = np.sqrt(
np.asarray(scipy_mat.multiply(scipy_mat).sum(axis=1)).ravel()
).astype(np.float32)
def update_matrix(self, scipy_mat):
"""Hot-swap the matrix (e.g., when WAL changes)."""
self._mat = scipy_mat
        self._V = scipy_mat.shape[0]
self._row_norms = np.sqrt(
np.asarray(scipy_mat.multiply(scipy_mat).sum(axis=1)).ravel()
).astype(np.float32)
def _search(self, query_vec: np.ndarray, k: int) -> list:
"""Top-k words by spmv attention. Uses all CPUs via BLAS."""
# scores = W @ q — sparse matrix-vector multiply
scores = np.asarray(self._mat @ query_vec).ravel()
q_norm = float(np.linalg.norm(query_vec))
if q_norm == 0:
return []
# Cosine = dot / (row_norm * q_norm)
with np.errstate(divide='ignore', invalid='ignore'):
cosines = scores / (self._row_norms * q_norm + 1e-10)
# Top-k
if len(cosines) <= k:
top_indices = np.argsort(-cosines)
else:
top_indices = np.argpartition(-cosines, k)[:k]
top_indices = top_indices[np.argsort(-cosines[top_indices])]
return [(int(idx), float(cosines[idx]))
for idx in top_indices if cosines[idx] > 0]
def _mutual_attention(self, neighbors: list) -> list:
"""NxN mutual attention via scipy submatrix multiply."""
if len(neighbors) <= 1:
return neighbors
indices = np.array([idx for idx, _ in neighbors], dtype=np.int32)
# Extract k rows as submatrix
sub = self._mat[indices, :]
# k×k similarity matrix in one call
sim_mat = (sub @ sub.T).toarray()
# Normalize to cosine
norms = self._row_norms[indices]
with np.errstate(divide='ignore', invalid='ignore'):
norm_outer = np.outer(norms, norms) + 1e-10
sim_mat = sim_mat / norm_outer
        n = len(neighbors)
        # Average each row's similarity to the other rows (diagonal excluded),
        # then boost: original_sim * (1 + mutual_score), as in the dict path.
        np.fill_diagonal(sim_mat, 0.0)
        mutual = sim_mat.sum(axis=1) / max(n - 1, 1)
        return [(widx, sim * (1.0 + float(mutual[i])))
                for i, (widx, sim) in enumerate(neighbors)]
def _softmax_blend(self, neighbors: list) -> np.ndarray:
"""Blend neighbor rows using softmax weights. Returns dense vector."""
if not neighbors:
return np.zeros(self._V, dtype=np.float32)
sims = np.array([s for _, s in neighbors], dtype=np.float32)
max_sim = sims.max()
if self.temperature <= 0:
weights = np.zeros_like(sims)
weights[sims.argmax()] = 1.0
elif self.temperature == float('inf'):
weights = np.ones_like(sims) / len(sims)
else:
scaled = (sims - max_sim) / self.temperature
exp_scaled = np.exp(scaled)
total = exp_scaled.sum()
weights = exp_scaled / total if total > 0 else np.ones_like(sims) / len(sims)
# Blend: profiles.T @ weights — one spmv
indices = np.array([idx for idx, _ in neighbors], dtype=np.int32)
profiles = self._mat[indices, :] # k × V sparse
blended = np.asarray(profiles.T @ weights).ravel()
return blended.astype(np.float32)
def converge(self, query_word_indices: list,
query_weights: list = None) -> SparseConvergenceResult:
"""Run vectorized convergence. Same interface as SparseConvergenceLoop."""
if not query_word_indices:
return SparseConvergenceResult(converged=False, concepts=[], confidence=0.0)
if query_weights is None:
query_weights = [1.0 / (1.0 + 0.1 * i)
for i in range(len(query_word_indices))]
# Build initial query vector from weighted blend of word rows
weights_arr = np.array(query_weights, dtype=np.float32)
weights_arr /= weights_arr.sum() + 1e-10
        # Drop out-of-range indices together with their weights so the two
        # lists stay aligned (truncating weights_arr by count would pair
        # surviving words with the wrong weights).
        valid = [(idx, w) for idx, w in zip(query_word_indices, weights_arr)
                 if idx < self._V]
        if not valid:
            return SparseConvergenceResult(converged=False, concepts=[], confidence=0.0)
        valid_indices = [idx for idx, _ in valid]
        query_w = np.array([w for _, w in valid], dtype=np.float32)
        query_rows = self._mat[valid_indices, :]
query_profile = np.asarray(query_rows.T @ query_w).ravel().astype(np.float32)
# Normalize
qn = np.linalg.norm(query_profile)
if qn > 0:
query_profile /= qn
current = query_profile.copy()
hops = []
last_neighbors = []
for hop_num in range(self.max_hops):
previous = current.copy()
progress = hop_num / max(self.max_hops - 1, 1)
hop_k = max(2, int(self.k * (1.5 - 0.7 * progress)))
hop_min_sim = self.min_confidence * (1.0 + 0.5 * progress)
# 1. Search via spmv
neighbors = self._search(current, k=hop_k)
# 2. Filter by minimum similarity
neighbors = [(w, s) for w, s in neighbors if s >= hop_min_sim]
if not neighbors:
return SparseConvergenceResult(
converged=False, concepts=last_neighbors,
hops=hops, confidence=0.0)
# 3. Mutual attention
neighbors = self._mutual_attention(neighbors)
neighbors.sort(key=lambda x: x[1], reverse=True)
# 4. Softmax blend → activation vector
activation = self._softmax_blend(neighbors)
an = np.linalg.norm(activation)
if an > 0:
activation /= an
# 5. Query anchor (residual connection)
alpha = hop_num / self.max_hops
current = (1.0 - alpha) * activation + alpha * query_profile
cn = np.linalg.norm(current)
if cn > 0:
current /= cn
# 6. Movement
dot = float(np.dot(current, previous))
movement = 1.0 - min(dot, 1.0)
# Build sparse hop trace (convert to dict for compatibility)
hops.append(SparseHop(
hop_number=hop_num,
neighbors=[(w, s) for w, s in neighbors],
query_profile={}, # skip for perf — dict conversion expensive
current={},
movement=movement,
))
last_neighbors = neighbors
# 7. Convergence check
if dot >= self.convergence_threshold:
# Relevance check
best_idx = neighbors[0][0]
if best_idx < self._V:
row = np.asarray(self._mat[best_idx, :].todense()).ravel()
rn = self._row_norms[best_idx]
qn2 = np.linalg.norm(query_profile)
if rn > 0 and qn2 > 0:
best_relevance = float(np.dot(row, query_profile) / (rn * qn2))
else:
best_relevance = 0.0
else:
best_relevance = 0.0
if best_relevance < self.min_relevance:
return SparseConvergenceResult(
converged=False,
concepts=[(w, s) for w, s in last_neighbors],
hops=hops, confidence=0.0)
avg_sim = sum(s for _, s in neighbors) / len(neighbors)
return SparseConvergenceResult(
converged=True,
concepts=[(w, s) for w, s in last_neighbors],
hops=hops, confidence=avg_sim)
avg_sim = (sum(s for _, s in last_neighbors) / len(last_neighbors)
if last_neighbors else 0.0)
return SparseConvergenceResult(
converged=False,
concepts=[(w, s) for w, s in last_neighbors],
hops=hops, confidence=avg_sim * 0.5)
class SparseMultiHop:
"""Chains multiple SparseConvergenceLoop rounds.
Round 1: query → converge → concepts A
Round 2: query + concepts_A blend → converge → concepts B
Stop when: no new concepts or max rounds.
"""
def __init__(self, loop: SparseConvergenceLoop,
max_rounds: int = 3,
concept_blend_weight: float = 0.4):
self.loop = loop
self.max_rounds = max_rounds
self.concept_blend_weight = concept_blend_weight
def reason(self, query_word_indices: list,
query_weights: list = None) -> SparseMultiHopResult:
"""Run multi-hop sparse reasoning."""
if not query_word_indices:
return SparseMultiHopResult(
converged=False, concepts=[], confidence=0.0,
)
all_concepts = []
seen_indices = set()
rounds = []
# Build initial query profile
if query_weights is None:
query_weights = [1.0 / (1.0 + 0.1 * i)
for i in range(len(query_word_indices))]
current_indices = list(query_word_indices)
current_weights = list(query_weights)
for round_num in range(self.max_rounds):
result = self.loop.converge(current_indices, current_weights)
rounds.append(result)
# Collect new concepts
new_concepts = []
for widx, sim in result.concepts:
if widx not in seen_indices:
new_concepts.append((widx, sim))
seen_indices.add(widx)
all_concepts.extend(new_concepts)
# Stop conditions
if round_num == 0 and not result.converged and not result.concepts:
break
if not new_concepts and round_num > 0:
break
if round_num == self.max_rounds - 1:
break
# Prepare next round: add discovered concepts to query
if new_concepts:
w = self.concept_blend_weight
# Build new query: original words + discovered concepts
new_indices = [widx for widx, _ in new_concepts]
new_weights_list = [sim * w for _, sim in new_concepts]
# Original query words get (1-w) weight
orig_w = [(1 - w) * qw for qw in query_weights]
current_indices = list(query_word_indices) + new_indices
current_weights = orig_w + new_weights_list
any_converged = any(r.converged for r in rounds)
if all_concepts and any_converged:
avg_conf = sum(s for _, s in all_concepts) / len(all_concepts)
return SparseMultiHopResult(
converged=True,
concepts=all_concepts,
rounds=rounds,
confidence=avg_conf,
)
else:
return SparseMultiHopResult(
converged=False,
concepts=all_concepts,
rounds=rounds,
confidence=0.0,
)
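if __name__ == "__main__":
    # Illustrative smoke test on a tiny toy graph (an assumption for demo
    # purposes, not the module's real vocabulary or data path).
    toy_cooc = {
        0: {1: 2.0, 2: 1.0},
        1: {0: 2.0, 2: 3.0},
        2: {0: 1.0, 1: 3.0},
    }
    loop = SparseConvergenceLoop(cooc=toy_cooc, k=2, max_hops=5)
    print(loop.converge([0, 1]).trace())
    print(SparseMultiHop(loop, max_rounds=2).reason([0, 1]).trace())
    if HAS_SCIPY:
        # Same toy graph as a dense-backed csr_matrix for the vectorized path.
        mat = _scipy_csr(np.array([[0, 2, 1], [2, 0, 3], [1, 3, 0]],
                                  dtype=np.float32))
        vloop = VectorizedConvergenceLoop(mat, k=2, max_hops=5)
        print(vloop.converge([0, 1]).trace())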