Spaces:
Paused
Paused
| import shutil | |
| import subprocess | |
| import tempfile | |
| import time | |
| import uuid | |
| from pathlib import Path | |
| DEFAULT_VOLATILE_ROOT = "/dev/shm" | |
| class OverlayFSManager: | |
| """manages overlayfs stacks for sub second filesystem state resets""" | |
| def __init__( | |
| self, | |
| base_dir: str | None = None, | |
| *, | |
| volatile_root: str | None = None, | |
| ): | |
| """ | |
| base dir is the parent directory where the merged mount point is created | |
| volatile root is the ram backed filesystem where upperdir and workdir live | |
| defaults to /dev/shm so resets never hit persistent disk io | |
| """ | |
| if base_dir is not None: | |
| self._base_dir = Path(base_dir) | |
| self._base_dir.mkdir(parents=True, exist_ok=True) | |
| self._owns_base_dir = False | |
| else: | |
| self._base_dir = Path(tempfile.mkdtemp(prefix="overlayfs_")) | |
| self._owns_base_dir = True | |
| volatile_candidate = Path(volatile_root) if volatile_root is not None else Path(DEFAULT_VOLATILE_ROOT) | |
| self._volatile_base = self._select_volatile_base(volatile_candidate) | |
| self._volatile_dir = self._volatile_base / f"overlay_{uuid.uuid4().hex}" | |
| self._volatile_dir.mkdir(parents=True, exist_ok=True) | |
| print(f"overlay volatile root {self._volatile_dir}") | |
| self._lowerdir: Path | None = None | |
| self._upperdir: Path | None = None | |
| self._workdir: Path | None = None | |
| self._merged: Path | None = None | |
| self._mounted = False | |
| self._mount_type: str | None = None | |
| def lowerdir(self) -> Path | None: | |
| return self._lowerdir | |
| def upperdir(self) -> Path | None: | |
| return self._upperdir | |
| def workdir(self) -> Path | None: | |
| return self._workdir | |
| def merged(self) -> Path | None: | |
| return self._merged | |
| def is_mounted(self) -> bool: | |
| return self._mounted | |
| def mount_type(self) -> str | None: | |
| return self._mount_type | |
| def volatile_dir(self) -> Path: | |
| return self._volatile_dir | |
| def create_stack(self, lowerdir: str | Path) -> Path: | |
| """ | |
| creates the overlay directory stack given a lowerdir path | |
| upperdir and workdir are pinned to the volatile ram disk | |
| returns the path to the merged directory | |
| """ | |
| lowerdir = Path(lowerdir).resolve() | |
| if not lowerdir.is_dir(): | |
| raise FileNotFoundError(f"lowerdir does not exist {lowerdir}") | |
| self._lowerdir = lowerdir | |
| self._upperdir = self._volatile_dir / "upper" | |
| self._workdir = self._volatile_dir / "work" | |
| self._merged = self._base_dir / "merged" | |
| self._upperdir.mkdir(exist_ok=True) | |
| self._workdir.mkdir(exist_ok=True) | |
| self._merged.mkdir(exist_ok=True) | |
| print(f"overlay stack created upper {self._upperdir} work {self._workdir} merged {self._merged}") | |
| return self._merged | |
| def mount(self) -> None: | |
| """ | |
| mounts the overlay filesystem trying kernel overlayfs first | |
| then falling back to fuse overlayfs for unprivileged contexts | |
| """ | |
| if self._mounted: | |
| raise RuntimeError("overlay already mounted") | |
| if self._merged is None: | |
| raise RuntimeError("create stack must be called before mount") | |
| try: | |
| print("overlay kernel mount start") | |
| self._mount_kernel() | |
| self._mount_type = "kernel" | |
| print("overlay mounted via kernel overlayfs") | |
| except (PermissionError, OSError, subprocess.CalledProcessError) as exc: | |
| print(f"overlay kernel mount failed {type(exc).__name__.lower()}") | |
| try: | |
| print("overlay fuse mount start") | |
| self._mount_fuse() | |
| self._mount_type = "fuse" | |
| print("overlay mounted via fuse overlayfs") | |
| except (FileNotFoundError, OSError, subprocess.CalledProcessError) as fuse_exc: | |
| print(f"overlay fuse mount failed {type(fuse_exc).__name__.lower()}") | |
| self._mount_copy() | |
| self._mount_type = "copy" | |
| print("overlay mounted via copy fallback") | |
| self._mounted = True | |
| def _mount_copy(self) -> None: | |
| if self._lowerdir is None or self._merged is None: | |
| raise RuntimeError("copy fallback requires lowerdir and merged path") | |
| self._clear_directory(self._merged) | |
| shutil.copytree(self._lowerdir, self._merged, dirs_exist_ok=True, symlinks=True) | |
| def _mount_kernel(self) -> None: | |
| mount_opts = ( | |
| f"lowerdir={self._lowerdir}," | |
| f"upperdir={self._upperdir}," | |
| f"workdir={self._workdir}" | |
| ) | |
| result = subprocess.run( | |
| ["mount", "-t", "overlay", "overlay", "-o", mount_opts, str(self._merged)], | |
| capture_output=True, | |
| text=True, | |
| timeout=10, | |
| ) | |
| if result.returncode != 0: | |
| raise PermissionError(f"kernel mount failed {result.stderr.strip()}") | |
| def _mount_fuse(self) -> None: | |
| fuse_bin = shutil.which("fuse-overlayfs") | |
| if fuse_bin is None: | |
| raise FileNotFoundError("fuse-overlayfs binary not found in path") | |
| print(f"overlay fuse binary {fuse_bin}") | |
| mount_opts = ( | |
| f"lowerdir={self._lowerdir}," | |
| f"upperdir={self._upperdir}," | |
| f"workdir={self._workdir}" | |
| ) | |
| result = subprocess.run( | |
| [fuse_bin, "-o", mount_opts, str(self._merged)], | |
| capture_output=True, | |
| text=True, | |
| timeout=10, | |
| ) | |
| if result.returncode != 0: | |
| raise OSError(f"fuse overlayfs mount failed {result.stderr.strip()}") | |
| def reset(self) -> float: | |
| """ | |
| resets the overlay by clearing upperdir contents and recreating workdir | |
| upperdir/workdir live on tmpfs so this stays sub 10ms on warm kernels | |
| returns the reset latency in milliseconds | |
| """ | |
| if not self._mounted: | |
| raise RuntimeError("overlay is not mounted") | |
| start = time.perf_counter() | |
| mount_type = self._mount_type | |
| if mount_type == "copy": | |
| if self._merged is None: | |
| raise RuntimeError("copy fallback merged path missing") | |
| self._mount_copy() | |
| self._mount_type = "copy" | |
| else: | |
| self.unmount() | |
| self._purge_volatile_pair() | |
| if self._merged is not None: | |
| self._merged.mkdir(exist_ok=True) | |
| if mount_type == "kernel": | |
| self._mount_kernel() | |
| self._mount_type = "kernel" | |
| else: | |
| self._mount_fuse() | |
| self._mount_type = "fuse" | |
| self._mounted = True | |
| elapsed_ms = (time.perf_counter() - start) * 1000.0 | |
| print(f"overlay reset {elapsed_ms:.1f}ms") | |
| return elapsed_ms | |
| def unmount(self) -> None: | |
| """unmounts the overlay filesystem""" | |
| if not self._mounted: | |
| return | |
| if self._mount_type == "copy": | |
| self._mounted = False | |
| self._mount_type = None | |
| print("overlay unmounted") | |
| return | |
| if self._mount_type == "fuse": | |
| result = subprocess.run( | |
| ["fusermount", "-u", str(self._merged)], | |
| capture_output=True, | |
| text=True, | |
| timeout=10, | |
| ) | |
| if result.returncode != 0: | |
| subprocess.run( | |
| ["fusermount3", "-u", str(self._merged)], | |
| capture_output=True, | |
| text=True, | |
| timeout=10, | |
| ) | |
| else: | |
| subprocess.run( | |
| ["umount", str(self._merged)], | |
| capture_output=True, | |
| text=True, | |
| timeout=10, | |
| ) | |
| self._mounted = False | |
| self._mount_type = None | |
| print("overlay unmounted") | |
| def _purge_volatile_pair(self) -> None: | |
| """wipes upperdir and workdir trees from the volatile ram disk""" | |
| for target in (self._upperdir, self._workdir): | |
| if target is None: | |
| continue | |
| if target.exists(): | |
| shutil.rmtree(target, ignore_errors=True) | |
| target.mkdir(parents=True, exist_ok=True) | |
| def _clear_directory(self, directory: Path) -> None: | |
| directory.mkdir(parents=True, exist_ok=True) | |
| for entry in directory.iterdir(): | |
| if entry.is_dir() and not entry.is_symlink(): | |
| shutil.rmtree(entry) | |
| else: | |
| entry.unlink() | |
| def _select_volatile_base(self, preferred: Path) -> Path: | |
| """picks a ram backed root or falls back to the system temp dir""" | |
| candidates: list[Path] = [preferred] | |
| if preferred != Path(DEFAULT_VOLATILE_ROOT): | |
| candidates.append(Path(DEFAULT_VOLATILE_ROOT)) | |
| candidates.append(Path(tempfile.gettempdir())) | |
| for candidate in candidates: | |
| try: | |
| candidate.mkdir(parents=True, exist_ok=True) | |
| probe = candidate / f".probe_{uuid.uuid4().hex}" | |
| probe.touch() | |
| probe.unlink() | |
| return candidate | |
| except OSError as exc: | |
| print(f"overlay volatile candidate rejected {candidate} {type(exc).__name__.lower()}") | |
| continue | |
| raise RuntimeError("no writable volatile root available") | |
| def cleanup(self) -> None: | |
| """unmounts if mounted and recursively deletes all overlay directories""" | |
| self.unmount() | |
| for d in [self._upperdir, self._workdir, self._merged]: | |
| if d is not None and d.exists(): | |
| shutil.rmtree(d, ignore_errors=True) | |
| if self._volatile_dir.exists(): | |
| shutil.rmtree(self._volatile_dir, ignore_errors=True) | |
| if self._owns_base_dir and self._base_dir.exists(): | |
| shutil.rmtree(self._base_dir, ignore_errors=True) | |
| self._lowerdir = None | |
| self._upperdir = None | |
| self._workdir = None | |
| self._merged = None | |
| self._mount_type = None | |
| print("overlay cleanup complete") | |
| def __enter__(self): | |
| return self | |
| def __exit__(self, exc_type, exc_val, exc_tb): | |
| self.cleanup() | |
| return False | |