whats2000 committed on
Commit 856e1ba · 1 Parent(s): 790f422

feat(pipeline): add YAML config, metadata-aware scheduling, and dataset slicing


- Add YAML-based configuration system with resource-aware settings
- Implement metadata pre-scan for intelligent dataset categorization
- Add automatic dataset slicing for large files (>75B entries)
- Enable parallel processing with priority ordering across all dataset sizes
- Create pipeline launcher scripts for single-command execution
- Update README with comprehensive usage guide and configuration examples
- Remove obsolete code and consolidate to single distributed_eda.py

README.md CHANGED
@@ -1,6 +1,6 @@
  # Distributed EDA for Cell x Gene
 
- This folder now includes a memory-safe EDA pipeline for large `.h5ad` files.
 
  All commands below assume your current directory is:
 
@@ -8,96 +8,174 @@ All commands below assume your current directory is:
  cd /project/GOV108018/whats2000_work/cell_x_gene_visualization
  ```
 
- ## 1) Check resources first
 
  ```bash
- uv run python scripts/resource_probe.py
  ```
 
- ## 2) Run EDA (single node, both species by default)
 
  ```bash
- uv run python scripts/distributed_eda.py \
-     --input-dir /project/GOV108018/cell_x_gene/homo_sapiens/h5ad \
-     --input-dir /project/GOV108018/cell_x_gene/mus_musculus/h5ad \
-     --output-dir output/eda \
-     --workers 32 \
-     --chunk-size 8192
  ```
 
- Default output is clean `tqdm` progress only. If you want per-file logs:
 
  ```bash
- uv run python scripts/distributed_eda.py \
-     --input-dir /project/GOV108018/cell_x_gene/homo_sapiens/h5ad \
-     --input-dir /project/GOV108018/cell_x_gene/mus_musculus/h5ad \
-     --output-dir output/eda \
-     --workers 32 \
-     --chunk-size 8192 \
-     --log-each-dataset
  ```
 
- If memory pressure appears, fallback to:
 
  ```bash
- uv run python scripts/distributed_eda.py \
-     --input-dir /project/GOV108018/cell_x_gene/homo_sapiens/h5ad \
-     --input-dir /project/GOV108018/cell_x_gene/mus_musculus/h5ad \
-     --output-dir output/eda \
-     --workers 24 \
-     --chunk-size 4096
  ```
 
- Default input directories in the script are absolute:
- - `/project/GOV108018/cell_x_gene/homo_sapiens/h5ad`
- - `/project/GOV108018/cell_x_gene/mus_musculus/h5ad`
 
- ## 3) Run EDA as distributed shards (multiple jobs)
 
- Example for 4 shards:
 
  ```bash
- # job 0
- uv run python scripts/distributed_eda.py --input-dir /project/GOV108018/cell_x_gene/homo_sapiens/h5ad --input-dir /project/GOV108018/cell_x_gene/mus_musculus/h5ad --num-shards 4 --shard-index 0
- # job 1
- uv run python scripts/distributed_eda.py --input-dir /project/GOV108018/cell_x_gene/homo_sapiens/h5ad --input-dir /project/GOV108018/cell_x_gene/mus_musculus/h5ad --num-shards 4 --shard-index 1
- # job 2
- uv run python scripts/distributed_eda.py --input-dir /project/GOV108018/cell_x_gene/homo_sapiens/h5ad --input-dir /project/GOV108018/cell_x_gene/mus_musculus/h5ad --num-shards 4 --shard-index 2
- # job 3
- uv run python scripts/distributed_eda.py --input-dir /project/GOV108018/cell_x_gene/homo_sapiens/h5ad --input-dir /project/GOV108018/cell_x_gene/mus_musculus/h5ad --num-shards 4 --shard-index 3
  ```
 
- Then merge:
 
  ```bash
- uv run python scripts/merge_eda_shards.py
  ```
 
- ## 4) Report only global max non-zero gene count
 
- After merge, the script automatically writes:
 
- - `output/eda/max_nonzero_gene_count_all_cells.csv`
- - `output/eda/max_nonzero_gene_count_all_cells.json`
 
- These files contain the single dataset row with the highest `cell_nnz_max` (max non-zero genes in any one cell).
 
- ## 5) Visualization notebook
 
- Open and run:
 
- - `notebooks/max_nonzero_gene_report.ipynb`
 
- Or launch Jupyter with `uv run`:
 
- ```bash
- uv run jupyter lab
  ```
 
- The notebook:
- - shows the global max row,
- - plots top datasets by `cell_nnz_max`,
- - plots the distribution of `cell_nnz_max`.
 
  ## Outputs
 
@@ -107,19 +185,68 @@ The notebook:
  - `output/eda/eda_failures_shard_XXX_of_YYY.json`
  - Per dataset JSON details:
    - `output/eda/per_dataset/*.json`
- - Merged summary:
    - `output/eda/eda_summary_all_shards.csv`
- - Global max-only report:
    - `output/eda/max_nonzero_gene_count_all_cells.csv`
    - `output/eda/max_nonzero_gene_count_all_cells.json`
 
- ## Notes on large data safety
 
- - Uses `anndata.read_h5ad(..., backed="r")` so matrices are not fully loaded.
- - Scans expression matrix in chunks with `chunked_X`.
- - Uses process-level parallelism with configurable worker count.
- - Includes shard mode for cross-job distribution on HPC queues.
- - Shows a simple dataset-level `tqdm` progress bar during processing.
- - Per-dataset JSON now includes explicit schema blocks:
-   - `obs_schema` with all obs column names and dtypes
-   - `var_schema` with all var column names and dtypes
  # Distributed EDA for Cell x Gene
 
+ This folder includes a metadata-aware EDA pipeline for large `.h5ad` files with YAML-based configuration.
 
  All commands below assume your current directory is:
 
  cd /project/GOV108018/whats2000_work/cell_x_gene_visualization
  ```
 
+ ## Quick Start
+
+ ### 1) Configure pipeline
+
+ Use the optimized config (auto-generated for your system: 394 GB RAM, 56 cores):
 
  ```bash
+ cat configs/eda_optimized.yaml
  ```
 
+ Or create your own based on the template:
 
  ```bash
+ cp configs/eda_config_template.yaml configs/my_config.yaml
+ # Edit my_config.yaml with your paths and resource limits
  ```
 
+ ### 2) Build metadata cache
+
+ Pre-scan all datasets to determine sizes and enable intelligent scheduling:
 
  ```bash
+ uv run python scripts/build_metadata_cache.py --config configs/eda_optimized.yaml
  ```
 
+ This creates `output/cache/enhanced_metadata.parquet` with:
+ - Dataset dimensions (n_obs × n_vars)
+ - File sizes
+ - Size categories (small/medium/large/xlarge)
+ - Estimated memory requirements
+
+ The cache is incremental: only new or changed files are rescanned. Use `--force-rescan` to rebuild it from scratch.
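As a rough illustration (not the pipeline's actual code, which lives in `scripts/build_metadata_cache.py`), the incremental behavior amounts to comparing the paths already in the cache against the files on disk:

```python
from pathlib import Path


def files_needing_scan(
    all_files: list[Path], cached_paths: set[str], force_rescan: bool = False
) -> list[Path]:
    """Return files absent from the metadata cache (or everything when forcing)."""
    if force_rescan:
        return list(all_files)
    return [f for f in all_files if str(f) not in cached_paths]


files = [Path("a.h5ad"), Path("b.h5ad"), Path("c.h5ad")]
cached = {"a.h5ad"}  # already scanned in a previous run
print(files_needing_scan(files, cached))  # only b.h5ad and c.h5ad are rescanned
```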
+
+ ### 3) Run EDA pipeline
+
+ Single command to run everything:
 
  ```bash
+ uv run python scripts/run_eda_pipeline.py --config configs/eda_optimized.yaml
  ```
 
+ Or run steps individually:
 
+ ```bash
+ # Step 1: Build metadata (if not done)
+ uv run python scripts/run_eda_pipeline.py --config configs/eda_optimized.yaml --step metadata
+
+ # Step 2: Run EDA
+ uv run python scripts/run_eda_pipeline.py --config configs/eda_optimized.yaml --step eda
+
+ # Step 3: Merge shards (if using sharding)
+ uv run python scripts/run_eda_pipeline.py --config configs/eda_optimized.yaml --step merge
+ ```
+
+ ### 4) Direct script usage
 
+ For more control:
 
  ```bash
+ # Build metadata cache
+ uv run python scripts/build_metadata_cache.py --config configs/eda_optimized.yaml
+
+ # Run EDA with all workers
+ uv run python scripts/distributed_eda.py --config configs/eda_optimized.yaml
+
+ # Override worker count
+ uv run python scripts/distributed_eda.py --config configs/eda_optimized.yaml --force-workers 32
  ```
 
+ ## Distributed Processing (SLURM)
+
+ For multi-node HPC clusters, use array jobs:
 
  ```bash
+ # Submit 4 parallel jobs
+ sbatch --array=0-3 scripts/run_eda_slurm.sh configs/eda_optimized.yaml 4
+
+ # After all jobs complete, merge results
+ uv run python scripts/merge_eda_shards.py --output-dir output/eda
+ ```
+
+ Or configure sharding in YAML:
+
+ ```yaml
+ sharding:
+   enabled: true
+   num_shards: 4
+   shard_index: 0             # Override with --shard-index on the command line
+   strategy: "size_balanced"  # Distribute by size for load balancing
+ ```
+
+ Then run each shard:
 
+ ```bash
+ uv run python scripts/distributed_eda.py --config configs/eda_optimized.yaml --num-shards 4 --shard-index 0
+ uv run python scripts/distributed_eda.py --config configs/eda_optimized.yaml --num-shards 4 --shard-index 1
+ # ... etc
+ ```
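One plausible reading of the `size_balanced` strategy is a greedy longest-processing-time assignment: take datasets largest first and place each one on the currently lightest shard. This is an editor's sketch, not the pipeline's verified implementation:

```python
def size_balanced_shards(sizes: dict[str, int], num_shards: int) -> list[list[str]]:
    """Greedy LPT assignment: largest dataset first, onto the lightest shard so far."""
    shards: list[list[str]] = [[] for _ in range(num_shards)]
    loads = [0] * num_shards
    for name, size in sorted(sizes.items(), key=lambda kv: kv[1], reverse=True):
        i = loads.index(min(loads))  # index of the lightest shard
        shards[i].append(name)
        loads[i] += size
    return shards


# Hypothetical dataset sizes (total_entries, arbitrary units)
datasets = {"a": 90, "b": 60, "c": 50, "d": 40, "e": 10}
print(size_balanced_shards(datasets, 2))  # [['a', 'd'], ['b', 'c', 'e']]
```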
 
+ ## Configuration Guide
+
+ ### Resource Management
+
+ The pipeline respects your resource limits and adapts the processing strategy to dataset size:
+
+ ```yaml
+ resources:
+   max_memory_gib: 240      # Total memory available
+   max_workers: 42          # Maximum parallel workers
+   mem_per_worker_gib: 5.5  # Memory per worker
+   chunk_size: 12288        # Matrix chunk size
+
+ dataset_thresholds:
+   small: 2_000_000_000          # < 2B entries
+   medium: 15_000_000_000        # < 15B entries
+   large: 75_000_000_000         # < 75B entries
+   max_entries: 200_000_000_000  # Reject larger datasets
+
+ strategy:
+   small:
+     workers_fraction: 1.0       # Use all workers
+     chunk_size_multiplier: 1.0
+     priority: 1                 # Process first
+
+   large:
+     workers_fraction: 0.4       # Fewer workers
+     chunk_size_multiplier: 0.6
+     priority: 3
+     require_slicing: true       # Slice into chunks
+ ```
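For illustration, here is how thresholds and per-category fractions could translate into effective settings (the function names are hypothetical, not the pipeline's actual API):

```python
def categorize(total_entries: int, thresholds: dict[str, int]) -> str:
    """Map n_obs * n_vars to a size category using the configured thresholds."""
    if total_entries < thresholds["small"]:
        return "small"
    if total_entries < thresholds["medium"]:
        return "medium"
    if total_entries < thresholds["large"]:
        return "large"
    return "xlarge"


def effective_settings(
    category: str, strategy: dict, max_workers: int, chunk_size: int
) -> tuple[int, int]:
    """Scale the worker pool and chunk size by the category's fraction/multiplier."""
    s = strategy[category]
    workers = max(1, int(max_workers * s["workers_fraction"]))
    chunk = max(1, int(chunk_size * s["chunk_size_multiplier"]))
    return workers, chunk


thresholds = {"small": 2_000_000_000, "medium": 15_000_000_000, "large": 75_000_000_000}
strategy = {
    "small": {"workers_fraction": 1.0, "chunk_size_multiplier": 1.0},
    "large": {"workers_fraction": 0.4, "chunk_size_multiplier": 0.6},
}
cat = categorize(1_000_000 * 30_000, thresholds)  # 30B entries -> "large"
print(cat, effective_settings(cat, strategy, max_workers=42, chunk_size=12288))
```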
 
+ ### Dataset Slicing
 
+ Large datasets are automatically sliced to respect memory limits:
 
+ ```yaml
+ slicing:
+   enabled: true
+   obs_slice_size: 75000      # Process 75k cells at a time
+   overlap: 0
+   merge_strategy: "combine"  # Combine slice statistics
+ ```
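With `overlap: 0`, slicing reduces to splitting the cell axis into contiguous row ranges, which can be sketched as:

```python
def obs_slices(n_obs: int, slice_size: int) -> list[tuple[int, int]]:
    """Split [0, n_obs) into contiguous row ranges of at most slice_size cells."""
    return [(start, min(start + slice_size, n_obs)) for start in range(0, n_obs, slice_size)]


# 200k cells at 75k per slice -> two full slices plus a 50k remainder
print(obs_slices(200_000, 75_000))
# [(0, 75000), (75000, 150000), (150000, 200000)]
```

Each range can then be read independently in backed mode, so no slice ever exceeds the per-worker memory budget.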
 
+ ### Metadata Integration
 
+ Point to the CELLxGENE metadata CSVs for enhanced context:
 
+ ```yaml
+ paths:
+   metadata_csvs:
+     - /project/GOV108018/cell_x_gene/metadata/dataset_metadata_homo_sapiens.csv
+     - /project/GOV108018/cell_x_gene/metadata/dataset_metadata_mus_musculus.csv
+   enhanced_metadata_cache: output/cache/enhanced_metadata.parquet
  ```
 
+ The pipeline merges this with quick-scanned dimensions for intelligent scheduling.
+
+ ## Processing Strategy
+
+ The pipeline uses **parallel processing with priority ordering**:
+
+ 1. **Pre-scan phase**: a quick metadata scan (no matrix loading) categorizes datasets by size
+ 2. **Parallel execution**: all datasets are processed in parallel using the full worker pool
+ 3. **Smart ordering**: small datasets (priority 1) start first for quick wins
+ 4. **Automatic slicing**: large datasets are split into memory-safe chunks
+ 5. **Resource-aware**: strategies adapt chunk sizes to the dataset category
+
+ This approach keeps all available cores busy throughout the pipeline.
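The smart-ordering step above amounts to a simple sort key: category priority first, then size. A minimal sketch with hypothetical dataset records (not the pipeline's actual data structures):

```python
# Hypothetical records as a pre-scan might produce them
datasets = [
    {"id": "x", "size_category": "large", "total_entries": 80},
    {"id": "y", "size_category": "small", "total_entries": 5},
    {"id": "z", "size_category": "medium", "total_entries": 20},
]
priority = {"small": 1, "medium": 2, "large": 3}

# Submit small datasets first; break ties by ascending size
ordered = sorted(datasets, key=lambda d: (priority[d["size_category"]], d["total_entries"]))
print([d["id"] for d in ordered])  # ['y', 'z', 'x']
```

Submitting futures in this order means the quick wins finish early while the long-running large datasets occupy the pool's tail.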
 
  ## Outputs
 
  - `output/eda/eda_failures_shard_XXX_of_YYY.json`
  - Per dataset JSON details:
    - `output/eda/per_dataset/*.json`
+ - Merged summary (after sharding):
    - `output/eda/eda_summary_all_shards.csv`
+ - Global max report:
    - `output/eda/max_nonzero_gene_count_all_cells.csv`
    - `output/eda/max_nonzero_gene_count_all_cells.json`
+ - Metadata cache:
+   - `output/cache/enhanced_metadata.parquet`
+
+ ## Output Schema
+
+ Each dataset result includes:
+
+ - **Dimensions**: n_obs, n_vars, total_entries
+ - **Sparsity**: nnz, sparsity
+ - **Cell statistics**: cell_sum_*, cell_nnz_* (mean/std/min/max/quantiles)
+ - **Matrix statistics**: x_mean, x_std
+ - **Metadata summaries**: obs/var column types and top values
+ - **Schema**: complete column names and dtypes
+ - **Processing info**: size_category, processing_mode (full/sliced), elapsed_sec
+
+ ## Visualization
+
+ Open the notebook:
+
+ ```bash
+ uv run jupyter lab notebooks/max_nonzero_gene_report.ipynb
+ ```
+
+ The notebook provides:
+ - Global max non-zero gene count
+ - Distribution of cell-level statistics
+ - Dataset size analysis
+ - Processing time comparisons
 
+ ## Troubleshooting
 
+ ### Metadata cache not found
+
+ ```bash
+ # Build it first
+ uv run python scripts/build_metadata_cache.py --config configs/eda_optimized.yaml
+ ```
+
+ ### Memory errors
+
+ Reduce workers and chunk size in the config:
+
+ ```yaml
+ resources:
+   max_workers: 24
+   chunk_size: 4096
+ slicing:
+   obs_slice_size: 50000
+ ```
+
+ ### Dataset too large
+
+ Adjust thresholds or enable more aggressive slicing:
+
+ ```yaml
+ dataset_thresholds:
+   max_entries: 50_000_000_000  # Lower limit
+ slicing:
+   obs_slice_size: 30000        # Smaller slices
+ ```
configs/eda_config_template.yaml ADDED
@@ -0,0 +1,87 @@
+ # EDA Pipeline Configuration Template
+ # This file defines resource limits, dataset filtering, and processing strategies
+
+ # Resource Limits
+ resources:
+   max_memory_gib: 256      # Total memory available
+   max_workers: 32          # Maximum concurrent workers
+   mem_per_worker_gib: 8.0  # Memory per worker process
+   chunk_size: 8192         # Chunk size for reading the X matrix
+
+ # Input/Output Paths
+ paths:
+   input_dirs:
+     - /project/GOV108018/cell_x_gene/homo_sapiens/h5ad
+     - /project/GOV108018/cell_x_gene/mus_musculus/h5ad
+   output_dir: output/eda
+   cache_dir: output/cache  # Stores the metadata cache
+
+   # Dataset metadata for intelligent scheduling
+   # These CSVs contain dataset_h5ad_path and dataset_total_cell_count
+   metadata_csvs:
+     - /project/GOV108018/cell_x_gene/metadata/dataset_metadata_homo_sapiens.csv
+     - /project/GOV108018/cell_x_gene/metadata/dataset_metadata_mus_musculus.csv
+
+   # Enhanced metadata cache (n_obs, n_vars, file_size) built by the pre-scan
+   enhanced_metadata_cache: output/cache/enhanced_metadata.parquet
+
+ # Dataset Size Thresholds (based on n_obs * n_vars)
+ # Categorize datasets to apply different processing strategies
+ dataset_thresholds:
+   small: 1_000_000_000          # < 1B entries: process normally
+   medium: 10_000_000_000        # < 10B entries: reduce workers
+   large: 50_000_000_000         # < 50B entries: slice into chunks
+   max_entries: 100_000_000_000  # > 100B entries: skip or special handling
+
+ # Slicing Strategy for Large Datasets
+ slicing:
+   enabled: true
+   obs_slice_size: 50000      # Process 50k cells at a time for large datasets
+   overlap: 0                 # No overlap between slices
+   merge_strategy: "combine"  # How to combine stats from slices
+
+ # Processing Strategy by Dataset Size
+ strategy:
+   small:
+     workers_fraction: 1.0  # Use full worker pool
+     chunk_size_multiplier: 1.0
+     priority: 1            # Process first (fastest)
+
+   medium:
+     workers_fraction: 0.5  # Reduce workers to save memory
+     chunk_size_multiplier: 0.5
+     priority: 2
+
+   large:
+     workers_fraction: 0.25  # Minimal workers, use slicing
+     chunk_size_multiplier: 0.25
+     priority: 3
+     require_slicing: true
+
+ # Sharding Configuration (for distributed jobs)
+ sharding:
+   enabled: false
+   num_shards: 1
+   shard_index: 0
+   strategy: "round_robin"  # or "size_balanced"
+
+ # Metadata Extraction Settings
+ metadata:
+   max_meta_cols: 20
+   max_categories: 8
+   extract_schemas: true
+
+ # Behavior Flags
+ behavior:
+   log_each_dataset: false  # Clean tqdm output
+   skip_failures: true      # Continue on errors
+   save_per_dataset_json: true
+   pre_scan_enabled: true   # Scan metadata before processing
+   cache_metadata: true     # Cache dataset dimensions
+
+ # Output Options
+ output:
+   summary_csv: true
+   failures_json: true
+   global_max_report: true  # Report with max non-zero gene count
+   per_dataset_details: true
configs/eda_optimized.yaml ADDED
@@ -0,0 +1,78 @@
+ # Optimized EDA Configuration for Current System
+ # System specs: 394 GB RAM, 56 cores, ~250 GB available
+ # Balanced for maximum speed while respecting resource limits
+
+ resources:
+   max_memory_gib: 240      # Leave ~10 GB buffer for the system
+   max_workers: 42          # 75% of cores for stability
+   mem_per_worker_gib: 5.5  # ~231 GB total worker memory
+   chunk_size: 12288        # Good balance for large matrices
+
+ paths:
+   input_dirs:
+     - /project/GOV108018/cell_x_gene/homo_sapiens/h5ad
+     - /project/GOV108018/cell_x_gene/mus_musculus/h5ad
+   output_dir: output/eda
+   cache_dir: output/cache
+
+   # Dataset metadata for intelligent scheduling
+   # These CSVs contain dataset_h5ad_path and dataset_total_cell_count
+   metadata_csvs:
+     - /project/GOV108018/cell_x_gene/metadata/dataset_metadata_homo_sapiens.csv
+     - /project/GOV108018/cell_x_gene/metadata/dataset_metadata_mus_musculus.csv
+
+   # Enhanced metadata cache (n_obs, n_vars, file_size) built by the pre-scan
+   enhanced_metadata_cache: output/cache/enhanced_metadata.parquet
+
+ dataset_thresholds:
+   small: 2_000_000_000          # < 2B entries: full speed
+   medium: 15_000_000_000        # < 15B entries: moderate
+   large: 75_000_000_000         # < 75B entries: slicing required
+   max_entries: 200_000_000_000  # Max 200B entries
+
+ slicing:
+   enabled: true
+   obs_slice_size: 75000  # 75k cells per slice for large datasets
+   overlap: 0
+   merge_strategy: "combine"
+
+ strategy:
+   small:
+     workers_fraction: 1.0  # Use all 42 workers
+     chunk_size_multiplier: 1.0
+     priority: 1
+
+   medium:
+     workers_fraction: 0.7  # ~30 workers
+     chunk_size_multiplier: 0.85
+     priority: 2
+
+   large:
+     workers_fraction: 0.4  # ~17 workers with slicing
+     chunk_size_multiplier: 0.6
+     priority: 3
+     require_slicing: true
+
+ sharding:
+   enabled: false
+   num_shards: 1
+   shard_index: 0
+   strategy: "size_balanced"
+
+ metadata:
+   max_meta_cols: 20
+   max_categories: 8
+   extract_schemas: true
+
+ behavior:
+   log_each_dataset: false  # Clean tqdm output
+   skip_failures: true
+   save_per_dataset_json: true
+   pre_scan_enabled: true   # Pre-scan to categorize by size
+   cache_metadata: true
+
+ output:
+   summary_csv: true
+   failures_json: true
+   global_max_report: true
+   per_dataset_details: true
scripts/build_metadata_cache.py ADDED
@@ -0,0 +1,237 @@
+ #!/usr/bin/env python3
+ """Pre-scan datasets to build enhanced metadata for intelligent job scheduling."""
+
+ from __future__ import annotations
+
+ import argparse
+ import json
+ import time
+ from pathlib import Path
+ from typing import Any
+
+ import anndata as ad
+ import pandas as pd
+ from tqdm import tqdm
+
+
+ def quick_scan_dataset(h5ad_path: Path) -> dict[str, Any]:
+     """Quickly extract dimensions and size without loading data matrices."""
+     try:
+         t0 = time.time()
+         file_size_bytes = h5ad_path.stat().st_size
+
+         # Open in backed mode: only reads metadata, not matrices
+         adata = ad.read_h5ad(h5ad_path, backed="r")
+         try:
+             n_obs = int(adata.n_obs)
+             n_vars = int(adata.n_vars)
+             total_entries = n_obs * n_vars
+
+             result = {
+                 "dataset_path": str(h5ad_path),
+                 "dataset_file": h5ad_path.name,
+                 "dataset_id": h5ad_path.stem,
+                 "n_obs": n_obs,
+                 "n_vars": n_vars,
+                 "total_entries": total_entries,
+                 "file_size_bytes": file_size_bytes,
+                 "file_size_gib": round(file_size_bytes / (1024**3), 4),
+                 "obs_columns": len(adata.obs.columns),
+                 "var_columns": len(adata.var.columns),
+                 "scan_time_sec": round(time.time() - t0, 3),
+                 "status": "ok",
+             }
+         finally:
+             adata.file.close()
+
+         return result
+     except Exception as e:
+         return {
+             "dataset_path": str(h5ad_path),
+             "dataset_file": h5ad_path.name,
+             "dataset_id": h5ad_path.stem,
+             "error": str(e),
+             "status": "failed",
+         }
+
+
+ def load_cellxgene_metadata(csv_paths: list[Path]) -> pd.DataFrame:
+     """Load and combine CELLxGENE metadata CSVs."""
+     dfs = []
+     for csv_path in csv_paths:
+         if csv_path.exists():
+             df = pd.read_csv(csv_path)
+             dfs.append(df)
+
+     if not dfs:
+         return pd.DataFrame()
+
+     combined = pd.concat(dfs, ignore_index=True)
+     return combined
+
+
+ def build_enhanced_metadata(
+     input_dirs: list[Path],
+     cellxgene_metadata_csvs: list[Path],
+     output_path: Path,
+     force_rescan: bool = False,
+ ) -> pd.DataFrame:
+     """Build enhanced metadata by combining CELLxGENE metadata with quick scans."""
+
+     # Discover all h5ad files
+     all_files = []
+     for root in input_dirs:
+         if root.exists():
+             all_files.extend(root.rglob("*.h5ad"))
+     all_files = sorted(set(all_files))
+
+     if not all_files:
+         raise ValueError("No .h5ad files found in input directories")
+
+     # Load existing enhanced metadata if available
+     existing_metadata = pd.DataFrame()
+     if output_path.exists() and not force_rescan:
+         try:
+             existing_metadata = pd.read_parquet(output_path)
+             print(f"Loaded existing metadata: {len(existing_metadata)} records")
+         except Exception as e:
+             print(f"Could not load existing metadata: {e}")
+
+     # Load CELLxGENE metadata
+     cellxgene_meta = load_cellxgene_metadata(cellxgene_metadata_csvs)
+     print(f"Loaded CELLxGENE metadata: {len(cellxgene_meta)} records")
+
+     # Determine which files need scanning
+     scanned_paths = set(existing_metadata["dataset_path"].values) if not existing_metadata.empty else set()
+     files_to_scan = [f for f in all_files if str(f) not in scanned_paths or force_rescan]
+
+     if not files_to_scan:
+         print("All files already scanned. Use --force-rescan to rescan.")
+         return existing_metadata
+
+     print(f"Scanning {len(files_to_scan)} new/changed datasets...")
+
+     # Quick scan new files
+     scan_results = []
+     for h5ad_path in tqdm(files_to_scan, desc="Quick scan", unit="file"):
+         scan_results.append(quick_scan_dataset(h5ad_path))
+
+     new_scans_df = pd.DataFrame(scan_results)
+
+     # Combine with existing metadata
+     if not existing_metadata.empty:
+         # Remove re-scanned paths from the existing records
+         existing_metadata = existing_metadata[~existing_metadata["dataset_path"].isin(new_scans_df["dataset_path"])]
+         enhanced_df = pd.concat([existing_metadata, new_scans_df], ignore_index=True)
+     else:
+         enhanced_df = new_scans_df
+
+     # Merge with CELLxGENE metadata if available
+     if not cellxgene_meta.empty and "dataset_h5ad_path" in cellxgene_meta.columns:
+         enhanced_df["dataset_h5ad_filename"] = enhanced_df["dataset_file"]
+         cellxgene_meta_subset = cellxgene_meta[["dataset_h5ad_path", "dataset_total_cell_count", "organism", "collection_name", "dataset_title"]].copy()
+         cellxgene_meta_subset = cellxgene_meta_subset.rename(columns={"dataset_h5ad_path": "dataset_h5ad_filename"})
+         enhanced_df = enhanced_df.merge(cellxgene_meta_subset, on="dataset_h5ad_filename", how="left", suffixes=("", "_cellxgene"))
+
+     # Categorize by size
+     def categorize_size(row):
+         if row.get("status") != "ok":
+             return "failed"
+         entries = row.get("total_entries", 0)
+         if entries < 2_000_000_000:
+             return "small"
+         elif entries < 15_000_000_000:
+             return "medium"
+         elif entries < 75_000_000_000:
+             return "large"
+         else:
+             return "xlarge"
+
+     enhanced_df["size_category"] = enhanced_df.apply(categorize_size, axis=1)
+
+     # Add estimated memory requirement (rough)
+     enhanced_df["estimated_mem_gib"] = (enhanced_df["total_entries"] * 4 / (1024**3)).fillna(0).round(2)
+
+     # Save
+     output_path.parent.mkdir(parents=True, exist_ok=True)
+     enhanced_df.to_parquet(output_path, index=False)
+     print(f"Saved enhanced metadata: {output_path}")
+
+     # Print summary
+     print("\nDataset size distribution:")
+     print(enhanced_df["size_category"].value_counts().sort_index())
+
+     return enhanced_df
+
+
+ def main():
+     parser = argparse.ArgumentParser(description=__doc__)
+     parser.add_argument(
+         "--config",
+         type=Path,
+         help="YAML config file with metadata paths",
+     )
+     parser.add_argument(
+         "--input-dir",
+         action="append",
+         default=[],
+         help="Input directory with .h5ad files (can repeat)",
+     )
+     parser.add_argument(
+         "--metadata-csv",
+         action="append",
+         default=[],
+         help="CELLxGENE metadata CSV (can repeat)",
+     )
+     parser.add_argument(
+         "--output",
+         type=Path,
+         default=Path("output/cache/enhanced_metadata.parquet"),
+         help="Output parquet file",
+     )
+     parser.add_argument(
+         "--force-rescan",
+         action="store_true",
+         help="Force rescan of all datasets",
+     )
+     args = parser.parse_args()
+
+     # Load from config if provided
+     if args.config:
+         import yaml
+
+         with open(args.config) as f:
+             config = yaml.safe_load(f)
+
+         input_dirs = [Path(p) for p in config["paths"]["input_dirs"]]
+         metadata_csvs = [Path(p) for p in config["paths"].get("metadata_csvs", [])]
+         output_path = Path(config["paths"].get("enhanced_metadata_cache", args.output))
+     else:
+         if not args.input_dir:
+             args.input_dir = [
+                 "/project/GOV108018/cell_x_gene/homo_sapiens/h5ad",
+                 "/project/GOV108018/cell_x_gene/mus_musculus/h5ad",
+             ]
+         if not args.metadata_csv:
+             args.metadata_csv = [
+                 "/project/GOV108018/cell_x_gene/metadata/dataset_metadata_homo_sapiens.csv",
+                 "/project/GOV108018/cell_x_gene/metadata/dataset_metadata_mus_musculus.csv",
+             ]
+
+         input_dirs = [Path(p) for p in args.input_dir]
+         metadata_csvs = [Path(p) for p in args.metadata_csv]
+         output_path = args.output
+
+     enhanced_df = build_enhanced_metadata(
+         input_dirs=input_dirs,
+         cellxgene_metadata_csvs=metadata_csvs,
+         output_path=output_path,
+         force_rescan=args.force_rescan,
+     )
+
+     print(f"\nTotal datasets: {len(enhanced_df)}")
+     print(f"Successfully scanned: {(enhanced_df['status'] == 'ok').sum()}")
+     print(f"Failed: {(enhanced_df['status'] == 'failed').sum()}")
+
+
+ if __name__ == "__main__":
+     main()
scripts/distributed_eda.py CHANGED
@@ -1,5 +1,5 @@
  #!/usr/bin/env python3
- """Distributed and memory-safe EDA for large Cell x Gene .h5ad datasets."""
 
  from __future__ import annotations
 
@@ -12,11 +12,12 @@ import os
  import time
  from dataclasses import dataclass
  from pathlib import Path
- from typing import Iterable
 
  import anndata as ad
  import numpy as np
  import pandas as pd
  from concurrent.futures.process import BrokenProcessPool
  from scipy import sparse
  from tqdm import tqdm
@@ -90,22 +91,6 @@ def safe_name(path: Path) -> str:
      return f"{stem}_{digest}"
 
 
- def auto_workers(mem_per_worker_gib: float) -> int:
-     cpu = os.cpu_count() or 1
-     mem_available_gib = 0.0
-     meminfo = Path("/proc/meminfo")
-     if meminfo.exists():
-         for line in meminfo.read_text().splitlines():
-             if line.startswith("MemAvailable:"):
-                 kb = int(line.split()[1])
-                 mem_available_gib = kb / (1024 * 1024)
-                 break
-     # Fast profile for HPC nodes: higher core utilization.
-     by_cpu = max(1, int(cpu * 0.75))
-     by_mem = max(1, int(mem_available_gib // max(1.0, mem_per_worker_gib)))
-     return max(1, min(by_cpu, by_mem))
-
-
  def summarize_metadata(df: pd.DataFrame, max_cols: int, max_categories: int) -> dict[str, dict]:
      if df.empty:
          return {}
@@ -147,33 +132,45 @@ def extract_schema(df: pd.DataFrame) -> dict[str, object]:
      }
 
 
- def process_dataset(path: Path, chunk_size: int, max_meta_cols: int, max_categories: int) -> dict:
      t0 = time.time()
      row: dict[str, object] = {
          "dataset_path": str(path),
          "dataset_file": path.name,
-         "file_size_gib": round(path.stat().st_size / (1024**3), 4),
      }
 
      adata = ad.read_h5ad(path, backed="r")
      try:
-         n_obs = int(adata.n_obs)
          n_vars = int(adata.n_vars)
          total_entries = n_obs * n_vars
 
-         row.update(
-             {
-                 "n_obs": n_obs,
-                 "n_vars": n_vars,
-                 "obs_columns": int(len(adata.obs.columns)),
-                 "var_columns": int(len(adata.var.columns)),
-                 "layers_count": int(len(adata.layers.keys())),
-                 "obsm_count": int(len(adata.obsm.keys())),
-                 "varm_count": int(len(adata.varm.keys())),
-             }
-         )
-         row["obs_schema"] = extract_schema(adata.obs)
-         row["var_schema"] = extract_schema(adata.var)
 
          nnz_total = 0
         x_sum = 0.0
@@ -183,7 +180,11 @@ def process_dataset(path: Path, chunk_size: int, max_meta_cols: int, max_categor
      cell_sum_sample = ReservoirSampler(k=200_000, seed=17)
      cell_nnz_sample = ReservoirSampler(k=200_000, seed=23)
 
-     for chunk, start, end in adata.chunked_X(chunk_size):
          if sparse.issparse(chunk):
              nnz = int(chunk.nnz)
              csr = chunk if sparse.isspmatrix_csr(chunk) else chunk.tocsr(copy=False)
@@ -230,12 +231,16 @@ def process_dataset(path: Path, chunk_size: int, max_meta_cols: int, max_categor
      for key, value in cell_nnz_quantiles.items():
          row[f"cell_nnz_{key}_approx"] = value
 
-     row["metadata_obs_summary"] = summarize_metadata(
-         adata.obs, max_cols=max_meta_cols, max_categories=max_categories
-     )
-     row["metadata_var_summary"] = summarize_metadata(
-         adata.var, max_cols=max_meta_cols, max_categories=max_categories
-     )
 
      row["status"] = "ok"
  finally:
@@ -245,223 +250,286 @@ def process_dataset(path: Path, chunk_size: int, max_meta_cols: int, max_categor
      return row
 
 
- def discover_h5ad(input_dirs: list[Path]) -> list[Path]:
-     files: list[Path] = []
-     for root in input_dirs:
-         if root.exists():
-             files.extend(sorted(root.rglob("*.h5ad")))
-     files = sorted(set(files))
-     return files
-
 
- def run_parallel_batch(
-     paths: list[Path],
-     workers: int,
-     chunk_size: int,
-     max_meta_cols: int,
-     max_categories: int,
-     per_dataset_dir: Path,
-     summary_rows: list[dict],
-     failures: list[dict],
-     pbar: tqdm,
-     log_each_dataset: bool,
- ) -> list[Path]:
-     remaining: list[Path] = []
-     finished_paths: set[Path] = set()
-
-     with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as ex:
-         futures = {
-             ex.submit(
-                 process_dataset,
-                 path,
-                 chunk_size,
-                 max_meta_cols,
-                 max_categories,
-             ): path
-             for path in paths
-         }
 
-         for fut in concurrent.futures.as_completed(futures):
-             path = futures[fut]
-             try:
-                 row = fut.result()
-                 summary_rows.append(row)
-                 payload_name = safe_name(path) + ".json"
-                 (per_dataset_dir / payload_name).write_text(json.dumps(row, indent=2))
-                 if log_each_dataset:
-                     tqdm.write(f"[ok] {path.name} ({row.get('elapsed_sec', 'na')}s)")
-                 finished_paths.add(path)
-                 pbar.update(1)
-             except BrokenProcessPool as exc:
-                 msg = (
-                     f"worker pool crashed while handling {path.name}; "
-                     "switching remaining datasets to isolated retries"
-                 )
-                 tqdm.write(f"[pool-broken] {msg}: {exc}")
-                 # Re-run this path and all unfinished paths in isolated mode.
-                 remaining = [path] + [p for p in paths if p not in finished_paths and p != path]
-                 break
-             except Exception as exc:  # noqa: BLE001
-                 msg = {"dataset_path": str(path), "error": repr(exc), "status": "failed"}
-                 failures.append(msg)
-                 finished_paths.add(path)
-                 tqdm.write(f"[failed] {path.name}: {exc}")
-                 pbar.update(1)
-     return remaining
-
-
- def run_isolated_retries(
-     paths: list[Path],
-     chunk_size: int,
-     max_meta_cols: int,
-     max_categories: int,
      per_dataset_dir: Path,
-     summary_rows: list[dict],
-     failures: list[dict],
-     pbar: tqdm,
-     log_each_dataset: bool,
- ) -> None:
-     # One fresh process per dataset avoids "one crash poisons all remaining futures".
-     for path in paths:
-         try:
-             with concurrent.futures.ProcessPoolExecutor(max_workers=1) as ex:
-                 fut = ex.submit(
-                     process_dataset,
-                     path,
-                     chunk_size,
-                     max_meta_cols,
-                     max_categories,
                 )
-                 row = fut.result()
336
- summary_rows.append(row)
337
- payload_name = safe_name(path) + ".json"
338
- (per_dataset_dir / payload_name).write_text(json.dumps(row, indent=2))
339
- if log_each_dataset:
340
- tqdm.write(f"[ok] {path.name} ({row.get('elapsed_sec', 'na')}s)")
341
- except Exception as exc: # noqa: BLE001
342
- msg = {"dataset_path": str(path), "error": repr(exc), "status": "failed"}
343
- failures.append(msg)
344
- tqdm.write(f"[failed] {path.name}: {exc}")
345
- finally:
346
- pbar.update(1)
 
 
 
 
 
 
 
 
347
 
348
 
349
  def main() -> None:
350
  parser = argparse.ArgumentParser(description=__doc__)
351
  parser.add_argument(
352
- "--input-dir",
353
- action="append",
354
- default=[],
355
- help="Input folder(s) containing .h5ad files. Can be repeated.",
356
  )
357
  parser.add_argument(
358
- "--output-dir",
359
- type=Path,
360
- default=Path("whats2000_work/cell_x_gene_visualization/output/eda"),
361
  )
362
- parser.add_argument("--workers", type=int, default=0, help="0 means auto.")
363
- parser.add_argument("--chunk-size", type=int, default=4096)
364
- parser.add_argument("--mem-per-worker-gib", type=float, default=8.0)
365
- parser.add_argument("--num-shards", type=int, default=1)
366
- parser.add_argument("--shard-index", type=int, default=0)
367
- parser.add_argument("--max-meta-cols", type=int, default=20)
368
- parser.add_argument("--max-categories", type=int, default=8)
369
  parser.add_argument(
370
- "--log-each-dataset",
371
- action="store_true",
372
- help="Print per-dataset success logs. Default is off for clean tqdm output.",
 
 
 
 
 
373
  )
374
  args = parser.parse_args()
375
-
376
- if not args.input_dir:
377
- args.input_dir = [
378
- "/project/GOV108018/cell_x_gene/homo_sapiens/h5ad",
379
- "/project/GOV108018/cell_x_gene/mus_musculus/h5ad",
380
- ]
381
-
382
- roots = [Path(p) for p in args.input_dir]
383
- all_files = discover_h5ad(roots)
384
- if not all_files:
385
- raise SystemExit("No .h5ad files found in input directories.")
386
-
387
- if args.num_shards < 1:
388
- raise SystemExit("--num-shards must be >= 1")
389
- if args.shard_index < 0 or args.shard_index >= args.num_shards:
390
- raise SystemExit("--shard-index must satisfy 0 <= shard-index < num-shards")
391
-
392
- shard_files = [p for i, p in enumerate(all_files) if i % args.num_shards == args.shard_index]
393
- if not shard_files:
394
- raise SystemExit("No files assigned to this shard.")
395
-
396
- workers = args.workers if args.workers > 0 else auto_workers(args.mem_per_worker_gib)
397
- workers = min(workers, len(shard_files))
398
-
399
- args.output_dir.mkdir(parents=True, exist_ok=True)
400
- per_dataset_dir = args.output_dir / "per_dataset"
 
 
 
 
 
 
 
 
 
 
401
  per_dataset_dir.mkdir(parents=True, exist_ok=True)
402
-
403
- manifest_path = args.output_dir / f"manifest_shard_{args.shard_index:03d}_of_{args.num_shards:03d}.txt"
404
- manifest_path.write_text("\n".join(str(x) for x in shard_files) + "\n")
405
-
 
 
 
 
 
 
 
 
 
406
  summary_rows: list[dict] = []
407
  failures: list[dict] = []
408
-
409
- print(
410
- json.dumps(
411
- {
412
- "total_files": len(all_files),
413
- "files_in_shard": len(shard_files),
414
- "workers": workers,
415
- "chunk_size": args.chunk_size,
416
- "num_shards": args.num_shards,
417
- "shard_index": args.shard_index,
 
 
 
 
 
 
 
 
 
 
418
  }
419
- )
420
- )
421
-
422
- with tqdm(total=len(shard_files), desc="Datasets", unit="dataset") as pbar:
423
- remaining_paths = run_parallel_batch(
424
- paths=shard_files,
425
- workers=workers,
426
- chunk_size=args.chunk_size,
427
- max_meta_cols=args.max_meta_cols,
428
- max_categories=args.max_categories,
429
- per_dataset_dir=per_dataset_dir,
430
- summary_rows=summary_rows,
431
- failures=failures,
432
- pbar=pbar,
433
- log_each_dataset=args.log_each_dataset,
434
- )
435
- if remaining_paths:
436
- run_isolated_retries(
437
- paths=remaining_paths,
438
- chunk_size=args.chunk_size,
439
- max_meta_cols=args.max_meta_cols,
440
- max_categories=args.max_categories,
441
- per_dataset_dir=per_dataset_dir,
442
- summary_rows=summary_rows,
443
- failures=failures,
444
- pbar=pbar,
445
- log_each_dataset=args.log_each_dataset,
446
- )
447
-
448
  summary_df = pd.DataFrame(summary_rows)
449
- summary_csv = args.output_dir / f"eda_summary_shard_{args.shard_index:03d}_of_{args.num_shards:03d}.csv"
450
  summary_df.to_csv(summary_csv, index=False)
451
-
452
- failures_path = args.output_dir / f"eda_failures_shard_{args.shard_index:03d}_of_{args.num_shards:03d}.json"
453
  failures_path.write_text(json.dumps(failures, indent=2))
454
-
455
- print(
456
- json.dumps(
457
- {
458
- "summary_csv": str(summary_csv),
459
- "failures_json": str(failures_path),
460
- "ok_count": len(summary_rows),
461
- "failed_count": len(failures),
462
- }
463
- )
464
- )
465
 
466
 
467
  if __name__ == "__main__":
 
1
#!/usr/bin/env python3
+"""Metadata-aware distributed EDA with YAML configuration and intelligent scheduling."""

from __future__ import annotations

import time
from dataclasses import dataclass
from pathlib import Path
+from typing import Any, Iterable

import anndata as ad
import numpy as np
import pandas as pd
+import yaml
from concurrent.futures.process import BrokenProcessPool
from scipy import sparse
from tqdm import tqdm

    return f"{stem}_{digest}"


def summarize_metadata(df: pd.DataFrame, max_cols: int, max_categories: int) -> dict[str, dict]:
    if df.empty:
        return {}

    }


+def process_dataset_slice(
+    path: Path,
+    obs_start: int,
+    obs_end: int,
+    chunk_size: int,
+    max_meta_cols: int,
+    max_categories: int,
+) -> dict:
+    """Process a slice of a dataset (obs_start:obs_end rows)."""
    t0 = time.time()
    row: dict[str, object] = {
        "dataset_path": str(path),
        "dataset_file": path.name,
+        "obs_slice_start": obs_start,
+        "obs_slice_end": obs_end,
    }

    adata = ad.read_h5ad(path, backed="r")
    try:
+        n_obs_full = int(adata.n_obs)
        n_vars = int(adata.n_vars)
+
+        # Adjust slice bounds
+        obs_end = min(obs_end, n_obs_full)
+        n_obs = obs_end - obs_start
+
+        if n_obs <= 0:
+            row["status"] = "empty_slice"
+            return row
+
        total_entries = n_obs * n_vars

+        row.update({
+            "n_obs": n_obs,
+            "n_obs_full": n_obs_full,
+            "n_vars": n_vars,
+            "obs_columns": int(len(adata.obs.columns)),
+            "var_columns": int(len(adata.var.columns)),
+        })

        nnz_total = 0
        x_sum = 0.0

        cell_sum_sample = ReservoirSampler(k=200_000, seed=17)
        cell_nnz_sample = ReservoirSampler(k=200_000, seed=23)

+        # Process slice in chunks
+        for start_chunk in range(obs_start, obs_end, chunk_size):
+            end_chunk = min(start_chunk + chunk_size, obs_end)
+            chunk = adata.X[start_chunk:end_chunk, :]
+
            if sparse.issparse(chunk):
                nnz = int(chunk.nnz)
                csr = chunk if sparse.isspmatrix_csr(chunk) else chunk.tocsr(copy=False)

        for key, value in cell_nnz_quantiles.items():
            row[f"cell_nnz_{key}_approx"] = value

+        # Only extract metadata for first slice
+        if obs_start == 0:
+            row["metadata_obs_summary"] = summarize_metadata(
+                adata.obs, max_cols=max_meta_cols, max_categories=max_categories
+            )
+            row["metadata_var_summary"] = summarize_metadata(
+                adata.var, max_cols=max_meta_cols, max_categories=max_categories
+            )
+            row["obs_schema"] = extract_schema(adata.obs)
+            row["var_schema"] = extract_schema(adata.var)

        row["status"] = "ok"
    finally:

    return row


+def process_dataset_full(path: Path, chunk_size: int, max_meta_cols: int, max_categories: int) -> dict:
+    """Process entire dataset (wrapper for backwards compatibility)."""
+    adata = ad.read_h5ad(path, backed="r")
+    n_obs = int(adata.n_obs)
+    adata.file.close()
+
+    return process_dataset_slice(path, 0, n_obs, chunk_size, max_meta_cols, max_categories)


+def merge_slice_results(slice_results: list[dict]) -> dict:
+    """Merge statistics from multiple slices of the same dataset."""
+    if not slice_results:
+        return {}
+
+    if len(slice_results) == 1:
+        result = slice_results[0].copy()
+        result.pop("obs_slice_start", None)
+        result.pop("obs_slice_end", None)
+        return result
+
+    # Merge strategy: combine running stats
+    merged = slice_results[0].copy()
+    merged["n_obs"] = merged["n_obs_full"]
+    merged.pop("obs_slice_start", None)
+    merged.pop("obs_slice_end", None)
+
+    # Sum/max/min across slices
+    merged["nnz"] = sum(r["nnz"] for r in slice_results)
+    merged["cell_nnz_max"] = max(r.get("cell_nnz_max", 0) for r in slice_results)
+    merged["cell_nnz_min"] = min(r.get("cell_nnz_min", float('inf')) for r in slice_results)
+    merged["cell_sum_max"] = max(r.get("cell_sum_max", 0) for r in slice_results)
+    merged["cell_sum_min"] = min(r.get("cell_sum_min", float('inf')) for r in slice_results)
+
+    # Weighted average for means
+    total_cells = sum(r["n_obs"] for r in slice_results)
+    if total_cells > 0:
+        merged["cell_nnz_mean"] = sum(r["n_obs"] * r.get("cell_nnz_mean", 0) for r in slice_results) / total_cells
+        merged["cell_sum_mean"] = sum(r["n_obs"] * r.get("cell_sum_mean", 0) for r in slice_results) / total_cells
+
+    merged["elapsed_sec"] = sum(r.get("elapsed_sec", 0) for r in slice_results)
+    merged["num_slices_processed"] = len(slice_results)
+
+    return merged
+
+
+def load_config(config_path: Path) -> dict:
+    """Load YAML configuration."""
+    with open(config_path) as f:
+        return yaml.safe_load(f)
+
+
+def load_enhanced_metadata(cache_path: Path) -> pd.DataFrame:
+    """Load enhanced metadata cache."""
+    if not cache_path.exists():
+        raise FileNotFoundError(
+            f"Enhanced metadata cache not found: {cache_path}\n"
+            "Run: uv run python scripts/build_metadata_cache.py --config <config.yaml>"
+        )
+    return pd.read_parquet(cache_path)
+
+
+def schedule_datasets(
+    metadata_df: pd.DataFrame,
+    config: dict,
+    num_shards: int,
+    shard_index: int,
+) -> list[tuple[Path, str, dict]]:
+    """
+    Schedule datasets based on size category and resource constraints.
+    Returns: list of (path, size_category, strategy) tuples
+    """
+    # Filter to this shard
+    if num_shards > 1:
+        if config["sharding"].get("strategy") == "size_balanced":
+            # Sort by size, distribute round-robin
+            metadata_df = metadata_df.sort_values("total_entries", ascending=False).reset_index(drop=True)
+        shard_df = metadata_df[metadata_df.index % num_shards == shard_index].copy()
+    else:
+        shard_df = metadata_df.copy()
+
+    # Filter successful scans only
+    shard_df = shard_df[shard_df["status"] == "ok"].copy()
+
+    # Filter by max entries threshold
+    max_entries = config["dataset_thresholds"]["max_entries"]
+    shard_df = shard_df[shard_df["total_entries"] <= max_entries].copy()
+
+    # Sort by priority (small first for fast initial progress)
+    priority_map = {"small": 1, "medium": 2, "large": 3, "xlarge": 4}
+    shard_df["priority"] = shard_df["size_category"].map(priority_map).fillna(99)
+    shard_df = shard_df.sort_values("priority").reset_index(drop=True)
+
+    # Build schedule
+    schedule = []
+    for _, row in shard_df.iterrows():
+        path = Path(row["dataset_path"])
+        size_cat = row["size_category"]
+        strategy = config["strategy"].get(size_cat, config["strategy"]["small"])
+        schedule.append((path, size_cat, strategy))
+
+    return schedule
+
+
+def run_with_strategy(
+    path: Path,
+    size_category: str,
+    strategy: dict,
+    config: dict,
    per_dataset_dir: Path,
+) -> dict:
+    """Run EDA on a single dataset with specified strategy."""
+    chunk_size = int(config["resources"]["chunk_size"] * strategy["chunk_size_multiplier"])
+    max_meta_cols = config["metadata"]["max_meta_cols"]
+    max_categories = config["metadata"]["max_categories"]
+
+    try:
+        # Check if slicing is required
+        if strategy.get("require_slicing") and config["slicing"]["enabled"]:
+            # Process in slices
+            adata = ad.read_h5ad(path, backed="r")
+            n_obs = int(adata.n_obs)
+            adata.file.close()
+
+            obs_slice_size = config["slicing"]["obs_slice_size"]
+            slice_results = []
+
+            for start in range(0, n_obs, obs_slice_size):
+                end = min(start + obs_slice_size, n_obs)
+                slice_result = process_dataset_slice(
+                    path, start, end, chunk_size, max_meta_cols, max_categories
                )
+                slice_results.append(slice_result)
+
+            # Merge slices
+            row = merge_slice_results(slice_results)
+            row["processing_mode"] = "sliced"
+        else:
+            # Process whole dataset
+            row = process_dataset_full(path, chunk_size, max_meta_cols, max_categories)
+            row["processing_mode"] = "full"
+
+        row["size_category"] = size_category
+        row["file_size_gib"] = round(path.stat().st_size / (1024**3), 4)
+
+        payload_name = safe_name(path) + ".json"
+        (per_dataset_dir / payload_name).write_text(json.dumps(row, indent=2))
+
+        return row
+
+    except Exception as exc:
+        raise RuntimeError(f"Failed to process {path}: {exc}") from exc


def main() -> None:
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
+        "--config",
+        type=Path,
+        required=True,
+        help="YAML configuration file",
    )
    parser.add_argument(
+        "--num-shards",
+        type=int,
+        help="Override num_shards from config",
    )
    parser.add_argument(
+        "--shard-index",
+        type=int,
+        help="Override shard_index from config",
+    )
+    parser.add_argument(
+        "--force-workers",
+        type=int,
+        help="Override worker count",
    )
    args = parser.parse_args()
+
+    # Load config
+    config = load_config(args.config)
+
+    # Override sharding if specified
+    if args.num_shards is not None:
+        config["sharding"]["num_shards"] = args.num_shards
+        config["sharding"]["enabled"] = args.num_shards > 1
+    if args.shard_index is not None:
+        config["sharding"]["shard_index"] = args.shard_index
+
+    num_shards = config["sharding"]["num_shards"]
+    shard_index = config["sharding"]["shard_index"]
+
+    # Load enhanced metadata
+    cache_path = Path(config["paths"]["enhanced_metadata_cache"])
+    if not cache_path.is_absolute():
+        cache_path = Path(args.config).parent.parent / cache_path
+
+    print(f"Loading metadata from: {cache_path}")
+    metadata_df = load_enhanced_metadata(cache_path)
+
+    # Schedule datasets
+    schedule = schedule_datasets(metadata_df, config, num_shards, shard_index)
+
+    if not schedule:
+        print("No datasets scheduled for this shard.")
+        return
+
+    # Setup output
+    output_dir = Path(config["paths"]["output_dir"])
+    if not output_dir.is_absolute():
+        output_dir = Path(args.config).parent.parent / output_dir
+    output_dir.mkdir(parents=True, exist_ok=True)
+
+    per_dataset_dir = output_dir / "per_dataset"
    per_dataset_dir.mkdir(parents=True, exist_ok=True)
+
+    # Print schedule summary
+    schedule_summary = {}
+    for _, size_cat, _ in schedule:
+        schedule_summary[size_cat] = schedule_summary.get(size_cat, 0) + 1
+
+    print(json.dumps({
+        "total_datasets": len(schedule),
+        "by_size": schedule_summary,
+        "shard_index": shard_index,
+        "num_shards": num_shards,
+    }, indent=2))
+
    summary_rows: list[dict] = []
    failures: list[dict] = []
+
+    # Process all datasets in parallel with full worker pool.
+    # Priority ordering ensures small datasets finish first while large ones process in parallel.
+    max_workers = args.force_workers or config["resources"]["max_workers"]
+
+    print(f"\nProcessing {len(schedule)} datasets with up to {max_workers} workers...")
+    print("Strategy: Processing all sizes in parallel with priority ordering\n")
+
+    with tqdm(total=len(schedule), desc="All datasets", unit="dataset") as pbar:
+        with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as ex:
+            futures = {
+                ex.submit(
+                    run_with_strategy,
+                    path,
+                    size_cat,
+                    strategy,
+                    config,
+                    per_dataset_dir,
+                ): (path, size_cat)
+                for path, size_cat, strategy in schedule
            }
+
+            for fut in concurrent.futures.as_completed(futures):
+                path, size_cat = futures[fut]
+                try:
+                    row = fut.result()
+                    summary_rows.append(row)
+                    if config["behavior"]["log_each_dataset"]:
+                        elapsed = row.get("elapsed_sec", "?")
+                        tqdm.write(f"[ok] {path.name} ({size_cat}, {elapsed}s)")
+                except Exception as exc:
+                    msg = {"dataset_path": str(path), "error": repr(exc), "status": "failed", "size_category": size_cat}
+                    failures.append(msg)
+                    tqdm.write(f"[failed] {path.name} ({size_cat}): {exc}")
+                finally:
+                    pbar.update(1)
+
+    # Save results
    summary_df = pd.DataFrame(summary_rows)
+    summary_csv = output_dir / f"eda_summary_shard_{shard_index:03d}_of_{num_shards:03d}.csv"
    summary_df.to_csv(summary_csv, index=False)
+
+    failures_path = output_dir / f"eda_failures_shard_{shard_index:03d}_of_{num_shards:03d}.json"
    failures_path.write_text(json.dumps(failures, indent=2))
+
+    print(json.dumps({
+        "summary_csv": str(summary_csv),
+        "failures_json": str(failures_path),
+        "ok_count": len(summary_rows),
+        "failed_count": len(failures),
+    }, indent=2))


if __name__ == "__main__":
scripts/merge_eda_shards.py CHANGED
File without changes
scripts/resource_probe.py CHANGED
File without changes
scripts/run_eda_pipeline.py ADDED
@@ -0,0 +1,112 @@
+#!/usr/bin/env python3
+"""Launcher script for YAML-configured EDA pipeline."""
+
+import argparse
+import subprocess
+import sys
+from pathlib import Path
+
+import yaml
+
+
+def run_command(cmd: list[str], description: str) -> None:
+    """Run a command and handle errors."""
+    print(f"\n{'='*80}")
+    print(f"{description}")
+    print(f"{'='*80}")
+    print(f"Command: {' '.join(cmd)}\n")
+
+    result = subprocess.run(cmd)
+    if result.returncode != 0:
+        print(f"\n[ERROR] {description} failed with exit code {result.returncode}")
+        sys.exit(result.returncode)
+
+
+def main():
+    parser = argparse.ArgumentParser(description=__doc__)
+    parser.add_argument(
+        "--config",
+        type=Path,
+        required=True,
+        help="YAML configuration file",
+    )
+    parser.add_argument(
+        "--step",
+        choices=["metadata", "eda", "merge", "all"],
+        default="all",
+        help="Which step to run",
+    )
+    parser.add_argument(
+        "--num-shards",
+        type=int,
+        help="Number of shards for distributed processing",
+    )
+    parser.add_argument(
+        "--shard-index",
+        type=int,
+        help="Shard index to process (0-based)",
+    )
+    parser.add_argument(
+        "--force-rescan",
+        action="store_true",
+        help="Force metadata rescan",
+    )
+    args = parser.parse_args()
+
+    if not args.config.exists():
+        print(f"[ERROR] Config file not found: {args.config}")
+        sys.exit(1)
+
+    # Load config to check paths
+    with open(args.config) as f:
+        config = yaml.safe_load(f)
+
+    # Step 1: Build metadata cache
+    if args.step in ["metadata", "all"]:
+        cmd = [
+            "uv", "run", "python",
+            "scripts/build_metadata_cache.py",
+            "--config", str(args.config),
+        ]
+        if args.force_rescan:
+            cmd.append("--force-rescan")
+
+        run_command(cmd, "Step 1: Building metadata cache")
+
+    # Step 2: Run EDA
+    if args.step in ["eda", "all"]:
+        cmd = [
+            "uv", "run", "python",
+            "scripts/distributed_eda.py",
+            "--config", str(args.config),
+        ]
+        if args.num_shards is not None:
+            cmd.extend(["--num-shards", str(args.num_shards)])
+        if args.shard_index is not None:
+            cmd.extend(["--shard-index", str(args.shard_index)])
+
+        run_command(cmd, "Step 2: Running EDA")
+
+    # Step 3: Merge shards (if sharding was used)
+    if args.step == "merge":
+        cmd = [
+            "uv", "run", "python",
+            "scripts/merge_eda_shards.py",
+            "--output-dir", config["paths"]["output_dir"],
+        ]
+        run_command(cmd, "Step 3: Merging shard results")
+    elif args.step == "all":
+        if args.num_shards and args.num_shards > 1:
+            # Sharded runs produce one output set per shard; merge them in a separate pass.
+            print("\n[INFO] Sharding enabled but not merging (run --step merge after all shards finish)")
+        else:
+            print("\n[INFO] Single shard run, no merge needed")
+
+    print(f"\n{'='*80}")
+    print("Pipeline completed successfully!")
+    print(f"{'='*80}")
+    print(f"\nResults written to: {config['paths']['output_dir']}")
+
+
+if __name__ == "__main__":
+    main()
scripts/run_eda_slurm.sh ADDED
@@ -0,0 +1,60 @@
+#!/bin/bash
+#SBATCH --job-name=eda_pipeline
+#SBATCH --output=logs/eda_%A_%a.out
+#SBATCH --error=logs/eda_%A_%a.err
+#SBATCH --time=24:00:00
+#SBATCH --mem=256G
+#SBATCH --cpus-per-task=42
+#SBATCH --array=0-3
+
+# SLURM batch script for distributed EDA with YAML config
+# Usage: sbatch --array=0-N scripts/run_eda_slurm.sh configs/eda_optimized.yaml
+# where N is num_shards - 1
+
+CONFIG_FILE=${1:-configs/eda_optimized.yaml}
+NUM_SHARDS=${2:-4}
+SHARD_INDEX=${SLURM_ARRAY_TASK_ID}
+
+echo "========================================="
+echo "EDA Pipeline - Shard ${SHARD_INDEX}/${NUM_SHARDS}"
+echo "Config: ${CONFIG_FILE}"
+echo "========================================="
+
+cd /project/GOV108018/whats2000_work/cell_x_gene_visualization
+
+# Build metadata cache (only first job)
+if [ ${SHARD_INDEX} -eq 0 ]; then
+    echo "Building metadata cache..."
+    uv run python scripts/build_metadata_cache.py --config "${CONFIG_FILE}"
+
+    # Wait a bit for cache to be written
+    sleep 30
+else
+    # Wait for first job to build cache
+    echo "Waiting for metadata cache..."
+    CACHE_PATH=$(python -c "import yaml; c=yaml.safe_load(open('${CONFIG_FILE}')); print(c['paths']['enhanced_metadata_cache'])")
+
+    # Wait up to 10 minutes for cache
+    for i in {1..60}; do
+        if [ -f "${CACHE_PATH}" ]; then
+            echo "Cache found!"
+            break
+        fi
+        echo "Waiting for cache... ($i/60)"
+        sleep 10
+    done
+
+    if [ ! -f "${CACHE_PATH}" ]; then
+        echo "ERROR: Metadata cache not found after waiting"
+        exit 1
+    fi
+fi
+
+# Run EDA for this shard
+echo "Running EDA for shard ${SHARD_INDEX}..."
+uv run python scripts/distributed_eda.py \
+    --config "${CONFIG_FILE}" \
+    --num-shards "${NUM_SHARDS}" \
+    --shard-index "${SHARD_INDEX}"
+
+echo "Shard ${SHARD_INDEX} completed!"