Spaces:
Paused
Paused
| """Hardware specification extraction for roofline analysis. | |
| Extracts CPU, GPU, memory, and storage parameters via system tools | |
| and torch APIs. All functions have try/except fallbacks returning None | |
| for inaccessible fields. | |
| """ | |
| import logging | |
| import os | |
| import platform | |
| import re | |
| import subprocess | |
| from dataclasses import dataclass, field | |
| from functools import lru_cache | |
| from typing import Dict, List, Optional | |
| logger = logging.getLogger(__name__) | |
| # CUDA cores per SM by compute capability (major, minor) -> cores_per_sm | |
| # Kepler through Blackwell | |
| _CORES_PER_SM: Dict[tuple, int] = { | |
| (3, 0): 192, (3, 2): 192, (3, 5): 192, (3, 7): 192, # Kepler | |
| (5, 0): 128, (5, 2): 128, (5, 3): 128, # Maxwell | |
| (6, 0): 64, (6, 1): 128, (6, 2): 128, # Pascal | |
| (7, 0): 64, (7, 2): 64, (7, 5): 64, # Volta / Turing | |
| (8, 0): 64, (8, 6): 128, (8, 7): 128, (8, 9): 128, # Ampere / Ada | |
| (9, 0): 128, # Hopper | |
| (10, 0): 128, # Blackwell | |
| } | |
| # PCIe bandwidth (GB/s, unidirectional) by gen and width | |
| _PCIE_BW: Dict[int, float] = { | |
| 3: 0.985, # ~1 GB/s per lane | |
| 4: 1.969, | |
| 5: 3.938, | |
| 6: 7.563, | |
| } | |
@dataclass
class CPUInfo:
    """Static CPU description; any field left None was not detectable.

    Caches are in KiB, frequency in MHz. Populated by get_cpu_info().
    """

    model: Optional[str] = None
    physical_cores: Optional[int] = None
    logical_cores: Optional[int] = None
    frequency_mhz: Optional[float] = None
    cache_l2_kb: Optional[int] = None
    cache_l3_kb: Optional[int] = None
    architecture: Optional[str] = None
@dataclass
class MemoryInfo:
    """System RAM summary; fields left None were not detectable.

    Sizes in GiB; bandwidth is a rough estimate, not a measurement.
    """

    total_gb: Optional[float] = None
    available_gb: Optional[float] = None
    estimated_bandwidth_gbps: Optional[float] = None
@dataclass
class GPUInfo:
    """Per-device GPU description; fields left None were not detectable.

    Derived fields (bandwidth, TFLOPS) are roofline estimates computed
    in get_gpu_info(), not measured values.
    """

    index: int = 0
    name: Optional[str] = None
    sm_count: Optional[int] = None
    cuda_cores: Optional[int] = None
    clock_mhz: Optional[float] = None
    memory_clock_mhz: Optional[float] = None
    memory_bus_width_bits: Optional[int] = None
    vram_total_gb: Optional[float] = None
    vram_free_gb: Optional[float] = None
    memory_bandwidth_gbps: Optional[float] = None
    fp32_tflops: Optional[float] = None
    fp16_tflops: Optional[float] = None
    tensor_core_tflops: Optional[float] = None
    pcie_gen: Optional[int] = None
    pcie_width: Optional[int] = None
    pcie_bandwidth_gbps: Optional[float] = None
    compute_capability: Optional[str] = None
    driver_version: Optional[str] = None
    cuda_version: Optional[str] = None
@dataclass
class StorageInfo:
    """Primary-storage summary; fields left None were not detectable."""

    storage_type: Optional[str] = None  # "SSD" or "HDD" or "Unknown"
    sequential_read_mbps: Optional[float] = None
@dataclass
class HardwareInfo:
    """Aggregate of all probed hardware info; built by get_hardware_info()."""

    cpu: CPUInfo = field(default_factory=CPUInfo)
    memory: MemoryInfo = field(default_factory=MemoryInfo)
    gpus: List[GPUInfo] = field(default_factory=list)
    storage: StorageInfo = field(default_factory=StorageInfo)
    system: Optional[str] = None
    python_version: Optional[str] = None
    torch_version: Optional[str] = None
    cuda_runtime_version: Optional[str] = None
