| | |
| | """Probe local HPC resources and suggest safe EDA concurrency settings. |
| | |
| | This script dynamically calculates memory per worker from the EDA config file |
| | (configs/eda_optimized.yaml) using max_memory_gib / max_workers. |
| | """ |
| |
|
| | from __future__ import annotations |
| |
|
| | import argparse |
| | import json |
| | import os |
| | import platform |
| | import shutil |
| | import sys |
| | from pathlib import Path |
| | import yaml |
| |
|
| |
|
| | def _mem_available_gib() -> float: |
| | meminfo = Path("/proc/meminfo") |
| | if not meminfo.exists(): |
| | return 0.0 |
| | for line in meminfo.read_text().splitlines(): |
| | if line.startswith("MemAvailable:"): |
| | kb = int(line.split()[1]) |
| | return kb / (1024 * 1024) |
| | return 0.0 |
| |
|
| |
|
| | def _mem_total_gib() -> float: |
| | meminfo = Path("/proc/meminfo") |
| | if not meminfo.exists(): |
| | return 0.0 |
| | for line in meminfo.read_text().splitlines(): |
| | if line.startswith("MemTotal:"): |
| | kb = int(line.split()[1]) |
| | return kb / (1024 * 1024) |
| | return 0.0 |
| |
|
| |
|
| | def _recommend_workers(cpu_count: int, mem_available_gib: float, mem_per_worker_gib: float) -> int: |
| | |
| | by_cpu = max(1, int(cpu_count * 0.75)) |
| | by_mem = max(1, int(mem_available_gib // max(1.0, mem_per_worker_gib))) |
| | return max(1, min(by_cpu, by_mem)) |
| |
|
| |
|
| | def main() -> None: |
| | parser = argparse.ArgumentParser(description=__doc__) |
| | parser.add_argument( |
| | "--config", |
| | type=Path, |
| | default=Path(__file__).parent.parent / "configs" / "eda_optimized.yaml", |
| | help="Path to YAML configuration file.", |
| | ) |
| | parser.add_argument( |
| | "--workdir", |
| | type=Path, |
| | help="Path to check disk usage for (if not specified, uses first input_dir from config).", |
| | ) |
| | args = parser.parse_args() |
| |
|
| | |
| | config_path = args.config |
| | if not config_path.exists(): |
| | print(f"Error: Config file not found: {config_path}", file=sys.stderr) |
| | sys.exit(1) |
| | |
| | with open(config_path) as f: |
| | config = yaml.safe_load(f) |
| | max_memory_gib = config['resources']['max_memory_gib'] |
| | max_workers = config['resources']['max_workers'] |
| | mem_per_worker_gib = max_memory_gib / max_workers |
| | |
| | |
| | workdir = args.workdir |
| | if workdir is None: |
| | input_dirs = config.get('paths', {}).get('input_dirs', []) |
| | if input_dirs: |
| | workdir = Path(input_dirs[0]).parent.parent |
| | else: |
| | workdir = Path.cwd() |
| |
|
| | cpu_count = os.cpu_count() or 1 |
| | mem_total_gib = _mem_total_gib() |
| | mem_available_gib = _mem_available_gib() |
| | disk_total, disk_used, disk_free = shutil.disk_usage(workdir) |
| |
|
| | recommended_workers = _recommend_workers( |
| | cpu_count=cpu_count, |
| | mem_available_gib=mem_available_gib, |
| | mem_per_worker_gib=mem_per_worker_gib, |
| | ) |
| | recommended_shards = max(1, min(8, cpu_count // max(1, recommended_workers))) |
| |
|
| | report = { |
| | "hostname": platform.node(), |
| | "platform": platform.platform(), |
| | "cpu_count": cpu_count, |
| | "memory_total_gib": round(mem_total_gib, 2), |
| | "memory_available_gib": round(mem_available_gib, 2), |
| | "disk_total_gib": round(disk_total / (1024**3), 2), |
| | "disk_used_gib": round(disk_used / (1024**3), 2), |
| | "disk_free_gib": round(disk_free / (1024**3), 2), |
| | "assumptions": {"mem_per_worker_gib": mem_per_worker_gib}, |
| | "recommendation": { |
| | "workers_per_node": recommended_workers, |
| | "num_shards_suggestion": recommended_shards, |
| | "chunk_size_suggestion": 4096, |
| | }, |
| | } |
| | print(json.dumps(report, indent=2)) |
| |
|
| |
|
| | if __name__ == "__main__": |
| | main() |
| |
|