gpu_monitoring_system / gpu_monitoring.py
meccatronis's picture
Upload gpu_monitoring.py with huggingface_hub
3838a08 verified
#!/usr/bin/env python3
"""
GPU Detection and Monitoring Module
Handles GPU detection, hardware monitoring, and data collection for AMD GPUs.
Supports multiple GPU models and provides a unified interface for monitoring.
"""
import os
import glob
import time
import logging
import json
import sqlite3
from typing import Dict, List, Optional, Tuple, Any
from dataclasses import dataclass, asdict
from pathlib import Path
logger = logging.getLogger(__name__)
@dataclass
class GPUInfo:
    """Static description of one detected GPU and the sysfs files used to read it.

    Field order is part of the constructor signature and must not change.
    """

    # --- identification -------------------------------------------------
    name: str        # human-readable model name (or "AMD GPU <device_id>")
    vendor: str      # vendor label; the detector in this file stores "AMD"
    device_id: str   # contents of the sysfs "device" file, e.g. "0x73bf"
    bus_id: str      # basename of the DRM card directory, e.g. "card0"

    # --- sysfs locations ------------------------------------------------
    hwmon_path: str    # the card's hwmon directory
    temp_sensor: str   # temperature file (detector uses <hwmon>/temp1_input)
    fan_control: str   # fan PWM file (detector uses <hwmon>/pwm1)
    power_sensor: str  # power file (detector uses <hwmon>/power1_input)

    # --- memory ---------------------------------------------------------
    # Not filled in by the detector in this file; both stay at 0 unless a
    # caller sets them. Unit is presumably MB like GPUStatus - TODO confirm.
    memory_total: int = 0
    memory_used: int = 0
@dataclass
class GPUStatus:
    """One sample of a GPU's live readings.

    Field order is part of the constructor signature and must not change.
    The collector in this file substitutes 0 / 0.0 for any reading it could
    not obtain, so a zero may mean "unavailable" rather than a measured zero.
    """

    timestamp: float     # time.time() at collection
    temperature: float   # degrees Celsius
    load: float          # GPU busy percentage
    fan_speed: int       # RPM
    fan_pwm: int         # raw PWM value (0-255)
    power_draw: float    # watts
    memory_used: int     # VRAM in use, MB
    memory_total: int    # VRAM installed, MB
    core_clock: int      # active core clock, MHz
    memory_clock: int    # active memory clock, MHz
    voltage: float = 0.0     # not implemented by the collector; always 0.0 there
    efficiency: float = 0.0  # load / power_draw, 0.0 when either is unavailable
