|
|
|
|
|
"""Generate an offline-ready dataset from the CPTAC ovarian cohort. |
|
|
|
|
|
This utility downloads (if necessary) the CPTAC ovarian multi-modal dataset |
|
|
and converts the proteomics, transcriptomics, and clinical tables into a |
|
|
normalized feature table compatible with the RLDT offline pipeline. |
|
|
""" |
|
|
from __future__ import annotations |
|
|
|
|
|
import argparse |
|
|
import json |
|
|
import os |
|
|
import sys |
|
|
from pathlib import Path |
|
|
from typing import Dict, Iterable, List, Tuple |
|
|
|
|
|
|
|
|
|
|
|
# Opt out of pandas' pyarrow-backed arrays BEFORE pandas is imported anywhere
# (presumably for compatibility with the offline RLDT tooling — confirm).
# setdefault keeps any value the user already exported.
os.environ.setdefault("PANDAS_USE_PYARROW_BACKEND", "0")
os.environ.setdefault("PANDAS_USE_PYARROW_EXTENSION_ARRAY", "0")

import numpy as np
from RL0910.pandas_compat import get_pandas

# Module-wide pandas handle resolved through the project compatibility shim.
pd = get_pandas()
|
|
|
|
|
|
|
|
def _import_cptac():
    """Import :mod:`cptac` plus its exception types, with actionable diagnostics.

    Returns the tuple ``(cptac, NoInternetError, DataSourceNotFoundError,
    MissingFileError)``. Exits with status 1 when the CPTAC index is missing
    (the package raises its ``NoInternetError`` during import); any other
    import failure propagates unchanged.
    """
    try:
        import cptac
    except Exception as exc:
        # NoInternetError can only be matched by name here: importing the
        # exception class requires the very import that just failed.
        if exc.__class__.__name__ != "NoInternetError":
            raise
        print(
            "[ERROR] CPTAC index missing. Connect to the internet and rerun,\n"
            "or execute `python -m cptac download` once to bootstrap the index.",
            file=sys.stderr,
        )
        sys.exit(1)

    # Safe now that the parent package imported successfully.
    from cptac.exceptions import (
        NoInternetError,
        DataSourceNotFoundError,
        MissingFileError,
    )

    return cptac, NoInternetError, DataSourceNotFoundError, MissingFileError
|
|
|
|
|
|
|
|
def _ensure_dataset(
    cptac_module,
    no_internet_error,
    datasource_error,
    missing_file_error,
):
    """Load the CPTAC ovarian dataset, downloading it if necessary.

    Arguments are the ``cptac`` module and its exception classes, passed in
    by ``_import_cptac`` so this function never imports ``cptac`` itself.
    Exits the process with status 1 on unrecoverable network or download
    failures; otherwise returns the loaded ``cptac_module.Ov()`` dataset.
    """
    download_attempted = False
    try:
        dataset = cptac_module.Ov()
        return dataset
    except no_internet_error as err:
        # No connectivity at all: a download attempt cannot help, so bail out.
        print(f"[ERROR] {err}", file=sys.stderr)
        print(
            "Please connect to the CPTAC data portal and rerun, or download the dataset manually.",
            file=sys.stderr,
        )
        sys.exit(1)
    except (datasource_error, missing_file_error, FileNotFoundError):
        # Local assets are missing — fall through to a download attempt.
        download_attempted = True
    except Exception as err:
        # Unknown failure: warn, then still try a fresh download as a best effort.
        print(f"[WARN] Initial CPTAC load failed: {err}", file=sys.stderr)
        download_attempted = True

    if download_attempted:
        print("Attempting to download CPTAC ovarian assets (requires internet)...")
        last_error: Exception | None = None
        # Try several key spellings — presumably different cptac versions
        # accept different ones (TODO confirm against cptac changelog).
        for cancer_key in ("ov", "ovarian", "Ovarian"):
            try:
                cptac_module.download_cancer(cancer_key)
                last_error = None
                break
            except Exception as inner:
                last_error = inner

        if last_error is not None:
            print(f"[ERROR] Unable to download CPTAC ovarian dataset: {last_error}", file=sys.stderr)
            sys.exit(1)

        # Retry the load once after a successful download.
        try:
            dataset = cptac_module.Ov()
            return dataset
        except Exception as err:
            print(f"[ERROR] CPTAC dataset still unavailable after download: {err}", file=sys.stderr)
            sys.exit(1)

    # Defensive: every path above returns or exits; reaching here is a logic error.
    raise RuntimeError("Unexpected CPTAC import state")
