"""Hardware specification extraction for roofline analysis.

Extracts CPU, GPU, memory, and storage parameters via system tools and torch
APIs. All functions have try/except fallbacks returning None for inaccessible
fields.
"""

import logging
import os
import platform
import re
import subprocess
from dataclasses import dataclass, field
from functools import lru_cache
from typing import Dict, List, Optional

logger = logging.getLogger(__name__)

# CUDA cores per SM by compute capability (major, minor) -> cores_per_sm
# Kepler through Blackwell
_CORES_PER_SM: Dict[tuple, int] = {
    (3, 0): 192, (3, 2): 192, (3, 5): 192, (3, 7): 192,  # Kepler
    (5, 0): 128, (5, 2): 128, (5, 3): 128,  # Maxwell
    (6, 0): 64, (6, 1): 128, (6, 2): 128,  # Pascal
    (7, 0): 64, (7, 2): 64, (7, 5): 64,  # Volta / Turing
    (8, 0): 64, (8, 6): 128, (8, 7): 128, (8, 9): 128,  # Ampere / Ada
    (9, 0): 128,  # Hopper
    (10, 0): 128,  # Blackwell
}

# PCIe bandwidth PER LANE (GB/s, unidirectional) by generation.
# Multiply by the link width to get the link bandwidth.
_PCIE_BW: Dict[int, float] = {
    1: 0.250,
    2: 0.500,
    3: 0.985,  # ~1 GB/s per lane
    4: 1.969,
    5: 3.938,
    6: 7.563,
}

# Fields requested from nvidia-smi for each GPU, in the order they are parsed.
_GPU_QUERY_FIELDS = (
    "clocks.max.graphics",
    "clocks.max.memory",
    "memory.bus_width",
    "pcie.link.gen.current",
    "pcie.link.width.current",
    "memory.free",
)

# "<number><optional K/M/G unit>" as printed by lscpu, e.g. "256K", "4 MiB".
_CACHE_SIZE_RE = re.compile(r"(\d+(?:\.\d+)?)\s*([KMG])?", re.IGNORECASE)
_CACHE_UNIT_TO_KB = {"K": 1, "M": 1024, "G": 1024 ** 2}

# Throughput figure at the end of GNU dd's summary line, e.g. "8.9 GB/s".
_DD_RATE_RE = re.compile(r"([\d.]+)\s*(GB|MB|kB)/s")
_DD_UNIT_TO_MB = {"kB": 0.001, "MB": 1.0, "GB": 1000.0}


@dataclass
class CPUInfo:
    """CPU parameters; any field the host does not expose stays None."""

    model: Optional[str] = None
    physical_cores: Optional[int] = None
    logical_cores: Optional[int] = None
    frequency_mhz: Optional[float] = None
    cache_l2_kb: Optional[int] = None
    cache_l3_kb: Optional[int] = None
    architecture: Optional[str] = None


@dataclass
class MemoryInfo:
    """System RAM size plus a coarse, hard-coded bandwidth estimate."""

    total_gb: Optional[float] = None
    available_gb: Optional[float] = None
    estimated_bandwidth_gbps: Optional[float] = None


@dataclass
class GPUInfo:
    """Per-GPU parameters gathered from torch and nvidia-smi, plus derived peaks."""

    index: int = 0
    name: Optional[str] = None
    sm_count: Optional[int] = None
    cuda_cores: Optional[int] = None
    clock_mhz: Optional[float] = None
    memory_clock_mhz: Optional[float] = None
    memory_bus_width_bits: Optional[int] = None
    vram_total_gb: Optional[float] = None
    vram_free_gb: Optional[float] = None
    memory_bandwidth_gbps: Optional[float] = None
    fp32_tflops: Optional[float] = None
    fp16_tflops: Optional[float] = None
    tensor_core_tflops: Optional[float] = None
    pcie_gen: Optional[int] = None
    pcie_width: Optional[int] = None
    pcie_bandwidth_gbps: Optional[float] = None
    compute_capability: Optional[str] = None
    driver_version: Optional[str] = None
    cuda_version: Optional[str] = None


@dataclass
class StorageInfo:
    """Storage classification and a quick throughput figure."""

    storage_type: Optional[str] = None  # "SSD" or "HDD" or "Unknown"
    sequential_read_mbps: Optional[float] = None


@dataclass
class HardwareInfo:
    """Aggregate of every probe in this module plus software versions."""

    cpu: CPUInfo = field(default_factory=CPUInfo)
    memory: MemoryInfo = field(default_factory=MemoryInfo)
    gpus: List[GPUInfo] = field(default_factory=list)
    storage: StorageInfo = field(default_factory=StorageInfo)
    system: Optional[str] = None
    python_version: Optional[str] = None
    torch_version: Optional[str] = None
    cuda_runtime_version: Optional[str] = None


