# detection_base/utils/hardware_info.py
# Zhen Ye
# feat: add benchmark profiler & roofline analysis system
# 078b447
"""Hardware specification extraction for roofline analysis.
Extracts CPU, GPU, memory, and storage parameters via system tools
and torch APIs. All functions have try/except fallbacks returning None
for inaccessible fields.
"""
import logging
import os
import platform
import re
import subprocess
from dataclasses import dataclass, field
from functools import lru_cache
from typing import Dict, List, Optional
logger = logging.getLogger(__name__)

# FP32 CUDA cores per SM, keyed by compute capability (major, minor).
# Multiplied by a device's SM count to obtain its total CUDA-core count.
# Covers Kepler through Blackwell; a capability missing from this table
# means the core count (and everything derived from it) stays unknown.
_CORES_PER_SM: Dict[tuple, int] = {
    (3, 0): 192, (3, 2): 192, (3, 5): 192, (3, 7): 192,  # Kepler
    (5, 0): 128, (5, 2): 128, (5, 3): 128,  # Maxwell
    (6, 0): 64, (6, 1): 128, (6, 2): 128,  # Pascal
    (7, 0): 64, (7, 2): 64, (7, 5): 64,  # Volta / Turing
    (8, 0): 64, (8, 6): 128, (8, 7): 128, (8, 9): 128,  # Ampere / Ada
    (9, 0): 128,  # Hopper
    (10, 0): 128,  # Blackwell (data center)
    (12, 0): 128,  # Blackwell (GeForce RTX 50 series)
}
# Usable PCIe bandwidth PER LANE (GB/s, unidirectional), keyed by link
# generation. Multiply by the link width (x1/x4/x8/x16) for the total.
# Gen1/Gen2 use 8b/10b encoding (20% overhead); Gen3+ use 128b/130b.
_PCIE_BW: Dict[int, float] = {
    1: 0.25,   # 2.5 GT/s, 8b/10b
    2: 0.5,    # 5 GT/s, 8b/10b
    3: 0.985,  # 8 GT/s, ~1 GB/s per lane
    4: 1.969,
    5: 3.938,
    6: 7.563,
}
@dataclass
class CPUInfo:
    """CPU description; every field defaults to None until it is detected."""

    # Human-readable processor name.
    model: Optional[str] = None

    # Core counts.
    physical_cores: Optional[int] = None
    logical_cores: Optional[int] = None

    # Clock frequency in MHz.
    frequency_mhz: Optional[float] = None

    # Cache sizes in KiB.
    cache_l2_kb: Optional[int] = None
    cache_l3_kb: Optional[int] = None

    # Machine architecture string.
    architecture: Optional[str] = None
@dataclass
class MemoryInfo:
    """System RAM description; every field defaults to None until detected."""

    # Capacity figures in GiB.
    total_gb: Optional[float] = None
    available_gb: Optional[float] = None

    # Estimated (not measured) memory bandwidth in GB/s.
    estimated_bandwidth_gbps: Optional[float] = None
@dataclass
class GPUInfo:
    """Per-GPU description; all fields except ``index`` default to None."""

    # Identity.
    index: int = 0
    name: Optional[str] = None

    # Compute resources.
    sm_count: Optional[int] = None
    cuda_cores: Optional[int] = None
    clock_mhz: Optional[float] = None

    # Memory subsystem.
    memory_clock_mhz: Optional[float] = None
    memory_bus_width_bits: Optional[int] = None
    vram_total_gb: Optional[float] = None
    vram_free_gb: Optional[float] = None
    memory_bandwidth_gbps: Optional[float] = None  # GB/s

    # Throughput figures, in TFLOPS.
    fp32_tflops: Optional[float] = None
    fp16_tflops: Optional[float] = None
    tensor_core_tflops: Optional[float] = None

    # Host link.
    pcie_gen: Optional[int] = None
    pcie_width: Optional[int] = None
    pcie_bandwidth_gbps: Optional[float] = None  # GB/s

    # Software / capability identifiers.
    compute_capability: Optional[str] = None  # formatted "major.minor"
    driver_version: Optional[str] = None
    cuda_version: Optional[str] = None