class GPUHardwareDetector:
    """Detects and identifies GPU hardware by scanning DRM card nodes in sysfs."""

    # PCI vendor ID string that marks an AMD device in the sysfs "vendor" file.
    AMD_VENDOR_ID = "0x1002"

    def __init__(self):
        self.gpus = []
        self.detected_gpus = []

    @staticmethod
    def _card_index(card_path: str) -> Optional[int]:
        """Return N when the last path component is exactly ``cardN``, else None.

        Connector nodes such as ``card0-DP-1`` also match the ``card*`` glob;
        they are not GPUs, so they yield None and are skipped by the caller.
        """
        suffix = os.path.basename(card_path)[len("card"):]
        return int(suffix) if suffix.isdigit() else None

    def detect_amd_gpus(self, drm_root: str = "/sys/class/drm") -> List["GPUInfo"]:
        """Detect AMD GPUs in the system.

        Args:
            drm_root: Directory holding the ``cardN`` DRM nodes. Defaults to the
                real sysfs location; overridable so the scan can be pointed at
                a different tree (e.g. a fixture directory).

        Returns:
            One GPUInfo per AMD card that exposes a hwmon directory, ordered by
            card number. The same list is stored in ``self.detected_gpus``.
            Cards that cannot be read are logged and skipped, never raised.
        """
        logger.info("Detecting AMD GPUs...")
        gpus = []

        # Keep only real cardN nodes and order them numerically so results are
        # stable between runs (glob order is arbitrary, and plain string
        # sorting would put card10 before card2).
        cards = []
        pattern = os.path.join(glob.escape(drm_root), "card*")
        for path in glob.glob(pattern):
            index = self._card_index(path)
            if index is not None:
                cards.append((index, path))

        for _, card_path in sorted(cards):
            try:
                device_path = os.path.join(card_path, "device")

                # A node without a vendor file is not a PCI GPU; skip quietly.
                try:
                    with open(os.path.join(device_path, "vendor"), 'r') as f:
                        vendor_id = f.read().strip()
                except FileNotFoundError:
                    continue
                if vendor_id.lower() != self.AMD_VENDOR_ID:
                    continue

                with open(os.path.join(device_path, "device"), 'r') as f:
                    device_id = f.read().strip()

                gpu_name = self._get_gpu_name(device_id)
                hwmon_path = self._find_hwmon_path(device_path)
                if not hwmon_path:
                    # Without hwmon there is nothing to monitor; say so instead
                    # of dropping the card silently.
                    logger.warning(
                        f"AMD GPU {gpu_name} at {card_path} has no hwmon interface; skipping"
                    )
                    continue

                gpus.append(GPUInfo(
                    name=gpu_name,
                    vendor="AMD",
                    device_id=device_id,
                    bus_id=os.path.basename(card_path),
                    hwmon_path=hwmon_path,
                    temp_sensor=os.path.join(hwmon_path, "temp1_input"),
                    fan_control=os.path.join(hwmon_path, "pwm1"),
                    power_sensor=os.path.join(hwmon_path, "power1_input"),
                ))
                logger.info(f"Found AMD GPU: {gpu_name} ({device_id})")
            except (OSError, ValueError) as e:
                # Unreadable or undecodable sysfs file: skip this card only.
                logger.warning(f"Error detecting GPU at {card_path}: {e}")
                continue

        self.detected_gpus = gpus
        return gpus

    def _get_gpu_name(self, device_id: str) -> str:
        """Get human-readable GPU name from device ID.

        Unknown IDs fall back to ``"AMD GPU <device_id>"``.
        """
        # NOTE(review): every ID below maps to the same marketing name, which
        # looks like placeholder data - verify against the PCI ID database
        # before relying on these names.
        gpu_names = {
            "0x73bf": "Radeon Pro VII",
            "0x73ff": "Radeon Pro VII",
            "0x7310": "Radeon Pro VII",
            "0x7340": "Radeon Pro VII",
            "0x73a0": "Radeon Pro VII",
            "0x73b0": "Radeon Pro VII",
            "0x73c0": "Radeon Pro VII",
            "0x73d0": "Radeon Pro VII",
            "0x73e0": "Radeon Pro VII",
            "0x73f0": "Radeon Pro VII",
            # Add more GPU mappings as needed
        }
        # Match case-insensitively; the fallback keeps the ID as it was given.
        return gpu_names.get(device_id.strip().lower(), f"AMD GPU {device_id}")

    def _find_hwmon_path(self, device_path: str) -> Optional[str]:
        """Find the hwmon path for a GPU device.

        Returns the lowest-numbered ``hwmonN`` directory under
        ``<device_path>/hwmon``, or None when there is none or it cannot be
        listed.
        """
        hwmon_base = os.path.join(device_path, "hwmon")
        try:
            entries = [e for e in os.listdir(hwmon_base) if e.startswith("hwmon")]
        except FileNotFoundError:
            return None
        except OSError as e:
            logger.warning(f"Error finding hwmon for {device_path}: {e}")
            return None
        if not entries:
            return None
        # Shorter names first, then alphabetical: hwmon2 sorts before hwmon10.
        entries.sort(key=lambda e: (len(e), e))
        return os.path.join(hwmon_base, entries[0])
