| |
| """ |
| GPU Detection and Monitoring Module |
| |
| Handles GPU detection, hardware monitoring, and data collection for AMD GPUs. |
| Supports multiple GPU models and provides a unified interface for monitoring. |
| """ |
|
|
| import os |
| import glob |
| import time |
| import logging |
| import json |
| import sqlite3 |
| from typing import Dict, List, Optional, Tuple, Any |
| from dataclasses import dataclass, asdict |
| from pathlib import Path |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
@dataclass
class GPUInfo:
    """Static description of one detected GPU.

    The path-like fields are plain strings pointing at the files the
    monitoring code opens to read live values.
    """

    # Identification
    name: str        # human-readable model name
    vendor: str      # vendor label, e.g. "AMD"
    device_id: str   # device id string as read from the device node
    bus_id: str      # name of the card entry the GPU was found under

    # Sensor / control file locations
    hwmon_path: str    # hwmon directory of the device
    temp_sensor: str   # temperature input file
    fan_control: str   # fan PWM control file
    power_sensor: str  # power input file

    # Memory figures; both default to 0
    memory_total: int = 0
    memory_used: int = 0
|
|
|
|
@dataclass
class GPUStatus:
    """One timestamped sample of a GPU's live readings."""

    timestamp: float     # time.time() value taken when the sample was built

    # Thermals, utilisation and cooling
    temperature: float   # degrees Celsius
    load: float          # busy percentage
    fan_speed: int       # RPM
    fan_pwm: int         # raw PWM value (0-255)
    power_draw: float    # watts

    # Memory and clocks
    memory_used: int     # MB
    memory_total: int    # MB
    core_clock: int      # MHz
    memory_clock: int    # MHz

    # Optional figures; both default to 0.0
    voltage: float = 0.0
    efficiency: float = 0.0  # load divided by power draw
