HPCOpenenv / sysadmin_env /overlayfs.py
huggingmenfordays's picture
deploy: ccyloopss/HPCOpenenv — with OPENENV_API_KEY auth guard
bc35a94
import shutil
import subprocess
import tempfile
import time
import uuid
from pathlib import Path
DEFAULT_VOLATILE_ROOT = "/dev/shm"
class OverlayFSManager:
"""manages overlayfs stacks for sub second filesystem state resets"""
def __init__(
self,
base_dir: str | None = None,
*,
volatile_root: str | None = None,
):
"""
base dir is the parent directory where the merged mount point is created
volatile root is the ram backed filesystem where upperdir and workdir live
defaults to /dev/shm so resets never hit persistent disk io
"""
if base_dir is not None:
self._base_dir = Path(base_dir)
self._base_dir.mkdir(parents=True, exist_ok=True)
self._owns_base_dir = False
else:
self._base_dir = Path(tempfile.mkdtemp(prefix="overlayfs_"))
self._owns_base_dir = True
volatile_candidate = Path(volatile_root) if volatile_root is not None else Path(DEFAULT_VOLATILE_ROOT)
self._volatile_base = self._select_volatile_base(volatile_candidate)
self._volatile_dir = self._volatile_base / f"overlay_{uuid.uuid4().hex}"
self._volatile_dir.mkdir(parents=True, exist_ok=True)
print(f"overlay volatile root {self._volatile_dir}")
self._lowerdir: Path | None = None
self._upperdir: Path | None = None
self._workdir: Path | None = None
self._merged: Path | None = None
self._mounted = False
self._mount_type: str | None = None
@property
def lowerdir(self) -> Path | None:
return self._lowerdir
@property
def upperdir(self) -> Path | None:
return self._upperdir
@property
def workdir(self) -> Path | None:
return self._workdir
@property
def merged(self) -> Path | None:
return self._merged
@property
def is_mounted(self) -> bool:
return self._mounted
@property
def mount_type(self) -> str | None:
return self._mount_type
@property
def volatile_dir(self) -> Path:
return self._volatile_dir
def create_stack(self, lowerdir: str | Path) -> Path:
"""
creates the overlay directory stack given a lowerdir path
upperdir and workdir are pinned to the volatile ram disk
returns the path to the merged directory
"""
lowerdir = Path(lowerdir).resolve()
if not lowerdir.is_dir():
raise FileNotFoundError(f"lowerdir does not exist {lowerdir}")
self._lowerdir = lowerdir
self._upperdir = self._volatile_dir / "upper"
self._workdir = self._volatile_dir / "work"
self._merged = self._base_dir / "merged"
self._upperdir.mkdir(exist_ok=True)
self._workdir.mkdir(exist_ok=True)
self._merged.mkdir(exist_ok=True)
print(f"overlay stack created upper {self._upperdir} work {self._workdir} merged {self._merged}")
return self._merged
def mount(self) -> None:
"""
mounts the overlay filesystem trying kernel overlayfs first
then falling back to fuse overlayfs for unprivileged contexts
"""
if self._mounted:
raise RuntimeError("overlay already mounted")
if self._merged is None:
raise RuntimeError("create stack must be called before mount")
try:
print("overlay kernel mount start")
self._mount_kernel()
self._mount_type = "kernel"
print("overlay mounted via kernel overlayfs")
except (PermissionError, OSError, subprocess.CalledProcessError) as exc:
print(f"overlay kernel mount failed {type(exc).__name__.lower()}")
try:
print("overlay fuse mount start")
self._mount_fuse()
self._mount_type = "fuse"
print("overlay mounted via fuse overlayfs")
except (FileNotFoundError, OSError, subprocess.CalledProcessError) as fuse_exc:
print(f"overlay fuse mount failed {type(fuse_exc).__name__.lower()}")
self._mount_copy()
self._mount_type = "copy"
print("overlay mounted via copy fallback")
self._mounted = True
def _mount_copy(self) -> None:
if self._lowerdir is None or self._merged is None:
raise RuntimeError("copy fallback requires lowerdir and merged path")
self._clear_directory(self._merged)
shutil.copytree(self._lowerdir, self._merged, dirs_exist_ok=True, symlinks=True)
def _mount_kernel(self) -> None:
mount_opts = (
f"lowerdir={self._lowerdir},"
f"upperdir={self._upperdir},"
f"workdir={self._workdir}"
)
result = subprocess.run(
["mount", "-t", "overlay", "overlay", "-o", mount_opts, str(self._merged)],
capture_output=True,
text=True,
timeout=10,
)
if result.returncode != 0:
raise PermissionError(f"kernel mount failed {result.stderr.strip()}")
def _mount_fuse(self) -> None:
fuse_bin = shutil.which("fuse-overlayfs")
if fuse_bin is None:
raise FileNotFoundError("fuse-overlayfs binary not found in path")
print(f"overlay fuse binary {fuse_bin}")
mount_opts = (
f"lowerdir={self._lowerdir},"
f"upperdir={self._upperdir},"
f"workdir={self._workdir}"
)
result = subprocess.run(
[fuse_bin, "-o", mount_opts, str(self._merged)],
capture_output=True,
text=True,
timeout=10,
)
if result.returncode != 0:
raise OSError(f"fuse overlayfs mount failed {result.stderr.strip()}")
def reset(self) -> float:
"""
resets the overlay by clearing upperdir contents and recreating workdir
upperdir/workdir live on tmpfs so this stays sub 10ms on warm kernels
returns the reset latency in milliseconds
"""
if not self._mounted:
raise RuntimeError("overlay is not mounted")
start = time.perf_counter()
mount_type = self._mount_type
if mount_type == "copy":
if self._merged is None:
raise RuntimeError("copy fallback merged path missing")
self._mount_copy()
self._mount_type = "copy"
else:
self.unmount()
self._purge_volatile_pair()
if self._merged is not None:
self._merged.mkdir(exist_ok=True)
if mount_type == "kernel":
self._mount_kernel()
self._mount_type = "kernel"
else:
self._mount_fuse()
self._mount_type = "fuse"
self._mounted = True
elapsed_ms = (time.perf_counter() - start) * 1000.0
print(f"overlay reset {elapsed_ms:.1f}ms")
return elapsed_ms
def unmount(self) -> None:
"""unmounts the overlay filesystem"""
if not self._mounted:
return
if self._mount_type == "copy":
self._mounted = False
self._mount_type = None
print("overlay unmounted")
return
if self._mount_type == "fuse":
result = subprocess.run(
["fusermount", "-u", str(self._merged)],
capture_output=True,
text=True,
timeout=10,
)
if result.returncode != 0:
subprocess.run(
["fusermount3", "-u", str(self._merged)],
capture_output=True,
text=True,
timeout=10,
)
else:
subprocess.run(
["umount", str(self._merged)],
capture_output=True,
text=True,
timeout=10,
)
self._mounted = False
self._mount_type = None
print("overlay unmounted")
def _purge_volatile_pair(self) -> None:
"""wipes upperdir and workdir trees from the volatile ram disk"""
for target in (self._upperdir, self._workdir):
if target is None:
continue
if target.exists():
shutil.rmtree(target, ignore_errors=True)
target.mkdir(parents=True, exist_ok=True)
def _clear_directory(self, directory: Path) -> None:
directory.mkdir(parents=True, exist_ok=True)
for entry in directory.iterdir():
if entry.is_dir() and not entry.is_symlink():
shutil.rmtree(entry)
else:
entry.unlink()
def _select_volatile_base(self, preferred: Path) -> Path:
"""picks a ram backed root or falls back to the system temp dir"""
candidates: list[Path] = [preferred]
if preferred != Path(DEFAULT_VOLATILE_ROOT):
candidates.append(Path(DEFAULT_VOLATILE_ROOT))
candidates.append(Path(tempfile.gettempdir()))
for candidate in candidates:
try:
candidate.mkdir(parents=True, exist_ok=True)
probe = candidate / f".probe_{uuid.uuid4().hex}"
probe.touch()
probe.unlink()
return candidate
except OSError as exc:
print(f"overlay volatile candidate rejected {candidate} {type(exc).__name__.lower()}")
continue
raise RuntimeError("no writable volatile root available")
def cleanup(self) -> None:
"""unmounts if mounted and recursively deletes all overlay directories"""
self.unmount()
for d in [self._upperdir, self._workdir, self._merged]:
if d is not None and d.exists():
shutil.rmtree(d, ignore_errors=True)
if self._volatile_dir.exists():
shutil.rmtree(self._volatile_dir, ignore_errors=True)
if self._owns_base_dir and self._base_dir.exists():
shutil.rmtree(self._base_dir, ignore_errors=True)
self._lowerdir = None
self._upperdir = None
self._workdir = None
self._merged = None
self._mount_type = None
print("overlay cleanup complete")
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.cleanup()
return False