class GPUDataCollector:
    """Collects GPU monitoring data for one GPU from its sysfs files.

    Every ``read_*`` method returns None (or a tuple of Nones) when the value
    is unavailable. A missing file is treated as "sensor not present" and is
    not logged; read or parse failures are logged at debug level.
    """

    def __init__(self, gpu_info: "GPUInfo"):
        self.gpu_info = gpu_info
        self.last_status = None

    # ------------------------------------------------------------------
    # helpers
    # ------------------------------------------------------------------
    def _device_path(self) -> str:
        """Return the device directory, two levels above ``hwmon_path``."""
        return os.path.dirname(os.path.dirname(self.gpu_info.hwmon_path))

    def _read_value(self, path: str, label: str, convert=int):
        """Read ``path`` and convert its stripped contents with ``convert``.

        Returns None when the file does not exist, cannot be read, or does not
        parse. ``label`` only appears in the debug log message.
        """
        try:
            with open(path, 'r') as f:
                return convert(f.read().strip())
        except FileNotFoundError:
            return None
        except (OSError, ValueError) as e:
            logger.debug(f"Error reading {label}: {e}")
            return None

    # ------------------------------------------------------------------
    # individual sensors
    # ------------------------------------------------------------------
    def read_temperature(self) -> Optional[float]:
        """Read GPU temperature in Celsius (file holds millidegrees)."""
        millidegrees = self._read_value(self.gpu_info.temp_sensor, "temperature")
        return None if millidegrees is None else millidegrees / 1000.0

    def read_fan_speed(self) -> Optional[int]:
        """Read fan speed in RPM from ``fan1_input`` next to the PWM file."""
        # Build the path from the directory rather than substituting "pwm1"
        # in the whole string, which would also rewrite a matching directory
        # name and would silently do nothing for a differently named PWM file.
        fan_speed_file = os.path.join(
            os.path.dirname(self.gpu_info.fan_control), "fan1_input"
        )
        return self._read_value(fan_speed_file, "fan speed")

    def read_fan_pwm(self) -> Optional[int]:
        """Read fan PWM value (0-255)."""
        return self._read_value(self.gpu_info.fan_control, "fan PWM")

    def read_power_draw(self) -> Optional[float]:
        """Read power draw in watts (files hold microwatts).

        Tries the configured power sensor first and falls back to
        ``power1_average`` in the same directory, since not every
        driver/kernel combination exposes ``power1_input``.
        """
        microwatts = self._read_value(self.gpu_info.power_sensor, "power")
        if microwatts is None:
            fallback = os.path.join(
                os.path.dirname(self.gpu_info.power_sensor), "power1_average"
            )
            microwatts = self._read_value(fallback, "power")
        return None if microwatts is None else microwatts / 1000000.0

    def read_memory_info(self) -> Tuple[Optional[int], Optional[int]]:
        """Read VRAM usage in MB as ``(used, total)``.

        Both values are None unless both files could be read.
        """
        device_path = self._device_path()
        used = self._read_value(
            os.path.join(device_path, "mem_info_vram_used"), "memory info"
        )
        total = self._read_value(
            os.path.join(device_path, "mem_info_vram_total"), "memory info"
        )
        if used is None or total is None:
            return None, None
        bytes_per_mb = 1024 * 1024
        return used // bytes_per_mb, total // bytes_per_mb

    def read_gpu_load(self) -> Optional[float]:
        """Read GPU load percentage."""
        load_file = os.path.join(self._device_path(), "gpu_busy_percent")
        return self._read_value(load_file, "GPU load", convert=float)

    def read_clocks(self) -> Tuple[Optional[int], Optional[int]]:
        """Read core and memory clocks in MHz as ``(core, memory)``."""
        device_path = self._device_path()
        core_clock = self._parse_clock_file(os.path.join(device_path, "pp_dpm_sclk"))
        memory_clock = self._parse_clock_file(os.path.join(device_path, "pp_dpm_mclk"))
        return core_clock, memory_clock

    def _parse_clock_file(self, clock_file: str) -> Optional[int]:
        """Parse clock file to extract current clock speed.

        The file lists one level per line as ``<index>: <freq>Mhz`` with ``*``
        marking the active level, e.g. ``1: 1200Mhz *``. Returns the active
        frequency, or None if there is no such line or it cannot be parsed.
        """
        try:
            with open(clock_file, 'r') as f:
                for line in f:
                    if '*' not in line:  # only the active level is of interest
                        continue
                    _, colon, rest = line.partition(':')
                    tokens = rest.split()
                    if not colon or not tokens:
                        continue
                    value = tokens[0]
                    # Accept any capitalisation of the unit suffix.
                    if value.lower().endswith("mhz"):
                        value = value[:-3]
                    return int(value)
        except FileNotFoundError:
            return None
        except (OSError, ValueError) as e:
            logger.debug(f"Error parsing clock file {clock_file}: {e}")
        return None

    # ------------------------------------------------------------------
    # aggregate
    # ------------------------------------------------------------------
    def collect_status(self) -> Optional["GPUStatus"]:
        """Collect all GPU status information.

        Returns a GPUStatus in which every unavailable reading is replaced by
        0 / 0.0, and remembers it in ``self.last_status``. Returns None only
        if something unexpected fails while collecting.
        """
        try:
            temp = self.read_temperature()
            fan_speed = self.read_fan_speed()
            fan_pwm = self.read_fan_pwm()
            power = self.read_power_draw()
            mem_used, mem_total = self.read_memory_info()
            load = self.read_gpu_load()
            core_clock, mem_clock = self.read_clocks()

            # Efficiency is load per watt; it is only meaningful when both
            # readings are present and non-zero.
            efficiency = 0.0
            if power and load and power > 0:
                efficiency = load / power

            status = GPUStatus(
                timestamp=time.time(),
                temperature=temp or 0.0,
                load=load or 0.0,
                fan_speed=fan_speed or 0,
                fan_pwm=fan_pwm or 0,
                power_draw=power or 0.0,
                memory_used=mem_used or 0,
                memory_total=mem_total or 0,
                core_clock=core_clock or 0,
                memory_clock=mem_clock or 0,
                voltage=0.0,  # Not implemented yet
                efficiency=efficiency,
            )
            self.last_status = status
            return status
        except Exception as e:
            # Boundary for the monitoring loop: never let one bad sample
            # propagate to the caller.
            logger.error(f"Error collecting GPU status: {e}")
            return None