|
|
|
|
class GPUHardwareDetector:
    """Detects AMD GPUs by walking the DRM card entries exposed in sysfs.

    Attributes:
        gpus: Initialised to an empty list; this class never modifies it.
        detected_gpus: Result of the most recent ``detect_amd_gpus`` call.
    """

    # Vendor id string that identifies an AMD device.
    AMD_VENDOR_ID = "0x1002"

    # Device id -> display name.
    # NOTE(review): every entry maps to the same name, which looks like
    # placeholder data -- verify the ids against a PCI id database before
    # relying on the reported model name.
    _DEVICE_NAMES = {
        "0x73bf": "Radeon Pro VII",
        "0x73ff": "Radeon Pro VII",
        "0x7310": "Radeon Pro VII",
        "0x7340": "Radeon Pro VII",
        "0x73a0": "Radeon Pro VII",
        "0x73b0": "Radeon Pro VII",
        "0x73c0": "Radeon Pro VII",
        "0x73d0": "Radeon Pro VII",
        "0x73e0": "Radeon Pro VII",
        "0x73f0": "Radeon Pro VII",
    }

    def __init__(self):
        self.gpus = []
        self.detected_gpus = []

    def detect_amd_gpus(self, drm_root: str = "/sys/class/drm") -> "List[GPUInfo]":
        """Detect AMD GPUs in the system.

        Args:
            drm_root: Directory holding the ``card<N>`` entries. The default
                is the real sysfs location; another root can be supplied to
                run detection against a prepared directory tree.

        Returns:
            One ``GPUInfo`` per AMD card that exposes a hwmon directory, in
            ascending card-number order. Cards that cannot be read are
            logged and skipped. The list is also stored on
            ``self.detected_gpus``.
        """
        logger.info("Detecting AMD GPUs...")

        gpus = []
        for card_path in self._card_paths(drm_root):
            try:
                gpu_info = self._probe_card(card_path)
            except (OSError, ValueError) as e:
                # One unreadable card must not abort detection of the others.
                logger.warning(f"Error detecting GPU at {card_path}: {e}")
                continue

            if gpu_info is not None:
                gpus.append(gpu_info)
                logger.info(f"Found AMD GPU: {gpu_info.name} ({gpu_info.device_id})")

        self.detected_gpus = gpus
        return gpus

    @staticmethod
    def _card_paths(drm_root: str) -> List[str]:
        """Return the ``card<N>`` entries under *drm_root*, sorted by N.

        Entries such as ``card0-DP-1`` are ignored, and numeric sorting keeps
        the detection order stable from run to run.
        """
        try:
            entries = os.listdir(drm_root)
        except OSError:
            return []

        cards = []
        for entry in entries:
            index = entry[len("card"):]
            if entry.startswith("card") and index.isdecimal():
                cards.append((int(index), entry))
        return [os.path.join(drm_root, entry) for _, entry in sorted(cards)]

    def _probe_card(self, card_path: str) -> "Optional[GPUInfo]":
        """Build a ``GPUInfo`` for *card_path*, or None if it does not qualify.

        A card qualifies when its vendor file holds the AMD vendor id and a
        hwmon directory is present. Read errors propagate to the caller.
        """
        device_path = os.path.join(card_path, "device")
        vendor_file = os.path.join(device_path, "vendor")

        if not os.path.exists(vendor_file):
            return None

        with open(vendor_file, "r") as f:
            vendor_id = f.read().strip()
        if vendor_id.lower() != self.AMD_VENDOR_ID:
            return None

        with open(os.path.join(device_path, "device"), "r") as f:
            device_id = f.read().strip()

        hwmon_path = self._find_hwmon_path(device_path)
        if not hwmon_path:
            logger.debug(f"No hwmon directory for {card_path}; skipping")
            return None

        return GPUInfo(
            name=self._get_gpu_name(device_id),
            vendor="AMD",
            device_id=device_id,
            bus_id=os.path.basename(card_path),
            hwmon_path=hwmon_path,
            temp_sensor=os.path.join(hwmon_path, "temp1_input"),
            fan_control=os.path.join(hwmon_path, "pwm1"),
            power_sensor=os.path.join(hwmon_path, "power1_input"),
        )

    def _get_gpu_name(self, device_id: str) -> str:
        """Return the display name for *device_id*.

        Unknown ids fall back to ``"AMD GPU <device_id>"``.
        """
        return self._DEVICE_NAMES.get(device_id.lower(), f"AMD GPU {device_id}")

    def _find_hwmon_path(self, device_path: str) -> Optional[str]:
        """Return the hwmon directory of *device_path*, or None if absent.

        When several ``hwmon*`` entries exist the lowest-numbered one is
        chosen, so the result does not depend on directory listing order.
        """
        hwmon_base = os.path.join(device_path, "hwmon")

        try:
            entries = os.listdir(hwmon_base)
        except FileNotFoundError:
            return None
        except OSError as e:
            logger.warning(f"Error finding hwmon for {device_path}: {e}")
            return None

        # Sort by (length, text) so that hwmon2 comes before hwmon10.
        candidates = sorted(
            (entry for entry in entries if entry.startswith("hwmon")),
            key=lambda entry: (len(entry), entry),
        )
        if not candidates:
            return None
        return os.path.join(hwmon_base, candidates[0])
