"""Periodic memory reporting for the worker pool.

Logs system-wide memory usage plus per-process RSS for the main process and
every worker at a fixed interval (daily by default), so that operators can
spot slow memory growth without attaching a profiler.
"""

import multiprocessing as mp
import threading
import time
from contextlib import AbstractContextManager
from typing import Callable, Iterable, Protocol

from log_utils import setup_logger

logger = setup_logger("MemoryMonitor")

_DAILY = 60 * 60 * 24
_HOURLY = 60 * 60

# Longest single sleep between shutdown checks: a shutdown request is noticed
# within this many seconds even when the report interval is much longer.
_POLL_S = 5.0

_MB = 1024 * 1024
_GB = 1024**3


class _HasWorkerInfo(Protocol):
    """Minimal interface a worker object must expose for memory reporting."""

    worker_id: int
    status: str
    process: mp.Process


def start_memory_reporter(
    get_workers: Callable[[], Iterable[_HasWorkerInfo]],
    lock: AbstractContextManager,
    shutdown_flag: Callable[[], bool],
    interval_s: float = _DAILY,
    max_ram_gb: float = 0,
) -> threading.Thread:
    """Launch a daemon thread that logs memory usage periodically.

    Args:
        get_workers: Callable returning an iterable of worker-like objects
            (each must have ``worker_id``, ``status``, and ``process``).
        lock: Lock (or Condition) held while snapshotting the worker list.
        shutdown_flag: Callable returning ``True`` when the pool is shutting
            down (stops the reporter loop).
        interval_s: Seconds between reports (default: daily). Must be > 0.
        max_ram_gb: Cap on available RAM (GB) shown in reports. 0 means no cap.

    Returns:
        The started daemon thread.

    Raises:
        ValueError: If ``interval_s`` is not a positive number. A zero or
            negative interval would otherwise make the reporter log in a
            tight busy loop.
    """
    # ``not x > 0`` also rejects NaN.
    if not interval_s > 0:
        raise ValueError(f"interval_s must be positive, got {interval_s!r}")
    thread = threading.Thread(
        target=_reporter_loop,
        args=(get_workers, lock, shutdown_flag, interval_s, max_ram_gb),
        daemon=True,
    )
    thread.start()
    return thread


def _reporter_loop(
    get_workers: Callable[[], Iterable[_HasWorkerInfo]],
    lock: AbstractContextManager,
    shutdown_flag: Callable[[], bool],
    interval_s: float,
    max_ram_gb: float,
) -> None:
    """Log one report every ``interval_s`` seconds until shutdown is requested.

    Sleeps in slices of at most ``_POLL_S`` seconds so ``shutdown_flag`` is
    polled regularly. Due times are tracked on ``time.monotonic()``, so the
    schedule does not drift by the time each report takes. If the loop falls
    behind (e.g. a slow report or a system suspend), the schedule restarts
    from "now" instead of firing a burst of catch-up reports.
    """
    next_due = time.monotonic() + interval_s
    while not shutdown_flag():
        remaining = next_due - time.monotonic()
        if remaining > 0:
            time.sleep(min(_POLL_S, remaining))
            # Re-check the shutdown flag before (possibly) reporting.
            continue
        try:
            _log_report(get_workers, lock, interval_s, max_ram_gb)
        except Exception as exc:
            # Best effort: a failed report must not kill the reporter thread.
            logger.debug(f"Memory report failed: {exc}")
        next_due += interval_s
        now = time.monotonic()
        if next_due <= now:
            next_due = now + interval_s


def _format_amount(value: float) -> str:
    """Format *value* with at most two decimals, dropping trailing zeros."""
    return f"{value:.2f}".rstrip("0").rstrip(".")


def _report_label(interval_s: float) -> str:
    """Return the heading used for a report emitted every *interval_s* seconds."""
    if interval_s == _DAILY:
        return "Daily report"
    if interval_s == _HOURLY:
        return "Hourly report"
    # Use the largest unit the interval reaches. Fractions are kept (e.g.
    # "1.5h") rather than rounded, so the label never misreports the interval.
    for unit_s, suffix in ((_DAILY, "d"), (_HOURLY, "h"), (60, "m")):
        if interval_s >= unit_s:
            return f"Periodic report (every {_format_amount(interval_s / unit_s)}{suffix})"
    return f"Periodic report (every {_format_amount(interval_s)}s)"


def _log_report(
    get_workers: Callable[[], Iterable[_HasWorkerInfo]],
    lock: AbstractContextManager,
    interval_s: float,
    max_ram_gb: float,
) -> None:
    """Collect and log memory stats for the main process and each worker.

    Returns without logging anything when ``psutil`` cannot be imported.
    """
    try:
        import psutil  # optional dependency; reporting is skipped without it
    except ImportError:
        return

    vm = psutil.virtual_memory()
    main_proc = psutil.Process()
    main_rss = main_proc.memory_info().rss / _MB

    # Apply the optional RAM cap; note the real figure when the cap bites.
    real_available_gb = vm.available / _GB
    capped = 0 < max_ram_gb < real_available_gb
    available_gb = max_ram_gb if capped else real_available_gb
    cap_note = f" (capped from {real_available_gb:.1f} GB)" if capped else ""

    lines = [
        f"{_report_label(interval_s)} — "
        f"system: {vm.used / _GB:.1f}/{vm.total / _GB:.1f} GB "
        f"({vm.percent}% used), available={available_gb:.1f} GB{cap_note}",
        f" main process (pid={main_proc.pid}): RSS={main_rss:.0f} MB",
    ]

    # Hold the lock only while snapshotting the worker list; the psutil calls
    # below can be slow and should not block whoever else uses the lock.
    with lock:
        workers = list(get_workers())

    worker_total = 0.0
    for w in workers:
        prefix = f" worker {w.worker_id} (pid={w.process.pid}): "
        if w.status == "dead" or not w.process.is_alive():
            lines.append(prefix + "dead")
            continue
        try:
            w_rss = psutil.Process(w.process.pid).memory_info().rss / _MB
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # The process may have exited since the liveness check, or we may
            # lack permission to inspect it.
            lines.append(prefix + "not accessible")
            continue
        worker_total += w_rss
        lines.append(f"{prefix}RSS={w_rss:.0f} MB, status={w.status}")

    lines.append(
        f" total: main={main_rss:.0f} MB + "
        f"workers={worker_total:.0f} MB = {main_rss + worker_total:.0f} MB"
    )
    logger.info("\n".join(lines))