ADAM / env /process_monitor.py
YuShu
Initial commit
36ba3ef
import time
import re
import warnings
from typing import List
import psutil
import subprocess
import logging
import threading
import utils as U
class SubprocessMonitor:
def __init__(
self,
commands: List[str],
name: str,
ready_match: str = r".*",
log_path: str = "logs",
callback_match: str = r"^(?!x)x$", # regex that will never match
callback: callable = None,
finished_callback: callable = None,
):
self.commands = commands
start_time = time.strftime("%Y%m%d_%H%M%S")
self.name = name
self.logger = logging.getLogger(name)
handler = logging.FileHandler(U.f_join(log_path, f"{start_time}.log"))
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
handler.setFormatter(formatter)
self.logger.addHandler(handler)
self.logger.setLevel(logging.INFO)
self.process = None
self.ready_match = ready_match
self.ready_event = None
self.ready_line = None
self.callback_match = callback_match
self.callback = callback
self.finished_callback = finished_callback
self.thread = None
def _start(self):
self.logger.info(f"Starting subprocess with commands: {self.commands}")
self.process = psutil.Popen(
self.commands,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
)
print(f"Subprocess {self.name} started with PID {self.process.pid}.")
for line in iter(self.process.stdout.readline, ""):
self.logger.info(line.strip())
if re.search(self.ready_match, line):
self.ready_line = line
self.logger.info("Subprocess is ready.")
self.ready_event.set()
if re.search(self.callback_match, line):
self.callback()
if not self.ready_event.is_set():
self.ready_event.set()
warnings.warn(f"Subprocess {self.name} failed to start.")
if self.finished_callback:
self.finished_callback()
def run(self):
self.ready_event = threading.Event()
self.ready_line = None
self.thread = threading.Thread(target=self._start)
self.thread.start()
self.ready_event.wait()
def stop(self):
self.logger.info("Stopping subprocess.")
if self.process and self.process.is_running():
self.process.terminate()
self.process.wait()
@property
def is_running(self):
if self.process is None:
return False
return self.process.is_running()