#!/usr/bin/env python3
"""Generate an offline-ready dataset from the CPTAC ovarian cohort.
This utility downloads (if necessary) the CPTAC ovarian multi-modal dataset
and converts the proteomics, transcriptomics, and clinical tables into a
normalized feature table compatible with the RLDT offline pipeline.
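
Example invocation (the flag values shown are the argparse defaults):

    python OVdata.py --output-dir RL0910/data/cptac_ovarian --min-coverage 0.65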
"""
from __future__ import annotations
import argparse
import json
import os
import sys
from pathlib import Path
from typing import Dict, Iterable, List, Tuple
# Guard against older pyarrow wheels compiled for NumPy 1.x which can cause
# `_ARRAY_API` errors when pandas enables the Arrow backend automatically.
os.environ.setdefault("PANDAS_USE_PYARROW_BACKEND", "0")
os.environ.setdefault("PANDAS_USE_PYARROW_EXTENSION_ARRAY", "0")
import numpy as np
from RL0910.pandas_compat import get_pandas
pd = get_pandas()
def _import_cptac():
"""Import the cptac package with helpful diagnostics."""
try:
import cptac # type: ignore
except Exception as exc: # pragma: no cover - depends on runtime environment
if exc.__class__.__name__ == "NoInternetError":
print(
"[ERROR] CPTAC index missing. Connect to the internet and rerun,\n"
"or execute `python -m cptac download` once to bootstrap the index.",
file=sys.stderr,
)
sys.exit(1)
raise
from cptac.exceptions import ( # type: ignore
NoInternetError,
DataSourceNotFoundError,
MissingFileError,
)
return cptac, NoInternetError, DataSourceNotFoundError, MissingFileError
def _ensure_dataset(
cptac_module,
no_internet_error,
datasource_error,
missing_file_error,
):
"""Load the CPTAC ovarian dataset, downloading it if necessary."""
download_attempted = False
try:
dataset = cptac_module.Ov()
return dataset
except no_internet_error as err: # pragma: no cover - depends on connectivity
print(f"[ERROR] {err}", file=sys.stderr)
print(
"Please connect to the CPTAC data portal and rerun, or download the dataset manually.",
file=sys.stderr,
)
sys.exit(1)
except (datasource_error, missing_file_error, FileNotFoundError):
download_attempted = True
except Exception as err: # pragma: no cover - defensive fallback
print(f"[WARN] Initial CPTAC load failed: {err}", file=sys.stderr)
download_attempted = True
if download_attempted:
print("Attempting to download CPTAC ovarian assets (requires internet)...")
last_error: Exception | None = None
for cancer_key in ("ov", "ovarian", "Ovarian"):
try:
cptac_module.download_cancer(cancer_key)
last_error = None
break
except Exception as inner: # pragma: no cover - depends on remote service
last_error = inner
if last_error is not None:
print(f"[ERROR] Unable to download CPTAC ovarian dataset: {last_error}", file=sys.stderr)
sys.exit(1)
try:
dataset = cptac_module.Ov()
return dataset
except Exception as err: # pragma: no cover - defensive
print(f"[ERROR] CPTAC dataset still unavailable after download: {err}", file=sys.stderr)
sys.exit(1)
raise RuntimeError("Unexpected CPTAC import state")
def _standardize_index(df: pd.DataFrame) -> pd.DataFrame:
"""Return a copy with a flat string index keyed by patient identifier."""
out = df.copy()
if isinstance(out.index, pd.MultiIndex):
out.index = out.index.get_level_values(0)
out.index = out.index.astype(str)
out = out[~out.index.duplicated(keep="first")]
return out
def _flatten_columns(df: pd.DataFrame) -> pd.DataFrame:
"""Flatten MultiIndex columns to single strings."""
if isinstance(df.columns, pd.MultiIndex):
flat = []
for col in df.columns:
            pieces = [
                str(level)
                for level in col
                if level is not None and str(level) not in ("", "nan")
            ]
label = "_".join(pieces).strip("_") or "feature"
flat.append(label)
df = df.copy()
df.columns = flat
else:
df = df.copy()
df.columns = [str(c) for c in df.columns]
return df
def _safe_label(label: str) -> str:
"""Create a filesystem and YAML friendly slug."""
cleaned = "".join(ch if ch.isalnum() else "_" for ch in label)
cleaned = cleaned.strip("_").lower()
return cleaned or "feature"
def _unique_name(base: str, registry: set[str]) -> str:
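    """Return `base`, suffixing a counter on collisions, and record it in `registry`."""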
name = base
counter = 2
while name in registry:
name = f"{base}_{counter}"
counter += 1
registry.add(name)
return name
def _select_top_variance(df: pd.DataFrame, top_n: int) -> pd.DataFrame:
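    """Keep only the `top_n` highest-variance columns; no-op for small frames."""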
if top_n <= 0 or df.empty:
return df
if df.shape[1] <= top_n:
return df
variances = df.var(axis=0, skipna=True)
keep = variances.sort_values(ascending=False).head(top_n).index
return df.loc[:, keep]
def _prepare_modality(
raw_df: pd.DataFrame,
modality_name: str,
prefix: str,
top_n: int,
min_coverage: float,
name_registry: set[str],
) -> Tuple[pd.DataFrame, Dict[str, Dict[str, float | str]]]:
"""Convert a CPTAC table into numeric features and collect statistics."""
if raw_df is None or raw_df.empty:
return pd.DataFrame(), {}
df = _flatten_columns(_standardize_index(raw_df))
if df.empty:
return df, {}
numeric = {}
for col in df.columns:
series = df[col]
if pd.api.types.is_numeric_dtype(series):
numeric[col] = pd.to_numeric(series, errors="coerce")
else:
numeric[col] = series.astype("category").cat.codes.replace(-1, np.nan)
numeric_df = pd.DataFrame(numeric, index=df.index).astype(float)
if numeric_df.empty:
return numeric_df, {}
coverage = numeric_df.notna().mean()
keep_cols = coverage[coverage >= min_coverage].index.tolist()
numeric_df = numeric_df.loc[:, keep_cols]
if numeric_df.empty:
return numeric_df, {}
medians = numeric_df.median()
filled = numeric_df.fillna(medians)
filtered = _select_top_variance(filled, top_n)
medians = medians.loc[filtered.columns]
coverage = coverage.loc[filtered.columns]
variances = filtered.var(axis=0, skipna=True)
rename_map: Dict[str, str] = {}
stats: Dict[str, Dict[str, float | str]] = {}
for original in filtered.columns:
base = _safe_label(original)
unique = _unique_name(f"{prefix}_{base}", name_registry)
state_name = f"state_{unique}"
rename_map[original] = state_name
stats[state_name] = {
"original_name": original,
"modality": modality_name,
"coverage": float(coverage.get(original, float("nan"))),
"median": float(medians.get(original, float("nan"))),
"variance": float(variances.get(original, float("nan"))),
}
prepared = filtered.rename(columns=rename_map)
return prepared, stats
def _combine_modalities(frames: Iterable[pd.DataFrame]) -> pd.DataFrame:
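    """Outer-align the non-empty modality frames on the union of patient indices."""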
indices = pd.Index([])
dataframes = [df for df in frames if not df.empty]
for df in dataframes:
indices = indices.union(df.index)
combined = pd.DataFrame(index=indices.sort_values())
for df in dataframes:
combined = combined.join(df, how="left")
return combined
def _normalize_features(
df: pd.DataFrame,
apply_minmax: bool,
) -> Tuple[pd.DataFrame, Dict[str, Dict[str, float]]]:
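    """Min-max scale features to [0, 1] (constant columns collapse to 0.5),
    or median-fill and record per-column mean/std when scaling is disabled."""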
stats: Dict[str, Dict[str, float]] = {}
if df.empty:
return df, stats
if apply_minmax:
mins = df.min(axis=0)
maxs = df.max(axis=0)
denom = (maxs - mins).replace(0, np.nan)
scaled = (df - mins) / denom
scaled = scaled.clip(0.0, 1.0)
scaled = scaled.fillna(0.5)
for col in scaled.columns:
stats[col] = {
"min": float(mins.get(col, float("nan"))),
"max": float(maxs.get(col, float("nan"))),
}
return scaled, stats
filled = df.copy()
medians = filled.median(axis=0)
filled = filled.fillna(medians)
for col in filled.columns:
stats[col] = {
"mean": float(filled[col].mean()),
"std": float(filled[col].std(ddof=0)),
}
return filled, stats
def _derive_actions(
risk: pd.Series,
desired_actions: int,
) -> Tuple[pd.Series, List[str]]:
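    """Bucket patients into at most `desired_actions` equal-sized risk quantiles.

    Ranking with method="first" breaks ties so pd.qcut can always form
    equal-count bins without dropping bin edges.
    """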
if risk.empty:
return pd.Series(dtype=int), ["Undefined"]
unique_values = risk.nunique(dropna=True)
n_actions = max(1, min(desired_actions, unique_values))
if n_actions <= 1:
actions = pd.Series(0, index=risk.index, dtype=int)
return actions, ["Single bucket"]
quantiles = pd.qcut(
risk.rank(method="first"),
q=n_actions,
labels=False,
duplicates="drop",
).astype(int)
action_names = [f"Risk quantile {i + 1}/{len(quantiles.unique())}" for i in range(len(quantiles.unique()))]
return quantiles, action_names
def _yaml_quote(value: str) -> str:
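    """Quote a scalar for YAML output when it contains special characters."""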
if value == "":
return "''"
if any(ch in value for ch in ":{}[]&,#|>!-?%*@\"'\t\n\r "):
return json.dumps(value, ensure_ascii=False)
return value
def build_dataset(args: argparse.Namespace) -> None:
cptac_module, no_internet_error, datasource_error, missing_file_error = _import_cptac()
dataset = _ensure_dataset(cptac_module, no_internet_error, datasource_error, missing_file_error)
np.random.seed(args.seed)
proteomics = dataset.get_proteomics()
transcriptomics = dataset.get_transcriptomics()
clinical = dataset.get_clinical()
name_registry: set[str] = set()
prot_df, prot_stats = _prepare_modality(
proteomics,
"proteomics",
"prot",
args.max_proteins,
args.min_coverage,
name_registry,
)
rna_df, rna_stats = _prepare_modality(
transcriptomics,
"transcriptomics",
"rna",
args.max_transcripts,
args.min_coverage,
name_registry,
)
clin_df, clin_stats = _prepare_modality(
clinical,
"clinical",
"clin",
args.max_clinical,
max(0.35, args.min_coverage * 0.5),
name_registry,
)
combined = _combine_modalities([prot_df, rna_df, clin_df])
if combined.empty:
print(
"[ERROR] No features satisfied the filtering criteria. Consider lowering --min-coverage or increasing --max-* limits.",
file=sys.stderr,
)
sys.exit(1)
# Fill residual gaps with column medians before normalization
medians = combined.median(axis=0)
combined = combined.fillna(medians)
normalized, scaling_stats = _normalize_features(combined, not args.no_normalize)
state_columns = list(normalized.columns)
if not state_columns:
print("[ERROR] Normalized feature matrix is empty after preprocessing.", file=sys.stderr)
sys.exit(1)
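    # Surrogate signal: the per-patient mean of the normalized features serves
    # as a risk score; actions are its quantile buckets, and the reward maps
    # that score to [-1, 1] so that lower risk earns a higher reward.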
risk_score = normalized[state_columns].mean(axis=1)
actions, action_names = _derive_actions(risk_score, args.actions)
rewards = (0.5 - risk_score) * 2.0
rewards = rewards.clip(-1.0, 1.0)
output_dir = Path(args.output_dir).expanduser().resolve()
output_dir.mkdir(parents=True, exist_ok=True)
dataset_df = pd.DataFrame(
{
"patient_id": normalized.index.astype(str),
"timestep": 0,
"action": actions.reindex(normalized.index).fillna(0).astype(int),
"reward": rewards.reindex(normalized.index).astype(float),
"terminal": True,
}
)
dataset_df = dataset_df.join(normalized[state_columns])
csv_path = output_dir / "ovarian_offline_dataset.csv"
dataset_df.to_csv(csv_path, index=False)
npz_path = output_dir / "ovarian_offline_dataset.npz"
np.savez(
npz_path,
states=normalized[state_columns].to_numpy(dtype=np.float32),
actions=dataset_df["action"].to_numpy(dtype=np.int64),
rewards=dataset_df["reward"].to_numpy(dtype=np.float32),
terminals=np.ones(len(dataset_df), dtype=np.bool_),
patient_ids=dataset_df["patient_id"].to_numpy(dtype="U"),
timesteps=dataset_df["timestep"].to_numpy(dtype=np.int64),
state_columns=np.array(state_columns),
action_names=np.array(action_names, dtype="U"),
risk_scores=risk_score.reindex(normalized.index).to_numpy(dtype=np.float32),
)
feature_catalog = {**prot_stats, **rna_stats, **clin_stats}
for column, stats in feature_catalog.items():
stats.update(scaling_stats.get(column, {}))
feature_labels = {
column: f"{stats['modality']}::{stats['original_name']}"
for column, stats in feature_catalog.items()
}
schema_lines = [
"data_type: tabular",
"mapping:",
" trajectory_id: patient_id",
" timestep: timestep",
" action: action",
" reward: reward",
" terminal: terminal",
" feature_cols:",
]
for col in state_columns:
schema_lines.append(f" - {col}")
if args.no_normalize:
schema_lines.extend([
"normalization:",
" method: none",
])
else:
schema_lines.extend([
"normalization:",
" method: minmax",
" clip_min: 0.0",
" clip_max: 1.0",
])
schema_lines.append("action_names:")
for name in action_names:
schema_lines.append(f" - {_yaml_quote(name)}")
schema_lines.append("feature_names:")
for col in state_columns:
schema_lines.append(f" - {_yaml_quote(feature_labels.get(col, col))}")
schema_lines.extend(
[
"reward_spec:",
" expression: normalized_risk_score",
" window_agg: last",
]
)
schema_path = output_dir / "ovarian_offline_schema.yaml"
schema_path.write_text("\n".join(schema_lines) + "\n", encoding="utf-8")
metadata = {
"dataset": "CPTAC Ovarian (proteomics + transcriptomics + clinical)",
"patients": int(dataset_df["patient_id"].nunique()),
"records": int(len(dataset_df)),
"features": len(state_columns),
"actions": {
"count": len(action_names),
"names": action_names,
"definition": "Risk quantiles derived from the normalized multi-modal feature mean",
},
"reward": {
"range": [-1.0, 1.0],
"definition": "Centered risk score (higher is better) from combined normalized features",
},
"normalization": "minmax" if not args.no_normalize else "none",
"feature_catalog": feature_catalog,
"files": {
"csv": csv_path.name,
"npz": npz_path.name,
"schema": schema_path.name,
},
}
metadata_path = output_dir / "ovarian_offline_metadata.json"
metadata_path.write_text(json.dumps(metadata, indent=2, ensure_ascii=False) + "\n", encoding="utf-8")
print("✔ Dataset prepared")
print(f" • CSV table : {csv_path}")
print(f" • Numpy archive : {npz_path}")
print(f" • Schema YAML : {schema_path}")
print(f" • Metadata JSON : {metadata_path}")
def parse_args(argv: List[str] | None = None) -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Download and format the CPTAC ovarian cohort for RLDT offline workflows",
)
parser.add_argument(
"--output-dir",
default="RL0910/data/cptac_ovarian",
help="Directory to store the generated dataset files (default: %(default)s)",
)
parser.add_argument(
"--max-proteins",
type=int,
default=75,
help="Maximum number of proteomic features to retain (variance-ranked).",
)
parser.add_argument(
"--max-transcripts",
type=int,
default=75,
help="Maximum number of transcriptomic features to retain (variance-ranked).",
)
parser.add_argument(
"--max-clinical",
type=int,
default=40,
help="Maximum number of engineered clinical features to retain.",
)
parser.add_argument(
"--min-coverage",
type=float,
default=0.65,
help="Minimum fraction of patients required for a feature to be kept (0-1).",
)
parser.add_argument(
"--actions",
type=int,
default=5,
help="Number of action buckets derived from the risk score quantiles.",
)
parser.add_argument(
"--no-normalize",
action="store_true",
help="Disable min-max normalization (features remain on their native scale).",
)
parser.add_argument(
"--seed",
type=int,
default=42,
help="Random seed used when deriving surrogate actions.",
)
args = parser.parse_args(argv)
if not 0 < args.min_coverage <= 1:
parser.error("--min-coverage must be within (0, 1].")
if args.actions <= 0:
parser.error("--actions must be a positive integer.")
return args
def main(argv: List[str] | None = None) -> int:
args = parse_args(argv)
build_dataset(args)
return 0
if __name__ == "__main__":
raise SystemExit(main())