from __future__ import annotations import json import re from pathlib import Path from sysadmin_env.models import DiagnosticTrigger from sysadmin_env.models import DifficultyTier from sysadmin_env.models import TaskMetadata from sysadmin_env.models import TaskScenarioDefinition from sysadmin_env.models import TaskScenarioState from sysadmin_env.tasks import hpc_outage TASK_ID = "hpc_nfs_stale" COMPLETION_HEALTH = 1.0 SHARED_STATE_PATH = hpc_outage.SHARED_STATE_PATH NODES_ROOT = hpc_outage.NODES_ROOT COMPUTE_ROOT = hpc_outage.COMPUTE_ROOT MOUNT_STALE_RELATIVE = Path("mnt/shared/.mount_stale") MOUNT_VALID_RELATIVE = Path("mnt/shared/.mount_valid") MOUNT_STUB_RELATIVE = Path("usr/local/bin/mount") UMOUNT_STUB_RELATIVE = Path("usr/local/bin/umount") INITIAL_STATE: dict = { "cluster": "rocky-hpc", "cores_total": hpc_outage.CLUSTER_CORES_TOTAL, "cores_per_node": hpc_outage.CLUSTER_CORES_PER_NODE, "partitions": { "compute": {"nodes": ["compute-01"], "default": True}, }, "nodes": { "login": { "state": "up", "reason": "", "cores": hpc_outage.CLUSTER_CORES_PER_NODE, }, "compute-01": { "state": "drain", "reason": "nfs /mnt/shared stale file handle", "cores": hpc_outage.CLUSTER_CORES_PER_NODE, }, }, "services": { "slurmd@login": "active", "slurmd@compute-01": "failed", "slurmctld@login": "active", "rpc-statd@compute-01": "active", }, "jobs": [ { "id": 10244, "name": "cryoem_refine", "user": "structures", "state": "PD", "partition": "compute", "nodes": "(NfsStale)", "time": "0:00", }, ], } def build_definition(base_filesystem_path: str) -> TaskScenarioDefinition: metadata = TaskMetadata( task_id=TASK_ID, difficulty=DifficultyTier.hard, description="compute node drained because the nfs share at /mnt/shared reports stale file handle", max_steps=90, time_limit=600.0, base_filesystem_path=base_filesystem_path, ) return TaskScenarioDefinition( metadata=metadata, requires_network_isolation=False, allows_nested_sandbox=True, diagnostic_triggers=diagnostic_triggers(), ) def diagnostic_triggers() -> list[DiagnosticTrigger]: return [ DiagnosticTrigger( fact_id="cluster_queue_inspected", command_patterns=[r"\bsinfo\b", r"\bsqueue\b"], reward=0.06, ), DiagnosticTrigger( fact_id="compute_node_entered", command_patterns=[r"\bssh\s+compute-01\b"], reward=0.07, ), DiagnosticTrigger( fact_id="share_inspected", command_patterns=[r"ls\s+.+/mnt/shared", r"stat\s+.+/mnt/shared", r"\bmount\b"], reward=0.05, ), DiagnosticTrigger( fact_id="slurmd_service_checked", command_patterns=[r"systemctl\s+status\s+slurmd", r"systemctl\s+is-failed\s+slurmd"], reward=0.05, ), DiagnosticTrigger( fact_id="ood_portal_probed", command_patterns=[r"curl\s+.+localhost:8080", r"curl\s+.+127\.0\.0\.1:8080"], reward=0.05, ), ] def prepare_filesystem(root: str | Path) -> None: root_path = Path(root) hpc_outage.prepare_filesystem(root_path) route_path = root_path / hpc_outage.COMPUTE_ROUTE_PATH route_path.parent.mkdir(parents=True, exist_ok=True) route_path.write_text(hpc_outage.FIXED_ROUTE) valid_path = root_path / MOUNT_VALID_RELATIVE if valid_path.exists(): valid_path.unlink() stale_path = root_path / MOUNT_STALE_RELATIVE stale_path.parent.mkdir(parents=True, exist_ok=True) stale_path.write_text("stale nfs file handle detected at mount time\n") _write_executable(root_path / MOUNT_STUB_RELATIVE, _mount_stub()) _write_executable(root_path / UMOUNT_STUB_RELATIVE, _umount_stub()) compute_bin = root_path / COMPUTE_ROOT / "usr/local/bin" compute_bin.mkdir(parents=True, exist_ok=True) _write_executable(compute_bin / "mount", _mount_stub()) _write_executable(compute_bin / "umount", _umount_stub()) _write_executable(compute_bin / "systemctl", _systemctl_nfs_stub()) _write_executable(root_path / "usr/local/bin/systemctl", _systemctl_nfs_stub()) _write_state(root_path / SHARED_STATE_PATH, INITIAL_STATE) def inject_fault(root: str | Path) -> None: prepare_filesystem(root) def observe_command(root: str | Path, command: str, _result) -> None: _ = Path(root) _ = command def synchronize(root: str | Path) -> None: root_path = Path(root) if not (root_path / SHARED_STATE_PATH).exists(): _write_state(root_path / SHARED_STATE_PATH, INITIAL_STATE) def grade(root: str | Path) -> TaskScenarioState: root_path = Path(root) state_doc = _read_state(root_path / SHARED_STATE_PATH) stale_gone = not (root_path / MOUNT_STALE_RELATIVE).exists() mount_valid = (root_path / MOUNT_VALID_RELATIVE).exists() slurmd_service = state_doc.get("services", {}).get("slurmd@compute-01", "") slurmd_active = slurmd_service == "active" node_state = state_doc.get("nodes", {}).get("compute-01", {}).get("state", "") node_idle = node_state == "idle" health = 0.0 if stale_gone: health += 0.2 if mount_valid: health += 0.3 if slurmd_active: health += 0.2 if stale_gone and mount_valid and slurmd_active and node_idle: health = COMPLETION_HEALTH done = stale_gone and mount_valid and slurmd_active and node_idle return TaskScenarioState( health=health, done=done, details={ "stale_marker_removed": stale_gone, "mount_valid_sentinel_present": mount_valid, "slurmd_service_active": slurmd_active, "compute_node_idle": node_idle, "expected_valid_sentinel": str(MOUNT_VALID_RELATIVE), }, ) def command_reveals_fact(command: str, trigger: DiagnosticTrigger) -> bool: return any(re.search(pattern, command, flags=re.IGNORECASE) for pattern in trigger.command_patterns) def _write_executable(path: Path, content: str) -> None: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(content) path.chmod(0o755) def _write_state(path: Path, doc: dict) -> None: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps(doc, indent=2, sort_keys=True) + "\n") def _read_state(path: Path) -> dict: if not path.exists(): return {} try: return json.loads(path.read_text() or "{}") except json.JSONDecodeError: return {} def _mount_stub() -> str: return """#!/bin/sh TARGET="" for arg in "$@"; do case "$arg" in -*|-t|nfs|-o) ;; *) TARGET="$arg" ;; esac done if [ -z "$TARGET" ]; then if [ -f /mnt/shared/.mount_stale ]; then echo "shared.hpc.local:/srv/shared on /mnt/shared type nfs4 (rw,stale)" elif [ -f /mnt/shared/.mount_valid ]; then echo "shared.hpc.local:/srv/shared on /mnt/shared type nfs4 (rw,fresh)" else echo "shared.hpc.local:/srv/shared on /mnt/shared type nfs4 (rw,idle)" fi exit 0 fi case "$TARGET" in /mnt/shared|shared) if [ -f /mnt/shared/.mount_stale ]; then echo "mount: /mnt/shared already bound with stale handle. umount first." >&2 exit 32 fi if [ -f /mnt/shared/.mount_valid ]; then echo "mount: /mnt/shared already mounted" >&2 exit 32 fi mkdir -p /mnt/shared printf 'fresh mount handle\\n' > /mnt/shared/.mount_valid echo "mounting shared.hpc.local:/srv/shared on /mnt/shared type nfs4" exit 0 ;; *) echo "mount: $TARGET is not a known mount target in this sandbox" >&2 exit 32 ;; esac """ def _umount_stub() -> str: return """#!/bin/sh LAZY=0 TARGET="" for arg in "$@"; do case "$arg" in -l|--lazy) LAZY=1 ;; -f|--force) ;; -*) ;; *) TARGET="$arg" ;; esac done if [ -z "$TARGET" ]; then echo "umount: missing target" >&2 exit 1 fi case "$TARGET" in /mnt/shared|shared) if [ -f /mnt/shared/.mount_stale ]; then rm -f /mnt/shared/.mount_stale echo "umount: /mnt/shared detached (lazy=$LAZY)" exit 0 fi if [ -f /mnt/shared/.mount_valid ]; then rm -f /mnt/shared/.mount_valid echo "umount: /mnt/shared detached clean (lazy=$LAZY)" exit 0 fi echo "umount: /mnt/shared not mounted" >&2 exit 32 ;; *) echo "umount: $TARGET is not a known mount in this sandbox" >&2 exit 32 ;; esac """ def _systemctl_nfs_stub() -> str: return """#!/usr/bin/env python3 import fcntl import json import os import socket import sys STATE_PATH = "/mnt/shared/slurm_state.json" MOUNT_VALID = "/mnt/shared/.mount_valid" MOUNT_STALE = "/mnt/shared/.mount_stale" def current_hostname(): host = os.environ.get("HOSTNAME") if host: return host.strip() try: return socket.gethostname() except OSError: return "" def mount_is_fresh(): return os.path.isfile(MOUNT_VALID) and not os.path.isfile(MOUNT_STALE) def mutate_state(mutator): with open(STATE_PATH, "r+", encoding="utf-8") as fh: fcntl.flock(fh.fileno(), fcntl.LOCK_EX) try: raw = fh.read() doc = json.loads(raw or "{}") mutator(doc) fh.seek(0) fh.truncate() fh.write(json.dumps(doc, indent=2, sort_keys=True) + "\\n") fh.flush() os.fsync(fh.fileno()) finally: fcntl.flock(fh.fileno(), fcntl.LOCK_UN) def read_state(): with open(STATE_PATH, "r", encoding="utf-8") as fh: fcntl.flock(fh.fileno(), fcntl.LOCK_SH) try: raw = fh.read() finally: fcntl.flock(fh.fileno(), fcntl.LOCK_UN) return json.loads(raw or "{}") def unit_key(unit, host): base = unit.split(".")[0] if "@" in base: return base return f"{base}@{host}" if host else base def handle_status(unit, host): try: doc = read_state() except FileNotFoundError: sys.stderr.write("systemctl: slurm state file is missing\\n") return 3 key = unit_key(unit, host) status = doc.get("services", {}).get(key, "inactive") if status == "active": print(f"{unit} - loaded active (running)") return 0 if status == "failed": print(f"{unit} - loaded failed (Result: exit-code)") return 3 print(f"{unit} - loaded inactive (dead)") return 3 def handle_is_failed(unit, host): try: doc = read_state() except FileNotFoundError: print("unknown") return 1 key = unit_key(unit, host) status = doc.get("services", {}).get(key, "inactive") print(status) return 0 if status == "failed" else 1 def handle_restart(unit, host): base = unit.split(".")[0].split("@")[0] if base != "slurmd": def noop(doc): services = doc.setdefault("services", {}) services[unit_key(unit, host)] = "active" mutate_state(noop) print(f"{unit} restarted") return 0 if host != "compute-01": def remote_restart(doc): services = doc.setdefault("services", {}) services[f"slurmd@{host or 'unknown'}"] = "active" mutate_state(remote_restart) print(f"{unit} restarted on {host or 'unknown'}") return 0 fresh = mount_is_fresh() def apply(doc): services = doc.setdefault("services", {}) nodes = doc.setdefault("nodes", {}) compute = nodes.setdefault("compute-01", {}) if fresh: services["slurmd@compute-01"] = "active" compute["state"] = "idle" compute["reason"] = "" else: services["slurmd@compute-01"] = "failed" compute["state"] = "drain" compute["reason"] = "nfs /mnt/shared stale file handle" mutate_state(apply) if fresh: print("slurmd restarted on compute-01 node returned to idle") return 0 sys.stderr.write("slurmd failed to come up /mnt/shared is stale run umount and mount first\\n") return 1 def main(argv): if len(argv) < 2: sys.stderr.write("systemctl: missing command\\n") return 1 action = argv[1] rest = argv[2:] if action in {"daemon-reload", "list-units"}: print("ok") return 0 if not rest: sys.stderr.write(f"systemctl: {action} requires a unit\\n") return 1 unit = rest[0] host = current_hostname() if action == "status": return handle_status(unit, host) if action == "is-failed": return handle_is_failed(unit, host) if action in {"restart", "start"}: return handle_restart(unit, host) if action == "stop": def stop(doc): services = doc.setdefault("services", {}) services[unit_key(unit, host)] = "inactive" mutate_state(stop) print(f"{unit} stopped") return 0 sys.stderr.write(f"systemctl: unsupported action {action}\\n") return 1 if __name__ == "__main__": sys.exit(main(sys.argv)) """