|
|
|
|
|
|
|
|
def _standardize_index(df: pd.DataFrame) -> pd.DataFrame: |
|
|
"""Return a copy with a flat string index keyed by patient identifier.""" |
|
|
out = df.copy() |
|
|
if isinstance(out.index, pd.MultiIndex): |
|
|
out.index = out.index.get_level_values(0) |
|
|
out.index = out.index.astype(str) |
|
|
out = out[~out.index.duplicated(keep="first")] |
|
|
return out |
|
|
|
|
|
|
|
|
def _flatten_columns(df: pd.DataFrame) -> pd.DataFrame: |
|
|
"""Flatten MultiIndex columns to single strings.""" |
|
|
if isinstance(df.columns, pd.MultiIndex): |
|
|
flat = [] |
|
|
for col in df.columns: |
|
|
pieces = [str(level) for level in col if level not in (None, "", "nan")] |
|
|
label = "_".join(pieces).strip("_") or "feature" |
|
|
flat.append(label) |
|
|
df = df.copy() |
|
|
df.columns = flat |
|
|
else: |
|
|
df = df.copy() |
|
|
df.columns = [str(c) for c in df.columns] |
|
|
return df |
|
|
|
|
|
|
|
|
def _safe_label(label: str) -> str: |
|
|
"""Create a filesystem and YAML friendly slug.""" |
|
|
cleaned = "".join(ch if ch.isalnum() else "_" for ch in label) |
|
|
cleaned = cleaned.strip("_").lower() |
|
|
return cleaned or "feature" |
|
|
|
|
|
|
|
|
def _unique_name(base: str, registry: set[str]) -> str: |
|
|
name = base |
|
|
counter = 2 |
|
|
while name in registry: |
|
|
name = f"{base}_{counter}" |
|
|
counter += 1 |
|
|
registry.add(name) |
|
|
return name |
|
|
|
|
|
|
|
|
def _select_top_variance(df: pd.DataFrame, top_n: int) -> pd.DataFrame: |
|
|
if top_n <= 0 or df.empty: |
|
|
return df |
|
|
if df.shape[1] <= top_n: |
|
|
return df |
|
|
variances = df.var(axis=0, skipna=True) |
|
|
keep = variances.sort_values(ascending=False).head(top_n).index |
|
|
return df.loc[:, keep] |
|
|
|
|
|
|
|
|
def _prepare_modality(
    raw_df: pd.DataFrame,
    modality_name: str,
    prefix: str,
    top_n: int,
    min_coverage: float,
    name_registry: set[str],
) -> Tuple[pd.DataFrame, Dict[str, Dict[str, float | str]]]:
    """Convert a CPTAC table into numeric features and collect statistics.

    Pipeline: standardize the patient index, flatten columns, coerce every
    column to float (categoricals become category codes, missing -> NaN),
    drop columns below *min_coverage*, median-fill, keep the *top_n*
    highest-variance columns, then rename each survivor to a unique
    ``state_{prefix}_{slug}`` key registered in *name_registry*.

    Returns the prepared frame plus a per-column stats dict
    (original_name, modality, coverage, median, variance).
    """
    if raw_df is None or raw_df.empty:
        return pd.DataFrame(), {}

    df = _flatten_columns(_standardize_index(raw_df))
    if df.empty:
        return df, {}

    # Coerce everything to numeric: categorical/text columns become category
    # codes with -1 (pandas' "missing" code) mapped back to NaN.
    numeric = {}
    for col in df.columns:
        series = df[col]
        if pd.api.types.is_numeric_dtype(series):
            numeric[col] = pd.to_numeric(series, errors="coerce")
        else:
            numeric[col] = series.astype("category").cat.codes.replace(-1, np.nan)
    numeric_df = pd.DataFrame(numeric, index=df.index).astype(float)
    if numeric_df.empty:
        return numeric_df, {}

    # Coverage = fraction of non-missing patients per feature.
    coverage = numeric_df.notna().mean()
    keep_cols = coverage[coverage >= min_coverage].index.tolist()
    numeric_df = numeric_df.loc[:, keep_cols]
    if numeric_df.empty:
        return numeric_df, {}

    # Median-fill BEFORE the variance ranking so missingness does not bias it.
    medians = numeric_df.median()
    filled = numeric_df.fillna(medians)
    filtered = _select_top_variance(filled, top_n)
    # Re-align the stats series to the surviving columns only.
    medians = medians.loc[filtered.columns]
    coverage = coverage.loc[filtered.columns]
    variances = filtered.var(axis=0, skipna=True)

    rename_map: Dict[str, str] = {}
    stats: Dict[str, Dict[str, float | str]] = {}
    for original in filtered.columns:
        base = _safe_label(original)
        # name_registry is shared across modalities, so state names are
        # globally unique even when slugs collide.
        unique = _unique_name(f"{prefix}_{base}", name_registry)
        state_name = f"state_{unique}"
        rename_map[original] = state_name
        stats[state_name] = {
            "original_name": original,
            "modality": modality_name,
            "coverage": float(coverage.get(original, float("nan"))),
            "median": float(medians.get(original, float("nan"))),
            "variance": float(variances.get(original, float("nan"))),
        }

    prepared = filtered.rename(columns=rename_map)
    return prepared, stats
