Spaces:
Sleeping
Sleeping
| # Standard Library Imports | |
| import datetime | |
| import threading | |
| import time | |
| import traceback | |
| from pathlib import Path | |
| import enum | |
| # Third-Party Imports | |
| from loguru import logger | |
| logger.add("./data/log/file_{time}.log", rotation="500 MB") # Automatically rotate too big log files | |
| # Local Imports | |
| from .ansi_utils import ansi_color_str, ansi_link_str | |
| from .attr_utils import get_prev_caller_info, get_caller_info, get_thread_info, get_prev_frame, get_prev_frame_from_frame, is_of_type | |
| from .table_utils import format_property, format_table | |
| from .extended_symbols import ExtendedSymbols, draw_box, draw_arrow, format_arrow | |
| #from ..mew_log.mew_context import MewContext, resolve_context | |
| # Constants | |
| lastLog = None # Stores the most recent log entry | |
| debug_log = [] # Stores all log entries | |
| startTime=time.time() | |
| #settings vars | |
| enable_log = True | |
| enable_log_to_file = False | |
| log_filename = "log" | |
| log_file_path = "data/log/log.txt" | |
| enable_fancy = True | |
| enable_use_color = True | |
| type_color='yellow' | |
| value_color='bright_yellow' | |
| arrow_color = 'bright_white' | |
| def allow_curly_braces(original_string): | |
| if "{" in original_string or "}" in original_string: | |
| escaped_string = original_string.replace("{", "{{").replace("}", "}}") | |
| #print("Escaped String:", escaped_string) # Debug output | |
| return escaped_string | |
| return original_string | |
| def format_arg(arg, use_color, fancy): | |
| def is_complex_type(obj): | |
| return is_of_type(obj, (list, set, dict)) or hasattr(obj, '__dict__') | |
| type_str = f"<{type(arg).__name__}>" | |
| value_str = repr(arg) if not isinstance(arg, str) and not fancy else format_table(arg, use_color=use_color) | |
| newline_if_needed = '\n' if is_complex_type(arg) else "" | |
| formatted_arg = f"{type_str}:{newline_if_needed}{value_str}" | |
| formatted_arg = allow_curly_braces(formatted_arg) | |
| return ansi_color_str(formatted_arg, fg=value_color) if use_color else formatted_arg | |
| def format_kwarg(kw, arg, use_color, fancy): | |
| name_str = ansi_color_str(kw, fg=kw_color) if use_color else kw | |
| arg_str = format_arg(arg, use_color, fancy) | |
| formatted_arg = f"{name_str}: {arg_str}" | |
| return ansi_color_str(formatted_arg, fg=value_color) if use_color else formatted_arg | |
| def format_args(use_color=enable_use_color, fancy=enable_fancy, *args, **kwargs): | |
| formatted_args = [ format_arg(arg, use_color, fancy) for arg in args] | |
| formatted_kwargs = {k: format_kwarg(k, v, use_color, fancy) for k, v in kwargs.items()} | |
| return formatted_args, formatted_kwargs | |
| def make_formatted_msg(msg, *args, **kwargs): | |
| #fetch format vars from kwargs | |
| fancy = kwargs.pop('fancy', enable_fancy) | |
| use_color = kwargs.pop('use_color', enable_use_color) | |
| context_depth = kwargs.pop('context_depth', None) | |
| context = kwargs.pop('context', None) | |
| # print(f"args={args}") | |
| # print(f"kwargs={kwargs}") | |
| formatted_args, formatted_kwargs = format_args(use_color, fancy, *args, **kwargs) | |
| msgf = stringf(msg, *formatted_args, **formatted_kwargs) | |
| #context = MewContext.resolve_context(context, depth, steps=5) | |
| if fancy: | |
| formatted_msg = f"\n ~ | {get_timestamp()} | {get_thread_info(get_prev_caller_info(msgf, use_color=use_color, steps=2))}" | |
| else: | |
| formatted_msg = f"\n ~ | {get_timestamp()} | {context}:{msgf}" | |
| return formatted_msg | |
| def stringf(s: str, *args, **kwargs): | |
| # s = allow_curly_braces(s) # Uncomment or modify as needed | |
| if not args and not kwargs: | |
| #print("Both args and kwargs are empty.") | |
| return s | |
| else: | |
| return s.format(*args, **kwargs) | |
| # if not args: | |
| # print("Args is empty.") | |
| # if not kwargs: | |
| # print("Kwargs is empty.") | |
| # print(s) | |
| # print(f"args: {args}") | |
| # print(f"kwargs: {kwargs}") | |
| # return s.format(*args, **kwargs) | |
| # No arguments or keyword arguments | |
| # stringf("Hello, World!") | |
| # # With positional arguments | |
| # stringf("Hello, {}!", "World") | |
| # # With keyword arguments | |
| # stringf("Hello, {name}!", name="Alice") | |
| # # With both args and kwargs empty (only prints "Hello, World!") | |
| # stringf("Hello, World!") | |
| def ensure_directory_exists(directory): | |
| path = Path(directory) | |
| path.mkdir(parents=True, exist_ok=True) | |
| def ensure_file_exists(file_path): | |
| path = Path(file_path) | |
| path.touch(exist_ok=True) | |
| #from ..mew_log.mew_log_helper import MewLogHelper | |
| # Logging Configuration | |
| # Configure Loguru's logger to use a custom format | |
| import sys | |
| import io | |
| # Set UTF-8 encoding for stdout and stderr | |
| #sys.stdout = io.TextIOWrapper(sys.stdout.detach(), encoding='utf-8', line_buffering=True) | |
| #sys.stderr = io.TextIOWrapper(sys.stderr.detach(), encoding='utf-8', line_buffering=True) | |
| #logger.add(sys.stdout, format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | {level} | {message}", backtrace=True) | |
| logger.configure(handlers=[ | |
| { | |
| "sink": sys.stdout, | |
| "format": "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | {level} | {message}", # | <cyan>{file}:{line}:{function}</cyan> | |
| "level": "INFO" | |
| }, | |
| { | |
| "sink": sys.stdout, | |
| "format": "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | {level} | {message}", # | <yellow>{file}:{line}:{function}</yellow> | |
| "level": "WARNING" | |
| }, | |
| { | |
| "sink": sys.stderr, | |
| "format": "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | {level} | {message}", #| <red>{file}:{line}:{function}</red> | |
| "level": "ERROR" | |
| } | |
| ]) | |
| # Logging Configuration | |
| def init_log(log_config): | |
| global enable_log_to_file, enable_log, log_filename, log_file_path | |
| enable_log_to_file = log_config['log_to_file'] | |
| clear_file_first = log_config['clear_file_first'] | |
| enable_log = log_config['enable_log'] | |
| log_filename = log_config['log_filename'] | |
| log_dir = 'data/log/' | |
| ensure_directory_exists(log_dir) | |
| log_file_path = f"data/log/{log_filename}.txt" | |
| ensure_file_exists(log_file_path) | |
| with open(log_file_path, 'w') as fp: | |
| pass | |
| print(f'init_log: \n ~ enable_log: {enable_log} \n ~ enable_log_to_file: {enable_log_to_file} \n ~ log_file_path: {log_file_path}') | |
| def _update_log(formatted_msg_str): | |
| global lastLog, debug_log, enable_log_to_file | |
| lastLog = formatted_msg_str | |
| debug_log.append(lastLog) | |
| if enable_log_to_file: | |
| log_file(lastLog) | |
| def log_file(msg, file_path = None): | |
| global log_file_path | |
| if not enable_log: | |
| return | |
| if file_path is None: | |
| file_path = log_file_path | |
| curTime = time.time() | |
| elapsedTime = curTime - startTime | |
| with open(log_file_path, 'w') as fp: | |
| fp.write(f"\n ~ | time={elapsedTime:.2f}s ~ {msg}") | |
| #logger.add(log_file_path, format="time={elapsedTime:.2f}s ~ {msg}") | |
| def log_info(msg, *args,**kwargs): | |
| if not enable_log: | |
| return | |
| formatted_msg_str = make_formatted_msg(msg, *args,**kwargs) | |
| _update_log(formatted_msg_str) | |
| formatted_log_str = lastLog | |
| print(formatted_log_str) | |
| #logger.info(formatted_log_str) | |
| def log_warning(msg, *args,**kwargs): | |
| if not enable_log: | |
| return | |
| formatted_msg_str = make_formatted_msg(msg, *args,**kwargs) | |
| _update_log(formatted_msg_str) | |
| formatted_log_str = lastLog | |
| #print(formatted_log_str) | |
| logger.warning(formatted_log_str) | |
| def log_error(msg, e, *args,**kwargs): | |
| if not enable_log: | |
| return | |
| formatted_msg_str = f"\n===^_^===\n{make_formatted_msg(msg, *args, **kwargs)}" | |
| formatted_msg_str += f"{trace_error(e)}\n===>_<===" | |
| _update_log(formatted_msg_str) | |
| formatted_log_str = lastLog | |
| #print(formatted_log_str) | |
| logger.error(formatted_log_str) | |
| import functools | |
| import traceback | |
| def log_function(func): | |
| def wrapper(*args, **kwargs): | |
| def get_func_args(): | |
| arg_names = func.__code__.co_varnames[:func.__code__.co_argcount] | |
| func_args = dict(zip(arg_names, args)) | |
| func_args.update(kwargs) | |
| try: | |
| arg_names = func.__code__.co_varnames[:func.__code__.co_argcount] | |
| func_args = dict(zip(arg_names, args)) | |
| func_args.update(kwargs) | |
| func_args_str = format_table(func_args, headers=None, tablefmt='simple') | |
| #print(get_prev_caller_info(f"({func_args_str} )")) | |
| result = func(*args, **kwargs) | |
| print(get_thread_info(get_prev_caller_info(f"({func_args} ) -> result: {format_arg(result, use_color=True, fancy=True)}"))) | |
| return result | |
| except Exception as e: | |
| arg_names = func.__code__.co_varnames[:func.__code__.co_argcount] | |
| func_args = dict(zip(arg_names, args)) | |
| func_args.update(kwargs) | |
| logger.error(f"Exception: {str(e)}") | |
| logger.error(f"Format Traceback: {traceback.format_exc()}") | |
| raise # Re-raise the exception after logging | |
| return wrapper | |
| # @log_function | |
| # def dive(a, b): | |
| # return a / b | |
| # dive(2,3) | |
| # dive(12,3) | |
| # dive(14,3) | |
| # dive(21,3) | |
| # dive(21,0) | |
| error_color = 'bright_red' | |
| def format_traceback(tb_entry, depth=0, use_color=enable_use_color): | |
| arrow_str = format_arrow('T', 'double', 3 + 2 * depth) | |
| filename, lineno, funcname, line = tb_entry | |
| file_path = f"file:///{filename}" | |
| file_link = f"{filename}::{lineno}::{funcname}" | |
| trace_link = ansi_link_str(file_path, file_link, link_color="bright_magenta") if use_color else file_link | |
| arrow_str2 = format_arrow('L', 'double', 3 + 2 * (depth+1)) | |
| #(" " * (3 + 2 * depth))+ | |
| formatted_tb_str = f'\n ~ | {arrow_str}trace({depth}): {trace_link}\n ~ | {arrow_str2} Line: "{line}"' | |
| return ansi_color_str(formatted_tb_str, fg=error_color) if use_color else formatted_tb_str | |
| def trace_error(e, use_color=True, fancy=True): | |
| exception_str = f"\n ~ | {ExtendedSymbols.DOUBLE_VERTICAL}Exception: {repr(e)}" | |
| out_str = ansi_color_str(exception_str, fg=error_color) if use_color else exception_str | |
| if isinstance(e, BaseException): | |
| arrow_str = format_arrow('T', 'double', 3) | |
| traceback_obj = e.__traceback__ | |
| if traceback_obj: | |
| file_path = f"file:///{traceback_obj.tb_frame.f_code.co_filename}" | |
| file_link = f"{traceback_obj.tb_frame.f_code.co_filename}::{traceback_obj.tb_lineno}" | |
| trace_link = ansi_link_str(file_path, file_link, link_color="bright_magenta") if use_color else file_link | |
| formatted_tb_str = f"\n ~ | {arrow_str}trace(0): {trace_link}" | |
| out_str += ansi_color_str(formatted_tb_str, fg=error_color) if use_color else formatted_tb_str | |
| tb = traceback.extract_tb(traceback_obj) | |
| for i, tb_entry in enumerate(tb): | |
| out_str += format_traceback(tb_entry, depth=i+1, use_color=use_color) | |
| return ansi_color_str(out_str, fg=error_color) if use_color else out_str | |
| # log_info(' ~ | test log_info: {}', 1) | |
| # log_warning(' ~ | test log_warning: {}', 2) | |
| def log_text_input(key: str): | |
| log_info(f'\n ~ | Text Input[{key}]: ') | |
| input_text = input() | |
| log_info(f'\n ~ | Recv input: "{input_text}"') | |
| return input_text | |
| def format_duration(seconds): | |
| hours, remainder = divmod(seconds, 3600) | |
| minutes, seconds = divmod(remainder, 60) | |
| return f"{int(hours)}h {int(minutes)}m {int(seconds)}s" | |
| def parse_duration(duration_str): | |
| parts = duration_str.split() | |
| hours = int(parts[0][:-1]) if 'h' in parts[0] else 0 | |
| minutes = int(parts[1][:-1]) if 'm' in parts[1] else 0 | |
| seconds = int(parts[2][:-1]) if 's' in parts[2] else 0 | |
| return hours * 3600 + minutes * 60 + seconds | |
| def get_timestamp(): | |
| return datetime.datetime.now().strftime("%Y-%b-%d %H:%M:%S") | |
| def format_path(directory, depth=0, max_depth=3, use_color=True): | |
| """Formats the directory structure as a string up to a specified depth, with optional color. | |
| Args: | |
| directory (str or Path): The directory path to format. | |
| depth (int): The current depth in the directory structure. | |
| max_depth (int): The maximum depth to format. | |
| use_color (bool): Whether to apply color to the output. | |
| Returns: | |
| str: The formatted directory structure. | |
| """ | |
| if max_depth is not None and depth > max_depth: | |
| return "" | |
| directory_path = Path(directory) | |
| if not directory_path.is_dir(): | |
| return "The specified path is not a directory.\n" | |
| # Build the formatted string, applying color if requested | |
| line_prefix = "│ " * depth + "├── " | |
| directory_name = directory_path.name | |
| if use_color: | |
| line_prefix = ansi_color_str(line_prefix, fg='cyan') # Assuming color_str function exists | |
| directory_name = ansi_color_str(directory_name, fg='green') # Adjust colors as needed | |
| formatted_str = line_prefix + directory_name + "\n" | |
| # Sort directory contents for consistent order | |
| sorted_items = sorted(directory_path.iterdir(), key=lambda x: (x.is_file(), x.name)) | |
| for item in sorted_items: | |
| if item.is_dir(): | |
| # Recursive call for directories, increasing the depth | |
| formatted_str += format_path(item, depth + 1, max_depth, use_color) | |
| else: | |
| # Include file name with indentation, applying color if requested | |
| file_line = "│ " * (depth + 1) + "├── " + item.name | |
| if use_color: | |
| file_line = ansi_color_str(file_line, fg='white') # Example color, adjust as needed | |
| formatted_str += file_line + "\n" | |
| return formatted_str | |
| # Spinner Class | |
| class LogSpinner: | |
| def __init__(self, message="", rainbow=True, anim_speed=0.1): | |
| self.message = message | |
| self.speed = anim_speed | |
| self.colors = ['\033[31m', '\033[33m', '\033[32m', '\033[34m', '\033[35m', '\033[36m'] | |
| self.spinner = ['⠇', '⠋', '⠙', '⠸', '⠼', '⠴', '⠦', '⠧'] | |
| #self.spinner = ['|', '/', '-', '\\'] | |
| self.rainbow = rainbow | |
| def __enter__(self): | |
| self.stop_spinner = False | |
| self.startTime = time.time() | |
| # spinner_text = f'^_^ | {self.message} - time: {0.0:.2f}s | Begin!' | |
| # print(spinner_text) | |
| self.spinner_thread = threading.Thread(target=self.spin) | |
| self.spinner_thread.start() | |
| return self | |
| def __exit__(self, exc_type, exc_value, traceback): | |
| self.stop_spinner = True | |
| self.spinner_thread.join() | |
| curTime = time.time() | |
| spinner_text = f'>_< | {self.message} - time: {format_duration(curTime - self.startTime)} | Done!' | |
| print(spinner_text) | |
| def spin(self): | |
| while not self.stop_spinner: | |
| #self.update_spinner() | |
| if self.rainbow: self.update_color_spinner() | |
| else: self.update_spinner() | |
| def update_spinner(self): | |
| for char in self.spinner: | |
| curTime = time.time() | |
| spinner_text = f'{self.message} - time: {format_duration(curTime - self.startTime)} |' | |
| print(f'\r ^_^ | {char} | {spinner_text}', end='\r', flush=True) | |
| time.sleep(self.speed) | |
| def update_color_spinner(self): | |
| for color in self.colors: | |
| for char in self.spinner: | |
| curTime = time.time() | |
| spinner_text = f'{self.message} - time: {format_duration(curTime - self.startTime)} |' | |
| print(f'^_^ | {color}{char}\033[0m | {spinner_text}', end='\r', flush=True) | |
| time.sleep(self.speed) | |
| def run_unit_test(): | |
| # Example usage: | |
| class Person: | |
| def __init__(self, name, age, city): | |
| self.name = name | |
| self.age = age | |
| self.city = city | |
| # Create instances of Person | |
| person1 = Person('John', 30, 'New York') | |
| person2 = Person('Alice', 25, 'Los Angeles') | |
| person3 = Person('Bob', 30, 'Hong Kong') | |
| person4 = Person('Charlie', 35, 'Shanghai') | |
| person5 = Person('David', 40, 'Beijing') | |
| # Define other data structures | |
| data_dict = {'Name': 'John', 'Age': 30, 'City': 'New York'} | |
| data_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] | |
| data_set = {'apple', 'banana', 'orange'} | |
| data_dict_of_lists = {'Name': ['John', 'Alice'], 'Age': [30, 25], 'City': ['New York', 'Los Angeles']} | |
| data_list_of_dicts = [person1.__dict__, person2.__dict__] | |
| data_dict_of_dicts = {'Person1': person1.__dict__, 'Person2': person2.__dict__} | |
| list_of_objs = [person3, person4, person5] | |
| data_list_of_lists = [['John', 30, 'New York'], ['Alice', 25, 'Los Angeles']] | |
| dict_of_objs = {'Alice': person3, 'Bob': person4, 'Charlie': person5} | |
| dict_of_list_of_objs = {'Group1': [person3, person4], 'Group2': [person5]} | |
| # Combine all data into a complex structure | |
| complex_data = { | |
| 'data_dict': data_dict, | |
| 'data_list': data_list, | |
| 'data_set': data_set, | |
| 'data_dict_of_lists': data_dict_of_lists, | |
| 'data_list_of_dicts': data_list_of_dicts, | |
| 'data_dict_of_dicts': data_dict_of_dicts, | |
| 'list_of_objs': list_of_objs, | |
| 'data_list_of_lists': data_list_of_lists, | |
| 'dict_of_objs': dict_of_objs, | |
| 'dict_of_list_of_objs': dict_of_list_of_objs | |
| } | |
| # Log each unique data structure | |
| log_info("Data Dictionary: {}", data_dict) | |
| log_info("Data List: {}", data_list) | |
| log_info("Data Set: {}", data_set) | |
| log_info("Data Dictionary of Lists: {}", data_dict_of_lists) | |
| log_info("Data List of Dicts: {}", data_list_of_dicts) | |
| log_info("Data Dictionary of Dicts: {}", data_dict_of_dicts) | |
| log_info("List of Objects: {}", list_of_objs) | |
| log_info("Data List of Lists: {}", data_list_of_lists) | |
| log_info("Dictionary of Objects: {}", dict_of_objs) | |
| log_info("Dictionary of List of Objects: {}", dict_of_list_of_objs) | |
| #run_unit_test() | |