# guru / feed.py
# Uploaded by tejadabheja ("Upload folder using huggingface_hub", commit d38c1d3, verified)
"""
Fast feeder: build brain LMDB using scipy sparse matrices instead of Python dicts.
Python dicts cost ~70 bytes per edge. scipy sparse uses ~12 bytes per edge.
178M edges: 12GB in dicts vs 2.1GB in scipy. Processes ALL 1.29M records.
Usage:
python3 feed.py # all datasets
python3 feed.py --limit 1000 # first 1000 records
"""
import sys, os, json, time, argparse, re, struct, gc
from pathlib import Path
from collections import defaultdict
import numpy as np
from scipy.sparse import lil_matrix, csr_matrix
import lmdb
DATA_DIR = Path.home() / "webmind-research" / "data"
SEED_PATH = Path.home() / "nexus-brain" / "seed.jsonl"
DB_PATH = os.path.expanduser('~/nexus-brain/brain.lmdb')
COOCCURRENCE_PULL = 0.3
COOC_WINDOW = 10 # only pair words within this window (not all-pairs)
MAX_CONTENT_TOKENS = 50 # cap content tokens per sentence to limit O(n²)
FUNCTION_WORDS = frozenset({
"the", "a", "an", "is", "are", "was", "were", "be", "been", "being",
"have", "has", "had", "do", "does", "did", "will", "would", "could",
"should", "may", "might", "shall", "can", "must", "of", "in", "to",
"for", "with", "on", "at", "by", "from", "as", "into", "through",
"during", "before", "after", "above", "below", "between", "out",
"off", "over", "under", "and", "but", "or", "nor", "not", "so",
"yet", "both", "either", "neither", "each", "every", "all", "any",
"few", "more", "most", "other", "some", "such", "no", "only", "own",
"same", "than", "too", "very", "just", "about", "up", "what", "which",
"who", "whom", "this", "that", "these", "those", "am", "if", "then",
"because", "while", "although", "though", "even", "also", "it", "its",
"how", "when", "where", "why", "there", "here",
})
_ID_FMT = struct.Struct('<i')
_ID_CONF_FMT = struct.Struct('<if')
_SENT_ENTRY_FMT = struct.Struct('<ii')
_NEURON_FMT = struct.Struct('<fq?b')
VECTOR_DIM = 512
def tokenize(text):
    """Return the lowercase alphanumeric runs (ASCII a-z and 0-9) of *text*, in order.

    Every other character acts as a separator.

    >>> tokenize("Hello, World 42!")
    ['hello', 'world', '42']
    """
    lowered = text.lower()
    return [match.group(0) for match in re.finditer(r'[a-z0-9]+', lowered)]
def avail_mb(meminfo_path='/proc/meminfo'):
    """Return available system RAM in MiB, read from the ``MemAvailable:`` line.

    Args:
        meminfo_path: file in /proc/meminfo format (default: the live one).

    Returns:
        ``MemAvailable`` (reported in kB) divided by 1024. Falls back to 9999
        ("plenty") when the file cannot be read or has no parseable
        ``MemAvailable:`` line (e.g. non-Linux hosts), so callers can always
        format and compare the result; previously that case returned None.
    """
    try:
        with open(meminfo_path) as f:
            for line in f:
                if line.startswith('MemAvailable:'):
                    return int(line.split()[1]) / 1024
    except (OSError, ValueError, IndexError):
        pass  # best-effort probe: fall through to the sentinel below
    return 9999
def rss_mb(status_path='/proc/self/status'):
    """Return a resident set size in MiB, read from the ``VmRSS:`` line.

    Args:
        status_path: file in /proc/<pid>/status format (default: this process).

    Returns:
        ``VmRSS`` (reported in kB) divided by 1024, or 0 when the file cannot
        be read or has no parseable ``VmRSS:`` line (e.g. non-Linux hosts), so
        callers can always format the result; previously that case returned None.
    """
    try:
        with open(status_path) as f:
            for line in f:
                if line.startswith('VmRSS:'):
                    return int(line.split()[1]) / 1024
    except (OSError, ValueError, IndexError):
        pass  # best-effort probe: fall through to the sentinel below
    return 0
