# HPCOpenenv / sysadmin_env / tasks / hpc_munge.py
# huggingmenfordays's picture
# deploy: ccyloopss/HPCOpenenv — with OPENENV_API_KEY auth guard
# bc35a94
from __future__ import annotations
import json
import re
import stat
from pathlib import Path
from sysadmin_env.models import DiagnosticTrigger
from sysadmin_env.models import DifficultyTier
from sysadmin_env.models import TaskMetadata
from sysadmin_env.models import TaskScenarioDefinition
from sysadmin_env.models import TaskScenarioState
from sysadmin_env.tasks import hpc_outage
# Identifier this module passes as ``task_id`` when building its TaskMetadata.
TASK_ID = "hpc_munge"
# Highest health value grade() reports.
COMPLETION_HEALTH = 1.0
# The path layout is taken from the hpc_outage task rather than redefined here.
SHARED_STATE_PATH = hpc_outage.SHARED_STATE_PATH
NODES_ROOT = hpc_outage.NODES_ROOT
COMPUTE_ROOT = hpc_outage.COMPUTE_ROOT
# Munge key location: defined relative to COMPUTE_ROOT, then joined onto it.
MUNGE_KEY_RELATIVE = Path("etc/munge/munge.key")
MUNGE_KEY_PATH = COMPUTE_ROOT / MUNGE_KEY_RELATIVE
# Permission bits grade() requires on the key; prepare_filesystem() leaves the
# key at 0o644 instead.
EXPECTED_KEY_MODE = 0o400
# Exact key contents that prepare_filesystem() writes and grade() compares
# against byte-for-byte.
EXPECTED_KEY_BYTES = b"MUNGE_KEY_" + b"A" * 54 + b"\n"
# Cluster state document written to SHARED_STATE_PATH by prepare_filesystem()
# and, when the file is missing, by synchronize(). It starts with compute-01
# drained, its munge and slurmd services failed, and one job pending.
# grade() reads nodes["compute-01"]["state"] and the two "...@compute-01"
# service entries from this document.
INITIAL_STATE: dict = {
    "cluster": "rocky-hpc",
    "cores_total": hpc_outage.CLUSTER_CORES_TOTAL,
    "cores_per_node": hpc_outage.CLUSTER_CORES_PER_NODE,
    "partitions": {
        "compute": {"nodes": ["compute-01"], "default": True},
    },
    "nodes": {
        "login": {
            "state": "up",
            "reason": "",
            "cores": hpc_outage.CLUSTER_CORES_PER_NODE,
        },
        "compute-01": {
            "state": "drain",
            "reason": "munge authentication failed",
            "cores": hpc_outage.CLUSTER_CORES_PER_NODE,
        },
    },
    "services": {
        "slurmd@login": "active",
        "slurmd@compute-01": "failed",
        "slurmctld@login": "active",
        "munge@compute-01": "failed",
        "munge@login": "active",
    },
    "jobs": [
        {
            "id": 8421,
            "name": "cfd_simulation",
            "user": "engineer",
            "state": "PD",
            "partition": "compute",
            "nodes": "(AuthFail)",
            "time": "0:00",
        },
    ],
}
def build_definition(base_filesystem_path: str) -> TaskScenarioDefinition:
    """Assemble the scenario definition for the ``hpc_munge`` task.

    Args:
        base_filesystem_path: Forwarded unchanged into the task metadata.

    Returns:
        A ``TaskScenarioDefinition`` carrying this task's metadata, sandbox
        flags, and the triggers produced by ``diagnostic_triggers()``.
    """
    return TaskScenarioDefinition(
        metadata=TaskMetadata(
            task_id=TASK_ID,
            difficulty=DifficultyTier.hard,
            description="slurm compute node draining due to munge key permission fault and broken route",
            max_steps=90,
            time_limit=600.0,
            base_filesystem_path=base_filesystem_path,
        ),
        requires_network_isolation=False,
        allows_nested_sandbox=True,
        diagnostic_triggers=diagnostic_triggers(),
    )