|
|
|
|
class GPUDataCollector:
    """Reads live sensor values for one GPU from its sysfs files.

    Every ``read_*`` method returns None (or a tuple of None) when the
    backing file is missing or unreadable, so callers can tell "no data"
    apart from a real reading.

    Attributes:
        gpu_info: Description of the GPU whose files are read.
        last_status: Most recent result of ``collect_status``, or None.
    """

    _BYTES_PER_MB = 1024 * 1024

    def __init__(self, gpu_info: "GPUInfo"):
        self.gpu_info = gpu_info
        self.last_status = None

    def _device_path(self) -> str:
        """Return the device directory, two levels above the hwmon path."""
        return os.path.dirname(os.path.dirname(self.gpu_info.hwmon_path))

    @staticmethod
    def _read_value(path: str, convert, what: str):
        """Read *path* and return ``convert(stripped_text)``.

        A missing file is an expected condition and yields None silently;
        any other I/O or parse failure is logged at debug level using
        *what* to name the reading, and also yields None.
        """
        try:
            with open(path, "r") as f:
                return convert(f.read().strip())
        except FileNotFoundError:
            return None
        except (OSError, ValueError) as e:
            logger.debug(f"Error reading {what}: {e}")
            return None

    def read_temperature(self) -> Optional[float]:
        """Read GPU temperature in Celsius (the file holds millidegrees)."""
        millidegrees = self._read_value(self.gpu_info.temp_sensor, int, "temperature")
        if millidegrees is None:
            return None
        return millidegrees / 1000.0

    def read_fan_speed(self) -> Optional[int]:
        """Read fan speed in RPM.

        The tachometer file is looked up next to the PWM control file.
        Only the file name is swapped: a textual replace on the whole path
        would also rewrite directory names containing "pwm1", and would do
        nothing at all for a control file not named ``pwm1``.
        """
        hwmon_dir = os.path.dirname(self.gpu_info.fan_control)
        return self._read_value(os.path.join(hwmon_dir, "fan1_input"), int, "fan speed")

    def read_fan_pwm(self) -> Optional[int]:
        """Read fan PWM value (0-255)."""
        return self._read_value(self.gpu_info.fan_control, int, "fan PWM")

    def read_power_draw(self) -> Optional[float]:
        """Read power draw in watts (the files hold microwatts).

        The configured sensor file is tried first. If it is absent, the
        sibling ``power1_average`` file is used, because some devices
        expose only the averaged reading.
        """
        primary = self.gpu_info.power_sensor
        candidates = [primary]
        fallback = os.path.join(os.path.dirname(primary), "power1_average")
        if fallback != primary:
            candidates.append(fallback)

        for path in candidates:
            microwatts = self._read_value(path, int, "power")
            if microwatts is not None:
                return microwatts / 1000000.0
        return None

    def read_memory_info(self) -> Tuple[Optional[int], Optional[int]]:
        """Read VRAM usage in MB as ``(used, total)``.

        Returns ``(None, None)`` unless both counters could be read.
        """
        device_path = self._device_path()
        used = self._read_value(
            os.path.join(device_path, "mem_info_vram_used"), int, "memory info")
        total = self._read_value(
            os.path.join(device_path, "mem_info_vram_total"), int, "memory info")

        if used is None or total is None:
            return None, None
        return used // self._BYTES_PER_MB, total // self._BYTES_PER_MB

    def read_gpu_load(self) -> Optional[float]:
        """Read GPU load percentage."""
        load_file = os.path.join(self._device_path(), "gpu_busy_percent")
        return self._read_value(load_file, float, "GPU load")

    def read_clocks(self) -> Tuple[Optional[int], Optional[int]]:
        """Read core and memory clocks in MHz as ``(core, memory)``."""
        device_path = self._device_path()
        core_clock = self._parse_clock_file(os.path.join(device_path, "pp_dpm_sclk"))
        memory_clock = self._parse_clock_file(os.path.join(device_path, "pp_dpm_mclk"))
        return core_clock, memory_clock

    def _parse_clock_file(self, clock_file: str) -> Optional[int]:
        """Return the clock of the level marked with ``*`` in *clock_file*.

        Lines are expected as ``<index>: <value>Mhz`` with a ``*`` on the
        active level. The unit suffix is matched case-insensitively.
        Returns None when the file is missing or no marked line parses.
        """
        try:
            with open(clock_file, "r") as f:
                for line in f:
                    if "*" not in line:
                        continue
                    parts = line.strip().split(":")
                    if len(parts) >= 2:
                        clock_str = parts[1].strip().split()[0]
                        return int(clock_str.lower().replace("mhz", ""))
        except FileNotFoundError:
            return None
        except (OSError, ValueError, IndexError) as e:
            logger.debug(f"Error parsing clock file {clock_file}: {e}")
        return None

    def collect_status(self) -> "Optional[GPUStatus]":
        """Collect all GPU status information into one ``GPUStatus``.

        Readings that are unavailable are stored as 0 / 0.0. ``voltage`` is
        not read and is always 0.0. ``efficiency`` is load divided by power
        draw, or 0.0 when either is missing or zero.

        Returns:
            The new status (also stored on ``self.last_status``), or None
            if an unexpected error occurred while collecting.
        """
        try:
            temp = self.read_temperature()
            fan_speed = self.read_fan_speed()
            fan_pwm = self.read_fan_pwm()
            power = self.read_power_draw()
            mem_used, mem_total = self.read_memory_info()
            load = self.read_gpu_load()
            core_clock, mem_clock = self.read_clocks()

            efficiency = 0.0
            if power and load and power > 0:
                efficiency = load / power

            status = GPUStatus(
                timestamp=time.time(),
                temperature=temp or 0.0,
                load=load or 0.0,
                fan_speed=fan_speed or 0,
                fan_pwm=fan_pwm or 0,
                power_draw=power or 0.0,
                memory_used=mem_used or 0,
                memory_total=mem_total or 0,
                core_clock=core_clock or 0,
                memory_clock=mem_clock or 0,
                voltage=0.0,
                efficiency=efficiency,
            )

            self.last_status = status
            return status

        except Exception as e:
            # Collection is best-effort: report the failure and carry on.
            logger.error(f"Error collecting GPU status: {e}")
            return None
