whats2000 committed on
Commit
75aa70e
·
1 Parent(s): 685b361

feat(eda): update large file processing to support parallel workers and enhance metadata caching

Browse files
README.md CHANGED
@@ -177,7 +177,8 @@ strategy:
177
 
178
  - **slowdown_threshold**: If processing throughput drops below this fraction of baseline (e.g., 0.5 = 50%), the pipeline automatically reduces worker count to prevent memory thrashing
179
  - **min_workers_ratio**: Minimum workers to keep as a fraction of max_workers (e.g., 0.25 = always keep at least 25% of workers)
180
- - **large_file_threshold_gib**: Files larger than this size (in GB) are processed serially during metadata cache building to avoid out-of-memory errors
 
181
  - **chunk_size_multiplier**: Adjusts the chunk size based on dataset category (smaller = safer for large datasets)
182
 
183
  ### Dataset Slicing
@@ -333,7 +334,8 @@ Reduce workers and chunk size in config:
333
  resources:
334
  max_workers: 24
335
  chunk_size: 4096
336
- large_file_threshold_gib: 20.0 # Process larger files serially
 
337
  slicing:
338
  obs_slice_size: 50000
339
  ```
 
177
 
178
  - **slowdown_threshold**: If processing throughput drops below this fraction of baseline (e.g., 0.5 = 50%), the pipeline automatically reduces worker count to prevent memory thrashing
179
  - **min_workers_ratio**: Minimum workers to keep as a fraction of max_workers (e.g., 0.25 = always keep at least 25% of workers)
180
+ - **large_file_threshold_gib**: Files larger than this size (in GiB) use a separate worker pool during metadata cache building
181
+ - **large_file_workers**: Number of parallel workers for large files (0 = serial processing, >0 = parallel). On systems with ample RAM, using 4-8 workers significantly speeds up cache building
182
  - **chunk_size_multiplier**: Adjusts the chunk size based on dataset category (smaller = safer for large datasets)
183
 
184
  ### Dataset Slicing
 
334
  resources:
335
  max_workers: 24
336
  chunk_size: 4096
337
+ large_file_threshold_gib: 20.0 # Process larger files separately
338
+ large_file_workers: 2 # Fewer workers for large files (or 0 for serial)
339
  slicing:
340
  obs_slice_size: 50000
341
  ```
configs/eda_config_template.yaml CHANGED
@@ -14,7 +14,8 @@ resources:
14
  min_workers_ratio: 0.25 # Minimum workers as ratio of max_workers (0.25 = keep at least 25% of workers)
15
 
16
  # File size thresholds for metadata cache building
17
- large_file_threshold_gib: 30.0 # Files larger than this (GB) are processed serially to avoid OOM
 
18
 
19
  # Adaptive scaling parameters for Dask cluster
20
  adaptive_scaling:
 
14
  min_workers_ratio: 0.25 # Minimum workers as ratio of max_workers (0.25 = keep at least 25% of workers)
15
 
16
  # File size thresholds for metadata cache building
17
+ large_file_threshold_gib: 30.0 # Files larger than this (GiB) use separate worker pool
18
+ large_file_workers: 4 # Parallel workers for large files (0 = serial, >0 = parallel)
19
 
20
  # Adaptive scaling parameters for Dask cluster
21
  adaptive_scaling:
configs/eda_optimized.yaml CHANGED
@@ -14,7 +14,8 @@ resources:
14
  min_workers_ratio: 0.25 # Minimum workers as ratio of max_workers (0.25 = 1/4)
15
 
16
  # File size thresholds for metadata cache building
17
- large_file_threshold_gib: 30.0 # Files larger than this processed serially
 
18
 
19
  # Adaptive scaling parameters
20
  adaptive_scaling:
 
14
  min_workers_ratio: 0.25 # Minimum workers as ratio of max_workers (0.25 = 1/4)
15
 
16
  # File size thresholds for metadata cache building
17
+ large_file_threshold_gib: 30.0 # Files larger than this use separate worker pool
18
+ large_file_workers: 16 # Parallel workers for large files (0 = serial, >0 = parallel)
19
 
20
  # Adaptive scaling parameters
21
  adaptive_scaling:
scripts/build_metadata_cache.py CHANGED
@@ -79,6 +79,7 @@ def build_enhanced_metadata(
79
  force_rescan: bool = False,
80
  workers: int = 0,
81
  large_file_threshold_gib: float | None = None,
 
82
  config: dict = None,
83
  ) -> pd.DataFrame:
84
  """Build enhanced metadata by combining CELLxGENE metadata with quick scans."""
