import os
import sys
import time
import psutil
import logging
import threading
import traceback
from typing import Optional
from datetime import datetime

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class EmergencyMonitor:
    """Monitor for detecting training hangs and generating emergency reports.

    A daemon thread wakes up periodically and, when no call to
    ``update_activity()`` has been seen for more than ``check_interval``
    seconds, writes a diagnostic snapshot (host/process memory and CPU,
    per-thread Python stack traces, and CUDA memory stats when PyTorch
    is importable) to ``report_dir``.
    """

    def __init__(self, check_interval: int = 300, report_dir: Optional[str] = None):
        """
        Initialize the emergency monitor.

        Args:
            check_interval (int): Interval between checks in seconds (default: 300s)
            report_dir (str): Directory to save emergency reports
                (default: emergency_reports in same directory as this file)
        """
        self.check_interval = check_interval
        self.stop_event = threading.Event()
        self.last_activity_time = time.time()
        self.monitor_thread: Optional[threading.Thread] = None

        # Use the provided report directory, or create emergency_reports
        # next to this file.  Absolute paths avoid surprises if the
        # process later changes its working directory.
        if report_dir:
            self.report_dir = os.path.abspath(report_dir)
        else:
            current_dir = os.path.dirname(os.path.abspath(__file__))
            self.report_dir = os.path.abspath(os.path.join(current_dir, "emergency_reports"))

        # Ensure the directory exists
        os.makedirs(self.report_dir, exist_ok=True)

        # Enhanced logging with working directory for debugging
        logger.info(f"Current working directory: {os.getcwd()}")
        logger.info(f"Emergency reports will be saved to absolute path: {self.report_dir}")

    def start_monitoring(self) -> "EmergencyMonitor":
        """Start the monitoring thread.

        Returns:
            EmergencyMonitor: self, so callers can chain:
            ``monitor = EmergencyMonitor().start_monitoring()``.
        """
        # Guard against spawning a second, duplicate monitor thread.
        if self.monitor_thread is not None and self.monitor_thread.is_alive():
            logger.warning("Emergency monitoring already running; ignoring start request")
            return self
        self.stop_event.clear()
        self.monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self.monitor_thread.start()
        logger.info(f"Emergency monitoring started with interval {self.check_interval}s")
        return self

    def stop_monitoring(self) -> None:
        """Stop the monitoring thread (waits up to 5 seconds for it to exit)."""
        if self.monitor_thread:
            self.stop_event.set()
            self.monitor_thread.join(timeout=5)
            logger.info("Emergency monitoring stopped")

    def _monitor_loop(self) -> None:
        """Main monitoring loop: poll for inactivity until stopped."""
        while not self.stop_event.is_set():
            current_time = time.time()
            if current_time - self.last_activity_time > self.check_interval:
                # No activity detected for too long: create an emergency
                # report, then reset the clock so we do not emit a new
                # report on every subsequent poll.
                self._create_emergency_report()
                self.last_activity_time = current_time
            # Sleep for 1/10 of the check interval so stop requests are
            # noticed promptly; wait() returns early when the event is set.
            self.stop_event.wait(self.check_interval / 10)

    def update_activity(self) -> None:
        """Record that training is alive; callers should invoke this periodically."""
        self.last_activity_time = time.time()

    def _create_emergency_report(self) -> None:
        """Create an emergency report when training appears to be hanging.

        Writes the report into ``self.report_dir``; if that fails, falls
        back to a stub report next to this source file.
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        report_path = os.path.join(os.path.abspath(self.report_dir), f"emergency_report_{timestamp}.txt")
        logger.info(f"Creating emergency report at: {report_path}")

        try:
            # Re-create the directory defensively in case it was removed
            # after __init__ ran.
            os.makedirs(os.path.dirname(report_path), exist_ok=True)

            with open(report_path, "w") as f:
                # Report header
                f.write(f"Emergency report generated at {timestamp}\n")
                f.write(f"System has been inactive for {self.check_interval} seconds\n\n")

                self._write_system_info(f)
                self._write_thread_info(f)
                self._write_gpu_info(f)

            logger.warning(f"Created emergency report at {report_path}")
        except Exception as e:
            logger.error(f"Failed to create emergency report: {str(e)}")
            self._write_fallback_report(timestamp, report_path, e)

    def _write_system_info(self, f) -> None:
        """Write host- and process-level memory/CPU stats to the open report file."""
        f.write("=== SYSTEM INFORMATION ===\n")
        mem = psutil.virtual_memory()
        f.write(f"Memory: {mem.percent}% used ({mem.used / (1024**3):.2f} GB / {mem.total / (1024**3):.2f} GB)\n")
        # interval=1 blocks for one second but yields a meaningful sample.
        f.write(f"CPU usage: {psutil.cpu_percent(interval=1)}%\n")

        # Process information for the current (monitored) process.
        process = psutil.Process()
        f.write(f"Process memory: {process.memory_info().rss / (1024**3):.2f} GB\n")
        f.write(f"Process CPU: {process.cpu_percent(interval=1)}%\n")
        f.write(f"Process creation time: {datetime.fromtimestamp(process.create_time()).strftime('%Y-%m-%d %H:%M:%S')}\n\n")

    def _write_thread_info(self, f) -> None:
        """Dump the current Python stack of every live thread to the report."""
        f.write("=== THREAD INFORMATION ===\n")
        for thread_id, frame in sys._current_frames().items():
            f.write(f"\nThread ID: {thread_id}\n")
            f.write(''.join(traceback.format_stack(frame)))

    def _write_gpu_info(self, f) -> None:
        """Write per-device CUDA memory stats if PyTorch is importable."""
        f.write("\n=== GPU INFORMATION ===\n")
        try:
            import torch
            if torch.cuda.is_available():
                device_count = torch.cuda.device_count()
                f.write(f"CUDA devices: {device_count}\n")
                for i in range(device_count):
                    f.write(f"GPU {i}: {torch.cuda.get_device_name(i)}\n")
                    f.write(f" Memory allocated: {torch.cuda.memory_allocated(i) / (1024**3):.2f} GB\n")
                    f.write(f" Memory reserved: {torch.cuda.memory_reserved(i) / (1024**3):.2f} GB\n")
            else:
                f.write("CUDA not available\n")
        except ImportError:
            f.write("PyTorch not available for GPU information\n")
        except Exception as e:
            # Best-effort: GPU state must never prevent the report itself.
            f.write(f"Error getting GPU information: {str(e)}\n")

    def _write_fallback_report(self, timestamp: str, report_path: str, error: Exception) -> None:
        """Best-effort fallback: write a stub report next to this source file."""
        try:
            script_dir = os.path.dirname(os.path.abspath(__file__))
            fallback_path = os.path.join(script_dir, f"emergency_report_{timestamp}.txt")
            with open(fallback_path, "w") as f:
                f.write(f"FALLBACK Emergency report generated at {timestamp}\n")
                f.write(f"Original path failed: {report_path}\n")
                f.write(f"Error: {str(error)}\n")
            logger.warning(f"Created fallback emergency report at {fallback_path}")
        except Exception as fallback_error:
            logger.error(f"Failed to create fallback report: {str(fallback_error)}")


# For backward compatibility, alias TrainingMonitor to EmergencyMonitor
TrainingMonitor = EmergencyMonitor


# Usage example in train_stdp.py:
# from STDP_Communicator.emergency_monitor import TrainingMonitor
# monitor = TrainingMonitor().start_monitoring()
# try:
#     # training code here
#     monitor.update_activity()  # Call this periodically
# finally:
#     monitor.stop_monitoring()