File size: 2,248 Bytes
0ed74db
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
"""Build data/per_marker_embeddings.parquet from data/per_marker_embeddings.jsonl.

Streams the (large) jsonl, keeps the last record per genome_accession (Modal jobs may
have written multiple times during retries), and casts the embedding dims to float32
to keep the parquet under ~500 MB.
"""
from __future__ import annotations

import json
import time

import numpy as np
import pandas as pd

from microbe_model import config


def _read_latest_records(src) -> tuple[dict[str, dict], int, int]:
    """Stream a JSON-lines file and keep the last object seen per genome.

    Args:
        src: Path of the JSON-lines file (anything ``open`` accepts).

    Returns:
        A ``(by_genome, n_lines, n_skipped)`` tuple. ``by_genome`` maps each
        record key (``genome_accession``, falling back to ``accession``) to
        the last record carrying that key; ``n_lines`` is the number of lines
        read; ``n_skipped`` counts lines that were blank, were not valid
        JSON, were not a JSON object, or had no usable key.
    """
    by_genome: dict[str, dict] = {}
    n_lines = 0
    n_skipped = 0
    # JSON is UTF-8 by specification; do not depend on the platform locale.
    with open(src, encoding="utf-8") as fh:
        for n_lines, line in enumerate(fh, start=1):
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                record = None

            # Valid JSON that is not an object (null, list, number, ...) has
            # no ``.get`` and must be skipped rather than crash the run.
            key = None
            if isinstance(record, dict):
                key = record.get("genome_accession") or record.get("accession")

            if key:
                # Later lines win, so retried writes replace earlier ones.
                by_genome[key] = record
            else:
                n_skipped += 1

            # Report progress on every 5000th line, even if it was skipped.
            if n_lines % 5000 == 0:
                print(f"  read {n_lines:,} lines, {len(by_genome):,} unique genomes")

    return by_genome, n_lines, n_skipped


def main() -> None:
    """Convert ``per_marker_embeddings.jsonl`` into a parquet file.

    Reads ``config.DATA / "per_marker_embeddings.jsonl"``, keeps the last
    record per genome, casts ``bacdive_id`` to nullable ``Int64``, every
    ``pme_*`` column except ``pme_marker_proteins_total`` to ``float32``, and
    ``pme_marker_proteins_total`` to nullable ``Int32``. The id columns are
    moved to the front and the frame is written to
    ``config.DATA / "per_marker_embeddings.parquet"``.

    Raises:
        SystemExit: If the input jsonl file does not exist.
    """
    src = config.DATA / "per_marker_embeddings.jsonl"
    if not src.exists():
        raise SystemExit(f"Missing {src}")

    # perf_counter is monotonic, so elapsed times survive clock adjustments.
    t0 = time.perf_counter()
    by_genome, n_lines, n_skipped = _read_latest_records(src)
    print(
        f"Parsed {n_lines:,} lines → {len(by_genome):,} unique genomes "
        f"({time.perf_counter()-t0:.1f}s)"
    )
    if n_skipped:
        print(f"  skipped {n_skipped:,} lines (invalid JSON, non-object, or no accession)")

    df = pd.DataFrame(list(by_genome.values()))
    # The parsed dicts are no longer needed; drop them before the dtype casts
    # so the raw records and the frame are not both held at peak memory.
    del by_genome

    if "bacdive_id" in df.columns:
        df["bacdive_id"] = pd.to_numeric(df["bacdive_id"], errors="coerce").astype("Int64")

    float_cols = [
        c for c in df.columns
        if c.startswith("pme_") and c != "pme_marker_proteins_total"
    ]
    if float_cols:
        # float32 halves the on-disk size of the embedding dimensions.
        df[float_cols] = df[float_cols].astype(np.float32)
    if "pme_marker_proteins_total" in df.columns:
        df["pme_marker_proteins_total"] = pd.to_numeric(
            df["pme_marker_proteins_total"], errors="coerce"
        ).astype("Int32")

    # Reorder: ids first, then features
    id_cols = [c for c in ("bacdive_id", "genome_accession") if c in df.columns]
    other_cols = [c for c in df.columns if c not in id_cols]
    df = df[id_cols + other_cols]

    out = config.DATA / "per_marker_embeddings.parquet"
    df.to_parquet(out, index=False)
    sz_mb = out.stat().st_size / 1e6
    print(
        f"Wrote {len(df):,} rows × {df.shape[1]} cols → {out} "
        f"({sz_mb:.1f} MB, {time.perf_counter()-t0:.1f}s)"
    )


# Run the conversion only when this file is executed as a script, so that
# importing the module has no side effects.
if __name__ == "__main__":
    main()