whats2000's picture
feat(config): fix some hard code config and docs
685b361
#!/usr/bin/env python3
"""Probe local HPC resources and suggest safe EDA concurrency settings.
This script dynamically calculates memory per worker from the EDA config file
(configs/eda_optimized.yaml) using max_memory_gib / max_workers.
"""
from __future__ import annotations
import argparse
import json
import os
import platform
import shutil
import sys
from pathlib import Path
import yaml
def _mem_available_gib() -> float:
    """Return the ``MemAvailable`` figure from ``/proc/meminfo`` in GiB.

    Returns:
        The value of the first ``MemAvailable:`` line divided by 1024 * 1024,
        or 0.0 when ``/proc/meminfo`` does not exist or has no such line.
    """
    source = Path("/proc/meminfo")
    if source.exists():
        matches = (
            entry
            for entry in source.read_text().splitlines()
            if entry.startswith("MemAvailable:")
        )
        first = next(matches, None)
        if first is not None:
            # Second whitespace-separated token is the numeric value (kB).
            return int(first.split()[1]) / (1024 * 1024)
    return 0.0
def _mem_total_gib() -> float:
    """Return the ``MemTotal`` figure from ``/proc/meminfo`` in GiB.

    Returns:
        The value of the first ``MemTotal:`` line divided by 1024 * 1024,
        or 0.0 when ``/proc/meminfo`` does not exist or has no such line.
    """
    path = Path("/proc/meminfo")
    if not path.exists():
        return 0.0

    kb_per_gib = 1024 * 1024
    for record in path.read_text().splitlines():
        if not record.startswith("MemTotal:"):
            continue
        # Field 0 is the label, field 1 the numeric value (kB).
        fields = record.split()
        return int(fields[1]) / kb_per_gib
    return 0.0
def _recommend_workers(cpu_count: int, mem_available_gib: float, mem_per_worker_gib: float) -> int:
    """Return a worker count bounded by both CPU and available memory.

    Args:
        cpu_count: Number of CPUs on the node.
        mem_available_gib: Memory currently available, in GiB.
        mem_per_worker_gib: Memory budget per worker, in GiB; values below
            1.0 are treated as 1.0.

    Returns:
        The smaller of the CPU-based and memory-based limits, never below 1.
    """
    # Fast profile for HPC: use 75% of the cores, leaving the rest as headroom.
    cpu_limit = int(cpu_count * 0.75)
    if cpu_limit < 1:
        cpu_limit = 1

    # Floor the per-worker budget at 1 GiB before dividing.
    per_worker = mem_per_worker_gib if mem_per_worker_gib > 1.0 else 1.0
    mem_limit = int(mem_available_gib // per_worker)
    if mem_limit < 1:
        mem_limit = 1

    # Both limits are already >= 1, so the tighter one is the answer.
    return cpu_limit if cpu_limit <= mem_limit else mem_limit
def _fail(message: str) -> None:
    """Print *message* to stderr in the script's ``Error: ...`` format and exit with status 1."""
    print(f"Error: {message}", file=sys.stderr)
    sys.exit(1)


def main() -> None:
    """Probe the local node and print a JSON report with concurrency suggestions.

    Reads ``resources.max_memory_gib`` and ``resources.max_workers`` from the
    YAML config to derive the per-worker memory budget, gathers CPU, memory
    and disk figures, and prints the report to stdout as indented JSON.

    Exits with status 1 (message on stderr) when the config file is missing,
    cannot be parsed, is not a mapping, has unusable ``resources`` values, or
    when an explicitly given ``--workdir`` does not exist.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--config",
        type=Path,
        default=Path(__file__).parent.parent / "configs" / "eda_optimized.yaml",
        help="Path to YAML configuration file.",
    )
    parser.add_argument(
        "--workdir",
        type=Path,
        help="Path to check disk usage for (if not specified, uses first input_dir from config).",
    )
    args = parser.parse_args()

    # Read config to calculate dynamic mem_per_worker_gib
    config_path = args.config
    if not config_path.exists():
        _fail(f"Config file not found: {config_path}")

    try:
        with open(config_path, encoding="utf-8") as f:
            config = yaml.safe_load(f)
    except yaml.YAMLError as exc:
        _fail(f"Could not parse config file {config_path}: {exc}")

    # An empty file loads as None and a top-level list/scalar is equally unusable.
    if not isinstance(config, dict):
        _fail(f"Config file must contain a YAML mapping: {config_path}")

    resources = config.get("resources")
    if not isinstance(resources, dict):
        _fail(f"Missing 'resources' mapping in config file: {config_path}")

    max_memory_gib = resources.get("max_memory_gib")
    max_workers = resources.get("max_workers")
    for key, value in (("max_memory_gib", max_memory_gib), ("max_workers", max_workers)):
        # bool is a subclass of int, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            _fail(f"resources.{key} must be a number in {config_path}, got {value!r}")
    if max_workers <= 0:
        # Guards the division below against ZeroDivisionError / negative budgets.
        _fail(f"resources.max_workers must be positive in {config_path}, got {max_workers!r}")

    mem_per_worker_gib = max_memory_gib / max_workers

    # Use workdir from args, or fall back to first input_dir from config
    workdir = args.workdir
    if workdir is None:
        paths = config.get("paths")
        input_dirs = paths.get("input_dirs") if isinstance(paths, dict) else None
        if isinstance(input_dirs, str):
            # A single path given as a scalar; indexing it would yield one character.
            input_dirs = [input_dirs]
        if input_dirs:
            workdir = Path(input_dirs[0]).parent.parent  # Go up to project root
        else:
            workdir = Path.cwd()
        # The derived directory may not exist on this node; climb to the
        # nearest existing ancestor so disk_usage() has a real path to stat.
        while not workdir.exists() and workdir != workdir.parent:
            workdir = workdir.parent
    elif not workdir.exists():
        _fail(f"Workdir not found: {workdir}")

    cpu_count = os.cpu_count() or 1
    mem_total_gib = _mem_total_gib()
    mem_available_gib = _mem_available_gib()
    disk_total, disk_used, disk_free = shutil.disk_usage(workdir)

    recommended_workers = _recommend_workers(
        cpu_count=cpu_count,
        mem_available_gib=mem_available_gib,
        mem_per_worker_gib=mem_per_worker_gib,
    )
    # Spread the remaining cores over shards, capped at 8.
    recommended_shards = max(1, min(8, cpu_count // max(1, recommended_workers)))

    report = {
        "hostname": platform.node(),
        "platform": platform.platform(),
        "cpu_count": cpu_count,
        "memory_total_gib": round(mem_total_gib, 2),
        "memory_available_gib": round(mem_available_gib, 2),
        "disk_total_gib": round(disk_total / (1024**3), 2),
        "disk_used_gib": round(disk_used / (1024**3), 2),
        "disk_free_gib": round(disk_free / (1024**3), 2),
        "assumptions": {"mem_per_worker_gib": mem_per_worker_gib},
        "recommendation": {
            "workers_per_node": recommended_workers,
            "num_shards_suggestion": recommended_shards,
            "chunk_size_suggestion": 4096,
        },
    }
    print(json.dumps(report, indent=2))
# Run the probe only when executed as a script, not when imported.
if __name__ == "__main__":
    main()