def disk_free_gb(path='/'):
    """Return free space (GiB) available to unprivileged users on *path*'s filesystem.

    Args:
        path: any existing path on the filesystem to inspect (default ``/``).
            Pass the output directory when it may live on a different mount.

    Returns:
        ``f_bavail * f_frsize`` in GiB, or 999 when ``os.statvfs`` is missing
        (non-POSIX hosts) or fails for *path*.
    """
    try:
        st = os.statvfs(path)
    except (OSError, AttributeError):
        return 999
    return st.f_bavail * st.f_frsize / (1024 ** 3)
_BATCH = 10_000             # LMDB puts per write transaction (bounds dirty pages)
_REPORT_EVERY = 50_000      # progress line cadence for batched writes
_MAX_WORD_KEY_BYTES = 500   # LMDB's default max key size is 511 bytes
_MAX_TEXT_BYTES = 500       # stored sentence text is capped at this many bytes
_SENT_INDEX_CAP = 50        # sentence ids kept per word in sent_index
_TOP_TEMPLATES = 5000       # most frequent templates considered for storage

# Structured dtype whose packed records are byte-identical to _ID_CONF_FMT
# ('<if': little-endian int32 id then float32 weight; numpy adds no padding).
_EDGE_DTYPE = np.dtype([('id', '<i4'), ('w', '<f4')])


def _select_datasets(args):
    """Return the JSONL files to ingest, honoring --seed and --dataset.

    --seed uses SEED_PATH when it exists (otherwise warns and falls back to
    DATA_DIR); --dataset NAME selects DATA_DIR/NAME.jsonl; the default is every
    *.jsonl in DATA_DIR except coco_captions, audiocaps and vggsound.
    """
    if args.seed:
        if SEED_PATH.exists():
            print(f"Using curated seed: {SEED_PATH}")
            return [SEED_PATH]
        print(f"WARNING: --seed given but {SEED_PATH} does not exist; "
              f"falling back to {DATA_DIR}")
    if args.dataset != "all":
        return [DATA_DIR / f"{args.dataset}.jsonl"]
    excluded = {'coco_captions', 'audiocaps', 'vggsound'}
    return [d for d in sorted(DATA_DIR.glob("*.jsonl")) if d.stem not in excluded]


def _text_field(record, name):
    """Return record[name] as a stripped string.

    Numbers are stringified; missing, null, boolean or structured values give
    ''. (Calling .strip() directly crashed on null or numeric fields.)
    """
    value = record.get(name)
    if isinstance(value, str):
        return value.strip()
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        return str(value)
    return ""


def _record_texts(record):
    """Return the sentences to learn from one parsed JSON record.

    ``text`` is used when it has at least 10 characters. A question/answer
    pair contributes "<question without trailing '?'> is <answer>" for short
    answers (< 50 chars, not ending in '.'), otherwise the answer alone.
    Records that are not JSON objects yield nothing.
    """
    if not isinstance(record, dict):
        return []
    texts = []
    text = _text_field(record, "text")
    question = _text_field(record, "question")
    answer = _text_field(record, "answer")
    if len(text) >= 10:
        texts.append(text)
    if question and answer:
        if len(answer) < 50 and not answer.endswith('.'):
            texts.append(f"{question.rstrip('?')} is {answer}")
        else:
            texts.append(answer)
    return texts


