| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| import os |
| import typing |
| import traceback |
| import logging |
| import inspect |
| from logging.handlers import TimedRotatingFileHandler |
| from threading import RLock |
|
|
| from api.utils import file_utils |
|
|
|
|
| class LoggerFactory(object): |
| TYPE = "FILE" |
| LOG_FORMAT = "[%(levelname)s] [%(asctime)s] [%(module)s.%(funcName)s] [line:%(lineno)d]: %(message)s" |
| logging.basicConfig(format=LOG_FORMAT) |
| LEVEL = logging.DEBUG |
| logger_dict = {} |
| global_handler_dict = {} |
|
|
| LOG_DIR = None |
| PARENT_LOG_DIR = None |
| log_share = True |
|
|
| append_to_parent_log = None |
|
|
| lock = RLock() |
| |
| |
| |
| |
| |
| |
| |
| |
| levels = (10, 20, 30, 40) |
| schedule_logger_dict = {} |
|
|
| @staticmethod |
| def set_directory(directory=None, parent_log_dir=None, |
| append_to_parent_log=None, force=False): |
| if parent_log_dir: |
| LoggerFactory.PARENT_LOG_DIR = parent_log_dir |
| if append_to_parent_log: |
| LoggerFactory.append_to_parent_log = append_to_parent_log |
| with LoggerFactory.lock: |
| if not directory: |
| directory = file_utils.get_project_base_directory("logs") |
| if not LoggerFactory.LOG_DIR or force: |
| LoggerFactory.LOG_DIR = directory |
| if LoggerFactory.log_share: |
| oldmask = os.umask(000) |
| os.makedirs(LoggerFactory.LOG_DIR, exist_ok=True) |
| os.umask(oldmask) |
| else: |
| os.makedirs(LoggerFactory.LOG_DIR, exist_ok=True) |
| for loggerName, ghandler in LoggerFactory.global_handler_dict.items(): |
| for className, (logger, |
| handler) in LoggerFactory.logger_dict.items(): |
| logger.removeHandler(ghandler) |
| ghandler.close() |
| LoggerFactory.global_handler_dict = {} |
| for className, (logger, |
| handler) in LoggerFactory.logger_dict.items(): |
| logger.removeHandler(handler) |
| _handler = None |
| if handler: |
| handler.close() |
| if className != "default": |
| _handler = LoggerFactory.get_handler(className) |
| logger.addHandler(_handler) |
| LoggerFactory.assemble_global_handler(logger) |
| LoggerFactory.logger_dict[className] = logger, _handler |
|
|
| @staticmethod |
| def new_logger(name): |
| logger = logging.getLogger(name) |
| logger.propagate = False |
| logger.setLevel(LoggerFactory.LEVEL) |
| return logger |
|
|
| @staticmethod |
| def get_logger(class_name=None): |
| with LoggerFactory.lock: |
| if class_name in LoggerFactory.logger_dict.keys(): |
| logger, handler = LoggerFactory.logger_dict[class_name] |
| if not logger: |
| logger, handler = LoggerFactory.init_logger(class_name) |
| else: |
| logger, handler = LoggerFactory.init_logger(class_name) |
| return logger |
|
|
| @staticmethod |
| def get_global_handler(logger_name, level=None, log_dir=None): |
| if not LoggerFactory.LOG_DIR: |
| return logging.StreamHandler() |
| if log_dir: |
| logger_name_key = logger_name + "_" + log_dir |
| else: |
| logger_name_key = logger_name + "_" + LoggerFactory.LOG_DIR |
| |
| if logger_name_key not in LoggerFactory.global_handler_dict: |
| with LoggerFactory.lock: |
| if logger_name_key not in LoggerFactory.global_handler_dict: |
| handler = LoggerFactory.get_handler( |
| logger_name, level, log_dir) |
| LoggerFactory.global_handler_dict[logger_name_key] = handler |
| return LoggerFactory.global_handler_dict[logger_name_key] |
|
|
| @staticmethod |
| def get_handler(class_name, level=None, log_dir=None, |
| log_type=None, job_id=None): |
| if not log_type: |
| if not LoggerFactory.LOG_DIR or not class_name: |
| return logging.StreamHandler() |
| |
|
|
| if not log_dir: |
| log_file = os.path.join( |
| LoggerFactory.LOG_DIR, |
| "{}.log".format(class_name)) |
| else: |
| log_file = os.path.join(log_dir, "{}.log".format(class_name)) |
| else: |
| log_file = os.path.join(log_dir, "rag_flow_{}.log".format( |
| log_type) if level == LoggerFactory.LEVEL else 'rag_flow_{}_error.log'.format(log_type)) |
|
|
| os.makedirs(os.path.dirname(log_file), exist_ok=True) |
| if LoggerFactory.log_share: |
| handler = ROpenHandler(log_file, |
| when='D', |
| interval=1, |
| backupCount=14, |
| delay=True) |
| else: |
| handler = TimedRotatingFileHandler(log_file, |
| when='D', |
| interval=1, |
| backupCount=14, |
| delay=True) |
| if level: |
| handler.level = level |
|
|
| return handler |
|
|
| @staticmethod |
| def init_logger(class_name): |
| with LoggerFactory.lock: |
| logger = LoggerFactory.new_logger(class_name) |
| handler = None |
| if class_name: |
| handler = LoggerFactory.get_handler(class_name) |
| logger.addHandler(handler) |
| LoggerFactory.logger_dict[class_name] = logger, handler |
|
|
| else: |
| LoggerFactory.logger_dict["default"] = logger, handler |
|
|
| LoggerFactory.assemble_global_handler(logger) |
| return logger, handler |
|
|
| @staticmethod |
| def assemble_global_handler(logger): |
| if LoggerFactory.LOG_DIR: |
| for level in LoggerFactory.levels: |
| if level >= LoggerFactory.LEVEL: |
| level_logger_name = logging._levelToName[level] |
| logger.addHandler( |
| LoggerFactory.get_global_handler( |
| level_logger_name, level)) |
| if LoggerFactory.append_to_parent_log and LoggerFactory.PARENT_LOG_DIR: |
| for level in LoggerFactory.levels: |
| if level >= LoggerFactory.LEVEL: |
| level_logger_name = logging._levelToName[level] |
| logger.addHandler( |
| LoggerFactory.get_global_handler(level_logger_name, level, LoggerFactory.PARENT_LOG_DIR)) |
|
|
|
|
| def setDirectory(directory=None): |
| LoggerFactory.set_directory(directory) |
|
|
|
|
| def setLevel(level): |
| LoggerFactory.LEVEL = level |
|
|
|
|
| def getLogger(className=None, useLevelFile=False): |
| if className is None: |
| frame = inspect.stack()[1] |
| module = inspect.getmodule(frame[0]) |
| className = 'stat' |
| return LoggerFactory.get_logger(className) |
|
|
|
|
| def exception_to_trace_string(ex): |
| return "".join(traceback.TracebackException.from_exception(ex).format()) |
|
|
|
|
| class ROpenHandler(TimedRotatingFileHandler): |
| def _open(self): |
| prevumask = os.umask(000) |
| rtv = TimedRotatingFileHandler._open(self) |
| os.umask(prevumask) |
| return rtv |
|
|
|
|
| def sql_logger(job_id='', log_type='sql'): |
| key = job_id + log_type |
| if key in LoggerFactory.schedule_logger_dict.keys(): |
| return LoggerFactory.schedule_logger_dict[key] |
| return get_job_logger(job_id=job_id, log_type=log_type) |
|
|
|
|
| def ready_log(msg, job=None, task=None, role=None, party_id=None, detail=None): |
| prefix, suffix = base_msg(job, task, role, party_id, detail) |
| return f"{prefix}{msg} ready{suffix}" |
|
|
|
|
| def start_log(msg, job=None, task=None, role=None, party_id=None, detail=None): |
| prefix, suffix = base_msg(job, task, role, party_id, detail) |
| return f"{prefix}start to {msg}{suffix}" |
|
|
|
|
| def successful_log(msg, job=None, task=None, role=None, |
| party_id=None, detail=None): |
| prefix, suffix = base_msg(job, task, role, party_id, detail) |
| return f"{prefix}{msg} successfully{suffix}" |
|
|
|
|
| def warning_log(msg, job=None, task=None, role=None, |
| party_id=None, detail=None): |
| prefix, suffix = base_msg(job, task, role, party_id, detail) |
| return f"{prefix}{msg} is not effective{suffix}" |
|
|
|
|
| def failed_log(msg, job=None, task=None, role=None, |
| party_id=None, detail=None): |
| prefix, suffix = base_msg(job, task, role, party_id, detail) |
| return f"{prefix}failed to {msg}{suffix}" |
|
|
|
|
| def base_msg(job=None, task=None, role: str = None, |
| party_id: typing.Union[str, int] = None, detail=None): |
| if detail: |
| detail_msg = f" detail: \n{detail}" |
| else: |
| detail_msg = "" |
| if task is not None: |
| return f"task {task.f_task_id} {task.f_task_version} ", f" on {task.f_role} {task.f_party_id}{detail_msg}" |
| elif job is not None: |
| return "", f" on {job.f_role} {job.f_party_id}{detail_msg}" |
| elif role and party_id: |
| return "", f" on {role} {party_id}{detail_msg}" |
| else: |
| return "", f"{detail_msg}" |
|
|
|
|
| def exception_to_trace_string(ex): |
| return "".join(traceback.TracebackException.from_exception(ex).format()) |
|
|
|
|
| def get_logger_base_dir(): |
| job_log_dir = file_utils.get_rag_flow_directory('logs') |
| return job_log_dir |
|
|
|
|
| def get_job_logger(job_id, log_type): |
| rag_flow_log_dir = file_utils.get_rag_flow_directory('logs', 'rag_flow') |
| job_log_dir = file_utils.get_rag_flow_directory('logs', job_id) |
| if not job_id: |
| log_dirs = [rag_flow_log_dir] |
| else: |
| if log_type == 'audit': |
| log_dirs = [job_log_dir, rag_flow_log_dir] |
| else: |
| log_dirs = [job_log_dir] |
| if LoggerFactory.log_share: |
| oldmask = os.umask(000) |
| os.makedirs(job_log_dir, exist_ok=True) |
| os.makedirs(rag_flow_log_dir, exist_ok=True) |
| os.umask(oldmask) |
| else: |
| os.makedirs(job_log_dir, exist_ok=True) |
| os.makedirs(rag_flow_log_dir, exist_ok=True) |
| logger = LoggerFactory.new_logger(f"{job_id}_{log_type}") |
| for job_log_dir in log_dirs: |
| handler = LoggerFactory.get_handler(class_name=None, level=LoggerFactory.LEVEL, |
| log_dir=job_log_dir, log_type=log_type, job_id=job_id) |
| error_handler = LoggerFactory.get_handler( |
| class_name=None, |
| level=logging.ERROR, |
| log_dir=job_log_dir, |
| log_type=log_type, |
| job_id=job_id) |
| logger.addHandler(handler) |
| logger.addHandler(error_handler) |
| with LoggerFactory.lock: |
| LoggerFactory.schedule_logger_dict[job_id + log_type] = logger |
| return logger |
|
|