def _run_cmd(cmd: List[str], timeout: int = 10) -> Optional[str]:
    """Run a command (argument list, no shell) and return its stripped stdout.

    Returns None when the executable is missing, the call times out, the
    output cannot be decoded, or the process exits with a non-zero status.
    """
    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except (subprocess.TimeoutExpired, OSError, UnicodeDecodeError):
        return None
    if result.returncode != 0:
        return None
    return result.stdout.strip()


def _nvidia_smi_query(*fields: str) -> Optional[Dict[str, str]]:
    """Query nvidia-smi for the given fields and return a field -> value dict.

    Only the first output row (i.e. the first GPU listed) is parsed. Returns
    None when nvidia-smi is unavailable or the column count does not match.
    """
    out = _run_cmd([
        "nvidia-smi",
        f"--query-gpu={','.join(fields)}",
        "--format=csv,noheader,nounits",
    ])
    if not out:
        return None
    values = [v.strip() for v in out.splitlines()[0].split(",")]
    if len(values) != len(fields):
        return None
    return dict(zip(fields, values))


def _to_float(text: str) -> Optional[float]:
    """Convert text to float, or None when it is not numeric (e.g. "[N/A]")."""
    try:
        return float(text)
    except (ValueError, TypeError):
        return None


def _to_int(text: str) -> Optional[int]:
    """Convert text to int, or None when it is not an integer literal."""
    try:
        return int(text)
    except (ValueError, TypeError):
        return None


def _parse_cache_kb(text: str) -> Optional[int]:
    """Parse an lscpu cache size such as "256K" or "4 MiB (8 instances)" to KiB.

    A bare number with no K/M/G unit is treated as KiB. Returns None when the
    text contains no number.
    """
    match = _CACHE_SIZE_RE.search(text)
    if match is None:
        return None
    unit = (match.group(2) or "K").upper()
    return int(float(match.group(1)) * _CACHE_UNIT_TO_KB[unit])


def _apply_lscpu(info: CPUInfo, out: str) -> None:
    """Fill model and L2/L3 cache sizes on *info* from lscpu output."""
    for line in out.splitlines():
        key, sep, value = line.partition(":")
        if not sep:
            continue
        # Compare the exact key: a substring test would let the later
        # "BIOS Model name" line overwrite the real "Model name".
        key = key.strip()
        value = value.strip()
        if key == "Model name":
            info.model = value
        elif key == "L2 cache":
            info.cache_l2_kb = _parse_cache_kb(value)
        elif key == "L3 cache":
            info.cache_l3_kb = _parse_cache_kb(value)


def _sysctl_kb(name: str) -> Optional[int]:
    """Read a byte-valued sysctl key and return it in KiB, or None."""
    raw = _run_cmd(["sysctl", "-n", name])
    if not raw:
        return None
    size_bytes = _to_int(raw)
    return None if size_bytes is None else size_bytes // 1024


def get_cpu_info() -> CPUInfo:
    """Collect CPU model, core counts, frequency and cache sizes.

    Uses psutil when importable, lscpu on Linux and sysctl on macOS. Never
    raises; fields that cannot be determined stay None.
    """
    info = CPUInfo()
    try:
        info.architecture = platform.machine()
        info.logical_cores = os.cpu_count()

        try:
            import psutil
        except ImportError:
            psutil = None
        if psutil is not None:
            # Isolated so that a psutil runtime failure does not prevent the
            # lscpu / sysctl probing below.
            try:
                info.physical_cores = psutil.cpu_count(logical=False)
                freq = psutil.cpu_freq()
                if freq:
                    info.frequency_mhz = freq.current or freq.max
            except Exception:
                logger.debug("psutil CPU query failed", exc_info=True)

        system = platform.system()
        if system == "Linux":
            out = _run_cmd(["lscpu"])
            if out:
                _apply_lscpu(info, out)
        elif system == "Darwin":
            brand = _run_cmd(["sysctl", "-n", "machdep.cpu.brand_string"])
            if brand:
                info.model = brand
            info.cache_l2_kb = _sysctl_kb("hw.l2cachesize")
            info.cache_l3_kb = _sysctl_kb("hw.l3cachesize")
    except Exception:
        logger.debug("CPU info extraction partially failed", exc_info=True)
    return info


def _read_proc_meminfo(info: MemoryInfo) -> None:
    """Fill total/available GiB on *info* from /proc/meminfo, if it exists."""
    if not os.path.exists("/proc/meminfo"):
        return
    with open("/proc/meminfo", encoding="utf-8") as f:
        for line in f:
            # Values are read as kB, hence the division by 1024 ** 2.
            if line.startswith("MemTotal:"):
                info.total_gb = round(int(line.split()[1]) / (1024 ** 2), 2)
            elif line.startswith("MemAvailable:"):
                info.available_gb = round(int(line.split()[1]) / (1024 ** 2), 2)


