"""Scenario definition, fault injection and grading for the ``disk_full`` task.

The scenario simulates a small mount under ``mnt/data`` that is filled by a
hidden log file.  Capacity, usage and the agent's discovery progress are kept
in dot-files inside the mount; stub ``df``/``du``/``lsof`` executables
installed under ``usr/local/bin`` report values derived from those files.
"""

from __future__ import annotations

import re
from pathlib import Path

from sysadmin_env.models import DiagnosticTrigger
from sysadmin_env.models import DifficultyTier
from sysadmin_env.models import TaskMetadata
from sysadmin_env.models import TaskScenarioDefinition
from sysadmin_env.models import TaskScenarioState


TASK_ID = "disk_full"
COMPLETION_HEALTH = 0.99

# All paths are relative to the scenario root passed to each function.
MOUNT_PATH = Path("mnt/data")
HIDDEN_LOG_PATH = Path("mnt/data/.cache/.rotated/app.trace")
CAPACITY_PATH = Path("mnt/data/.capacity")
USAGE_PATH = Path("mnt/data/.usage")
DISCOVERY_PATH = Path("mnt/data/.diagnosed")


def build_definition(base_filesystem_path: str) -> TaskScenarioDefinition:
    """Build the scenario definition for the ``disk_full`` task.

    Args:
        base_filesystem_path: Forwarded unchanged to
            ``TaskMetadata.base_filesystem_path``.

    Returns:
        A ``TaskScenarioDefinition`` carrying the task metadata, the
        diagnostic triggers from :func:`diagnostic_triggers`, and network
        isolation disabled.
    """
    metadata = TaskMetadata(
        task_id=TASK_ID,
        difficulty=DifficultyTier.medium,
        description="hidden sparse log file filling loopback mount",
        max_steps=55,
        time_limit=420.0,
        base_filesystem_path=base_filesystem_path,
    )
    return TaskScenarioDefinition(
        metadata=metadata,
        requires_network_isolation=False,
        diagnostic_triggers=diagnostic_triggers(),
    )


def diagnostic_triggers() -> list[DiagnosticTrigger]:
    """Return the diagnostic triggers (fact id, command regexes, reward)."""
    return [
        DiagnosticTrigger(
            fact_id="disk_usage_checked",
            command_patterns=[r"df\b", r"df\s+-h"],
            reward=0.06,
        ),
        DiagnosticTrigger(
            fact_id="large_files_checked",
            command_patterns=[r"du\b", r"du\s+-sh"],
            reward=0.05,
        ),
        DiagnosticTrigger(
            fact_id="hidden_files_checked",
            command_patterns=[r"find\b.*-name", r"find\b.*-type\s+f"],
            reward=0.06,
        ),
        DiagnosticTrigger(
            fact_id="open_files_checked",
            command_patterns=[r"lsof\b", r"lsof\b.*deleted"],
            reward=0.05,
        ),
    ]


def prepare_filesystem(root: str | Path) -> None:
    """Create the faulty filesystem layout under *root*.

    Creates the mount, ``usr/local/bin`` and ``root`` directories, writes a
    capacity of 100, resets the discovery state to ``unknown``, writes a
    100-character hidden log, installs the ``df``/``du``/``lsof`` stubs and
    finally refreshes the usage file via :func:`synchronize`.
    """
    root_path = Path(root)
    for directory in (MOUNT_PATH / ".cache/.rotated", "usr/local/bin", "root"):
        (root_path / directory).mkdir(parents=True, exist_ok=True)

    (root_path / CAPACITY_PATH).write_text("100\n")
    (root_path / DISCOVERY_PATH).write_text("unknown\n")
    (root_path / HIDDEN_LOG_PATH).write_text("x" * 100)

    bin_dir = root_path / "usr/local/bin"
    _write_executable(bin_dir / "df", _df_stub())
    _write_executable(bin_dir / "du", _du_stub())
    _write_executable(bin_dir / "lsof", _lsof_stub())

    synchronize(root_path)


def inject_fault(root: str | Path) -> None:
    """Inject the fault; for this task that is exactly :func:`prepare_filesystem`."""
    prepare_filesystem(root)


def observe_command(root: str | Path, command: str, _result) -> None:
    """Update the recorded discovery state after the agent ran *command*.

    The state stored in ``DISCOVERY_PATH`` becomes ``found`` when the command
    mentions ``find``, ``du``, ``lsof`` or ``ls``, and ``full`` when it
    mentions ``df``.  ``found`` is sticky: a later ``df`` does not downgrade
    it, so progress already credited by :func:`grade` is never lost.  The
    usage file is refreshed afterwards.

    Args:
        root: Scenario root directory.
        command: The command line that was executed (matched
            case-insensitively).
        _result: Unused.
    """
    root_path = Path(root)
    state_file = root_path / DISCOVERY_PATH
    current_state = _usage_file_value(state_file)

    if re.search(r"\b(find|du|lsof|ls)\b", command, flags=re.IGNORECASE):
        current_state = "found"
    elif current_state != "found" and re.search(
        r"\bdf\b", command, flags=re.IGNORECASE
    ):
        # Only promote to "full"; never fall back from "found".
        current_state = "full"

    state_file.write_text(f"{current_state}\n")
    synchronize(root_path)