class _CorpusBuilder:
    """Phase-1 accumulator for vocabulary, co-occurrence pairs, successor
    counts, sentences and sentence templates.

    Co-occurrence pairs live in two ``array('i')`` buffers (4 bytes per entry
    each) instead of Python lists (an 8-byte pointer per entry per list).
    Every pair carries the same weight, COOCCURRENCE_PULL, so no value buffer
    is kept; weights are materialized when the sparse matrix is built.
    """

    def __init__(self, max_vocab):
        from array import array

        self.max_vocab = max_vocab
        self.words = []              # word id -> word
        self.word_idx = {}           # word -> word id
        self.cooc_rows = array('i')  # COO row ids, two entries per co-occurring pair
        self.cooc_cols = array('i')  # COO column ids, parallel to cooc_rows
        self.successors = defaultdict(lambda: defaultdict(float))  # id -> next id -> count
        self.sentences = []          # tuple of word ids per kept sentence
        self.sentence_texts = []     # original text, parallel to self.sentences
        self.template_counts = defaultdict(int)  # template pattern -> count

    @property
    def n_entries(self):
        """Number of COO entries accumulated so far."""
        return len(self.cooc_rows)

    @property
    def pair_bytes(self):
        """Bytes held by the co-occurrence buffers."""
        return (len(self.cooc_rows) * self.cooc_rows.itemsize
                + len(self.cooc_cols) * self.cooc_cols.itemsize)

    def add_sentence(self, sent):
        """Learn one sentence: grow the vocabulary, then record co-occurrence
        pairs, successor counts, the sentence itself and its template."""
        tokens = tokenize(sent)
        content = [t for t in tokens if t not in FUNCTION_WORDS]
        if not content:
            return
        for w in content:
            if w not in self.word_idx and len(self.words) < self.max_vocab:
                # Append before indexing so an interrupt never leaves an id in
                # word_idx without a matching entry in words.
                self.words.append(w)
                self.word_idx[w] = len(self.words) - 1
        # Words rejected because the vocabulary is full are dropped here.
        ids = [self.word_idx[w] for w in content if w in self.word_idx]
        if len(ids) < 2:
            return
        # Cap long documents; windowed pairing below costs O(len * COOC_WINDOW).
        ids = ids[:MAX_CONTENT_TOKENS]

        rows, cols = self.cooc_rows, self.cooc_cols
        for i, a in enumerate(ids):
            for b in ids[i + 1:i + 1 + COOC_WINDOW]:
                rows.append(a)
                cols.append(b)
                rows.append(b)
                cols.append(a)
        for a, b in zip(ids, ids[1:]):
            self.successors[a][b] += 1.0
        self.sentences.append(tuple(ids))
        self.sentence_texts.append(sent)
        self._count_template(tokens)

    def _count_template(self, tokens):
        """Count the sentence template: function words kept verbatim, content
        words replaced by numbered slots [S0], [S1], ..."""
        if not 3 <= len(tokens) <= 20:
            return
        structural = sum(1 for t in tokens if t in FUNCTION_WORDS)
        if not 1 <= structural < len(tokens):
            return
        parts = []
        slot = 0
        for t in tokens:
            if t in FUNCTION_WORDS:
                parts.append(t)
            else:
                parts.append(f"[S{slot}]")
                slot += 1
        self.template_counts[" ".join(parts)] += 1

    def finalize(self):
        """Trim parallel buffers to a common length.

        A Ctrl-C can land between two paired appends, which previously left
        rows/cols (or sentences/texts) with different lengths; that broke the
        COO build and shifted sentence ids against their texts.
        """
        n = min(len(self.cooc_rows), len(self.cooc_cols))
        del self.cooc_rows[n:]
        del self.cooc_cols[n:]
        n = min(len(self.sentences), len(self.sentence_texts))
        del self.sentences[n:]
        del self.sentence_texts[n:]

    def release_pairs(self):
        """Swap the pair buffers for empty ones once the matrix has been built."""
        self.cooc_rows = self.cooc_rows[:0]
        self.cooc_cols = self.cooc_cols[:0]


