#!/usr/bin/env python3
"""Probe local HPC resources and suggest safe EDA concurrency settings.

This script dynamically calculates memory per worker from the EDA config file
(configs/eda_optimized.yaml) using max_memory_gib / max_workers.
"""

from __future__ import annotations

import argparse
import json
import os
import platform
import shutil
import sys
from pathlib import Path

try:
    import yaml
except ImportError:
    # Keep the module importable; main() reports the missing dependency in the
    # same "Error: ..." style it uses for every other fatal condition.
    yaml = None

_MEMINFO_PATH = Path("/proc/meminfo")
_KIB_PER_GIB = 1024 * 1024
_BYTES_PER_GIB = 1024**3


def _meminfo_gib(field: str, meminfo: Path = _MEMINFO_PATH) -> float:
    """Return the value of *field* from a meminfo-style file, converted to GiB.

    Args:
        field: Field name without the trailing colon, e.g. ``"MemTotal"``.
        meminfo: File to read; defaults to ``/proc/meminfo``.

    Returns:
        The field value (read as KiB) divided by 1024 * 1024, or ``0.0`` when
        the file cannot be read, the field is absent, or its value is not an
        integer.
    """
    try:
        text = meminfo.read_text()
    except OSError:
        # Covers a missing file as well as one that vanished or is unreadable.
        return 0.0

    prefix = f"{field}:"
    for line in text.splitlines():
        if not line.startswith(prefix):
            continue
        try:
            return int(line.split()[1]) / _KIB_PER_GIB
        except (IndexError, ValueError):
            return 0.0
    return 0.0


def _mem_available_gib() -> float:
    """Return the ``MemAvailable`` figure in GiB (0.0 if it cannot be read)."""
    return _meminfo_gib("MemAvailable")


def _mem_total_gib() -> float:
    """Return the ``MemTotal`` figure in GiB (0.0 if it cannot be read)."""
    return _meminfo_gib("MemTotal")