def synchronize(root: str | Path) -> None:
    """Recompute ``USAGE_PATH`` from the hidden log's size, capped at capacity.

    The size is taken in bytes from file metadata, which matches what the
    ``du`` stub reports (``wc -c``) and avoids decoding the log's content.
    A missing log (or a non-regular file at that path) counts as size 0.

    Raises:
        FileNotFoundError: If the capacity file does not exist.
        ValueError: If the capacity file does not contain an integer.
    """
    root_path = Path(root)
    capacity = int((root_path / CAPACITY_PATH).read_text().strip())

    hidden_log = root_path / HIDDEN_LOG_PATH
    hidden_size = hidden_log.stat().st_size if hidden_log.is_file() else 0

    usage = min(hidden_size, capacity)
    (root_path / USAGE_PATH).write_text(f"{usage}\n")


def grade(root: str | Path) -> TaskScenarioState:
    """Score the current state of the scenario under *root*.

    Health is ``COMPLETION_HEALTH`` as soon as the mount has free capacity
    (which also marks the task done).  Otherwise 0.3 is awarded for a
    recorded diagnosis (state ``full`` or ``found``) and another 0.3 when the
    hidden log was found (state ``found``) or no longer exists.

    Note that this reads the usage file as-is; it does not call
    :func:`synchronize` itself.
    """
    root_path = Path(root)
    discovery_state = _usage_file_value(root_path / DISCOVERY_PATH)

    diagnosis_recorded = discovery_state in {"full", "found"}
    hidden_file_found = (
        discovery_state == "found" or not (root_path / HIDDEN_LOG_PATH).exists()
    )
    capacity_free = _free_capacity(root_path) > 0

    if capacity_free:
        # Freeing space completes the task regardless of partial credit.
        health = COMPLETION_HEALTH
    else:
        health = 0.0
        if diagnosis_recorded:
            health += 0.3
        if hidden_file_found:
            health += 0.3

    return TaskScenarioState(
        health=health,
        done=capacity_free,
        details={
            "filesystem_identified": diagnosis_recorded,
            "hidden_file_found": hidden_file_found,
            "filesystem_has_capacity": capacity_free,
        },
    )


def command_reveals_fact(command: str, trigger: DiagnosticTrigger) -> bool:
    """Return True if *command* matches any of *trigger*'s patterns.

    Patterns are applied with ``re.search`` and ``re.IGNORECASE``.
    """
    return any(
        re.search(pattern, command, flags=re.IGNORECASE)
        for pattern in trigger.command_patterns
    )


def _usage_file_value(path: Path) -> str:
    """Return the stripped text of *path*, or ``""`` if it does not exist."""
    if not path.exists():
        return ""
    return path.read_text().strip()


def _free_capacity(root_path: Path) -> int:
    """Return capacity minus usage as recorded in the state files.

    Raises:
        FileNotFoundError: If either state file is missing.
        ValueError: If either file does not contain an integer.
    """
    capacity = int((root_path / CAPACITY_PATH).read_text().strip())
    usage = int((root_path / USAGE_PATH).read_text().strip())
    return capacity - usage


def _write_executable(path: Path, content: str) -> None:
    """Write *content* to *path* and set its mode to ``0o755``."""
    path.write_text(content)
    path.chmod(0o755)


def _df_stub() -> str:
    """Return the shell source of the fake ``df`` command.

    The script reads the capacity and usage state files (defaulting to 100
    and 0) and prints a single ``loop0`` row for ``/mnt/data``.
    """
    # Non-raw string on purpose: "\n" becomes a literal newline inside the
    # single-quoted printf format, which the shell prints unchanged.
    return """#!/bin/sh
capacity="$(cat /mnt/data/.capacity 2>/dev/null || printf '%s' 100)"
usage="$(cat /mnt/data/.usage 2>/dev/null || printf '%s' 0)"
avail=$((capacity - usage))
if [ "$avail" -lt 0 ]; then
  avail=0
fi
usep=0
if [ "$capacity" -gt 0 ]; then
  usep=$((usage * 100 / capacity))
fi
printf '%s\n' "filesystem size used avail use% mounted on"
printf 'loop0 %sm %sm %sm %s%% /mnt/data\n' "$capacity" "$usage" "$avail" "$usep"
"""


def _du_stub() -> str:
    """Return the shell source of the fake ``du`` command.

    The script prints the hidden log's byte size (0 if it is not a regular
    file) for the log itself and for each of its two parent directories.
    """
    return """#!/bin/sh
size=0
if [ -f /mnt/data/.cache/.rotated/app.trace ]; then
  size=$(wc -c < /mnt/data/.cache/.rotated/app.trace)
fi
printf '%s\t%s\n' "$size" "/mnt/data/.cache/.rotated/app.trace"
printf '%s\t%s\n' "$size" "/mnt/data/.cache/.rotated"
printf '%s\t%s\n' "$size" "/mnt/data"
"""


def _lsof_stub() -> str:
    """Return the shell source of the fake ``lsof`` command.

    The script prints one fixed open-file row for the hidden log while that
    file exists, and always exits with status 0.
    """
    return """#!/bin/sh
if [ -f /mnt/data/.cache/.rotated/app.trace ]; then
  printf '%s\n' "python 321 root 3r REG 0 0 0 /mnt/data/.cache/.rotated/app.trace"
fi
exit 0
"""