| |
"""
extract_iptm_affinity_csv_all.py

Writes:
  - out_dir/wt_iptm_affinity_all.csv
  - out_dir/smiles_iptm_affinity_all.csv
  - out_dir/wt_iptm_affinity_all.stats.json
  - out_dir/smiles_iptm_affinity_all.stats.json

For each input it also prints:
  - N (rows where both affinity and iptm are numeric)
  - Spearman rho (affinity vs iptm)
  - Pearson r (affinity vs iptm)
"""
|
|
| from pathlib import Path |
| import numpy as np |
| import pandas as pd |
|
|
|
|
def corr_stats(df: pd.DataFrame, x: str, y: str):
    """Compute Pearson and Spearman correlation between two columns of *df*.

    Both columns are coerced to numeric (entries that cannot be parsed become
    NaN) and only rows where both values are present are used.

    Returns:
        A dict with keys ``"n"`` (number of usable rows), ``"pearson_r"`` and
        ``"spearman_rho"``. Both coefficients are NaN when fewer than two
        usable rows remain.
    """
    left, right = (pd.to_numeric(df[name], errors="coerce") for name in (x, y))

    # Keep only the rows where neither side is missing.
    usable = ~(left.isna() | right.isna())
    left, right = left[usable], right[usable]
    n = int(usable.sum())

    # Start from the "not enough data" answer and fill in when possible.
    result = {"n": n, "pearson_r": float("nan"), "spearman_rho": float("nan")}
    if n > 1:
        result["pearson_r"] = float(left.corr(right, method="pearson"))
        result["spearman_rho"] = float(left.corr(right, method="spearman"))
    return result
|
|
|
|
def clean_one(
    in_csv: Path,
    out_csv: Path,
    iptm_col: str,
    affinity_col: str = "affinity",
    keep_cols=(),
):
    """Extract numeric affinity/ipTM pairs from *in_csv* and write them out.

    The output CSV always has the standardized columns ``affinity`` and
    ``iptm`` (preceded by ``split`` when the input has one), followed by any
    of *keep_cols* that exist in the input. Rows where either value is missing
    or not numeric are dropped. Correlation statistics are printed and also
    written to ``<out_csv stem>.stats.json`` next to the CSV.

    Args:
        in_csv: Input metadata CSV.
        out_csv: Destination CSV; parent directories are created as needed.
        iptm_col: Name of the ipTM score column in the input.
        affinity_col: Name of the affinity column in the input.
        keep_cols: Extra input columns to copy through when present.

    Returns:
        The statistics dict produced by ``corr_stats`` (keys ``n``,
        ``pearson_r``, ``spearman_rho``).

    Raises:
        ValueError: If *affinity_col* or *iptm_col* is absent from the input.
    """
    import json
    import math

    df = pd.read_csv(in_csv)

    need = [affinity_col, iptm_col]
    missing = [c for c in need if c not in df.columns]
    if missing:
        raise ValueError(f"{in_csv} missing columns: {missing}. Found: {list(df.columns)}")

    # Coerce to numeric so unparseable entries become NaN and get dropped.
    df[affinity_col] = pd.to_numeric(df[affinity_col], errors="coerce")
    df[iptm_col] = pd.to_numeric(df[iptm_col], errors="coerce")
    # Reset the index so the columns assembled below align row-for-row.
    df = df.dropna(subset=[affinity_col, iptm_col]).reset_index(drop=True)

    out = pd.DataFrame({
        "affinity": df[affinity_col].astype(float),
        "iptm": df[iptm_col].astype(float),
    })

    if "split" in df.columns:
        out.insert(0, "split", df["split"].astype(str))

    for c in keep_cols:
        # Never let a pass-through column clobber the standardized
        # affinity/iptm/split columns that the statistics are computed from.
        if c in df.columns and c not in out.columns:
            out[c] = df[c]

    out_csv.parent.mkdir(parents=True, exist_ok=True)
    out.to_csv(out_csv, index=False)

    stats = corr_stats(out, "iptm", "affinity")
    print(f"[write] {out_csv}")
    print(f"  N={stats['n']} | Pearson r={stats['pearson_r']:.4f} | Spearman rho={stats['spearman_rho']:.4f}")

    # json.dump would emit the bare token NaN for undefined correlations,
    # which is not valid JSON; store non-finite values as null instead.
    json_stats = {
        key: (None if isinstance(value, float) and not math.isfinite(value) else value)
        for key, value in stats.items()
    }

    stats_path = out_csv.with_suffix(".stats.json")
    with open(stats_path, "w", encoding="utf-8") as f:
        json.dump(
            {
                "input_csv": str(in_csv),
                "output_csv": str(out_csv),
                "iptm_col": iptm_col,
                "affinity_col": affinity_col,
                **json_stats,
            },
            f,
            indent=2,
        )

    return stats
|
|
|
|
def main():
    """Command-line entry point: clean the WT and SMILES metadata CSVs.

    Parses the input paths, output directory and column names from the command
    line, then runs ``clean_one`` once for the wild-type metadata and once for
    the SMILES metadata, writing both results into the output directory.
    """
    import argparse

    parser = argparse.ArgumentParser()
    for flag in ("--wt_meta_csv", "--smiles_meta_csv", "--out_dir"):
        parser.add_argument(flag, type=str, required=True)
    for flag, default in (
        ("--wt_iptm_col", "wt_iptm_score"),
        ("--smiles_iptm_col", "smiles_iptm_score"),
        ("--affinity_col", "affinity"),
    ):
        parser.add_argument(flag, type=str, default=default)
    opts = parser.parse_args()

    out_dir = Path(opts.out_dir)

    # Pass-through columns shared by both runs; the SMILES run keeps one more.
    shared_cols = ("seq1", "seq2", "Fasta2SMILES", "REACT_SMILES")
    jobs = (
        (opts.wt_meta_csv, "wt_iptm_affinity_all.csv", opts.wt_iptm_col, shared_cols),
        (
            opts.smiles_meta_csv,
            "smiles_iptm_affinity_all.csv",
            opts.smiles_iptm_col,
            shared_cols + ("smiles_sequence",),
        ),
    )
    for src, out_name, iptm_col, extra_cols in jobs:
        clean_one(
            Path(src),
            out_dir / out_name,
            iptm_col=iptm_col,
            affinity_col=opts.affinity_col,
            keep_cols=extra_cols,
        )

    print(f"\n[DONE] CSVs + stats JSONs in: {out_dir}")
|
|
|
|
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()
|
|