| """ |
| Fast feeder: build brain LMDB using scipy sparse matrices instead of Python dicts. |
| |
| Python dicts cost ~70 bytes per edge. scipy sparse uses ~12 bytes per edge. |
| 178M edges: 12GB in dicts vs 2.1GB in scipy. Processes ALL 1.29M records. |
| |
| Usage: |
| python3 feed.py # all datasets |
| python3 feed.py --limit 1000 # first 1000 records |
| """ |
|
|
| import sys, os, json, time, argparse, re, struct, gc |
| from pathlib import Path |
| from collections import defaultdict |
|
|
| import numpy as np |
| from scipy.sparse import lil_matrix, csr_matrix |
| import lmdb |
|
|
# Input locations: the directory scanned for *.jsonl datasets and the
# optional curated seed file selected with --seed.
DATA_DIR = Path.home() / "webmind-research" / "data"
SEED_PATH = Path.home() / "nexus-brain" / "seed.jsonl"
# LMDB environment that main() deletes and rebuilds on every run.
DB_PATH = os.path.expanduser('~/nexus-brain/brain.lmdb')

# Weight appended for every co-occurring token pair (once per direction).
COOCCURRENCE_PULL = 0.3
# Each content token is paired with at most this many following tokens.
COOC_WINDOW = 10
# Content tokens kept per sentence; the rest are truncated.
MAX_CONTENT_TOKENS = 50

# Tokens excluded from the vocabulary and kept verbatim in templates.
FUNCTION_WORDS = frozenset({
    # articles
    "the", "a", "an",
    # forms of "to be"
    "is", "are", "was", "were", "be", "been", "being", "am",
    # auxiliaries
    "have", "has", "had", "do", "does", "did",
    # modals
    "will", "would", "could", "should", "may", "might", "shall", "can",
    "must",
    # prepositions / particles
    "of", "in", "to", "for", "with", "on", "at", "by", "from", "as",
    "into", "through", "during", "before", "after", "above", "below",
    "between", "out", "off", "over", "under", "about", "up",
    # conjunctions
    "and", "but", "or", "nor", "so", "yet", "if", "then", "because",
    "while", "although", "though", "than",
    # negation / degree adverbs
    "not", "too", "very", "just", "even", "also", "only",
    # quantifiers / determiners
    "both", "either", "neither", "each", "every", "all", "any", "few",
    "more", "most", "other", "some", "such", "no", "own", "same",
    # pronouns / demonstratives
    "what", "which", "who", "whom", "this", "that", "these", "those",
    "it", "its",
    # wh-adverbs / locatives
    "how", "when", "where", "why", "there", "here",
})

# Little-endian binary layouts used for LMDB keys and values.
_ID_FMT = struct.Struct('<i')           # one int32 id / counter
_ID_CONF_FMT = struct.Struct('<if')     # (int32 id, float32 weight)
_SENT_ENTRY_FMT = struct.Struct('<ii')  # (int32 word id, int32 position)
# float32, int64, bool, int8 -- field meanings are not defined in this file.
_NEURON_FMT = struct.Struct('<fq?b')

# Width of each word vector written to the 'vectors' database.
VECTOR_DIM = 512
|
|
|
|
# Runs of ASCII lowercase letters and digits; everything else separates tokens.
_TOKEN_RE = re.compile(r'[a-z0-9]+')


def tokenize(text):
    """Lower-case *text* and return its alphanumeric tokens in order.

    Punctuation, whitespace and any character outside ``a-z0-9`` (after
    lower-casing) act as separators and are dropped.
    """
    lowered = text.lower()
    return _TOKEN_RE.findall(lowered)
|
|
|
|
def avail_mb(meminfo_path='/proc/meminfo'):
    """Return the ``MemAvailable`` value of a meminfo file divided by 1024.

    Args:
        meminfo_path: File to scan; defaults to the Linux ``/proc/meminfo``.

    Returns:
        The second whitespace-separated field of the ``MemAvailable:`` line
        divided by 1024, or ``9999`` when the file cannot be read, the line
        is missing, or the value cannot be parsed.
    """
    try:
        with open(meminfo_path) as f:
            for line in f:
                if line.startswith('MemAvailable:'):
                    return int(line.split()[1]) / 1024
    except (OSError, ValueError, IndexError):
        # Best effort: unreadable or malformed input uses the fallback below.
        pass
    # Without this explicit fallback a file lacking the line would yield
    # None, which callers cannot format or compare against a number.
    return 9999
