|
|
|
|
|
""" |
|
|
extract_iptm_affinity_csv_all.py

Writes:
  - out_dir/wt_iptm_affinity_all.csv
  - out_dir/smiles_iptm_affinity_all.csv
  - a <name>.stats.json file next to each CSV above, holding the same
    statistics that are printed

Also prints, for each output CSV:
  - N (number of rows where both affinity and iptm are numeric)
  - Spearman rho (affinity vs iptm)
  - Pearson r (affinity vs iptm)
|
|
""" |
|
|
|
|
|
from pathlib import Path |
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
|
|
|
|
|
|
def corr_stats(df: pd.DataFrame, x: str, y: str):
    """Compute Pearson and Spearman correlation between two columns of *df*.

    Both columns are coerced to numeric (unparseable entries become NaN) and
    only rows where both values are present take part in the statistics.

    Args:
        df: Table holding the two columns.
        x: Name of the first column.
        y: Name of the second column.

    Returns:
        A dict with keys ``"n"`` (number of usable row pairs), ``"pearson_r"``
        and ``"spearman_rho"``. Both coefficients are NaN when fewer than two
        usable pairs exist.
    """
    first = pd.to_numeric(df[x], errors="coerce")
    second = pd.to_numeric(df[y], errors="coerce")

    # Keep only the rows in which both values survived the numeric coercion.
    both_present = first.notna() & second.notna()
    first, second = first[both_present], second[both_present]
    count = int(both_present.sum())

    # Start from the "undefined" answer and fill it in only when a
    # correlation can actually be computed.
    result = {"n": count, "pearson_r": float("nan"), "spearman_rho": float("nan")}
    if count > 1:
        result["pearson_r"] = float(first.corr(second, method="pearson"))
        result["spearman_rho"] = float(first.corr(second, method="spearman"))
    return result
|
|
|
|
|
|
|
|
def clean_one(
    in_csv: Path,
    out_csv: Path,
    iptm_col: str,
    affinity_col: str = "affinity",
    keep_cols=(),
):
    """Extract numeric affinity/ipTM pairs from one CSV and report their correlation.

    Reads ``in_csv``, coerces ``affinity_col`` and ``iptm_col`` to numeric,
    drops every row where either value is missing or non-numeric, and writes
    the remaining rows to ``out_csv`` under the normalized column names
    ``affinity`` and ``iptm``. A ``split`` column, when present in the input,
    is emitted first as strings. The correlation statistics are printed and
    also saved next to ``out_csv`` with the suffix replaced by ``.stats.json``.

    Args:
        in_csv: Input CSV path.
        out_csv: Output CSV path; parent directories are created as needed.
        iptm_col: Name of the ipTM column in ``in_csv``.
        affinity_col: Name of the affinity column in ``in_csv``.
        keep_cols: Extra input columns to copy through when they exist.
            Names that collide with an already-emitted output column
            (``affinity``, ``iptm``, ``split``) are skipped so the cleaned
            values are never overwritten by raw input data.

    Raises:
        ValueError: If ``affinity_col`` or ``iptm_col`` is absent from the input.
    """
    import json
    import math

    df = pd.read_csv(in_csv)

    missing = [c for c in (affinity_col, iptm_col) if c not in df.columns]
    if missing:
        raise ValueError(f"{in_csv} missing columns: {missing}. Found: {list(df.columns)}")

    # Non-numeric entries become NaN here and are removed by the dropna below.
    df[affinity_col] = pd.to_numeric(df[affinity_col], errors="coerce")
    df[iptm_col] = pd.to_numeric(df[iptm_col], errors="coerce")
    df = df.dropna(subset=[affinity_col, iptm_col]).reset_index(drop=True)

    out = pd.DataFrame({
        "affinity": df[affinity_col].astype(float),
        "iptm": df[iptm_col].astype(float),
    })

    if "split" in df.columns:
        out.insert(0, "split", df["split"].astype(str))

    for c in keep_cols:
        # Skip names already present in the output: assigning them again would
        # replace a cleaned column with the raw input column of the same name.
        if c in df.columns and c not in out.columns:
            out[c] = df[c]

    out_csv.parent.mkdir(parents=True, exist_ok=True)
    out.to_csv(out_csv, index=False)

    stats = corr_stats(out, "iptm", "affinity")
    print(f"[write] {out_csv}")
    print(f"  N={stats['n']} | Pearson r={stats['pearson_r']:.4f} | Spearman rho={stats['spearman_rho']:.4f}")

    # With fewer than two usable rows the coefficients are NaN. json.dump would
    # emit the bare token NaN, which is not valid JSON, so store null instead.
    json_safe_stats = {
        key: (None if isinstance(value, float) and not math.isfinite(value) else value)
        for key, value in stats.items()
    }

    stats_path = out_csv.with_suffix(".stats.json")
    with open(stats_path, "w", encoding="utf-8") as f:
        json.dump(
            {
                "input_csv": str(in_csv),
                "output_csv": str(out_csv),
                "iptm_col": iptm_col,
                "affinity_col": affinity_col,
                **json_safe_stats,
            },
            f,
            indent=2,
        )
|
|
|
|
|
|
|
|
def main():
    """Parse command-line options and clean the WT and SMILES metadata CSVs.

    Each input CSV is passed to ``clean_one`` with its own ipTM column name
    and list of pass-through columns; both outputs land in ``--out_dir``.
    """
    import argparse

    parser = argparse.ArgumentParser()
    for required_flag in ("--wt_meta_csv", "--smiles_meta_csv", "--out_dir"):
        parser.add_argument(required_flag, type=str, required=True)

    column_defaults = (
        ("--wt_iptm_col", "wt_iptm_score"),
        ("--smiles_iptm_col", "smiles_iptm_score"),
        ("--affinity_col", "affinity"),
    )
    for flag, default in column_defaults:
        parser.add_argument(flag, type=str, default=default)

    opts = parser.parse_args()
    out_dir = Path(opts.out_dir)

    # Columns carried through for both inputs; the SMILES run keeps one more.
    shared_cols = ("seq1", "seq2", "Fasta2SMILES", "REACT_SMILES")
    jobs = (
        (opts.wt_meta_csv, "wt_iptm_affinity_all.csv", opts.wt_iptm_col, shared_cols),
        (
            opts.smiles_meta_csv,
            "smiles_iptm_affinity_all.csv",
            opts.smiles_iptm_col,
            shared_cols + ("smiles_sequence",),
        ),
    )
    for meta_csv, out_name, iptm_col, keep in jobs:
        clean_one(
            Path(meta_csv),
            out_dir / out_name,
            iptm_col=iptm_col,
            affinity_col=opts.affinity_col,
            keep_cols=keep,
        )

    print(f"\n[DONE] CSVs + stats JSONs in: {out_dir}")
|
|
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
|
|