gpu_monitoring_system / test_system.py
meccatronis's picture
Upload test_system.py with huggingface_hub
85b135b verified
#!/usr/bin/env python3
"""
Comprehensive Test and Validation Script
Tests all components of the GPU monitoring and fan control system
to ensure proper functionality and integration.
"""
import sys
import os
import time
import json
import logging
import subprocess
import threading
from pathlib import Path
from typing import Dict, List, Any, Optional
# Add the project directory to Python path
sys.path.insert(0, str(Path(__file__).parent))
from gpu_monitoring import GPUManager, GPUStatus
from gpu_fan_controller import FanController, FanMode, ProfileType
from alert_system import AlertManager, AlertThreshold
from performance_optimizer import SystemOptimizer
from web_interface import app as web_app
logger = logging.getLogger(__name__)


class SystemTester:
    """Run the end-to-end checks for the GPU monitoring / fan control system.

    Every check records one or more entries in ``self.test_results`` through
    :meth:`log_test`. Each ``test_*`` method returns ``True`` only when all of
    the results it recorded were successful, and never raises: unexpected
    exceptions are recorded as a failed result instead.
    """

    def __init__(self):
        """Create an empty result list, unset component handles, and set up logging."""
        self.test_results = []
        self.gpu_manager = None
        self.fan_controller = None
        self.alert_manager = None
        self.optimizer = None
        # Setup logging
        self.setup_logging()

    def setup_logging(self):
        """Configure root logging to write to ``test_results.log`` and stdout."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('test_results.log'),
                logging.StreamHandler(sys.stdout),
            ],
        )

    def log_test(self, test_name: str, success: bool, message: str = ""):
        """Log one test result and append it to ``self.test_results``.

        Args:
            test_name: Label of the check being reported.
            success: Whether the check passed.
            message: Optional human-readable detail.
        """
        status = "PASS" if success else "FAIL"
        logger.info(f"[{status}] {test_name}: {message}")
        self.test_results.append({
            "test": test_name,
            "success": success,
            "message": message,
            "timestamp": time.time(),
        })

    def test_gpu_detection(self) -> bool:
        """Initialize the GPU manager and verify that status data can be read.

        Returns:
            True when initialization succeeds and ``get_status()`` returns a
            truthy value, False otherwise.
        """
        try:
            self.gpu_manager = GPUManager()
            if not self.gpu_manager.initialize():
                self.log_test("GPU Detection", False, "Failed to initialize GPU manager")
                return False

            gpus = self.gpu_manager.get_gpu_list()
            # Result is not inspected; the call is kept so an exception in
            # get_gpu_info() still fails this check.
            self.gpu_manager.get_gpu_info()
            self.log_test(
                "GPU Detection",
                True,
                f"Found {len(gpus)} GPU(s): {', '.join(gpus)}"
            )

            # Test status collection
            status = self.gpu_manager.get_status()
            if not status:
                self.log_test("GPU Status Collection", False, "No status data collected")
                return False

            self.log_test(
                "GPU Status Collection",
                True,
                f"Successfully collected status for {len(status)} GPU(s)"
            )
            return True
        except Exception as e:
            self.log_test("GPU Detection", False, f"Exception: {e}")
            return False

    def test_fan_control(self) -> bool:
        """Exercise fan profile loading, profile switching and manual PWM.

        Returns:
            True when the controller initializes and every sub-check that was
            run passed, False otherwise.
        """
        try:
            self.fan_controller = FanController()
            if not self.fan_controller.initialize():
                self.log_test("Fan Control", False, "Failed to initialize fan controller")
                return False

            all_ok = True

            # Test profile management
            profiles = self.fan_controller.get_profiles()
            self.log_test(
                "Fan Profiles",
                True,
                f"Loaded {len(profiles)} profiles: {', '.join(profiles.keys())}"
            )

            # Test profile switching
            if "balanced" in profiles:
                switched = bool(self.fan_controller.set_profile("balanced"))
                self.log_test(
                    "Profile Switching",
                    switched,
                    "Switched to balanced profile" if switched
                    else "Failed to switch to balanced profile"
                )
                all_ok = all_ok and switched

            # Test manual control
            # NOTE(review): the controller is left in manual override after
            # this check; restore automatic control here once the appropriate
            # FanController call is confirmed.
            self.fan_controller.set_manual_pwm(100)
            time.sleep(1)  # Allow time for change
            status = self.fan_controller.get_status()
            if status and status.manual_override:
                self.log_test("Manual Control", True, "Manual PWM control working")
            else:
                self.log_test("Manual Control", False, "Manual control not working")
                all_ok = False

            return all_ok
        except Exception as e:
            self.log_test("Fan Control", False, f"Exception: {e}")
            return False

    def test_alert_system(self) -> bool:
        """Add a sample alert threshold and read back the alert history.

        Returns:
            True when both calls complete without raising, False otherwise.
        """
        try:
            self.alert_manager = AlertManager()

            # Test threshold management
            threshold = AlertThreshold(
                metric="temperature",
                threshold=50.0,
                operator=">=",
                duration=5,
                enabled=True,
                cooldown=60
            )
            self.alert_manager.add_threshold(threshold)
            self.log_test("Alert Thresholds", True, "Successfully added alert threshold")

            # Test alert history
            history = self.alert_manager.get_alert_history(1)
            self.log_test("Alert History", True, f"Retrieved {len(history)} alerts from history")
            return True
        except Exception as e:
            self.log_test("Alert System", False, f"Exception: {e}")
            return False

    def test_performance_optimizer(self) -> bool:
        """Exercise optimizer profile listing, profile application and analytics.

        Returns:
            True when every sub-check that was run passed, False otherwise.
        """
        try:
            self.optimizer = SystemOptimizer()
            all_ok = True

            # Test profile management
            profiles = self.optimizer.profiles
            self.log_test(
                "Optimization Profiles",
                True,
                f"Loaded {len(profiles)} profiles: {', '.join(profiles.keys())}"
            )

            # Test profile application
            if "balanced" in profiles:
                applied = bool(self.optimizer.apply_profile("balanced"))
                self.log_test(
                    "Profile Application",
                    applied,
                    "Applied balanced profile" if applied
                    else "Failed to apply balanced profile"
                )
                all_ok = all_ok and applied

            # Test performance analytics
            analytics = self.optimizer.get_performance_analytics(1)
            if "system" in analytics:
                self.log_test("Performance Analytics", True, "Successfully generated analytics")
            else:
                self.log_test("Performance Analytics", False, "Failed to generate analytics")
                all_ok = False

            return all_ok
        except Exception as e:
            self.log_test("Performance Optimizer", False, f"Exception: {e}")
            return False

    def test_web_interface(self) -> bool:
        """Request three read-only API endpoints through the app's test client.

        Returns:
            True when every endpoint answers with HTTP 200, False otherwise.
        """
        # (result label, URL, description used in the log message)
        endpoints = [
            ("Web API - Status", '/api/status', "Status endpoint"),
            ("Web API - GPUs", '/api/gpus', "GPU list endpoint"),
            ("Web API - Fan Profiles", '/api/fan/profiles', "Fan profiles endpoint"),
        ]
        try:
            all_ok = True
            with web_app.test_client() as client:
                for label, url, description in endpoints:
                    response = client.get(url)
                    if response.status_code == 200:
                        self.log_test(label, True, f"{description} working")
                    else:
                        self.log_test(
                            label,
                            False,
                            f"{description} failed: {response.status_code}"
                        )
                        all_ok = False
            return all_ok
        except Exception as e:
            self.log_test("Web Interface", False, f"Exception: {e}")
            return False

    def test_data_persistence(self) -> bool:
        """Verify that historical records can be read for every listed GPU.

        A GPU manager is created and initialized here when none exists yet.

        Returns:
            True when status data is available and the historical query
            completes for each GPU, False otherwise.
        """
        try:
            if not self.gpu_manager:
                self.gpu_manager = GPUManager()
                if not self.gpu_manager.initialize():
                    self.log_test(
                        "Data Persistence",
                        False,
                        "Failed to initialize GPU manager"
                    )
                    return False

            # Test data collection and storage
            status = self.gpu_manager.get_status()
            if not status:
                # Previously this path returned True without recording any
                # result, hiding the fact that nothing was verified.
                self.log_test("Data Persistence", False, "No status data collected")
                return False

            # Test historical data retrieval
            for gpu_name in self.gpu_manager.get_gpu_list():
                history = self.gpu_manager.get_historical_data(gpu_name, 1)
                self.log_test(
                    f"Data Persistence - {gpu_name}",
                    True,
                    f"Retrieved {len(history)} historical records"
                )
            return True
        except Exception as e:
            self.log_test("Data Persistence", False, f"Exception: {e}")
            return False

    def test_system_integration(self) -> bool:
        """Check how many components created by earlier checks are usable.

        Returns:
            True when at least three of the four components (GPU manager, fan
            controller, alert manager, optimizer) are present and report data.
        """
        try:
            # Test that all components can work together
            components_working = []
            if self.gpu_manager and self.gpu_manager.get_gpu_list():
                components_working.append("GPU Manager")
            if self.fan_controller and self.fan_controller.get_profiles():
                components_working.append("Fan Controller")
            if self.alert_manager:
                components_working.append("Alert Manager")
            if self.optimizer and self.optimizer.profiles:
                components_working.append("Performance Optimizer")

            success = len(components_working) >= 3  # At least 3 components should work
            self.log_test(
                "System Integration",
                success,
                f"Working components: {', '.join(components_working)}"
            )
            return success
        except Exception as e:
            self.log_test("System Integration", False, f"Exception: {e}")
            return False

    def test_configuration_files(self) -> bool:
        """Check that each expected config file exists and parses as JSON.

        Paths are resolved relative to the current working directory.

        Returns:
            True only when all four files exist and contain valid JSON.
        """
        config_files = [
            "config/fan_profiles.json",
            "config/monitoring.json",
            "config/alerts.json",
            "config/optimization.json",
        ]
        try:
            valid_configs = 0
            for config_file in config_files:
                if not Path(config_file).exists():
                    self.log_test(f"Config File - {config_file}", False, "File not found")
                    continue
                try:
                    with open(config_file, 'r') as f:
                        json.load(f)
                except (json.JSONDecodeError, UnicodeDecodeError):
                    self.log_test(f"Config Validation - {config_file}", False, "Invalid JSON")
                except OSError as e:
                    # One unreadable file must not abort the remaining files.
                    self.log_test(f"Config File - {config_file}", False, f"Unreadable: {e}")
                else:
                    valid_configs += 1

            success = valid_configs == len(config_files)
            self.log_test(
                "Configuration Files",
                success,
                f"Valid configs: {valid_configs}/{len(config_files)}"
            )
            return success
        except Exception as e:
            self.log_test("Configuration Files", False, f"Exception: {e}")
            return False

    def test_permissions(self) -> bool:
        """Check read access to the hwmon temperature file of AMD GPUs in sysfs.

        Devices under ``/sys/class/drm/card*`` whose ``device/vendor`` file
        reads ``0x1002`` are treated as AMD GPUs. For each one, the first entry
        under ``device/hwmon`` is probed by reading ``temp1_input``.

        Returns:
            True when at least one such file could be read, False otherwise.
        """
        try:
            # Check if we can access GPU sysfs
            gpu_paths = []
            for card_path in Path("/sys/class/drm").glob("card*"):
                device_path = card_path / "device"
                vendor_file = device_path / "vendor"
                if not vendor_file.exists():
                    continue
                try:
                    with open(vendor_file, 'r') as f:
                        vendor = f.read().strip()
                except OSError:
                    # An unreadable vendor file only disqualifies this card.
                    continue
                if vendor == "0x1002":  # AMD
                    gpu_paths.append(str(device_path))

            if not gpu_paths:
                self.log_test("Permissions", False, "No AMD GPUs detected")
                return False

            # Check hwmon access
            hwmon_accessible = 0
            for device_path in gpu_paths:
                hwmon_path = Path(device_path) / "hwmon"
                if not hwmon_path.exists():
                    continue
                hwmons = list(hwmon_path.glob("*"))
                if not hwmons:
                    continue
                temp_file = hwmons[0] / "temp1_input"
                if not temp_file.exists():
                    continue
                try:
                    with open(temp_file, 'r') as f:
                        f.read()
                except OSError:
                    # Covers PermissionError and any other read failure;
                    # the device simply does not count as accessible.
                    continue
                hwmon_accessible += 1

            success = hwmon_accessible > 0
            self.log_test(
                "Permissions",
                success,
                f"Accessible hwmon devices: {hwmon_accessible}/{len(gpu_paths)}"
            )
            return success
        except Exception as e:
            self.log_test("Permissions", False, f"Exception: {e}")
            return False

    @staticmethod
    def _tally_results(results: List[Dict[str, Any]]):
        """Return ``(total, passed, failed, success_rate_percent)`` for *results*."""
        total = len(results)
        passed = sum(1 for result in results if result['success'])
        failed = total - passed
        success_rate = (passed / total * 100) if total > 0 else 0
        return total, passed, failed, success_rate

    def _log_summary(self, summary: Dict[str, Any]) -> None:
        """Write the human-readable summary (and failed results) to the log."""
        logger.info("\n" + "=" * 60)
        logger.info("TEST SUMMARY")
        logger.info("=" * 60)
        logger.info(f"Total Tests: {summary['total_tests']}")
        logger.info(f"Passed: {summary['passed']}")
        logger.info(f"Failed: {summary['failed']}")
        logger.info(f"Success Rate: {summary['success_rate']:.1f}%")
        if summary['failed'] > 0:
            logger.info("\nFailed Tests:")
            for result in summary['test_results']:
                if not result['success']:
                    logger.info(f" - {result['test']}: {result['message']}")
        logger.info("=" * 60)

    def run_all_tests(self) -> Dict[str, Any]:
        """Run every check in order, save the results, and log a summary.

        Returns:
            A dict with the keys ``total_tests``, ``passed``, ``failed``,
            ``success_rate``, ``test_results``, ``timestamp`` and
            ``system_info``.
        """
        logger.info("=" * 60)
        logger.info("GPU Monitoring System - Comprehensive Test Suite")
        logger.info("=" * 60)

        # Run individual tests. Order matters: System Integration inspects the
        # components created by the checks that run before it.
        tests = [
            ("Configuration Files", self.test_configuration_files),
            ("Permissions", self.test_permissions),
            ("GPU Detection", self.test_gpu_detection),
            ("Fan Control", self.test_fan_control),
            ("Alert System", self.test_alert_system),
            ("Performance Optimizer", self.test_performance_optimizer),
            ("Web Interface", self.test_web_interface),
            ("Data Persistence", self.test_data_persistence),
            ("System Integration", self.test_system_integration),
        ]
        for test_name, test_func in tests:
            logger.info(f"\nRunning: {test_name}")
            logger.info("-" * 40)
            try:
                test_func()
            except Exception as e:
                self.log_test(test_name, False, f"Test framework error: {e}")

        # Calculate results
        total_tests, passed_tests, failed_tests, success_rate = self._tally_results(
            self.test_results
        )

        # Generate summary
        summary = {
            "total_tests": total_tests,
            "passed": passed_tests,
            "failed": failed_tests,
            "success_rate": success_rate,
            "test_results": self.test_results,
            "timestamp": time.time(),
            "system_info": self.get_system_info(),
        }

        # Save results
        self.save_test_results(summary)

        # Print summary
        self._log_summary(summary)
        return summary

    def get_system_info(self) -> Dict[str, Any]:
        """Collect basic host information to store alongside the results.

        Returns:
            Platform and Python version always; CPU count, total memory (GiB),
            total disk of ``/`` (GiB) and uptime in seconds are added when
            ``psutil`` can be imported and queried.
        """
        try:
            import psutil
            return {
                "platform": sys.platform,
                "python_version": sys.version,
                "cpu_count": psutil.cpu_count(),
                "memory_total": psutil.virtual_memory().total // (1024**3),  # GB
                "disk_total": psutil.disk_usage('/').total // (1024**3),  # GB
                "uptime": time.time() - psutil.boot_time(),
            }
        except Exception:
            # Best effort: psutil missing or failing must not break the run,
            # but KeyboardInterrupt/SystemExit are no longer swallowed.
            return {
                "platform": sys.platform,
                "python_version": sys.version,
            }

    def save_test_results(self, summary: Dict[str, Any]):
        """Write the results to ``test_results_detailed.json`` and ``test_summary.txt``.

        Failures while writing are logged as errors and not re-raised.

        Args:
            summary: The summary dict built by :meth:`run_all_tests`.
        """
        try:
            # Save detailed results
            with open('test_results_detailed.json', 'w', encoding='utf-8') as f:
                json.dump(summary, f, indent=2, default=str)

            # Save summary
            with open('test_summary.txt', 'w', encoding='utf-8') as f:
                f.write("GPU Monitoring System Test Results\n")
                f.write("====================================\n")
                f.write(f"Timestamp: {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
                f.write(f"Total Tests: {summary['total_tests']}\n")
                f.write(f"Passed: {summary['passed']}\n")
                f.write(f"Failed: {summary['failed']}\n")
                f.write(f"Success Rate: {summary['success_rate']:.1f}%\n\n")
                f.write("Detailed Results:\n")
                f.write("================\n")
                for result in summary['test_results']:
                    status = "PASS" if result['success'] else "FAIL"
                    f.write(f"{status}: {result['test']} - {result['message']}\n")

            logger.info("Test results saved to test_results_detailed.json and test_summary.txt")
        except Exception as e:
            logger.error(f"Failed to save test results: {e}")
def main():
    """Print a banner, run the full test suite, and exit with its status.

    Exits with status 1 when any recorded result failed, 0 otherwise.
    """
    print("GPU Monitoring System - Comprehensive Test Suite")
    print("=" * 60)
    print("This test suite will validate all components of the")
    print("GPU monitoring and fan control system.")
    print()

    # Check if running as root for full functionality.
    # os.geteuid only exists on Unix; skip the check where it is missing
    # instead of crashing with AttributeError.
    geteuid = getattr(os, "geteuid", None)
    if geteuid is not None and geteuid() != 0:
        print("WARNING: Not running as root. Some tests may fail due to permission issues.")
        print("For complete testing, run: sudo python3 test_system.py")
        print()

    # Run tests
    tester = SystemTester()
    results = tester.run_all_tests()

    # Exit with appropriate code
    if results['failed'] > 0:
        print("\nSome tests failed. Please review the results and fix any issues.")
        sys.exit(1)
    else:
        print("\nAll tests passed! The system is ready for use.")
        sys.exit(0)


if __name__ == "__main__":
    main()