@dataclass
class StorageInfo:
    """Storage description; every field defaults to None until detected."""

    # Device class label such as "SSD" or "HDD".
    storage_type: Optional[str] = None

    # Sequential read throughput in MB/s.
    sequential_read_mbps: Optional[float] = None
@dataclass
class HardwareInfo:
    """Top-level container bundling every hardware section plus versions.

    The section fields use ``default_factory`` so each instance gets its own
    fresh sub-objects rather than sharing one mutable default.
    """

    # Hardware sections.
    cpu: CPUInfo = field(default_factory=CPUInfo)
    memory: MemoryInfo = field(default_factory=MemoryInfo)
    gpus: List[GPUInfo] = field(default_factory=list)
    storage: StorageInfo = field(default_factory=StorageInfo)

    # Software environment.
    system: Optional[str] = None
    python_version: Optional[str] = None
    torch_version: Optional[str] = None
    cuda_runtime_version: Optional[str] = None
def _run_cmd(cmd: List[str], timeout: int = 10) -> Optional[str]:
    """Execute ``cmd`` (an argv list, no shell) and return its stdout.

    Args:
        cmd: Program and arguments, passed straight to ``subprocess.run``.
        timeout: Seconds to wait before giving up.

    Returns:
        The whitespace-stripped stdout when the process exits with status 0;
        ``None`` if it exits non-zero, times out, cannot be found, or fails
        with another ``OSError``.
    """
    try:
        completed = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except (subprocess.TimeoutExpired, FileNotFoundError, OSError):
        return None

    if completed.returncode != 0:
        return None
    return completed.stdout.strip()
def _nvidia_smi_query(*fields: str) -> Optional[Dict[str, str]]:
    """Ask ``nvidia-smi --query-gpu`` for ``fields`` and map names to values.

    Only the first output row is parsed, so on multi-GPU hosts the result
    describes whichever GPU nvidia-smi lists first.

    Returns:
        ``{field: value}`` with values stripped of surrounding whitespace, or
        ``None`` when the command yields no output or the number of returned
        columns does not match the number of requested fields.
    """
    raw = _run_cmd([
        "nvidia-smi",
        f"--query-gpu={','.join(fields)}",
        "--format=csv,noheader,nounits",
    ])
    if not raw:
        return None

    first_row = raw.split("\n")[0]
    cells = [cell.strip() for cell in first_row.split(",")]
    if len(cells) == len(fields):
        return dict(zip(fields, cells))
    return None
def _parse_lscpu_cache_kb(value: str) -> Optional[int]:
    """Convert an lscpu cache size (``"256K"``, ``"32 MiB (2 instances)"``) to KiB."""
    match = re.search(r"([\d.]+)\s*([KMG])?", value)
    if not match:
        return None
    try:
        size = float(match.group(1))
    except ValueError:  # e.g. a lone "." matched by the character class
        return None
    unit = match.group(2)
    if unit == "M":
        size *= 1024
    elif unit == "G":
        size *= 1024 ** 2
    # No unit or "K" is treated as KiB already.
    return int(size)


def _sysctl_cache_kb(key: str) -> Optional[int]:
    """Read a byte-valued sysctl key (macOS) and return it in KiB, or None."""
    raw = _run_cmd(["sysctl", "-n", key])
    if not raw:
        return None
    try:
        return int(raw) // 1024
    except ValueError:
        return None


