File size: 19,521 Bytes
85b135b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
#!/usr/bin/env python3
"""
Comprehensive Test and Validation Script

Tests all components of the GPU monitoring and fan control system
to ensure proper functionality and integration.
"""

import sys
import os
import time
import json
import logging
import subprocess
import threading
from pathlib import Path
from typing import Dict, List, Any, Optional

# Put this script's own directory at the front of sys.path so the
# project-local modules imported below are looked up next to this file first.
sys.path.insert(0, str(Path(__file__).parent))

# Project-local components exercised by SystemTester. `web_app` is only used
# through its test_client() in SystemTester.test_web_interface.
from gpu_monitoring import GPUManager, GPUStatus
from gpu_fan_controller import FanController, FanMode, ProfileType
from alert_system import AlertManager, AlertThreshold
from performance_optimizer import SystemOptimizer
from web_interface import app as web_app

# Module-level logger; its output destinations are configured by
# SystemTester.setup_logging() via logging.basicConfig().
logger = logging.getLogger(__name__)


class SystemTester:
    """Comprehensive system tester."""
    
    def __init__(self):
        self.test_results = []
        self.gpu_manager = None
        self.fan_controller = None
        self.alert_manager = None
        self.optimizer = None
        
        # Setup logging
        self.setup_logging()
    
    def setup_logging(self):
        """Configure logging for a test run.

        Calls ``logging.basicConfig`` at INFO level with two handlers: a file
        handler writing to ``test_results.log`` and a stream handler writing
        to standard output.
        """
        record_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        destinations = [
            logging.FileHandler('test_results.log'),
            logging.StreamHandler(sys.stdout),
        ]
        logging.basicConfig(
            level=logging.INFO,
            format=record_format,
            handlers=destinations,
        )
    
    def log_test(self, test_name: str, success: bool, message: str = ""):
        """Record the outcome of a single check.

        The outcome is logged at INFO level as ``[PASS]`` or ``[FAIL]`` and a
        result dict is appended to ``self.test_results``.

        Args:
            test_name: Label identifying the check.
            success: Whether the check passed.
            message: Optional detail shown next to the label.
        """
        label = "FAIL"
        if success:
            label = "PASS"
        logger.info(f"[{label}] {test_name}: {message}")

        # Timestamp is taken after logging, when the entry is stored.
        entry = dict(
            test=test_name,
            success=success,
            message=message,
            timestamp=time.time(),
        )
        self.test_results.append(entry)
    
    def test_gpu_detection(self) -> bool:
        """Check that GPUs can be detected and that status can be collected.

        Creates a ``GPUManager``, stores it on ``self.gpu_manager`` and
        initializes it, then queries the GPU list, GPU info and status.

        Returns:
            True if initialization succeeded and status data was returned;
            False otherwise, including when any step raises.
        """
        try:
            self.gpu_manager = GPUManager()
            if not self.gpu_manager.initialize():
                self.log_test("GPU Detection", False, "Failed to initialize GPU manager")
                return False

            detected = self.gpu_manager.get_gpu_list()
            # The info result is not inspected; the call is kept so that a
            # failure in it is still reported by this test.
            self.gpu_manager.get_gpu_info()

            self.log_test(
                "GPU Detection",
                True,
                f"Found {len(detected)} GPU(s): {', '.join(detected)}"
            )

            # Test status collection
            snapshot = self.gpu_manager.get_status()
            if not snapshot:
                self.log_test("GPU Status Collection", False, "No status data collected")
                return False

            self.log_test(
                "GPU Status Collection",
                True,
                f"Successfully collected status for {len(snapshot)} GPU(s)"
            )
            return True

        except Exception as e:
            self.log_test("GPU Detection", False, f"Exception: {e}")
            return False
    
    def test_fan_control(self) -> bool:
        """Test fan control functionality.

        Creates a ``FanController`` (stored on ``self.fan_controller``),
        initializes it, lists its profiles, switches to the "balanced" profile
        when one is present, and requests a manual PWM value of 100. A result
        is logged for every step.

        Returns:
            True only if initialization and every sub-check that ran passed;
            False otherwise, including when any step raises.
        """
        try:
            self.fan_controller = FanController()
            if not self.fan_controller.initialize():
                self.log_test("Fan Control", False, "Failed to initialize fan controller")
                return False

            # Tracks the sub-checks so a logged FAIL is also reflected in the
            # return value instead of being masked by an unconditional True.
            all_passed = True

            # Test profile management
            profiles = self.fan_controller.get_profiles()
            self.log_test(
                "Fan Profiles",
                True,
                f"Loaded {len(profiles)} profiles: {', '.join(profiles.keys())}"
            )

            # Test profile switching
            if "balanced" in profiles:
                switched = bool(self.fan_controller.set_profile("balanced"))
                self.log_test(
                    "Profile Switching",
                    switched,
                    "Switched to balanced profile" if switched
                    else "Failed to switch to balanced profile"
                )
                all_passed = all_passed and switched

            # Test manual control
            # NOTE(review): the controller is left in manual override after
            # this check; restoring automatic control would need a
            # FanController call that is not visible in this file.
            self.fan_controller.set_manual_pwm(100)
            time.sleep(1)  # Allow time for change

            status = self.fan_controller.get_status()
            manual_ok = bool(status and status.manual_override)
            if manual_ok:
                self.log_test("Manual Control", True, "Manual PWM control working")
            else:
                self.log_test("Manual Control", False, "Manual control not working")

            return all_passed and manual_ok

        except Exception as e:
            self.log_test("Fan Control", False, f"Exception: {e}")
            return False
    
    def test_alert_system(self) -> bool:
        """Exercise the alert manager: add a threshold and read alert history.

        Creates an ``AlertManager`` (stored on ``self.alert_manager``), adds a
        temperature threshold to it and requests its alert history.

        Returns:
            True if every call completed without raising; False otherwise.
        """
        try:
            manager = AlertManager()
            self.alert_manager = manager

            # Test threshold management
            threshold_settings = {
                "metric": "temperature",
                "threshold": 50.0,
                "operator": ">=",
                "duration": 5,
                "enabled": True,
                "cooldown": 60,
            }
            manager.add_threshold(AlertThreshold(**threshold_settings))
            self.log_test("Alert Thresholds", True, "Successfully added alert threshold")

            # Test alert history
            recent_alerts = manager.get_alert_history(1)
            self.log_test(
                "Alert History",
                True,
                f"Retrieved {len(recent_alerts)} alerts from history"
            )

        except Exception as e:
            self.log_test("Alert System", False, f"Exception: {e}")
            return False

        return True
    
    def test_performance_optimizer(self) -> bool:
        """Test performance optimization system.

        Creates a ``SystemOptimizer`` (stored on ``self.optimizer``), lists its
        profiles, applies the "balanced" profile when one is present, and
        requests performance analytics. A result is logged for every step.

        Returns:
            True only if every sub-check that ran passed; False otherwise,
            including when any step raises.
        """
        try:
            self.optimizer = SystemOptimizer()

            # Tracks the sub-checks so a logged FAIL is also reflected in the
            # return value instead of being masked by an unconditional True.
            all_passed = True

            # Test profile management
            profiles = self.optimizer.profiles
            self.log_test(
                "Optimization Profiles",
                True,
                f"Loaded {len(profiles)} profiles: {', '.join(profiles.keys())}"
            )

            # Test profile application
            if "balanced" in profiles:
                applied = bool(self.optimizer.apply_profile("balanced"))
                self.log_test(
                    "Profile Application",
                    applied,
                    "Applied balanced profile" if applied
                    else "Failed to apply balanced profile"
                )
                all_passed = all_passed and applied

            # Test performance analytics
            analytics = self.optimizer.get_performance_analytics(1)
            # An empty/None result is reported as a failed check rather than
            # surfacing as a TypeError from the membership test.
            analytics_ok = bool(analytics) and "system" in analytics
            if analytics_ok:
                self.log_test("Performance Analytics", True, "Successfully generated analytics")
            else:
                self.log_test("Performance Analytics", False, "Failed to generate analytics")

            return all_passed and analytics_ok

        except Exception as e:
            self.log_test("Performance Optimizer", False, f"Exception: {e}")
            return False
    
    def test_web_interface(self) -> bool:
        """Test web interface components.

        Issues a GET request to each API endpoint through the application's
        test client and logs one result per endpoint. An endpoint passes when
        it answers with HTTP status 200.

        Returns:
            True only if every endpoint answered with status 200; False
            otherwise, including when any step raises.
        """
        # (test name, route, label used in the log message)
        endpoints = (
            ("Web API - Status", "/api/status", "Status endpoint"),
            ("Web API - GPUs", "/api/gpus", "GPU list endpoint"),
            ("Web API - Fan Profiles", "/api/fan/profiles", "Fan profiles endpoint"),
        )

        try:
            all_ok = True
            with web_app.test_client() as client:
                for test_name, route, label in endpoints:
                    response = client.get(route)
                    if response.status_code == 200:
                        self.log_test(test_name, True, f"{label} working")
                    else:
                        self.log_test(
                            test_name,
                            False,
                            f"{label} failed: {response.status_code}"
                        )
                        # Keep checking the remaining endpoints, but make the
                        # failure visible in the return value.
                        all_ok = False

            return all_ok

        except Exception as e:
            self.log_test("Web Interface", False, f"Exception: {e}")
            return False
    
    def test_data_persistence(self) -> bool:
        """Test data persistence and database functionality."""
        try:
            if not self.gpu_manager:
                self.gpu_manager = GPUManager()
                self.gpu_manager.initialize()
            
            # Test data collection and storage
            status = self.gpu_manager.get_status()
            if status:
                # Test historical data retrieval
                for gpu_name in self.gpu_manager.get_gpu_list():
                    history = self.gpu_manager.get_historical_data(gpu_name, 1)
                    self.log_test(
                        f"Data Persistence - {gpu_name}",
                        True,
                        f"Retrieved {len(history)} historical records"
                    )
            
            return True
            
        except Exception as e:
            self.log_test("Data Persistence", False, f"Exception: {e}")
            return False
    
    def test_system_integration(self) -> bool:
        """Test system integration and component interaction."""
        try:
            # Test that all components can work together
            components_working = []
            
            if self.gpu_manager and self.gpu_manager.get_gpu_list():
                components_working.append("GPU Manager")
            
            if self.fan_controller and self.fan_controller.get_profiles():
                components_working.append("Fan Controller")
            
            if self.alert_manager:
                components_working.append("Alert Manager")
            
            if self.optimizer and self.optimizer.profiles:
                components_working.append("Performance Optimizer")
            
            success = len(components_working) >= 3  # At least 3 components should work
            self.log_test(
                "System Integration",
                success,
                f"Working components: {', '.join(components_working)}"
            )
            
            return success
            
        except Exception as e:
            self.log_test("System Integration", False, f"Exception: {e}")
            return False
    
    def test_configuration_files(self) -> bool:
        """Test configuration file loading and validation."""
        try:
            config_files = [
                "config/fan_profiles.json",
                "config/monitoring.json",
                "config/alerts.json",
                "config/optimization.json"
            ]
            
            valid_configs = 0
            for config_file in config_files:
                if Path(config_file).exists():
                    try:
                        with open(config_file, 'r') as f:
                            json.load(f)
                        valid_configs += 1
                    except json.JSONDecodeError:
                        self.log_test(f"Config Validation - {config_file}", False, "Invalid JSON")
                else:
                    self.log_test(f"Config File - {config_file}", False, "File not found")
            
            success = valid_configs == len(config_files)
            self.log_test(
                "Configuration Files",
                success,
                f"Valid configs: {valid_configs}/{len(config_files)}"
            )
            
            return success
            
        except Exception as e:
            self.log_test("Configuration Files", False, f"Exception: {e}")
            return False
    
    def test_permissions(self) -> bool:
        """Test system permissions for GPU access."""
        try:
            # Check if we can access GPU sysfs
            gpu_paths = []
            for card_path in Path("/sys/class/drm").glob("card*"):
                device_path = card_path / "device"
                if (device_path / "vendor").exists():
                    with open(device_path / "vendor", 'r') as f:
                        if f.read().strip() == "0x1002":  # AMD
                            gpu_paths.append(str(device_path))
            
            if gpu_paths:
                # Check hwmon access
                hwmon_accessible = 0
                for device_path in gpu_paths:
                    hwmon_path = Path(device_path) / "hwmon"
                    if hwmon_path.exists():
                        hwmons = list(hwmon_path.glob("*"))
                        if hwmons:
                            # Test read access
                            temp_file = hwmons[0] / "temp1_input"
                            if temp_file.exists():
                                try:
                                    with open(temp_file, 'r') as f:
                                        f.read()
                                    hwmon_accessible += 1
                                except PermissionError:
                                    pass
                
                success = hwmon_accessible > 0
                self.log_test(
                    "Permissions",
                    success,
                    f"Accessible hwmon devices: {hwmon_accessible}/{len(gpu_paths)}"
                )
                return success
            else:
                self.log_test("Permissions", False, "No AMD GPUs detected")
                return False
                
        except Exception as e:
            self.log_test("Permissions", False, f"Exception: {e}")
            return False
    
    def run_all_tests(self) -> Dict[str, Any]:
        """Run every test in a fixed order and report the combined outcome.

        Each test records its own results through ``log_test``; an exception
        escaping a test is recorded as a failure under that test's name. The
        summary is saved via ``save_test_results`` and logged.

        Returns:
            A dict with the keys ``total_tests``, ``passed``, ``failed``,
            ``success_rate`` (percent), ``test_results``, ``timestamp`` and
            ``system_info``.
        """
        rule = "=" * 60
        logger.info(rule)
        logger.info("GPU Monitoring System - Comprehensive Test Suite")
        logger.info(rule)

        # Run individual tests
        suite = [
            ("Configuration Files", self.test_configuration_files),
            ("Permissions", self.test_permissions),
            ("GPU Detection", self.test_gpu_detection),
            ("Fan Control", self.test_fan_control),
            ("Alert System", self.test_alert_system),
            ("Performance Optimizer", self.test_performance_optimizer),
            ("Web Interface", self.test_web_interface),
            ("Data Persistence", self.test_data_persistence),
            ("System Integration", self.test_system_integration),
        ]

        for label, check in suite:
            logger.info(f"\nRunning: {label}")
            logger.info("-" * 40)
            try:
                # Return values are ignored; outcomes come from test_results.
                check()
            except Exception as e:
                self.log_test(label, False, f"Test framework error: {e}")

        # Calculate results
        total_tests = len(self.test_results)
        failures = [entry for entry in self.test_results if not entry['success']]
        failed_tests = len(failures)
        passed_tests = total_tests - failed_tests
        if total_tests > 0:
            success_rate = passed_tests / total_tests * 100
        else:
            success_rate = 0

        # Generate summary
        summary = {
            "total_tests": total_tests,
            "passed": passed_tests,
            "failed": failed_tests,
            "success_rate": success_rate,
            "test_results": self.test_results,
            "timestamp": time.time(),
            "system_info": self.get_system_info()
        }

        # Save results
        self.save_test_results(summary)

        # Print summary
        logger.info("\n" + rule)
        logger.info("TEST SUMMARY")
        logger.info(rule)
        logger.info(f"Total Tests: {total_tests}")
        logger.info(f"Passed: {passed_tests}")
        logger.info(f"Failed: {failed_tests}")
        logger.info(f"Success Rate: {summary['success_rate']:.1f}%")

        if failed_tests > 0:
            logger.info("\nFailed Tests:")
            for entry in failures:
                logger.info(f"  - {entry['test']}: {entry['message']}")

        logger.info(rule)

        return summary
    
    def get_system_info(self) -> Dict[str, Any]:
        """Get system information for test context."""
        try:
            import psutil
            
            return {
                "platform": sys.platform,
                "python_version": sys.version,
                "cpu_count": psutil.cpu_count(),
                "memory_total": psutil.virtual_memory().total // (1024**3),  # GB
                "disk_total": psutil.disk_usage('/').total // (1024**3),  # GB
                "uptime": time.time() - psutil.boot_time()
            }
        except:
            return {
                "platform": sys.platform,
                "python_version": sys.version
            }
    
    def save_test_results(self, summary: Dict[str, Any]):
        """Write the test summary to disk in two formats.

        ``test_results_detailed.json`` receives the full summary as JSON
        (values JSON cannot encode are converted with ``str``), and
        ``test_summary.txt`` receives a plain-text report. Any error while
        writing is logged rather than raised.

        Args:
            summary: The summary dict produced by ``run_all_tests``.
        """
        try:
            # Save detailed results
            with open('test_results_detailed.json', 'w') as detailed:
                json.dump(summary, detailed, indent=2, default=str)

            # Save summary
            with open('test_summary.txt', 'w') as report:
                report.write("GPU Monitoring System Test Results\n")
                report.write("====================================\n")
                report.write(f"Timestamp: {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
                for heading, key in (
                    ("Total Tests", 'total_tests'),
                    ("Passed", 'passed'),
                    ("Failed", 'failed'),
                ):
                    report.write(f"{heading}: {summary[key]}\n")
                report.write(f"Success Rate: {summary['success_rate']:.1f}%\n\n")

                report.write("Detailed Results:\n")
                report.write("================\n")
                for entry in summary['test_results']:
                    verdict = "PASS" if entry['success'] else "FAIL"
                    report.write(f"{verdict}: {entry['test']} - {entry['message']}\n")

            logger.info("Test results saved to test_results_detailed.json and test_summary.txt")

        except Exception as e:
            logger.error(f"Failed to save test results: {e}")


def main():
    """Run the full test suite from the command line.

    Prints a banner, warns when the effective user is not root, runs every
    test through ``SystemTester`` and exits with status 1 if any recorded
    result failed, otherwise 0.
    """
    banner_lines = (
        "GPU Monitoring System - Comprehensive Test Suite",
        "=" * 60,
        "This test suite will validate all components of the",
        "GPU monitoring and fan control system.",
        "",
    )
    for line in banner_lines:
        print(line)

    # Check if running as root for full functionality
    running_as_root = os.geteuid() == 0
    if not running_as_root:
        print("WARNING: Not running as root. Some tests may fail due to permission issues.")
        print("For complete testing, run: sudo python3 test_system.py")
        print()

    # Run tests
    results = SystemTester().run_all_tests()

    # Exit with appropriate code
    if results['failed'] > 0:
        print("\nSome tests failed. Please review the results and fix any issues.")
        exit_code = 1
    else:
        print("\nAll tests passed! The system is ready for use.")
        exit_code = 0
    sys.exit(exit_code)


# Run the suite only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()