File size: 8,210 Bytes
754d92a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
import os
import sys
import time
from pathlib import Path

try:
    import psutil
except ImportError:
    psutil = None


def _parse_pss_from_smaps_rollup(pid: int) -> int | None:
    """
    Try to read PSS from /proc/<pid>/smaps_rollup.
    Returns PSS in bytes, or None if not available/readable.
    """
    try:
        smaps_rollup_path = Path(f"/proc/{pid}/smaps_rollup")
        with smaps_rollup_path.open() as f:
            for line in f:
                if line.startswith("Pss:"):
                    # Format: "Pss:            1234 kB"
                    parts = line.split()
                    if len(parts) >= 2:
                        pss_kb = int(parts[1])
                        return pss_kb * 1024  # Convert to bytes
        return None
    except (FileNotFoundError, PermissionError, ValueError, OSError):
        return None


def _parse_pss_from_smaps(pid: int) -> int | None:
    """
    Try to read PSS from /proc/<pid>/smaps and sum all Pss entries.
    Returns PSS in bytes, or None if not available/readable.
    """
    try:
        smaps_path = Path(f"/proc/{pid}/smaps")
        total_pss_kb = 0
        with smaps_path.open() as f:
            for line in f:
                if line.startswith("Pss:"):
                    # Format: "Pss:            1234 kB"
                    parts = line.split()
                    if len(parts) >= 2:
                        total_pss_kb += int(parts[1])
        if total_pss_kb > 0:
            return total_pss_kb * 1024  # Convert to bytes
        return None
    except (FileNotFoundError, PermissionError, ValueError, OSError):
        return None


def _get_pss_linux(pid: int) -> int | None:
    """
    Resolve the PSS of a process from the Linux /proc interface.

    Sources are consulted in order, and the first one that yields a value
    is used: smaps_rollup (lightweight), then smaps (heavier).

    Returns:
        PSS in bytes, or None if neither source produced a value.
    """
    # Ordered cheapest-first; the heavier smaps parse only runs when the
    # rollup file gave nothing.
    readers = (_parse_pss_from_smaps_rollup, _parse_pss_from_smaps)
    for read_pss in readers:
        value = read_pss(pid)
        if value is not None:
            return value
    return None


def _get_rss_psutil(pid: int) -> int | None:
    """
    Look up the resident set size (RSS) of one process through psutil.

    Returns:
        RSS in bytes. None when psutil could not be imported, or when
        psutil reports the process as missing, inaccessible, or timed out.
    """
    if psutil is not None:
        try:
            return psutil.Process(pid).memory_info().rss
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.TimeoutExpired):
            # Best-effort lookup: fall through to the None result below.
            pass
    return None


def _get_single_process_memory(
    pid: int, prefer_pss: bool = True, use_smaps_rollup_only: bool = False
) -> int | None:
    """
    Measure the memory of exactly one process (children are not included).

    Args:
        pid: Process ID
        prefer_pss: When True on Linux, a PSS reading is attempted before RSS
        use_smaps_rollup_only: When True, the PSS attempt is limited to
            smaps_rollup (faster); the heavier smaps parse is skipped and
            RSS is used if the rollup gives nothing

    Returns:
        Memory usage in bytes, or None if every method failed
    """
    if prefer_pss and sys.platform == "linux":
        # Choose the PSS source once, then make a single attempt with it.
        read_pss = (
            _parse_pss_from_smaps_rollup if use_smaps_rollup_only else _get_pss_linux
        )
        value = read_pss(pid)
        if value is not None:
            return value

    # Non-Linux, PSS not requested, or PSS unavailable: use RSS.
    return _get_rss_psutil(pid)