def get_cpu_info() -> CPUInfo:
    """Collect CPU model, core counts, frequency, cache sizes and architecture.

    Sources: ``platform``/``os`` for architecture and logical cores, psutil
    (optional) for physical cores and frequency, ``lscpu`` on Linux and
    ``sysctl`` on macOS for the model name and L2/L3 cache sizes.

    Returns:
        A ``CPUInfo`` in which any field that could not be determined is left
        as ``None``. This function does not raise; unexpected failures are
        logged at debug level.
    """
    info = CPUInfo()
    try:
        info.architecture = platform.machine()
        info.logical_cores = os.cpu_count()

        try:
            import psutil
        except ImportError:
            psutil = None
        if psutil is not None:
            # psutil can raise at runtime on platforms where the frequency is
            # not exposed; that must not prevent the lscpu/sysctl step below.
            try:
                info.physical_cores = psutil.cpu_count(logical=False)
                freq = psutil.cpu_freq()
                if freq:
                    info.frequency_mhz = freq.current or freq.max
            except Exception:
                logger.debug("psutil CPU query failed", exc_info=True)

        system = platform.system()
        if system == "Linux":
            out = _run_cmd(["lscpu"])
            if out:
                for line in out.split("\n"):
                    key, sep, value = line.partition(":")
                    if not sep:
                        continue
                    key = key.strip()
                    value = value.strip()
                    # Compare the whole key: a substring test would also match
                    # "BIOS Model name", which lscpu prints after the real
                    # "Model name" and which would overwrite it.
                    if key == "Model name":
                        info.model = value
                    # Older lscpu prints "L2 cache:"; newer versions list the
                    # caches as "L2:" under a "Caches (sum of all):" heading.
                    elif key in ("L2 cache", "L2"):
                        size_kb = _parse_lscpu_cache_kb(value)
                        if size_kb is not None:
                            info.cache_l2_kb = size_kb
                    elif key in ("L3 cache", "L3"):
                        size_kb = _parse_lscpu_cache_kb(value)
                        if size_kb is not None:
                            info.cache_l3_kb = size_kb
        elif system == "Darwin":
            brand = _run_cmd(["sysctl", "-n", "machdep.cpu.brand_string"])
            if brand:
                info.model = brand
            l2_kb = _sysctl_cache_kb("hw.l2cachesize")
            if l2_kb is not None:
                info.cache_l2_kb = l2_kb
            l3_kb = _sysctl_cache_kb("hw.l3cachesize")
            if l3_kb is not None:
                info.cache_l3_kb = l3_kb
    except Exception:
        logger.debug("CPU info extraction partially failed", exc_info=True)
    return info
def get_memory_info() -> MemoryInfo:
    """Report total/available RAM and a rough bandwidth estimate.

    psutil is preferred; if it is not installed, ``/proc/meminfo`` is parsed
    instead when that file exists. The bandwidth figure is a fixed
    placeholder, not a measurement.

    Returns:
        A ``MemoryInfo`` whose undetermined fields remain ``None``. Unexpected
        failures are logged at debug level rather than raised.
    """
    info = MemoryInfo()
    bytes_per_gib = 1024 ** 3
    kib_per_gib = 1024 ** 2
    # /proc/meminfo line prefix -> MemoryInfo attribute it populates.
    meminfo_fields = (
        ("MemTotal:", "total_gb"),
        ("MemAvailable:", "available_gb"),
    )
    try:
        try:
            import psutil
            snapshot = psutil.virtual_memory()
            info.total_gb = round(snapshot.total / bytes_per_gib, 2)
            info.available_gb = round(snapshot.available / bytes_per_gib, 2)
        except ImportError:
            # Fallback: /proc/meminfo on Linux (values are reported in kB)
            if os.path.exists("/proc/meminfo"):
                with open("/proc/meminfo") as meminfo:
                    for entry in meminfo:
                        for prefix, attr in meminfo_fields:
                            if entry.startswith(prefix):
                                kib = int(entry.split()[1])
                                setattr(info, attr, round(kib / kib_per_gib, 2))
                                break

        # Rough estimate: DDR4 ~40 GB/s, DDR5 ~60 GB/s.
        # The memory generation is not detected here, so the conservative
        # DDR4 dual-channel figure is used whenever total RAM is known.
        if info.total_gb:
            info.estimated_bandwidth_gbps = 40.0
    except Exception:
        logger.debug("Memory info extraction partially failed", exc_info=True)
    return info
def _parse_float(text: str) -> Optional[float]:
    """Return ``float(text)``, or None for non-numeric cells such as "[N/A]"."""
    try:
        return float(text)
    except (ValueError, TypeError):
        return None


def _parse_int(text: str) -> Optional[int]:
    """Return ``int(text)``, or None for non-numeric cells such as "[N/A]"."""
    try:
        return int(text)
    except (ValueError, TypeError):
        return None