| def _run_cmd(cmd: List[str], timeout: int = 10) -> Optional[str]: | |
| """Run a shell command and return stdout, or None on failure.""" | |
| try: | |
| result = subprocess.run( | |
| cmd, capture_output=True, text=True, timeout=timeout, | |
| ) | |
| if result.returncode == 0: | |
| return result.stdout.strip() | |
| except (subprocess.TimeoutExpired, FileNotFoundError, OSError): | |
| pass | |
| return None | |
def _nvidia_smi_query(*fields: str) -> Optional[Dict[str, str]]:
    """Query nvidia-smi for the given fields on the first GPU.

    Returns a field -> value dict, or None when nvidia-smi is missing,
    fails, or returns an unexpected number of columns.
    """
    out = _run_cmd([
        "nvidia-smi",
        f"--query-gpu={','.join(fields)}",
        "--format=csv,noheader,nounits",
    ])
    if not out:
        return None
    # Only the first line (first GPU) is parsed.
    first_line = out.split("\n")[0]
    values = [cell.strip() for cell in first_line.split(",")]
    if len(values) != len(fields):
        return None
    return dict(zip(fields, values))
def _parse_lscpu_cache_kb(val: str) -> Optional[int]:
    """Parse an lscpu cache-size string (e.g. "512 KiB", "32 MiB") into KiB.

    Returns None when no number is present. Values whose unit contains
    "M" (MiB) are converted; anything else is assumed to already be KiB.
    """
    m = re.search(r"([\d.]+)", val)
    if not m:
        return None
    kb = float(m.group(1))
    if "M" in val:  # covers "MiB" and bare "M"
        kb *= 1024
    return int(kb)


