| import datetime |
| import functools |
| import inspect |
| import logging |
| import os |
| import shutil |
| import subprocess |
| from termcolor import colored |
|
|
|
|
| def add_fillers(text, filler="=", fill_side="both"): |
| terminal_width = shutil.get_terminal_size().columns |
| text = text.strip() |
| text_width = len(text) |
| if text_width >= terminal_width: |
| return text |
|
|
| if fill_side[0].lower() == "b": |
| leading_fill_str = filler * ((terminal_width - text_width) // 2 - 1) + " " |
| trailing_fill_str = " " + filler * ( |
| terminal_width - text_width - len(leading_fill_str) - 1 |
| ) |
| elif fill_side[0].lower() == "l": |
| leading_fill_str = filler * (terminal_width - text_width - 1) + " " |
| trailing_fill_str = "" |
| elif fill_side[0].lower() == "r": |
| leading_fill_str = "" |
| trailing_fill_str = " " + filler * (terminal_width - text_width - 1) |
| else: |
| raise ValueError("Invalid fill_side") |
|
|
| filled_str = f"{leading_fill_str}{text}{trailing_fill_str}" |
| return filled_str |
|
|
|
|
| class OSLogger(logging.Logger): |
| LOG_METHODS = { |
| "err": ("error", "red"), |
| "warn": ("warning", "light_red"), |
| "note": ("info", "light_magenta"), |
| "mesg": ("info", "light_cyan"), |
| "file": ("info", "light_blue"), |
| "line": ("info", "white"), |
| "success": ("info", "light_green"), |
| "fail": ("info", "light_red"), |
| "back": ("debug", "light_cyan"), |
| } |
| INDENT_METHODS = [ |
| "indent", |
| "set_indent", |
| "reset_indent", |
| "store_indent", |
| "restore_indent", |
| "log_indent", |
| ] |
| LEVEL_METHODS = [ |
| "set_level", |
| "store_level", |
| "restore_level", |
| "quiet", |
| "enter_quiet", |
| "exit_quiet", |
| ] |
| LEVEL_NAMES = { |
| "critical": logging.CRITICAL, |
| "error": logging.ERROR, |
| "warning": logging.WARNING, |
| "info": logging.INFO, |
| "debug": logging.DEBUG, |
| } |
|
|
| def __init__(self, name=None, prefix=False): |
| if not name: |
| frame = inspect.stack()[1] |
| module = inspect.getmodule(frame[0]) |
| name = module.__name__ |
|
|
| super().__init__(name) |
| self.setLevel(logging.INFO) |
|
|
| if prefix: |
| formatter_prefix = "[%(asctime)s] - [%(name)s] - [%(levelname)s]\n" |
| else: |
| formatter_prefix = "" |
|
|
| self.formatter = logging.Formatter(formatter_prefix + "%(message)s") |
|
|
| stream_handler = logging.StreamHandler() |
| stream_handler.setLevel(logging.INFO) |
| stream_handler.setFormatter(self.formatter) |
| self.addHandler(stream_handler) |
|
|
| self.log_indent = 0 |
| self.log_indents = [] |
|
|
| self.log_level = "info" |
| self.log_levels = [] |
|
|
| def indent(self, indent=2): |
| self.log_indent += indent |
|
|
| def set_indent(self, indent=2): |
| self.log_indent = indent |
|
|
| def reset_indent(self): |
| self.log_indent = 0 |
|
|
| def store_indent(self): |
| self.log_indents.append(self.log_indent) |
|
|
| def restore_indent(self): |
| self.log_indent = self.log_indents.pop(-1) |
|
|
| def set_level(self, level): |
| self.log_level = level |
| self.setLevel(self.LEVEL_NAMES[level]) |
|
|
| def store_level(self): |
| self.log_levels.append(self.log_level) |
|
|
| def restore_level(self): |
| self.log_level = self.log_levels.pop(-1) |
| self.set_level(self.log_level) |
|
|
| def quiet(self): |
| self.set_level("critical") |
|
|
| def enter_quiet(self, quiet=False): |
| if quiet: |
| self.store_level() |
| self.quiet() |
|
|
| def exit_quiet(self, quiet=False): |
| if quiet: |
| self.restore_level() |
|
|
| def log( |
| self, |
| level, |
| color, |
| msg, |
| indent=0, |
| fill=False, |
| fill_side="both", |
| end="\n", |
| *args, |
| **kwargs, |
| ): |
| if type(msg) == str: |
| msg_str = msg |
| else: |
| msg_str = repr(msg) |
| quotes = ["'", '"'] |
| if msg_str[0] in quotes and msg_str[-1] in quotes: |
| msg_str = msg_str[1:-1] |
|
|
| indent_str = " " * (self.log_indent + indent) |
| indented_msg = "\n".join([indent_str + line for line in msg_str.split("\n")]) |
|
|
| if fill: |
| indented_msg = add_fillers(indented_msg, fill_side=fill_side) |
|
|
| handler = self.handlers[0] |
| handler.terminator = end |
|
|
| getattr(self, level)(colored(indented_msg, color), *args, **kwargs) |
|
|
| def route_log(self, method, msg, *args, **kwargs): |
| level, method = method |
| functools.partial(self.log, level, method, msg)(*args, **kwargs) |
|
|
| def err(self, msg: str = "", *args, **kwargs): |
| self.route_log(("error", "red"), msg, *args, **kwargs) |
|
|
| def warn(self, msg: str = "", *args, **kwargs): |
| self.route_log(("warning", "light_red"), msg, *args, **kwargs) |
|
|
| def note(self, msg: str = "", *args, **kwargs): |
| self.route_log(("info", "light_magenta"), msg, *args, **kwargs) |
|
|
| def mesg(self, msg: str = "", *args, **kwargs): |
| self.route_log(("info", "light_cyan"), msg, *args, **kwargs) |
|
|
| def file(self, msg: str = "", *args, **kwargs): |
| self.route_log(("info", "light_blue"), msg, *args, **kwargs) |
|
|
| def line(self, msg: str = "", *args, **kwargs): |
| self.route_log(("info", "white"), msg, *args, **kwargs) |
|
|
| def success(self, msg: str = "", *args, **kwargs): |
| self.route_log(("info", "light_green"), msg, *args, **kwargs) |
|
|
| def fail(self, msg: str = "", *args, **kwargs): |
| self.route_log(("info", "light_red"), msg, *args, **kwargs) |
|
|
| def back(self, msg: str = "", *args, **kwargs): |
| self.route_log(("debug", "light_cyan"), msg, *args, **kwargs) |
|
|
|
|
| logger = OSLogger() |
|
|
|
|
| def shell_cmd(cmd, getoutput=False, showcmd=True, env=None): |
| if showcmd: |
| logger.info(colored(f"\n$ [{os.getcwd()}]", "light_blue")) |
| logger.info(colored(f" $ {cmd}\n", "light_cyan")) |
| if getoutput: |
| output = subprocess.getoutput(cmd, env=env) |
| return output |
| else: |
| subprocess.run(cmd, shell=True, env=env) |
|
|
|
|
| class Runtimer: |
| def __enter__(self): |
| self.t1, _ = self.start_time() |
| return self |
|
|
| def __exit__(self, exc_type, exc_value, traceback): |
| self.t2, _ = self.end_time() |
| self.elapsed_time(self.t2 - self.t1) |
|
|
| def start_time(self): |
| t1 = datetime.datetime.now() |
| self.logger_time("start", t1) |
| return t1, self.time2str(t1) |
|
|
| def end_time(self): |
| t2 = datetime.datetime.now() |
| self.logger_time("end", t2) |
| return t2, self.time2str(t2) |
|
|
| def elapsed_time(self, dt=None): |
| if dt is None: |
| dt = self.t2 - self.t1 |
| self.logger_time("elapsed", dt) |
| return dt, self.time2str(dt) |
|
|
| def logger_time(self, time_type, t): |
| time_types = { |
| "start": "Start", |
| "end": "End", |
| "elapsed": "Elapsed", |
| } |
| time_str = add_fillers( |
| colored( |
| f"{time_types[time_type]} time: [ {self.time2str(t)} ]", |
| "light_magenta", |
| ), |
| fill_side="both", |
| ) |
| logger.line(time_str) |
|
|
| |
| def time2str(self, t): |
| datetime_str_format = "%Y-%m-%d %H:%M:%S" |
| if isinstance(t, datetime.datetime): |
| return t.strftime(datetime_str_format) |
| elif isinstance(t, datetime.timedelta): |
| hours = t.seconds // 3600 |
| hour_str = f"{hours} hr" if hours > 0 else "" |
| minutes = (t.seconds // 60) % 60 |
| minute_str = f"{minutes:>2} min" if minutes > 0 else "" |
| seconds = t.seconds % 60 |
| second_str = f"{seconds:>2} s" |
| time_str = " ".join([hour_str, minute_str, second_str]).strip() |
| return time_str |
| else: |
| return str(t) |
|
|