|
|
import time |
|
|
import re |
|
|
import warnings |
|
|
from typing import List |
|
|
|
|
|
import psutil |
|
|
import subprocess |
|
|
import logging |
|
|
import threading |
|
|
|
|
|
import utils as U |
|
|
|
|
|
|
|
|
class SubprocessMonitor: |
|
|
def __init__( |
|
|
self, |
|
|
commands: List[str], |
|
|
name: str, |
|
|
ready_match: str = r".*", |
|
|
log_path: str = "logs", |
|
|
callback_match: str = r"^(?!x)x$", |
|
|
callback: callable = None, |
|
|
finished_callback: callable = None, |
|
|
): |
|
|
self.commands = commands |
|
|
start_time = time.strftime("%Y%m%d_%H%M%S") |
|
|
self.name = name |
|
|
self.logger = logging.getLogger(name) |
|
|
handler = logging.FileHandler(U.f_join(log_path, f"{start_time}.log")) |
|
|
formatter = logging.Formatter( |
|
|
"%(asctime)s - %(name)s - %(levelname)s - %(message)s" |
|
|
) |
|
|
handler.setFormatter(formatter) |
|
|
self.logger.addHandler(handler) |
|
|
self.logger.setLevel(logging.INFO) |
|
|
self.process = None |
|
|
self.ready_match = ready_match |
|
|
self.ready_event = None |
|
|
self.ready_line = None |
|
|
self.callback_match = callback_match |
|
|
self.callback = callback |
|
|
self.finished_callback = finished_callback |
|
|
self.thread = None |
|
|
|
|
|
def _start(self): |
|
|
self.logger.info(f"Starting subprocess with commands: {self.commands}") |
|
|
|
|
|
self.process = psutil.Popen( |
|
|
self.commands, |
|
|
stdout=subprocess.PIPE, |
|
|
stderr=subprocess.STDOUT, |
|
|
universal_newlines=True, |
|
|
) |
|
|
print(f"Subprocess {self.name} started with PID {self.process.pid}.") |
|
|
for line in iter(self.process.stdout.readline, ""): |
|
|
self.logger.info(line.strip()) |
|
|
if re.search(self.ready_match, line): |
|
|
self.ready_line = line |
|
|
self.logger.info("Subprocess is ready.") |
|
|
self.ready_event.set() |
|
|
if re.search(self.callback_match, line): |
|
|
self.callback() |
|
|
if not self.ready_event.is_set(): |
|
|
self.ready_event.set() |
|
|
warnings.warn(f"Subprocess {self.name} failed to start.") |
|
|
if self.finished_callback: |
|
|
self.finished_callback() |
|
|
|
|
|
def run(self): |
|
|
self.ready_event = threading.Event() |
|
|
self.ready_line = None |
|
|
self.thread = threading.Thread(target=self._start) |
|
|
self.thread.start() |
|
|
self.ready_event.wait() |
|
|
|
|
|
def stop(self): |
|
|
self.logger.info("Stopping subprocess.") |
|
|
if self.process and self.process.is_running(): |
|
|
self.process.terminate() |
|
|
self.process.wait() |
|
|
|
|
|
@property |
|
|
def is_running(self): |
|
|
if self.process is None: |
|
|
return False |
|
|
return self.process.is_running() |
|
|
|