|
|
|
|
def rss_mb(status_path='/proc/self/status'):
    """Return the ``VmRSS`` value of a process status file divided by 1024.

    Args:
        status_path: File to scan; defaults to this process's
            ``/proc/self/status``.

    Returns:
        The second whitespace-separated field of the ``VmRSS:`` line divided
        by 1024, or ``0`` when the file cannot be read, the line is missing,
        or the value cannot be parsed.
    """
    try:
        with open(status_path) as f:
            for line in f:
                if line.startswith('VmRSS:'):
                    return int(line.split()[1]) / 1024
    except (OSError, ValueError, IndexError):
        # Best effort: unreadable or malformed input uses the fallback below.
        pass
    # Explicit fallback so a status file without VmRSS never yields None,
    # which would break the ":.0f" formatting used by callers.
    return 0
|
|
|
|
def disk_free_gb():
    """Return the space available on the root filesystem in GiB.

    Computed as ``f_bavail * f_frsize`` from ``os.statvfs('/')``. Any failure
    while querying the filesystem yields the placeholder value ``999``.
    """
    bytes_per_gib = 1024 ** 3
    try:
        fs = os.statvfs('/')
        free_bytes = fs.f_bavail * fs.f_frsize
    except Exception:
        return 999
    else:
        return free_bytes / bytes_per_gib
|
|
|
|
def _text_field(record, key):
    """Return ``record[key]`` stripped when it is a string, otherwise ''."""
    value = record.get(key)
    return value.strip() if isinstance(value, str) else ""


def _truncate_utf8(text, max_bytes):
    """Encode *text* as UTF-8 in at most *max_bytes* without splitting a character."""
    encoded = text.encode('utf-8')
    if len(encoded) <= max_bytes:
        return encoded
    # A raw byte slice may end inside a multi-byte sequence; decoding with
    # errors ignored drops only that dangling tail so the value stays valid.
    return encoded[:max_bytes].decode('utf-8', 'ignore').encode('utf-8')


def _hashed_word_vectors(cooc_mat, dim, chunk_rows=65536):
    """Fold each co-occurrence row into *dim* buckets and L2-normalise the rows.

    Column ``j`` of ``cooc_mat`` contributes its weight to bucket ``j % dim``.
    The fold is a sparse product with a 0/1 projection matrix, done in row
    chunks so the temporary product stays small.
    """
    n_words = cooc_mat.shape[0]
    vectors = np.zeros((n_words, dim), dtype=np.float32)
    if cooc_mat.nnz:
        cols = np.arange(n_words)
        projection = csr_matrix(
            (np.ones(n_words, dtype=np.float32), (cols, cols % dim)),
            shape=(n_words, dim),
        )
        for start in range(0, n_words, chunk_rows):
            stop = min(start + chunk_rows, n_words)
            vectors[start:stop] = cooc_mat[start:stop].dot(projection).toarray()
    # Rows with no edges keep a zero vector: the floor avoids division by 0.
    norms = np.maximum(np.linalg.norm(vectors, axis=1, keepdims=True), 1e-8)
    vectors /= norms
    return vectors


