import os
import sys
import time
import psutil
import logging
import threading
import traceback
from datetime import datetime
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class EmergencyMonitor:
    """Monitor for detecting training hangs and generating emergency reports."""

    def __init__(self, check_interval=300, report_dir=None):
        """
        Initialize the emergency monitor.

        Args:
            check_interval (int): Interval between checks in seconds (default: 300).
            report_dir (str): Directory for emergency reports (default: an
                "emergency_reports" directory next to this file).
        """
        self.check_interval = check_interval
        self.stop_event = threading.Event()
        self.last_activity_time = time.time()
        self.monitor_thread = None
        # Use the provided report directory, or default to "emergency_reports"
        # in the same directory as this file
        if report_dir:
            self.report_dir = os.path.abspath(report_dir)  # Normalize to an absolute path
        else:
            # Resolve the directory where this script resides
            current_dir = os.path.dirname(os.path.abspath(__file__))
            self.report_dir = os.path.abspath(os.path.join(current_dir, "emergency_reports"))
        # Ensure the directory exists
        os.makedirs(self.report_dir, exist_ok=True)
        # Log the working directory and resolved report path to aid debugging
        logger.info(f"Current working directory: {os.getcwd()}")
        logger.info(f"Emergency reports will be saved to absolute path: {self.report_dir}")

    def start_monitoring(self):
        """Start the monitoring thread."""
        self.stop_event.clear()
        self.monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self.monitor_thread.start()
        logger.info(f"Emergency monitoring started with interval {self.check_interval}s")

    def stop_monitoring(self):
        """Stop the monitoring thread."""
        if self.monitor_thread:
            self.stop_event.set()
            self.monitor_thread.join(timeout=5)
            logger.info("Emergency monitoring stopped")

    def _monitor_loop(self):
        """Main monitoring loop."""
        while not self.stop_event.is_set():
            current_time = time.time()
            if current_time - self.last_activity_time > self.check_interval:
                # No activity for longer than check_interval: create an emergency report
                self._create_emergency_report()
                # Reset the activity clock so reports are not generated back to back
                self.last_activity_time = current_time
            # Wait for 1/10 of the check interval so the stop event is polled frequently
            self.stop_event.wait(self.check_interval / 10)

    def update_activity(self):
        """Update the last activity time."""
        self.last_activity_time = time.time()

    def _create_emergency_report(self):
        """Create an emergency report when training appears to be hanging."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        report_dir = os.path.abspath(self.report_dir)
        report_path = os.path.join(report_dir, f"emergency_report_{timestamp}.txt")
        logger.info(f"Creating emergency report at: {report_path}")
        try:
            # Create the directory again in case it was removed after __init__
            os.makedirs(os.path.dirname(report_path), exist_ok=True)
            with open(report_path, "w") as f:
                # Write report header
                f.write(f"Emergency report generated at {timestamp}\n")
                f.write(f"System has been inactive for {self.check_interval} seconds\n\n")

                # System information
                f.write("=== SYSTEM INFORMATION ===\n")
                mem = psutil.virtual_memory()
                f.write(f"Memory: {mem.percent}% used ({mem.used / (1024**3):.2f} GB / {mem.total / (1024**3):.2f} GB)\n")
                f.write(f"CPU usage: {psutil.cpu_percent(interval=1)}%\n")

                # Process information
                process = psutil.Process()
                f.write(f"Process memory: {process.memory_info().rss / (1024**3):.2f} GB\n")
                f.write(f"Process CPU: {process.cpu_percent(interval=1)}%\n")
                f.write(f"Process creation time: {datetime.fromtimestamp(process.create_time()).strftime('%Y-%m-%d %H:%M:%S')}\n\n")

                # Thread information: dump the current stack of every thread
                current_frames = sys._current_frames()
                f.write("=== THREAD INFORMATION ===\n")
                for thread_id, frame in current_frames.items():
                    f.write(f"\nThread ID: {thread_id}\n")
                    stack_trace = ''.join(traceback.format_stack(frame))
                    f.write(stack_trace)

                # GPU information if available
                f.write("\n=== GPU INFORMATION ===\n")
                try:
                    import torch
                    if torch.cuda.is_available():
                        device_count = torch.cuda.device_count()
                        f.write(f"CUDA devices: {device_count}\n")
                        for i in range(device_count):
                            f.write(f"GPU {i}: {torch.cuda.get_device_name(i)}\n")
                            f.write(f"  Memory allocated: {torch.cuda.memory_allocated(i) / (1024**3):.2f} GB\n")
                            f.write(f"  Memory reserved: {torch.cuda.memory_reserved(i) / (1024**3):.2f} GB\n")
                    else:
                        f.write("CUDA not available\n")
                except ImportError:
                    f.write("PyTorch not available for GPU information\n")
                except Exception as e:
                    f.write(f"Error getting GPU information: {str(e)}\n")
            logger.warning(f"Created emergency report at {report_path}")
        except Exception as e:
            logger.error(f"Failed to create emergency report: {str(e)}")
            # Try a fallback location if the original path fails
            try:
                # Use the script directory as the fallback
                script_dir = os.path.dirname(os.path.abspath(__file__))
                fallback_path = os.path.join(script_dir, f"emergency_report_{timestamp}.txt")
                with open(fallback_path, "w") as f:
                    f.write(f"FALLBACK Emergency report generated at {timestamp}\n")
                    f.write(f"Original path failed: {report_path}\n")
                    f.write(f"Error: {str(e)}\n")
                logger.warning(f"Created fallback emergency report at {fallback_path}")
            except Exception as fallback_error:
                logger.error(f"Failed to create fallback report: {str(fallback_error)}")


# For backward compatibility, alias TrainingMonitor to EmergencyMonitor
TrainingMonitor = EmergencyMonitor

# Usage example in train_stdp.py:
# from STDP_Communicator.emergency_monitor import TrainingMonitor
# monitor = TrainingMonitor()
# monitor.start_monitoring()
# try:
#     # training code here
#     monitor.update_activity()  # Call this periodically while training makes progress
# finally:
#     monitor.stop_monitoring()
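
# Below is a minimal, self-contained smoke test of the monitor. It is an
# illustrative sketch rather than part of the training pipeline: the 2-second
# check interval and the temporary report directory are hypothetical values
# chosen only so the demo finishes quickly.
if __name__ == "__main__":
    import tempfile

    demo_report_dir = tempfile.mkdtemp(prefix="emergency_reports_")
    monitor = EmergencyMonitor(check_interval=2, report_dir=demo_report_dir)
    monitor.start_monitoring()
    try:
        # Simulate a few iterations of healthy training activity
        for _ in range(3):
            time.sleep(0.5)
            monitor.update_activity()
        # Simulate a hang: stop updating activity and wait past check_interval
        # so the monitor detects inactivity and writes an emergency report
        time.sleep(3)
    finally:
        monitor.stop_monitoring()
    logger.info(f"Reports written to {demo_report_dir}: {os.listdir(demo_report_dir)}")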