| import os |
| import shutil |
| import subprocess |
| import tempfile |
| import time |
| from pathlib import Path |
|
|
|
|
| class OverlayFSManager: |
| """manages overlayfs stacks for sub second filesystem state resets""" |
|
|
| def __init__(self, base_dir: str | None = None): |
| """ |
| base dir is the parent directory where overlay stack directories are created |
| if none a temporary directory is used |
| """ |
| if base_dir is not None: |
| self._base_dir = Path(base_dir) |
| self._base_dir.mkdir(parents=True, exist_ok=True) |
| self._owns_base_dir = False |
| else: |
| self._base_dir = Path(tempfile.mkdtemp(prefix="overlayfs_")) |
| self._owns_base_dir = True |
|
|
| self._lowerdir: Path | None = None |
| self._upperdir: Path | None = None |
| self._workdir: Path | None = None |
| self._merged: Path | None = None |
| self._mounted = False |
| self._mount_type: str | None = None |
|
|
| @property |
| def lowerdir(self) -> Path | None: |
| return self._lowerdir |
|
|
| @property |
| def upperdir(self) -> Path | None: |
| return self._upperdir |
|
|
| @property |
| def workdir(self) -> Path | None: |
| return self._workdir |
|
|
| @property |
| def merged(self) -> Path | None: |
| return self._merged |
|
|
| @property |
| def is_mounted(self) -> bool: |
| return self._mounted |
|
|
| @property |
| def mount_type(self) -> str | None: |
| return self._mount_type |
|
|
| def create_stack(self, lowerdir: str | Path) -> Path: |
| """ |
| creates the overlay directory stack given a lowerdir path |
| returns the path to the merged directory |
| """ |
| lowerdir = Path(lowerdir).resolve() |
| if not lowerdir.is_dir(): |
| raise FileNotFoundError(f"lowerdir does not exist {lowerdir}") |
|
|
| self._lowerdir = lowerdir |
| self._upperdir = self._base_dir / "upper" |
| self._workdir = self._base_dir / "work" |
| self._merged = self._base_dir / "merged" |
|
|
| self._upperdir.mkdir(exist_ok=True) |
| self._workdir.mkdir(exist_ok=True) |
| self._merged.mkdir(exist_ok=True) |
|
|
| print(f"overlay stack created at {self._base_dir}") |
| return self._merged |
|
|
| def mount(self) -> None: |
| """ |
| mounts the overlay filesystem trying kernel overlayfs first |
| then falling back to fuse overlayfs for unprivileged contexts |
| """ |
| if self._mounted: |
| raise RuntimeError("overlay already mounted") |
|
|
| if self._merged is None: |
| raise RuntimeError("create stack must be called before mount") |
|
|
| try: |
| print("overlay kernel mount start") |
| self._mount_kernel() |
| self._mount_type = "kernel" |
| print("overlay mounted via kernel overlayfs") |
| except (PermissionError, OSError, subprocess.CalledProcessError) as exc: |
| print(f"overlay kernel mount failed {type(exc).__name__.lower()}") |
| try: |
| print("overlay fuse mount start") |
| self._mount_fuse() |
| self._mount_type = "fuse" |
| print("overlay mounted via fuse overlayfs") |
| except (FileNotFoundError, OSError, subprocess.CalledProcessError) as fuse_exc: |
| print(f"overlay fuse mount failed {type(fuse_exc).__name__.lower()}") |
| self._mount_copy() |
| self._mount_type = "copy" |
| print("overlay mounted via copy fallback") |
|
|
| self._mounted = True |
|
|
| def _mount_copy(self) -> None: |
| if self._lowerdir is None or self._merged is None: |
| raise RuntimeError("copy fallback requires lowerdir and merged path") |
|
|
| self._clear_directory(self._merged) |
| shutil.copytree(self._lowerdir, self._merged, dirs_exist_ok=True, symlinks=True) |
|
|
| def _mount_kernel(self) -> None: |
| mount_opts = ( |
| f"lowerdir={self._lowerdir}," |
| f"upperdir={self._upperdir}," |
| f"workdir={self._workdir}" |
| ) |
| result = subprocess.run( |
| ["mount", "-t", "overlay", "overlay", "-o", mount_opts, str(self._merged)], |
| capture_output=True, |
| text=True, |
| timeout=10, |
| ) |
| if result.returncode != 0: |
| raise PermissionError(f"kernel mount failed {result.stderr.strip()}") |
|
|
| def _mount_fuse(self) -> None: |
| fuse_bin = shutil.which("fuse-overlayfs") |
| if fuse_bin is None: |
| raise FileNotFoundError("fuse-overlayfs binary not found in path") |
| print(f"overlay fuse binary {fuse_bin}") |
|
|
| mount_opts = ( |
| f"lowerdir={self._lowerdir}," |
| f"upperdir={self._upperdir}," |
| f"workdir={self._workdir}" |
| ) |
| result = subprocess.run( |
| [fuse_bin, "-o", mount_opts, str(self._merged)], |
| capture_output=True, |
| text=True, |
| timeout=10, |
| ) |
| if result.returncode != 0: |
| raise OSError(f"fuse overlayfs mount failed {result.stderr.strip()}") |
|
|
| def reset(self) -> float: |
| """ |
| resets the overlay by clearing upperdir contents and recreating workdir |
| returns the reset latency in milliseconds |
| """ |
| if not self._mounted: |
| raise RuntimeError("overlay is not mounted") |
|
|
| start = time.perf_counter() |
|
|
| mount_type = self._mount_type |
|
|
| if mount_type == "copy": |
| if self._merged is None: |
| raise RuntimeError("copy fallback merged path missing") |
| self._mount_copy() |
| self._mount_type = "copy" |
| else: |
| self.unmount() |
|
|
| for entry in self._upperdir.iterdir(): |
| if entry.is_dir(): |
| shutil.rmtree(entry) |
| else: |
| entry.unlink() |
|
|
| if self._workdir.exists(): |
| shutil.rmtree(self._workdir) |
| self._workdir.mkdir() |
|
|
| if self._merged is not None: |
| self._merged.mkdir(exist_ok=True) |
|
|
| if mount_type == "kernel": |
| self._mount_kernel() |
| self._mount_type = "kernel" |
| else: |
| self._mount_fuse() |
| self._mount_type = "fuse" |
|
|
| self._mounted = True |
|
|
| elapsed_ms = (time.perf_counter() - start) * 1000.0 |
|
|
| print(f"overlay reset {elapsed_ms:.1f}ms") |
| return elapsed_ms |
|
|
| def unmount(self) -> None: |
| """unmounts the overlay filesystem""" |
| if not self._mounted: |
| return |
|
|
| if self._mount_type == "copy": |
| self._mounted = False |
| self._mount_type = None |
| print("overlay unmounted") |
| return |
|
|
| if self._mount_type == "fuse": |
| result = subprocess.run( |
| ["fusermount", "-u", str(self._merged)], |
| capture_output=True, |
| text=True, |
| timeout=10, |
| ) |
| if result.returncode != 0: |
| subprocess.run( |
| ["fusermount3", "-u", str(self._merged)], |
| capture_output=True, |
| text=True, |
| timeout=10, |
| ) |
| else: |
| subprocess.run( |
| ["umount", str(self._merged)], |
| capture_output=True, |
| text=True, |
| timeout=10, |
| ) |
|
|
| self._mounted = False |
| self._mount_type = None |
| print("overlay unmounted") |
|
|
| def _clear_directory(self, directory: Path) -> None: |
| directory.mkdir(parents=True, exist_ok=True) |
| for entry in directory.iterdir(): |
| if entry.is_dir() and not entry.is_symlink(): |
| shutil.rmtree(entry) |
| else: |
| entry.unlink() |
|
|
| def cleanup(self) -> None: |
| """unmounts if mounted and removes all overlay directories""" |
| self.unmount() |
|
|
| for d in [self._upperdir, self._workdir, self._merged]: |
| if d is not None and d.exists(): |
| shutil.rmtree(d, ignore_errors=True) |
|
|
| if self._owns_base_dir and self._base_dir.exists(): |
| shutil.rmtree(self._base_dir, ignore_errors=True) |
|
|
| self._lowerdir = None |
| self._upperdir = None |
| self._workdir = None |
| self._merged = None |
| self._mount_type = None |
| print("overlay cleanup complete") |
|
|
| def __enter__(self): |
| return self |
|
|
| def __exit__(self, exc_type, exc_val, exc_tb): |
| self.cleanup() |
| return False |
|
|