File size: 7,102 Bytes
eb69de4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
"""Sample ~20 records per genuinely-hierarchical secondary-level category from
master_annotations_clean.parquet for SAE qualitative exploration.

Categories sampled (rationale — see THREADS.md Thread C):
  - 24 AMR drug-class secondary_labels                  (e.g. BETA-LACTAM, MACROLIDE)
  - 12 STRESS metals/biocides secondary_labels          (e.g. MERCURY, ARSENIC)
  - 6  iGEM type secondary_labels                       (CRISPR, fluorescent, …)
  - 14 VFDB vfcategory_name (true virulence subclass)   (Effector delivery system, …)

Skipped (non-hierarchical / degenerate / provenance-only):
  - virulence → full / core / VIRULENCE  (VFDB curation tier, not mechanism)
  - STRESS → STRESS                       (echo of primary label)
  - AMR → AMR_gene                        (CARD's catch-all "unspecified mechanism")
  - AMR → synthetic_AMR                   (provenance label, not mechanism)

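Output: one JSONL per category at
data/targeted_jsonl/qual/<group>__<category_slug>.jsonl, plus a
_sample_summary.csv in the same directory listing available vs. sampled counts
per category. Schema mirrors the VFDB pipeline; positives only (no matched
negatives needed for qualitative SAE exploration).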

Caveat: iGEM-derived categories sampled here are multi-component constructs
(~84% have internal stop codons). SAE features will reflect the whole
construct rather than a single CDS — flagged to the consumer.
"""
import argparse
import json
import re
from collections import defaultdict
from pathlib import Path

import pandas as pd


# ---- Categories to skip ----
# secondary_label values that carry no mechanistic / hierarchical information
# and are therefore never sampled as a category of their own.
EXCLUDE_SECONDARY = {
    # Virulence curation tier rather than a mechanism.
    "full",
    "core",
    "VIRULENCE",
    # Merely repeats the primary label.
    "STRESS",
    # Catch-all bucket with no finer-grained information.
    "AMR_gene",
    # Describes provenance, not mechanism.
    "synthetic_AMR",
}


def category_slug(s: str) -> str:
    """Return a filesystem-friendly slug for a category name.

    The input is stringified, every run of characters outside ``A-Za-z0-9``
    is collapsed into a single underscore, and leading/trailing underscores
    are removed. Case is preserved.
    """
    text = str(s)
    underscored = re.sub(r"[^A-Za-z0-9]+", "_", text)
    return underscored.strip("_")


def _json_safe(value):
    """Return a scalar *value* in a form ``json.dumps`` accepts.

    Missing markers (``None``, NaN, pandas NA/NaT) become ``None``; objects
    exposing ``.item()`` (e.g. numpy scalars) are unwrapped to the
    corresponding Python scalar. Anything else is returned unchanged.
    """
    if value is None or pd.isna(value):
        return None
    if hasattr(value, "item"):
        return value.item()
    return value


def main(argv=None) -> None:
    """Sample records per category and write one JSONL file per category.

    Reads the master annotations parquet, keeps rows that have a sequence,
    groups them into (group, category) buckets, draws up to
    ``--per-category`` rows from each bucket with a fixed random seed, and
    writes ``<group>__<category_slug>.jsonl`` files plus a
    ``_sample_summary.csv`` into ``--out-dir``. A summary is also printed.

    Args:
        argv: Optional list of command-line arguments. ``None`` (the default)
            makes argparse read ``sys.argv[1:]``, so calling ``main()`` with
            no arguments behaves as before.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--master-parquet", type=Path,
                    default=Path("/home/ror25cal/MGnify/data/master_annotations_clean.parquet"))
    ap.add_argument("--out-dir", type=Path,
                    default=Path("/home/ror25cal/MGnify/data/targeted_jsonl/qual"))
    ap.add_argument("--per-category", type=int, default=20)
    ap.add_argument("--seed", type=int, default=42)
    args = ap.parse_args(argv)

    args.out_dir.mkdir(parents=True, exist_ok=True)
    df = pd.read_parquet(args.master_parquet)
    df = df[df["actual_sequence"].notna()].copy()

    # Build the four category groupings.
    # 1. AMR drug classes, 2. STRESS metals/biocides, 3. iGEM types: keyed on
    #    secondary_label, excluding the non-hierarchical labels.
    # 4. VFDB subclasses: keyed on vfcategory_name (a column of the parquet).
    not_excluded = ~df["secondary_label"].isin(EXCLUDE_SECONDARY)
    # iGEM rows are selected by primary_label membership.
    # NOTE(review): an earlier comment suggested functional_class ==
    # "synthetic_marker" as the selector; the code filters on primary_label only.
    igem_primaries = {"reporter", "gene_editing", "integration", "toxin", "biosafety", "containment"}
    group_specs = (
        ("AMR", (df["primary_label"] == "AMR") & not_excluded, "secondary_label"),
        ("STRESS", (df["primary_label"] == "STRESS") & not_excluded, "secondary_label"),
        ("iGEM_synthetic", df["primary_label"].isin(igem_primaries) & not_excluded, "secondary_label"),
        ("VFDB_virulence", df["vfcategory_name"].notna(), "vfcategory_name"),
    )

    rows_by_cat: dict[tuple[str, str], pd.DataFrame] = {}
    for group, mask, column in group_specs:
        # observed=True: if the column is categorical, do not emit empty
        # groups for category values that were filtered out above.
        for value, grp in df[mask].groupby(column, observed=True):
            rows_by_cat[(group, str(value))] = grp

    # Sample, write JSONL, record provenance summary
    summary_columns = ["group", "category", "available", "sampled", "slug"]
    summary_rows = []
    total_records = 0
    for group, sec in sorted(rows_by_cat):
        grp = rows_by_cat[(group, sec)]
        n_avail = len(grp)
        n_take = min(args.per_category, n_avail)
        sampled = grp.sample(n=n_take, random_state=args.seed).reset_index(drop=True)
        slug = f"{group}__{category_slug(sec)}"
        out_path = args.out_dir / f"{slug}.jsonl"
        with open(out_path, "w", encoding="utf-8") as f:
            for _, r in sampled.iterrows():
                seq = str(r["actual_sequence"]).upper()
                rec = {
                    "region_id": f"{r.get('seq_hash')}_QUAL",   # seq_hash unique per row in master
                    "is_positive": True,
                    "label": "QUAL",
                    "label_group": group,                       # AMR / STRESS / iGEM_synthetic / VFDB_virulence
                    "label_class": sec,                         # the secondary value
                    "primary_label": r.get("primary_label"),
                    "functional_class": r.get("functional_class"),
                    "gene_symbol": r.get("gene_name"),
                    "product_name": r.get("product_name"),
                    "organism": r.get("organism"),
                    "source_db": r.get("db"),
                    "source_accession": r.get("source_accession"),
                    "vf_id": r.get("vf_id"),
                    "vfcategory_name": r.get("vfcategory_name"),
                    "vfcategory_id": r.get("vfcategory_id"),
                    "vf_prototype_name": r.get("vf_prototype_name"),
                    "seq_hash": r.get("seq_hash"),
                    "cds_length": len(seq),
                    "mag_id": slug,                             # placeholder for path layout (matches embed_vfdb_lean)
                    "random_seed": args.seed,
                    "sequence": seq,
                }
                # Coerce pandas NA / NaN / numpy scalars to JSON-safe types
                clean = {k: _json_safe(v) for k, v in rec.items()}
                f.write(json.dumps(clean) + "\n")
        summary_rows.append({"group": group, "category": sec, "available": n_avail, "sampled": n_take, "slug": slug})
        total_records += n_take

    # Print + save summary. Columns are given explicitly so that an empty
    # category set still yields a sortable (header-only) frame instead of a
    # KeyError in sort_values.
    summary_df = pd.DataFrame(summary_rows, columns=summary_columns).sort_values(["group", "category"])
    summary_path = args.out_dir / "_sample_summary.csv"
    summary_df.to_csv(summary_path, index=False)
    print(f"Sampled {len(summary_rows)} categories, {total_records} total records")
    print(f"Output: {args.out_dir}")
    print(f"Summary: {summary_path}")
    print()
    print(summary_df.to_string(index=False))


# Run the sampler only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()