""" Fast feeder: build brain LMDB using scipy sparse matrices instead of Python dicts. Python dicts cost ~70 bytes per edge. scipy sparse uses ~12 bytes per edge. 178M edges: 12GB in dicts vs 2.1GB in scipy. Processes ALL 1.29M records. Usage: python3 feed.py # all datasets python3 feed.py --limit 1000 # first 1000 records """ import sys, os, json, time, argparse, re, struct, gc from pathlib import Path from collections import defaultdict import numpy as np from scipy.sparse import lil_matrix, csr_matrix import lmdb DATA_DIR = Path.home() / "webmind-research" / "data" SEED_PATH = Path.home() / "nexus-brain" / "seed.jsonl" DB_PATH = os.path.expanduser('~/nexus-brain/brain.lmdb') COOCCURRENCE_PULL = 0.3 COOC_WINDOW = 10 # only pair words within this window (not all-pairs) MAX_CONTENT_TOKENS = 50 # cap content tokens per sentence to limit O(n²) FUNCTION_WORDS = frozenset({ "the", "a", "an", "is", "are", "was", "were", "be", "been", "being", "have", "has", "had", "do", "does", "did", "will", "would", "could", "should", "may", "might", "shall", "can", "must", "of", "in", "to", "for", "with", "on", "at", "by", "from", "as", "into", "through", "during", "before", "after", "above", "below", "between", "out", "off", "over", "under", "and", "but", "or", "nor", "not", "so", "yet", "both", "either", "neither", "each", "every", "all", "any", "few", "more", "most", "other", "some", "such", "no", "only", "own", "same", "than", "too", "very", "just", "about", "up", "what", "which", "who", "whom", "this", "that", "these", "those", "am", "if", "then", "because", "while", "although", "though", "even", "also", "it", "its", "how", "when", "where", "why", "there", "here", }) _ID_FMT = struct.Struct('= args.limit: break try: record = json.loads(line) except json.JSONDecodeError: continue texts = [] text = record.get("text", "").strip() question = record.get("question", "").strip() answer = record.get("answer", "").strip() if text and len(text) >= 10: texts.append(text) if question and answer: if len(answer) < 50 and not answer.endswith('.'): texts.append(f"{question.rstrip('?')} is {answer}") else: texts.append(answer) for sent in texts: tokens = tokenize(sent) content = [t for t in tokens if t not in FUNCTION_WORDS] if not content: continue for w in content: if w not in word_idx: if len(words) >= V: continue # vocabulary full idx = len(words) words.append(w) word_idx[w] = idx indices = [word_idx[w] for w in content if w in word_idx] if len(indices) < 2: continue # Cap long documents to prevent O(n²) explosion if len(indices) > MAX_CONTENT_TOKENS: indices = indices[:MAX_CONTENT_TOKENS] # Window-based co-occurrence (O(n*W) not O(n²)) n_idx = len(indices) for i in range(n_idx): for j in range(i + 1, min(i + COOC_WINDOW + 1, n_idx)): a, b = indices[i], indices[j] cooc_rows.append(a) cooc_cols.append(b) cooc_vals.append(COOCCURRENCE_PULL) cooc_rows.append(b) cooc_cols.append(a) cooc_vals.append(COOCCURRENCE_PULL) for i in range(len(indices) - 1): successors[indices[i]][indices[i+1]] += 1.0 sentences.append(tuple(indices)) sentence_texts.append(sent) # Extract template: structural words stay, content → slots if 3 <= len(tokens) <= 20: structural_count = sum(1 for t in tokens if t in FUNCTION_WORDS) if structural_count >= 1 and structural_count < len(tokens): parts = [] slot_idx = 0 for t in tokens: if t in FUNCTION_WORDS: parts.append(t) else: parts.append(f"[S{slot_idx}]") slot_idx += 1 pattern = " ".join(parts) template_counts[pattern] += 1 total_fed += 1 ds_fed += 1 if total_fed % 10000 == 0: elapsed = time.time() - t0 rate = total_fed / elapsed mem = avail_mb() n_edges = len(cooc_rows) edge_mb = n_edges * 12 / (1024 * 1024) # 4+4+4 bytes per COO entry print( f" [{ds_name}] {total_fed:,} fed | " f"{len(words):,} words | {n_edges:,} edges ({edge_mb:.0f}MB) | " f"{rate:,.0f}/sec | RSS: {rss_mb():.0f}MB | " f"Avail: {mem:.0f}MB", flush=True ) if mem < args.mem_floor: raise MemoryError("RAM floor hit") if args.limit and total_fed >= args.limit: break print(f" {ds_name}: {ds_fed:,} records", flush=True) except (KeyboardInterrupt, MemoryError) as e: print(f"\nPhase 1 stopped: {e}") elapsed1 = time.time() - t0 n_edges = len(cooc_rows) print(f"\nPhase 1: {total_fed:,} records | {len(words):,} words | " f"{n_edges:,} COO entries | {len(sentences):,} sentences | {elapsed1:.1f}s") print(f" COO memory: {n_edges * 12 / (1024*1024):.0f} MB " f"(vs ~{n_edges * 70 / (1024*1024):.0f} MB in Python dicts)") # ============ Phase 1.5: COO → CSR + hashed projection vectors ============ print(f"\n=== Phase 1.5: COO → CSR matrix + {VECTOR_DIM}-dim vectors ===") t15 = time.time() V_actual = len(words) # Build CSR from COO (scipy handles duplicate summing) from scipy.sparse import coo_matrix cooc_mat = coo_matrix( (np.array(cooc_vals, dtype=np.float32), (np.array(cooc_rows, dtype=np.int32), np.array(cooc_cols, dtype=np.int32))), shape=(V_actual, V_actual), ).tocsr() # Free COO arrays del cooc_rows, cooc_cols, cooc_vals gc.collect() print(f" CSR: {V_actual:,} x {V_actual:,}, {cooc_mat.nnz:,} non-zeros, " f"{cooc_mat.data.nbytes / (1024*1024):.0f} MB") # Hashed projection vectors from CSR (streaming, no extra memory) word_vectors = np.zeros((V_actual, VECTOR_DIM), dtype=np.float32) indptr = cooc_mat.indptr indices = cooc_mat.indices data = cooc_mat.data for widx in range(V_actual): start, end = indptr[widx], indptr[widx + 1] for k in range(start, end): dim = int(indices[k]) % VECTOR_DIM word_vectors[widx, dim] += data[k] norms = np.linalg.norm(word_vectors, axis=1, keepdims=True) norms = np.maximum(norms, 1e-8) word_vectors = word_vectors / norms elapsed15 = time.time() - t15 print(f" Vectors: {V_actual:,} × {VECTOR_DIM} = {word_vectors.nbytes/(1024*1024):.0f} MB | {elapsed15:.1f}s") print(f" RSS: {rss_mb():.0f} MB | Avail: {avail_mb():.0f} MB") # ============ Phase 2: Write to LMDB (batched to avoid I/O spikes) ============ print("\n=== Phase 2: Writing to LMDB (batched) ===") t2 = time.time() # Free COO/CSR references we no longer need — reduce peak memory before mmap del indptr, indices, data gc.collect() print(f" Pre-LMDB: RSS {rss_mb():.0f} MB | Avail {avail_mb():.0f} MB") if os.path.exists(DB_PATH): import shutil shutil.rmtree(DB_PATH) map_size = args.map_size * 1024 * 1024 * 1024 env = lmdb.open(DB_PATH, max_dbs=16, map_size=map_size) neurons_db = env.open_db(b'neurons') vectors_db = env.open_db(b'vectors') successors_db = env.open_db(b'successors') predecessors_db = env.open_db(b'predecessors') words_db = env.open_db(b'words') sentences_db = env.open_db(b'sentences') sent_index_db = env.open_db(b'sent_index') cooc_db = env.open_db(b'cooccurrence') templates_db = env.open_db(b'templates') sent_text_db = env.open_db(b'sentence_text') meta_db = env.open_db(b'meta') BATCH = 10_000 # commit every 10K entries to limit dirty pages def batched_write(label, items, db, transform=None): """Write items in batches of BATCH, committing + syncing between.""" count = 0 txn = env.begin(write=True) for item in items: if transform: k, v = transform(item) else: k, v = item if k is not None: txn.put(k, v, db=db) count += 1 if count % BATCH == 0: txn.commit() env.sync(True) txn = env.begin(write=True) if count % 50_000 == 0: print(f" {label}: {count:,} written | RSS {rss_mb():.0f} MB", flush=True) txn.commit() env.sync(True) print(f" {label}: {count:,} done", flush=True) return count # Meta (tiny, one txn) with env.begin(write=True) as txn: txn.put(b'count', _ID_FMT.pack(V_actual), db=meta_db) txn.put(b'next_id', _ID_FMT.pack(V_actual), db=meta_db) txn.put(b'dim', _ID_FMT.pack(VECTOR_DIM), db=meta_db) txn.put(b'next_sentence_id', _ID_FMT.pack(len(sentences)), db=meta_db) # Neurons (batched) print(f" Writing {V_actual:,} neurons...") batched_write("neurons", range(V_actual), neurons_db, transform=lambda i: (_ID_FMT.pack(i), _NEURON_FMT.pack(0.5, 0, False, 1))) # Vectors (batched — each is 2KB, so 10K batch = 20MB per commit) print(f" Writing {V_actual:,} vectors...") batched_write("vectors", range(V_actual), vectors_db, transform=lambda i: (_ID_FMT.pack(i), word_vectors[i].tobytes())) # Word mappings (batched) print(f" Writing word mappings...") skipped = 0 def word_transform(item): nonlocal skipped w, idx = item encoded = w.encode('utf-8') if len(encoded) > 500: skipped += 1 return (None, None) return (encoded, _ID_FMT.pack(idx)) batched_write("words", word_idx.items(), words_db, transform=word_transform) if skipped: print(f" (skipped {skipped} oversized word keys)") # Successors + predecessors (batched, needs read-back for predecessors) print(f" Writing successors...") succ_count = 0 txn = env.begin(write=True) for src, targets in successors.items(): top = sorted(targets.items(), key=lambda x: -x[1])[:10] max_c = top[0][1] if top else 1.0 succ_bytes = b''.join( _ID_CONF_FMT.pack(tid, min(c / max_c, 1.0)) for tid, c in top ) txn.put(_ID_FMT.pack(src), succ_bytes, db=successors_db) for tid, c in top[:3]: key = _ID_FMT.pack(tid) existing = txn.get(key, db=predecessors_db) or b'' if len(existing) < 3 * 4: existing += _ID_FMT.pack(src) txn.put(key, existing, db=predecessors_db) succ_count += 1 if succ_count % BATCH == 0: txn.commit() env.sync(True) txn = env.begin(write=True) if succ_count % 50_000 == 0: print(f" successors: {succ_count:,} | RSS {rss_mb():.0f} MB", flush=True) txn.commit() env.sync(True) print(f" successors: {succ_count:,} done", flush=True) # Co-occurrence from CSR (batched) csr_indptr = cooc_mat.indptr csr_indices = cooc_mat.indices csr_data = cooc_mat.data print(f" Writing {cooc_mat.nnz:,} co-occurrence edges...") cooc_count = 0 txn = env.begin(write=True) for a in range(V_actual): start, end = csr_indptr[a], csr_indptr[a + 1] if start == end: continue parts = [] for k in range(start, end): b = int(csr_indices[k]) w = float(csr_data[k]) if a != b and w > 0: parts.append(_ID_CONF_FMT.pack(b, w)) if parts: txn.put(_ID_FMT.pack(a), b''.join(parts), db=cooc_db) cooc_count += 1 if cooc_count % BATCH == 0: txn.commit() env.sync(True) txn = env.begin(write=True) if cooc_count % 50_000 == 0: print(f" cooc rows: {cooc_count:,} | RSS {rss_mb():.0f} MB", flush=True) txn.commit() env.sync(True) print(f" cooc rows: {cooc_count:,} done", flush=True) # Sentences (batched) print(f" Writing {len(sentences):,} sentences...") sent_reverse = defaultdict(list) sent_count = 0 txn = env.begin(write=True) for sid, word_indices_tuple in enumerate(sentences): sent_bytes = b''.join( _SENT_ENTRY_FMT.pack(wid, pos) for pos, wid in enumerate(word_indices_tuple) ) txn.put(_ID_FMT.pack(sid), sent_bytes, db=sentences_db) for wid in word_indices_tuple: sent_reverse[wid].append(sid) sent_count += 1 if sent_count % BATCH == 0: txn.commit() env.sync(True) txn = env.begin(write=True) txn.commit() env.sync(True) print(f" sentences: {sent_count:,} done", flush=True) # Sentence full text (batched) print(f" Writing {len(sentence_texts):,} sentence texts...") batched_write("sent_text", enumerate(sentence_texts), sent_text_db, transform=lambda item: ( _ID_FMT.pack(item[0]), item[1].encode('utf-8')[:500] # cap at 500 bytes )) # Sentence index (batched) print(f" Writing sentence index ({len(sent_reverse):,} entries)...") batched_write("sent_index", sent_reverse.items(), sent_index_db, transform=lambda item: ( _ID_FMT.pack(item[0]), b''.join(_ID_FMT.pack(s) for s in item[1][:50]) )) # Templates (top-K most frequent patterns) TOP_TEMPLATES = 5000 top_templates = sorted(template_counts.items(), key=lambda x: -x[1])[:TOP_TEMPLATES] # Filter: only templates seen at least 3 times and with 1-6 slots top_templates = [(p, c) for p, c in top_templates if c >= 3 and 1 <= p.count('[S') <= 6] print(f" Writing {len(top_templates):,} templates (from {len(template_counts):,} unique)...") with env.begin(write=True) as txn: for tid, (pattern, count) in enumerate(top_templates): key = _ID_FMT.pack(tid) val = json.dumps({"pattern": pattern, "count": count, "slots": pattern.count('[S')}).encode() txn.put(key, val, db=templates_db) txn.put(b'count', _ID_FMT.pack(len(top_templates)), db=templates_db) env.sync(True) print(f" templates: {len(top_templates):,} done", flush=True) env.close() elapsed2 = time.time() - t2 total_time = time.time() - t0 lmdb_mb = sum(f.stat().st_size for f in Path(DB_PATH).iterdir()) / (1024 * 1024) print(f"\nPhase 2: {elapsed2:.1f}s") print(f"\n{'='*60}") print(f"COMPLETE: {total_fed:,} records → {V_actual:,} words") print(f" Edges: {cooc_mat.nnz:,} (CSR)") print(f" Sentences: {len(sentences):,}") print(f" Templates: {len(top_templates):,}") print(f" Vectors: {V_actual:,} × {VECTOR_DIM}") print(f" LMDB: {lmdb_mb:.1f} MB | Disk: {disk_free_gb():.1f} GB") print(f" Time: {total_time:.1f}s ({total_fed/max(total_time,1):,.0f} rec/sec)") print(f" RSS: {rss_mb():.0f} MB | Avail: {avail_mb():.0f} MB") print(f" Path: {DB_PATH}") print(f"{'='*60}") if __name__ == '__main__': main()