def get_memory_info() -> MemoryInfo:
    """Collect total and available system RAM and a rough bandwidth estimate.

    Uses psutil when importable, otherwise /proc/meminfo. Never raises.
    """
    info = MemoryInfo()
    try:
        try:
            import psutil
        except ImportError:
            # Fallback: /proc/meminfo on Linux
            _read_proc_meminfo(info)
        else:
            vm = psutil.virtual_memory()
            info.total_gb = round(vm.total / (1024 ** 3), 2)
            info.available_gb = round(vm.available / (1024 ** 3), 2)

        # Rough estimate: DDR4 ~40 GB/s, DDR5 ~60 GB/s
        # Without dmidecode we can't know for sure, default to DDR4 estimate
        if info.total_gb:
            info.estimated_bandwidth_gbps = 40.0  # conservative DDR4 dual-channel
    except Exception:
        logger.debug("Memory info extraction partially failed", exc_info=True)
    return info


def _pcie_bandwidth_gbps(gen: Optional[int], width: Optional[int]) -> Optional[float]:
    """Return unidirectional PCIe link bandwidth in GB/s.

    Returns None when the generation or width is missing, or when the
    generation is not in the lookup table (rather than reporting 0 GB/s).
    """
    if not gen or not width:
        return None
    per_lane = _PCIE_BW.get(gen)
    if per_lane is None:
        return None
    return round(per_lane * width, 2)


def _apply_nvidia_smi_row(gpu: GPUInfo, row: str) -> None:
    """Parse one CSV row of the _GPU_QUERY_FIELDS query onto *gpu*.

    Each column is converted independently so one "[N/A]" value does not
    discard the others. Rows with fewer columns than expected are ignored.
    """
    parts = [p.strip() for p in row.split(",")]
    if len(parts) < len(_GPU_QUERY_FIELDS):
        return
    gpu.clock_mhz = _to_float(parts[0])
    gpu.memory_clock_mhz = _to_float(parts[1])
    gpu.memory_bus_width_bits = _to_int(parts[2])
    gpu.pcie_gen = _to_int(parts[3])
    gpu.pcie_width = _to_int(parts[4])
    free = _to_float(parts[5])
    if free is not None:
        # Divided by 1024 to report GiB (assumes nvidia-smi reports MiB here).
        gpu.vram_free_gb = round(free / 1024, 2)


def _derive_gpu_metrics(gpu: GPUInfo, major: int) -> None:
    """Fill the derived peak-rate fields on *gpu* from its raw fields.

    *major* is the compute-capability major version; it selects the FP16 and
    tensor-core multipliers. Fields whose inputs are missing stay None.
    """
    # Memory bandwidth (same formula for GDDR and HBM):
    # mem_clock * bus_width * 2 (DDR) / 8 (bits->bytes) / 1000 (MHz->GHz)
    if gpu.memory_clock_mhz and gpu.memory_bus_width_bits:
        gpu.memory_bandwidth_gbps = round(
            gpu.memory_clock_mhz * gpu.memory_bus_width_bits * 2 / 8 / 1000, 1
        )

    # FP32 TFLOPS = cuda_cores * clock_mhz * 2 (FMA) / 1e6
    if gpu.cuda_cores and gpu.clock_mhz:
        gpu.fp32_tflops = round(gpu.cuda_cores * gpu.clock_mhz * 2 / 1e6, 2)

    if gpu.fp32_tflops:
        # FP16 is typically 2x FP32 on Volta+
        if major >= 7:
            gpu.fp16_tflops = round(gpu.fp32_tflops * 2, 2)
        else:
            gpu.fp16_tflops = gpu.fp32_tflops
        # Tensor core TFLOPS (rough: 8x FP32 on Ampere+, 4x on Volta/Turing)
        if major >= 8:
            gpu.tensor_core_tflops = round(gpu.fp32_tflops * 8, 2)
        elif major >= 7:
            gpu.tensor_core_tflops = round(gpu.fp32_tflops * 4, 2)

    gpu.pcie_bandwidth_gbps = _pcie_bandwidth_gbps(gpu.pcie_gen, gpu.pcie_width)