@@ -90,14 +91,19 @@ def build_enhanced_metadata(
90
  medium_threshold = thresholds.get("medium", 15_000_000_000)
91
  large_threshold = thresholds.get("large", 40_000_000_000)
92
  # Get large file threshold from config resources
 
93
  if large_file_threshold_gib is None:
94
- large_file_threshold_gib = config.get("resources", {}).get("large_file_threshold_gib", 30.0)
 
 
95
  else:
96
  small_threshold = 2_000_000_000
97
  medium_threshold = 15_000_000_000
98
  large_threshold = 40_000_000_000
99
  if large_file_threshold_gib is None:
100
  large_file_threshold_gib = 30.0
 
 
101
 
102
  # Auto-detect workers if not specified
103
  if workers <= 0:
@@ -162,7 +168,10 @@ def build_enhanced_metadata(
162
 
163
  print(f"\nScan strategy:")
164
  print(f" Small/medium files (<={large_file_threshold_gib} GB): {len(small_files)} files -> parallel with {workers} workers")
165
- print(f" Large files (>{large_file_threshold_gib} GB): {len(large_files)} files -> serial processing")
 
 
 
166
  print()
167
 
168
  scan_results = []
@@ -190,28 +199,59 @@ def build_enhanced_metadata(
190
  finally:
191
  pbar.update(1)
192
 
193
- # Phase 2: Serial processing of large files (one at a time to avoid OOM)
194
  if large_files:
195
- print(f"\nPhase 2: Scanning {len(large_files)} large files serially (one at a time)...")
196
- for i, path in enumerate(large_files, 1):
197
- size_gib = path.stat().st_size / (1024**3)
198
- print(f" [{i}/{len(large_files)}] {path.name} ({size_gib:.1f} GB)...", end=" ", flush=True)
199
- try:
200
- result = quick_scan_dataset(path)
201
- scan_results.append(result)
202
- if result["status"] == "ok":
203
- print(f"✓ ({result.get('scan_time_sec', '?')}s)")
204
- else:
205
- print(f"✗ {result.get('error', 'Unknown error')[:50]}")
206
- except Exception as e:
207
- scan_results.append({
208
- "dataset_path": str(path),
209
- "dataset_file": path.name,
210
- "dataset_id": path.stem,
211
- "error": str(e),
212
- "status": "failed",
213
- })
214
- print(f"✗ {str(e)[:50]}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
215
 
216
  new_scans_df = pd.DataFrame(scan_results)
217
 
 
79
  force_rescan: bool = False,
80
  workers: int = 0,
81
  large_file_threshold_gib: float | None = None,
82
+ large_file_workers: int | None = None,
83
  config: dict = None,
84
  ) -> pd.DataFrame:
85
  """Build enhanced metadata by combining CELLxGENE metadata with quick scans."""
 
91
  medium_threshold = thresholds.get("medium", 15_000_000_000)
92
  large_threshold = thresholds.get("large", 40_000_000_000)
93
  # Get large file threshold from config resources
94
+ resources = config.get("resources", {})
95
  if large_file_threshold_gib is None:
96
+ large_file_threshold_gib = resources.get("large_file_threshold_gib", 30.0)
97
+ if large_file_workers is None:
98
+ large_file_workers = resources.get("large_file_workers", 0)
99
  else:
100
  small_threshold = 2_000_000_000
101
  medium_threshold = 15_000_000_000
102
  large_threshold = 40_000_000_000
103
  if large_file_threshold_gib is None:
104
  large_file_threshold_gib = 30.0
105
+ if large_file_workers is None:
106
+ large_file_workers = 0
107
 
108
  # Auto-detect workers if not specified
109
  if workers <= 0:
 
168
 
169
  print(f"\nScan strategy:")
170
  print(f" Small/medium files (<={large_file_threshold_gib} GB): {len(small_files)} files -> parallel with {workers} workers")
171
+ if large_file_workers > 0:
172
+ print(f" Large files (>{large_file_threshold_gib} GB): {len(large_files)} files -> parallel with {large_file_workers} workers")
173
+ else:
174
+ print(f" Large files (>{large_file_threshold_gib} GB): {len(large_files)} files -> serial processing")
175
  print()
176
 
177
  scan_results = []
 
199
  finally:
200
  pbar.update(1)
201
 
202
+ # Phase 2: Processing large files (parallel or serial based on config)
203
  if large_files:
204
+ if large_file_workers > 0:
205
+ # Parallel processing with limited workers
206
+ print(f"\nPhase 2: Scanning {len(large_files)} large files in parallel ({large_file_workers} workers)...")
207
+ with tqdm(total=len(large_files), desc="Large files", unit="file") as pbar:
208
+ with concurrent.futures.ProcessPoolExecutor(max_workers=large_file_workers) as executor:
209
+ futures = {executor.submit(quick_scan_dataset, path): path for path in large_files}
210
+
211
+ for future in concurrent.futures.as_completed(futures):
212
+ try:
213
+ result = future.result()
214
+ scan_results.append(result)
215
+ # Show file size in progress
216
+ path = futures[future]
217
+ size_gib = path.stat().st_size / (1024**3)
218
+ status = "✓" if result.get("status") == "ok" else "✗"
219
+ elapsed = result.get("scan_time_sec", "?")
220
+ tqdm.write(f" {status} {path.name} ({size_gib:.1f} GB, {elapsed}s)")
221
+ except Exception as e:
222
+ path = futures[future]
223
+ scan_results.append({
224
+ "dataset_path": str(path),
225
+ "dataset_file": path.name,
226
+ "dataset_id": path.stem,
227
+ "error": str(e),
228
+ "status": "failed",
229
+ })
230
+ tqdm.write(f" ✗ {path.name} - {str(e)[:50]}")
231
+ finally:
232
+ pbar.update(1)
233
+ else:
234
+ # Serial processing (original behavior)
235
+ print(f"\nPhase 2: Scanning {len(large_files)} large files serially (one at a time)...")
236
+ for i, path in enumerate(large_files, 1):
237
+ size_gib = path.stat().st_size / (1024**3)
238
+ print(f" [{i}/{len(large_files)}] {path.name} ({size_gib:.1f} GB)...", end=" ", flush=True)
239
+ try:
240
+ result = quick_scan_dataset(path)
241
+ scan_results.append(result)
242
+ if result["status"] == "ok":
243
+ print(f"✓ ({result.get('scan_time_sec', '?')}s)")
244
+ else:
245
+ print(f"✗ {result.get('error', 'Unknown error')[:50]}")
246
+ except Exception as e:
247
+ scan_results.append({
248
+ "dataset_path": str(path),
249
+ "dataset_file": path.name,
250
+ "dataset_id": path.stem,
251
+ "error": str(e),
252
+ "status": "failed",
253
+ })
254
+ print(f"✗ {str(e)[:50]}")
255
 
256
  new_scans_df = pd.DataFrame(scan_results)
257