def diagnostic_triggers() -> list[DiagnosticTrigger]:
    """Return the diagnostic triggers for this task, in a fixed order.

    Each trigger pairs a fact id with the regex patterns that reveal it and
    the reward granted for it. A fresh list (with fresh pattern lists) is
    built on every call.
    """
    # (fact_id, command_patterns, reward)
    catalogue = (
        ("cluster_queue_inspected", [r"\bsinfo\b", r"\bsqueue\b"], 0.06),
        ("compute_node_entered", [r"\bssh\s+compute-01\b"], 0.07),
        (
            "munge_key_inspected",
            [r"ls\s+-l\s+.+munge", r"stat\s+.+munge\.key", r"cat\s+.+munge\.key"],
            0.05,
        ),
        (
            "munge_service_checked",
            [r"systemctl\s+status\s+munge", r"systemctl\s+is-failed\s+munge"],
            0.05,
        ),
        (
            "ood_portal_probed",
            [r"curl\s+.+localhost:8080", r"curl\s+.+127\.0\.0\.1:8080"],
            0.05,
        ),
    )
    return [
        DiagnosticTrigger(fact_id=fact_id, command_patterns=patterns, reward=reward)
        for fact_id, patterns, reward in catalogue
    ]
def prepare_filesystem(root: str | Path) -> None:
    """Build the faulted cluster filesystem for this task under *root*.

    Delegates the base layout to ``hpc_outage.prepare_filesystem``, overwrites
    the shared state document with ``INITIAL_STATE``, and writes the munge key
    with the expected contents but mode ``0o644`` (``grade`` requires
    ``EXPECTED_KEY_MODE``).

    The function can be called again on a root where the key was already
    locked down: any existing key file is removed before it is rewritten.

    Args:
        root: Directory the sandbox filesystem is rooted at.
    """
    root_path = Path(root)
    hpc_outage.prepare_filesystem(root_path)
    _write_state(root_path / SHARED_STATE_PATH, INITIAL_STATE)

    key_path = root_path / MUNGE_KEY_PATH
    key_path.parent.mkdir(parents=True, exist_ok=True)
    # A solved episode leaves the key at 0o400, which a non-root owner cannot
    # reopen for writing. Removing the old file first only needs write access
    # to the directory, so re-preparation cannot fail on the key's own mode.
    key_path.unlink(missing_ok=True)
    key_path.write_bytes(EXPECTED_KEY_BYTES)
    key_path.chmod(0o644)
def inject_fault(root: str | Path) -> None:
    """Inject this task's fault under *root*.

    This is a pure delegate: all the work, including leaving the munge key at
    a mode other than ``EXPECTED_KEY_MODE``, is done by ``prepare_filesystem``.
    """
    prepare_filesystem(root)
def observe_command(root: str | Path, command: str, _result) -> None:
    """Accept a command observation and record nothing.

    The arguments are taken only to satisfy the call signature; this task
    keeps no per-command state.
    """
    # Path(root) is still evaluated so a root of an unsupported type fails
    # exactly as before; the resulting tuple is deliberately discarded.
    _unused = (Path(root), command)
def synchronize(root: str | Path) -> None:
    """Recreate the shared state document under *root* if it has gone missing.

    An existing document is left untouched; only an absent one is rewritten
    from ``INITIAL_STATE``.
    """
    state_file = Path(root) / SHARED_STATE_PATH
    if state_file.exists():
        return
    _write_state(state_file, INITIAL_STATE)
