import importlib import threading import time import warnings from typing import TYPE_CHECKING, Any from trackio import context_vars if TYPE_CHECKING: from trackio.run import Run psutil: Any = None PSUTIL_AVAILABLE = False _psutil_lock = threading.Lock() def _ensure_psutil(): global PSUTIL_AVAILABLE, psutil if PSUTIL_AVAILABLE: return psutil with _psutil_lock: if PSUTIL_AVAILABLE: return psutil try: psutil = importlib.import_module("psutil") PSUTIL_AVAILABLE = True return psutil except ImportError: raise ImportError( "psutil is required for CPU and RAM monitoring. " "Install it with: pip install psutil" ) def cpu_available() -> bool: """ Check if CPU and RAM monitoring is available. Returns True if psutil is installed. """ try: _ensure_psutil() return True except ImportError: return False except Exception: return False def collect_cpu_metrics( prev_disk_counters: Any = None, prev_net_counters: Any = None, elapsed: float | None = None, include_static: bool = True, ) -> dict: """ Collect CPU, RAM, disk, network, and sensor metrics using psutil. Args: prev_disk_counters: Previous disk I/O counters for computing read/write rates. If None, only cumulative values are reported. prev_net_counters: Previous network I/O counters for computing send/recv rates. If None, only cumulative values are reported. elapsed: Seconds since prev_disk_counters and prev_net_counters were captured. If None or non-positive, cumulative values are reported instead of rates. include_static: Whether to include mostly static metrics such as CPU frequency and core counts. Returns: Dictionary of system metrics. """ if not PSUTIL_AVAILABLE: try: _ensure_psutil() except ImportError: return {} metrics = {} try: per_core = psutil.cpu_percent(interval=0.1, percpu=True) for i, pct in enumerate(per_core): metrics[f"cpu/{i}/utilization"] = pct if per_core: metrics["cpu/utilization"] = sum(per_core) / len(per_core) except Exception: pass if include_static: try: cpu_freq = psutil.cpu_freq() if cpu_freq: metrics["cpu/frequency"] = cpu_freq.current if cpu_freq.max > 0: metrics["cpu/frequency_max"] = cpu_freq.max except Exception: pass try: cpu_count_logical = psutil.cpu_count(logical=True) if cpu_count_logical is not None: metrics["cpu/count_logical"] = cpu_count_logical cpu_count_physical = psutil.cpu_count(logical=False) if cpu_count_physical is not None: metrics["cpu/count_physical"] = cpu_count_physical except Exception: pass try: mem = psutil.virtual_memory() metrics["memory/used"] = mem.used / (1024**3) metrics["memory/total"] = mem.total / (1024**3) metrics["memory/available"] = mem.available / (1024**3) metrics["memory/percent"] = mem.percent except Exception: pass try: swap = psutil.swap_memory() metrics["swap/used"] = swap.used / (1024**3) metrics["swap/total"] = swap.total / (1024**3) metrics["swap/percent"] = swap.percent except Exception: pass try: disk = psutil.disk_io_counters() if disk is not None: if prev_disk_counters is not None and elapsed and elapsed > 0: metrics["disk/read_mb_per_sec"] = ( (disk.read_bytes - prev_disk_counters.read_bytes) / elapsed / (1024**2) ) metrics["disk/write_mb_per_sec"] = ( (disk.write_bytes - prev_disk_counters.write_bytes) / elapsed / (1024**2) ) metrics["disk/read_iops"] = ( disk.read_count - prev_disk_counters.read_count ) / elapsed metrics["disk/write_iops"] = ( disk.write_count - prev_disk_counters.write_count ) / elapsed else: metrics["disk/read_bytes"] = disk.read_bytes metrics["disk/write_bytes"] = disk.write_bytes except Exception: pass try: net = psutil.net_io_counters() if net is not None: if prev_net_counters is not None and elapsed and elapsed > 0: metrics["network/sent_mb_per_sec"] = ( (net.bytes_sent - prev_net_counters.bytes_sent) / elapsed / (1024**2) ) metrics["network/recv_mb_per_sec"] = ( (net.bytes_recv - prev_net_counters.bytes_recv) / elapsed / (1024**2) ) else: metrics["network/sent_bytes"] = net.bytes_sent metrics["network/recv_bytes"] = net.bytes_recv except Exception: pass try: sensors = psutil.sensors_temperatures() if sensors: for chip_name, entries in sensors.items(): for i, entry in enumerate(entries): label = ( entry.label.strip() if entry.label and entry.label.strip() else f"{chip_name}_{i}" ) metrics[f"temp/{label}"] = entry.current except Exception: pass try: battery = psutil.sensors_battery() if battery is not None: metrics["battery/percent"] = battery.percent metrics["battery/power_plugged"] = int(battery.power_plugged) except Exception: pass return metrics class CpuMonitor: def __init__(self, run: "Run", interval: float = 10.0): self._run = run self._interval = interval self._stop_flag = threading.Event() self._thread: "threading.Thread | None" = None self._last_disk_counters: Any = None self._last_net_counters: Any = None self._last_time: float | None = None self._include_static_next = True def start(self): if not PSUTIL_AVAILABLE: try: _ensure_psutil() except ImportError: warnings.warn( "auto_log_cpu=True but psutil is not installed. " "CPU and RAM logging disabled. Install with: pip install psutil" ) return self._thread = threading.Thread(target=self._monitor_loop, daemon=True) self._thread.start() def stop(self): self._stop_flag.set() if self._thread is not None: self._thread.join(timeout=2.0) def _monitor_loop(self): try: self._last_disk_counters = psutil.disk_io_counters() except Exception: pass try: self._last_net_counters = psutil.net_io_counters() except Exception: pass self._last_time = time.monotonic() while not self._stop_flag.is_set(): self._stop_flag.wait(timeout=self._interval) if self._stop_flag.is_set(): break try: now = time.monotonic() elapsed = now - self._last_time if self._last_time is not None else None metrics = collect_cpu_metrics( prev_disk_counters=self._last_disk_counters, prev_net_counters=self._last_net_counters, elapsed=elapsed, include_static=self._include_static_next, ) self._include_static_next = False try: self._last_disk_counters = psutil.disk_io_counters() except Exception: self._last_disk_counters = None try: self._last_net_counters = psutil.net_io_counters() except Exception: self._last_net_counters = None self._last_time = now if metrics: self._run.log_system(metrics) except Exception: pass def log_cpu(run: "Run | None" = None) -> dict: """ Log CPU, RAM, disk, network, and sensor metrics to the current or specified run as system metrics. Args: run: Optional Run instance. If None, uses current run from context. Returns: dict: The system metrics that were logged. Example: ```python import trackio run = trackio.init(project="my-project") trackio.log({"loss": 0.5}) trackio.log_cpu() ``` """ if run is None: run = context_vars.current_run.get() if run is None: raise RuntimeError("Call trackio.init() before trackio.log_cpu().") try: _ensure_psutil() except ImportError: warnings.warn( "trackio.log_cpu() requires psutil. Install it with: pip install trackio[cpu]" ) return {} metrics = collect_cpu_metrics() if metrics: run.log_system(metrics) return metrics