def _recommend_workers(cpu_count: int, mem_available_gib: float, mem_per_worker_gib: float) -> int:
    """Return a worker count bounded by both CPU and available memory.

    The CPU bound is 75% of *cpu_count*; the memory bound is
    *mem_available_gib* floor-divided by *mem_per_worker_gib*, where the
    per-worker figure is floored at 1 GiB. Each bound, and the final result,
    is at least 1, so an available-memory reading of 0.0 yields one worker.
    """
    # Fast profile for HPC: use more cores while still leaving headroom.
    by_cpu = max(1, int(cpu_count * 0.75))
    by_mem = max(1, int(mem_available_gib // max(1.0, mem_per_worker_gib)))
    return max(1, min(by_cpu, by_mem))


def _load_config(config_path: Path) -> object:
    """Parse *config_path* as YAML and return whatever document it contains.

    Raises:
        ValueError: If the file is not valid YAML (or not valid UTF-8).
        OSError: If the file cannot be opened or read.
    """
    with open(config_path, encoding="utf-8") as f:
        try:
            return yaml.safe_load(f)
        except yaml.YAMLError as exc:
            raise ValueError(f"could not parse YAML: {exc}") from exc


def _mem_per_worker_from_config(config: object) -> float:
    """Return ``resources.max_memory_gib / resources.max_workers`` from *config*.

    Raises:
        ValueError: If *config* is not a mapping, the ``resources`` section or
            either key is missing, a value is not numeric, or ``max_workers``
            is not positive (which would otherwise divide by zero).
    """
    if not isinstance(config, dict):
        raise ValueError("top level must be a mapping with a 'resources' section")
    resources = config.get("resources")
    if not isinstance(resources, dict):
        raise ValueError("missing 'resources' section")

    try:
        max_memory_gib = float(resources["max_memory_gib"])
        max_workers = float(resources["max_workers"])
    except KeyError as exc:
        raise ValueError(f"'resources' section is missing key {exc}") from exc
    except (TypeError, ValueError) as exc:
        raise ValueError("'max_memory_gib' and 'max_workers' must be numeric") from exc

    if max_workers <= 0:
        raise ValueError("'max_workers' must be greater than zero")
    return max_memory_gib / max_workers


def _default_workdir(config: dict) -> Path:
    """Return the directory two levels above the first configured input dir.

    Falls back to the current working directory when the config has no
    ``paths.input_dirs`` entry (missing, null, or empty).
    """
    paths = config.get("paths")
    input_dirs = paths.get("input_dirs") if isinstance(paths, dict) else None
    if isinstance(input_dirs, str):
        # A scalar entry means one directory, not a sequence of characters.
        input_dirs = [input_dirs]
    if input_dirs:
        return Path(input_dirs[0]).parent.parent  # Go up to project root
    return Path.cwd()


def _nearest_existing(path: Path) -> Path:
    """Return *path* made absolute, or its closest ancestor that exists."""
    absolute = path.absolute()
    for candidate in (absolute, *absolute.parents):
        if candidate.exists():
            return candidate
    return absolute


def _build_report(
    *,
    cpu_count: int,
    mem_total_gib: float,
    mem_available_gib: float,
    disk_usage: tuple,
    mem_per_worker_gib: float,
) -> dict:
    """Assemble the JSON-serialisable resource report.

    Args:
        cpu_count: Number of CPUs to plan for.
        mem_total_gib: Total memory in GiB.
        mem_available_gib: Available memory in GiB.
        disk_usage: ``(total, used, free)`` in bytes, as returned by
            ``shutil.disk_usage``.
        mem_per_worker_gib: Memory budget per worker in GiB.

    Returns:
        A dict with host details, rounded memory/disk figures, the per-worker
        memory assumption, and the worker/shard/chunk-size recommendation.
    """
    disk_total, disk_used, disk_free = disk_usage
    recommended_workers = _recommend_workers(
        cpu_count=cpu_count,
        mem_available_gib=mem_available_gib,
        mem_per_worker_gib=mem_per_worker_gib,
    )
    # Suggested shards: CPUs per recommended worker, kept within 1..8.
    recommended_shards = max(1, min(8, cpu_count // max(1, recommended_workers)))

    return {
        "hostname": platform.node(),
        "platform": platform.platform(),
        "cpu_count": cpu_count,
        "memory_total_gib": round(mem_total_gib, 2),
        "memory_available_gib": round(mem_available_gib, 2),
        "disk_total_gib": round(disk_total / _BYTES_PER_GIB, 2),
        "disk_used_gib": round(disk_used / _BYTES_PER_GIB, 2),
        "disk_free_gib": round(disk_free / _BYTES_PER_GIB, 2),
        "assumptions": {"mem_per_worker_gib": mem_per_worker_gib},
        "recommendation": {
            "workers_per_node": recommended_workers,
            "num_shards_suggestion": recommended_shards,
            "chunk_size_suggestion": 4096,
        },
    }


def main() -> None:
    """Parse CLI arguments, probe the host, and print the report as JSON.

    Exits with status 1 and a message on stderr when the config file is
    missing, unreadable, or invalid, or when PyYAML is not installed.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--config",
        type=Path,
        default=Path(__file__).parent.parent / "configs" / "eda_optimized.yaml",
        help="Path to YAML configuration file.",
    )
    parser.add_argument(
        "--workdir",
        type=Path,
        help="Path to check disk usage for (if not specified, uses first input_dir from config).",
    )
    args = parser.parse_args()

    # Read config to calculate dynamic mem_per_worker_gib
    config_path = args.config
    if not config_path.exists():
        print(f"Error: Config file not found: {config_path}", file=sys.stderr)
        sys.exit(1)
    if yaml is None:
        print("Error: PyYAML is required to read the config file.", file=sys.stderr)
        sys.exit(1)

    try:
        config = _load_config(config_path)
        mem_per_worker_gib = _mem_per_worker_from_config(config)
    except (OSError, ValueError) as exc:
        print(f"Error: Invalid config file {config_path}: {exc}", file=sys.stderr)
        sys.exit(1)

    # Use workdir from args, or fall back to first input_dir from config
    requested = args.workdir if args.workdir is not None else _default_workdir(config)
    workdir = _nearest_existing(requested)
    if workdir != requested.absolute():
        # disk_usage needs an existing path; probe the closest ancestor instead.
        print(
            f"Warning: {requested} does not exist; reporting disk usage for {workdir}",
            file=sys.stderr,
        )

    report = _build_report(
        cpu_count=os.cpu_count() or 1,
        mem_total_gib=_mem_total_gib(),
        mem_available_gib=_mem_available_gib(),
        disk_usage=shutil.disk_usage(workdir),
        mem_per_worker_gib=mem_per_worker_gib,
    )
    print(json.dumps(report, indent=2))


if __name__ == "__main__":
    main()