def _ingest(datasets, corpus, args, t0):
    """Phase 1: stream every dataset into *corpus*; return the records fed.

    Stops early on --limit, on Ctrl-C, or when available RAM drops below
    --mem-floor (checked every 10,000 records); everything accumulated so far
    is kept.
    """
    total_fed = 0
    try:
        for ds_path in datasets:
            if args.limit and total_fed >= args.limit:
                break
            if not ds_path.exists() or ds_path.stat().st_size == 0:
                print(f"  (skipping missing or empty dataset: {ds_path})")
                continue
            ds_name = ds_path.stem
            ds_fed = 0
            # Explicit UTF-8 so decoding does not depend on the locale; bad
            # bytes become U+FFFD instead of aborting the whole run.
            with open(ds_path, encoding='utf-8', errors='replace') as f:
                for line in f:
                    if args.limit and total_fed >= args.limit:
                        break
                    try:
                        record = json.loads(line)
                    except json.JSONDecodeError:
                        continue
                    for sent in _record_texts(record):
                        corpus.add_sentence(sent)
                    total_fed += 1
                    ds_fed += 1
                    if total_fed % 10000 == 0:
                        mem = avail_mb()
                        rate = total_fed / (time.time() - t0)
                        print(
                            f"  [{ds_name}] {total_fed:,} fed | "
                            f"{len(corpus.words):,} words | {corpus.n_entries:,} edges "
                            f"({corpus.pair_bytes / (1024 * 1024):.0f}MB) | "
                            f"{rate:,.0f}/sec | RSS: {rss_mb():.0f}MB | "
                            f"Avail: {mem:.0f}MB",
                            flush=True,
                        )
                        if mem < args.mem_floor:
                            raise MemoryError("RAM floor hit")
            print(f"  {ds_name}: {ds_fed:,} records", flush=True)
    except (KeyboardInterrupt, MemoryError) as e:
        print(f"\nPhase 1 stopped: {type(e).__name__}: {e}")
    return total_fed


def _as_index_array(buf):
    """Zero-copy numpy view of an array('i') buffer (np.intc == C int)."""
    if len(buf) == 0:
        return np.zeros(0, dtype=np.intc)
    return np.frombuffer(buf, dtype=np.intc)


def _build_cooc_matrix(corpus):
    """Phase 1.5a: sum the accumulated pairs into a V x V float32 CSR matrix."""
    from scipy.sparse import coo_matrix

    n_words = len(corpus.words)
    rows = _as_index_array(corpus.cooc_rows)
    cols = _as_index_array(corpus.cooc_cols)
    vals = np.full(len(rows), COOCCURRENCE_PULL, dtype=np.float32)
    # tocsr() sums duplicate (row, col) entries, i.e. repeated co-occurrences.
    return coo_matrix((vals, (rows, cols)), shape=(n_words, n_words)).tocsr()


def _project_vectors(cooc_mat):
    """Phase 1.5b: hash each CSR row into VECTOR_DIM buckets (column id mod
    VECTOR_DIM) and L2-normalize, giving one float32 vector per word.

    Builds a V x VECTOR_DIM CSR that shares the weights and densifies it.
    toarray() adds entries that fall into the same bucket, row by row in
    storage order, replacing a Python loop over every non-zero.
    """
    n_words = cooc_mat.shape[0]
    folded = csr_matrix(
        (cooc_mat.data, cooc_mat.indices % VECTOR_DIM, cooc_mat.indptr),
        shape=(n_words, VECTOR_DIM),
    )
    vectors = folded.toarray()
    norms = np.maximum(np.linalg.norm(vectors, axis=1, keepdims=True), 1e-8)
    return vectors / norms


def _batched_write(env, label, items, batch=_BATCH, report_every=_REPORT_EVERY):
    """Put each (db, key, value) triple from *items*, committing every *batch*
    puts so no write transaction grows unbounded; return the number written.

    Commits are durable with lmdb's default ``sync=True``. A failing put or
    commit aborts the open transaction before the error propagates.
    """
    count = 0
    txn = env.begin(write=True)
    try:
        for db, key, value in items:
            txn.put(key, value, db=db)
            count += 1
            if count % batch == 0:
                txn.commit()
                txn = env.begin(write=True)
            if count % report_every == 0:
                print(f"    {label}: {count:,} written | RSS {rss_mb():.0f} MB", flush=True)
        txn.commit()
    except BaseException:
        txn.abort()
        raise
    print(f"    {label}: {count:,} done", flush=True)
    return count


def _successor_items(successors, successors_db, predecessors):
    """Yield one successors row per source word: its 10 most frequent next
    words as _ID_CONF_FMT records (id, count / max count), most frequent first.

    Side effect: records the source as a predecessor of each of its top-3
    successors in *predecessors*, keeping at most 3 per word (first seen wins).
    """
    for src, targets in successors.items():
        top = sorted(targets.items(), key=lambda kv: -kv[1])[:10]
        max_c = top[0][1] if top else 1.0
        payload = b''.join(_ID_CONF_FMT.pack(tid, min(c / max_c, 1.0)) for tid, c in top)
        for tid, _ in top[:3]:
            preds = predecessors[tid]
            if len(preds) < 3:
                preds.append(src)
        yield successors_db, _ID_FMT.pack(src), payload