def main():
    """Rebuild the brain LMDB at ``DB_PATH`` from JSONL datasets.

    Phase 1 streams records, assigns vocabulary ids to content words (up to
    ``--max-vocab``), and collects windowed co-occurrence pairs, successor
    counts, sentences and function-word templates. Ingestion stops early on
    Ctrl-C or when available RAM drops below ``--mem-floor``; whatever was
    collected is still written.

    Phase 1.5 converts the pairs to a CSR matrix and derives one
    ``VECTOR_DIM``-wide normalised vector per word.

    Phase 2 deletes any existing environment at ``DB_PATH`` and writes all
    tables in batched transactions.
    """
    from array import array
    from scipy.sparse import coo_matrix

    parser = argparse.ArgumentParser()
    parser.add_argument("--dataset", default="all")
    parser.add_argument("--limit", type=int, default=0)
    parser.add_argument("--mem-floor", type=int, default=2048,
                        help="Stop ingestion when available RAM drops below this (MB)")
    parser.add_argument("--max-vocab", type=int, default=600_000,
                        help="Max vocabulary size (pre-allocate sparse matrix)")
    parser.add_argument("--map-size", type=int, default=4,
                        help="LMDB map size in GB")
    parser.add_argument("--seed", action="store_true",
                        help="Use curated seed.jsonl instead of raw data dir")
    args = parser.parse_args()

    if args.seed and SEED_PATH.exists():
        datasets = [SEED_PATH]
        print(f"Using curated seed: {SEED_PATH}")
    else:
        if args.seed:
            # Make the fallback visible instead of silently ignoring --seed.
            print(f"WARNING: seed file not found: {SEED_PATH}; using {DATA_DIR}")
        EXCLUDE = {'coco_captions', 'audiocaps', 'vggsound'}
        if args.dataset != "all":
            datasets = [DATA_DIR / f"{args.dataset}.jsonl"]
        else:
            datasets = [d for d in sorted(DATA_DIR.glob("*.jsonl"))
                        if d.stem not in EXCLUDE]

    print(f"Datasets: {len(datasets)} files")
    print(f"RAM: {avail_mb():.0f}MB avail | Disk: {disk_free_gb():.1f}GB free")
    print(f"Max vocab: {args.max_vocab:,} | Mem floor: {args.mem_floor}MB")
    print(f"LMDB path: {DB_PATH}")

    # ------------------------------------------------------------------
    # Phase 1: stream records and accumulate COO triplets.
    # ------------------------------------------------------------------
    V = args.max_vocab
    print(f"\n=== Phase 1: Building sparse co-occurrence ({V:,} x {V:,} pre-alloc) ===")

    # Typed buffers store 4 bytes per value, so the "12 bytes per entry"
    # figure reported below matches what is actually held in memory.
    cooc_rows = array('i')
    cooc_cols = array('i')
    cooc_vals = array('f')

    words = []
    word_idx = {}
    successors = defaultdict(lambda: defaultdict(float))
    sentences = []
    sentence_texts = []
    template_counts = defaultdict(int)

    total_fed = 0
    t0 = time.time()

    try:
        for ds_path in datasets:
            if not ds_path.exists() or ds_path.stat().st_size == 0:
                continue
            ds_name = ds_path.stem
            ds_fed = 0

            # JSON text is UTF-8; do not depend on the process locale.
            with open(ds_path, encoding="utf-8") as f:
                for line in f:
                    if args.limit and total_fed >= args.limit:
                        break
                    try:
                        record = json.loads(line)
                    except json.JSONDecodeError:
                        continue
                    if not isinstance(record, dict):
                        # Valid JSON that is not an object carries no fields.
                        continue

                    texts = []
                    text = _text_field(record, "text")
                    question = _text_field(record, "question")
                    answer = _text_field(record, "answer")
                    if text and len(text) >= 10:
                        texts.append(text)
                    if question and answer:
                        if len(answer) < 50 and not answer.endswith('.'):
                            texts.append(f"{question.rstrip('?')} is {answer}")
                        else:
                            texts.append(answer)

                    for sent in texts:
                        tokens = tokenize(sent)
                        content = [t for t in tokens if t not in FUNCTION_WORDS]
                        if not content:
                            continue

                        # Grow the vocabulary until the cap is reached; later
                        # unseen words are simply left out of the sentence.
                        for w in content:
                            if w not in word_idx and len(words) < V:
                                word_idx[w] = len(words)
                                words.append(w)

                        indices = [word_idx[w] for w in content if w in word_idx]
                        if len(indices) < 2:
                            continue
                        if len(indices) > MAX_CONTENT_TOKENS:
                            indices = indices[:MAX_CONTENT_TOKENS]

                        # Symmetric pairs inside a forward window.
                        n_idx = len(indices)
                        for i in range(n_idx):
                            a = indices[i]
                            for j in range(i + 1, min(i + COOC_WINDOW + 1, n_idx)):
                                b = indices[j]
                                cooc_rows.extend((a, b))
                                cooc_cols.extend((b, a))
                                cooc_vals.extend((COOCCURRENCE_PULL, COOCCURRENCE_PULL))

                        for prev, nxt in zip(indices, indices[1:]):
                            successors[prev][nxt] += 1.0

                        sentences.append(tuple(indices))
                        sentence_texts.append(sent)

                        # Template: function words verbatim, content words
                        # replaced by numbered slots.
                        if 3 <= len(tokens) <= 20:
                            structural_count = sum(1 for t in tokens if t in FUNCTION_WORDS)
                            if 1 <= structural_count < len(tokens):
                                parts = []
                                slot_idx = 0
                                for t in tokens:
                                    if t in FUNCTION_WORDS:
                                        parts.append(t)
                                    else:
                                        parts.append(f"[S{slot_idx}]")
                                        slot_idx += 1
                                template_counts[" ".join(parts)] += 1

                    total_fed += 1
                    ds_fed += 1

                    if total_fed % 10000 == 0:
                        elapsed = time.time() - t0
                        rate = total_fed / max(elapsed, 1e-9)
                        mem = avail_mb()
                        n_edges = len(cooc_rows)
                        edge_mb = n_edges * 12 / (1024 * 1024)
                        print(
                            f" [{ds_name}] {total_fed:,} fed | "
                            f"{len(words):,} words | {n_edges:,} edges ({edge_mb:.0f}MB) | "
                            f"{rate:,.0f}/sec | RSS: {rss_mb():.0f}MB | "
                            f"Avail: {mem:.0f}MB",
                            flush=True
                        )
                        if mem < args.mem_floor:
                            raise MemoryError("RAM floor hit")

            # Report the dataset before leaving so the final one is not lost.
            print(f" {ds_name}: {ds_fed:,} records", flush=True)
            if args.limit and total_fed >= args.limit:
                break

    except (KeyboardInterrupt, MemoryError) as e:
        # Deliberate: keep what was collected and continue to the write phase.
        print(f"\nPhase 1 stopped: {e}")

    elapsed1 = time.time() - t0
    n_edges = len(cooc_rows)
    print(f"\nPhase 1: {total_fed:,} records | {len(words):,} words | "
          f"{n_edges:,} COO entries | {len(sentences):,} sentences | {elapsed1:.1f}s")
    print(f" COO memory: {n_edges * 12 / (1024*1024):.0f} MB "
          f"(vs ~{n_edges * 70 / (1024*1024):.0f} MB in Python dicts)")

    # ------------------------------------------------------------------
    # Phase 1.5: COO -> CSR (duplicates are summed) and word vectors.
    # ------------------------------------------------------------------
    print(f"\n=== Phase 1.5: COO → CSR matrix + {VECTOR_DIM}-dim vectors ===")
    t15 = time.time()

    V_actual = len(words)

    cooc_mat = coo_matrix(
        (np.asarray(cooc_vals, dtype=np.float32),
         (np.asarray(cooc_rows, dtype=np.int32),
          np.asarray(cooc_cols, dtype=np.int32))),
        shape=(V_actual, V_actual),
    ).tocsr()

    # The triplet buffers are no longer needed once the CSR copy exists.
    del cooc_rows, cooc_cols, cooc_vals
    gc.collect()

    print(f" CSR: {V_actual:,} x {V_actual:,}, {cooc_mat.nnz:,} non-zeros, "
          f"{cooc_mat.data.nbytes / (1024*1024):.0f} MB")

    word_vectors = _hashed_word_vectors(cooc_mat, VECTOR_DIM)

    elapsed15 = time.time() - t15
    print(f" Vectors: {V_actual:,} × {VECTOR_DIM} = {word_vectors.nbytes/(1024*1024):.0f} MB | {elapsed15:.1f}s")
    print(f" RSS: {rss_mb():.0f} MB | Avail: {avail_mb():.0f} MB")

    # ------------------------------------------------------------------
    # Phase 2: write everything to a fresh LMDB environment.
    # ------------------------------------------------------------------
    print("\n=== Phase 2: Writing to LMDB (batched) ===")
    t2 = time.time()

    gc.collect()
    print(f" Pre-LMDB: RSS {rss_mb():.0f} MB | Avail {avail_mb():.0f} MB")

    if os.path.exists(DB_PATH):
        import shutil
        shutil.rmtree(DB_PATH)

    map_size = args.map_size * 1024 * 1024 * 1024
    env = lmdb.open(DB_PATH, max_dbs=16, map_size=map_size)

    neurons_db = env.open_db(b'neurons')
    vectors_db = env.open_db(b'vectors')
    successors_db = env.open_db(b'successors')
    predecessors_db = env.open_db(b'predecessors')
    words_db = env.open_db(b'words')
    sentences_db = env.open_db(b'sentences')
    sent_index_db = env.open_db(b'sent_index')
    cooc_db = env.open_db(b'cooccurrence')
    templates_db = env.open_db(b'templates')
    sent_text_db = env.open_db(b'sentence_text')
    meta_db = env.open_db(b'meta')

    BATCH = 10_000

    def batched_write(label, items, db, transform=None):
        """Write items in batches of BATCH, committing + syncing between."""
        count = 0
        txn = env.begin(write=True)
        for item in items:
            if transform:
                k, v = transform(item)
            else:
                k, v = item
            # A transform returns (None, None) to skip an item.
            if k is not None:
                txn.put(k, v, db=db)
                count += 1
                if count % BATCH == 0:
                    txn.commit()
                    env.sync(True)
                    txn = env.begin(write=True)
                if count % 50_000 == 0:
                    print(f" {label}: {count:,} written | RSS {rss_mb():.0f} MB", flush=True)
        txn.commit()
        env.sync(True)
        print(f" {label}: {count:,} done", flush=True)
        return count

    # Metadata counters.
    with env.begin(write=True) as txn:
        txn.put(b'count', _ID_FMT.pack(V_actual), db=meta_db)
        txn.put(b'next_id', _ID_FMT.pack(V_actual), db=meta_db)
        txn.put(b'dim', _ID_FMT.pack(VECTOR_DIM), db=meta_db)
        txn.put(b'next_sentence_id', _ID_FMT.pack(len(sentences)), db=meta_db)

    # One identical default record per word id.
    print(f" Writing {V_actual:,} neurons...")
    batched_write("neurons", range(V_actual), neurons_db,
                  transform=lambda i: (_ID_FMT.pack(i), _NEURON_FMT.pack(0.5, 0, False, 1)))

    print(f" Writing {V_actual:,} vectors...")
    batched_write("vectors", range(V_actual), vectors_db,
                  transform=lambda i: (_ID_FMT.pack(i), word_vectors[i].tobytes()))

    # word -> id mapping; words longer than 500 bytes are skipped as keys.
    print(" Writing word mappings...")
    skipped = 0

    def word_transform(item):
        nonlocal skipped
        w, idx = item
        encoded = w.encode('utf-8')
        if len(encoded) > 500:
            skipped += 1
            return (None, None)
        return (encoded, _ID_FMT.pack(idx))

    batched_write("words", word_idx.items(), words_db, transform=word_transform)
    if skipped:
        print(f" (skipped {skipped} oversized word keys)")

    # Top-10 successors per word (weights relative to the strongest), plus
    # up to three predecessor ids per target taken from each top-3 list.
    print(" Writing successors...")
    succ_count = 0
    max_pred_bytes = 3 * _ID_FMT.size
    txn = env.begin(write=True)
    for src, targets in successors.items():
        top = sorted(targets.items(), key=lambda x: -x[1])[:10]
        max_c = top[0][1] if top else 1.0
        succ_bytes = b''.join(
            _ID_CONF_FMT.pack(tid, min(c / max_c, 1.0))
            for tid, c in top
        )
        txn.put(_ID_FMT.pack(src), succ_bytes, db=successors_db)
        for tid, _ in top[:3]:
            key = _ID_FMT.pack(tid)
            existing = txn.get(key, db=predecessors_db) or b''
            if len(existing) < max_pred_bytes:
                existing += _ID_FMT.pack(src)
                txn.put(key, existing, db=predecessors_db)
        succ_count += 1
        if succ_count % BATCH == 0:
            txn.commit()
            env.sync(True)
            txn = env.begin(write=True)
        if succ_count % 50_000 == 0:
            print(f" successors: {succ_count:,} | RSS {rss_mb():.0f} MB", flush=True)
    txn.commit()
    env.sync(True)
    print(f" successors: {succ_count:,} done", flush=True)

    # Co-occurrence rows: one value per word holding its (neighbour, weight)
    # pairs; self-loops and non-positive weights are dropped.
    csr_indptr = cooc_mat.indptr
    csr_indices = cooc_mat.indices
    csr_data = cooc_mat.data
    print(f" Writing {cooc_mat.nnz:,} co-occurrence edges...")
    cooc_count = 0
    txn = env.begin(write=True)
    for a in range(V_actual):
        start, end = csr_indptr[a], csr_indptr[a + 1]
        if start == end:
            continue
        parts = []
        for k in range(start, end):
            b = int(csr_indices[k])
            w = float(csr_data[k])
            if a != b and w > 0:
                parts.append(_ID_CONF_FMT.pack(b, w))
        if parts:
            txn.put(_ID_FMT.pack(a), b''.join(parts), db=cooc_db)
            cooc_count += 1
            if cooc_count % BATCH == 0:
                txn.commit()
                env.sync(True)
                txn = env.begin(write=True)
            if cooc_count % 50_000 == 0:
                print(f" cooc rows: {cooc_count:,} | RSS {rss_mb():.0f} MB", flush=True)
    txn.commit()
    env.sync(True)
    print(f" cooc rows: {cooc_count:,} done", flush=True)

    # Sentences as (word id, position) pairs, plus a word -> sentence index.
    print(f" Writing {len(sentences):,} sentences...")
    sent_reverse = defaultdict(list)
    sent_count = 0
    txn = env.begin(write=True)
    for sid, word_indices_tuple in enumerate(sentences):
        sent_bytes = b''.join(
            _SENT_ENTRY_FMT.pack(wid, pos)
            for pos, wid in enumerate(word_indices_tuple)
        )
        txn.put(_ID_FMT.pack(sid), sent_bytes, db=sentences_db)
        # Record each sentence once per distinct word so repeated words do
        # not use up several of the 50 index slots written below.
        for wid in set(word_indices_tuple):
            sent_reverse[wid].append(sid)
        sent_count += 1
        if sent_count % BATCH == 0:
            txn.commit()
            env.sync(True)
            txn = env.begin(write=True)
    txn.commit()
    env.sync(True)
    print(f" sentences: {sent_count:,} done", flush=True)

    # Original sentence text, capped at 500 bytes of valid UTF-8.
    print(f" Writing {len(sentence_texts):,} sentence texts...")
    batched_write("sent_text", enumerate(sentence_texts), sent_text_db,
                  transform=lambda item: (
                      _ID_FMT.pack(item[0]),
                      _truncate_utf8(item[1], 500)
                  ))

    # Word -> first 50 sentence ids containing it.
    print(f" Writing sentence index ({len(sent_reverse):,} entries)...")
    batched_write("sent_index", sent_reverse.items(), sent_index_db,
                  transform=lambda item: (
                      _ID_FMT.pack(item[0]),
                      b''.join(_ID_FMT.pack(s) for s in item[1][:50])
                  ))

    # Templates: the 5000 most frequent patterns, then only those seen at
    # least 3 times with 1 to 6 slots.
    TOP_TEMPLATES = 5000
    top_templates = sorted(template_counts.items(), key=lambda x: -x[1])[:TOP_TEMPLATES]
    top_templates = [(p, c) for p, c in top_templates
                     if c >= 3 and 1 <= p.count('[S') <= 6]
    print(f" Writing {len(top_templates):,} templates (from {len(template_counts):,} unique)...")
    with env.begin(write=True) as txn:
        for tid, (pattern, count) in enumerate(top_templates):
            key = _ID_FMT.pack(tid)
            val = json.dumps({"pattern": pattern, "count": count, "slots": pattern.count('[S')}).encode()
            txn.put(key, val, db=templates_db)
        txn.put(b'count', _ID_FMT.pack(len(top_templates)), db=templates_db)
    env.sync(True)
    print(f" templates: {len(top_templates):,} done", flush=True)

    env.close()

    elapsed2 = time.time() - t2
    total_time = time.time() - t0
    lmdb_mb = sum(f.stat().st_size for f in Path(DB_PATH).iterdir()) / (1024 * 1024)

    print(f"\nPhase 2: {elapsed2:.1f}s")
    print(f"\n{'='*60}")
    print(f"COMPLETE: {total_fed:,} records → {V_actual:,} words")
    print(f" Edges: {cooc_mat.nnz:,} (CSR)")
    print(f" Sentences: {len(sentences):,}")
    print(f" Templates: {len(top_templates):,}")
    print(f" Vectors: {V_actual:,} × {VECTOR_DIM}")
    print(f" LMDB: {lmdb_mb:.1f} MB | Disk: {disk_free_gb():.1f} GB")
    print(f" Time: {total_time:.1f}s ({total_fed/max(total_time,1):,.0f} rec/sec)")
    print(f" RSS: {rss_mb():.0f} MB | Avail: {avail_mb():.0f} MB")
    print(f" Path: {DB_PATH}")
    print(f"{'='*60}")
|
|
|
|
# Run the feeder only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
|