import logging
import os
import contextvars
from logging.handlers import RotatingFileHandler

# Logging configuration constants
BASE_LOG_DIR = 'logs'
MAX_BYTES = 10 * 1024 * 1024  # 10MB per file before rotation
BACKUP_COUNT = 5

# ContextVar carrying the current task id; set by task-processing code,
# read by TaskIdFormatter so every log line can show which task emitted it.
task_id_var = contextvars.ContextVar("task_id", default=None)


class TaskIdFormatter(logging.Formatter):
    """
    Custom Formatter that injects task_id if available, or 'N/A' if not.
    Does NOT rely on Filter injection, making it robust for third-party loggers.
    """

    def format(self, record):
        # 1. Try to get task_id from the ContextVar.
        task_id = task_id_var.get()
        # 2. If the record does not already carry one (e.g. passed via
        #    ``extra=``), fill it in; never leave it None so the
        #    '%(task_id)s' format placeholder is always safe.
        if not hasattr(record, 'task_id'):
            record.task_id = task_id if task_id else 'N/A'
        elif record.task_id is None:
            # Ensure it's not None if the attribute already exists.
            record.task_id = 'N/A'
        return super().format(record)


def setup_logging(service_name: str, level=logging.INFO):
    """
    Centrally configure the logging system; each service gets its own log file.

    Args:
        service_name (str): Name of the service (e.g. 'app' or 'worker'),
            used both as the logger name and the log-file name.
        level (int): Logging level.

    Returns:
        logging.Logger: the configured logger for ``service_name``.
    """
    # Make sure the log directory exists.
    os.makedirs(BASE_LOG_DIR, exist_ok=True)

    # 1. Resolve the log-file path.
    log_file_name = f'{service_name}.log'
    log_file_path = os.path.join(BASE_LOG_DIR, log_file_name)

    # 2. Get the logger instance.
    logger = logging.getLogger(service_name)
    logger.setLevel(level)

    # 3. Use the custom formatter.
    #    Note: Filter-based injection was removed in favour of the Formatter.
    formatter = TaskIdFormatter(
        '%(asctime)s - [%(task_id)s] - %(name)s - %(levelname)s - %(message)s'
    )

    # 4. Console handler (StreamHandler).
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)

    # 5. File handler (RotatingFileHandler).
    file_handler = RotatingFileHandler(
        log_file_path,
        maxBytes=MAX_BYTES,
        backupCount=BACKUP_COUNT,
        encoding='utf-8'
    )
    file_handler.setFormatter(formatter)

    # 6. Attach handlers, de-duplicating so repeated setup_logging() calls
    #    (e.g. get_app_logger() invoked per request) don't multiply handlers.
    #    Compare by absolute path so equivalent relative paths match.
    abs_log_file_path = os.path.abspath(log_file_path)

    def attach_handlers_to(target_logger_name):
        """Attach the file/console handlers to the named logger exactly once."""
        target = logging.getLogger(target_logger_name)
        target.setLevel(level)
        # Skip if a rotating handler for this exact file is already attached.
        has_file_handler = any(
            isinstance(h, RotatingFileHandler)
            and os.path.abspath(h.baseFilename) == abs_log_file_path
            for h in target.handlers
        )
        if not has_file_handler:
            target.addHandler(file_handler)
        # BUG FIX: RotatingFileHandler is a subclass of StreamHandler, so the
        # previous ``isinstance(h, logging.StreamHandler)`` check matched the
        # file handler added just above and the console handler was NEVER
        # attached. Compare the exact type instead.
        if not any(type(h) is logging.StreamHandler for h in target.handlers):
            target.addHandler(console_handler)

    # For worker processes, also configure the 'services' logger.
    # (OCR was removed, but the mechanism is kept in case other modules
    # log under 'services'.)
    attach_handlers_to('services')
    # Attach to the main service logger, avoiding duplicate handlers.
    attach_handlers_to(service_name)

    return logger


# Convenience wrappers to simplify call sites.
def get_app_logger():
    return setup_logging(service_name='app')


def get_process_worker_logger():
    return setup_logging(service_name='process_worker')


def get_upload_worker_logger():
    return setup_logging(service_name='upload_worker')


class RequestLogger:
    """HTTP request logger (for Flask)."""

    @staticmethod
    def log_request(request, response, duration: float = None):
        """
        Log one HTTP request/response pair.

        Args:
            request: Flask request object.
            response: Flask response object.
            duration: Request handling time in seconds, if measured.
        """
        logger = get_app_logger()

        # Build the log message.
        msg_parts = [
            f"{request.method} {request.path}",
            f"status={response.status_code}",
        ]

        if duration is not None:
            msg_parts.append(f"duration={duration:.3f}s")

        # Include the query string, if any.
        if request.query_string:
            msg_parts.append(f"query={request.query_string.decode('utf-8')}")

        # Client IP — trust X-Forwarded-For when behind a proxy.
        client_ip = request.headers.get('X-Forwarded-For', request.remote_addr)
        msg_parts.append(f"ip={client_ip}")

        msg = " | ".join(msg_parts)

        # Choose the log level based on the status code.
        if response.status_code >= 500:
            logger.error(msg)
        elif response.status_code >= 400:
            logger.warning(msg)
        else:
            logger.info(msg)

    @staticmethod
    def log_error(request, error: Exception):
        """
        Log an exception raised while handling a request.

        Args:
            request: Flask request object.
            error: The exception that occurred.
        """
        logger = get_app_logger()
        logger.exception(
            f"请求错误 | {request.method} {request.path} | "
            f"error={type(error).__name__}: {str(error)}"
        )