def get_cpu_info() -> CPUInfo:
    """Collect CPU model, core counts, frequency, and cache sizes.

    Best-effort: uses psutil when installed, then lscpu (Linux) or
    sysctl (macOS). Fields that cannot be determined stay None; any
    unexpected failure is logged at debug level and partial data is
    returned.
    """
    info = CPUInfo()
    try:
        info.architecture = platform.machine()
        info.logical_cores = os.cpu_count()
        # psutil is optional; it supplies physical cores and frequency.
        try:
            import psutil
            info.physical_cores = psutil.cpu_count(logical=False)
            freq = psutil.cpu_freq()
            if freq:
                info.frequency_mhz = freq.current or freq.max
        except ImportError:
            pass
        system = platform.system()
        if system == "Linux":
            out = _run_cmd(["lscpu"])
            if out:
                for line in out.split("\n"):
                    if "Model name" in line:
                        info.model = line.split(":", 1)[1].strip()
                    elif "L2 cache" in line:
                        info.cache_l2_kb = _parse_lscpu_cache_kb(
                            line.split(":", 1)[1].strip()
                        )
                    elif "L3 cache" in line:
                        info.cache_l3_kb = _parse_lscpu_cache_kb(
                            line.split(":", 1)[1].strip()
                        )
        elif system == "Darwin":
            brand = _run_cmd(["sysctl", "-n", "machdep.cpu.brand_string"])
            if brand:
                info.model = brand
            # sysctl reports cache sizes in bytes; convert to KiB.
            for key, attr in (
                ("hw.l2cachesize", "cache_l2_kb"),
                ("hw.l3cachesize", "cache_l3_kb"),
            ):
                raw = _run_cmd(["sysctl", "-n", key])
                if raw:
                    try:
                        setattr(info, attr, int(raw) // 1024)
                    except ValueError:
                        pass
    except Exception:
        logger.debug("CPU info extraction partially failed", exc_info=True)
    return info
def get_memory_info() -> MemoryInfo:
    """Collect total/available RAM plus a rough bandwidth estimate.

    Prefers psutil; without it, falls back to /proc/meminfo on Linux.
    The bandwidth figure is a fixed conservative DDR4 dual-channel
    guess — the actual memory type is unknown without dmidecode.
    """
    info = MemoryInfo()
    try:
        try:
            import psutil
        except ImportError:
            psutil = None
        if psutil is not None:
            vm = psutil.virtual_memory()
            gib = 1024 ** 3
            info.total_gb = round(vm.total / gib, 2)
            info.available_gb = round(vm.available / gib, 2)
        elif os.path.exists("/proc/meminfo"):
            # /proc/meminfo reports kB values.
            with open("/proc/meminfo") as fh:
                for entry in fh:
                    if entry.startswith("MemTotal:"):
                        info.total_gb = round(int(entry.split()[1]) / (1024 ** 2), 2)
                    elif entry.startswith("MemAvailable:"):
                        info.available_gb = round(int(entry.split()[1]) / (1024 ** 2), 2)
        if info.total_gb:
            info.estimated_bandwidth_gbps = 40.0  # conservative DDR4 dual-channel
    except Exception:
        logger.debug("Memory info extraction partially failed", exc_info=True)
    return info
def get_gpu_info() -> List[GPUInfo]:
    """Enumerate CUDA GPUs via torch and nvidia-smi with derived metrics.

    Returns an empty list when torch or CUDA is unavailable. Derived
    numbers (memory bandwidth, TFLOPS, PCIe bandwidth) are roofline
    estimates computed from clocks/widths, not measurements. Any failure
    is logged at debug level and the partial list is returned.
    """
    gpus: List[GPUInfo] = []
    try:
        import torch
        if not torch.cuda.is_available():
            return gpus
        device_count = torch.cuda.device_count()

        # Driver version and max-supported CUDA runtime (shared by all GPUs).
        driver_version = None
        smi_cuda_version = None
        nv = _nvidia_smi_query("driver_version")
        if nv:
            driver_version = nv.get("driver_version")
        # nvidia-smi's banner reports the max supported CUDA runtime.
        smi_out = _run_cmd(["nvidia-smi"])
        if smi_out:
            m = re.search(r"CUDA Version:\s+([\d.]+)", smi_out)
            if m:
                smi_cuda_version = m.group(1)

        for i in range(device_count):
            gpu = GPUInfo(index=i)
            props = torch.cuda.get_device_properties(i)
            gpu.name = props.name
            gpu.sm_count = props.multi_processor_count
            # Fixed: the torch attribute is total_memory (bytes), not total_mem.
            gpu.vram_total_gb = round(props.total_memory / (1024 ** 3), 2)
            cc = (props.major, props.minor)
            gpu.compute_capability = f"{props.major}.{props.minor}"
            gpu.driver_version = driver_version
            gpu.cuda_version = smi_cuda_version

            # CUDA cores = SMs * cores-per-SM for this architecture.
            cores_per_sm = _CORES_PER_SM.get(cc)
            if cores_per_sm and gpu.sm_count:
                gpu.cuda_cores = gpu.sm_count * cores_per_sm

            # Per-GPU clocks / bus / PCIe link / free VRAM from nvidia-smi.
            nv_data = _run_cmd([
                "nvidia-smi",
                f"--id={i}",
                "--query-gpu=clocks.max.graphics,clocks.max.memory,memory.bus_width,pcie.link.gen.current,pcie.link.width.current,memory.free",
                "--format=csv,noheader,nounits",
            ])
            if nv_data:
                parts = [p.strip() for p in nv_data.split(",")]
                if len(parts) >= 6:
                    def _num(raw, cast):
                        """Cast a csv cell, returning None for 'N/A' etc."""
                        try:
                            return cast(raw)
                        except (ValueError, TypeError):
                            return None
                    gpu.clock_mhz = _num(parts[0], float)
                    gpu.memory_clock_mhz = _num(parts[1], float)
                    gpu.memory_bus_width_bits = _num(parts[2], int)
                    gpu.pcie_gen = _num(parts[3], int)
                    gpu.pcie_width = _num(parts[4], int)
                    free_mb = _num(parts[5], float)  # nvidia-smi reports MiB
                    if free_mb is not None:
                        gpu.vram_free_gb = round(free_mb / 1024, 2)

            # Derived: memory bandwidth (GB/s) =
            # mem_clock(MHz) * bus_width(bits) * 2 (DDR) / 8 (bits->bytes) / 1000.
            if gpu.memory_clock_mhz and gpu.memory_bus_width_bits:
                gpu.memory_bandwidth_gbps = round(
                    gpu.memory_clock_mhz * gpu.memory_bus_width_bits * 2 / 8 / 1000, 1
                )
            # Derived: FP32 TFLOPS = cuda_cores * clock(MHz) * 2 (FMA) / 1e6.
            if gpu.cuda_cores and gpu.clock_mhz:
                gpu.fp32_tflops = round(gpu.cuda_cores * gpu.clock_mhz * 2 / 1e6, 2)
                # FP16 is typically 2x FP32 on Volta (cc 7.x) and newer.
                if props.major >= 7:
                    gpu.fp16_tflops = round(gpu.fp32_tflops * 2, 2)
                else:
                    gpu.fp16_tflops = gpu.fp32_tflops
            # Tensor-core TFLOPS (rough: 8x FP32 on Ampere+, 4x Volta/Turing).
            if gpu.fp32_tflops:
                if props.major >= 8:
                    gpu.tensor_core_tflops = round(gpu.fp32_tflops * 8, 2)
                elif props.major >= 7:
                    gpu.tensor_core_tflops = round(gpu.fp32_tflops * 4, 2)
            # Derived: PCIe bandwidth = per-lane GB/s * lane count.
            if gpu.pcie_gen and gpu.pcie_width:
                per_lane = _PCIE_BW.get(gpu.pcie_gen, 0)
                gpu.pcie_bandwidth_gbps = round(per_lane * gpu.pcie_width, 2)
            gpus.append(gpu)
    except Exception:
        logger.debug("GPU info extraction partially failed", exc_info=True)
    return gpus
def get_storage_info() -> StorageInfo:
    """Detect storage type (SSD/HDD) and a rough sequential-throughput figure.

    NOTE(review): the dd /dev/zero -> /dev/null transfer measures memory/
    pipe copy throughput, not actual disk reads — treat the figure as an
    upper bound. TODO: read a real on-disk file for a true disk number.
    """
    info = StorageInfo()
    try:
        system = platform.system()
        if system == "Linux":
            # lsblk ROTA flag: 1 = rotational (HDD), 0 = non-rotational (SSD).
            # Take the first listed block device.
            out = _run_cmd(["lsblk", "-d", "-o", "NAME,ROTA", "--noheadings"])
            if out:
                for line in out.strip().split("\n"):
                    parts = line.split()
                    if len(parts) == 2:
                        info.storage_type = "HDD" if parts[1] == "1" else "SSD"
                        break
            # dd reports throughput on stderr, so call subprocess directly
            # instead of _run_cmd (which only captures stdout).
            try:
                result = subprocess.run(
                    ["dd", "if=/dev/zero", "of=/dev/null", "bs=1M", "count=256"],
                    capture_output=True, text=True, timeout=15,
                )
                m = re.search(r"([\d.]+)\s*(GB|MB)/s", result.stderr)
                if m:
                    speed = float(m.group(1))
                    if m.group(2) == "GB":
                        speed *= 1000  # normalize to MB/s
                    info.sequential_read_mbps = round(speed, 0)
            except Exception:
                pass
        elif system == "Darwin":
            info.storage_type = "SSD"  # Modern Macs use NVMe SSDs
    except Exception:
        logger.debug("Storage info extraction partially failed", exc_info=True)
    return info
@lru_cache(maxsize=1)
def get_hardware_info() -> HardwareInfo:
    """Aggregate all hardware info (cached).

    Cached via lru_cache because probing spawns subprocesses and a dd
    transfer, and hardware does not change within a process. Repeat
    calls return the SAME HardwareInfo instance — callers must not
    mutate it.
    """
    import torch

    hw = HardwareInfo()
    hw.cpu = get_cpu_info()
    hw.memory = get_memory_info()
    hw.gpus = get_gpu_info()
    hw.storage = get_storage_info()
    hw.system = f"{platform.system()} {platform.release()}"
    hw.python_version = platform.python_version()
    hw.torch_version = torch.__version__
    # torch.version.cuda is the compiled-against CUDA runtime version.
    hw.cuda_runtime_version = (
        torch.version.cuda if torch.cuda.is_available() else None
    )
    return hw