def _cooc_row_bytes(neighbor_ids, weights, owner):
    """Pack one CSR row as _ID_CONF_FMT records, dropping the self-pair and
    non-positive weights (byte-identical to per-edge struct packing)."""
    keep = (neighbor_ids != owner) & (weights > 0)
    edges = np.empty(int(np.count_nonzero(keep)), dtype=_EDGE_DTYPE)
    edges['id'] = neighbor_ids[keep]
    edges['w'] = weights[keep]
    return edges.tobytes()


def _cooc_items(cooc_mat, cooc_db):
    """Yield one co-occurrence row per word that has at least one kept neighbor."""
    indptr, indices, data = cooc_mat.indptr, cooc_mat.indices, cooc_mat.data
    for a in range(cooc_mat.shape[0]):
        start, end = indptr[a], indptr[a + 1]
        if start == end:
            continue
        payload = _cooc_row_bytes(indices[start:end], data[start:end], a)
        if payload:
            yield cooc_db, _ID_FMT.pack(a), payload


def _sentence_items(sentences, sentences_db, sent_reverse):
    """Yield sentence rows as (word id, position) _SENT_ENTRY_FMT records and
    fill *sent_reverse* (word id -> first _SENT_INDEX_CAP sentence ids)."""
    for sid, wids in enumerate(sentences):
        payload = b''.join(_SENT_ENTRY_FMT.pack(wid, pos) for pos, wid in enumerate(wids))
        # dict.fromkeys de-duplicates in order, so a word repeated within one
        # sentence lists that sentence once; capping here avoids holding ids
        # that would never be written.
        for wid in dict.fromkeys(wids):
            sids = sent_reverse[wid]
            if len(sids) < _SENT_INDEX_CAP:
                sids.append(sid)
        yield sentences_db, _ID_FMT.pack(sid), payload


def _truncate_utf8(text, limit):
    """Encode *text* as UTF-8 and cut it to at most *limit* bytes on a
    character boundary, so the stored value always decodes cleanly.

    Unencodable code points (lone surrogates produced by JSON escapes such as
    "\\ud800") become '?' instead of raising UnicodeEncodeError.
    """
    raw = text.encode('utf-8', errors='replace')
    if len(raw) <= limit:
        return raw
    return raw[:limit].decode('utf-8', errors='ignore').encode('utf-8')