def get_memory_usage_bytes(
    pid: int | None = None,
    include_children: bool = True,
    prefer_pss: bool = True,
) -> int:
    """
    Get memory usage of a process (and optionally its children).

    On Linux with prefer_pss=True:
      - Tries /proc/<pid>/smaps_rollup first (lightweight)
      - Falls back to /proc/<pid>/smaps if smaps_rollup unavailable (heavier)
      - Falls back to psutil RSS if smaps unavailable

    On non-Linux systems or prefer_pss=False:
      - Uses psutil RSS

    The smaps_rollup read on the root process doubles as a probe: when it
    succeeds, child processes are measured in rollup-only mode, meaning a
    child whose smaps_rollup cannot be read falls back to RSS instead of the
    heavier smaps parse.

    Processes that cannot be measured contribute 0 to the total. Without
    psutil, children cannot be enumerated and only the root process is
    counted.

    Args:
        pid: Process ID to monitor. If None, uses current process.
        include_children: If True, also includes memory of child processes.
        prefer_pss: If True on Linux, attempts to use PSS; otherwise uses RSS.

    Returns:
        Total memory usage in bytes (guaranteed non-negative).
    """
    if pid is None:
        pid = os.getpid()

    # Probe smaps_rollup once on the root process and keep the value, so the
    # file is not read a second time just to obtain the same measurement.
    root_memory = None
    use_smaps_rollup_only = False
    if sys.platform == "linux" and prefer_pss:
        root_memory = _parse_pss_from_smaps_rollup(pid)
        use_smaps_rollup_only = root_memory is not None

    if root_memory is None:
        # Rollup unavailable or PSS not wanted: run the regular fallback
        # chain (smaps -> RSS on Linux with prefer_pss, RSS otherwise).
        root_memory = _get_single_process_memory(
            pid, prefer_pss=prefer_pss, use_smaps_rollup_only=False
        )

    total_memory = root_memory if root_memory is not None else 0

    if include_children and psutil is not None:
        try:
            children = psutil.Process(pid).children(recursive=True)
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # Root process vanished or is not accessible: report what we have.
            children = []

        for child in children:
            # The helper returns None for children that exited or cannot be
            # accessed, so they are skipped here.
            child_memory = _get_single_process_memory(
                child.pid,
                prefer_pss=prefer_pss,
                use_smaps_rollup_only=use_smaps_rollup_only,
            )
            if child_memory is not None:
                total_memory += child_memory

    return max(0, total_memory)


def get_memory_usage_with_throttle(
    pid: int | None = None,
    include_children: bool = True,
    prefer_pss: bool = True,
    last_pss_check_time: float | None = None,
    pss_throttle_seconds: float = 2.0,
) -> tuple[int, float | None]:
    """
    Get memory usage with throttling for PSS checks on Linux.

    A full measurement (see get_memory_usage_bytes) is performed at most once
    per pss_throttle_seconds. Calls that arrive sooner are "throttled": they
    skip the /proc PSS reads and return a psutil RSS estimate instead.

    Throttling only applies when all of the following hold:
      - prefer_pss is True and the platform is Linux
      - last_pss_check_time is not None
      - psutil is available (otherwise no RSS estimate can be produced, so a
        full measurement is done rather than reporting 0)
      - 0 <= now - last_pss_check_time < pss_throttle_seconds

    Timestamps are wall-clock values from time.time(). If the clock has been
    stepped backwards (negative elapsed time), the call is treated as not
    throttled so the check time resynchronises instead of staying throttled
    until the clock catches up.

    Args:
        pid: Process ID. If None, uses current process.
        include_children: If True, includes child process memory.
        prefer_pss: If True on Linux, attempts to use PSS.
        last_pss_check_time: time.time() timestamp of the last full check,
            normally the second element returned by the previous call.
        pss_throttle_seconds: Minimum interval (seconds) between full checks.

    Returns:
        Tuple of (memory_bytes, check_time).
        If throttled, memory_bytes is the RSS estimate and check_time is the
        unchanged last_pss_check_time. Otherwise memory_bytes is the full
        measurement and check_time is the current time.
    """
    current_time = time.time()

    throttled = False
    if (
        prefer_pss
        and sys.platform == "linux"
        and psutil is not None
        and last_pss_check_time is not None
    ):
        elapsed = current_time - last_pss_check_time
        # Negative elapsed means the wall clock went backwards; do not throttle.
        throttled = 0 <= elapsed < pss_throttle_seconds

    if not throttled:
        memory = get_memory_usage_bytes(
            pid=pid, include_children=include_children, prefer_pss=prefer_pss
        )
        return memory, current_time

    # Throttled: use RSS only as a fast estimate.
    pid_to_check = os.getpid() if pid is None else pid
    root_rss = _get_rss_psutil(pid_to_check)
    memory = root_rss if root_rss is not None else 0

    if include_children:
        try:
            children = psutil.Process(pid_to_check).children(recursive=True)
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            children = []

        for child in children:
            # _get_rss_psutil returns None for children that exited or
            # cannot be accessed, so they are skipped here.
            child_rss = _get_rss_psutil(child.pid)
            if child_rss is not None:
                memory += child_rss

    return memory, last_pss_check_time