Spaces:
Sleeping
Sleeping
| import os | |
| import sys | |
| import time | |
| from pathlib import Path | |
| try: | |
| import psutil | |
| except ImportError: | |
| psutil = None | |
| def _parse_pss_from_smaps_rollup(pid: int) -> int | None: | |
| """ | |
| Try to read PSS from /proc/<pid>/smaps_rollup. | |
| Returns PSS in bytes, or None if not available/readable. | |
| """ | |
| try: | |
| smaps_rollup_path = Path(f"/proc/{pid}/smaps_rollup") | |
| with smaps_rollup_path.open() as f: | |
| for line in f: | |
| if line.startswith("Pss:"): | |
| # Format: "Pss: 1234 kB" | |
| parts = line.split() | |
| if len(parts) >= 2: | |
| pss_kb = int(parts[1]) | |
| return pss_kb * 1024 # Convert to bytes | |
| return None | |
| except (FileNotFoundError, PermissionError, ValueError, OSError): | |
| return None | |
| def _parse_pss_from_smaps(pid: int) -> int | None: | |
| """ | |
| Try to read PSS from /proc/<pid>/smaps and sum all Pss entries. | |
| Returns PSS in bytes, or None if not available/readable. | |
| """ | |
| try: | |
| smaps_path = Path(f"/proc/{pid}/smaps") | |
| total_pss_kb = 0 | |
| with smaps_path.open() as f: | |
| for line in f: | |
| if line.startswith("Pss:"): | |
| # Format: "Pss: 1234 kB" | |
| parts = line.split() | |
| if len(parts) >= 2: | |
| total_pss_kb += int(parts[1]) | |
| if total_pss_kb > 0: | |
| return total_pss_kb * 1024 # Convert to bytes | |
| return None | |
| except (FileNotFoundError, PermissionError, ValueError, OSError): | |
| return None | |
| def _get_pss_linux(pid: int) -> int | None: | |
| """ | |
| Try to get PSS on Linux. | |
| Priority: smaps_rollup -> smaps -> None | |
| Returns PSS in bytes, or None if not available. | |
| """ | |
| # Try smaps_rollup first (lightweight) | |
| pss = _parse_pss_from_smaps_rollup(pid) | |
| if pss is not None: | |
| return pss | |
| # Fallback to smaps (heavier) | |
| pss = _parse_pss_from_smaps(pid) | |
| if pss is not None: | |
| return pss | |
| return None | |
| def _get_rss_psutil(pid: int) -> int | None: | |
| """ | |
| Get RSS using psutil for a single process. | |
| Returns RSS in bytes, or None if psutil unavailable or process not found. | |
| """ | |
| if psutil is None: | |
| return None | |
| try: | |
| process = psutil.Process(pid) | |
| return process.memory_info().rss | |
| except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.TimeoutExpired): | |
| return None | |
| def _get_single_process_memory( | |
| pid: int, prefer_pss: bool = True, use_smaps_rollup_only: bool = False | |
| ) -> int | None: | |
| """ | |
| Get memory usage for a single process (no children). | |
| Args: | |
| pid: Process ID | |
| prefer_pss: If True and on Linux, try PSS first; otherwise use RSS | |
| use_smaps_rollup_only: If True, only try smaps_rollup (faster), fallback to RSS if not available | |
| Returns: | |
| Memory usage in bytes, or None if all methods fail | |
| """ | |
| if sys.platform == "linux": | |
| if prefer_pss: | |
| if use_smaps_rollup_only: | |
| # Only try smaps_rollup, then fallback to RSS | |
| pss = _parse_pss_from_smaps_rollup(pid) | |
| if pss is not None: | |
| return pss | |
| else: | |
| # Try full PSS (smaps_rollup -> smaps) | |
| pss = _get_pss_linux(pid) | |
| if pss is not None: | |
| return pss | |
| # Fallback to RSS | |
| return _get_rss_psutil(pid) | |
| def get_memory_usage_bytes( | |
| pid: int | None = None, | |
| include_children: bool = True, | |
| prefer_pss: bool = True, | |
| ) -> int: | |
| """ | |
| Get memory usage of a process (and optionally its children). | |
| On Linux with prefer_pss=True: | |
| - Tries /proc/<pid>/smaps_rollup first (lightweight) | |
| - Falls back to /proc/<pid>/smaps if smaps_rollup unavailable (heavier) | |
| - Falls back to psutil RSS if smaps unavailable | |
| On non-Linux systems or prefer_pss=False: | |
| - Uses psutil RSS | |
| Args: | |
| pid: Process ID to monitor. If None, uses current process. | |
| include_children: If True, also includes memory of child processes. | |
| prefer_pss: If True on Linux, attempts to use PSS; otherwise uses RSS. | |
| Returns: | |
| Total memory usage in bytes (guaranteed non-negative). | |
| """ | |
| if pid is None: | |
| pid = os.getpid() | |
| total_memory = 0 | |
| # Determine if we're using smaps (heavier) vs smaps_rollup (lighter) | |
| use_smaps_rollup_only = False | |
| if sys.platform == "linux" and prefer_pss: | |
| # If we can read smaps_rollup, use rollup-only mode | |
| test_rollup = _parse_pss_from_smaps_rollup(pid) | |
| use_smaps_rollup_only = test_rollup is not None | |
| # Get current process memory | |
| memory = _get_single_process_memory( | |
| pid, prefer_pss=prefer_pss, use_smaps_rollup_only=use_smaps_rollup_only | |
| ) | |
| if memory is not None: | |
| total_memory += memory | |
| # Get children memory if requested | |
| if include_children: | |
| if psutil is None: | |
| # Cannot get children without psutil | |
| return total_memory | |
| try: | |
| parent_process = psutil.Process(pid) | |
| children = parent_process.children(recursive=True) | |
| except (psutil.NoSuchProcess, psutil.AccessDenied): | |
| # Parent process not found or no permission | |
| return total_memory | |
| for child in children: | |
| try: | |
| child_pid = child.pid | |
| child_memory = _get_single_process_memory( | |
| child_pid, | |
| prefer_pss=prefer_pss, | |
| use_smaps_rollup_only=use_smaps_rollup_only, | |
| ) | |
| if child_memory is not None: | |
| total_memory += child_memory | |
| except (psutil.NoSuchProcess, psutil.AccessDenied): | |
| # Child process died or no permission; skip it | |
| pass | |
| return max(0, total_memory) | |
| def get_memory_usage_with_throttle( | |
| pid: int | None = None, | |
| include_children: bool = True, | |
| prefer_pss: bool = True, | |
| last_pss_check_time: float | None = None, | |
| pss_throttle_seconds: float = 2.0, | |
| ) -> tuple[int, float | None]: | |
| """ | |
| Get memory usage with throttling for PSS checks on Linux. | |
| When PSS is not available via smaps_rollup and must read smaps (expensive), | |
| this throttles checks to at most once per pss_throttle_seconds. | |
| Args: | |
| pid: Process ID. If None, uses current process. | |
| include_children: If True, includes child process memory. | |
| prefer_pss: If True on Linux, attempts to use PSS. | |
| last_pss_check_time: Timestamp of last PSS check. For throttling logic. | |
| pss_throttle_seconds: Minimum interval (seconds) between smaps reads. | |
| Returns: | |
| Tuple of (memory_bytes, new_check_time). | |
| If throttled, returns cached estimate (0) and original check time. | |
| """ | |
| current_time = time.time() | |
| # Check if we should throttle | |
| if ( | |
| prefer_pss | |
| and sys.platform == "linux" | |
| and last_pss_check_time is not None | |
| and (current_time - last_pss_check_time) < pss_throttle_seconds | |
| ): | |
| # Throttled: use RSS only as a fast estimate | |
| memory = 0 | |
| pid_to_check = pid if pid is not None else os.getpid() | |
| rss = _get_rss_psutil(pid_to_check) | |
| if rss is not None: | |
| memory += rss | |
| if include_children and psutil is not None: | |
| try: | |
| parent_process = psutil.Process(pid_to_check) | |
| for child in parent_process.children(recursive=True): | |
| try: | |
| child_rss = _get_rss_psutil(child.pid) | |
| if child_rss is not None: | |
| memory += child_rss | |
| except (psutil.NoSuchProcess, psutil.AccessDenied): | |
| pass | |
| except (psutil.NoSuchProcess, psutil.AccessDenied): | |
| pass | |
| return memory, last_pss_check_time | |
| # Not throttled: do full check | |
| memory = get_memory_usage_bytes( | |
| pid=pid, include_children=include_children, prefer_pss=prefer_pss | |
| ) | |
| return memory, current_time | |