import os import sys import time from pathlib import Path try: import psutil except ImportError: psutil = None def _parse_pss_from_smaps_rollup(pid: int) -> int | None: """ Try to read PSS from /proc//smaps_rollup. Returns PSS in bytes, or None if not available/readable. """ try: smaps_rollup_path = Path(f"/proc/{pid}/smaps_rollup") with smaps_rollup_path.open() as f: for line in f: if line.startswith("Pss:"): # Format: "Pss: 1234 kB" parts = line.split() if len(parts) >= 2: pss_kb = int(parts[1]) return pss_kb * 1024 # Convert to bytes return None except (FileNotFoundError, PermissionError, ValueError, OSError): return None def _parse_pss_from_smaps(pid: int) -> int | None: """ Try to read PSS from /proc//smaps and sum all Pss entries. Returns PSS in bytes, or None if not available/readable. """ try: smaps_path = Path(f"/proc/{pid}/smaps") total_pss_kb = 0 with smaps_path.open() as f: for line in f: if line.startswith("Pss:"): # Format: "Pss: 1234 kB" parts = line.split() if len(parts) >= 2: total_pss_kb += int(parts[1]) if total_pss_kb > 0: return total_pss_kb * 1024 # Convert to bytes return None except (FileNotFoundError, PermissionError, ValueError, OSError): return None def _get_pss_linux(pid: int) -> int | None: """ Try to get PSS on Linux. Priority: smaps_rollup -> smaps -> None Returns PSS in bytes, or None if not available. """ # Try smaps_rollup first (lightweight) pss = _parse_pss_from_smaps_rollup(pid) if pss is not None: return pss # Fallback to smaps (heavier) pss = _parse_pss_from_smaps(pid) if pss is not None: return pss return None def _get_rss_psutil(pid: int) -> int | None: """ Get RSS using psutil for a single process. Returns RSS in bytes, or None if psutil unavailable or process not found. """ if psutil is None: return None try: process = psutil.Process(pid) return process.memory_info().rss except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.TimeoutExpired): return None def _get_single_process_memory( pid: int, prefer_pss: bool = True, use_smaps_rollup_only: bool = False ) -> int | None: """ Get memory usage for a single process (no children). Args: pid: Process ID prefer_pss: If True and on Linux, try PSS first; otherwise use RSS use_smaps_rollup_only: If True, only try smaps_rollup (faster), fallback to RSS if not available Returns: Memory usage in bytes, or None if all methods fail """ if sys.platform == "linux": if prefer_pss: if use_smaps_rollup_only: # Only try smaps_rollup, then fallback to RSS pss = _parse_pss_from_smaps_rollup(pid) if pss is not None: return pss else: # Try full PSS (smaps_rollup -> smaps) pss = _get_pss_linux(pid) if pss is not None: return pss # Fallback to RSS return _get_rss_psutil(pid) def get_memory_usage_bytes( pid: int | None = None, include_children: bool = True, prefer_pss: bool = True, ) -> int: """ Get memory usage of a process (and optionally its children). On Linux with prefer_pss=True: - Tries /proc//smaps_rollup first (lightweight) - Falls back to /proc//smaps if smaps_rollup unavailable (heavier) - Falls back to psutil RSS if smaps unavailable On non-Linux systems or prefer_pss=False: - Uses psutil RSS Args: pid: Process ID to monitor. If None, uses current process. include_children: If True, also includes memory of child processes. prefer_pss: If True on Linux, attempts to use PSS; otherwise uses RSS. Returns: Total memory usage in bytes (guaranteed non-negative). """ if pid is None: pid = os.getpid() total_memory = 0 # Determine if we're using smaps (heavier) vs smaps_rollup (lighter) use_smaps_rollup_only = False if sys.platform == "linux" and prefer_pss: # If we can read smaps_rollup, use rollup-only mode test_rollup = _parse_pss_from_smaps_rollup(pid) use_smaps_rollup_only = test_rollup is not None # Get current process memory memory = _get_single_process_memory( pid, prefer_pss=prefer_pss, use_smaps_rollup_only=use_smaps_rollup_only ) if memory is not None: total_memory += memory # Get children memory if requested if include_children: if psutil is None: # Cannot get children without psutil return total_memory try: parent_process = psutil.Process(pid) children = parent_process.children(recursive=True) except (psutil.NoSuchProcess, psutil.AccessDenied): # Parent process not found or no permission return total_memory for child in children: try: child_pid = child.pid child_memory = _get_single_process_memory( child_pid, prefer_pss=prefer_pss, use_smaps_rollup_only=use_smaps_rollup_only, ) if child_memory is not None: total_memory += child_memory except (psutil.NoSuchProcess, psutil.AccessDenied): # Child process died or no permission; skip it pass return max(0, total_memory) def get_memory_usage_with_throttle( pid: int | None = None, include_children: bool = True, prefer_pss: bool = True, last_pss_check_time: float | None = None, pss_throttle_seconds: float = 2.0, ) -> tuple[int, float | None]: """ Get memory usage with throttling for PSS checks on Linux. When PSS is not available via smaps_rollup and must read smaps (expensive), this throttles checks to at most once per pss_throttle_seconds. Args: pid: Process ID. If None, uses current process. include_children: If True, includes child process memory. prefer_pss: If True on Linux, attempts to use PSS. last_pss_check_time: Timestamp of last PSS check. For throttling logic. pss_throttle_seconds: Minimum interval (seconds) between smaps reads. Returns: Tuple of (memory_bytes, new_check_time). If throttled, returns cached estimate (0) and original check time. """ current_time = time.time() # Check if we should throttle if ( prefer_pss and sys.platform == "linux" and last_pss_check_time is not None and (current_time - last_pss_check_time) < pss_throttle_seconds ): # Throttled: use RSS only as a fast estimate memory = 0 pid_to_check = pid if pid is not None else os.getpid() rss = _get_rss_psutil(pid_to_check) if rss is not None: memory += rss if include_children and psutil is not None: try: parent_process = psutil.Process(pid_to_check) for child in parent_process.children(recursive=True): try: child_rss = _get_rss_psutil(child.pid) if child_rss is not None: memory += child_rss except (psutil.NoSuchProcess, psutil.AccessDenied): pass except (psutil.NoSuchProcess, psutil.AccessDenied): pass return memory, last_pss_check_time # Not throttled: do full check memory = get_memory_usage_bytes( pid=pid, include_children=include_children, prefer_pss=prefer_pss ) return memory, current_time