|
|
|
|
class GPUDataManager:
    """Stores GPU information, status samples and alerts in SQLite.

    Every operation opens its own connection and closes it before
    returning. Database errors are logged rather than raised: writers
    return None and readers return an empty list.

    Attributes:
        db_path: Path of the SQLite database file.
    """

    # Statements run by init_database; all are safe to repeat.
    _SCHEMA = (
        '''
        CREATE TABLE IF NOT EXISTS gpu_status (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp REAL,
            gpu_name TEXT,
            temperature REAL,
            load REAL,
            fan_speed INTEGER,
            fan_pwm INTEGER,
            power_draw REAL,
            memory_used INTEGER,
            memory_total INTEGER,
            core_clock INTEGER,
            memory_clock INTEGER,
            voltage REAL,
            efficiency REAL
        )
        ''',
        '''
        CREATE TABLE IF NOT EXISTS gpu_info (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            gpu_name TEXT UNIQUE,
            vendor TEXT,
            device_id TEXT,
            bus_id TEXT,
            hwmon_path TEXT,
            detected_at REAL
        )
        ''',
        '''
        CREATE TABLE IF NOT EXISTS alerts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp REAL,
            gpu_name TEXT,
            alert_type TEXT,
            message TEXT,
            value REAL,
            threshold REAL
        )
        ''',
        # Status reads filter on gpu_name and order or filter on timestamp.
        '''
        CREATE INDEX IF NOT EXISTS idx_gpu_status_name_time
        ON gpu_status (gpu_name, timestamp)
        ''',
    )

    def __init__(self, db_path: str = "gpu_monitoring.db"):
        self.db_path = db_path
        self.init_database()

    def _execute(self, statements) -> None:
        """Run ``(sql, params)`` pairs in one transaction, then close.

        ``with conn`` only commits or rolls back; it does not close the
        connection, hence the explicit ``close()`` in ``finally``.

        Raises:
            sqlite3.Error: If connecting or any statement fails.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            with conn:
                for sql, params in statements:
                    conn.execute(sql, params)
        finally:
            conn.close()

    def _query(self, sql: str, params=()) -> List[Dict[str, Any]]:
        """Run a SELECT and return its rows as dicts keyed by column name.

        Raises:
            sqlite3.Error: If connecting or the query fails.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.execute(sql, params)
            columns = [column[0] for column in cursor.description]
            return [dict(zip(columns, row)) for row in cursor.fetchall()]
        finally:
            conn.close()

    def init_database(self):
        """Create the tables and the status index if they do not exist."""
        try:
            self._execute([(sql, ()) for sql in self._SCHEMA])
            logger.info("Database initialized successfully")
        except sqlite3.Error as e:
            logger.error(f"Error initializing database: {e}")

    def save_gpu_info(self, gpu_info: "GPUInfo"):
        """Insert *gpu_info*, replacing any existing row with the same name."""
        try:
            self._execute([(
                '''
                INSERT OR REPLACE INTO gpu_info
                (gpu_name, vendor, device_id, bus_id, hwmon_path, detected_at)
                VALUES (?, ?, ?, ?, ?, ?)
                ''',
                (gpu_info.name, gpu_info.vendor, gpu_info.device_id,
                 gpu_info.bus_id, gpu_info.hwmon_path, time.time()),
            )])
            logger.debug(f"Saved GPU info: {gpu_info.name}")
        except sqlite3.Error as e:
            logger.error(f"Error saving GPU info: {e}")

    def save_status(self, gpu_name: str, status: "GPUStatus"):
        """Append one status sample for *gpu_name*."""
        try:
            self._execute([(
                '''
                INSERT INTO gpu_status
                (timestamp, gpu_name, temperature, load, fan_speed, fan_pwm,
                 power_draw, memory_used, memory_total, core_clock, memory_clock,
                 voltage, efficiency)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''',
                (status.timestamp, gpu_name, status.temperature, status.load,
                 status.fan_speed, status.fan_pwm, status.power_draw,
                 status.memory_used, status.memory_total, status.core_clock,
                 status.memory_clock, status.voltage, status.efficiency),
            )])
        except sqlite3.Error as e:
            logger.error(f"Error saving status: {e}")

    def get_recent_status(self, gpu_name: str, limit: int = 100) -> List[Dict[str, Any]]:
        """Return up to *limit* samples for *gpu_name*, newest first.

        Returns an empty list when there are no samples or the query fails.
        """
        try:
            return self._query(
                '''
                SELECT timestamp, temperature, load, fan_speed, fan_pwm,
                       power_draw, memory_used, memory_total, core_clock,
                       memory_clock, voltage, efficiency
                FROM gpu_status
                WHERE gpu_name = ?
                ORDER BY timestamp DESC
                LIMIT ?
                ''',
                (gpu_name, limit),
            )
        except sqlite3.Error as e:
            logger.error(f"Error getting recent status: {e}")
            return []

    def get_gpu_info(self, gpu_name: Optional[str] = None) -> List[Dict[str, Any]]:
        """Return stored GPU rows, optionally restricted to *gpu_name*.

        Returns an empty list when nothing matches or the query fails.
        """
        try:
            if gpu_name:
                return self._query(
                    'SELECT * FROM gpu_info WHERE gpu_name = ?', (gpu_name,))
            return self._query('SELECT * FROM gpu_info')
        except sqlite3.Error as e:
            logger.error(f"Error getting GPU info: {e}")
            return []

    def cleanup_old_data(self, days: int = 7):
        """Delete status samples and alerts older than *days* days."""
        try:
            cutoff_time = time.time() - (days * 24 * 3600)
            self._execute([
                ('DELETE FROM gpu_status WHERE timestamp < ?', (cutoff_time,)),
                ('DELETE FROM alerts WHERE timestamp < ?', (cutoff_time,)),
            ])
            logger.info(f"Cleaned up data older than {days} days")
        except sqlite3.Error as e:
            logger.error(f"Error cleaning up old data: {e}")
