whats2000 committed on
Commit
32516b1
·
1 Parent(s): 5910420

feat(eda): enhance resource utilization by optimizing worker allocation and processing parameters

Browse files
configs/eda_optimized.yaml CHANGED
@@ -3,9 +3,9 @@
3
  # Aggressively optimized for maximum throughput on medium/large/xlarge datasets
4
 
5
  resources:
6
- max_memory_gib: 5000 # Up to 5TB available for processing
7
- max_workers: 80 # Increased from 48 to utilize more cores (80/112 = 71%)
8
- chunk_size: 150000 # Doubled to reduce overhead and increase throughput
9
 
10
  paths:
11
  input_dirs:
@@ -31,7 +31,7 @@ dataset_thresholds:
31
 
32
  slicing:
33
  enabled: true
34
- obs_slice_size: 500000 # Larger slices = fewer file opens, faster processing
35
  overlap: 0
36
  merge_strategy: "combine"
37
 
 
3
  # Aggressively optimized for maximum throughput on medium/large/xlarge datasets
4
 
5
  resources:
6
+ max_memory_gib: 5800 # Use nearly all 6TB available
7
+ max_workers: 100 # More workers = more parallel tasks (90% of 112 cores)
8
+ chunk_size: 200000 # Large chunks to fill memory - load more data per operation
9
 
10
  paths:
11
  input_dirs:
 
31
 
32
  slicing:
33
  enabled: true
34
+ obs_slice_size: 250000 # Smaller slices = MORE parallel tasks to feed workers
35
  overlap: 0
36
  merge_strategy: "combine"
37
 
scripts/distributed_eda.py CHANGED
@@ -1041,7 +1041,8 @@ def main() -> None:
1041
  # Cluster setup for large datasets
1042
  max_memory_gib = config["resources"]["max_memory_gib"]
1043
  max_workers = config["resources"]["max_workers"]
1044
- min_workers = min(4, max_workers)
 
1045
 
1046
  memory_per_worker_gib = max(2.0, max_memory_gib / max_workers)
1047
 
@@ -1070,21 +1071,21 @@ def main() -> None:
1070
 
1071
  cluster = LocalCluster(
1072
  n_workers=min_workers,
1073
- threads_per_worker=1,
1074
  processes=True,
1075
  memory_limit=f"{memory_per_worker_gib}GiB",
1076
  silence_logs=True,
1077
  dashboard_address=None,
1078
- lifetime="120 minutes",
1079
- lifetime_stagger="15 minutes",
1080
  )
1081
 
1082
  cluster.adapt(
1083
  minimum=min_workers,
1084
  maximum=max_workers,
1085
- target_duration="30s",
1086
- wait_count=3,
1087
- interval="2s",
1088
  )
1089
 
1090
  client = Client(cluster)
 
1041
  # Cluster setup for large datasets
1042
  max_memory_gib = config["resources"]["max_memory_gib"]
1043
  max_workers = config["resources"]["max_workers"]
1044
+ # Start with ALL workers immediately for maximum resource utilization
1045
+ min_workers = max_workers # Start with 100% workers from the beginning
1046
 
1047
  memory_per_worker_gib = max(2.0, max_memory_gib / max_workers)
1048
 
 
1071
 
1072
  cluster = LocalCluster(
1073
  n_workers=min_workers,
1074
+ threads_per_worker=1, # 1 thread = more processes = better parallelism
1075
  processes=True,
1076
  memory_limit=f"{memory_per_worker_gib}GiB",
1077
  silence_logs=True,
1078
  dashboard_address=None,
1079
+ lifetime="180 minutes",
1080
+ lifetime_stagger="20 minutes",
1081
  )
1082
 
1083
  cluster.adapt(
1084
  minimum=min_workers,
1085
  maximum=max_workers,
1086
+ target_duration="5s", # Very aggressive scaling for large datasets
1087
+ wait_count=1, # React immediately to workload
1088
+ interval="1s", # Check frequently
1089
  )
1090
 
1091
  client = Client(cluster)