|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import logging |
|
|
import os |
|
|
import time |
|
|
from concurrent.futures._base import Future |
|
|
from concurrent.futures.thread import ThreadPoolExecutor |
|
|
from threading import Event |
|
|
from typing import Dict, List, TextIO |
|
|
|
|
|
__all__ = ["tail_logfile", "TailLog"] |
|
|
|
|
|
log = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
def tail_logfile(
    header: str, file: str, dst: TextIO, finished: Event, interval_sec: float
):
    """
    Tails ``file`` into ``dst``, prefixing every line with ``header``.

    Blocks until ``file`` exists, polling every ``interval_sec`` seconds
    (returns immediately if ``finished`` is set before the file appears).
    Then copies lines as they are written, and only stops once ``finished``
    is set AND the end of the file has been reached — so lines written
    before the stop signal are never dropped.
    """

    # Wait for the producer to create the file; bail out early if the
    # caller signals completion before the file ever shows up.
    while not os.path.exists(file):
        if finished.is_set():
            return
        time.sleep(interval_sec)

    with open(file, "r") as fp:
        eof_after_finish = False
        while not eof_after_finish:
            line = fp.readline()
            if line:
                dst.write(f"{header}{line}")
            elif finished.is_set():
                # At EOF and the producer is done -> nothing more will come.
                eof_after_finish = True
            else:
                # At EOF but producer may still write -> poll again shortly.
                time.sleep(interval_sec)
|
|
|
|
|
|
|
|
class TailLog:
    """
    Tails the given log files. The log files do not have to exist when the
    ``start()`` method is called. The tail-er will gracefully wait until the
    log files are created by the producer and will tail the contents of the
    log files until the ``stop()`` method is called.

    .. warning:: ``TailLog`` will wait indefinitely for the log file to be created!

    Each log file's line will be prefixed with a header of the form: ``[{name}{idx}]:``,
    where the ``name`` is user-provided and ``idx`` is the key of the log file
    in the ``log_files`` mapping (typically the local rank).

    Usage:

    ::

        log_files = {0: "/tmp/0_stdout.log", 1: "/tmp/1_stdout.log"}
        tailer = TailLog("trainer", log_files, sys.stdout).start()
        # actually run the trainers to produce 0_stdout.log and 1_stdout.log
        run_trainers()
        tailer.stop()

        # once run_trainers() start writing the ##_stdout.log files
        # the tailer will print to sys.stdout:
        # >>> [trainer0]:log_line1
        # >>> [trainer1]:log_line1
        # >>> [trainer0]:log_line2
        # >>> [trainer0]:log_line3
        # >>> [trainer1]:log_line2

    .. note:: Due to buffering log lines between files may not necessarily
              be printed out in order. You should configure your application's
              logger to suffix each log line with a proper timestamp.

    """

    def __init__(
        self,
        name: str,
        log_files: Dict[int, str],
        dst: TextIO,
        interval_sec: float = 0.1,
    ):
        n = len(log_files)
        self._threadpool = None
        if n > 0:
            # one worker per file so every tailer thread can block
            # independently waiting for its file to appear/grow
            self._threadpool = ThreadPoolExecutor(
                max_workers=n,
                thread_name_prefix=f"{self.__class__.__qualname__}_{name}",
            )

        self._name = name
        self._dst = dst
        self._log_files = log_files
        # one event per file; setting it tells that tailer to drain and exit
        self._finished_events: Dict[int, Event] = {
            local_rank: Event() for local_rank in log_files.keys()
        }
        self._futs: List[Future] = []
        self._interval_sec = interval_sec
        self._stopped = False

    def start(self) -> "TailLog":
        """
        Submits one ``tail_logfile`` task per log file and returns ``self``
        (fluent). No-op when the ``log_files`` mapping is empty.
        """
        if not self._threadpool:
            return self

        for local_rank, file in self._log_files.items():
            self._futs.append(
                self._threadpool.submit(
                    tail_logfile,
                    header=f"[{self._name}{local_rank}]:",
                    file=file,
                    dst=self._dst,
                    finished=self._finished_events[local_rank],
                    interval_sec=self._interval_sec,
                )
            )
        return self

    def stop(self) -> None:
        """
        Signals all tailers to finish, waits for them to drain their files,
        and shuts down the threadpool. Safe to call even if ``start()`` was
        never called.
        """
        for finished in self._finished_events.values():
            finished.set()

        # Futures were submitted in ``self._log_files`` iteration order, so
        # pair them with the same keys to report the correct local rank.
        # (``enumerate`` would mislabel ranks when keys are not 0..n-1.)
        for local_rank, f in zip(self._log_files, self._futs):
            try:
                f.result()
            except Exception as e:
                # lazy %-formatting: only rendered if the record is emitted
                log.error(
                    "error in log tailer for %s%s. %s: %s",
                    self._name,
                    local_rank,
                    e.__class__.__qualname__,
                    e,
                )

        if self._threadpool:
            self._threadpool.shutdown(wait=True)

        self._stopped = True

    def stopped(self) -> bool:
        """Returns ``True`` once ``stop()`` has completed."""
        return self._stopped
|
|
|