beaupreda's picture
Upload sensAI-Generic-Object-Detection with upload_repo.py
13170f7 verified
Raw
History Blame Contribute Delete
4.48 kB
"""Periodic memory reporting for the worker pool.
Logs system-wide memory usage and per-process RSS at a configurable interval
(once per day by default) so that operators can spot slow memory growth
without attaching a profiler.
"""
import multiprocessing as mp
import threading
import time
from typing import Protocol
from log_utils import setup_logger
# Module-level logger used by the reporter loop and the report builder.
logger = setup_logger("MemoryMonitor")
# Reporting intervals, in seconds. ``_DAILY`` is the default interval of
# ``start_memory_reporter``; both values also select the report heading.
_DAILY = 60 * 60 * 24
_HOURLY = 60 * 60
class _HasWorkerInfo(Protocol):
    """Minimal interface a worker object must expose for memory reporting."""

    # Identifier printed in each per-worker line of the report.
    worker_id: int
    # Status string; the value "dead" makes the report list the worker as dead.
    status: str
    # Process handle; the report reads its ``pid`` and calls ``is_alive()``.
    process: mp.Process
def start_memory_reporter(
    get_workers: callable,
    lock: threading.Condition,
    shutdown_flag: callable,
    interval_s: float = _DAILY,
    max_ram_gb: float = 0,
) -> threading.Thread:
    """Start the background thread that periodically logs memory usage.

    The thread is a daemon, so it never keeps the interpreter alive on its
    own; it runs ``_reporter_loop`` until ``shutdown_flag`` reports ``True``.

    Args:
        get_workers: Zero-argument callable producing the worker objects to
            report on. Each one needs ``worker_id``, ``status`` and
            ``process`` attributes.
        lock: Condition acquired while the worker list is being read.
        shutdown_flag: Zero-argument callable that returns ``True`` once the
            pool is shutting down, which ends the reporter loop.
        interval_s: Time between two reports, in seconds. Defaults to one
            day.
        max_ram_gb: Upper bound, in GB, applied to the "available" figure in
            the report. ``0`` disables the cap.

    Returns:
        The reporter thread, already started.
    """
    loop_args = (get_workers, lock, shutdown_flag, interval_s, max_ram_gb)
    reporter = threading.Thread(target=_reporter_loop, args=loop_args)
    # Must be set before start(); equivalent to passing daemon=True.
    reporter.daemon = True
    reporter.start()
    return reporter
def _reporter_loop(
    get_workers: callable,
    lock: threading.Condition,
    shutdown_flag: callable,
    interval_s: float,
    max_ram_gb: float,
) -> None:
    """Run the reporter thread: wait one interval, log a report, repeat.

    The wait is split into short sleeps so that a shutdown request is
    noticed within a few seconds rather than after a full interval.

    Args:
        get_workers: Callable returning the current worker objects;
            forwarded to ``_log_report``.
        lock: Condition forwarded to ``_log_report``, which holds it while
            reading the worker list.
        shutdown_flag: Callable returning ``True`` once the pool is shutting
            down. It is polled before and after every sleep; the loop exits
            as soon as it returns ``True``.
        interval_s: Seconds to wait between reports. A value that is not
            strictly positive (zero, negative, NaN) is replaced by the poll
            period for waiting purposes so the loop can never spin.
        max_ram_gb: Cap on the reported available RAM; forwarded to
            ``_log_report``.
    """
    poll_s = 5.0
    # Without this guard a non-positive interval would skip the wait below
    # and emit reports back to back, hammering the CPU and the pool lock.
    wait_s = interval_s if interval_s > 0 else poll_s

    while not shutdown_flag():
        # Wait against the monotonic clock instead of counting nominal poll
        # ticks, so the time actually slept is what ends the wait.
        deadline = time.monotonic() + wait_s
        while not shutdown_flag():
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(poll_s, remaining))

        # Shutdown may have been requested during the wait; skip the report.
        if shutdown_flag():
            break

        try:
            _log_report(get_workers, lock, interval_s, max_ram_gb)
        except Exception as exc:
            # Reporting is best effort: a failed report must not kill the
            # thread, so the error is recorded and the loop carries on.
            logger.debug(f"Memory report failed: {exc}")
def _format_interval_label(interval_s: float) -> str:
    """Return the report heading that names the reporting interval."""
    if interval_s == _DAILY:
        return "Daily report"
    if interval_s == _HOURLY:
        return "Hourly report"

    # Express the interval in the largest unit it reaches.
    for unit_s, suffix in ((_DAILY, "d"), (_HOURLY, "h"), (60, "m")):
        if interval_s >= unit_s:
            amount = interval_s / unit_s
            break
    else:
        amount, suffix = interval_s, "s"

    # One decimal keeps e.g. a 90-minute interval from being shown as "2h";
    # whole multiples still print without a fractional part.
    text = f"{amount:.1f}"
    if text.endswith(".0"):
        text = text[:-2]
    return f"Periodic report (every {text}{suffix})"


def _log_report(
    get_workers: callable,
    lock: threading.Condition,
    interval_s: float,
    max_ram_gb: float,
) -> None:
    """Collect and log memory stats for the main process and each worker.

    Emits a single multi-line INFO record: a system summary, the RSS of the
    current process, one line per worker, and a combined total. Does nothing
    when ``psutil`` cannot be imported.

    Args:
        get_workers: Callable returning the worker objects to report on
            (each with ``worker_id``, ``status`` and ``process``).
        lock: Condition held while the worker list is copied.
        interval_s: Reporting interval in seconds; only used for the heading.
        max_ram_gb: When greater than 0, the reported available RAM is
            limited to this many GB and the uncapped value is noted.
    """
    try:
        import psutil
    except ImportError:
        # psutil is optional; without it there is nothing to measure.
        return

    gib = 1024**3
    mib = 1024 * 1024

    vm = psutil.virtual_memory()
    main_proc = psutil.Process()
    main_rss = main_proc.memory_info().rss / mib

    system_available_gb = vm.available / gib
    capped = max_ram_gb > 0 and system_available_gb > max_ram_gb
    available_gb = max_ram_gb if capped else system_available_gb
    cap_note = f" (capped from {system_available_gb:.1f} GB)" if capped else ""

    lines = [
        f"{_format_interval_label(interval_s)} — "
        f"system: {vm.used / gib:.1f}/{vm.total / gib:.1f} GB "
        f"({vm.percent}% used), available={available_gb:.1f} GB{cap_note}",
        f" main process (pid={main_proc.pid}): RSS={main_rss:.0f} MB",
    ]

    # Copy the worker list under the lock; the per-process queries below
    # work on that copy.
    with lock:
        workers = list(get_workers())

    worker_total = 0.0
    for w in workers:
        pid = w.process.pid
        if w.status == "dead" or not w.process.is_alive():
            lines.append(f" worker {w.worker_id} (pid={pid}): dead")
            continue
        try:
            w_rss = psutil.Process(pid).memory_info().rss / mib
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # The process may exit, or be unreadable, between the liveness
            # check and the query.
            lines.append(f" worker {w.worker_id} (pid={pid}): not accessible")
            continue
        worker_total += w_rss
        lines.append(
            f" worker {w.worker_id} (pid={pid}): "
            f"RSS={w_rss:.0f} MB, status={w.status}"
        )

    lines.append(
        f" total: main={main_rss:.0f} MB + "
        f"workers={worker_total:.0f} MB = {main_rss + worker_total:.0f} MB"
    )
    logger.info("\n".join(lines))