Spaces:
Running
Running
File size: 2,248 Bytes
"""Build data/per_marker_embeddings.parquet from data/per_marker_embeddings.jsonl.
Streams the (large) jsonl, keeps the last record per genome_accession (Modal jobs may
have written multiple times during retries), and casts the embedding dims to float32
to keep the parquet under ~500 MB.
"""
from __future__ import annotations
import json
import time
import numpy as np
import pandas as pd
from microbe_model import config
def main() -> None:
    """Convert the per-marker embeddings jsonl into a deduplicated parquet file.

    Reads ``config.DATA / "per_marker_embeddings.jsonl"`` line by line, keeps
    only the last record seen for each genome accession, normalises column
    dtypes, and writes ``config.DATA / "per_marker_embeddings.parquet"``.
    Progress and a final summary are printed to stdout.

    Lines that are not valid JSON, that do not decode to a JSON object, or
    that carry no accession are skipped; the number skipped is reported.

    Raises:
        SystemExit: If the source jsonl file does not exist.
    """
    src = config.DATA / "per_marker_embeddings.jsonl"
    if not src.exists():
        raise SystemExit(f"Missing {src}")

    t0 = time.time()
    by_genome: dict[str, dict] = {}
    n_lines = 0
    n_skipped = 0
    # JSON text is UTF-8; do not depend on the platform's default encoding.
    with open(src, encoding="utf-8") as fh:
        for line in fh:
            n_lines += 1
            try:
                r = json.loads(line)
            except json.JSONDecodeError:
                # Best effort: unparseable lines (including blank ones) are
                # skipped rather than aborting the whole conversion.
                n_skipped += 1
                continue
            if not isinstance(r, dict):
                # Valid JSON that is not an object (null, list, number, ...)
                # has no .get(); skip it instead of raising AttributeError.
                n_skipped += 1
                continue
            # NOTE(review): a record keyed only by "accession" keeps that
            # column name in the output, so "genome_accession" will be missing
            # for it -- confirm this is intended.
            ga = r.get("genome_accession") or r.get("accession")
            if not ga:
                n_skipped += 1
                continue
            # Later records overwrite earlier ones: last write per genome wins.
            by_genome[ga] = r
            if n_lines % 5000 == 0:
                print(f"  read {n_lines:,} lines, {len(by_genome):,} unique genomes")
    print(f"Parsed {n_lines:,} lines → {len(by_genome):,} unique genomes ({time.time()-t0:.1f}s)")
    if n_skipped:
        print(f"  skipped {n_skipped:,} lines (invalid JSON, non-object record, or no accession)")

    rows = list(by_genome.values())
    df = pd.DataFrame(rows)

    if "bacdive_id" in df.columns:
        # Unparseable ids become missing; the nullable Int64 dtype keeps the
        # column integer-typed even when some values are missing.
        df["bacdive_id"] = pd.to_numeric(df["bacdive_id"], errors="coerce").astype("Int64")

    # Every "pme_" column except the protein count is cast to float32 (the
    # module docstring gives parquet size as the reason).
    float_cols = [c for c in df.columns if c.startswith("pme_") and c != "pme_marker_proteins_total"]
    if float_cols:
        df[float_cols] = df[float_cols].astype(np.float32)

    if "pme_marker_proteins_total" in df.columns:
        df["pme_marker_proteins_total"] = pd.to_numeric(
            df["pme_marker_proteins_total"], errors="coerce"
        ).astype("Int32")

    # Reorder: ids first, then features
    id_cols = [c for c in ("bacdive_id", "genome_accession") if c in df.columns]
    other_cols = [c for c in df.columns if c not in id_cols]
    df = df[id_cols + other_cols]

    out = config.DATA / "per_marker_embeddings.parquet"
    df.to_parquet(out, index=False)
    sz_mb = out.stat().st_size / 1e6
    print(f"Wrote {len(df):,} rows × {df.shape[1]} cols → {out} ({sz_mb:.1f} MB, {time.time()-t0:.1f}s)")
# Run the conversion only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|