|
|
|
|
|
|
|
|
def _combine_modalities(frames: Iterable[pd.DataFrame]) -> pd.DataFrame: |
|
|
indices = pd.Index([]) |
|
|
dataframes = [df for df in frames if not df.empty] |
|
|
for df in dataframes: |
|
|
indices = indices.union(df.index) |
|
|
combined = pd.DataFrame(index=indices.sort_values()) |
|
|
for df in dataframes: |
|
|
combined = combined.join(df, how="left") |
|
|
return combined |
|
|
|
|
|
|
|
|
def _normalize_features( |
|
|
df: pd.DataFrame, |
|
|
apply_minmax: bool, |
|
|
) -> Tuple[pd.DataFrame, Dict[str, Dict[str, float]]]: |
|
|
stats: Dict[str, Dict[str, float]] = {} |
|
|
if df.empty: |
|
|
return df, stats |
|
|
|
|
|
if apply_minmax: |
|
|
mins = df.min(axis=0) |
|
|
maxs = df.max(axis=0) |
|
|
denom = (maxs - mins).replace(0, np.nan) |
|
|
scaled = (df - mins) / denom |
|
|
scaled = scaled.clip(0.0, 1.0) |
|
|
scaled = scaled.fillna(0.5) |
|
|
for col in scaled.columns: |
|
|
stats[col] = { |
|
|
"min": float(mins.get(col, float("nan"))), |
|
|
"max": float(maxs.get(col, float("nan"))), |
|
|
} |
|
|
return scaled, stats |
|
|
|
|
|
filled = df.copy() |
|
|
medians = filled.median(axis=0) |
|
|
filled = filled.fillna(medians) |
|
|
for col in filled.columns: |
|
|
stats[col] = { |
|
|
"mean": float(filled[col].mean()), |
|
|
"std": float(filled[col].std(ddof=0)), |
|
|
} |
|
|
return filled, stats |
|
|
|
|
|
|
|
|
def _derive_actions( |
|
|
risk: pd.Series, |
|
|
desired_actions: int, |
|
|
) -> Tuple[pd.Series, List[str]]: |
|
|
if risk.empty: |
|
|
return pd.Series(dtype=int), ["Undefined"] |
|
|
|
|
|
unique_values = risk.nunique(dropna=True) |
|
|
n_actions = max(1, min(desired_actions, unique_values)) |
|
|
|
|
|
if n_actions <= 1: |
|
|
actions = pd.Series(0, index=risk.index, dtype=int) |
|
|
return actions, ["Single bucket"] |
|
|
|
|
|
quantiles = pd.qcut( |
|
|
risk.rank(method="first"), |
|
|
q=n_actions, |
|
|
labels=False, |
|
|
duplicates="drop", |
|
|
).astype(int) |
|
|
action_names = [f"Risk quantile {i + 1}/{len(quantiles.unique())}" for i in range(len(quantiles.unique()))] |
|
|
return quantiles, action_names |
|
|
|
|
|
|
|
|
def _yaml_quote(value: str) -> str: |
|
|
if value == "": |
|
|
return "''" |
|
|
if any(ch in value for ch in ":{}[]&,#|>!-?%*@\"'\t\n\r "): |
|
|
return json.dumps(value, ensure_ascii=False) |
|
|
return value |
|
|
|
|
|
|
|
|
def build_dataset(args: argparse.Namespace) -> None:
    """End-to-end pipeline: load CPTAC ovarian data and write the offline dataset.

    Produces four files in ``args.output_dir``: a CSV table, an NPZ archive,
    a schema YAML, and a metadata JSON. Exits with status 1 when no features
    survive filtering or the normalized matrix ends up empty.
    """
    cptac_module, no_internet_error, datasource_error, missing_file_error = _import_cptac()
    dataset = _ensure_dataset(cptac_module, no_internet_error, datasource_error, missing_file_error)

    # NOTE(review): nothing below draws random numbers directly; the seed
    # presumably matters for library internals — confirm it is still needed.
    np.random.seed(args.seed)

    # Raw modality tables straight from cptac.
    proteomics = dataset.get_proteomics()
    transcriptomics = dataset.get_transcriptomics()
    clinical = dataset.get_clinical()

    # Shared registry keeps state-column names unique across all modalities.
    name_registry: set[str] = set()
    prot_df, prot_stats = _prepare_modality(
        proteomics,
        "proteomics",
        "prot",
        args.max_proteins,
        args.min_coverage,
        name_registry,
    )
    rna_df, rna_stats = _prepare_modality(
        transcriptomics,
        "transcriptomics",
        "rna",
        args.max_transcripts,
        args.min_coverage,
        name_registry,
    )
    # Clinical tables are sparser: relax the coverage floor (but never below 0.35).
    clin_df, clin_stats = _prepare_modality(
        clinical,
        "clinical",
        "clin",
        args.max_clinical,
        max(0.35, args.min_coverage * 0.5),
        name_registry,
    )

    combined = _combine_modalities([prot_df, rna_df, clin_df])
    if combined.empty:
        print(
            "[ERROR] No features satisfied the filtering criteria. Consider lowering --min-coverage or increasing --max-* limits.",
            file=sys.stderr,
        )
        sys.exit(1)

    # Fill cross-modality gaps (patients missing a whole modality) with medians.
    medians = combined.median(axis=0)
    combined = combined.fillna(medians)

    normalized, scaling_stats = _normalize_features(combined, not args.no_normalize)
    state_columns = list(normalized.columns)
    if not state_columns:
        print("[ERROR] Normalized feature matrix is empty after preprocessing.", file=sys.stderr)
        sys.exit(1)

    # Surrogate risk score = row mean over all normalized features; actions
    # are its quantile buckets, reward is the centered score in [-1, 1].
    risk_score = normalized[state_columns].mean(axis=1)
    actions, action_names = _derive_actions(risk_score, args.actions)
    rewards = (0.5 - risk_score) * 2.0
    rewards = rewards.clip(-1.0, 1.0)

    output_dir = Path(args.output_dir).expanduser().resolve()
    output_dir.mkdir(parents=True, exist_ok=True)

    # One single-step terminal transition per patient.
    dataset_df = pd.DataFrame(
        {
            "patient_id": normalized.index.astype(str),
            "timestep": 0,
            "action": actions.reindex(normalized.index).fillna(0).astype(int),
            "reward": rewards.reindex(normalized.index).astype(float),
            "terminal": True,
        }
    )
    dataset_df = dataset_df.join(normalized[state_columns])

    csv_path = output_dir / "ovarian_offline_dataset.csv"
    dataset_df.to_csv(csv_path, index=False)

    # Parallel NPZ archive with typed arrays for the training loop.
    npz_path = output_dir / "ovarian_offline_dataset.npz"
    np.savez(
        npz_path,
        states=normalized[state_columns].to_numpy(dtype=np.float32),
        actions=dataset_df["action"].to_numpy(dtype=np.int64),
        rewards=dataset_df["reward"].to_numpy(dtype=np.float32),
        terminals=np.ones(len(dataset_df), dtype=np.bool_),
        patient_ids=dataset_df["patient_id"].to_numpy(dtype="U"),
        timesteps=dataset_df["timestep"].to_numpy(dtype=np.int64),
        state_columns=np.array(state_columns),
        action_names=np.array(action_names, dtype="U"),
        risk_scores=risk_score.reindex(normalized.index).to_numpy(dtype=np.float32),
    )

    # Merge per-modality stats, then fold in the scaling stats per column.
    feature_catalog = {**prot_stats, **rna_stats, **clin_stats}
    for column, stats in feature_catalog.items():
        stats.update(scaling_stats.get(column, {}))

    feature_labels = {
        column: f"{stats['modality']}::{stats['original_name']}"
        for column, stats in feature_catalog.items()
    }

    # Hand-rolled YAML emission (no yaml dependency); _yaml_quote guards names.
    schema_lines = [
        "data_type: tabular",
        "mapping:",
        " trajectory_id: patient_id",
        " timestep: timestep",
        " action: action",
        " reward: reward",
        " terminal: terminal",
        " feature_cols:",
    ]
    for col in state_columns:
        schema_lines.append(f" - {col}")

    if args.no_normalize:
        schema_lines.extend([
            "normalization:",
            " method: none",
        ])
    else:
        schema_lines.extend([
            "normalization:",
            " method: minmax",
            " clip_min: 0.0",
            " clip_max: 1.0",
        ])

    schema_lines.append("action_names:")
    for name in action_names:
        schema_lines.append(f" - {_yaml_quote(name)}")

    schema_lines.append("feature_names:")
    for col in state_columns:
        schema_lines.append(f" - {_yaml_quote(feature_labels.get(col, col))}")

    schema_lines.extend(
        [
            "reward_spec:",
            " expression: normalized_risk_score",
            " window_agg: last",
        ]
    )

    schema_path = output_dir / "ovarian_offline_schema.yaml"
    schema_path.write_text("\n".join(schema_lines) + "\n", encoding="utf-8")

    # Human-readable provenance alongside the machine-readable files.
    metadata = {
        "dataset": "CPTAC Ovarian (proteomics + transcriptomics + clinical)",
        "patients": int(dataset_df["patient_id"].nunique()),
        "records": int(len(dataset_df)),
        "features": len(state_columns),
        "actions": {
            "count": len(action_names),
            "names": action_names,
            "definition": "Risk quantiles derived from the normalized multi-modal feature mean",
        },
        "reward": {
            "range": [-1.0, 1.0],
            "definition": "Centered risk score (higher is better) from combined normalized features",
        },
        "normalization": "minmax" if not args.no_normalize else "none",
        "feature_catalog": feature_catalog,
        "files": {
            "csv": csv_path.name,
            "npz": npz_path.name,
            "schema": schema_path.name,
        },
    }

    metadata_path = output_dir / "ovarian_offline_metadata.json"
    metadata_path.write_text(json.dumps(metadata, indent=2, ensure_ascii=False) + "\n", encoding="utf-8")

    print("✔ Dataset prepared")
    print(f" • CSV table : {csv_path}")
    print(f" • Numpy archive : {npz_path}")
    print(f" • Schema YAML : {schema_path}")
    print(f" • Metadata JSON : {metadata_path}")