def get_gpu_info() -> List[GPUInfo]:
    """Describe every CUDA device visible to torch.

    Static properties (name, SM count, VRAM, compute capability) come from
    ``torch.cuda.get_device_properties``; clocks, bus width, PCIe link and
    free memory come from ``nvidia-smi``. Peak bandwidth and TFLOPS figures
    are derived from those values and are theoretical estimates.

    Returns:
        One ``GPUInfo`` per device, with undetermined fields left as ``None``.
        An empty list when torch is unavailable or CUDA is not usable. If an
        unexpected error occurs part-way, the devices collected so far are
        returned and the error is logged at debug level.
    """
    gpus: List[GPUInfo] = []
    try:
        import torch
        if not torch.cuda.is_available():
            return gpus

        device_count = torch.cuda.device_count()

        # Driver version (identical for all devices, so query once).
        driver_version = None
        nv = _nvidia_smi_query("driver_version")
        if nv:
            driver_version = nv.get("driver_version")

        # The nvidia-smi banner reports the max CUDA version the driver supports.
        smi_cuda_version = None
        smi_out = _run_cmd(["nvidia-smi"])
        if smi_out:
            m = re.search(r"CUDA Version:\s+([\d.]+)", smi_out)
            if m:
                smi_cuda_version = m.group(1)

        for i in range(device_count):
            gpu = GPUInfo(index=i)
            props = torch.cuda.get_device_properties(i)
            gpu.name = props.name
            gpu.sm_count = props.multi_processor_count
            # The attribute is ``total_memory`` (bytes); ``total_mem`` does not
            # exist and raised AttributeError, which emptied the whole result.
            gpu.vram_total_gb = round(props.total_memory / (1024 ** 3), 2)
            cc = (props.major, props.minor)
            gpu.compute_capability = f"{props.major}.{props.minor}"
            gpu.driver_version = driver_version
            gpu.cuda_version = smi_cuda_version

            # CUDA cores = SMs * cores-per-SM for this compute capability.
            cores_per_sm = _CORES_PER_SM.get(cc)
            if cores_per_sm and gpu.sm_count:
                gpu.cuda_cores = gpu.sm_count * cores_per_sm

            # nvidia-smi per-GPU queries.
            # NOTE(review): ``--id`` uses nvidia-smi's own enumeration, which
            # presumably differs from torch's index when CUDA_VISIBLE_DEVICES
            # remaps devices - confirm for such setups.
            # NOTE(review): if nvidia-smi rejects any one field in this list
            # (e.g. memory.bus_width on some driver versions - confirm), the
            # command fails and all six values below stay None.
            nv_data = _run_cmd([
                "nvidia-smi",
                f"--id={i}",
                "--query-gpu=clocks.max.graphics,clocks.max.memory,memory.bus_width,pcie.link.gen.current,pcie.link.width.current,memory.free",
                "--format=csv,noheader,nounits",
            ])
            if nv_data:
                parts = [p.strip() for p in nv_data.split(",")]
                if len(parts) >= 6:
                    gpu.clock_mhz = _parse_float(parts[0])
                    gpu.memory_clock_mhz = _parse_float(parts[1])
                    gpu.memory_bus_width_bits = _parse_int(parts[2])
                    gpu.pcie_gen = _parse_int(parts[3])
                    gpu.pcie_width = _parse_int(parts[4])
                    free_mib = _parse_float(parts[5])
                    if free_mib is not None:
                        gpu.vram_free_gb = round(free_mib / 1024, 2)

            # Derived: memory bandwidth (GB/s)
            # = mem_clock (MHz) * bus_width (bits) * 2 (DDR) / 8 (bits->bytes) / 1000
            if gpu.memory_clock_mhz and gpu.memory_bus_width_bits:
                gpu.memory_bandwidth_gbps = round(
                    gpu.memory_clock_mhz * gpu.memory_bus_width_bits * 2 / 8 / 1000, 1
                )

            # Derived: FP32 TFLOPS = cuda_cores * clock_mhz * 2 (FMA) / 1e6
            if gpu.cuda_cores and gpu.clock_mhz:
                gpu.fp32_tflops = round(gpu.cuda_cores * gpu.clock_mhz * 2 / 1e6, 2)
                # FP16 is typically 2x FP32 on Volta+
                if props.major >= 7:
                    gpu.fp16_tflops = round(gpu.fp32_tflops * 2, 2)
                else:
                    gpu.fp16_tflops = gpu.fp32_tflops

            # Tensor core TFLOPS (rough: 8x FP32 on Ampere+, 4x on Volta/Turing)
            if gpu.fp32_tflops:
                if props.major >= 8:
                    gpu.tensor_core_tflops = round(gpu.fp32_tflops * 8, 2)
                elif props.major >= 7:
                    gpu.tensor_core_tflops = round(gpu.fp32_tflops * 4, 2)

            # Derived: PCIe bandwidth = per-lane rate * lane count. An unknown
            # generation leaves the field None instead of reporting 0 GB/s.
            if gpu.pcie_gen and gpu.pcie_width:
                per_lane = _PCIE_BW.get(gpu.pcie_gen)
                if per_lane is not None:
                    gpu.pcie_bandwidth_gbps = round(per_lane * gpu.pcie_width, 2)

            gpus.append(gpu)
    except Exception:
        logger.debug("GPU info extraction partially failed", exc_info=True)
    return gpus