def get_gpu_info() -> List[GPUInfo]:
    """Collect one GPUInfo per CUDA device visible to torch.

    Returns an empty list when torch is not installed or CUDA is unavailable.
    Never raises; GPUs collected before an unexpected failure are returned.
    """
    gpus: List[GPUInfo] = []
    try:
        import torch

        if not torch.cuda.is_available():
            return gpus

        device_count = torch.cuda.device_count()

        # Driver version from the structured query; the CUDA version is only
        # printed in the banner of the plain nvidia-smi output.
        driver_version = None
        smi_cuda_version = None
        nv = _nvidia_smi_query("driver_version")
        if nv:
            driver_version = nv.get("driver_version")
        # nvidia-smi reports the max supported CUDA runtime
        smi_out = _run_cmd(["nvidia-smi"])
        if smi_out:
            m = re.search(r"CUDA Version:\s+([\d.]+)", smi_out)
            if m:
                smi_cuda_version = m.group(1)

        for i in range(device_count):
            gpu = GPUInfo(index=i)
            props = torch.cuda.get_device_properties(i)
            gpu.name = props.name
            gpu.sm_count = props.multi_processor_count
            # The attribute is `total_memory`; `total_mem` does not exist.
            gpu.vram_total_gb = round(props.total_memory / (1024 ** 3), 2)
            gpu.compute_capability = f"{props.major}.{props.minor}"
            gpu.driver_version = driver_version
            gpu.cuda_version = smi_cuda_version

            # CUDA cores
            cores_per_sm = _CORES_PER_SM.get((props.major, props.minor))
            if cores_per_sm and gpu.sm_count:
                gpu.cuda_cores = gpu.sm_count * cores_per_sm

            # nvidia-smi per-GPU queries
            # NOTE(review): assumes torch's device index equals nvidia-smi's
            # --id; this may not hold when CUDA_VISIBLE_DEVICES reorders or
            # filters devices - confirm for your deployment.
            nv_data = _run_cmd([
                "nvidia-smi",
                f"--id={i}",
                f"--query-gpu={','.join(_GPU_QUERY_FIELDS)}",
                "--format=csv,noheader,nounits",
            ])
            if nv_data:
                _apply_nvidia_smi_row(gpu, nv_data)

            _derive_gpu_metrics(gpu, props.major)
            gpus.append(gpu)
    except Exception:
        logger.debug("GPU info extraction partially failed", exc_info=True)
    return gpus


def _parse_dd_throughput_mbps(stderr: str) -> Optional[float]:
    """Extract the throughput from dd's summary output, in MB/s.

    Returns None when no "<number> kB/s|MB/s|GB/s" figure is present.
    """
    match = _DD_RATE_RE.search(stderr)
    if match is None:
        return None
    rate = _to_float(match.group(1))
    if rate is None:
        return None
    return round(rate * _DD_UNIT_TO_MB[match.group(2)], 0)


def get_storage_info() -> StorageInfo:
    """Classify storage as SSD/HDD and record a quick dd throughput figure.

    Never raises; fields that cannot be determined stay None.
    """
    info = StorageInfo()
    try:
        system = platform.system()
        if system == "Linux":
            # Uses the first device lsblk lists with a NAME and ROTA column;
            # NOTE(review): that is not necessarily the root device.
            out = _run_cmd(["lsblk", "-d", "-o", "NAME,ROTA", "--noheadings"])
            if out:
                for line in out.strip().split("\n"):
                    parts = line.split()
                    if len(parts) == 2:
                        info.storage_type = "HDD" if parts[1] == "1" else "SSD"
                        break

            # Quick dd run (256 x 1 MiB blocks). dd prints its throughput to
            # stderr, which _run_cmd does not capture, so call subprocess
            # directly.
            # NOTE(review): this copies /dev/zero to /dev/null, so no storage
            # device is read; the command is kept unchanged for compatibility.
            # TODO: measure against a real file on the target disk.
            try:
                result = subprocess.run(
                    ["dd", "if=/dev/zero", "of=/dev/null", "bs=1M", "count=256"],
                    capture_output=True,
                    text=True,
                    timeout=15,
                )
                info.sequential_read_mbps = _parse_dd_throughput_mbps(result.stderr)
            except (subprocess.SubprocessError, OSError, ValueError):
                logger.debug("dd throughput probe failed", exc_info=True)
        elif system == "Darwin":
            info.storage_type = "SSD"  # Modern Macs use NVMe SSDs
    except Exception:
        logger.debug("Storage info extraction partially failed", exc_info=True)
    return info


@lru_cache(maxsize=1)
def get_hardware_info() -> HardwareInfo:
    """Aggregate all hardware info (cached).

    The same HardwareInfo instance is returned on every call. When torch is
    not installed, torch_version and cuda_runtime_version are left as None
    instead of raising ImportError.
    """
    hw = HardwareInfo()
    hw.cpu = get_cpu_info()
    hw.memory = get_memory_info()
    hw.gpus = get_gpu_info()
    hw.storage = get_storage_info()
    hw.system = f"{platform.system()} {platform.release()}"
    hw.python_version = platform.python_version()

    try:
        import torch
    except ImportError:
        logger.debug("torch is not installed; skipping torch version fields")
        return hw

    hw.torch_version = torch.__version__
    hw.cuda_runtime_version = (
        torch.version.cuda if torch.cuda.is_available() else None
    )
    return hw