def _write_lmdb(args, corpus, cooc_mat, word_vectors):
    """Phase 2: recreate DB_PATH, write every table, return the template count.

    Any existing database at DB_PATH is deleted first. The environment is
    closed even when a write fails (e.g. the map fills up for a small
    --map-size).
    """
    import shutil

    n_words = len(corpus.words)
    parent = os.path.dirname(DB_PATH)
    if parent:
        os.makedirs(parent, exist_ok=True)
    if os.path.exists(DB_PATH):
        shutil.rmtree(DB_PATH)

    env = lmdb.open(DB_PATH, max_dbs=16, map_size=args.map_size * 1024 ** 3)
    try:
        neurons_db = env.open_db(b'neurons')
        vectors_db = env.open_db(b'vectors')
        successors_db = env.open_db(b'successors')
        predecessors_db = env.open_db(b'predecessors')
        words_db = env.open_db(b'words')
        sentences_db = env.open_db(b'sentences')
        sent_index_db = env.open_db(b'sent_index')
        cooc_db = env.open_db(b'cooccurrence')
        templates_db = env.open_db(b'templates')
        sent_text_db = env.open_db(b'sentence_text')
        meta_db = env.open_db(b'meta')

        # Meta (tiny, one transaction).
        with env.begin(write=True) as txn:
            txn.put(b'count', _ID_FMT.pack(n_words), db=meta_db)
            txn.put(b'next_id', _ID_FMT.pack(n_words), db=meta_db)
            txn.put(b'dim', _ID_FMT.pack(VECTOR_DIM), db=meta_db)
            txn.put(b'next_sentence_id', _ID_FMT.pack(len(corpus.sentences)), db=meta_db)

        print(f"  Writing {n_words:,} neurons...")
        neuron = _NEURON_FMT.pack(0.5, 0, False, 1)
        _batched_write(env, "neurons",
                       ((neurons_db, _ID_FMT.pack(i), neuron) for i in range(n_words)))

        # Each vector is VECTOR_DIM float32 values (2 KB at 512 dims).
        print(f"  Writing {n_words:,} vectors...")
        _batched_write(env, "vectors",
                       ((vectors_db, _ID_FMT.pack(i), word_vectors[i].tobytes())
                        for i in range(n_words)))

        print("  Writing word mappings...")
        word_keys = [(w.encode('utf-8'), idx) for w, idx in corpus.word_idx.items()]
        oversized = sum(1 for key, _ in word_keys if len(key) > _MAX_WORD_KEY_BYTES)
        _batched_write(env, "words",
                       ((words_db, key, _ID_FMT.pack(idx))
                        for key, idx in word_keys if len(key) <= _MAX_WORD_KEY_BYTES))
        if oversized:
            print(f"    (skipped {oversized} oversized word keys)")
        del word_keys

        print("  Writing successors...")
        predecessors = defaultdict(list)
        _batched_write(env, "successors",
                       _successor_items(corpus.successors, successors_db, predecessors))
        _batched_write(env, "predecessors",
                       ((predecessors_db, _ID_FMT.pack(tid),
                         b''.join(_ID_FMT.pack(src) for src in srcs))
                        for tid, srcs in predecessors.items()))
        del predecessors

        print(f"  Writing {cooc_mat.nnz:,} co-occurrence edges...")
        _batched_write(env, "cooc rows", _cooc_items(cooc_mat, cooc_db))

        print(f"  Writing {len(corpus.sentences):,} sentences...")
        sent_reverse = defaultdict(list)
        _batched_write(env, "sentences",
                       _sentence_items(corpus.sentences, sentences_db, sent_reverse))

        print(f"  Writing {len(corpus.sentence_texts):,} sentence texts...")
        _batched_write(env, "sent_text",
                       ((sent_text_db, _ID_FMT.pack(sid), _truncate_utf8(text, _MAX_TEXT_BYTES))
                        for sid, text in enumerate(corpus.sentence_texts)))

        print(f"  Writing sentence index ({len(sent_reverse):,} entries)...")
        _batched_write(env, "sent_index",
                       ((sent_index_db, _ID_FMT.pack(wid),
                         b''.join(_ID_FMT.pack(s) for s in sids))
                        for wid, sids in sent_reverse.items()))
        del sent_reverse

        # Templates: top-K most frequent, then only those seen >= 3 times
        # with 1-6 slots.
        ranked = sorted(corpus.template_counts.items(), key=lambda kv: -kv[1])[:_TOP_TEMPLATES]
        top_templates = [(p, c) for p, c in ranked if c >= 3 and 1 <= p.count('[S') <= 6]
        print(f"  Writing {len(top_templates):,} templates "
              f"(from {len(corpus.template_counts):,} unique)...")
        with env.begin(write=True) as txn:
            for tid, (pattern, count) in enumerate(top_templates):
                val = json.dumps({"pattern": pattern, "count": count,
                                  "slots": pattern.count('[S')}).encode()
                txn.put(_ID_FMT.pack(tid), val, db=templates_db)
            txn.put(b'count', _ID_FMT.pack(len(top_templates)), db=templates_db)
        print(f"    templates: {len(top_templates):,} done", flush=True)
        env.sync(True)
    finally:
        env.close()
    return len(top_templates)


