Spaces:
Running
Running
| """Build data/per_marker_embeddings.parquet from data/per_marker_embeddings.jsonl. | |
| Streams the (large) jsonl, keeps the last record per genome_accession (Modal jobs may | |
| have written multiple times during retries), and casts the embedding dims to float32 | |
| to keep the parquet under ~500 MB. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import time | |
| import numpy as np | |
| import pandas as pd | |
| from microbe_model import config | |
| def main() -> None: | |
| src = config.DATA / "per_marker_embeddings.jsonl" | |
| if not src.exists(): | |
| raise SystemExit(f"Missing {src}") | |
| t0 = time.time() | |
| by_genome: dict[str, dict] = {} | |
| n_lines = 0 | |
| with open(src) as fh: | |
| for line in fh: | |
| n_lines += 1 | |
| try: | |
| r = json.loads(line) | |
| except json.JSONDecodeError: | |
| continue | |
| ga = r.get("genome_accession") or r.get("accession") | |
| if not ga: | |
| continue | |
| by_genome[ga] = r | |
| if n_lines % 5000 == 0: | |
| print(f" read {n_lines:,} lines, {len(by_genome):,} unique genomes") | |
| print(f"Parsed {n_lines:,} lines → {len(by_genome):,} unique genomes ({time.time()-t0:.1f}s)") | |
| rows = list(by_genome.values()) | |
| df = pd.DataFrame(rows) | |
| if "bacdive_id" in df.columns: | |
| df["bacdive_id"] = pd.to_numeric(df["bacdive_id"], errors="coerce").astype("Int64") | |
| float_cols = [c for c in df.columns if c.startswith("pme_") and c != "pme_marker_proteins_total"] | |
| df[float_cols] = df[float_cols].astype(np.float32) | |
| if "pme_marker_proteins_total" in df.columns: | |
| df["pme_marker_proteins_total"] = pd.to_numeric( | |
| df["pme_marker_proteins_total"], errors="coerce" | |
| ).astype("Int32") | |
| # Reorder: ids first, then features | |
| id_cols = [c for c in ("bacdive_id", "genome_accession") if c in df.columns] | |
| other_cols = [c for c in df.columns if c not in id_cols] | |
| df = df[id_cols + other_cols] | |
| out = config.DATA / "per_marker_embeddings.parquet" | |
| df.to_parquet(out, index=False) | |
| sz_mb = out.stat().st_size / 1e6 | |
| print(f"Wrote {len(df):,} rows × {df.shape[1]} cols → {out} ({sz_mb:.1f} MB, {time.time()-t0:.1f}s)") | |
| if __name__ == "__main__": | |
| main() | |