def get_storage_info() -> StorageInfo:
    """Classify the storage device and take a quick throughput reading.

    On Linux the type comes from the ROTA (rotational) flag of the first real
    block device listed by ``lsblk``; on macOS it is assumed to be SSD.

    Returns:
        A ``StorageInfo`` whose undetermined fields remain ``None``. This
        function does not raise; failures are logged at debug level.
    """
    # Pseudo/removable devices that lsblk commonly lists before real disks;
    # taking their ROTA flag would misreport the machine's storage type.
    pseudo_prefixes = ("loop", "ram", "zram", "sr")

    info = StorageInfo()
    try:
        system = platform.system()
        if system == "Linux":
            # First non-pseudo block device. This is not necessarily the
            # device backing the root filesystem.
            out = _run_cmd(["lsblk", "-d", "-o", "NAME,ROTA", "--noheadings"])
            if out:
                for line in out.split("\n"):
                    parts = line.split()
                    if len(parts) != 2:
                        continue
                    name, rota = parts
                    if name.startswith(pseudo_prefixes):
                        continue
                    info.storage_type = "HDD" if rota == "1" else "SSD"
                    break

            # Quick dd run (256 x 1 MiB). dd prints its throughput summary on
            # stderr, so subprocess is called directly here.
            # NOTE(review): /dev/zero -> /dev/null never touches a disk, so
            # this number reflects an in-memory copy rather than storage
            # read speed; a real file read would be needed for the latter.
            try:
                result = subprocess.run(
                    ["dd", "if=/dev/zero", "of=/dev/null", "bs=1M", "count=256"],
                    capture_output=True, text=True, timeout=15,
                    # Force the C locale so the decimal separator and unit
                    # suffix match the regex below.
                    env=dict(os.environ, LC_ALL="C"),
                )
                m = re.search(r"([\d.]+)\s*(GB|MB)/s", result.stderr)
                if m:
                    speed = float(m.group(1))
                    if m.group(2) == "GB":
                        speed *= 1000
                    info.sequential_read_mbps = round(speed, 0)
            except (subprocess.TimeoutExpired, OSError, ValueError):
                logger.debug("dd throughput probe failed", exc_info=True)
        elif system == "Darwin":
            info.storage_type = "SSD"  # Modern Macs use NVMe SSDs
    except Exception:
        logger.debug("Storage info extraction partially failed", exc_info=True)
    return info
@lru_cache(maxsize=1)
def get_hardware_info() -> HardwareInfo:
    """Aggregate CPU, memory, GPU, storage and version info (cached).

    The result is computed once per process. Every call returns the same
    ``HardwareInfo`` instance, so callers should treat it as read-only.

    Returns:
        A populated ``HardwareInfo``. When torch cannot be imported, the
        ``torch_version`` and ``cuda_runtime_version`` fields are left as
        ``None`` instead of raising, matching the other collectors'
        best-effort behaviour.
    """
    hw = HardwareInfo()
    hw.cpu = get_cpu_info()
    hw.memory = get_memory_info()
    hw.gpus = get_gpu_info()
    hw.storage = get_storage_info()
    hw.system = f"{platform.system()} {platform.release()}"
    hw.python_version = platform.python_version()

    try:
        import torch
    except ImportError:
        logger.debug("torch is not importable; torch/CUDA versions left unset")
    else:
        hw.torch_version = torch.__version__
        hw.cuda_runtime_version = (
            torch.version.cuda if torch.cuda.is_available() else None
        )
    return hw