class GPUDataManager:
    """Manages GPU data storage and retrieval in a SQLite database file.

    Each operation opens its own connection and closes it again before
    returning. Database errors are logged and swallowed so that a storage
    problem never interrupts monitoring; read methods return ``[]`` on failure.
    """

    def __init__(self, db_path: str = "gpu_monitoring.db"):
        self.db_path = db_path
        self.init_database()

    # ------------------------------------------------------------------
    # connection helpers
    # ------------------------------------------------------------------
    def _execute(self, statements) -> None:
        """Run ``(sql, params)`` pairs in one transaction, then close.

        ``with conn`` only commits or rolls back - it does not close the
        connection - so the explicit ``close()`` is what releases the handle.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            with conn:
                for sql, params in statements:
                    conn.execute(sql, params)
        finally:
            conn.close()

    def _query(self, sql: str, params=()) -> List[Dict[str, Any]]:
        """Run a SELECT and return its rows as dicts keyed by column name."""
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.execute(sql, params)
            columns = [desc[0] for desc in cursor.description]
            return [dict(zip(columns, row)) for row in cursor.fetchall()]
        finally:
            conn.close()

    # ------------------------------------------------------------------
    # schema
    # ------------------------------------------------------------------
    def init_database(self):
        """Initialize the SQLite database (tables and index, if missing)."""
        schema = [
            ('''
                CREATE TABLE IF NOT EXISTS gpu_status (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp REAL,
                    gpu_name TEXT,
                    temperature REAL,
                    load REAL,
                    fan_speed INTEGER,
                    fan_pwm INTEGER,
                    power_draw REAL,
                    memory_used INTEGER,
                    memory_total INTEGER,
                    core_clock INTEGER,
                    memory_clock INTEGER,
                    voltage REAL,
                    efficiency REAL
                )
            ''', ()),
            ('''
                CREATE TABLE IF NOT EXISTS gpu_info (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    gpu_name TEXT UNIQUE,
                    vendor TEXT,
                    device_id TEXT,
                    bus_id TEXT,
                    hwmon_path TEXT,
                    detected_at REAL
                )
            ''', ()),
            ('''
                CREATE TABLE IF NOT EXISTS alerts (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp REAL,
                    gpu_name TEXT,
                    alert_type TEXT,
                    message TEXT,
                    value REAL,
                    threshold REAL
                )
            ''', ()),
            # Status queries filter on gpu_name and order/filter on timestamp;
            # without this index they scan the whole, ever-growing table.
            ('''
                CREATE INDEX IF NOT EXISTS idx_gpu_status_gpu_time
                ON gpu_status (gpu_name, timestamp)
            ''', ()),
        ]
        try:
            self._execute(schema)
            logger.info("Database initialized successfully")
        except sqlite3.Error as e:
            logger.error(f"Error initializing database: {e}")

    # ------------------------------------------------------------------
    # writes
    # ------------------------------------------------------------------
    def save_gpu_info(self, gpu_info: "GPUInfo"):
        """Save GPU information to database.

        ``gpu_name`` is UNIQUE, so saving the same name again replaces the
        previous row and refreshes ``detected_at``.
        """
        try:
            self._execute([('''
                INSERT OR REPLACE INTO gpu_info
                (gpu_name, vendor, device_id, bus_id, hwmon_path, detected_at)
                VALUES (?, ?, ?, ?, ?, ?)
            ''', (gpu_info.name, gpu_info.vendor, gpu_info.device_id,
                  gpu_info.bus_id, gpu_info.hwmon_path, time.time()))])
            logger.debug(f"Saved GPU info: {gpu_info.name}")
        except sqlite3.Error as e:
            logger.error(f"Error saving GPU info: {e}")

    def save_status(self, gpu_name: str, status: "GPUStatus"):
        """Save GPU status to database as one new ``gpu_status`` row."""
        try:
            self._execute([('''
                INSERT INTO gpu_status
                (timestamp, gpu_name, temperature, load, fan_speed, fan_pwm,
                 power_draw, memory_used, memory_total, core_clock, memory_clock,
                 voltage, efficiency)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (status.timestamp, gpu_name, status.temperature, status.load,
                  status.fan_speed, status.fan_pwm, status.power_draw,
                  status.memory_used, status.memory_total, status.core_clock,
                  status.memory_clock, status.voltage, status.efficiency))])
        except sqlite3.Error as e:
            logger.error(f"Error saving status: {e}")

    # ------------------------------------------------------------------
    # reads
    # ------------------------------------------------------------------
    def get_recent_status(self, gpu_name: str, limit: int = 100) -> List[Dict[str, Any]]:
        """Get recent status entries for a GPU, newest first.

        Returns at most ``limit`` dicts keyed by the status column names.
        """
        try:
            return self._query('''
                SELECT timestamp, temperature, load, fan_speed, fan_pwm,
                       power_draw, memory_used, memory_total, core_clock,
                       memory_clock, voltage, efficiency
                FROM gpu_status
                WHERE gpu_name = ?
                ORDER BY timestamp DESC
                LIMIT ?
            ''', (gpu_name, limit))
        except sqlite3.Error as e:
            logger.error(f"Error getting recent status: {e}")
            return []

    def get_gpu_info(self, gpu_name: Optional[str] = None) -> List[Dict[str, Any]]:
        """Get GPU information from database.

        With ``gpu_name`` only that GPU's row is returned; without it, all rows.
        """
        try:
            if gpu_name:
                return self._query(
                    'SELECT * FROM gpu_info WHERE gpu_name = ?', (gpu_name,)
                )
            return self._query('SELECT * FROM gpu_info')
        except sqlite3.Error as e:
            logger.error(f"Error getting GPU info: {e}")
            return []

    # ------------------------------------------------------------------
    # maintenance
    # ------------------------------------------------------------------
    def cleanup_old_data(self, days: int = 7):
        """Remove status rows and alerts older than the specified days."""
        cutoff_time = time.time() - (days * 24 * 3600)
        try:
            self._execute([
                ('DELETE FROM gpu_status WHERE timestamp < ?', (cutoff_time,)),
                ('DELETE FROM alerts WHERE timestamp < ?', (cutoff_time,)),
            ])
            logger.info(f"Cleaned up data older than {days} days")
        except sqlite3.Error as e:
            logger.error(f"Error cleaning up old data: {e}")
class GPUManager:
    """Main GPU management class.

    Ties together detection, one data collector per GPU, and persistence.
    ``self.collectors`` maps a GPU's name to its collector; that name is also
    the key under which samples are stored in the database.
    """

    def __init__(self):
        self.detector = GPUHardwareDetector()
        self.data_manager = GPUDataManager()
        self.collectors = {}

    @staticmethod
    def _assign_unique_names(gpus) -> None:
        """Make GPU names unique in place by appending the bus ID to duplicates.

        Names are used as dictionary and database keys, so two cards of the
        same model would otherwise overwrite each other and only one would be
        monitored. Every card sharing a name gets the suffix (not just the
        later ones), so the result does not depend on detection order. Names
        that are already unique are left untouched.
        """
        counts = {}
        for gpu in gpus:
            counts[gpu.name] = counts.get(gpu.name, 0) + 1
        for gpu in gpus:
            if counts[gpu.name] > 1:
                gpu.name = f"{gpu.name} ({gpu.bus_id})"

    def initialize(self):
        """Initialize GPU detection and data collection.

        Returns:
            True if at least one AMD GPU was detected and set up, else False.
        """
        logger.info("Initializing GPU manager...")

        gpus = self.detector.detect_amd_gpus()
        if not gpus:
            logger.warning("No AMD GPUs detected")
            return False

        self._assign_unique_names(gpus)

        for gpu in gpus:
            self.collectors[gpu.name] = GPUDataCollector(gpu)
            self.data_manager.save_gpu_info(gpu)
            logger.info(f"Initialized collector for {gpu.name}")

        logger.info(f"Successfully initialized {len(gpus)} GPU(s)")
        return True

    def get_status(self, gpu_name: Optional[str] = None) -> Dict[str, Optional["GPUStatus"]]:
        """Get current status for specified GPU or all GPUs.

        Every successfully collected sample is also written to the database.
        An unknown ``gpu_name`` yields ``{gpu_name: None}``.
        """
        results = {}
        if gpu_name:
            collector = self.collectors.get(gpu_name)
            results[gpu_name] = collector.collect_status() if collector is not None else None
        else:
            for name, collector in self.collectors.items():
                results[name] = collector.collect_status()

        for name, status in results.items():
            if status:
                self.data_manager.save_status(name, status)
        return results

    def get_gpu_list(self) -> List[str]:
        """Get list of detected GPU names."""
        return list(self.collectors)

    def get_gpu_info(self, gpu_name: Optional[str] = None) -> List[Dict[str, Any]]:
        """Get GPU information as stored in the database."""
        return self.data_manager.get_gpu_info(gpu_name)

    def get_historical_data(self, gpu_name: str, hours: int = 24) -> List[Dict[str, Any]]:
        """Get historical data for a GPU.

        Returns the samples recorded in the last ``hours`` hours, oldest
        first, as dicts keyed by column name. Returns ``[]`` when there is no
        data or the database cannot be read.
        """
        cutoff_time = time.time() - (hours * 3600)
        columns = ['timestamp', 'temperature', 'load', 'fan_speed', 'fan_pwm',
                   'power_draw', 'memory_used', 'memory_total', 'core_clock',
                   'memory_clock', 'voltage', 'efficiency']
        try:
            conn = sqlite3.connect(self.data_manager.db_path)
            try:
                rows = conn.execute('''
                    SELECT timestamp, temperature, load, fan_speed, fan_pwm,
                           power_draw, memory_used, memory_total, core_clock,
                           memory_clock, voltage, efficiency
                    FROM gpu_status
                    WHERE gpu_name = ? AND timestamp >= ?
                    ORDER BY timestamp ASC
                ''', (gpu_name, cutoff_time)).fetchall()
            finally:
                # ``with conn`` would only end the transaction; close explicitly
                # so repeated calls do not accumulate open handles.
                conn.close()
            return [dict(zip(columns, row)) for row in rows]
        except sqlite3.Error as e:
            logger.error(f"Error getting historical data: {e}")
            return []
# Manual smoke test: detect GPUs, take one sample of each, and print it.
if __name__ == "__main__":
    import sys

    logging.basicConfig(
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        level=logging.INFO,
    )

    manager = GPUManager()

    # Bail out early with a non-zero exit code when nothing was detected.
    if not manager.initialize():
        print("GPU detection failed!")
        sys.exit(1)

    print("GPU detection successful!")

    gpus = manager.get_gpu_list()
    print(f"Detected GPUs: {gpus}")

    # One sample per GPU; GPUs whose sample could not be collected are skipped.
    status = manager.get_status()
    for gpu_name, gpu_status in status.items():
        if not gpu_status:
            continue
        report = (
            f"\n{gpu_name} Status:",
            f" Temperature: {gpu_status.temperature}°C",
            f" Load: {gpu_status.load}%",
            f" Fan Speed: {gpu_status.fan_speed} RPM",
            f" Fan PWM: {gpu_status.fan_pwm}",
            f" Power: {gpu_status.power_draw}W",
            f" Memory: {gpu_status.memory_used}/{gpu_status.memory_total} MB",
            f" Core Clock: {gpu_status.core_clock} MHz",
            f" Memory Clock: {gpu_status.memory_clock} MHz",
        )
        for line in report:
            print(line)