|
|
|
|
class GPUManager:
    """Main GPU management class: detection, collection and storage.

    Attributes:
        detector: Hardware detector used by ``initialize``.
        data_manager: Storage backend for GPU info and status samples.
        collectors: Maps each GPU's unique name to its data collector.
    """

    def __init__(self):
        self.detector = GPUHardwareDetector()
        self.data_manager = GPUDataManager()
        self.collectors = {}

    @staticmethod
    def _assign_unique_names(gpus) -> None:
        """Make ``name`` unique across *gpus* by editing it in place.

        Names shared by several GPUs get ``" (<bus_id>)"`` appended; names
        that are already unique are left untouched. Without this, identical
        cards would share one key and overwrite each other.
        """
        counts = {}
        for gpu in gpus:
            counts[gpu.name] = counts.get(gpu.name, 0) + 1

        for gpu in gpus:
            # Each GPU is looked up by its own, still unmodified, name.
            if counts[gpu.name] > 1:
                gpu.name = f"{gpu.name} ({gpu.bus_id})"

    def initialize(self):
        """Detect GPUs, create a collector for each and record their info.

        Returns:
            True if at least one GPU was detected, otherwise False. On
            success ``self.collectors`` is rebuilt from scratch, so entries
            from an earlier call do not linger.
        """
        logger.info("Initializing GPU manager...")

        gpus = self.detector.detect_amd_gpus()
        if not gpus:
            logger.warning("No AMD GPUs detected")
            return False

        self._assign_unique_names(gpus)

        collectors = {}
        for gpu in gpus:
            collectors[gpu.name] = GPUDataCollector(gpu)
            self.data_manager.save_gpu_info(gpu)
            logger.info(f"Initialized collector for {gpu.name}")
        self.collectors = collectors

        logger.info(f"Successfully initialized {len(gpus)} GPU(s)")
        return True

    def get_status(self, gpu_name: Optional[str] = None) -> "Dict[str, Optional[GPUStatus]]":
        """Collect the current status of one GPU or of all GPUs.

        Args:
            gpu_name: GPU to query; when omitted or empty, every known GPU
                is queried.

        Returns:
            A dict from GPU name to its status. The value is None for an
            unknown name or a failed collection. Every non-empty status is
            also saved through the data manager.
        """
        if gpu_name:
            collector = self.collectors.get(gpu_name)
            results = {
                gpu_name: collector.collect_status() if collector is not None else None
            }
        else:
            results = {
                name: collector.collect_status()
                for name, collector in self.collectors.items()
            }

        for name, status in results.items():
            if status:
                self.data_manager.save_status(name, status)

        return results

    def get_gpu_list(self) -> List[str]:
        """Get list of detected GPU names."""
        return list(self.collectors)

    def get_gpu_info(self, gpu_name: Optional[str] = None) -> List[Dict[str, Any]]:
        """Get stored GPU information, optionally for a single GPU."""
        return self.data_manager.get_gpu_info(gpu_name)

    def get_historical_data(self, gpu_name: str, hours: int = 24) -> List[Dict[str, Any]]:
        """Return the samples of *gpu_name* from the last *hours* hours.

        Rows are dicts keyed by column name, oldest first. An empty list is
        returned when there is no data or the query fails.
        """
        cutoff_time = time.time() - (hours * 3600)

        try:
            conn = sqlite3.connect(self.data_manager.db_path)
            try:
                cursor = conn.execute(
                    '''
                    SELECT timestamp, temperature, load, fan_speed, fan_pwm,
                           power_draw, memory_used, memory_total, core_clock,
                           memory_clock, voltage, efficiency
                    FROM gpu_status
                    WHERE gpu_name = ? AND timestamp >= ?
                    ORDER BY timestamp ASC
                    ''',
                    (gpu_name, cutoff_time),
                )
                columns = [column[0] for column in cursor.description]
                return [dict(zip(columns, row)) for row in cursor.fetchall()]
            finally:
                # The connection is not closed automatically; do it here.
                conn.close()

        except sqlite3.Error as e:
            logger.error(f"Error getting historical data: {e}")
            return []
|
|
|
|
| |
if __name__ == "__main__":
    import sys

    logging.basicConfig(
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        level=logging.INFO,
    )

    manager = GPUManager()

    # Nothing to report without at least one detected GPU.
    if not manager.initialize():
        print("GPU detection failed!")
        sys.exit(1)

    print("GPU detection successful!")
    print(f"Detected GPUs: {manager.get_gpu_list()}")

    # Take one sample per GPU and print a short summary of each.
    for gpu_name, gpu_status in manager.get_status().items():
        if not gpu_status:
            continue
        print(f"\n{gpu_name} Status:")
        print(f" Temperature: {gpu_status.temperature}°C")
        print(f" Load: {gpu_status.load}%")
        print(f" Fan Speed: {gpu_status.fan_speed} RPM")
        print(f" Fan PWM: {gpu_status.fan_pwm}")
        print(f" Power: {gpu_status.power_draw}W")
        print(f" Memory: {gpu_status.memory_used}/{gpu_status.memory_total} MB")
        print(f" Core Clock: {gpu_status.core_clock} MHz")
        print(f" Memory Clock: {gpu_status.memory_clock} MHz")