def main():
    """Command-line entry point: ingest JSONL datasets and rebuild the brain LMDB.

    Phase 1 streams records into a _CorpusBuilder (stopping early on --limit,
    Ctrl-C or the --mem-floor RAM guard, keeping what was read). Phase 1.5
    builds the co-occurrence CSR matrix and hashed VECTOR_DIM-dim word vectors.
    Phase 2 deletes and rewrites DB_PATH.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--dataset", default="all")
    parser.add_argument("--limit", type=int, default=0)
    parser.add_argument("--mem-floor", type=int, default=2048,
                        help="Stop ingestion when available RAM drops below this (MB)")
    parser.add_argument("--max-vocab", type=int, default=600_000,
                        help="Max vocabulary size (new words beyond this are ignored)")
    parser.add_argument("--map-size", type=int, default=4,
                        help="LMDB map size in GB")
    parser.add_argument("--seed", action="store_true",
                        help="Use curated seed.jsonl instead of raw data dir")
    args = parser.parse_args()

    datasets = _select_datasets(args)
    print(f"Datasets: {len(datasets)} files")
    print(f"RAM: {avail_mb():.0f}MB avail | Disk: {disk_free_gb():.1f}GB free")
    print(f"Max vocab: {args.max_vocab:,} | Mem floor: {args.mem_floor}MB")
    print(f"LMDB path: {DB_PATH}")

    # ============ Phase 1: accumulate co-occurrence pairs ============
    print(f"\n=== Phase 1: Building sparse co-occurrence (vocab cap {args.max_vocab:,}) ===")
    t0 = time.time()
    corpus = _CorpusBuilder(args.max_vocab)
    total_fed = _ingest(datasets, corpus, args, t0)
    corpus.finalize()
    elapsed1 = time.time() - t0
    n_entries = corpus.n_entries
    print(f"\nPhase 1: {total_fed:,} records | {len(corpus.words):,} words | "
          f"{n_entries:,} COO entries | {len(corpus.sentences):,} sentences | {elapsed1:.1f}s")
    print(f"  COO memory: {corpus.pair_bytes / (1024 * 1024):.0f} MB "
          f"(vs ~{n_entries * 70 / (1024 * 1024):.0f} MB in Python dicts)")

    # ============ Phase 1.5: COO -> CSR + hashed projection vectors ============
    print(f"\n=== Phase 1.5: COO → CSR matrix + {VECTOR_DIM}-dim vectors ===")
    t15 = time.time()
    n_words = len(corpus.words)
    cooc_mat = _build_cooc_matrix(corpus)
    corpus.release_pairs()
    gc.collect()
    print(f"  CSR: {n_words:,} x {n_words:,}, {cooc_mat.nnz:,} non-zeros, "
          f"{cooc_mat.data.nbytes / (1024 * 1024):.0f} MB")
    word_vectors = _project_vectors(cooc_mat)
    print(f"  Vectors: {n_words:,} × {VECTOR_DIM} = "
          f"{word_vectors.nbytes / (1024 * 1024):.0f} MB | {time.time() - t15:.1f}s")
    print(f"  RSS: {rss_mb():.0f} MB | Avail: {avail_mb():.0f} MB")

    # ============ Phase 2: write to LMDB (batched) ============
    print("\n=== Phase 2: Writing to LMDB (batched) ===")
    t2 = time.time()
    gc.collect()
    print(f"  Pre-LMDB: RSS {rss_mb():.0f} MB | Avail {avail_mb():.0f} MB")
    n_templates = _write_lmdb(args, corpus, cooc_mat, word_vectors)

    elapsed2 = time.time() - t2
    total_time = time.time() - t0
    lmdb_mb = sum(f.stat().st_size for f in Path(DB_PATH).iterdir()) / (1024 * 1024)
    print(f"\nPhase 2: {elapsed2:.1f}s")
    print(f"\n{'=' * 60}")
    print(f"COMPLETE: {total_fed:,} records → {n_words:,} words")
    print(f"  Edges: {cooc_mat.nnz:,} (CSR)")
    print(f"  Sentences: {len(corpus.sentences):,}")
    print(f"  Templates: {n_templates:,}")
    print(f"  Vectors: {n_words:,} × {VECTOR_DIM}")
    print(f"  LMDB: {lmdb_mb:.1f} MB | Disk: {disk_free_gb():.1f} GB")
    print(f"  Time: {total_time:.1f}s ({total_fed / max(total_time, 1):,.0f} rec/sec)")
    print(f"  RSS: {rss_mb():.0f} MB | Avail: {avail_mb():.0f} MB")
    print(f"  Path: {DB_PATH}")
    print(f"{'=' * 60}")
if __name__ == "__main__":
    # Build the database only when executed as a script, never on import.
    main()