|
|
|
|
|
|
|
|
def parse_args(argv: List[str] | None = None) -> argparse.Namespace:
    """Parse and validate command-line options for the dataset builder."""
    parser = argparse.ArgumentParser(
        description="Download and format the CPTAC ovarian cohort for RLDT offline workflows",
    )
    parser.add_argument(
        "--output-dir",
        default="RL0910/data/cptac_ovarian",
        help="Directory to store the generated dataset files (default: %(default)s)",
    )
    # Per-modality feature budgets, declared table-style.
    budget_flags = (
        ("--max-proteins", 75, "Maximum number of proteomic features to retain (variance-ranked)."),
        ("--max-transcripts", 75, "Maximum number of transcriptomic features to retain (variance-ranked)."),
        ("--max-clinical", 40, "Maximum number of engineered clinical features to retain."),
    )
    for flag, default, help_text in budget_flags:
        parser.add_argument(flag, type=int, default=default, help=help_text)
    parser.add_argument(
        "--min-coverage",
        type=float,
        default=0.65,
        help="Minimum fraction of patients required for a feature to be kept (0-1).",
    )
    parser.add_argument(
        "--actions",
        type=int,
        default=5,
        help="Number of action buckets derived from the risk score quantiles.",
    )
    parser.add_argument(
        "--no-normalize",
        action="store_true",
        help="Disable min-max normalization (features remain on their native scale).",
    )
    parser.add_argument(
        "--seed",
        type=int,
        default=42,
        help="Random seed used when deriving surrogate actions.",
    )

    args = parser.parse_args(argv)
    # Cross-field validation beyond what argparse types enforce.
    if not 0 < args.min_coverage <= 1:
        parser.error("--min-coverage must be within (0, 1].")
    if args.actions <= 0:
        parser.error("--actions must be a positive integer.")
    return args
|
|
|
|
|
|
|
|
def main(argv: List[str] | None = None) -> int:
    """CLI entry point; returns the process exit status (0 on success)."""
    build_dataset(parse_args(argv))
    return 0
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Propagate main()'s return value to the shell as the exit status.
    raise SystemExit(main())
|
|
|