Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
"""Periodic memory reporting for the worker pool.

Logs system-wide memory usage plus the RSS of the main process and of each
worker at a configurable interval (daily by default) so that operators can
spot slow memory growth without attaching a profiler.
"""
import multiprocessing as mp
import threading
import time
from typing import Callable, Iterable, Protocol

from log_utils import setup_logger
logger = setup_logger("MemoryMonitor")

# Reporting-interval presets, in seconds.
_HOURLY = 60 * 60
_DAILY = 24 * _HOURLY
class _HasWorkerInfo(Protocol):
    """Structural type describing the worker objects the memory reporter reads.

    Any object exposing these three attributes qualifies; subclassing this
    Protocol is not required.
    """

    # Identifier printed in each per-worker report line.
    worker_id: int
    # Pool-assigned state string; the reporter treats the value "dead" specially.
    status: str
    # Handle to the worker's OS process (the reporter reads ``pid`` and calls ``is_alive()``).
    process: mp.Process
def start_memory_reporter(
    get_workers: Callable[[], Iterable["_HasWorkerInfo"]],
    lock: threading.Condition,
    shutdown_flag: Callable[[], bool],
    interval_s: float = _DAILY,
    max_ram_gb: float = 0,
) -> threading.Thread:
    """Launch a daemon thread that logs memory usage periodically.

    Reports are emitted at INFO level through the module's ``MemoryMonitor``
    logger. The thread is a daemon, so it never keeps the interpreter alive
    on its own.

    Args:
        get_workers: Zero-argument callable returning the current
            worker-like objects (each must expose ``worker_id``, ``status``
            and ``process``). It is invoked while ``lock`` is held.
        lock: Pool lock (used as a context manager) held while reading the
            worker list.
        shutdown_flag: Zero-argument callable returning ``True`` once the
            pool is shutting down; the reporter loop polls it and exits.
        interval_s: Seconds between reports (default: one day).
        max_ram_gb: Cap on the available RAM (GB) shown in reports.
            0 means no cap.

    Returns:
        The started daemon thread.
    """
    thread = threading.Thread(
        target=_reporter_loop,
        args=(get_workers, lock, shutdown_flag, interval_s, max_ram_gb),
        # A stable name makes the reporter easy to spot in thread dumps.
        name="MemoryMonitor",
        daemon=True,
    )
    thread.start()
    return thread
def _reporter_loop(
    get_workers: Callable[[], Iterable["_HasWorkerInfo"]],
    lock: threading.Condition,
    shutdown_flag: Callable[[], bool],
    interval_s: float,
    max_ram_gb: float,
) -> None:
    """Reporter thread body: wait ``interval_s`` seconds, log a report, repeat.

    The wait is split into sleeps of at most ``poll_s`` seconds so that
    ``shutdown_flag`` is re-checked regularly and the thread exits promptly
    once it returns ``True``. Any exception raised while building a report
    is logged at debug level and swallowed, so one failed report never
    stops the loop.
    """
    poll_s = 5.0
    # A non-positive (or NaN) interval would make the wait loop below a
    # no-op and turn this thread into a busy loop that floods the log;
    # fall back to the poll period instead.
    wait_s = interval_s if interval_s > 0 else poll_s
    while not shutdown_flag():
        # Track the deadline on the monotonic clock: summing requested sleep
        # lengths drifts because time.sleep() may overshoot.
        deadline = time.monotonic() + wait_s
        while not shutdown_flag():
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(poll_s, remaining))
        if shutdown_flag():
            break
        try:
            _log_report(get_workers, lock, wait_s, max_ram_gb)
        except Exception as exc:
            # Best-effort diagnostics: a reporting failure must not kill the thread.
            logger.debug(f"Memory report failed: {exc}")
def _interval_label(interval_s: float) -> str:
    """Return the report title for a report produced every ``interval_s`` seconds.

    Exactly one day / one hour get a fixed name; any other interval is
    expressed in the largest of days / hours / minutes / seconds it reaches.
    """
    if interval_s == _DAILY:
        return "Daily report"
    if interval_s == _HOURLY:
        return "Hourly report"
    for unit_s, suffix in ((_DAILY, "d"), (_HOURLY, "h"), (60, "m")):
        if interval_s >= unit_s:
            break
    else:
        unit_s, suffix = 1, "s"
    # Keep one decimal place so e.g. a 36 h interval reads "1.5d" instead of
    # being rounded to a misleading "2d"; ":g" drops a trailing ".0".
    return f"Periodic report (every {round(interval_s / unit_s, 1):g}{suffix})"


def _log_report(
    get_workers: Callable[[], Iterable["_HasWorkerInfo"]],
    lock: threading.Condition,
    interval_s: float,
    max_ram_gb: float,
) -> None:
    """Collect memory stats for the main process and each worker and log them.

    Emits one multi-line INFO record containing:
      * a title line with system totals from ``psutil.virtual_memory()``
        (the displayed available RAM is capped at ``max_ram_gb`` when > 0),
      * the main process RSS,
      * one line per worker: its RSS and status, or "dead" / "not accessible",
      * the sum of main + worker RSS (dead / inaccessible workers excluded).
    Sizes use 1024-based MB / GB.

    Args:
        get_workers: Zero-argument callable returning worker-like objects;
            called while ``lock`` is held.
        lock: Pool lock, held only while snapshotting worker fields.
        interval_s: Reporting interval; only used to build the title.
        max_ram_gb: Cap on displayed available RAM in GB; 0 means no cap.

    Returns without logging a report if ``psutil`` cannot be imported.
    """
    try:
        import psutil
    except ImportError:
        # psutil is optional; without it there is nothing to report.
        logger.debug("psutil is not installed; skipping memory report")
        return

    mib = 1024 * 1024
    gib = 1024**3

    vm = psutil.virtual_memory()
    main_proc = psutil.Process()
    main_rss = main_proc.memory_info().rss / mib

    raw_available_gb = vm.available / gib
    capped = max_ram_gb > 0 and raw_available_gb > max_ram_gb
    available_gb = max_ram_gb if capped else raw_available_gb
    cap_note = f" (capped from {raw_available_gb:.1f} GB)" if capped else ""

    lines = [
        f"{_interval_label(interval_s)} — "
        f"system: {vm.used / gib:.1f}/{vm.total / gib:.1f} GB "
        f"({vm.percent}% used), available={available_gb:.1f} GB{cap_note}",
        f" main process (pid={main_proc.pid}): RSS={main_rss:.0f} MB",
    ]

    # Snapshot worker fields (including the liveness check) under the pool
    # lock so they are mutually consistent, then release it before the
    # per-process psutil syscalls so a slow report never stalls the pool.
    with lock:
        snapshot = [
            (
                w.worker_id,
                w.status,
                w.process.pid,
                w.status != "dead" and w.process.is_alive(),
            )
            for w in get_workers()
        ]

    worker_total = 0.0
    for worker_id, status, pid, alive in snapshot:
        prefix = f" worker {worker_id} (pid={pid}): "
        if not alive:
            lines.append(f"{prefix}dead")
            continue
        try:
            rss = psutil.Process(pid).memory_info().rss / mib
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # The worker exited after the liveness check, or the OS denies
            # permission to inspect it.
            lines.append(f"{prefix}not accessible")
            continue
        worker_total += rss
        lines.append(f"{prefix}RSS={rss:.0f} MB, status={status}")

    lines.append(
        f" total: main={main_rss:.0f} MB + "
        f"workers={worker_total:.0f} MB = {main_rss + worker_total:.0f} MB"
    )
    logger.info("\n".join(lines))