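"""Emergency monitor for long-running training jobs.

Detects gaps in reported activity and, when a hang is suspected, writes a
diagnostic report (system, process, thread, and GPU state) to disk.
"""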
import logging
import os
import sys
import threading
import time
import traceback
from datetime import datetime
from typing import Optional

import psutil
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class EmergencyMonitor:
"""Monitor for detecting training hangs and generating emergency reports."""
    def __init__(self, check_interval: int = 300, report_dir: Optional[str] = None):
        """
        Initialize the emergency monitor.
        Args:
            check_interval (int): Seconds of inactivity before an emergency report is generated (default: 300)
            report_dir (str): Directory for emergency reports (default: an "emergency_reports" directory next to this file)
        """
self.check_interval = check_interval
self.stop_event = threading.Event()
self.last_activity_time = time.time()
self.monitor_thread = None
# Use the provided report directory or create emergency_reports in the same directory as this file
if report_dir:
self.report_dir = os.path.abspath(report_dir) # Use absolute path
else:
# Get the directory where this script resides
current_dir = os.path.dirname(os.path.abspath(__file__))
self.report_dir = os.path.abspath(os.path.join(current_dir, "emergency_reports"))
# Ensure the directory exists
os.makedirs(self.report_dir, exist_ok=True)
        # Log the working directory and absolute report path to aid debugging
logger.info(f"Current working directory: {os.getcwd()}")
logger.info(f"Emergency reports will be saved to absolute path: {self.report_dir}")
def start_monitoring(self):
"""Start the monitoring thread."""
self.stop_event.clear()
self.monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
self.monitor_thread.start()
logger.info(f"Emergency monitoring started with interval {self.check_interval}s")
def stop_monitoring(self):
"""Stop the monitoring thread."""
if self.monitor_thread:
self.stop_event.set()
self.monitor_thread.join(timeout=5)
logger.info("Emergency monitoring stopped")
def _monitor_loop(self):
"""Main monitoring loop."""
while not self.stop_event.is_set():
current_time = time.time()
if current_time - self.last_activity_time > self.check_interval:
# No activity detected for too long, create emergency report
self._create_emergency_report()
# Update time after creating report
self.last_activity_time = current_time
# Sleep for 1/10 of the check interval, checking for stop event frequently
self.stop_event.wait(self.check_interval / 10)
def update_activity(self):
"""Update the last activity time."""
self.last_activity_time = time.time()
def _create_emergency_report(self):
"""Create an emergency report when training appears to be hanging."""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        # self.report_dir was already normalised to an absolute path in __init__
        report_path = os.path.join(self.report_dir, f"emergency_report_{timestamp}.txt")
logger.info(f"Creating emergency report at: {report_path}")
try:
            # Re-create the directory in case it was removed since __init__
            os.makedirs(self.report_dir, exist_ok=True)
with open(report_path, "w") as f:
# Write report header
f.write(f"Emergency report generated at {timestamp}\n")
f.write(f"System has been inactive for {self.check_interval} seconds\n\n")
# System information
f.write("=== SYSTEM INFORMATION ===\n")
mem = psutil.virtual_memory()
f.write(f"Memory: {mem.percent}% used ({mem.used / (1024**3):.2f} GB / {mem.total / (1024**3):.2f} GB)\n")
f.write(f"CPU usage: {psutil.cpu_percent(interval=1)}%\n")
# Process information
process = psutil.Process()
f.write(f"Process memory: {process.memory_info().rss / (1024**3):.2f} GB\n")
f.write(f"Process CPU: {process.cpu_percent(interval=1)}%\n")
f.write(f"Process creation time: {datetime.fromtimestamp(process.create_time()).strftime('%Y-%m-%d %H:%M:%S')}\n\n")
# Thread information
current_frames = sys._current_frames()
f.write("=== THREAD INFORMATION ===\n")
                for thread_id, frame in current_frames.items():
                    f.write(f"\nThread ID: {thread_id}\n")
                    f.write(''.join(traceback.format_stack(frame)))
# GPU information if available
f.write("\n=== GPU INFORMATION ===\n")
try:
import torch
if torch.cuda.is_available():
device_count = torch.cuda.device_count()
f.write(f"CUDA devices: {device_count}\n")
for i in range(device_count):
f.write(f"GPU {i}: {torch.cuda.get_device_name(i)}\n")
f.write(f" Memory allocated: {torch.cuda.memory_allocated(i) / (1024**3):.2f} GB\n")
f.write(f" Memory reserved: {torch.cuda.memory_reserved(i) / (1024**3):.2f} GB\n")
else:
f.write("CUDA not available\n")
except ImportError:
f.write("PyTorch not available for GPU information\n")
except Exception as e:
f.write(f"Error getting GPU information: {str(e)}\n")
logger.warning(f"Created emergency report at {report_path}")
except Exception as e:
logger.error(f"Failed to create emergency report: {str(e)}")
# Try fallback location if the original path fails
try:
# Use script directory as fallback
script_dir = os.path.dirname(os.path.abspath(__file__))
fallback_path = os.path.join(script_dir, f"emergency_report_{timestamp}.txt")
with open(fallback_path, "w") as f:
f.write(f"FALLBACK Emergency report generated at {timestamp}\n")
f.write(f"Original path failed: {report_path}\n")
f.write(f"Error: {str(e)}\n")
logger.warning(f"Created fallback emergency report at {fallback_path}")
except Exception as fallback_error:
logger.error(f"Failed to create fallback report: {str(fallback_error)}")
# For backward compatibility, alias TrainingMonitor to EmergencyMonitor
TrainingMonitor = EmergencyMonitor
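# Convenience sketch (not part of the original API): a context manager that
# guarantees the monitor is stopped even if training raises, so callers don't
# have to pair start_monitoring()/stop_monitoring() by hand.
from contextlib import contextmanager

@contextmanager
def monitoring(check_interval: int = 300, report_dir: Optional[str] = None):
    """Yield a running EmergencyMonitor, stopping it on exit."""
    monitor = EmergencyMonitor(check_interval=check_interval, report_dir=report_dir)
    monitor.start_monitoring()
    try:
        yield monitor
    finally:
        monitor.stop_monitoring()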
# Usage example in train_stdp.py:
#   from STDP_Communicator.emergency_monitor import TrainingMonitor
#   monitor = TrainingMonitor(check_interval=300)
#   monitor.start_monitoring()
#   try:
#       # training code here
#       monitor.update_activity()  # call periodically, e.g. once per step
#   finally:
#       monitor.stop_monitoring()
# (or use the monitoring() context manager above)
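if __name__ == "__main__":
    # Minimal smoke test (illustrative, not part of the original file): a
    # 2-second threshold plus a 5-second sleep with no update_activity()
    # calls should produce at least one report in the default report_dir.
    demo = EmergencyMonitor(check_interval=2)
    demo.start_monitoring()
    try:
        time.sleep(5)  # simulate a hang
    finally:
        demo.stop_monitoring()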