whats2000 committed on
Commit dddcc0f · 1 Parent(s): cca1e2a

Initial commit: distributed EDA pipeline, max non-zero reporting, and notebook

.gitignore ADDED
@@ -0,0 +1,8 @@
+ .venv/
+ __pycache__/
+ *.pyc
+ .ipynb_checkpoints/
+ output/eda/per_dataset/
+ output/eda/*.csv
+ output/eda/*.json
+ output/eda/*.txt
README.md ADDED
@@ -0,0 +1,125 @@
+ # Distributed EDA for Cell x Gene
+
+ This folder now includes a memory-safe EDA pipeline for large `.h5ad` files.
+
+ All commands below assume your current directory is:
+
+ ```bash
+ cd /project/GOV108018/whats2000_work/cell_x_gene_visualization
+ ```
+
+ ## 1) Check resources first
+
+ ```bash
+ uv run python scripts/resource_probe.py
+ ```
+
+ ## 2) Run EDA (single node, both species by default)
+
+ ```bash
+ uv run python scripts/distributed_eda.py \
+     --input-dir /project/GOV108018/cell_x_gene/homo_sapiens/h5ad \
+     --input-dir /project/GOV108018/cell_x_gene/mus_musculus/h5ad \
+     --output-dir output/eda \
+     --workers 32 \
+     --chunk-size 8192
+ ```
+
+ Default output is clean `tqdm` progress only. If you want per-file logs, add `--log-each-dataset`:
+
+ ```bash
+ uv run python scripts/distributed_eda.py \
+     --input-dir /project/GOV108018/cell_x_gene/homo_sapiens/h5ad \
+     --input-dir /project/GOV108018/cell_x_gene/mus_musculus/h5ad \
+     --output-dir output/eda \
+     --workers 32 \
+     --chunk-size 8192 \
+     --log-each-dataset
+ ```
+
+ If memory pressure appears, fall back to fewer workers and smaller chunks:
+
+ ```bash
+ uv run python scripts/distributed_eda.py \
+     --input-dir /project/GOV108018/cell_x_gene/homo_sapiens/h5ad \
+     --input-dir /project/GOV108018/cell_x_gene/mus_musculus/h5ad \
+     --output-dir output/eda \
+     --workers 24 \
+     --chunk-size 4096
+ ```
+
+ Default input directories in the script are absolute:
+ - `/project/GOV108018/cell_x_gene/homo_sapiens/h5ad`
+ - `/project/GOV108018/cell_x_gene/mus_musculus/h5ad`
+
+ ## 3) Run EDA as distributed shards (multiple jobs)
+
+ Example for 4 shards:
+
+ ```bash
+ # job 0
+ uv run python scripts/distributed_eda.py --input-dir /project/GOV108018/cell_x_gene/homo_sapiens/h5ad --input-dir /project/GOV108018/cell_x_gene/mus_musculus/h5ad --num-shards 4 --shard-index 0
+ # job 1
+ uv run python scripts/distributed_eda.py --input-dir /project/GOV108018/cell_x_gene/homo_sapiens/h5ad --input-dir /project/GOV108018/cell_x_gene/mus_musculus/h5ad --num-shards 4 --shard-index 1
+ # job 2
+ uv run python scripts/distributed_eda.py --input-dir /project/GOV108018/cell_x_gene/homo_sapiens/h5ad --input-dir /project/GOV108018/cell_x_gene/mus_musculus/h5ad --num-shards 4 --shard-index 2
+ # job 3
+ uv run python scripts/distributed_eda.py --input-dir /project/GOV108018/cell_x_gene/homo_sapiens/h5ad --input-dir /project/GOV108018/cell_x_gene/mus_musculus/h5ad --num-shards 4 --shard-index 3
+ ```
+
+ Then merge:
+
+ ```bash
+ uv run python scripts/merge_eda_shards.py
+ ```
+
+ ## 4) Report only the global max non-zero gene count
+
+ After the merge, the script automatically writes:
+
+ - `output/eda/max_nonzero_gene_count_all_cells.csv`
+ - `output/eda/max_nonzero_gene_count_all_cells.json`
+
+ These files contain the single dataset row with the highest `cell_nnz_max` (the maximum number of non-zero genes in any one cell).
+
+ ## 5) Visualization notebook
+
+ Open and run:
+
+ - `notebooks/max_nonzero_gene_report.ipynb`
+
+ Or launch Jupyter with `uv run`:
+
+ ```bash
+ uv run jupyter lab
+ ```
+
+ The notebook:
+ - shows the global max row,
+ - plots the top datasets by `cell_nnz_max`,
+ - plots the distribution of `cell_nnz_max`.
+
+ ## Outputs
+
+ - Per-shard summary CSV:
+   - `output/eda/eda_summary_shard_XXX_of_YYY.csv`
+ - Per-shard failure log:
+   - `output/eda/eda_failures_shard_XXX_of_YYY.json`
+ - Per-dataset JSON details:
+   - `output/eda/per_dataset/*.json`
+ - Merged summary:
+   - `output/eda/eda_summary_all_shards.csv`
+ - Global max-only report:
+   - `output/eda/max_nonzero_gene_count_all_cells.csv`
+   - `output/eda/max_nonzero_gene_count_all_cells.json`
+
+ ## Notes on large data safety
+
+ - Uses `anndata.read_h5ad(..., backed="r")` so matrices are never fully loaded into memory.
+ - Scans the expression matrix in chunks with `chunked_X`.
+ - Uses process-level parallelism with a configurable worker count.
+ - Includes a shard mode for cross-job distribution on HPC queues.
+ - Shows a simple dataset-level `tqdm` progress bar during processing.
+ - Per-dataset JSON now includes explicit schema blocks:
+   - `obs_schema` with all obs column names and dtypes
+   - `var_schema` with all var column names and dtypes
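The round-robin rule behind `--num-shards`/`--shard-index` in section 3 can be sketched as follows (a minimal standalone illustration of the `i % num_shards == shard_index` assignment over a sorted file list; the file names here are made up):

```python
# Round-robin shard assignment over a sorted file list.
files = sorted(f"dataset_{i:03d}.h5ad" for i in range(10))
num_shards = 4

shards = {
    shard_index: [f for i, f in enumerate(files) if i % num_shards == shard_index]
    for shard_index in range(num_shards)
}

# Every file lands in exactly one shard, so independent jobs never overlap.
assigned = sorted(f for shard in shards.values() for f in shard)
```

Because the assignment depends only on the sorted index, each job can compute its own shard without any coordination.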
launch_jupyter.sh ADDED
@@ -0,0 +1,10 @@
+ #!/bin/bash
+ # Launch Jupyter Lab for EDA
+
+ cd /project/GOV108018/whats2000_work/cell_x_gene_visualization
+
+ echo "Starting Jupyter Lab..."
+ echo "Access at: http://localhost:8888"
+ echo ""
+
+ uv run jupyter lab --ip=0.0.0.0 --port=8888 --no-browser --allow-root
notebooks/max_nonzero_gene_report.ipynb ADDED
@@ -0,0 +1,111 @@
+ {
+  "cells": [
+   {
+    "cell_type": "markdown",
+    "metadata": {},
+    "source": [
+     "# Max Non-zero Gene Count Report\n",
+     "\n",
+     "This notebook reports only the global maximum non-zero gene count across all cells from all processed datasets."
+    ]
+   },
+   {
+    "cell_type": "code",
+    "execution_count": null,
+    "metadata": {},
+    "outputs": [],
+    "source": [
+     "from pathlib import Path\n",
+     "import pandas as pd\n",
+     "import plotly.express as px\n",
+     "\n",
+     "summary_path = Path('../output/eda/eda_summary_all_shards.csv')\n",
+     "if not summary_path.exists():\n",
+     "    raise FileNotFoundError(f'Missing merged summary: {summary_path}')\n",
+     "\n",
+     "df = pd.read_csv(summary_path)\n",
+     "if 'cell_nnz_max' not in df.columns:\n",
+     "    raise KeyError(\"Column 'cell_nnz_max' not found. Run distributed_eda.py + merge_eda_shards.py first.\")\n",
+     "\n",
+     "df['cell_nnz_max'] = pd.to_numeric(df['cell_nnz_max'], errors='coerce')\n",
+     "df = df.dropna(subset=['cell_nnz_max']).copy()\n",
+     "df.shape"
+    ]
+   },
+   {
+    "cell_type": "code",
+    "execution_count": null,
+    "metadata": {},
+    "outputs": [],
+    "source": [
+     "max_row = df.loc[df['cell_nnz_max'].idxmax()].copy()\n",
+     "report_cols = [c for c in ['dataset_file', 'dataset_path', 'cell_nnz_max', 'n_obs', 'n_vars', 'file_size_gib'] if c in df.columns]\n",
+     "max_report = max_row[report_cols].to_frame().T\n",
+     "max_report"
+    ]
+   },
+   {
+    "cell_type": "code",
+    "execution_count": null,
+    "metadata": {},
+    "outputs": [],
+    "source": [
+     "top_n = 20\n",
+     "plot_df = df.nlargest(top_n, 'cell_nnz_max')[['dataset_file', 'cell_nnz_max']].copy()\n",
+     "plot_df = plot_df.sort_values('cell_nnz_max', ascending=True)\n",
+     "\n",
+     "fig = px.bar(\n",
+     "    plot_df,\n",
+     "    x='cell_nnz_max',\n",
+     "    y='dataset_file',\n",
+     "    orientation='h',\n",
+     "    title=f'Top {top_n} Datasets by Max Non-zero Gene Count per Cell',\n",
+     "    labels={'cell_nnz_max': 'Max non-zero genes in one cell', 'dataset_file': 'Dataset'}\n",
+     ")\n",
+     "fig.update_layout(height=700)\n",
+     "fig.show()"
+    ]
+   },
+   {
+    "cell_type": "code",
+    "execution_count": null,
+    "metadata": {},
+    "outputs": [],
+    "source": [
+     "fig2 = px.histogram(\n",
+     "    df,\n",
+     "    x='cell_nnz_max',\n",
+     "    nbins=50,\n",
+     "    title='Distribution of Dataset-level Max Non-zero Gene Count per Cell',\n",
+     "    labels={'cell_nnz_max': 'Max non-zero genes in one cell'}\n",
+     ")\n",
+     "fig2.show()"
+    ]
+   },
+   {
+    "cell_type": "code",
+    "execution_count": null,
+    "metadata": {},
+    "outputs": [],
+    "source": [
+     "out_dir = Path('../output/eda')\n",
+     "out_dir.mkdir(parents=True, exist_ok=True)\n",
+     "max_report.to_csv(out_dir / 'max_nonzero_gene_count_all_cells_from_notebook.csv', index=False)\n",
+     "max_report"
+    ]
+   }
+  ],
+  "metadata": {
+   "kernelspec": {
+    "display_name": "Python 3",
+    "language": "python",
+    "name": "python3"
+   },
+   "language_info": {
+    "name": "python",
+    "version": "3.10"
+   }
+  },
+  "nbformat": 4,
+  "nbformat_minor": 5
+ }
pyproject.toml ADDED
@@ -0,0 +1,20 @@
+ [project]
+ name = "cell-x-gene-visualization"
+ version = "0.1.0"
+ description = "Visualization for Cell x Gene dataset"
+ requires-python = ">=3.10"
+ dependencies = [
+     "scanpy>=1.10.0",
+     "anndata>=0.10.0",
+     "pandas>=2.0.0",
+     "matplotlib>=3.8.0",
+     "seaborn>=0.13.0",
+     "numpy>=1.24.0",
+     "jupyter>=1.0.0",
+     "ipykernel>=6.25.0",
+     "plotly>=6.5.2",
+     "kaleido>=1.2.0",
+     "numba>=0.58.0",
+     "tqdm>=4.66.0",
+     "joblib>=1.3.0",
+ ]
scripts/distributed_eda.py ADDED
@@ -0,0 +1,377 @@
+ #!/usr/bin/env python3
+ """Distributed and memory-safe EDA for large Cell x Gene .h5ad datasets."""
+
+ from __future__ import annotations
+
+ import argparse
+ import concurrent.futures
+ import hashlib
+ import json
+ import math
+ import os
+ import time
+ from dataclasses import dataclass
+ from pathlib import Path
+ from typing import Iterable
+
+ import anndata as ad
+ import numpy as np
+ import pandas as pd
+ from scipy import sparse
+ from tqdm import tqdm
+
+
+ @dataclass
+ class RunningStats:
+     count: int = 0
+     sum_value: float = 0.0
+     sum_sq: float = 0.0
+     min_value: float = math.inf
+     max_value: float = -math.inf
+
+     def update(self, values: np.ndarray) -> None:
+         arr = np.asarray(values, dtype=np.float64)
+         if arr.size == 0:
+             return
+         self.count += int(arr.size)
+         self.sum_value += float(arr.sum())
+         self.sum_sq += float(np.square(arr).sum())
+         self.min_value = min(self.min_value, float(arr.min()))
+         self.max_value = max(self.max_value, float(arr.max()))
+
+     def finalize(self) -> dict[str, float | int | None]:
+         if self.count == 0:
+             return {"count": 0, "mean": None, "std": None, "min": None, "max": None}
+         mean = self.sum_value / self.count
+         var = max(0.0, self.sum_sq / self.count - mean * mean)
+         return {
+             "count": self.count,
+             "mean": mean,
+             "std": math.sqrt(var),
+             "min": self.min_value,
+             "max": self.max_value,
+         }
+
+
+ class ReservoirSampler:
+     def __init__(self, k: int, seed: int = 42) -> None:
+         self.k = k
+         self.values = np.empty((k,), dtype=np.float64)
+         self.filled = 0
+         self.seen = 0
+         self.rng = np.random.default_rng(seed)
+
+     def update(self, arr: np.ndarray) -> None:
+         vals = np.asarray(arr, dtype=np.float64).ravel()
+         for value in vals:
+             self.seen += 1
+             if self.filled < self.k:
+                 self.values[self.filled] = value
+                 self.filled += 1
+             else:
+                 j = int(self.rng.integers(0, self.seen))
+                 if j < self.k:
+                     self.values[j] = value
+
+     def quantiles(self, q: Iterable[float]) -> dict[str, float | None]:
+         if self.filled == 0:
+             return {f"q{int(x * 100)}": None for x in q}
+         sample = self.values[: self.filled]
+         out = np.quantile(sample, list(q))
+         return {f"q{int(k * 100)}": float(v) for k, v in zip(q, out)}
+
+
+ def safe_name(path: Path) -> str:
+     digest = hashlib.md5(str(path).encode("utf-8"), usedforsecurity=False).hexdigest()[:10]
+     stem = path.stem.replace(" ", "_")
+     if len(stem) > 80:
+         stem = stem[:80]
+     return f"{stem}_{digest}"
+
+
+ def auto_workers(mem_per_worker_gib: float) -> int:
+     cpu = os.cpu_count() or 1
+     mem_available_gib = 0.0
+     meminfo = Path("/proc/meminfo")
+     if meminfo.exists():
+         for line in meminfo.read_text().splitlines():
+             if line.startswith("MemAvailable:"):
+                 kb = int(line.split()[1])
+                 mem_available_gib = kb / (1024 * 1024)
+                 break
+     # Fast profile for HPC nodes: higher core utilization.
+     by_cpu = max(1, int(cpu * 0.75))
+     by_mem = max(1, int(mem_available_gib // max(1.0, mem_per_worker_gib)))
+     return max(1, min(by_cpu, by_mem))
+
+
+ def summarize_metadata(df: pd.DataFrame, max_cols: int, max_categories: int) -> dict[str, dict]:
+     if df.empty:
+         return {}
+
+     preferred = ["cell_type", "assay", "tissue", "disease", "sex", "donor_id"]
+     selected: list[str] = [c for c in preferred if c in df.columns]
+     for col in df.columns:
+         if col not in selected:
+             selected.append(col)
+         if len(selected) >= max_cols:
+             break
+
+     out: dict[str, dict] = {}
+     n_rows = max(1, len(df))
+     for col in selected:
+         s = df[col]
+         summary = {
+             "dtype": str(s.dtype),
+             "missing_fraction": float(s.isna().sum()) / n_rows,
+         }
+         if isinstance(s.dtype, pd.CategoricalDtype):
+             summary["n_unique"] = int(len(s.cat.categories))
+             vc = s.value_counts(dropna=False).head(max_categories)
+             summary["top_values"] = {str(k): int(v) for k, v in vc.items()}
+         elif pd.api.types.is_string_dtype(s.dtype) or s.dtype == object:
+             sample = s.dropna().astype(str).head(200_000)
+             summary["n_unique_sample"] = int(sample.nunique())
+             vc = sample.value_counts(dropna=False).head(max_categories)
+             summary["top_values_sample"] = {str(k): int(v) for k, v in vc.items()}
+         out[col] = summary
+     return out
+
+
+ def extract_schema(df: pd.DataFrame) -> dict[str, object]:
+     return {
+         "n_columns": int(len(df.columns)),
+         "columns": [str(c) for c in df.columns],
+         "dtypes": {str(c): str(df[c].dtype) for c in df.columns},
+     }
+
+
+ def process_dataset(path: Path, chunk_size: int, max_meta_cols: int, max_categories: int) -> dict:
+     t0 = time.time()
+     row: dict[str, object] = {
+         "dataset_path": str(path),
+         "dataset_file": path.name,
+         "file_size_gib": round(path.stat().st_size / (1024**3), 4),
+     }
+
+     adata = ad.read_h5ad(path, backed="r")
+     try:
+         n_obs = int(adata.n_obs)
+         n_vars = int(adata.n_vars)
+         total_entries = n_obs * n_vars
+
+         row.update(
+             {
+                 "n_obs": n_obs,
+                 "n_vars": n_vars,
+                 "obs_columns": int(len(adata.obs.columns)),
+                 "var_columns": int(len(adata.var.columns)),
+                 "layers_count": int(len(adata.layers.keys())),
+                 "obsm_count": int(len(adata.obsm.keys())),
+                 "varm_count": int(len(adata.varm.keys())),
+             }
+         )
+         row["obs_schema"] = extract_schema(adata.obs)
+         row["var_schema"] = extract_schema(adata.var)
+
+         nnz_total = 0
+         x_sum = 0.0
+         x_sum_sq = 0.0
+         cell_sum_stats = RunningStats()
+         cell_nnz_stats = RunningStats()
+         cell_sum_sample = ReservoirSampler(k=200_000, seed=17)
+         cell_nnz_sample = ReservoirSampler(k=200_000, seed=23)
+
+         for chunk, start, end in adata.chunked_X(chunk_size):
+             if sparse.issparse(chunk):
+                 nnz = int(chunk.nnz)
+                 csr = chunk if sparse.isspmatrix_csr(chunk) else chunk.tocsr(copy=False)
+                 data = csr.data.astype(np.float64, copy=False)
+                 part_sum = float(data.sum())
+                 part_sum_sq = float(np.square(data).sum())
+                 row_sums = np.asarray(csr.sum(axis=1)).ravel()
+                 row_nnz = np.diff(csr.indptr)
+             else:
+                 arr = np.asarray(chunk)
+                 arr64 = arr.astype(np.float64, copy=False)
+                 nnz = int(np.count_nonzero(arr64))
+                 part_sum = float(arr64.sum())
+                 part_sum_sq = float(np.square(arr64).sum())
+                 row_sums = np.sum(arr64, axis=1)
+                 row_nnz = np.count_nonzero(arr64, axis=1)
+
+             nnz_total += nnz
+             x_sum += part_sum
+             x_sum_sq += part_sum_sq
+
+             cell_sum_stats.update(row_sums)
+             cell_nnz_stats.update(row_nnz)
+             cell_sum_sample.update(row_sums)
+             cell_nnz_sample.update(row_nnz)
+
+         row["nnz"] = int(nnz_total)
+         row["sparsity"] = float(1.0 - (nnz_total / total_entries)) if total_entries else None
+         row["x_mean"] = float(x_sum / total_entries) if total_entries else None
+         if total_entries:
+             var = max(0.0, x_sum_sq / total_entries - (x_sum / total_entries) ** 2)
+             row["x_std"] = float(math.sqrt(var))
+         else:
+             row["x_std"] = None
+
+         cell_sum_quantiles = cell_sum_sample.quantiles([0.05, 0.5, 0.95])
+         cell_nnz_quantiles = cell_nnz_sample.quantiles([0.05, 0.5, 0.95])
+         for key, value in cell_sum_stats.finalize().items():
+             row[f"cell_sum_{key}"] = value
+         for key, value in cell_nnz_stats.finalize().items():
+             row[f"cell_nnz_{key}"] = value
+         for key, value in cell_sum_quantiles.items():
+             row[f"cell_sum_{key}_approx"] = value
+         for key, value in cell_nnz_quantiles.items():
+             row[f"cell_nnz_{key}_approx"] = value
+
+         row["metadata_obs_summary"] = summarize_metadata(
+             adata.obs, max_cols=max_meta_cols, max_categories=max_categories
+         )
+         row["metadata_var_summary"] = summarize_metadata(
+             adata.var, max_cols=max_meta_cols, max_categories=max_categories
+         )
+
+         row["status"] = "ok"
+     finally:
+         adata.file.close()
+
+     row["elapsed_sec"] = round(time.time() - t0, 2)
+     return row
+
+
+ def discover_h5ad(input_dirs: list[Path]) -> list[Path]:
+     files: list[Path] = []
+     for root in input_dirs:
+         if root.exists():
+             files.extend(sorted(root.rglob("*.h5ad")))
+     files = sorted(set(files))
+     return files
+
+
+ def main() -> None:
+     parser = argparse.ArgumentParser(description=__doc__)
+     parser.add_argument(
+         "--input-dir",
+         action="append",
+         default=[],
+         help="Input folder(s) containing .h5ad files. Can be repeated.",
+     )
+     parser.add_argument(
+         "--output-dir",
+         type=Path,
+         default=Path("whats2000_work/cell_x_gene_visualization/output/eda"),
+     )
+     parser.add_argument("--workers", type=int, default=0, help="0 means auto.")
+     parser.add_argument("--chunk-size", type=int, default=4096)
+     parser.add_argument("--mem-per-worker-gib", type=float, default=8.0)
+     parser.add_argument("--num-shards", type=int, default=1)
+     parser.add_argument("--shard-index", type=int, default=0)
+     parser.add_argument("--max-meta-cols", type=int, default=20)
+     parser.add_argument("--max-categories", type=int, default=8)
+     parser.add_argument(
+         "--log-each-dataset",
+         action="store_true",
+         help="Print per-dataset success logs. Default is off for clean tqdm output.",
+     )
+     args = parser.parse_args()
+
+     if not args.input_dir:
+         args.input_dir = [
+             "/project/GOV108018/cell_x_gene/homo_sapiens/h5ad",
+             "/project/GOV108018/cell_x_gene/mus_musculus/h5ad",
+         ]
+
+     roots = [Path(p) for p in args.input_dir]
+     all_files = discover_h5ad(roots)
+     if not all_files:
+         raise SystemExit("No .h5ad files found in input directories.")
+
+     if args.num_shards < 1:
+         raise SystemExit("--num-shards must be >= 1")
+     if args.shard_index < 0 or args.shard_index >= args.num_shards:
+         raise SystemExit("--shard-index must satisfy 0 <= shard-index < num-shards")
+
+     shard_files = [p for i, p in enumerate(all_files) if i % args.num_shards == args.shard_index]
+     if not shard_files:
+         raise SystemExit("No files assigned to this shard.")
+
+     workers = args.workers if args.workers > 0 else auto_workers(args.mem_per_worker_gib)
+     workers = min(workers, len(shard_files))
+
+     args.output_dir.mkdir(parents=True, exist_ok=True)
+     per_dataset_dir = args.output_dir / "per_dataset"
+     per_dataset_dir.mkdir(parents=True, exist_ok=True)
+
+     manifest_path = args.output_dir / f"manifest_shard_{args.shard_index:03d}_of_{args.num_shards:03d}.txt"
+     manifest_path.write_text("\n".join(str(x) for x in shard_files) + "\n")
+
+     summary_rows: list[dict] = []
+     failures: list[dict] = []
+
+     print(
+         json.dumps(
+             {
+                 "total_files": len(all_files),
+                 "files_in_shard": len(shard_files),
+                 "workers": workers,
+                 "chunk_size": args.chunk_size,
+                 "num_shards": args.num_shards,
+                 "shard_index": args.shard_index,
+             }
+         )
+     )
+
+     with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as ex:
+         futures = {
+             ex.submit(
+                 process_dataset,
+                 path,
+                 args.chunk_size,
+                 args.max_meta_cols,
+                 args.max_categories,
+             ): path
+             for path in shard_files
+         }
+         with tqdm(total=len(futures), desc="Datasets", unit="dataset") as pbar:
+             for fut in concurrent.futures.as_completed(futures):
+                 path = futures[fut]
+                 try:
+                     row = fut.result()
+                     summary_rows.append(row)
+                     payload_name = safe_name(path) + ".json"
+                     (per_dataset_dir / payload_name).write_text(json.dumps(row, indent=2))
+                     if args.log_each_dataset:
+                         tqdm.write(f"[ok] {path.name} ({row.get('elapsed_sec', 'na')}s)")
+                 except Exception as exc:  # noqa: BLE001
+                     msg = {"dataset_path": str(path), "error": repr(exc), "status": "failed"}
+                     failures.append(msg)
+                     tqdm.write(f"[failed] {path.name}: {exc}")
+                 finally:
+                     pbar.update(1)
+
+     summary_df = pd.DataFrame(summary_rows)
+     summary_csv = args.output_dir / f"eda_summary_shard_{args.shard_index:03d}_of_{args.num_shards:03d}.csv"
+     summary_df.to_csv(summary_csv, index=False)
+
+     failures_path = args.output_dir / f"eda_failures_shard_{args.shard_index:03d}_of_{args.num_shards:03d}.json"
+     failures_path.write_text(json.dumps(failures, indent=2))
+
+     print(
+         json.dumps(
+             {
+                 "summary_csv": str(summary_csv),
+                 "failures_json": str(failures_path),
+                 "ok_count": len(summary_rows),
+                 "failed_count": len(failures),
+             }
+         )
+     )
+
+
+ if __name__ == "__main__":
+     main()
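The one-pass statistics in `RunningStats` reduce each chunk to a count, a running sum, and a running sum of squares; `finalize()` then recovers the population mean and standard deviation from those three numbers. A tiny standalone check of the same formulas (plain Python, not an import of the script):

```python
import math

# Same accumulators as RunningStats: count, running sum, running sum of squares.
values = [1.0, 2.0, 3.0, 4.0]
count = len(values)
sum_value = sum(values)
sum_sq = sum(v * v for v in values)

mean = sum_value / count                      # 10 / 4 = 2.5
var = max(0.0, sum_sq / count - mean * mean)  # 30/4 - 2.5^2 = 1.25 (population variance)
std = math.sqrt(var)
```

Because the accumulators are additive, chunks can be folded in any order, which is what makes the chunked scan over `chunked_X` safe.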
scripts/merge_eda_shards.py ADDED
@@ -0,0 +1,72 @@
+ #!/usr/bin/env python3
+ """Merge shard-level EDA outputs into one master summary."""
+
+ from __future__ import annotations
+
+ import argparse
+ import json
+ from pathlib import Path
+
+ import pandas as pd
+
+
+ def main() -> None:
+     parser = argparse.ArgumentParser(description=__doc__)
+     parser.add_argument(
+         "--output-dir",
+         type=Path,
+         default=Path("whats2000_work/cell_x_gene_visualization/output/eda"),
+     )
+     args = parser.parse_args()
+
+     csv_files = sorted(args.output_dir.glob("eda_summary_shard_*_of_*.csv"))
+     fail_files = sorted(args.output_dir.glob("eda_failures_shard_*_of_*.json"))
+
+     if not csv_files:
+         raise SystemExit(f"No shard summary files found in {args.output_dir}")
+
+     merged = pd.concat([pd.read_csv(p) for p in csv_files], ignore_index=True)
+     merged = merged.sort_values(by=["dataset_path"]).drop_duplicates(subset=["dataset_path"], keep="first")
+
+     merged_csv = args.output_dir / "eda_summary_all_shards.csv"
+     merged.to_csv(merged_csv, index=False)
+
+     failures = []
+     for p in fail_files:
+         failures.extend(json.loads(p.read_text()))
+
+     dedup_failures = {}
+     for item in failures:
+         dedup_failures[item["dataset_path"]] = item
+     merged_failures = list(dedup_failures.values())
+
+     merged_failures_path = args.output_dir / "eda_failures_all_shards.json"
+     merged_failures_path.write_text(json.dumps(merged_failures, indent=2))
+
+     max_report_csv = None
+     max_report_json = None
+     if "cell_nnz_max" in merged.columns and not merged["cell_nnz_max"].dropna().empty:
+         max_idx = merged["cell_nnz_max"].astype(float).idxmax()
+         max_row = merged.loc[max_idx].to_dict()
+         max_report_csv = args.output_dir / "max_nonzero_gene_count_all_cells.csv"
+         pd.DataFrame([max_row]).to_csv(max_report_csv, index=False)
+         max_report_json = args.output_dir / "max_nonzero_gene_count_all_cells.json"
+         max_report_json.write_text(json.dumps(max_row, indent=2, default=str))
+
+     print(
+         json.dumps(
+             {
+                 "merged_csv": str(merged_csv),
+                 "merged_failures": str(merged_failures_path),
+                 "max_nonzero_report_csv": str(max_report_csv) if max_report_csv else None,
+                 "max_nonzero_report_json": str(max_report_json) if max_report_json else None,
+                 "rows": int(len(merged)),
+                 "failures": int(len(merged_failures)),
+             },
+             indent=2,
+         )
+     )
+
+
+ if __name__ == "__main__":
+     main()
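The merge script's dedup step (`sort_values` followed by `drop_duplicates(keep="first")`) makes the surviving row deterministic when two shards happen to report the same dataset. A toy illustration with made-up paths and values:

```python
import pandas as pd

# Two shards that both processed dataset "b" (e.g. after a rerun).
shard0 = pd.DataFrame({"dataset_path": ["a", "b"], "cell_nnz_max": [10, 20]})
shard1 = pd.DataFrame({"dataset_path": ["b", "c"], "cell_nnz_max": [20, 30]})

merged = pd.concat([shard0, shard1], ignore_index=True)
# Stable sort + keep="first" -> one row per dataset_path, deterministically.
merged = merged.sort_values(by=["dataset_path"]).drop_duplicates(
    subset=["dataset_path"], keep="first"
)
paths = merged["dataset_path"].tolist()
```

Since `sort_values` is stable for equal keys, ties are broken by concat order, i.e. by shard file order.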
scripts/resource_probe.py ADDED
@@ -0,0 +1,91 @@
+ #!/usr/bin/env python3
+ """Probe local HPC resources and suggest safe EDA concurrency settings."""
+
+ from __future__ import annotations
+
+ import argparse
+ import json
+ import os
+ import platform
+ import shutil
+ from pathlib import Path
+
+
+ def _mem_available_gib() -> float:
+     meminfo = Path("/proc/meminfo")
+     if not meminfo.exists():
+         return 0.0
+     for line in meminfo.read_text().splitlines():
+         if line.startswith("MemAvailable:"):
+             kb = int(line.split()[1])
+             return kb / (1024 * 1024)
+     return 0.0
+
+
+ def _mem_total_gib() -> float:
+     meminfo = Path("/proc/meminfo")
+     if not meminfo.exists():
+         return 0.0
+     for line in meminfo.read_text().splitlines():
+         if line.startswith("MemTotal:"):
+             kb = int(line.split()[1])
+             return kb / (1024 * 1024)
+     return 0.0
+
+
+ def _recommend_workers(cpu_count: int, mem_available_gib: float, mem_per_worker_gib: float) -> int:
+     # Fast profile for HPC: use more cores while still leaving headroom.
+     by_cpu = max(1, int(cpu_count * 0.75))
+     by_mem = max(1, int(mem_available_gib // max(1.0, mem_per_worker_gib)))
+     return max(1, min(by_cpu, by_mem))
+
+
+ def main() -> None:
+     parser = argparse.ArgumentParser(description=__doc__)
+     parser.add_argument(
+         "--workdir",
+         type=Path,
+         default=Path("/project/GOV108018"),
+         help="Path to check disk usage for.",
+     )
+     parser.add_argument(
+         "--mem-per-worker-gib",
+         type=float,
+         default=8.0,
+         help="Memory budget per EDA worker to compute a safe recommendation.",
+     )
+     args = parser.parse_args()
+
+     cpu_count = os.cpu_count() or 1
+     mem_total_gib = _mem_total_gib()
+     mem_available_gib = _mem_available_gib()
+     disk_total, disk_used, disk_free = shutil.disk_usage(args.workdir)
+
+     recommended_workers = _recommend_workers(
+         cpu_count=cpu_count,
+         mem_available_gib=mem_available_gib,
+         mem_per_worker_gib=args.mem_per_worker_gib,
+     )
+     recommended_shards = max(1, min(8, cpu_count // max(1, recommended_workers)))
+
+     report = {
+         "hostname": platform.node(),
+         "platform": platform.platform(),
+         "cpu_count": cpu_count,
+         "memory_total_gib": round(mem_total_gib, 2),
+         "memory_available_gib": round(mem_available_gib, 2),
+         "disk_total_gib": round(disk_total / (1024**3), 2),
+         "disk_used_gib": round(disk_used / (1024**3), 2),
+         "disk_free_gib": round(disk_free / (1024**3), 2),
+         "assumptions": {"mem_per_worker_gib": args.mem_per_worker_gib},
+         "recommendation": {
+             "workers_per_node": recommended_workers,
+             "num_shards_suggestion": recommended_shards,
+             "chunk_size_suggestion": 4096,
+         },
+     }
+     print(json.dumps(report, indent=2))
+
+
+ if __name__ == "__main__":
+     main()
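The recommendation rule in `_recommend_workers` is the minimum of a 75% CPU cap and a memory budget, and it can be traced by hand for a typical node (the node sizes below are made up for illustration):

```python
def recommend_workers(cpu_count: int, mem_available_gib: float, mem_per_worker_gib: float) -> int:
    # Mirror of _recommend_workers: cap by 75% of cores and by the memory budget.
    by_cpu = max(1, int(cpu_count * 0.75))
    by_mem = max(1, int(mem_available_gib // max(1.0, mem_per_worker_gib)))
    return max(1, min(by_cpu, by_mem))

# Hypothetical 64-core node with 256 GiB available and an 8 GiB/worker budget:
# by_cpu = 48, by_mem = 32 -> memory is the binding constraint.
workers = recommend_workers(64, 256.0, 8.0)
```

On memory-rich nodes the CPU cap binds instead, which is why the probe reports both the assumption (`mem_per_worker_gib`) and the resulting recommendation.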
uv.lock ADDED
The diff for this file is too large to render.