| |
| """ |
| Comprehensive Test and Validation Script |
| |
| Tests all components of the GPU monitoring and fan control system |
| to ensure proper functionality and integration. |
| """ |
|
|
| import sys |
| import os |
| import time |
| import json |
| import logging |
| import subprocess |
| import threading |
| from pathlib import Path |
| from typing import Dict, List, Any, Optional |
|
|
| |
| sys.path.insert(0, str(Path(__file__).parent)) |
|
|
| from gpu_monitoring import GPUManager, GPUStatus |
| from gpu_fan_controller import FanController, FanMode, ProfileType |
| from alert_system import AlertManager, AlertThreshold |
| from performance_optimizer import SystemOptimizer |
| from web_interface import app as web_app |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| class SystemTester: |
| """Comprehensive system tester.""" |
| |
| def __init__(self): |
| self.test_results = [] |
| self.gpu_manager = None |
| self.fan_controller = None |
| self.alert_manager = None |
| self.optimizer = None |
| |
| |
| self.setup_logging() |
| |
| def setup_logging(self): |
| """Setup test logging.""" |
| logging.basicConfig( |
| level=logging.INFO, |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', |
| handlers=[ |
| logging.FileHandler('test_results.log'), |
| logging.StreamHandler(sys.stdout) |
| ] |
| ) |
| |
| def log_test(self, test_name: str, success: bool, message: str = ""): |
| """Log test result.""" |
| status = "PASS" if success else "FAIL" |
| logger.info(f"[{status}] {test_name}: {message}") |
| self.test_results.append({ |
| "test": test_name, |
| "success": success, |
| "message": message, |
| "timestamp": time.time() |
| }) |
| |
| def test_gpu_detection(self) -> bool: |
| """Test GPU detection and initialization.""" |
| try: |
| self.gpu_manager = GPUManager() |
| success = self.gpu_manager.initialize() |
| |
| if success: |
| gpus = self.gpu_manager.get_gpu_list() |
| gpu_info = self.gpu_manager.get_gpu_info() |
| |
| self.log_test( |
| "GPU Detection", |
| True, |
| f"Found {len(gpus)} GPU(s): {', '.join(gpus)}" |
| ) |
| |
| |
| status = self.gpu_manager.get_status() |
| if status: |
| self.log_test( |
| "GPU Status Collection", |
| True, |
| f"Successfully collected status for {len(status)} GPU(s)" |
| ) |
| return True |
| else: |
| self.log_test("GPU Status Collection", False, "No status data collected") |
| return False |
| else: |
| self.log_test("GPU Detection", False, "Failed to initialize GPU manager") |
| return False |
| |
| except Exception as e: |
| self.log_test("GPU Detection", False, f"Exception: {e}") |
| return False |
| |
| def test_fan_control(self) -> bool: |
| """Test fan control functionality.""" |
| try: |
| self.fan_controller = FanController() |
| success = self.fan_controller.initialize() |
| |
| if success: |
| |
| profiles = self.fan_controller.get_profiles() |
| self.log_test( |
| "Fan Profiles", |
| True, |
| f"Loaded {len(profiles)} profiles: {', '.join(profiles.keys())}" |
| ) |
| |
| |
| if "balanced" in profiles: |
| success = self.fan_controller.set_profile("balanced") |
| self.log_test("Profile Switching", success, "Switched to balanced profile") |
| |
| |
| self.fan_controller.set_manual_pwm(100) |
| time.sleep(1) |
| |
| status = self.fan_controller.get_status() |
| if status and status.manual_override: |
| self.log_test("Manual Control", True, "Manual PWM control working") |
| else: |
| self.log_test("Manual Control", False, "Manual control not working") |
| |
| return True |
| else: |
| self.log_test("Fan Control", False, "Failed to initialize fan controller") |
| return False |
| |
| except Exception as e: |
| self.log_test("Fan Control", False, f"Exception: {e}") |
| return False |
| |
| def test_alert_system(self) -> bool: |
| """Test alert system functionality.""" |
| try: |
| self.alert_manager = AlertManager() |
| |
| |
| threshold = AlertThreshold( |
| metric="temperature", |
| threshold=50.0, |
| operator=">=", |
| duration=5, |
| enabled=True, |
| cooldown=60 |
| ) |
| |
| self.alert_manager.add_threshold(threshold) |
| self.log_test("Alert Thresholds", True, "Successfully added alert threshold") |
| |
| |
| history = self.alert_manager.get_alert_history(1) |
| self.log_test("Alert History", True, f"Retrieved {len(history)} alerts from history") |
| |
| return True |
| |
| except Exception as e: |
| self.log_test("Alert System", False, f"Exception: {e}") |
| return False |
| |
| def test_performance_optimizer(self) -> bool: |
| """Test performance optimization system.""" |
| try: |
| self.optimizer = SystemOptimizer() |
| |
| |
| profiles = self.optimizer.profiles |
| self.log_test( |
| "Optimization Profiles", |
| True, |
| f"Loaded {len(profiles)} profiles: {', '.join(profiles.keys())}" |
| ) |
| |
| |
| if "balanced" in profiles: |
| success = self.optimizer.apply_profile("balanced") |
| self.log_test("Profile Application", success, "Applied balanced profile") |
| |
| |
| analytics = self.optimizer.get_performance_analytics(1) |
| if "system" in analytics: |
| self.log_test("Performance Analytics", True, "Successfully generated analytics") |
| else: |
| self.log_test("Performance Analytics", False, "Failed to generate analytics") |
| |
| return True |
| |
| except Exception as e: |
| self.log_test("Performance Optimizer", False, f"Exception: {e}") |
| return False |
| |
| def test_web_interface(self) -> bool: |
| """Test web interface components.""" |
| try: |
| |
| with web_app.test_client() as client: |
| |
| response = client.get('/api/status') |
| if response.status_code == 200: |
| self.log_test("Web API - Status", True, "Status endpoint working") |
| else: |
| self.log_test("Web API - Status", False, f"Status endpoint failed: {response.status_code}") |
| |
| |
| response = client.get('/api/gpus') |
| if response.status_code == 200: |
| self.log_test("Web API - GPUs", True, "GPU list endpoint working") |
| else: |
| self.log_test("Web API - GPUs", False, f"GPU list endpoint failed: {response.status_code}") |
| |
| |
| response = client.get('/api/fan/profiles') |
| if response.status_code == 200: |
| self.log_test("Web API - Fan Profiles", True, "Fan profiles endpoint working") |
| else: |
| self.log_test("Web API - Fan Profiles", False, f"Fan profiles endpoint failed: {response.status_code}") |
| |
| return True |
| |
| except Exception as e: |
| self.log_test("Web Interface", False, f"Exception: {e}") |
| return False |
| |
| def test_data_persistence(self) -> bool: |
| """Test data persistence and database functionality.""" |
| try: |
| if not self.gpu_manager: |
| self.gpu_manager = GPUManager() |
| self.gpu_manager.initialize() |
| |
| |
| status = self.gpu_manager.get_status() |
| if status: |
| |
| for gpu_name in self.gpu_manager.get_gpu_list(): |
| history = self.gpu_manager.get_historical_data(gpu_name, 1) |
| self.log_test( |
| f"Data Persistence - {gpu_name}", |
| True, |
| f"Retrieved {len(history)} historical records" |
| ) |
| |
| return True |
| |
| except Exception as e: |
| self.log_test("Data Persistence", False, f"Exception: {e}") |
| return False |
| |
| def test_system_integration(self) -> bool: |
| """Test system integration and component interaction.""" |
| try: |
| |
| components_working = [] |
| |
| if self.gpu_manager and self.gpu_manager.get_gpu_list(): |
| components_working.append("GPU Manager") |
| |
| if self.fan_controller and self.fan_controller.get_profiles(): |
| components_working.append("Fan Controller") |
| |
| if self.alert_manager: |
| components_working.append("Alert Manager") |
| |
| if self.optimizer and self.optimizer.profiles: |
| components_working.append("Performance Optimizer") |
| |
| success = len(components_working) >= 3 |
| self.log_test( |
| "System Integration", |
| success, |
| f"Working components: {', '.join(components_working)}" |
| ) |
| |
| return success |
| |
| except Exception as e: |
| self.log_test("System Integration", False, f"Exception: {e}") |
| return False |
| |
| def test_configuration_files(self) -> bool: |
| """Test configuration file loading and validation.""" |
| try: |
| config_files = [ |
| "config/fan_profiles.json", |
| "config/monitoring.json", |
| "config/alerts.json", |
| "config/optimization.json" |
| ] |
| |
| valid_configs = 0 |
| for config_file in config_files: |
| if Path(config_file).exists(): |
| try: |
| with open(config_file, 'r') as f: |
| json.load(f) |
| valid_configs += 1 |
| except json.JSONDecodeError: |
| self.log_test(f"Config Validation - {config_file}", False, "Invalid JSON") |
| else: |
| self.log_test(f"Config File - {config_file}", False, "File not found") |
| |
| success = valid_configs == len(config_files) |
| self.log_test( |
| "Configuration Files", |
| success, |
| f"Valid configs: {valid_configs}/{len(config_files)}" |
| ) |
| |
| return success |
| |
| except Exception as e: |
| self.log_test("Configuration Files", False, f"Exception: {e}") |
| return False |
| |
| def test_permissions(self) -> bool: |
| """Test system permissions for GPU access.""" |
| try: |
| |
| gpu_paths = [] |
| for card_path in Path("/sys/class/drm").glob("card*"): |
| device_path = card_path / "device" |
| if (device_path / "vendor").exists(): |
| with open(device_path / "vendor", 'r') as f: |
| if f.read().strip() == "0x1002": |
| gpu_paths.append(str(device_path)) |
| |
| if gpu_paths: |
| |
| hwmon_accessible = 0 |
| for device_path in gpu_paths: |
| hwmon_path = Path(device_path) / "hwmon" |
| if hwmon_path.exists(): |
| hwmons = list(hwmon_path.glob("*")) |
| if hwmons: |
| |
| temp_file = hwmons[0] / "temp1_input" |
| if temp_file.exists(): |
| try: |
| with open(temp_file, 'r') as f: |
| f.read() |
| hwmon_accessible += 1 |
| except PermissionError: |
| pass |
| |
| success = hwmon_accessible > 0 |
| self.log_test( |
| "Permissions", |
| success, |
| f"Accessible hwmon devices: {hwmon_accessible}/{len(gpu_paths)}" |
| ) |
| return success |
| else: |
| self.log_test("Permissions", False, "No AMD GPUs detected") |
| return False |
| |
| except Exception as e: |
| self.log_test("Permissions", False, f"Exception: {e}") |
| return False |
| |
| def run_all_tests(self) -> Dict[str, Any]: |
| """Run all tests and return comprehensive results.""" |
| logger.info("=" * 60) |
| logger.info("GPU Monitoring System - Comprehensive Test Suite") |
| logger.info("=" * 60) |
| |
| |
| tests = [ |
| ("Configuration Files", self.test_configuration_files), |
| ("Permissions", self.test_permissions), |
| ("GPU Detection", self.test_gpu_detection), |
| ("Fan Control", self.test_fan_control), |
| ("Alert System", self.test_alert_system), |
| ("Performance Optimizer", self.test_performance_optimizer), |
| ("Web Interface", self.test_web_interface), |
| ("Data Persistence", self.test_data_persistence), |
| ("System Integration", self.test_system_integration), |
| ] |
| |
| for test_name, test_func in tests: |
| logger.info(f"\nRunning: {test_name}") |
| logger.info("-" * 40) |
| try: |
| test_func() |
| except Exception as e: |
| self.log_test(test_name, False, f"Test framework error: {e}") |
| |
| |
| total_tests = len(self.test_results) |
| passed_tests = sum(1 for result in self.test_results if result['success']) |
| failed_tests = total_tests - passed_tests |
| |
| |
| summary = { |
| "total_tests": total_tests, |
| "passed": passed_tests, |
| "failed": failed_tests, |
| "success_rate": (passed_tests / total_tests * 100) if total_tests > 0 else 0, |
| "test_results": self.test_results, |
| "timestamp": time.time(), |
| "system_info": self.get_system_info() |
| } |
| |
| |
| self.save_test_results(summary) |
| |
| |
| logger.info("\n" + "=" * 60) |
| logger.info("TEST SUMMARY") |
| logger.info("=" * 60) |
| logger.info(f"Total Tests: {total_tests}") |
| logger.info(f"Passed: {passed_tests}") |
| logger.info(f"Failed: {failed_tests}") |
| logger.info(f"Success Rate: {summary['success_rate']:.1f}%") |
| |
| if failed_tests > 0: |
| logger.info("\nFailed Tests:") |
| for result in self.test_results: |
| if not result['success']: |
| logger.info(f" - {result['test']}: {result['message']}") |
| |
| logger.info("=" * 60) |
| |
| return summary |
| |
| def get_system_info(self) -> Dict[str, Any]: |
| """Get system information for test context.""" |
| try: |
| import psutil |
| |
| return { |
| "platform": sys.platform, |
| "python_version": sys.version, |
| "cpu_count": psutil.cpu_count(), |
| "memory_total": psutil.virtual_memory().total // (1024**3), |
| "disk_total": psutil.disk_usage('/').total // (1024**3), |
| "uptime": time.time() - psutil.boot_time() |
| } |
| except: |
| return { |
| "platform": sys.platform, |
| "python_version": sys.version |
| } |
| |
| def save_test_results(self, summary: Dict[str, Any]): |
| """Save test results to file.""" |
| try: |
| |
| with open('test_results_detailed.json', 'w') as f: |
| json.dump(summary, f, indent=2, default=str) |
| |
| |
| with open('test_summary.txt', 'w') as f: |
| f.write(f"GPU Monitoring System Test Results\n") |
| f.write(f"====================================\n") |
| f.write(f"Timestamp: {time.strftime('%Y-%m-%d %H:%M:%S')}\n") |
| f.write(f"Total Tests: {summary['total_tests']}\n") |
| f.write(f"Passed: {summary['passed']}\n") |
| f.write(f"Failed: {summary['failed']}\n") |
| f.write(f"Success Rate: {summary['success_rate']:.1f}%\n\n") |
| |
| f.write("Detailed Results:\n") |
| f.write("================\n") |
| for result in summary['test_results']: |
| status = "PASS" if result['success'] else "FAIL" |
| f.write(f"{status}: {result['test']} - {result['message']}\n") |
| |
| logger.info("Test results saved to test_results_detailed.json and test_summary.txt") |
| |
| except Exception as e: |
| logger.error(f"Failed to save test results: {e}") |
|
|
|
|
def main():
    """Run the full test suite from the command line.

    Prints a banner, warns when not running as root (some checks read
    hardware files that may need elevated permissions), runs every test
    and exits with status 1 when at least one recorded check failed,
    otherwise 0.
    """
    print("GPU Monitoring System - Comprehensive Test Suite")
    print("=" * 60)
    print("This test suite will validate all components of the")
    print("GPU monitoring and fan control system.")
    print()

    # os.geteuid only exists on Unix; skip the root check elsewhere rather
    # than crashing before any test has run.
    geteuid = getattr(os, "geteuid", None)
    if geteuid is not None and geteuid() != 0:
        print("WARNING: Not running as root. Some tests may fail due to permission issues.")
        print("For complete testing, run: sudo python3 test_system.py")
        print()

    tester = SystemTester()
    results = tester.run_all_tests()

    if results['failed'] > 0:
        print("\nSome tests failed. Please review the results and fix any issues.")
        sys.exit(1)
    else:
        print("\nAll tests passed! The system is ready for use.")
        sys.exit(0)


if __name__ == "__main__":
    main()