def grade(root: str | Path) -> TaskScenarioState:
    """Score the sandbox under *root* and report whether the task is complete.

    Three conditions are evaluated:

    * the munge key has mode ``EXPECTED_KEY_MODE`` and still holds
      ``EXPECTED_KEY_BYTES``;
    * the shared state lists ``munge@compute-01`` as ``"active"``;
    * the shared state lists ``compute-01`` as ``"idle"`` with
      ``slurmd@compute-01`` ``"active"``.

    The first two are worth 0.3 health each. ``COMPLETION_HEALTH`` is reported
    only when all three hold, so ``health`` never reads as complete while
    ``done`` is False (for example when only the state document was edited and
    the key is still world-readable).

    Args:
        root: Directory the sandbox filesystem is rooted at.

    Returns:
        A ``TaskScenarioState`` with the health score, the completion flag,
        and a ``details`` dict exposing each individual check.
    """
    root_path = Path(root)
    key_path = root_path / MUNGE_KEY_PATH
    key_locked_down = _key_mode_matches(key_path)
    key_contents_intact = _key_contents_match(key_path)
    munge_key_fixed = key_locked_down and key_contents_intact

    # The state document lives inside the sandbox and may have been rewritten
    # into any JSON shape, so every level is type-checked during lookup.
    state_doc = _read_state(root_path / SHARED_STATE_PATH)
    node_state = _state_lookup(state_doc, "nodes", "compute-01", "state")
    munge_service = _state_lookup(state_doc, "services", "munge@compute-01")
    slurmd_service = _state_lookup(state_doc, "services", "slurmd@compute-01")

    auth_restored = munge_service == "active"
    node_idle = node_state == "idle" and slurmd_service == "active"
    done = munge_key_fixed and auth_restored and node_idle

    health = 0.0
    if munge_key_fixed:
        health += 0.3
    if auth_restored:
        health += 0.3
    if done:
        # Full health is tied to completion, not to the node state alone.
        health = COMPLETION_HEALTH

    return TaskScenarioState(
        health=health,
        done=done,
        details={
            "munge_key_mode_correct": key_locked_down,
            "munge_key_contents_correct": key_contents_intact,
            "munge_service_active": auth_restored,
            "compute_node_idle": node_idle,
            "expected_mode_octal": oct(EXPECTED_KEY_MODE),
        },
    )


def _state_lookup(doc: object, *keys: str) -> object:
    """Walk *keys* through nested dicts; return ``""`` if a level is missing or not a dict."""
    current = doc
    for key in keys:
        if not isinstance(current, dict):
            return ""
        current = current.get(key, "")
    return current
def command_reveals_fact(command: str, trigger: DiagnosticTrigger) -> bool:
    """Tell whether *command* matches any of *trigger*'s command patterns.

    Patterns are applied with ``re.search`` and matched case-insensitively.
    A trigger with no patterns never matches.
    """
    for pattern in trigger.command_patterns:
        if re.search(pattern, command, flags=re.IGNORECASE) is not None:
            return True
    return False
def _key_mode_matches(path: Path) -> bool:
    """Report whether *path* exists with permission bits equal to ``EXPECTED_KEY_MODE``.

    Only the permission portion of ``st_mode`` is compared; a missing path
    yields False.
    """
    if path.exists():
        permission_bits = stat.S_IMODE(path.stat().st_mode)
        return permission_bits == EXPECTED_KEY_MODE
    return False
def _key_contents_match(path: Path) -> bool:
    """Return True when *path* holds exactly ``EXPECTED_KEY_BYTES``.

    A key that cannot be read (missing, a directory, permission denied, or a
    malformed path) counts as a mismatch instead of raising, so a sandbox
    left in an odd state is scored rather than crashing the caller.
    """
    try:
        actual = path.read_bytes()
    except (OSError, ValueError):
        # OSError: missing or unreadable path.
        # ValueError: path the OS layer rejects, e.g. an embedded NUL byte.
        return False
    return actual == EXPECTED_KEY_BYTES
def _write_state(path: Path, doc: dict) -> None:
    """Serialize *doc* to *path* as indented, key-sorted JSON with a trailing newline.

    Missing parent directories are created first.
    """
    parent_dir = path.parent
    parent_dir.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(doc, indent=2, sort_keys=True)
    path.write_text(f"{serialized}\n")
def _read_state(path: Path) -> dict:
    """Load the JSON state document at *path*, returning ``{}`` when it is unusable.

    "Unusable" covers a missing or unreadable file, bytes that are not valid
    UTF-8, an empty file, malformed JSON, and well-formed JSON whose top-level
    value is not an object. Callers can therefore always use ``.get`` on the
    result.
    """
    try:
        raw = path.read_text(encoding="utf-8")
    except (OSError, ValueError):
        # OSError: missing or unreadable path.
        # ValueError: undecodable bytes (UnicodeDecodeError) or a malformed path.
        return {}
    try:
        doc = json.loads(raw or "{}")
    except json.JSONDecodeError:
        return {}
    # json.loads may yield a list, string, number, or None for valid JSON.
    return doc if isinstance(doc, dict) else {}